|
7 | 7 | """
|
8 | 8 |
|
9 | 9 | importunittest
|
| 10 | +importmath |
10 | 11 | fromtestgresimportget_new_node,stop_all
|
11 | 12 | importtime
|
12 | 13 | importos
|
| 14 | +importre |
| 15 | +importsubprocess |
13 | 16 | importthreading
|
14 | 17 |
|
15 | 18 |
|
@@ -708,6 +711,56 @@ def con2_thread():
|
708 | 711 | node.stop()
|
709 | 712 | node.cleanup()
|
710 | 713 |
|
| 714 | +deftest_concurrent_detach(self): |
| 715 | +"""Test concurrent detach partition with contiguous tuple inserting and spawning new partitions""" |
| 716 | + |
| 717 | +# Init parameters |
| 718 | +num_insert_workers=8 |
| 719 | +detach_timeout=0.1# time in sec between successive inserts and detachs |
| 720 | +num_detachs=100# estimated number of detachs |
| 721 | +inserts_advance=1# abvance in sec of inserts process under detachs |
| 722 | +test_interval=int(math.ceil(detach_timeout*num_detachs)) |
| 723 | + |
| 724 | +# Create and start new instance |
| 725 | +node=self.start_new_pathman_cluster(allows_streaming=False) |
| 726 | + |
| 727 | +# Create partitioned table for testing that spawns new partition on each next *detach_timeout* sec |
| 728 | +withnode.connect()ascon0: |
| 729 | +con0.begin() |
| 730 | +con0.execute('create table ts_range_partitioned(ts timestamp not null)') |
| 731 | +con0.execute("select create_range_partitions('ts_range_partitioned', 'ts', current_timestamp, interval '%f', 1)"%detach_timeout) |
| 732 | +con0.commit() |
| 733 | + |
| 734 | +# Run in background inserts and detachs processes |
| 735 | +FNULL=open(os.devnull,'w') |
| 736 | +inserts=node.pgbench(stdout=FNULL,stderr=subprocess.PIPE,options=[ |
| 737 | +"-j","%i"%num_insert_workers, |
| 738 | +"-c","%i"%num_insert_workers, |
| 739 | +"-f","pgbench_scripts/insert_current_timestamp.pgbench", |
| 740 | +"-T","%i"% (test_interval+inserts_advance) |
| 741 | +]) |
| 742 | +time.sleep(inserts_advance) |
| 743 | +detachs=node.pgbench(stdout=FNULL,stderr=subprocess.PIPE,options=[ |
| 744 | +"-D","timeout=%f"%detach_timeout, |
| 745 | +"-f","pgbench_scripts/detachs_in_timeout.pgbench", |
| 746 | +"-T","%i"%test_interval |
| 747 | +]) |
| 748 | + |
| 749 | +# Wait for completion of processes |
| 750 | +inserts.wait() |
| 751 | +detachs.wait() |
| 752 | + |
| 753 | +# Obtain error log from inserts process |
| 754 | +inserts_errors=inserts.stderr.read() |
| 755 | + |
| 756 | +self.assertIsNone( |
| 757 | +re.search("ERROR: constraint",inserts_errors), |
| 758 | +msg="Race condition between detach and concurrent inserts with append partition is expired") |
| 759 | + |
| 760 | +# Stop instance and finish work |
| 761 | +node.stop() |
| 762 | +node.cleanup() |
| 763 | + |
711 | 764 |
|
712 | 765 | if__name__=="__main__":
|
713 | 766 | unittest.main()
|