@@ -81,115 +81,102 @@ def catchup_replica(self, master, replica):
8181% replica .name
8282master .poll_query_until ('postgres' ,wait_lsn_query )
8383
84- def printlog (self ,logfile ):
85- with open (logfile ,'r' )as log :
86- for line in log .readlines ():
87- print (line )
88-
8984def test_concurrent (self ):
9085"""Tests concurrent partitioning"""
91- try :
92- node = self .start_new_pathman_cluster ()
93- self .init_test_data (node )
9486
95- node .psql (
87+ node = self .start_new_pathman_cluster ()
88+ self .init_test_data (node )
89+
90+ node .psql (
91+ 'postgres' ,
92+ 'select partition_table_concurrently(\' abc\' )' )
93+
94+ while True :
95+ # update some rows to check for deadlocks
96+ node .safe_psql (
9697'postgres' ,
97- 'select partition_table_concurrently(\' abc\' )' )
98+ '''
99+ update abc set t = 'test'
100+ where id in (select (random() * 300000)::int
101+ from generate_series(1, 3000))
102+ ''' )
98103
99- while True :
100- # update some rows to check for deadlocks
101- node .safe_psql (
102- 'postgres' ,
103- '''
104- update abc set t = 'test'
105- where id in (select (random() * 300000)::int
106- from generate_series(1, 3000))
107- ''' )
108-
109- count = node .execute (
110- 'postgres' ,
111- 'select count(*) from pathman_concurrent_part_tasks' )
112-
113- # if there is no active workers then it means work is done
114- if count [0 ][0 ]== 0 :
115- break
116- time .sleep (1 )
117-
118- data = node .execute ('postgres' ,'select count(*) from only abc' )
119- self .assertEqual (data [0 ][0 ],0 )
120- data = node .execute ('postgres' ,'select count(*) from abc' )
121- self .assertEqual (data [0 ][0 ],300000 )
122-
123- node .stop ()
124- except Exception as e :
125- self .printlog (node .logs_dir + '/postgresql.log' )
126- raise e
104+ count = node .execute (
105+ 'postgres' ,
106+ 'select count(*) from pathman_concurrent_part_tasks' )
107+
108+ # if there is no active workers then it means work is done
109+ if count [0 ][0 ]== 0 :
110+ break
111+ time .sleep (1 )
112+
113+ data = node .execute ('postgres' ,'select count(*) from only abc' )
114+ self .assertEqual (data [0 ][0 ],0 )
115+ data = node .execute ('postgres' ,'select count(*) from abc' )
116+ self .assertEqual (data [0 ][0 ],300000 )
117+
118+ node .stop ()
127119
128120def test_replication (self ):
129121"""Tests how pg_pathman works with replication"""
130122node = get_new_node ('master' )
131123replica = get_new_node ('repl' )
132124
133- try :
134125# initialize master server
135- node = self .start_new_pathman_cluster (allows_streaming = True )
136- node .backup ('my_backup' )
137-
138- # initialize replica from backup
139- replica .init_from_backup (node ,'my_backup' ,has_streaming = True )
140- replica .start ()
141-
142- # initialize pg_pathman extension and some test data
143- self .init_test_data (node )
144-
145- # wait until replica catches up
146- self .catchup_replica (node ,replica )
147-
148- # check that results are equal
149- self .assertEqual (
150- node .psql ('postgres' ,'explain (costs off) select * from abc' ),
151- replica .psql ('postgres' ,'explain (costs off) select * from abc' )
152- )
153-
154- # enable parent and see if it is enabled in replica
155- node .psql ('postgres' ,'select enable_parent(\' abc\' ' )
156-
157- self .catchup_replica (node ,replica )
158- self .assertEqual (
159- node .psql ('postgres' ,'explain (costs off) select * from abc' ),
160- replica .psql ('postgres' ,'explain (costs off) select * from abc' )
161- )
162- self .assertEqual (
163- node .psql ('postgres' ,'select * from abc' ),
164- replica .psql ('postgres' ,'select * from abc' )
165- )
166- self .assertEqual (
167- node .execute ('postgres' ,'select count(*) from abc' )[0 ][0 ],
168- 300000
169- )
170-
171- # check that direct UPDATE in pathman_config_params invalidates
172- # cache
173- node .psql (
174- 'postgres' ,
175- 'update pathman_config_params set enable_parent = false' )
176- self .catchup_replica (node ,replica )
177- self .assertEqual (
178- node .psql ('postgres' ,'explain (costs off) select * from abc' ),
179- replica .psql ('postgres' ,'explain (costs off) select * from abc' )
180- )
181- self .assertEqual (
182- node .psql ('postgres' ,'select * from abc' ),
183- replica .psql ('postgres' ,'select * from abc' )
184- )
185- self .assertEqual (
186- node .execute ('postgres' ,'select count(*) from abc' )[0 ][0 ],
187- 0
188- )
189- except Exception as e :
190- self .printlog (node .logs_dir + '/postgresql.log' )
191- self .printlog (replica .logs_dir + '/postgresql.log' )
192- raise e
126+ node = self .start_new_pathman_cluster (allows_streaming = True )
127+ node .backup ('my_backup' )
128+
129+ # initialize replica from backup
130+ replica .init_from_backup (node ,'my_backup' ,has_streaming = True )
131+ replica .start ()
132+
133+ # initialize pg_pathman extension and some test data
134+ self .init_test_data (node )
135+
136+ # wait until replica catches up
137+ self .catchup_replica (node ,replica )
138+
139+ # check that results are equal
140+ self .assertEqual (
141+ node .psql ('postgres' ,'explain (costs off) select * from abc' ),
142+ replica .psql ('postgres' ,'explain (costs off) select * from abc' )
143+ )
144+
145+ # enable parent and see if it is enabled in replica
146+ node .psql ('postgres' ,'select enable_parent(\' abc\' ' )
147+
148+ self .catchup_replica (node ,replica )
149+ self .assertEqual (
150+ node .psql ('postgres' ,'explain (costs off) select * from abc' ),
151+ replica .psql ('postgres' ,'explain (costs off) select * from abc' )
152+ )
153+ self .assertEqual (
154+ node .psql ('postgres' ,'select * from abc' ),
155+ replica .psql ('postgres' ,'select * from abc' )
156+ )
157+ self .assertEqual (
158+ node .execute ('postgres' ,'select count(*) from abc' )[0 ][0 ],
159+ 300000
160+ )
161+
162+ # check that direct UPDATE in pathman_config_params invalidates
163+ # cache
164+ node .psql (
165+ 'postgres' ,
166+ 'update pathman_config_params set enable_parent = false' )
167+ self .catchup_replica (node ,replica )
168+ self .assertEqual (
169+ node .psql ('postgres' ,'explain (costs off) select * from abc' ),
170+ replica .psql ('postgres' ,'explain (costs off) select * from abc' )
171+ )
172+ self .assertEqual (
173+ node .psql ('postgres' ,'select * from abc' ),
174+ replica .psql ('postgres' ,'select * from abc' )
175+ )
176+ self .assertEqual (
177+ node .execute ('postgres' ,'select count(*) from abc' )[0 ][0 ],
178+ 0
179+ )
193180
194181def test_locks (self ):
195182"""Test that a session trying to create new partitions waits for other
@@ -225,71 +212,67 @@ def add_partition(node, flag, query):
225212# Initialize master server
226213node = get_new_node ('master' )
227214
228- try :
229- node .init ()
230- node .append_conf (
231- 'postgresql.conf' ,
232- 'shared_preload_libraries=\' pg_pathman\' \n ' )
233- node .start ()
234- node .safe_psql (
235- 'postgres' ,
236- 'create extension pg_pathman; ' +
237- 'create table abc(id serial, t text); ' +
238- 'insert into abc select generate_series(1, 100000); ' +
239- 'select create_range_partitions(\' abc\' ,\' id\' , 1, 50000);'
240- )
241-
242- # Start transaction that will create partition
243- con = node .connect ()
244- con .begin ()
245- con .execute ('select append_range_partition(\' abc\' )' )
246-
247- # Start threads that suppose to add new partitions and wait some
248- # time
249- query = [
250- 'select prepend_range_partition(\' abc\' )' ,
251- 'select append_range_partition(\' abc\' )' ,
252- 'select add_range_partition(\' abc\' , 500000, 550000)' ,
253- ]
254- threads = []
215+ node .init ()
216+ node .append_conf (
217+ 'postgresql.conf' ,
218+ 'shared_preload_libraries=\' pg_pathman\' \n ' )
219+ node .start ()
220+ node .safe_psql (
221+ 'postgres' ,
222+ 'create extension pg_pathman; ' +
223+ 'create table abc(id serial, t text); ' +
224+ 'insert into abc select generate_series(1, 100000); ' +
225+ 'select create_range_partitions(\' abc\' ,\' id\' , 1, 50000);'
226+ )
227+
228+ # Start transaction that will create partition
229+ con = node .connect ()
230+ con .begin ()
231+ con .execute ('select append_range_partition(\' abc\' )' )
232+
233+ # Start threads that suppose to add new partitions and wait some
234+ # time
235+ query = [
236+ 'select prepend_range_partition(\' abc\' )' ,
237+ 'select append_range_partition(\' abc\' )' ,
238+ 'select add_range_partition(\' abc\' , 500000, 550000)' ,
239+ ]
240+ threads = []
241+ for i in range (3 ):
242+ thread = threading .Thread (
243+ target = add_partition ,
244+ args = (node ,flags [i ],query [i ]))
245+ threads .append (thread )
246+ thread .start ()
247+ time .sleep (3 )
248+
249+ # This threads should wait until current transaction finished
250+ with lock :
255251for i in range (3 ):
256- thread = threading .Thread (
257- target = add_partition ,
258- args = (node ,flags [i ],query [i ]))
259- threads .append (thread )
260- thread .start ()
261- time .sleep (3 )
262-
263- # This threads should wait until current transaction finished
264- with lock :
265- for i in range (3 ):
266- self .assertEqual (flags [i ].get (),False )
252+ self .assertEqual (flags [i ].get (),False )
267253
268- # Commit transaction. Since then other sessions can create
269- # partitions
270- con .commit ()
254+ # Commit transaction. Since then other sessions can create
255+ # partitions
256+ con .commit ()
271257
272- # Now wait until each thread finishes
273- for thread in threads :
274- thread .join ()
258+ # Now wait until each thread finishes
259+ for thread in threads :
260+ thread .join ()
275261
276- # Check flags, it should be true which means that threads are
277- # finished
278- with lock :
279- for i in range (3 ):
280- self .assertEqual (flags [i ].get (),True )
281-
282- # Check that all partitions are created
283- self .assertEqual (
284- node .safe_psql (
285- 'postgres' ,
286- 'select count(*) from pg_inherits where inhparent=\' abc\' ::regclass'
287- ),
288- b'6\n '
289- )
290- except Exception as e :
291- self .printlog (node .logs_dir + '/postgresql.log' )
292- raise e
262+ # Check flags, it should be true which means that threads are
263+ # finished
264+ with lock :
265+ for i in range (3 ):
266+ self .assertEqual (flags [i ].get (),True )
267+
268+ # Check that all partitions are created
269+ self .assertEqual (
270+ node .safe_psql (
271+ 'postgres' ,
272+ 'select count(*) from pg_inherits where inhparent=\' abc\' ::regclass'
273+ ),
274+ b'6\n '
275+ )
293276
294277def test_tablespace (self ):
295278"""Check tablespace support"""