Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3662839

Browse files
author
Amit Kapila
committed
Allow users to skip logical replication of data having origin.
This patch adds a new SUBSCRIPTION parameter "origin". It specifieswhether the subscription will request the publisher to only send changesthat don't have an origin or send changes regardless of origin. Setting itto "none" means that the subscription will request the publisher to onlysend changes that have no origin associated. Setting it to "any" meansthat the publisher sends changes regardless of their origin. The defaultis "any".Usage:CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres port=9999'PUBLICATION pub1 WITH (origin = none);This can be used to avoid loops (infinite replication of the same data)among replication nodes.This feature allows filtering only the replication data originating fromWAL but for initial sync (initial copy of table data) we don't have such afacility as we can only distinguish the data based on origin from WAL. Asa follow-up patch, we are planning to forbid the initial sync if theorigin is specified as none and we notice that the publication tables werealso replicated from other publishers to avoid duplicate data or loops.We forbid to allow creating origin with names 'none' and 'any' to avoidconfusion with the same name options.Author: Vignesh C, Amit KapilaReviewed-By: Peter Smith, Amit Kapila, Dilip Kumar, Shi yu, Ashutosh Bapat, Hayato KurodaDiscussion:https://postgr.es/m/CALDaNm0gwjY_4HFxvvty01BOT01q_fJLKQ3pWP9=9orqubhjcQ@mail.gmail.com
1 parentf2d0c7f commit3662839

File tree

24 files changed

+463
-78
lines changed

24 files changed

+463
-78
lines changed

‎contrib/test_decoding/expected/replorigin.out‎

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ SELECT pg_replication_origin_drop('regress_test_decoding: temp');
5656

5757
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
5858
ERROR: replication origin "regress_test_decoding: temp" does not exist
59+
-- specifying reserved origin names is not supported
60+
SELECT pg_replication_origin_create('any');
61+
ERROR: replication origin name "any" is reserved
62+
DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
63+
SELECT pg_replication_origin_create('none');
64+
ERROR: replication origin name "none" is reserved
65+
DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
66+
SELECT pg_replication_origin_create('pg_replication_origin');
67+
ERROR: replication origin name "pg_replication_origin" is reserved
68+
DETAIL: Origin names "any", "none", and names starting with "pg_" are reserved.
5969
-- various failure checks for undefined slots
6070
select pg_replication_origin_advance('regress_test_decoding: temp', '0/1');
6171
ERROR: replication origin "regress_test_decoding: temp" does not exist

‎contrib/test_decoding/sql/replorigin.sql‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ SELECT pg_replication_origin_create('regress_test_decoding: temp');
3131
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
3232
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
3333

34+
-- specifying reserved origin names is not supported
35+
SELECT pg_replication_origin_create('any');
36+
SELECT pg_replication_origin_create('none');
37+
SELECT pg_replication_origin_create('pg_replication_origin');
38+
3439
-- various failure checks for undefined slots
3540
select pg_replication_origin_advance('regress_test_decoding: temp','0/1');
3641
select pg_replication_origin_session_setup('regress_test_decoding: temp');

‎doc/src/sgml/catalogs.sgml‎

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7943,6 +7943,20 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
79437943
see <xref linkend="logical-replication-publication"/>.
79447944
</para></entry>
79457945
</row>
7946+
7947+
<row>
7948+
<entry role="catalog_table_entry"><para role="column_definition">
7949+
<structfield>suborigin</structfield> <type>text</type>
7950+
</para>
7951+
<para>
7952+
The origin value must be either <literal>none</literal> or
7953+
<literal>any</literal>. The default is <literal>any</literal>.
7954+
If <literal>none</literal>, the subscription will request the publisher
7955+
to only send changes that don't have an origin. If
7956+
<literal>any</literal>, the publisher sends changes regardless of their
7957+
origin.
7958+
</para></entry>
7959+
</row>
79467960
</tbody>
79477961
</tgroup>
79487962
</table>

‎doc/src/sgml/ref/alter_subscription.sgml‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
207207
information. The parameters that can be altered
208208
are <literal>slot_name</literal>,
209209
<literal>synchronous_commit</literal>,
210-
<literal>binary</literal>, <literal>streaming</literal>, and
211-
<literal>disable_on_error</literal>.
210+
<literal>binary</literal>, <literal>streaming</literal>,
211+
<literal>disable_on_error</literal>, and
212+
<literal>origin</literal>.
212213
</para>
213214
</listitem>
214215
</varlistentry>

‎doc/src/sgml/ref/create_subscription.sgml‎

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,21 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
302302
</para>
303303
</listitem>
304304
</varlistentry>
305+
306+
<varlistentry>
307+
<term><literal>origin</literal> (<type>string</type>)</term>
308+
<listitem>
309+
<para>
310+
Specifies whether the subscription will request the publisher to only
311+
send changes that don't have an origin or send changes regardless of
312+
origin. Setting <literal>origin</literal> to <literal>none</literal>
313+
means that the subscription will request the publisher to only send
314+
changes that don't have an origin. Setting <literal>origin</literal>
315+
to <literal>any</literal> means that the publisher sends changes
316+
regardless of their origin. The default is <literal>any</literal>.
317+
</para>
318+
</listitem>
319+
</varlistentry>
305320
</variablelist></para>
306321

307322
</listitem>

‎src/backend/catalog/pg_subscription.c‎

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,14 @@ GetSubscription(Oid subid, bool missing_ok)
106106
Assert(!isnull);
107107
sub->publications=textarray_to_stringlist(DatumGetArrayTypeP(datum));
108108

109+
/* Get origin */
110+
datum=SysCacheGetAttr(SUBSCRIPTIONOID,
111+
tup,
112+
Anum_pg_subscription_suborigin,
113+
&isnull);
114+
Assert(!isnull);
115+
sub->origin=TextDatumGetCString(datum);
116+
109117
ReleaseSysCache(tup);
110118

111119
returnsub;

‎src/backend/catalog/system_views.sql‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1298,8 +1298,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
12981298
-- All columns of pg_subscription except subconninfo are publicly readable.
12991299
REVOKE ALLON pg_subscriptionFROM public;
13001300
GRANTSELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
1301-
subbinary, substream, subtwophasestate, subdisableonerr, subslotname,
1302-
subsynccommit, subpublications)
1301+
subbinary, substream, subtwophasestate, subdisableonerr,
1302+
subslotname,subsynccommit, subpublications, suborigin)
13031303
ON pg_subscription TO public;
13041304

13051305
CREATEVIEWpg_stat_subscription_statsAS

‎src/backend/commands/subscriptioncmds.c‎

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
#defineSUBOPT_TWOPHASE_COMMIT0x00000200
6565
#defineSUBOPT_DISABLE_ON_ERR0x00000400
6666
#defineSUBOPT_LSN0x00000800
67+
#defineSUBOPT_ORIGIN0x00001000
6768

6869
/* check if the 'val' has 'bits' set */
6970
#defineIsSet(val,bits) (((val) & (bits)) == (bits))
@@ -86,6 +87,7 @@ typedef struct SubOpts
8687
boolstreaming;
8788
booltwophase;
8889
booldisableonerr;
90+
char*origin;
8991
XLogRecPtrlsn;
9092
}SubOpts;
9193

@@ -118,7 +120,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
118120
IsSet(supported_opts,SUBOPT_ENABLED |SUBOPT_CREATE_SLOT |
119121
SUBOPT_COPY_DATA));
120122

121-
/* Set default values for thebooleansupported options. */
123+
/* Set default values for the supported options. */
122124
if (IsSet(supported_opts,SUBOPT_CONNECT))
123125
opts->connect= true;
124126
if (IsSet(supported_opts,SUBOPT_ENABLED))
@@ -137,6 +139,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
137139
opts->twophase= false;
138140
if (IsSet(supported_opts,SUBOPT_DISABLE_ON_ERR))
139141
opts->disableonerr= false;
142+
if (IsSet(supported_opts,SUBOPT_ORIGIN))
143+
opts->origin=pstrdup(LOGICALREP_ORIGIN_ANY);
140144

141145
/* Parse options */
142146
foreach(lc,stmt_options)
@@ -265,6 +269,29 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
265269
opts->specified_opts |=SUBOPT_DISABLE_ON_ERR;
266270
opts->disableonerr=defGetBoolean(defel);
267271
}
272+
elseif (IsSet(supported_opts,SUBOPT_ORIGIN)&&
273+
strcmp(defel->defname,"origin")==0)
274+
{
275+
if (IsSet(opts->specified_opts,SUBOPT_ORIGIN))
276+
errorConflictingDefElem(defel,pstate);
277+
278+
opts->specified_opts |=SUBOPT_ORIGIN;
279+
pfree(opts->origin);
280+
281+
/*
282+
* Even though the "origin" parameter allows only "none" and "any"
283+
* values, it is implemented as a string type so that the
284+
* parameter can be extended in future versions to support
285+
* filtering using origin names specified by the user.
286+
*/
287+
opts->origin=defGetString(defel);
288+
289+
if ((pg_strcasecmp(opts->origin,LOGICALREP_ORIGIN_NONE)!=0)&&
290+
(pg_strcasecmp(opts->origin,LOGICALREP_ORIGIN_ANY)!=0))
291+
ereport(ERROR,
292+
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
293+
errmsg("unrecognized origin value: \"%s\"",opts->origin));
294+
}
268295
elseif (IsSet(supported_opts,SUBOPT_LSN)&&
269296
strcmp(defel->defname,"lsn")==0)
270297
{
@@ -530,7 +557,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
530557
SUBOPT_SLOT_NAME |SUBOPT_COPY_DATA |
531558
SUBOPT_SYNCHRONOUS_COMMIT |SUBOPT_BINARY |
532559
SUBOPT_STREAMING |SUBOPT_TWOPHASE_COMMIT |
533-
SUBOPT_DISABLE_ON_ERR);
560+
SUBOPT_DISABLE_ON_ERR |SUBOPT_ORIGIN);
534561
parse_subscription_options(pstate,stmt->options,supported_opts,&opts);
535562

536563
/*
@@ -617,6 +644,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
617644
CStringGetTextDatum(opts.synchronous_commit);
618645
values[Anum_pg_subscription_subpublications-1]=
619646
publicationListToArray(publications);
647+
values[Anum_pg_subscription_suborigin-1]=
648+
CStringGetTextDatum(opts.origin);
620649

621650
tup=heap_form_tuple(RelationGetDescr(rel),values,nulls);
622651

@@ -1014,7 +1043,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
10141043
{
10151044
supported_opts= (SUBOPT_SLOT_NAME |
10161045
SUBOPT_SYNCHRONOUS_COMMIT |SUBOPT_BINARY |
1017-
SUBOPT_STREAMING |SUBOPT_DISABLE_ON_ERR);
1046+
SUBOPT_STREAMING |SUBOPT_DISABLE_ON_ERR |
1047+
SUBOPT_ORIGIN);
10181048

10191049
parse_subscription_options(pstate,stmt->options,
10201050
supported_opts,&opts);
@@ -1071,6 +1101,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
10711101
= true;
10721102
}
10731103

1104+
if (IsSet(opts.specified_opts,SUBOPT_ORIGIN))
1105+
{
1106+
values[Anum_pg_subscription_suborigin-1]=
1107+
CStringGetTextDatum(opts.origin);
1108+
replaces[Anum_pg_subscription_suborigin-1]= true;
1109+
}
1110+
10741111
update_tuple= true;
10751112
break;
10761113
}

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
451451
PQserverVersion(conn->streamConn) >=150000)
452452
appendStringInfoString(&cmd,", two_phase 'on'");
453453

454+
if (options->proto.logical.origin&&
455+
PQserverVersion(conn->streamConn) >=160000)
456+
appendStringInfo(&cmd,", origin '%s'",
457+
options->proto.logical.origin);
458+
454459
pubnames=options->proto.logical.publication_names;
455460
pubnames_str=stringlist_to_identifierstr(conn->streamConn,pubnames);
456461
if (!pubnames_str)

‎src/backend/replication/logical/origin.c‎

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
#include"access/xloginsert.h"
7878
#include"catalog/catalog.h"
7979
#include"catalog/indexing.h"
80+
#include"catalog/pg_subscription.h"
8081
#include"funcapi.h"
8182
#include"miscadmin.h"
8283
#include"nodes/execnodes.h"
@@ -195,6 +196,17 @@ replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
195196
}
196197

197198

199+
/*
200+
* IsReservedOriginName
201+
*True iff name is either "none" or "any".
202+
*/
203+
staticbool
204+
IsReservedOriginName(constchar*name)
205+
{
206+
return ((pg_strcasecmp(name,LOGICALREP_ORIGIN_NONE)==0)||
207+
(pg_strcasecmp(name,LOGICALREP_ORIGIN_ANY)==0));
208+
}
209+
198210
/* ---------------------------------------------------------------------------
199211
* Functions for working with replication origins themselves.
200212
* ---------------------------------------------------------------------------
@@ -1244,13 +1256,17 @@ pg_replication_origin_create(PG_FUNCTION_ARGS)
12441256

12451257
name=text_to_cstring((text*)DatumGetPointer(PG_GETARG_DATUM(0)));
12461258

1247-
/* Replication origins "pg_xxx" are reserved for internal use */
1248-
if (IsReservedName(name))
1259+
/*
1260+
* Replication origins "any and "none" are reserved for system options.
1261+
* The origins "pg_xxx" are reserved for internal use.
1262+
*/
1263+
if (IsReservedName(name)||IsReservedOriginName(name))
12491264
ereport(ERROR,
12501265
(errcode(ERRCODE_RESERVED_NAME),
12511266
errmsg("replication origin name \"%s\" is reserved",
12521267
name),
1253-
errdetail("Origin names starting with \"pg_\" are reserved.")));
1268+
errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1269+
LOGICALREP_ORIGIN_ANY,LOGICALREP_ORIGIN_NONE)));
12541270

12551271
/*
12561272
* If built with appropriate switch, whine when regression-testing

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp