Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit54ccfd6

Browse files
author
Amit Kapila
committed
Fix the misuse of origin filter across multiple pg_logical_slot_get_changes() calls.
The pgoutput module uses a global variable (publish_no_origin) to cachethe action for the origin filter, but we didn't reset the flag whenshutting down the output plugin, so subsequent retries may access theprevious publish_no_origin value.We fix this by storing the flag in the output plugin's private data.Additionally, the patch removes the currently unused origin string from thestructure.For the back branch, to avoid changing the exposed structure, we eliminated theglobal variable and instead directly used the origin string for changefiltering.Author: Hou ZhijieReviewed-by: Amit Kapila, Michael PaquierBackpatch-through: 16Discussion:http://postgr.es/m/OS0PR01MB571690EF24F51F51EFFCBB0E94FAA@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parent6fc3a13 commit54ccfd6

File tree

4 files changed

+90
-9
lines changed

4 files changed

+90
-9
lines changed

‎contrib/test_decoding/expected/replorigin.out

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,3 +267,59 @@ SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn
267267

268268
(1 row)
269269

270+
-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
271+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
272+
?column?
273+
----------
274+
init
275+
(1 row)
276+
277+
CREATE PUBLICATION pub FOR TABLE target_tbl;
278+
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
279+
pg_replication_origin_create
280+
------------------------------
281+
1
282+
(1 row)
283+
284+
-- mark session as replaying
285+
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
286+
pg_replication_origin_session_setup
287+
-------------------------------------
288+
289+
(1 row)
290+
291+
INSERT INTO target_tbl(data) VALUES ('test data');
292+
-- The replayed change will be filtered.
293+
SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
294+
?column?
295+
----------
296+
t
297+
(1 row)
298+
299+
-- The replayed change will be output if the origin value is not specified.
300+
SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
301+
?column?
302+
----------
303+
t
304+
(1 row)
305+
306+
-- Clean up
307+
SELECT pg_replication_origin_session_reset();
308+
pg_replication_origin_session_reset
309+
-------------------------------------
310+
311+
(1 row)
312+
313+
SELECT pg_drop_replication_slot('regression_slot');
314+
pg_drop_replication_slot
315+
--------------------------
316+
317+
(1 row)
318+
319+
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
320+
pg_replication_origin_drop
321+
----------------------------
322+
323+
(1 row)
324+
325+
DROP PUBLICATION pub;

‎contrib/test_decoding/sql/replorigin.sql

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NUL
124124
SELECT pg_replication_origin_session_reset();
125125
SELECT pg_drop_replication_slot('regression_slot_no_lsn');
126126
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn');
127+
128+
-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
129+
SELECT'init'FROM pg_create_logical_replication_slot('regression_slot','pgoutput');
130+
CREATE PUBLICATION pub FOR TABLE target_tbl;
131+
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
132+
133+
-- mark session as replaying
134+
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
135+
136+
INSERT INTO target_tbl(data)VALUES ('test data');
137+
138+
-- The replayed change will be filtered.
139+
SELECTcount(*)=0FROM pg_logical_slot_peek_binary_changes('regression_slot',NULL,NULL,'proto_version','4','publication_names','pub','origin','none');
140+
141+
-- The replayed change will be output if the origin value is not specified.
142+
SELECTcount(*)!=0FROM pg_logical_slot_peek_binary_changes('regression_slot',NULL,NULL,'proto_version','4','publication_names','pub');
143+
144+
-- Clean up
145+
SELECT pg_replication_origin_session_reset();
146+
SELECT pg_drop_replication_slot('regression_slot');
147+
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
148+
DROP PUBLICATION pub;

‎src/backend/replication/pgoutput/pgoutput.c

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
8282

8383
staticboolpublications_valid;
8484
staticboolin_streaming;
85-
staticboolpublish_no_origin;
8685

8786
staticList*LoadPublications(List*pubnames);
8887
staticvoidpublication_invalidation_cb(Datumarg,intcacheid,
@@ -381,21 +380,23 @@ parse_output_parameters(List *options, PGOutputData *data)
381380
}
382381
elseif (strcmp(defel->defname,"origin")==0)
383382
{
383+
char*origin;
384+
384385
if (origin_option_given)
385386
ereport(ERROR,
386387
errcode(ERRCODE_SYNTAX_ERROR),
387388
errmsg("conflicting or redundant options"));
388389
origin_option_given= true;
389390

390-
data->origin=defGetString(defel);
391-
if (pg_strcasecmp(data->origin,LOGICALREP_ORIGIN_NONE)==0)
392-
publish_no_origin= true;
393-
elseif (pg_strcasecmp(data->origin,LOGICALREP_ORIGIN_ANY)==0)
394-
publish_no_origin= false;
391+
origin=defGetString(defel);
392+
if (pg_strcasecmp(origin,LOGICALREP_ORIGIN_NONE)==0)
393+
data->publish_no_origin= true;
394+
elseif (pg_strcasecmp(origin,LOGICALREP_ORIGIN_ANY)==0)
395+
data->publish_no_origin= false;
395396
else
396397
ereport(ERROR,
397398
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
398-
errmsg("unrecognized origin value: \"%s\"",data->origin));
399+
errmsg("unrecognized origin value: \"%s\"",origin));
399400
}
400401
else
401402
elog(ERROR,"unrecognized pgoutput option: %s",defel->defname);
@@ -1673,7 +1674,9 @@ static bool
16731674
pgoutput_origin_filter(LogicalDecodingContext*ctx,
16741675
RepOriginIdorigin_id)
16751676
{
1676-
if (publish_no_origin&&origin_id!=InvalidRepOriginId)
1677+
PGOutputData*data= (PGOutputData*)ctx->output_plugin_private;
1678+
1679+
if (data->publish_no_origin&&origin_id!=InvalidRepOriginId)
16771680
return true;
16781681

16791682
return false;

‎src/include/replication/pgoutput.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ typedef struct PGOutputData
2929
charstreaming;
3030
boolmessages;
3131
booltwo_phase;
32-
char*origin;
32+
boolpublish_no_origin;
3333
}PGOutputData;
3434

3535
#endif/* PGOUTPUT_H */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp