Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8d05be9

Browse files
author
Amit Kapila
committed
Fix the misuse of origin filter across multiple pg_logical_slot_get_changes() calls.
The pgoutput module uses a global variable (publish_no_origin) to cachethe action for the origin filter, but we didn't reset the flag whenshutting down the output plugin, so subsequent retries may access theprevious publish_no_origin value.We fix this by storing the flag in the output plugin's private data.Additionally, the patch removes the currently unused origin string from thestructure.For the back branch, to avoid changing the exposed structure, we eliminated theglobal variable and instead directly used the origin string for changefiltering.Author: Hou ZhijieReviewed-by: Amit Kapila, Michael PaquierBackpatch-through: 16Discussion:http://postgr.es/m/OS0PR01MB571690EF24F51F51EFFCBB0E94FAA@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parent641db60 commit8d05be9

File tree

3 files changed

+85
-7
lines changed

3 files changed

+85
-7
lines changed

‎contrib/test_decoding/expected/replorigin.out‎

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,3 +267,59 @@ SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn
267267

268268
(1 row)
269269

270+
-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
271+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
272+
?column?
273+
----------
274+
init
275+
(1 row)
276+
277+
CREATE PUBLICATION pub FOR TABLE target_tbl;
278+
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
279+
pg_replication_origin_create
280+
------------------------------
281+
1
282+
(1 row)
283+
284+
-- mark session as replaying
285+
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
286+
pg_replication_origin_session_setup
287+
-------------------------------------
288+
289+
(1 row)
290+
291+
INSERT INTO target_tbl(data) VALUES ('test data');
292+
-- The replayed change will be filtered.
293+
SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
294+
?column?
295+
----------
296+
t
297+
(1 row)
298+
299+
-- The replayed change will be output if the origin value is not specified.
300+
SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
301+
?column?
302+
----------
303+
t
304+
(1 row)
305+
306+
-- Clean up
307+
SELECT pg_replication_origin_session_reset();
308+
pg_replication_origin_session_reset
309+
-------------------------------------
310+
311+
(1 row)
312+
313+
SELECT pg_drop_replication_slot('regression_slot');
314+
pg_drop_replication_slot
315+
--------------------------
316+
317+
(1 row)
318+
319+
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
320+
pg_replication_origin_drop
321+
----------------------------
322+
323+
(1 row)
324+
325+
DROP PUBLICATION pub;

‎contrib/test_decoding/sql/replorigin.sql‎

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NUL
124124
SELECT pg_replication_origin_session_reset();
125125
SELECT pg_drop_replication_slot('regression_slot_no_lsn');
126126
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn');
127+
128+
-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
129+
SELECT'init'FROM pg_create_logical_replication_slot('regression_slot','pgoutput');
130+
CREATE PUBLICATION pub FOR TABLE target_tbl;
131+
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
132+
133+
-- mark session as replaying
134+
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
135+
136+
INSERT INTO target_tbl(data)VALUES ('test data');
137+
138+
-- The replayed change will be filtered.
139+
SELECTcount(*)=0FROM pg_logical_slot_peek_binary_changes('regression_slot',NULL,NULL,'proto_version','4','publication_names','pub','origin','none');
140+
141+
-- The replayed change will be output if the origin value is not specified.
142+
SELECTcount(*)!=0FROM pg_logical_slot_peek_binary_changes('regression_slot',NULL,NULL,'proto_version','4','publication_names','pub');
143+
144+
-- Clean up
145+
SELECT pg_replication_origin_session_reset();
146+
SELECT pg_drop_replication_slot('regression_slot');
147+
SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
148+
DROP PUBLICATION pub;

‎src/backend/replication/pgoutput/pgoutput.c‎

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
8282

8383
staticboolpublications_valid;
8484
staticboolin_streaming;
85-
staticboolpublish_no_origin;
8685

8786
staticList*LoadPublications(List*pubnames);
8887
staticvoidpublication_invalidation_cb(Datumarg,intcacheid,
@@ -388,11 +387,9 @@ parse_output_parameters(List *options, PGOutputData *data)
388387
origin_option_given= true;
389388

390389
data->origin=defGetString(defel);
391-
if (pg_strcasecmp(data->origin,LOGICALREP_ORIGIN_NONE)==0)
392-
publish_no_origin= true;
393-
elseif (pg_strcasecmp(data->origin,LOGICALREP_ORIGIN_ANY)==0)
394-
publish_no_origin= false;
395-
else
390+
391+
if (pg_strcasecmp(data->origin,LOGICALREP_ORIGIN_NONE)!=0&&
392+
pg_strcasecmp(data->origin,LOGICALREP_ORIGIN_ANY)!=0)
396393
ereport(ERROR,
397394
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
398395
errmsg("unrecognized origin value: \"%s\"",data->origin));
@@ -1673,7 +1670,10 @@ static bool
16731670
pgoutput_origin_filter(LogicalDecodingContext*ctx,
16741671
RepOriginIdorigin_id)
16751672
{
1676-
if (publish_no_origin&&origin_id!=InvalidRepOriginId)
1673+
PGOutputData*data= (PGOutputData*)ctx->output_plugin_private;
1674+
1675+
if (data->origin&& (pg_strcasecmp(data->origin,LOGICALREP_ORIGIN_NONE)==0)&&
1676+
origin_id!=InvalidRepOriginId)
16771677
return true;
16781678

16791679
return false;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp