1010from testgres import get_new_node ,stop_all
1111import time
1212import os
13+ import threading
1314
1415
1516def if_fdw_enabled (func ):
@@ -26,15 +27,13 @@ class PartitioningTests(unittest.TestCase):
2627
2728def setUp (self ):
2829self .setup_cmd = [
29- # 'create extension pg_pathman',
3030'create table abc(id serial, t text)' ,
3131'insert into abc select generate_series(1, 300000)' ,
3232'select create_hash_partitions(\' abc\' ,\' id\' , 3, partition_data := false)' ,
3333 ]
3434
3535def tearDown (self ):
3636stop_all ()
37- # clean_all()
3837
3938def start_new_pathman_cluster (self ,name = 'test' ,allows_streaming = False ):
4039node = get_new_node (name )
@@ -571,7 +570,7 @@ def ordered(obj):
571570 ]
572571 }
573572 }
574- ]
573+ ]
575574 """ )
576575self .assertEqual (ordered (plan ),ordered (expected ))
577576
@@ -596,7 +595,6 @@ def ordered(obj):
596595 ]
597596 """ )
598597self .assertEqual (ordered (plan ),ordered (expected ))
599- # import ipdb; ipdb.set_trace()
600598
601599# Remove all objects for testing
602600node .psql ('postgres' ,'drop table range_partitioned cascade' )
@@ -607,6 +605,63 @@ def ordered(obj):
607605node .stop ()
608606node .cleanup ()
609607
608+ def test_conc_part_creation_insert (self ):
609+ """Test concurrent partition creation on INSERT"""
610+
611+ # Create and start new instance
612+ node = self .start_new_pathman_cluster (allows_streaming = False )
613+
614+ # Create table 'ins_test' and partition it
615+ with node .connect ()as con0 :
616+ con0 .begin ()
617+ con0 .execute ('create table ins_test(val int not null)' )
618+ con0 .execute ('insert into ins_test select generate_series(1, 50)' )
619+ con0 .execute ("select create_range_partitions('ins_test', 'val', 1, 10)" )
620+ con0 .commit ()
621+
622+ # Create two separate connections for this test
623+ with node .connect ()as con1 ,node .connect ()as con2 :
624+
625+ # Thread for connection #2 (it has to wait)
626+ def con2_thread ():
627+ con2 .execute ('insert into ins_test values(51)' )
628+
629+ # Step 1: lock partitioned table in con1
630+ con1 .begin ()
631+ con1 .execute ('lock table ins_test in share update exclusive mode' )
632+
633+ # Step 2: try inserting new value in con2 (waiting)
634+ t = threading .Thread (target = con2_thread )
635+ t .start ()
636+
637+ # Step 3: try inserting new value in con1 (success, unlock)
638+ con1 .execute ('insert into ins_test values(52)' )
639+ con1 .commit ()
640+
641+ # Step 4: wait for con2
642+ t .join ()
643+
644+ rows = con1 .execute ("""
645+ select * from pathman_partition_list
646+ where parent = 'ins_test'::regclass
647+ order by range_min, range_max
648+ """ )
649+
650+ # check number of partitions
651+ self .assertEqual (len (rows ),6 )
652+
653+ # check range_max of partitions
654+ self .assertEqual (int (rows [0 ][5 ]),11 )
655+ self .assertEqual (int (rows [1 ][5 ]),21 )
656+ self .assertEqual (int (rows [2 ][5 ]),31 )
657+ self .assertEqual (int (rows [3 ][5 ]),41 )
658+ self .assertEqual (int (rows [4 ][5 ]),51 )
659+ self .assertEqual (int (rows [5 ][5 ]),61 )
660+
661+ # Stop instance and finish work
662+ node .stop ()
663+ node .cleanup ()
664+
610665
611666if __name__ == "__main__" :
612667unittest .main ()