|
7 | 7 | Copyright (c) 2015-2017, Postgres Professional
|
8 | 8 | """
|
9 | 9 |
|
| 10 | +importfunctools |
10 | 11 | importjson
|
11 | 12 | importmath
|
| 13 | +importmultiprocessing |
12 | 14 | importos
|
| 15 | +importrandom |
13 | 16 | importre
|
14 | 17 | importsubprocess
|
| 18 | +importsys |
15 | 19 | importthreading
|
16 | 20 | importtime
|
17 | 21 | importunittest
|
18 |
| -importfunctools |
19 | 22 |
|
20 | 23 | fromdistutils.versionimportLooseVersion
|
21 | 24 | fromtestgresimportget_new_node,get_pg_version
|
@@ -85,10 +88,17 @@ def set_trace(self, con, command="pg_debug"):
|
85 | 88 | p=subprocess.Popen([command],stdin=subprocess.PIPE)
|
86 | 89 | p.communicate(str(pid).encode())
|
87 | 90 |
|
88 |
| -defstart_new_pathman_cluster(self,allow_streaming=False,test_data=False): |
| 91 | +defstart_new_pathman_cluster(self, |
| 92 | +allow_streaming=False, |
| 93 | +test_data=False, |
| 94 | +enable_partitionrouter=False): |
| 95 | + |
89 | 96 | node=get_new_node()
|
90 | 97 | node.init(allow_streaming=allow_streaming)
|
91 | 98 | node.append_conf("shared_preload_libraries='pg_pathman'\n")
|
| 99 | +ifenable_partitionrouter: |
| 100 | +node.append_conf("pg_pathman.enable_partitionrouter=on\n") |
| 101 | + |
92 | 102 | node.start()
|
93 | 103 | node.psql('create extension pg_pathman')
|
94 | 104 |
|
@@ -1065,6 +1075,57 @@ def test_update_node_plan1(self):
|
1065 | 1075 | node.psql('postgres','DROP SCHEMA test_update_node CASCADE;')
|
1066 | 1076 | node.psql('postgres','DROP EXTENSION pg_pathman CASCADE;')
|
1067 | 1077 |
|
| 1078 | +deftest_concurrent_updates(self): |
| 1079 | +''' |
| 1080 | + Test whether conncurrent updates work correctly between |
| 1081 | + partitions. |
| 1082 | + ''' |
| 1083 | + |
| 1084 | +create_sql=''' |
| 1085 | + CREATE TABLE test1(id INT, b INT NOT NULL); |
| 1086 | + INSERT INTO test1 |
| 1087 | + SELECT i, i FROM generate_series(1, 100) i; |
| 1088 | + SELECT create_range_partitions('test1', 'b', 1, 5); |
| 1089 | + ''' |
| 1090 | + |
| 1091 | +withself.start_new_pathman_cluster(enable_partitionrouter=True)asnode: |
| 1092 | +node.safe_psql(create_sql) |
| 1093 | + |
| 1094 | +pool=multiprocessing.Pool(processes=4) |
| 1095 | +forcountinrange(1,200): |
| 1096 | +pool.apply_async(make_updates, (node,count, )) |
| 1097 | + |
| 1098 | +pool.close() |
| 1099 | +pool.join() |
| 1100 | + |
| 1101 | +# check all data is there and not duplicated |
| 1102 | +withnode.connect()ascon: |
| 1103 | +foriinrange(1,100): |
| 1104 | +row=con.execute("select count(*) from test1 where id = %d"%i)[0] |
| 1105 | +self.assertEqual(row[0],1) |
| 1106 | + |
| 1107 | +self.assertEqual(node.execute("select count(*) from test1")[0][0],100) |
| 1108 | + |
| 1109 | + |
| 1110 | +defmake_updates(node,count): |
| 1111 | +update_sql=''' |
| 1112 | + BEGIN; |
| 1113 | + UPDATE test1 SET b = trunc(random() * 100 + 1) WHERE id in (%s); |
| 1114 | + COMMIT; |
| 1115 | + ''' |
| 1116 | + |
| 1117 | +withnode.connect()ascon: |
| 1118 | +foriinrange(count): |
| 1119 | +rows_to_update=random.randint(20,50) |
| 1120 | +ids=set([str(random.randint(1,100))foriinrange(rows_to_update)]) |
| 1121 | +con.execute(update_sql%','.join(ids)) |
| 1122 | + |
1068 | 1123 |
|
1069 | 1124 | if__name__=="__main__":
|
1070 |
| -unittest.main() |
| 1125 | +iflen(sys.argv)>1: |
| 1126 | +suite=unittest.TestLoader().loadTestsFromName(sys.argv[1], |
| 1127 | +module=sys.modules[__name__]) |
| 1128 | +else: |
| 1129 | +suite=unittest.TestLoader().loadTestsFromTestCase(Tests) |
| 1130 | + |
| 1131 | +unittest.TextTestRunner(verbosity=2,failfast=True).run(suite) |