Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit705e20f

Browse files
author
Amit Kapila
committed
Optionally disable subscriptions on error.
Logical replication apply workers for a subscription can easily get stuckin an infinite loop of attempting to apply a change, triggering an error(such as a constraint violation), exiting with the error written to thesubscription server log, and restarting.To partially remedy the situation, this patch adds a new subscriptionoption named 'disable_on_error'. To be consistent with old behavior, thisoption defaults to false. When true, both the tablesync worker and applyworker catch any errors thrown and disable the subscription in order tobreak the loop. The error is still also written in the logs.Once the subscription is disabled, users can either manually resolve theconflict/error or skip the conflicting transaction by usingpg_replication_origin_advance() function. After resolving the conflict,users need to enable the subscription to allow apply process to proceed.Author: Osumi Takamichi and Mark DilgerReviewed-by: Greg Nancarrow, Vignesh C, Amit Kapila, Wang wei, Tang Haiying, Peter Smith, Masahiko Sawada, Shi YuDiscussion :https://postgr.es/m/DB35438F-9356-4841-89A0-412709EBD3AB%40enterprisedb.com
1 parent369398e commit705e20f

File tree

17 files changed

+421
-108
lines changed

17 files changed

+421
-108
lines changed

‎doc/src/sgml/catalogs.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7769,6 +7769,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
77697769
</para></entry>
77707770
</row>
77717771

7772+
<row>
7773+
<entry role="catalog_table_entry"><para role="column_definition">
7774+
<structfield>subdisableonerr</structfield> <type>bool</type>
7775+
</para>
7776+
<para>
7777+
If true, the subscription will be disabled if one of its workers
7778+
detects an error
7779+
</para></entry>
7780+
</row>
7781+
77727782
<row>
77737783
<entry role="catalog_table_entry"><para role="column_definition">
77747784
<structfield>subconninfo</structfield> <type>text</type>

‎doc/src/sgml/logical-replication.sgml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
364364
the replication origin name can be found from the server log (LSN 0/14C0378 and
365365
replication origin <literal>pg_16395</literal> in the above case). To skip the
366366
transaction, the subscription needs to be disabled temporarily by
367-
<command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
368-
can be skipped by calling the
367+
<command>ALTER SUBSCRIPTION ... DISABLE</command> first or alternatively, the
368+
subscription can be used with the <literal>disable_on_error</literal> option.
369+
Then, the transaction can be skipped by calling the
369370
<link linkend="pg-replication-origin-advance">
370371
<function>pg_replication_origin_advance()</function></link> function with
371372
the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the

‎doc/src/sgml/ref/alter_subscription.sgml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
204204
information. The parameters that can be altered
205205
are <literal>slot_name</literal>,
206206
<literal>synchronous_commit</literal>,
207-
<literal>binary</literal>, and
208-
<literal>streaming</literal>.
207+
<literal>binary</literal>,<literal>streaming</literal>,and
208+
<literal>disable_on_error</literal>.
209209
</para>
210210
</listitem>
211211
</varlistentry>

‎doc/src/sgml/ref/create_subscription.sgml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
290290

291291
</listitem>
292292
</varlistentry>
293+
294+
<varlistentry>
295+
<term><literal>disable_on_error</literal> (<type>boolean</type>)</term>
296+
<listitem>
297+
<para>
298+
Specifies whether the subscription should be automatically disabled
299+
if any errors are detected by subscription workers during data
300+
replication from the publisher. The default is
301+
<literal>false</literal>.
302+
</para>
303+
</listitem>
304+
</varlistentry>
293305
</variablelist>
294306
</para>
295307

‎src/backend/catalog/pg_subscription.c

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok)
6969
sub->binary=subform->subbinary;
7070
sub->stream=subform->substream;
7171
sub->twophasestate=subform->subtwophasestate;
72+
sub->disableonerr=subform->subdisableonerr;
7273

7374
/* Get conninfo */
7475
datum=SysCacheGetAttr(SUBSCRIPTIONOID,
@@ -156,6 +157,45 @@ FreeSubscription(Subscription *sub)
156157
pfree(sub);
157158
}
158159

160+
/*
161+
* Disable the given subscription.
162+
*/
163+
void
164+
DisableSubscription(Oidsubid)
165+
{
166+
Relationrel;
167+
boolnulls[Natts_pg_subscription];
168+
boolreplaces[Natts_pg_subscription];
169+
Datumvalues[Natts_pg_subscription];
170+
HeapTupletup;
171+
172+
/* Look up the subscription in the catalog */
173+
rel=table_open(SubscriptionRelationId,RowExclusiveLock);
174+
tup=SearchSysCacheCopy1(SUBSCRIPTIONOID,ObjectIdGetDatum(subid));
175+
176+
if (!HeapTupleIsValid(tup))
177+
elog(ERROR,"cache lookup failed for subscription %u",subid);
178+
179+
LockSharedObject(SubscriptionRelationId,subid,0,AccessShareLock);
180+
181+
/* Form a new tuple. */
182+
memset(values,0,sizeof(values));
183+
memset(nulls, false,sizeof(nulls));
184+
memset(replaces, false,sizeof(replaces));
185+
186+
/* Set the subscription to disabled. */
187+
values[Anum_pg_subscription_subenabled-1]=BoolGetDatum(false);
188+
replaces[Anum_pg_subscription_subenabled-1]= true;
189+
190+
/* Update the catalog */
191+
tup=heap_modify_tuple(tup,RelationGetDescr(rel),values,nulls,
192+
replaces);
193+
CatalogTupleUpdate(rel,&tup->t_self,tup);
194+
heap_freetuple(tup);
195+
196+
table_close(rel,NoLock);
197+
}
198+
159199
/*
160200
* get_subscription_oid - given a subscription name, look up the OID
161201
*

‎src/backend/catalog/system_views.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1261,7 +1261,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
12611261
-- All columns of pg_subscription except subconninfo are publicly readable.
12621262
REVOKE ALLON pg_subscriptionFROM public;
12631263
GRANTSELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
1264-
substream, subtwophasestate, subslotname, subsynccommit, subpublications)
1264+
substream, subtwophasestate, subdisableonerr, subslotname,
1265+
subsynccommit, subpublications)
12651266
ON pg_subscription TO public;
12661267

12671268
CREATEVIEWpg_stat_subscription_statsAS

‎src/backend/commands/subscriptioncmds.c

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
#defineSUBOPT_BINARY0x00000080
6262
#defineSUBOPT_STREAMING0x00000100
6363
#defineSUBOPT_TWOPHASE_COMMIT0x00000200
64+
#defineSUBOPT_DISABLE_ON_ERR0x00000400
6465

6566
/* check if the 'val' has 'bits' set */
6667
#defineIsSet(val,bits) (((val) & (bits)) == (bits))
@@ -82,6 +83,7 @@ typedef struct SubOpts
8283
boolbinary;
8384
boolstreaming;
8485
booltwophase;
86+
booldisableonerr;
8587
}SubOpts;
8688

8789
staticList*fetch_table_list(WalReceiverConn*wrconn,List*publications);
@@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
130132
opts->streaming= false;
131133
if (IsSet(supported_opts,SUBOPT_TWOPHASE_COMMIT))
132134
opts->twophase= false;
135+
if (IsSet(supported_opts,SUBOPT_DISABLE_ON_ERR))
136+
opts->disableonerr= false;
133137

134138
/* Parse options */
135139
foreach(lc,stmt_options)
@@ -249,6 +253,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
249253
opts->specified_opts |=SUBOPT_TWOPHASE_COMMIT;
250254
opts->twophase=defGetBoolean(defel);
251255
}
256+
elseif (IsSet(supported_opts,SUBOPT_DISABLE_ON_ERR)&&
257+
strcmp(defel->defname,"disable_on_error")==0)
258+
{
259+
if (IsSet(opts->specified_opts,SUBOPT_DISABLE_ON_ERR))
260+
errorConflictingDefElem(defel,pstate);
261+
262+
opts->specified_opts |=SUBOPT_DISABLE_ON_ERR;
263+
opts->disableonerr=defGetBoolean(defel);
264+
}
252265
else
253266
ereport(ERROR,
254267
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
390403
supported_opts= (SUBOPT_CONNECT |SUBOPT_ENABLED |SUBOPT_CREATE_SLOT |
391404
SUBOPT_SLOT_NAME |SUBOPT_COPY_DATA |
392405
SUBOPT_SYNCHRONOUS_COMMIT |SUBOPT_BINARY |
393-
SUBOPT_STREAMING |SUBOPT_TWOPHASE_COMMIT);
406+
SUBOPT_STREAMING |SUBOPT_TWOPHASE_COMMIT |
407+
SUBOPT_DISABLE_ON_ERR);
394408
parse_subscription_options(pstate,stmt->options,supported_opts,&opts);
395409

396410
/*
@@ -464,6 +478,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
464478
CharGetDatum(opts.twophase ?
465479
LOGICALREP_TWOPHASE_STATE_PENDING :
466480
LOGICALREP_TWOPHASE_STATE_DISABLED);
481+
values[Anum_pg_subscription_subdisableonerr-1]=BoolGetDatum(opts.disableonerr);
467482
values[Anum_pg_subscription_subconninfo-1]=
468483
CStringGetTextDatum(conninfo);
469484
if (opts.slot_name)
@@ -864,7 +879,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
864879
{
865880
supported_opts= (SUBOPT_SLOT_NAME |
866881
SUBOPT_SYNCHRONOUS_COMMIT |SUBOPT_BINARY |
867-
SUBOPT_STREAMING);
882+
SUBOPT_STREAMING |SUBOPT_DISABLE_ON_ERR);
868883

869884
parse_subscription_options(pstate,stmt->options,
870885
supported_opts,&opts);
@@ -913,6 +928,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
913928
replaces[Anum_pg_subscription_substream-1]= true;
914929
}
915930

931+
if (IsSet(opts.specified_opts,SUBOPT_DISABLE_ON_ERR))
932+
{
933+
values[Anum_pg_subscription_subdisableonerr-1]
934+
=BoolGetDatum(opts.disableonerr);
935+
replaces[Anum_pg_subscription_subdisableonerr-1]
936+
= true;
937+
}
938+
916939
update_tuple= true;
917940
break;
918941
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp