Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3662839

Browse files
author
Amit Kapila
committed
Allow users to skip logical replication of data having origin.
This patch adds a new SUBSCRIPTION parameter "origin". It specifieswhether the subscription will request the publisher to only send changesthat don't have an origin or send changes regardless of origin. Setting itto "none" means that the subscription will request the publisher to onlysend changes that have no origin associated. Setting it to "any" meansthat the publisher sends changes regardless of their origin. The defaultis "any".Usage:CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres port=9999'PUBLICATION pub1 WITH (origin = none);This can be used to avoid loops (infinite replication of the same data)among replication nodes.This feature allows filtering only the replication data originating fromWAL but for initial sync (initial copy of table data) we don't have such afacility as we can only distinguish the data based on origin from WAL. Asa follow-up patch, we are planning to forbid the initial sync if theorigin is specified as none and we notice that the publication tables werealso replicated from other publishers to avoid duplicate data or loops.We forbid to allow creating origin with names 'none' and 'any' to avoidconfusion with the same name options.Author: Vignesh C, Amit KapilaReviewed-By: Peter Smith, Amit Kapila, Dilip Kumar, Shi yu, Ashutosh Bapat, Hayato KurodaDiscussion:https://postgr.es/m/CALDaNm0gwjY_4HFxvvty01BOT01q_fJLKQ3pWP9=9orqubhjcQ@mail.gmail.com
1 parentf2d0c7f commit3662839

File tree

24 files changed

+463
-78
lines changed

24 files changed

+463
-78
lines changed

‎contrib/test_decoding/expected/replorigin.out

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ SELECT pg_replication_origin_drop('regress_test_decoding: temp');
5656

5757
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
5858
ERROR: replication origin "regress_test_decoding: temp" does not exist
59+
-- specifying reserved origin names is not supported
60+
SELECT pg_replication_origin_create('any');
61+
ERROR: replication origin name "any" is reserved
62+
DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
63+
SELECT pg_replication_origin_create('none');
64+
ERROR: replication origin name "none" is reserved
65+
DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
66+
SELECT pg_replication_origin_create('pg_replication_origin');
67+
ERROR: replication origin name "pg_replication_origin" is reserved
68+
DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
5969
-- various failure checks for undefined slots
6070
select pg_replication_origin_advance('regress_test_decoding: temp', '0/1');
6171
ERROR: replication origin "regress_test_decoding: temp" does not exist

‎contrib/test_decoding/sql/replorigin.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ SELECT pg_replication_origin_create('regress_test_decoding: temp');
3131
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
3232
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
3333

34+
-- specifying reserved origin names is not supported
35+
SELECT pg_replication_origin_create('any');
36+
SELECT pg_replication_origin_create('none');
37+
SELECT pg_replication_origin_create('pg_replication_origin');
38+
3439
-- various failure checks for undefined slots
3540
select pg_replication_origin_advance('regress_test_decoding: temp','0/1');
3641
select pg_replication_origin_session_setup('regress_test_decoding: temp');

‎doc/src/sgml/catalogs.sgml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7943,6 +7943,20 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
79437943
see <xref linkend="logical-replication-publication"/>.
79447944
</para></entry>
79457945
</row>
7946+
7947+
<row>
7948+
<entry role="catalog_table_entry"><para role="column_definition">
7949+
<structfield>suborigin</structfield> <type>text</type>
7950+
</para>
7951+
<para>
7952+
The origin value must be either <literal>none</literal> or
7953+
<literal>any</literal>. The default is <literal>any</literal>.
7954+
If <literal>none</literal>, the subscription will request the publisher
7955+
to only send changes that don't have an origin. If
7956+
<literal>any</literal>, the publisher sends changes regardless of their
7957+
origin.
7958+
</para></entry>
7959+
</row>
79467960
</tbody>
79477961
</tgroup>
79487962
</table>

‎doc/src/sgml/ref/alter_subscription.sgml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
207207
information. The parameters that can be altered
208208
are <literal>slot_name</literal>,
209209
<literal>synchronous_commit</literal>,
210-
<literal>binary</literal>, <literal>streaming</literal>, and
211-
<literal>disable_on_error</literal>.
210+
<literal>binary</literal>, <literal>streaming</literal>,
211+
<literal>disable_on_error</literal>, and
212+
<literal>origin</literal>.
212213
</para>
213214
</listitem>
214215
</varlistentry>

‎doc/src/sgml/ref/create_subscription.sgml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,21 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
302302
</para>
303303
</listitem>
304304
</varlistentry>
305+
306+
<varlistentry>
307+
<term><literal>origin</literal> (<type>string</type>)</term>
308+
<listitem>
309+
<para>
310+
Specifies whether the subscription will request the publisher to only
311+
send changes that don't have an origin or send changes regardless of
312+
origin. Setting <literal>origin</literal> to <literal>none</literal>
313+
means that the subscription will request the publisher to only send
314+
changes that don't have an origin. Setting <literal>origin</literal>
315+
to <literal>any</literal> means that the publisher sends changes
316+
regardless of their origin. The default is <literal>any</literal>.
317+
</para>
318+
</listitem>
319+
</varlistentry>
305320
</variablelist></para>
306321

307322
</listitem>

‎src/backend/catalog/pg_subscription.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,14 @@ GetSubscription(Oid subid, bool missing_ok)
106106
Assert(!isnull);
107107
sub->publications=textarray_to_stringlist(DatumGetArrayTypeP(datum));
108108

109+
/* Get origin */
110+
datum=SysCacheGetAttr(SUBSCRIPTIONOID,
111+
tup,
112+
Anum_pg_subscription_suborigin,
113+
&isnull);
114+
Assert(!isnull);
115+
sub->origin=TextDatumGetCString(datum);
116+
109117
ReleaseSysCache(tup);
110118

111119
returnsub;

‎src/backend/catalog/system_views.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1298,8 +1298,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
12981298
-- All columns of pg_subscription except subconninfo are publicly readable.
12991299
REVOKE ALLON pg_subscriptionFROM public;
13001300
GRANTSELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
1301-
subbinary, substream, subtwophasestate, subdisableonerr, subslotname,
1302-
subsynccommit, subpublications)
1301+
subbinary, substream, subtwophasestate, subdisableonerr,
1302+
subslotname,subsynccommit, subpublications, suborigin)
13031303
ON pg_subscription TO public;
13041304

13051305
CREATEVIEWpg_stat_subscription_statsAS

‎src/backend/commands/subscriptioncmds.c

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
#defineSUBOPT_TWOPHASE_COMMIT0x00000200
6565
#defineSUBOPT_DISABLE_ON_ERR0x00000400
6666
#defineSUBOPT_LSN0x00000800
67+
#defineSUBOPT_ORIGIN0x00001000
6768

6869
/* check if the 'val' has 'bits' set */
6970
#defineIsSet(val,bits) (((val) & (bits)) == (bits))
@@ -86,6 +87,7 @@ typedef struct SubOpts
8687
boolstreaming;
8788
booltwophase;
8889
booldisableonerr;
90+
char*origin;
8991
XLogRecPtrlsn;
9092
}SubOpts;
9193

@@ -118,7 +120,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
118120
IsSet(supported_opts,SUBOPT_ENABLED |SUBOPT_CREATE_SLOT |
119121
SUBOPT_COPY_DATA));
120122

121-
/* Set default values for thebooleansupported options. */
123+
/* Set default values for the supported options. */
122124
if (IsSet(supported_opts,SUBOPT_CONNECT))
123125
opts->connect= true;
124126
if (IsSet(supported_opts,SUBOPT_ENABLED))
@@ -137,6 +139,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
137139
opts->twophase= false;
138140
if (IsSet(supported_opts,SUBOPT_DISABLE_ON_ERR))
139141
opts->disableonerr= false;
142+
if (IsSet(supported_opts,SUBOPT_ORIGIN))
143+
opts->origin=pstrdup(LOGICALREP_ORIGIN_ANY);
140144

141145
/* Parse options */
142146
foreach(lc,stmt_options)
@@ -265,6 +269,29 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
265269
opts->specified_opts |=SUBOPT_DISABLE_ON_ERR;
266270
opts->disableonerr=defGetBoolean(defel);
267271
}
272+
elseif (IsSet(supported_opts,SUBOPT_ORIGIN)&&
273+
strcmp(defel->defname,"origin")==0)
274+
{
275+
if (IsSet(opts->specified_opts,SUBOPT_ORIGIN))
276+
errorConflictingDefElem(defel,pstate);
277+
278+
opts->specified_opts |=SUBOPT_ORIGIN;
279+
pfree(opts->origin);
280+
281+
/*
282+
* Even though the "origin" parameter allows only "none" and "any"
283+
* values, it is implemented as a string type so that the
284+
* parameter can be extended in future versions to support
285+
* filtering using origin names specified by the user.
286+
*/
287+
opts->origin=defGetString(defel);
288+
289+
if ((pg_strcasecmp(opts->origin,LOGICALREP_ORIGIN_NONE)!=0)&&
290+
(pg_strcasecmp(opts->origin,LOGICALREP_ORIGIN_ANY)!=0))
291+
ereport(ERROR,
292+
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
293+
errmsg("unrecognized origin value: \"%s\"",opts->origin));
294+
}
268295
elseif (IsSet(supported_opts,SUBOPT_LSN)&&
269296
strcmp(defel->defname,"lsn")==0)
270297
{
@@ -530,7 +557,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
530557
SUBOPT_SLOT_NAME |SUBOPT_COPY_DATA |
531558
SUBOPT_SYNCHRONOUS_COMMIT |SUBOPT_BINARY |
532559
SUBOPT_STREAMING |SUBOPT_TWOPHASE_COMMIT |
533-
SUBOPT_DISABLE_ON_ERR);
560+
SUBOPT_DISABLE_ON_ERR |SUBOPT_ORIGIN);
534561
parse_subscription_options(pstate,stmt->options,supported_opts,&opts);
535562

536563
/*
@@ -617,6 +644,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
617644
CStringGetTextDatum(opts.synchronous_commit);
618645
values[Anum_pg_subscription_subpublications-1]=
619646
publicationListToArray(publications);
647+
values[Anum_pg_subscription_suborigin-1]=
648+
CStringGetTextDatum(opts.origin);
620649

621650
tup=heap_form_tuple(RelationGetDescr(rel),values,nulls);
622651

@@ -1014,7 +1043,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
10141043
{
10151044
supported_opts= (SUBOPT_SLOT_NAME |
10161045
SUBOPT_SYNCHRONOUS_COMMIT |SUBOPT_BINARY |
1017-
SUBOPT_STREAMING |SUBOPT_DISABLE_ON_ERR);
1046+
SUBOPT_STREAMING |SUBOPT_DISABLE_ON_ERR |
1047+
SUBOPT_ORIGIN);
10181048

10191049
parse_subscription_options(pstate,stmt->options,
10201050
supported_opts,&opts);
@@ -1071,6 +1101,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
10711101
= true;
10721102
}
10731103

1104+
if (IsSet(opts.specified_opts,SUBOPT_ORIGIN))
1105+
{
1106+
values[Anum_pg_subscription_suborigin-1]=
1107+
CStringGetTextDatum(opts.origin);
1108+
replaces[Anum_pg_subscription_suborigin-1]= true;
1109+
}
1110+
10741111
update_tuple= true;
10751112
break;
10761113
}

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
451451
PQserverVersion(conn->streamConn) >=150000)
452452
appendStringInfoString(&cmd,", two_phase 'on'");
453453

454+
if (options->proto.logical.origin&&
455+
PQserverVersion(conn->streamConn) >=160000)
456+
appendStringInfo(&cmd,", origin '%s'",
457+
options->proto.logical.origin);
458+
454459
pubnames=options->proto.logical.publication_names;
455460
pubnames_str=stringlist_to_identifierstr(conn->streamConn,pubnames);
456461
if (!pubnames_str)

‎src/backend/replication/logical/origin.c

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
#include"access/xloginsert.h"
7878
#include"catalog/catalog.h"
7979
#include"catalog/indexing.h"
80+
#include"catalog/pg_subscription.h"
8081
#include"funcapi.h"
8182
#include"miscadmin.h"
8283
#include"nodes/execnodes.h"
@@ -195,6 +196,17 @@ replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
195196
}
196197

197198

199+
/*
200+
* IsReservedOriginName
201+
*True iff name is either "none" or "any".
202+
*/
203+
staticbool
204+
IsReservedOriginName(constchar*name)
205+
{
206+
return ((pg_strcasecmp(name,LOGICALREP_ORIGIN_NONE)==0)||
207+
(pg_strcasecmp(name,LOGICALREP_ORIGIN_ANY)==0));
208+
}
209+
198210
/* ---------------------------------------------------------------------------
199211
* Functions for working with replication origins themselves.
200212
* ---------------------------------------------------------------------------
@@ -1244,13 +1256,17 @@ pg_replication_origin_create(PG_FUNCTION_ARGS)
12441256

12451257
name=text_to_cstring((text*)DatumGetPointer(PG_GETARG_DATUM(0)));
12461258

1247-
/* Replication origins "pg_xxx" are reserved for internal use */
1248-
if (IsReservedName(name))
1259+
/*
1260+
* Replication origins "any and "none" are reserved for system options.
1261+
* The origins "pg_xxx" are reserved for internal use.
1262+
*/
1263+
if (IsReservedName(name)||IsReservedOriginName(name))
12491264
ereport(ERROR,
12501265
(errcode(ERRCODE_RESERVED_NAME),
12511266
errmsg("replication origin name \"%s\" is reserved",
12521267
name),
1253-
errdetail("Origin names starting with \"pg_\" are reserved.")));
1268+
errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1269+
LOGICALREP_ORIGIN_ANY,LOGICALREP_ORIGIN_NONE)));
12541270

12551271
/*
12561272
* If built with appropriate switch, whine when regression-testing

‎src/backend/replication/logical/worker.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3077,6 +3077,7 @@ maybe_reread_subscription(void)
30773077
strcmp(newsub->slotname,MySubscription->slotname)!=0||
30783078
newsub->binary!=MySubscription->binary||
30793079
newsub->stream!=MySubscription->stream||
3080+
strcmp(newsub->origin,MySubscription->origin)!=0||
30803081
newsub->owner!=MySubscription->owner||
30813082
!equal(newsub->publications,MySubscription->publications))
30823083
{
@@ -3758,6 +3759,7 @@ ApplyWorkerMain(Datum main_arg)
37583759
options.proto.logical.binary=MySubscription->binary;
37593760
options.proto.logical.streaming=MySubscription->stream;
37603761
options.proto.logical.twophase= false;
3762+
options.proto.logical.origin=pstrdup(MySubscription->origin);
37613763

37623764
if (!am_tablesync_worker())
37633765
{

‎src/backend/replication/pgoutput/pgoutput.c

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include"catalog/partition.h"
1717
#include"catalog/pg_publication.h"
1818
#include"catalog/pg_publication_rel.h"
19+
#include"catalog/pg_subscription.h"
1920
#include"commands/defrem.h"
2021
#include"executor/executor.h"
2122
#include"fmgr.h"
@@ -79,6 +80,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
7980

8081
staticboolpublications_valid;
8182
staticboolin_streaming;
83+
staticboolpublish_no_origin;
8284

8385
staticList*LoadPublications(List*pubnames);
8486
staticvoidpublication_invalidation_cb(Datumarg,intcacheid,
@@ -285,6 +287,7 @@ parse_output_parameters(List *options, PGOutputData *data)
285287
boolmessages_option_given= false;
286288
boolstreaming_given= false;
287289
booltwo_phase_option_given= false;
290+
boolorigin_option_given= false;
288291

289292
data->binary= false;
290293
data->streaming= false;
@@ -378,6 +381,24 @@ parse_output_parameters(List *options, PGOutputData *data)
378381

379382
data->two_phase=defGetBoolean(defel);
380383
}
384+
elseif (strcmp(defel->defname,"origin")==0)
385+
{
386+
if (origin_option_given)
387+
ereport(ERROR,
388+
errcode(ERRCODE_SYNTAX_ERROR),
389+
errmsg("conflicting or redundant options"));
390+
origin_option_given= true;
391+
392+
data->origin=defGetString(defel);
393+
if (pg_strcasecmp(data->origin,LOGICALREP_ORIGIN_NONE)==0)
394+
publish_no_origin= true;
395+
elseif (pg_strcasecmp(data->origin,LOGICALREP_ORIGIN_ANY)==0)
396+
publish_no_origin= false;
397+
else
398+
ereport(ERROR,
399+
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
400+
errmsg("unrecognized origin value: \"%s\"",data->origin));
401+
}
381402
else
382403
elog(ERROR,"unrecognized pgoutput option: %s",defel->defname);
383404
}
@@ -1696,12 +1717,16 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
16961717
}
16971718

16981719
/*
1699-
* Currently we always forward.
1720+
* Return true if the data is associated with an origin and the user has
1721+
* requested the changes that don't have an origin, false otherwise.
17001722
*/
17011723
staticbool
17021724
pgoutput_origin_filter(LogicalDecodingContext*ctx,
17031725
RepOriginIdorigin_id)
17041726
{
1727+
if (publish_no_origin&&origin_id!=InvalidRepOriginId)
1728+
return true;
1729+
17051730
return false;
17061731
}
17071732

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp