Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitfbf7706

Browse files
authored
Merge pull request#31 from postgrespro/PGPRO-5075
Python tests fixed
2 parents37da7d0 +beb8587 commitfbf7706

File tree

3 files changed

+154
-50
lines changed

3 files changed

+154
-50
lines changed

‎tests/common.py‎

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,21 @@ def n_close(conns):
4343
forconninconns:
4444
conn.close()
4545

46+
defpg_query_state_locks(config,pid,conn,verbose=False,costs=False,timing=False, \
47+
buffers=False,triggers=False,format='text'):
48+
"""
49+
Get query state from backend with specified pid and optional parameters.
50+
Save any warning, info, notice and log data in global variable 'notices'
51+
"""
52+
53+
curs=conn.cursor()
54+
curs.callproc('pg_query_state', (pid,verbose,costs,timing,buffers,triggers,format))
55+
wait(conn)
56+
result=curs.fetchall()
57+
notices=conn.notices[:]
58+
59+
returnresult,notices
60+
4661
defpg_query_state(config,pid,verbose=False,costs=False,timing=False, \
4762
buffers=False,triggers=False,format='text'):
4863
"""
@@ -52,14 +67,63 @@ def pg_query_state(config, pid, verbose=False, costs=False, timing=False, \
5267

5368
conn=psycopg2.connect(**config)
5469
curs=conn.cursor()
55-
5670
curs.callproc('pg_query_state', (pid,verbose,costs,timing,buffers,triggers,format))
5771
result=curs.fetchall()
5872
notices=conn.notices[:]
5973
conn.close()
6074

6175
returnresult,notices
6276

77+
defonetime_query_state_locks(config,acon_query,acon_pg,query,args={},num_workers=0):
78+
"""
79+
Get intermediate state of 'query' on connection 'acon_query' after number of 'steps'
80+
of node executions from start of query
81+
"""
82+
83+
curs_query=acon_query.cursor()
84+
curs_pg=acon_pg.cursor()
85+
curs_query.execute("select pg_advisory_lock(1);")
86+
curs_pg.execute("select pg_advisory_lock(2);")
87+
wait(acon_query)
88+
wait(acon_pg)
89+
curs_pg.execute("select pg_advisory_lock(1);")
90+
set_guc(acon_query,'enable_mergejoin','off')
91+
set_guc(acon_query,'max_parallel_workers_per_gather',num_workers)
92+
curs_query.execute(query)
93+
# extract current state of query progress
94+
MAX_PG_QS_RETRIES=10
95+
DELAY_BETWEEN_RETRIES=0.1
96+
pg_qs_args= {
97+
'config':config,
98+
'pid':acon_query.get_backend_pid(),
99+
'conn':acon_pg
100+
}
101+
fork,vinargs.items():
102+
pg_qs_args[k]=v
103+
n_retries=0
104+
105+
wait(acon_pg)
106+
107+
whileTrue:
108+
result,notices=pg_query_state_locks(**pg_qs_args)
109+
n_retries+=1
110+
iflen(result)>0:
111+
break
112+
ifn_retries>=MAX_PG_QS_RETRIES:
113+
# pg_query_state callings don't return any result, more likely run
114+
# query has completed
115+
break
116+
time.sleep(DELAY_BETWEEN_RETRIES)
117+
118+
curs_pg.execute("select pg_advisory_unlock(2);")
119+
wait(acon_pg)
120+
wait(acon_query)
121+
122+
set_guc(acon_query,'enable_mergejoin','on')
123+
curs_query.execute("select pg_advisory_unlock(2);")
124+
curs_pg.execute("select pg_advisory_unlock(1);")
125+
returnresult,notices
126+
63127
defonetime_query_state(config,async_conn,query,args={},num_workers=0):
64128
"""
65129
Get intermediate state of 'query' on connection 'async_conn' after number of 'steps'

‎tests/pg_qs_test_runner.py‎

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'''
22
pg_qs_test_runner.py
3-
Copyright (c) 2016-2020, Postgres Professional
3+
Copyright (c) 2016-2021, Postgres Professional
44
'''
55

66
importargparse
@@ -22,6 +22,20 @@ def __call__(self, parser, args, values, option_string=None):
2222
classSetupException(Exception):pass
2323
classTeardownException(Exception):pass
2424

25+
unlock_if_eq_1="""
26+
CREATE OR REPLACE FUNCTION unlock_if_eq_1(x integer) RETURNS integer AS $$
27+
BEGIN
28+
IF x = 1 THEN
29+
perform pg_advisory_unlock(1);
30+
perform pg_advisory_lock(2);
31+
return 1;
32+
ELSE
33+
return x;
34+
END IF;
35+
END;
36+
$$ LANGUAGE plpgsql
37+
"""
38+
2539
setup_cmd= [
2640
'drop extension if exists pg_query_state cascade',
2741
'drop table if exists foo cascade',
@@ -33,6 +47,7 @@ class TeardownException(Exception): pass
3347
'insert into bar select i, i%2=1 from generate_series(1, 500000) as i',
3448
'analyze foo',
3549
'analyze bar',
50+
unlock_if_eq_1,
3651
]
3752

3853
teardown_cmd= [

‎tests/test_cases.py‎

Lines changed: 73 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'''
22
test_cases.py
3-
Copyright (c) 2016-2020, Postgres Professional
3+
Copyright (c) 2016-2021, Postgres Professional
44
'''
55

66
importjson
@@ -42,21 +42,28 @@ def test_deadlock(config):
4242
deftest_simple_query(config):
4343
"""test statistics of simple query"""
4444

45-
acon,=common.n_async_connect(config)
46-
query='select count(*) from foo join bar on foo.c1=bar.c1'
45+
acon1,acon2=common.n_async_connect(config,2)
46+
query='select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1'
4747
expected=r"""Aggregate \(Current loop: actual rows=\d+, loop number=1\)
48-
-> Hash Join \(Current loop: actual rows=\d+, loop number=1\)
48+
-> Hash Join \(Current loop: actual rows=62473, loop number=1\)
4949
Hash Cond: \(foo.c1 = bar.c1\)
50+
Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\)
5051
-> Seq Scan on foo \(Current loop: actual rows=\d+, loop number=1\)
5152
-> Hash \(Current loop: actual rows=\d+, loop number=1\)
5253
Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
5354
-> Seq Scan on bar \(Current loop: actual rows=\d+, loop number=1\)"""
5455

55-
qs,_=common.onetime_query_state(config,acon,query)
56-
assertqs[0][0]==acon.get_backend_pid()andqs[0][1]==0 \
57-
andqs[0][2]==queryandre.match(expected,qs[0][3])andqs[0][4]==None
56+
qs,_=common.onetime_query_state_locks(config,acon1,acon2,query)
5857

59-
common.n_close((acon,))
58+
assertqs[0][0]==acon1.get_backend_pid()
59+
assertqs[0][1]==0
60+
assertqs[0][2]==query
61+
assertre.match(expected,qs[0][3])
62+
assertqs[0][4]==None
63+
# assert qs[0][0] == acon.get_backend_pid() and qs[0][1] == 0 \
64+
# and qs[0][2] == query and re.match(expected, qs[0][3]) and qs[0][4] == None
65+
66+
common.n_close((acon1,acon2))
6067

6168
deftest_concurrent_access(config):
6269
"""test when two backends compete with each other to extract state from third running backend"""
@@ -87,50 +94,56 @@ def test_concurrent_access(config):
8794
deftest_nested_call(config):
8895
"""test statistics under calling function"""
8996

90-
acon,=common.n_async_connect(config)
97+
acon1,acon2=common.n_async_connect(config,2)
9198
util_conn=psycopg2.connect(**config)
9299
util_curs=util_conn.cursor()
93100
create_function="""
94101
create or replace function n_join_foo_bar() returns integer as $$
95102
begin
96-
return (select count(*) from foo join bar on foo.c1=bar.c1);
103+
return (select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1);
97104
end;
98105
$$ language plpgsql"""
99106
drop_function='drop function n_join_foo_bar()'
100107
call_function='select * from n_join_foo_bar()'
101-
nested_query='SELECT (select count(*) from foo join bar on foo.c1=bar.c1)'
108+
nested_query1='(select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1)'
109+
nested_query2='SELECT (select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1)'
102110
expected='Function Scan on n_join_foo_bar (Current loop: actual rows=0, loop number=1)'
103111
expected_nested=r"""Result \(Current loop: actual rows=0, loop number=1\)
104112
InitPlan 1 \(returns \$0\)
105113
-> Aggregate \(Current loop: actual rows=0, loop number=1\)
106-
-> Hash Join \(Current loop: actual rows=0, loop number=1\)
114+
-> Hash Join \(Current loop: actual rows=62473, loop number=1\)
107115
Hash Cond: \(foo.c1 = bar.c1\)
108-
-> Seq Scan on foo \(Current loop: actual rows=1, loop number=1\)
109-
-> Hash \(Current loop: actual rows=0, loop number=1\)
116+
Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\)
117+
-> Seq Scan on foo \(Current loop: actual rows=1000000, loop number=1\)
118+
-> Hash \(Current loop: actual rows=500000, loop number=1\)
110119
Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
111120
-> Seq Scan on bar \(Current loop: actual rows=\d+, loop number=1\)"""
112121

122+
113123
util_curs.execute(create_function)
114124
util_conn.commit()
115125

116-
qs,notices=common.onetime_query_state(config,acon,call_function)
126+
qs,notices=common.onetime_query_state_locks(config,acon1,acon2,call_function)
117127

118128
# Print some debug output before assertion
119129
iflen(qs)<2:
120130
print(qs)
121131

122-
assertlen(qs)==2 \
123-
andqs[0][0]==qs[1][0]==acon.get_backend_pid() \
124-
andqs[0][1]==0andqs[1][1]==1 \
125-
andqs[0][2]==call_functionandqs[0][3]==expected \
126-
andqs[1][2]==nested_queryandre.match(expected_nested,qs[1][3]) \
127-
andqs[0][4]==qs[1][4]==None
132+
assertlen(qs)==3
133+
assertqs[0][0]==qs[1][0]==acon1.get_backend_pid()
134+
assertqs[0][1]==0
135+
assertqs[1][1]==1
136+
assertqs[0][2]==call_function
137+
assertqs[0][3]==expected
138+
assertqs[1][2]==nested_query1orqs[1][2]==nested_query2
139+
assertre.match(expected_nested,qs[1][3])
140+
assertqs[0][4]==qs[1][4]==None
128141
assertlen(notices)==0
129142

130143
util_curs.execute(drop_function)
131144

132145
util_conn.close()
133-
common.n_close((acon,))
146+
common.n_close((acon1,acon2))
134147

135148
deftest_insert_on_conflict(config):
136149
"""test statistics on conflicting tuples under INSERT ON CONFLICT query"""
@@ -212,65 +225,77 @@ def test_trigger(config):
212225
deftest_costs(config):
213226
"""test plan costs"""
214227

215-
acon,=common.n_async_connect(config)
216-
query='select count(*) from foo join bar on foo.c1=bar.c1'
228+
acon1,acon2=common.n_async_connect(config,2)
229+
query='select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1;'
230+
217231
expected=r"""Aggregate \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=8\) \(Current loop: actual rows=0, loop number=1\)
218-
-> Hash Join \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=0\) \(Current loop: actual rows=0, loop number=1\)
232+
-> Hash Join \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=0\) \(Current loop: actual rows=\d+, loop number=1\)
219233
Hash Cond: \(foo.c1 = bar.c1\)
220-
-> Seq Scan on foo \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=1, loop number=1\)
221-
-> Hash \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=0, loop number=1\)
234+
Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\)
235+
-> Seq Scan on foo \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=1000000, loop number=1\)
236+
-> Hash \(cost=\d+.\d+..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=500000, loop number=1\)
222237
Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
223238
-> Seq Scan on bar \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=\d+, loop number=1\)"""
224239

225-
qs,notices=common.onetime_query_state(config,acon,query, {'costs':True})
226-
assertlen(qs)==1andre.match(expected,qs[0][3])
240+
qs,notices=common.onetime_query_state_locks(config,acon1,acon2,query, {'costs':True})
241+
242+
assertlen(qs)==2andre.match(expected,qs[0][3])
227243
assertlen(notices)==0
228244

229-
common.n_close((acon,))
245+
common.n_close((acon1,acon2))
230246

231247
deftest_buffers(config):
232248
"""test buffer statistics"""
233249

234-
acon,=common.n_async_connect(config)
235-
query='select count(*) from foo join bar on foo.c1=bar.c1'
250+
acon1,acon2=common.n_async_connect(config,2)
251+
query='select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1'
236252
expected=r"""Aggregate \(Current loop: actual rows=0, loop number=1\)
237-
-> Hash Join \(Current loop: actual rows=0, loop number=1\)
253+
-> Hash Join \(Current loop: actual rows=\d+, loop number=1\)
238254
Hash Cond: \(foo.c1 = bar.c1\)
239-
-> Seq Scan on foo \(Current loop: actual rows=1, loop number=1\)
255+
Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\)
256+
Buffers: shared hit=\d+, temp read=\d+ written=\d+
257+
-> Seq Scan on foo \(Current loop: actual rows=1000000, loop number=1\)
240258
Buffers: [^\n]*
241-
-> Hash \(Current loop: actual rows=0, loop number=1\)
259+
-> Hash \(Current loop: actual rows=500000, loop number=1\)
242260
Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
261+
Buffers: shared hit=\d+, temp written=\d+
243262
-> Seq Scan on bar \(Current loop: actual rows=\d+, loop number=1\)
244263
Buffers: .*"""
245264

246-
common.set_guc(acon,'pg_query_state.enable_buffers','on')
265+
common.set_guc(acon1,'pg_query_state.enable_buffers','on')
247266

248-
qs,notices=common.onetime_query_state(config,acon,query, {'buffers':True})
249-
assertlen(qs)==1andre.match(expected,qs[0][3])
267+
qs,notices=common.onetime_query_state_locks(config,acon1,acon2,query, {'buffers':True})
268+
269+
assertlen(qs)==2
270+
assertre.match(expected,qs[0][3])
250271
assertlen(notices)==0
251272

252-
common.n_close((acon,))
273+
common.n_close((acon1,acon2))
253274

254275
deftest_timing(config):
255276
"""test timing statistics"""
256277

257-
acon,=common.n_async_connect(config)
258-
query='select count(*) from foo join bar on foo.c1=bar.c1'
278+
acon1,acon2=common.n_async_connect(config,2)
279+
query='select count(*) from foo join bar on foo.c1=bar.c1 and unlock_if_eq_1(foo.c1)=bar.c1'
280+
259281
expected=r"""Aggregate \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\)
260-
-> Hash Join \(Current loop:running time=\d+.\d+ actualrows=0, loop number=1\)
282+
-> Hash Join \(Current loop:actual time=\d+.\d+..\d+.\d+rows=\d+, loop number=1\)
261283
Hash Cond: \(foo.c1 = bar.c1\)
262-
-> Seq Scan on foo \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=1, loop number=1\)
263-
-> Hash \(Current loop: running time=\d+.\d+ actual rows=0, loop number=1\)
284+
Join Filter: \(unlock_if_eq_1\(foo.c1\) = bar.c1\)
285+
-> Seq Scan on foo \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=1000000, loop number=1\)
286+
-> Hash \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=500000, loop number=1\)
264287
Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
265288
-> Seq Scan on bar \(Current loop: actual time=\d+.\d+..\d+.\d+ rows=\d+, loop number=1\)"""
266289

267-
common.set_guc(acon,'pg_query_state.enable_timing','on')
290+
common.set_guc(acon1,'pg_query_state.enable_timing','on')
268291

269-
qs,notices=common.onetime_query_state(config,acon,query, {'timing':True})
270-
assertlen(qs)==1andre.match(expected,qs[0][3])
292+
qs,notices=common.onetime_query_state_locks(config,acon1,acon2,query, {'timing':True})
293+
294+
assertlen(qs)==2
295+
assertre.match(expected,qs[0][3])
271296
assertlen(notices)==0
272297

273-
common.n_close((acon,))
298+
common.n_close((acon1,acon2))
274299

275300
defcheck_plan(plan):
276301
assert'Current loop'inplan

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp