Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2ac1f8a

Browse files
committed
Make periodic pg_query_state calls to backend running TPC-DS bench
Also refactor common functions for python tests
1 parent772062a commit2ac1f8a

File tree

3 files changed

+66
-47
lines changed

3 files changed

+66
-47
lines changed

‎tests/common.py

Lines changed: 19 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,7 @@
66
importpsycopg2
77
importpsycopg2.extensions
88
importselect
9-
10-
# Some queries from TPC-DS may freeze or be even broken,
11-
# so we allow some sort of failure, since we do not test
12-
# Postgres, but rather that pg_query_state do not crash
13-
# anything under stress load.
14-
MAX_PG_QS_RETRIES=50
15-
9+
importtime
1610

1711
defwait(conn):
1812
"""wait for some event on connection to postgres"""
@@ -47,8 +41,7 @@ def n_close(conns):
4741
conn.close()
4842

4943
defpg_query_state(config,pid,verbose=False,costs=False,timing=False, \
50-
buffers=False,triggers=False,format='text', \
51-
stress_in_progress=False):
44+
buffers=False,triggers=False,format='text'):
5245
"""
5346
Get query state from backend with specified pid and optional parameters.
5447
Save any warning, info, notice and log data in global variable 'notices'
@@ -57,53 +50,48 @@ def pg_query_state(config, pid, verbose=False, costs=False, timing=False, \
5750
conn=psycopg2.connect(**config)
5851
curs=conn.cursor()
5952

60-
ifstress_in_progress:
61-
set_guc(conn,'statement_timeout',TPC_DS_STATEMENT_TIMEOUT)
62-
n_retries=0
63-
64-
result= []
65-
whilenotresult:
66-
curs.callproc('pg_query_state', (pid,verbose,costs,timing,buffers,triggers,format))
67-
result=curs.fetchall()
68-
69-
ifstress_in_progress:
70-
n_retries+=1
71-
ifn_retries>=MAX_PG_QS_RETRIES:
72-
print('\npg_query_state tried %s times with no effect, giving up'%MAX_PG_QS_RETRIES)
73-
break
74-
53+
curs.callproc('pg_query_state', (pid,verbose,costs,timing,buffers,triggers,format))
54+
result=curs.fetchall()
7555
notices=conn.notices[:]
7656
conn.close()
57+
7758
returnresult,notices
7859

79-
defquery_state(config,async_conn,query,args={},num_workers=0,stress_in_progress=False):
60+
defonetime_query_state(config,async_conn,query,args={},num_workers=0):
8061
"""
8162
Get intermediate state of 'query' on connection 'async_conn' after number of 'steps'
8263
of node executions from start of query
8364
"""
8465

8566
acurs=async_conn.cursor()
86-
conn=psycopg2.connect(**config)
87-
curs=conn.cursor()
8867

8968
set_guc(async_conn,'enable_mergejoin','off')
9069
set_guc(async_conn,'max_parallel_workers_per_gather',num_workers)
9170
acurs.execute(query)
9271

9372
# extract current state of query progress
73+
MAX_PG_QS_RETRIES=10
74+
DELAY_BETWEEN_RETRIES=0.1
9475
pg_qs_args= {
9576
'config':config,
9677
'pid':async_conn.get_backend_pid()
9778
}
9879
fork,vinargs.items():
9980
pg_qs_args[k]=v
100-
result,notices=pg_query_state(**pg_qs_args)
81+
n_retries=0
82+
whileTrue:
83+
result,notices=pg_query_state(**pg_qs_args)
84+
n_retries+=1
85+
iflen(result)>0:
86+
break
87+
ifn_retries>=MAX_PG_QS_RETRIES:
88+
# pg_query_state callings don't return any result, more likely run
89+
# query has completed
90+
break
91+
time.sleep(DELAY_BETWEEN_RETRIES)
10192
wait(async_conn)
10293

103-
set_guc(async_conn,'pg_query_state.executor_trace','off')
10494
set_guc(async_conn,'enable_mergejoin','on')
105-
106-
conn.close()
10795
returnresult,notices
10896

10997
defset_guc(async_conn,param,value):

‎tests/test_cases.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def test_simple_query(config):
5050
Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
5151
-> Seq Scan on bar \(Current loop: actual rows=\d+, loop number=1\)"""
5252

53-
qs,_=common.query_state(config,acon,query)
53+
qs,_=common.onetime_query_state(config,acon,query)
5454
assertqs[0][0]==acon.get_backend_pid()andqs[0][1]==0 \
5555
andqs[0][2]==queryandre.match(expected,qs[0][3])andqs[0][4]==None
5656

@@ -111,7 +111,7 @@ def test_nested_call(config):
111111
util_curs.execute(create_function)
112112
util_conn.commit()
113113

114-
qs,notices=common.query_state(config,acon,call_function)
114+
qs,notices=common.onetime_query_state(config,acon,call_function)
115115
assertlen(qs)==2 \
116116
andqs[0][0]==qs[1][0]==acon.get_backend_pid() \
117117
andqs[0][1]==0andqs[1][1]==1 \
@@ -143,7 +143,7 @@ def test_insert_on_conflict(config):
143143
util_curs.execute(add_field_uniqueness)
144144
util_conn.commit()
145145

146-
qs,notices=common.query_state(config,acon,query)
146+
qs,notices=common.onetime_query_state(config,acon,query)
147147

148148
assertqs[0][0]==acon.get_backend_pid()andqs[0][1]==0 \
149149
andqs[0][2]==queryandre.match(expected,qs[0][3]) \
@@ -185,13 +185,13 @@ def test_trigger(config):
185185
util_curs.execute(create_trigger)
186186
util_conn.commit()
187187

188-
qs,notices=common.query_state(config,acon,query, {'triggers':True})
188+
qs,notices=common.onetime_query_state(config,acon,query, {'triggers':True})
189189
assertqs[0][0]==acon.get_backend_pid()andqs[0][1]==0 \
190190
andqs[0][2]==queryandre.match(expected_upper,qs[0][3]) \
191191
andqs[0][4]==None
192192
assertlen(notices)==0
193193

194-
qs,notices=common.query_state(config,acon,query, {'triggers':False})
194+
qs,notices=common.onetime_query_state(config,acon,query, {'triggers':False})
195195
assertqs[0][0]==acon.get_backend_pid()andqs[0][1]==0 \
196196
andqs[0][2]==queryandre.match(expected_upper,qs[0][3]) \
197197
andqs[0][4]==None
@@ -215,7 +215,7 @@ def test_costs(config):
215215
Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
216216
-> Seq Scan on bar \(cost=0.00..\d+.\d+ rows=\d+ width=4\) \(Current loop: actual rows=\d+, loop number=1\)"""
217217

218-
qs,notices=common.query_state(config,acon,query, {'costs':True})
218+
qs,notices=common.onetime_query_state(config,acon,query, {'costs':True})
219219
assertlen(qs)==1andre.match(expected,qs[0][3])
220220
assertlen(notices)==0
221221

@@ -238,7 +238,7 @@ def test_buffers(config):
238238

239239
common.set_guc(acon,'pg_query_state.enable_buffers','on')
240240

241-
qs,notices=common.query_state(config,acon,query, {'buffers':True})
241+
qs,notices=common.onetime_query_state(config,acon,query, {'buffers':True})
242242
assertlen(qs)==1andre.match(expected,qs[0][3])
243243
assertlen(notices)==0
244244

@@ -259,7 +259,7 @@ def test_timing(config):
259259

260260
common.set_guc(acon,'pg_query_state.enable_timing','on')
261261

262-
qs,notices=common.query_state(config,acon,query, {'timing':True})
262+
qs,notices=common.onetime_query_state(config,acon,query, {'timing':True})
263263
assertlen(qs)==1andre.match(expected,qs[0][3])
264264
assertlen(notices)==0
265265

@@ -298,11 +298,11 @@ def test_formats(config):
298298
Buckets: \d+ Batches: \d+ Memory Usage: \d+kB
299299
-> Seq Scan on bar \(Current loop: actual rows=\d+, loop number=1\)"""
300300

301-
qs,notices=common.query_state(config,acon,query, {'format':'text'})
301+
qs,notices=common.onetime_query_state(config,acon,query, {'format':'text'})
302302
assertlen(qs)==1andre.match(expected,qs[0][3])
303303
assertlen(notices)==0
304304

305-
qs,notices=common.query_state(config,acon,query, {'format':'json'})
305+
qs,notices=common.onetime_query_state(config,acon,query, {'format':'json'})
306306
try:
307307
js_obj=json.loads(qs[0][3])
308308
exceptValueError:
@@ -311,7 +311,7 @@ def test_formats(config):
311311
assertlen(notices)==0
312312
check_plan(js_obj['Plan'])
313313

314-
qs,notices=common.query_state(config,acon,query, {'format':'xml'})
314+
qs,notices=common.onetime_query_state(config,acon,query, {'format':'xml'})
315315
assertlen(qs)==1
316316
assertlen(notices)==0
317317
try:
@@ -320,7 +320,7 @@ def test_formats(config):
320320
assertFalse,'Invalid xml format'
321321
check_xml(xml_root)
322322

323-
qs,_=common.query_state(config,acon,query, {'format':'yaml'})
323+
qs,_=common.onetime_query_state(config,acon,query, {'format':'yaml'})
324324
try:
325325
yaml_doc=yaml.load(qs[0][3],Loader=yaml.FullLoader)
326326
except:
@@ -339,15 +339,15 @@ def test_timing_buffers_conflicts(config):
339339
timing_pattern='(?:running time=\d+.\d+)|(?:actual time=\d+.\d+..\d+.\d+)'
340340
buffers_pattern='Buffers:'
341341

342-
qs,notices=common.query_state(config,acon,query, {'timing':True,'buffers':False})
342+
qs,notices=common.onetime_query_state(config,acon,query, {'timing':True,'buffers':False})
343343
assertlen(qs)==1andnotre.search(timing_pattern,qs[0][3])
344344
assertnotices== ['WARNING: timing statistics disabled\n']
345345

346-
qs,notices=common.query_state(config,acon,query, {'timing':False,'buffers':True})
346+
qs,notices=common.onetime_query_state(config,acon,query, {'timing':False,'buffers':True})
347347
assertlen(qs)==1andnotre.search(buffers_pattern,qs[0][3])
348348
assertnotices== ['WARNING: buffers statistics disabled\n']
349349

350-
qs,notices=common.query_state(config,acon,query, {'timing':True,'buffers':True})
350+
qs,notices=common.onetime_query_state(config,acon,query, {'timing':True,'buffers':True})
351351
assertlen(qs)==1andnotre.search(timing_pattern,qs[0][3]) \
352352
andnotre.search(buffers_pattern,qs[0][3])
353353
assertlen(notices)==2and'WARNING: timing statistics disabled\n'innotices \

‎tests/tpcds.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
importprogressbar
99
importpsycopg2.extensions
1010
importsubprocess
11+
importtime
1112

1213
classDataLoadException(Exception):pass
1314
classStressTestException(Exception):pass
@@ -58,6 +59,7 @@ def run_tpcds(config):
5859
queries.append(f.read())
5960

6061
acon,=common.n_async_connect(config)
62+
pid=acon.get_backend_pid()
6163

6264
print('Starting TPC-DS queries...')
6365
timeout_list= []
@@ -69,7 +71,36 @@ def run_tpcds(config):
6971
try:
7072
# Set query timeout to TPC_DS_STATEMENT_TIMEOUT / 1000 seconds
7173
common.set_guc(acon,'statement_timeout',TPC_DS_STATEMENT_TIMEOUT)
72-
qs=common.query_state(config,acon,query,stress_in_progress=True)
74+
75+
# run query
76+
acurs=acon.cursor()
77+
acurs.execute(query)
78+
79+
# periodically run pg_query_state on running backend trying to get
80+
# crash of PostgreSQL
81+
MAX_PG_QS_RETRIES=10
82+
PG_QS_DELAY,BEFORE_GOT_QS_DELAY=0.1,0.1
83+
BEFORE_GOT_QS,GOT_QS=range(2)
84+
state,n_retries=BEFORE_GOT_QS,0
85+
whileTrue:
86+
result,_=common.pg_query_state(config,pid)
87+
ifstate==BEFORE_GOT_QS:
88+
iflen(result)>0:
89+
state=GOT_QS
90+
continue
91+
n_retries+=1
92+
ifn_retries>=MAX_PG_QS_RETRIES:
93+
# pg_query_state callings don't return any result, more likely run
94+
# query has completed
95+
break
96+
time.sleep(BEFORE_GOT_QS_DELAY)
97+
ifstate==GOT_QS:
98+
iflen(result)==0:
99+
break
100+
time.sleep(PG_QS_DELAY)
101+
102+
# wait for real query completion
103+
common.wait(acon)
73104

74105
exceptpsycopg2.extensions.QueryCanceledError:
75106
timeout_list.append(i+1)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp