Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf9c842e

Browse files
committed
make concurrent part worker more reliable, new tests
1 parentfda1b36 commitf9c842e

File tree

4 files changed

+197
-51
lines changed

4 files changed

+197
-51
lines changed

‎expected/pathman_bgw.out

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ SELECT set_init_callback('test_bgw.test_5', 'test_bgw.abort_xact(jsonb)');
132132
(1 row)
133133

134134
INSERT INTO test_bgw.test_5 VALUES (-100);
135-
ERROR:Attempt to spawn new partitions of relation "test_5" failed
135+
ERROR:attempt to spawn new partitions of relation "test_5" failed
136136
SELECT * FROM pathman_partition_list ORDER BY partition; /* should contain 3 partitions */
137137
parent | partition | parttype | expr | range_min | range_max
138138
-----------------+-------------------+----------+------+-----------+-----------
@@ -143,5 +143,80 @@ SELECT * FROM pathman_partition_list ORDER BY partition; /* should contain 3 par
143143
DROP FUNCTION test_bgw.abort_xact(args JSONB);
144144
DROP TABLE test_bgw.test_5 CASCADE;
145145
NOTICE: drop cascades to 3 other objects
146+
/*
147+
* Tests for ConcurrentPartWorker
148+
*/
149+
CREATE TABLE test_bgw.conc_part(id INT4 NOT NULL);
150+
INSERT INTO test_bgw.conc_part SELECT generate_series(1, 500);
151+
SELECT create_hash_partitions('test_bgw.conc_part', 'id', 5, false);
152+
create_hash_partitions
153+
------------------------
154+
5
155+
(1 row)
156+
157+
BEGIN;
158+
/* Also test FOR SHARE/UPDATE conflicts in BGW */
159+
SELECT * FROM test_bgw.conc_part ORDER BY id LIMIT 1 FOR SHARE;
160+
id
161+
----
162+
1
163+
(1 row)
164+
165+
/* Run partitioning bgworker */
166+
SELECT partition_table_concurrently('test_bgw.conc_part', 10, 1);
167+
NOTICE: worker started, you can stop it with the following command: select public.stop_concurrent_part_task('conc_part');
168+
partition_table_concurrently
169+
------------------------------
170+
171+
(1 row)
172+
173+
/* Wait until bgworker starts */
174+
SELECT pg_sleep(1);
175+
pg_sleep
176+
----------
177+
178+
(1 row)
179+
180+
ROLLBACK;
181+
/* Wait until it finises */
182+
DO $$
183+
DECLARE
184+
ops int8;
185+
BEGIN
186+
LOOP
187+
SELECT count(*)
188+
FROM pathman_concurrent_part_tasks
189+
WHERE processed < 500 -- protect from endless loops
190+
INTO ops;
191+
192+
IF ops > 0 THEN
193+
PERFORM pg_sleep(0.2);
194+
ELSE
195+
EXIT;
196+
END IF;
197+
END LOOP;
198+
END
199+
$$ LANGUAGE plpgsql;
200+
/* Check amount of tasks and rows in parent and partitions */
201+
SELECT count(*) FROM pathman_concurrent_part_tasks;
202+
count
203+
-------
204+
0
205+
(1 row)
206+
207+
SELECT count(*) FROM ONLY test_bgw.conc_part;
208+
count
209+
-------
210+
0
211+
(1 row)
212+
213+
SELECT count(*) FROM test_bgw.conc_part;
214+
count
215+
-------
216+
500
217+
(1 row)
218+
219+
DROP TABLE test_bgw.conc_part CASCADE;
220+
NOTICE: drop cascades to 5 other objects
146221
DROP SCHEMA test_bgw CASCADE;
147222
DROP EXTENSION pg_pathman;

‎sql/pathman_bgw.sql

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CREATE EXTENSION pg_pathman;
55
CREATESCHEMAtest_bgw;
66

77

8+
89
/*
910
* Tests for SpawnPartitionsWorker
1011
*/
@@ -74,5 +75,51 @@ DROP TABLE test_bgw.test_5 CASCADE;
7475

7576

7677

78+
/*
79+
* Tests for ConcurrentPartWorker
80+
*/
81+
82+
CREATETABLEtest_bgw.conc_part(id INT4NOT NULL);
83+
INSERT INTOtest_bgw.conc_partSELECT generate_series(1,500);
84+
SELECT create_hash_partitions('test_bgw.conc_part','id',5, false);
85+
86+
BEGIN;
87+
/* Also test FOR SHARE/UPDATE conflicts in BGW*/
88+
SELECT*FROMtest_bgw.conc_partORDER BY idLIMIT1 FOR SHARE;
89+
/* Run partitioning bgworker*/
90+
SELECT partition_table_concurrently('test_bgw.conc_part',10,1);
91+
/* Wait until bgworker starts*/
92+
SELECT pg_sleep(1);
93+
ROLLBACK;
94+
95+
/* Wait until it finises*/
96+
DO $$
97+
DECLARE
98+
ops int8;
99+
BEGIN
100+
LOOP
101+
SELECTcount(*)
102+
FROM pathman_concurrent_part_tasks
103+
WHERE processed<500-- protect from endless loops
104+
INTO ops;
105+
106+
IF ops>0 THEN
107+
PERFORM pg_sleep(0.2);
108+
ELSE
109+
EXIT;
110+
END IF;
111+
END LOOP;
112+
END
113+
$$ LANGUAGE plpgsql;
114+
115+
/* Check amount of tasks and rows in parent and partitions*/
116+
SELECTcount(*)FROM pathman_concurrent_part_tasks;
117+
SELECTcount(*)FROM ONLYtest_bgw.conc_part;
118+
SELECTcount(*)FROMtest_bgw.conc_part;
119+
120+
DROPTABLEtest_bgw.conc_part CASCADE;
121+
122+
123+
77124
DROPSCHEMA test_bgw CASCADE;
78125
DROP EXTENSION pg_pathman;

‎src/include/pathman_workers.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,29 @@ cps_set_status(ConcurrentPartSlot *slot, ConcurrentPartSlotStatus status)
112112
SpinLockRelease(&slot->mutex);
113113
}
114114

115+
staticinlineconstchar*
116+
cps_print_status(ConcurrentPartSlotStatusstatus)
117+
{
118+
switch(status)
119+
{
120+
caseCPS_FREE:
121+
return"free";
122+
123+
caseCPS_WORKING:
124+
return"working";
125+
126+
caseCPS_STOPPING:
127+
return"stopping";
128+
129+
default:
130+
return"[unknown]";
131+
}
132+
}
133+
115134

116135

117136
/* Number of worker slots for concurrent partitioning */
118-
#definePART_WORKER_SLOTS10
137+
#definePART_WORKER_SLOTSmax_worker_processes
119138

120139
/* Max number of attempts per batch */
121140
#definePART_WORKER_MAX_ATTEMPTS60

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp