66import time
77import xml .etree .ElementTree as ET
88import yaml
9+ from time import sleep
910
1011def wait (conn ):
1112"""wait for some event on connection to postgres"""
@@ -52,10 +53,10 @@ def pg_query_state(config, pid, verbose=False, costs=False, timing=False, \
5253
5354conn = psycopg2 .connect (** config )
5455curs = conn .cursor ()
55-
56- curs . callproc ( 'pg_query_state' , ( pid , verbose , costs , timing , buffers , triggers , format ))
57- result = curs .fetchall ( )
58-
56+ result = []
57+ while not result :
58+ curs .callproc ( 'pg_query_state' , ( pid , verbose , costs , timing , buffers , triggers , format ) )
59+ result = curs . fetchall ()
5960notices = conn .notices [:]
6061conn .close ()
6162return result
@@ -85,7 +86,7 @@ def test_deadlock(config):
8586
8687n_close ((acon1 ,acon2 ))
8788
88- def query_state (config ,async_conn ,query ,steps , args = {},num_workers = 0 ):
89+ def query_state (config ,async_conn ,query ,args = {},num_workers = 0 ):
8990"""
9091Get intermediate state of 'query' on connection 'async_conn' after number of 'steps'
9192of node executions from start of query
@@ -97,13 +98,7 @@ def query_state(config, async_conn, query, steps, args={}, num_workers=0):
9798
9899set_guc (async_conn ,'enable_mergejoin' ,'off' )
99100set_guc (async_conn ,'max_parallel_workers_per_gather' ,num_workers )
100- set_guc (async_conn ,'pg_query_state.executor_trace' ,'on' )
101-
102- # execute 'query' specific number of 'steps'
103101acurs .execute (query )
104- for _ in xrange (steps ):
105- curs .callproc ('executor_step' , (async_conn .get_backend_pid (),))
106- # import ipdb; ipdb.set_trace()
107102
108103# extract current state of query progress
109104pg_qs_args = {
@@ -113,9 +108,6 @@ def query_state(config, async_conn, query, steps, args={}, num_workers=0):
113108for k ,v in args .iteritems ():
114109pg_qs_args [k ]= v
115110result = pg_query_state (** pg_qs_args )
116-
117- # resume query progress and complete it
118- curs .callproc ('executor_continue' , (async_conn .get_backend_pid (),))
119111wait (async_conn )
120112
121113set_guc (async_conn ,'pg_query_state.executor_trace' ,'off' )
@@ -129,16 +121,15 @@ def test_simple_query(config):
129121
130122acon ,= n_async_connect (config )
131123query = 'select count(*) from foo join bar on foo.c1=bar.c1'
132- num_steps = 10
133- expected = r"""Aggregate \(Current loop: actual rows=0, loop number=1\)
134- -> Hash Join \(Current loop: actual rows=0, loop number=1\)
124+ expected = r"""Aggregate \(Current loop: actual rows=\d+, loop number=1\)
125+ -> Hash Join \(Current loop: actual rows=\d+, loop number=1\)
135126 Hash Cond: \(foo.c1 = bar.c1\)
136- -> Seq Scan on foo \(Current loop: actual rows=1 , loop number=1\)
137- -> Hash \(Current loop: actual rows=0 , loop number=1\)
127+ -> Seq Scan on foo \(Current loop: actual rows=\d+ , loop number=1\)
128+ -> Hash \(Current loop: actual rows=\d+ , loop number=1\)
138129 Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
139- -> Seq Scan on bar \(Current loop: actual rows=9 , loop number=1\)"""
130+ -> Seq Scan on bar \(Current loop: actual rows=\d+ , loop number=1\)"""
140131
141- qs = query_state (config ,acon ,query , num_steps )
132+ qs = query_state (config ,acon ,query )
142133assert len (qs )== 1 and qs [0 ][0 ]== acon .get_backend_pid ()and qs [0 ][1 ]== 0 \
143134and qs [0 ][2 ]== query and re .match (expected ,qs [0 ][3 ])and qs [0 ][4 ]== None
144135assert len (notices )== 0
@@ -187,7 +178,6 @@ def test_nested_call(config):
187178drop_function = 'drop function n_join_foo_bar()'
188179call_function = 'select * from n_join_foo_bar()'
189180nested_query = 'SELECT (select count(*) from foo join bar on foo.c1=bar.c1)'
190- num_steps = 10
191181expected = 'Function Scan on n_join_foo_bar (Current loop: actual rows=0, loop number=1)'
192182expected_nested = r"""Result \(Current loop: actual rows=0, loop number=1\)
193183 InitPlan 1 \(returns \$0\)
@@ -197,12 +187,12 @@ def test_nested_call(config):
197187 -> Seq Scan on foo \(Current loop: actual rows=1, loop number=1\)
198188 -> Hash \(Current loop: actual rows=0, loop number=1\)
199189 Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
200- -> Seq Scan on bar \(Current loop: actual rows=8 , loop number=1\)"""
190+ -> Seq Scan on bar \(Current loop: actual rows=\d+ , loop number=1\)"""
201191
202192util_curs .execute (create_function )
203193util_conn .commit ()
204194
205- qs = query_state (config ,acon ,call_function , num_steps )
195+ qs = query_state (config ,acon ,call_function )
206196assert len (qs )== 2 \
207197and qs [0 ][0 ]== qs [1 ][0 ]== acon .get_backend_pid () \
208198and qs [0 ][1 ]== 0 and qs [1 ][1 ]== 1 \
@@ -224,24 +214,26 @@ def test_insert_on_conflict(config):
224214util_curs = util_conn .cursor ()
225215add_field_uniqueness = 'alter table foo add constraint unique_c1 unique(c1)'
226216drop_field_uniqueness = 'alter table foo drop constraint unique_c1'
227- num_steps = 10
228- query = 'insert into foo select i, md5(random()::text) from generate_series(1, %d) as i on conflict do nothing' % ( num_steps + 1 )
229- expected = """Insert on foo (Current loop: actual rows=0 , loop number=1 )
217+ query = 'insert into foo select i, md5(random()::text) from generate_series(1, 30000) as i on conflict do nothing'
218+
219+ expected = r """Insert on foo\ (Current loop: actual rows=\d+ , loop number=\d+\ )
230220 Conflict Resolution: NOTHING
231- Conflicting Tuples:9
232- -> Function Scan on generate_series i (Current loop: actual rows=10 , loop number=1 )"""
221+ Conflicting Tuples:\d+
222+ -> Function Scan on generate_series i\ (Current loop: actual rows=\d+ , loop number=\d+\ )"""
233223
234224util_curs .execute (add_field_uniqueness )
235225util_conn .commit ()
236226
237- qs = query_state (config ,acon ,query , num_steps )
227+ qs = query_state (config ,acon ,query )
238228assert len (qs )== 1 \
239229and qs [0 ][0 ]== acon .get_backend_pid ()and qs [0 ][1 ]== 0 \
240- and qs [0 ][2 ]== query and qs [0 ][3 ]== expected \
230+ and qs [0 ][2 ]== query and re . match ( expected , qs [0 ][3 ]) \
241231and qs [0 ][4 ]== None
242232assert len (notices )== 0
243233
244234util_curs .execute (drop_field_uniqueness )
235+ util_curs .execute ("ANALYZE foo" )
236+ util_curs .execute ("ANALYZE bar" )
245237
246238util_conn .close ()
247239n_close ((acon ,))
@@ -270,40 +262,27 @@ def test_trigger(config):
270262create_trigger = """
271263create trigger unique_foo_c1
272264before insert or update of c1 on foo for row
273- execute procedure unique_c1_in_foo()"""
265+ execute procedure unique_c1_in_foo()"""
274266drop_temps = 'drop function unique_c1_in_foo() cascade'
275- num_steps = 10
276- query = 'insert into foo select i, md5(random()::text) from generate_series(1, %d) as i' % (num_steps + 1 )
277- expected_upper = """Insert on foo (Current loop: actual rows=0, loop number=1)
278- -> Function Scan on generate_series i (Current loop: actual rows=2, loop number=1)"""
279- trigger_suffix = 'Trigger unique_foo_c1: calls=1'
280- expected_inner = """Result (Current loop: actual rows=0, loop number=1)
281- SubPlan 1
282- -> Materialize (Current loop: actual rows=1, loop number=1)
283- -> Seq Scan on foo (Current loop: actual rows=1, loop number=1)"""
267+ query = 'insert into foo select i, md5(random()::text) from generate_series(1, 10000) as i'
268+ expected_upper = r"""Insert on foo \(Current loop: actual rows=\d+, loop number=1\)
269+ -> Function Scan on generate_series i \(Current loop: actual rows=\d+, loop number=1\)"""
270+ trigger_suffix = r"""Trigger unique_foo_c1: calls=\d+"""
284271
285272util_curs .execute (create_trigger_function )
286273util_curs .execute (create_trigger )
287274util_conn .commit ()
288275
289- qs = query_state (config ,acon ,query ,num_steps , {'triggers' :True })
290- assert len (qs )== 2 \
291- and qs [0 ][0 ]== acon .get_backend_pid ()and qs [0 ][1 ]== 0 \
292- and qs [0 ][2 ]== query and qs [0 ][3 ]== expected_upper + '\n ' + trigger_suffix \
293- and qs [0 ][4 ]== None \
294- and qs [1 ][0 ]== acon .get_backend_pid ()and qs [1 ][1 ]== 1 \
295- and qs [1 ][2 ]== 'SELECT new.c1 in (select c1 from foo)' and qs [1 ][3 ]== expected_inner \
296- and qs [1 ][4 ]== None
276+ qs = query_state (config ,acon ,query , {'triggers' :True })
277+ assert qs [0 ][0 ]== acon .get_backend_pid ()and qs [0 ][1 ]== 0 \
278+ and qs [0 ][2 ]== query and re .match (expected_upper ,qs [0 ][3 ]) \
279+ and qs [0 ][4 ]== None
297280assert len (notices )== 0
298281
299- qs = query_state (config ,acon ,query ,num_steps , {'triggers' :False })
300- assert len (qs )== 2 \
301- and qs [0 ][0 ]== acon .get_backend_pid ()and qs [0 ][1 ]== 0 \
302- and qs [0 ][2 ]== query and qs [0 ][3 ]== expected_upper \
303- and qs [0 ][4 ]== None \
304- and qs [1 ][0 ]== acon .get_backend_pid ()and qs [1 ][1 ]== 1 \
305- and qs [1 ][2 ]== 'SELECT new.c1 in (select c1 from foo)' and qs [1 ][3 ]== expected_inner \
306- and qs [1 ][4 ]== None
282+ qs = query_state (config ,acon ,query , {'triggers' :False })
283+ assert qs [0 ][0 ]== acon .get_backend_pid ()and qs [0 ][1 ]== 0 \
284+ and qs [0 ][2 ]== query and re .match (expected_upper ,qs [0 ][3 ]) \
285+ and qs [0 ][4 ]== None
307286assert len (notices )== 0
308287
309288util_curs .execute (drop_temps )
@@ -316,16 +295,15 @@ def test_costs(config):
316295
317296acon ,= n_async_connect (config )
318297query = 'select count(*) from foo join bar on foo.c1=bar.c1'
319- num_steps = 10
320298expected = r"""Aggregate \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=8\) \(Current loop: actual rows=0, loop number=1\)
321299 -> Hash Join \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=0\) \(Current loop: actual rows=0, loop number=1\)
322300 Hash Cond: \(foo.c1 = bar.c1\)
323301 -> Seq Scan on foo \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=1, loop number=1\)
324302 -> Hash \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=0, loop number=1\)
325303 Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
326- -> Seq Scan on bar \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=9 , loop number=1\)"""
304+ -> Seq Scan on bar \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=\d+ , loop number=1\)"""
327305
328- qs = query_state (config ,acon ,query ,num_steps , {'costs' :True })
306+ qs = query_state (config ,acon ,query , {'costs' :True })
329307assert len (qs )== 1 and re .match (expected ,qs [0 ][3 ])
330308assert len (notices )== 0
331309
@@ -336,20 +314,19 @@ def test_buffers(config):
336314
337315acon ,= n_async_connect (config )
338316query = 'select count(*) from foo join bar on foo.c1=bar.c1'
339- num_steps = 10
340317expected = r"""Aggregate \(Current loop: actual rows=0, loop number=1\)
341318 -> Hash Join \(Current loop: actual rows=0, loop number=1\)
342319 Hash Cond: \(foo.c1 = bar.c1\)
343320 -> Seq Scan on foo \(Current loop: actual rows=1, loop number=1\)
344321 Buffers: [^\n]*
345322 -> Hash \(Current loop: actual rows=0, loop number=1\)
346323 Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
347- -> Seq Scan on bar \(Current loop: actual rows=9 , loop number=1\)
324+ -> Seq Scan on bar \(Current loop: actual rows=\d+ , loop number=1\)
348325 Buffers: .*"""
349326
350327set_guc (acon ,'pg_query_state.enable_buffers' ,'on' )
351328
352- qs = query_state (config ,acon ,query ,num_steps , {'buffers' :True })
329+ qs = query_state (config ,acon ,query , {'buffers' :True })
353330assert len (qs )== 1 and re .match (expected ,qs [0 ][3 ])
354331assert len (notices )== 0
355332
@@ -360,18 +337,17 @@ def test_timing(config):
360337
361338acon ,= n_async_connect (config )
362339query = 'select count(*) from foo join bar on foo.c1=bar.c1'
363- num_steps = 10
364340expected = r"""Aggregate \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\)
365341 -> Hash Join \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\)
366342 Hash Cond: \(foo.c1 = bar.c1\)
367343 -> Seq Scan on foo \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=1, loop number=1\)
368344 -> Hash \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\)
369345 Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
370- -> Seq Scan on bar \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=9 , loop number=1\)"""
346+ -> Seq Scan on bar \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=\d+ , loop number=1\)"""
371347
372348set_guc (acon ,'pg_query_state.enable_timing' ,'on' )
373349
374- qs = query_state (config ,acon ,query ,num_steps , {'timing' :True })
350+ qs = query_state (config ,acon ,query , {'timing' :True })
375351assert len (qs )== 1 and re .match (expected ,qs [0 ][3 ])
376352assert len (notices )== 0
377353
@@ -402,20 +378,19 @@ def test_formats(config):
402378
403379acon ,= n_async_connect (config )
404380query = 'select count(*) from foo join bar on foo.c1=bar.c1'
405- num_steps = 10
406381expected = r"""Aggregate \(Current loop: actual rows=0, loop number=1\)
407382 -> Hash Join \(Current loop: actual rows=0, loop number=1\)
408383 Hash Cond: \(foo.c1 = bar.c1\)
409384 -> Seq Scan on foo \(Current loop: actual rows=1, loop number=1\)
410385 -> Hash \(Current loop: actual rows=0, loop number=1\)
411386 Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
412- -> Seq Scan on bar \(Current loop: actual rows=9 , loop number=1\)"""
387+ -> Seq Scan on bar \(Current loop: actual rows=\d+ , loop number=1\)"""
413388
414- qs = query_state (config ,acon ,query ,num_steps , {'format' :'text' })
389+ qs = query_state (config ,acon ,query , {'format' :'text' })
415390assert len (qs )== 1 and re .match (expected ,qs [0 ][3 ])
416391assert len (notices )== 0
417392
418- qs = query_state (config ,acon ,query ,num_steps , {'format' :'json' })
393+ qs = query_state (config ,acon ,query , {'format' :'json' })
419394try :
420395js_obj = json .loads (qs [0 ][3 ])
421396except ValueError :
@@ -424,7 +399,7 @@ def test_formats(config):
424399assert len (notices )== 0
425400check_plan (js_obj ['Plan' ])
426401
427- qs = query_state (config ,acon ,query ,num_steps , {'format' :'xml' })
402+ qs = query_state (config ,acon ,query , {'format' :'xml' })
428403assert len (qs )== 1
429404assert len (notices )== 0
430405try :
@@ -433,7 +408,7 @@ def test_formats(config):
433408assert False ,'Invalid xml format'
434409check_xml (xml_root )
435410
436- qs = query_state (config ,acon ,query ,num_steps , {'format' :'yaml' })
411+ qs = query_state (config ,acon ,query , {'format' :'yaml' })
437412try :
438413yaml_doc = yaml .load (qs [0 ][3 ])
439414except :
@@ -449,19 +424,18 @@ def test_timing_buffers_conflicts(config):
449424
450425acon ,= n_async_connect (config )
451426query = 'select count(*) from foo join bar on foo.c1=bar.c1'
452- num_steps = 10
453427timing_pattern = '(?:running time=\d+.\d+)|(?:actual time=\d+.\d+..\d+.\d+)'
454428buffers_pattern = 'Buffers:'
455429
456- qs = query_state (config ,acon ,query ,num_steps , {'timing' :True ,'buffers' :False })
430+ qs = query_state (config ,acon ,query , {'timing' :True ,'buffers' :False })
457431assert len (qs )== 1 and not re .search (timing_pattern ,qs [0 ][3 ])
458432assert notices == ['WARNING: timing statistics disabled\n ' ]
459433
460- qs = query_state (config ,acon ,query ,num_steps , {'timing' :False ,'buffers' :True })
434+ qs = query_state (config ,acon ,query , {'timing' :False ,'buffers' :True })
461435assert len (qs )== 1 and not re .search (buffers_pattern ,qs [0 ][3 ])
462436assert notices == ['WARNING: buffers statistics disabled\n ' ]
463437
464- qs = query_state (config ,acon ,query ,num_steps , {'timing' :True ,'buffers' :True })
438+ qs = query_state (config ,acon ,query , {'timing' :True ,'buffers' :True })
465439assert len (qs )== 1 and not re .search (timing_pattern ,qs [0 ][3 ]) \
466440and not re .search (buffers_pattern ,qs [0 ][3 ])
467441assert len (notices )== 2 and 'WARNING: timing statistics disabled\n ' in notices \