Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit887227a

Browse files
committed
Add option to modify sync commit per subscription
This also changes default behaviour of subscription workers tosynchronous_commit = off.Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
1 parent25371a7 commit887227a

File tree

13 files changed

+145
-33
lines changed

13 files changed

+145
-33
lines changed

‎doc/src/sgml/catalogs.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6530,6 +6530,16 @@
65306530
<entry>If true, the subscription is enabled and should be replicating.</entry>
65316531
</row>
65326532

6533+
<row>
6534+
<entry><structfield>subsynccommit</structfield></entry>
6535+
<entry><type>text</type></entry>
6536+
<entry></entry>
6537+
<entry>
6538+
Contains the value of the <varname>synchronous_commit</varname>
6539+
setting for the subscription workers.
6540+
</entry>
6541+
</row>
6542+
65336543
<row>
65346544
<entry><structfield>subconninfo</structfield></entry>
65356545
<entry><type>text</type></entry>

‎doc/src/sgml/ref/alter_subscription.sgml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> WITH ( <rep
2626
<phrase>where <replaceable class="PARAMETER">suboption</replaceable> can be:</phrase>
2727

2828
SLOT NAME = <replaceable class="PARAMETER">slot_name</replaceable>
29+
| SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable>
2930

3031
ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> SET PUBLICATION <replaceable class="PARAMETER">publication_name</replaceable> [, ...] { REFRESH WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] ) | NOREFRESH }
3132
ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> REFRESH PUBLICATION WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] )
@@ -91,6 +92,7 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> DISABLE
9192
<varlistentry>
9293
<term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
9394
<term><literal>SLOT NAME = <replaceable class="parameter">slot_name</replaceable></literal></term>
95+
<term><literal>SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable></literal></term>
9496
<listitem>
9597
<para>
9698
These clauses alter properties originally set by

‎doc/src/sgml/ref/create_subscription.sgml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl
3232
| CREATE SLOT | NOCREATE SLOT
3333
| SLOT NAME = <replaceable class="PARAMETER">slot_name</replaceable>
3434
| COPY DATA | NOCOPY DATA
35+
| SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable>
3536
| NOCONNECT
3637
</synopsis>
3738
</refsynopsisdiv>
@@ -147,6 +148,36 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl
147148
</listitem>
148149
</varlistentry>
149150

151+
<varlistentry>
152+
<term><literal>SYNCHRONOUS_COMMIT = <replaceable class="PARAMETER">synchronous_commit</replaceable></literal></term>
153+
<listitem>
154+
<para>
155+
The value of this parameter overrides the
156+
<xref linkend="guc-synchronous-commit"> setting. The default value is
157+
<literal>off</literal>.
158+
</para>
159+
160+
<para>
161+
It is safe to use <literal>off</literal> for logical replication: If the
162+
subscriber loses transactions because of missing synchronization, the
163+
data will be resent from the publisher.
164+
</para>
165+
166+
<para>
167+
A different setting might be appropriate when doing synchronous logical
168+
replication. The logical replication workers report the positions of
169+
writes and flushes to the publisher, and when using synchronous
170+
replication, the publisher will wait for the actual flush. This means
171+
that setting <literal>SYNCHRONOUS_COMMIT</literal> for the subscriber
172+
to <literal>off</literal> when the subscription is used for synchronous
173+
replication might increase the latency for <command>COMMIT</command> on
174+
the publisher. In this scenario, it can be advantageous to set
175+
<literal>SYNCHRONOUS_COMMIT</literal> to <literal>local</literal> or
176+
higher.
177+
</para>
178+
</listitem>
179+
</varlistentry>
180+
150181
<varlistentry>
151182
<term><literal>NOCONNECT</literal></term>
152183
<listitem>

‎src/backend/catalog/pg_subscription.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ GetSubscription(Oid subid, bool missing_ok)
8585
Assert(!isnull);
8686
sub->slotname=pstrdup(NameStr(*DatumGetName(datum)));
8787

88+
/* Get synccommit */
89+
datum=SysCacheGetAttr(SUBSCRIPTIONOID,
90+
tup,
91+
Anum_pg_subscription_subsynccommit,
92+
&isnull);
93+
Assert(!isnull);
94+
sub->synccommit=TextDatumGetCString(datum);
95+
8896
/* Get publications */
8997
datum=SysCacheGetAttr(SUBSCRIPTIONOID,
9098
tup,

‎src/backend/commands/subscriptioncmds.c

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include"storage/lmgr.h"
4545

4646
#include"utils/builtins.h"
47+
#include"utils/guc.h"
4748
#include"utils/lsyscache.h"
4849
#include"utils/memutils.h"
4950
#include"utils/syscache.h"
@@ -60,7 +61,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
6061
staticvoid
6162
parse_subscription_options(List*options,bool*connect,bool*enabled_given,
6263
bool*enabled,bool*create_slot,char**slot_name,
63-
bool*copy_data)
64+
bool*copy_data,char**synchronous_commit)
6465
{
6566
ListCell*lc;
6667
boolconnect_given= false;
@@ -80,6 +81,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
8081
*slot_name=NULL;
8182
if (copy_data)
8283
*copy_data= true;
84+
if (synchronous_commit)
85+
*synchronous_commit=NULL;
8386

8487
/* Parse options */
8588
foreach (lc,options)
@@ -165,6 +168,21 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
165168
copy_data_given= true;
166169
*copy_data= !defGetBoolean(defel);
167170
}
171+
elseif (strcmp(defel->defname,"synchronous_commit")==0&&
172+
synchronous_commit)
173+
{
174+
if (*synchronous_commit)
175+
ereport(ERROR,
176+
(errcode(ERRCODE_SYNTAX_ERROR),
177+
errmsg("conflicting or redundant options")));
178+
179+
*synchronous_commit=defGetString(defel);
180+
181+
/* Test if the given value is valid for synchronous_commit GUC. */
182+
(void)set_config_option("synchronous_commit",*synchronous_commit,
183+
PGC_BACKEND,PGC_S_TEST,GUC_ACTION_SET,
184+
false,0, false);
185+
}
168186
else
169187
elog(ERROR,"unrecognized option: %s",defel->defname);
170188
}
@@ -269,6 +287,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
269287
boolenabled_given;
270288
boolenabled;
271289
boolcopy_data;
290+
char*synchronous_commit;
272291
char*conninfo;
273292
char*slotname;
274293
charoriginname[NAMEDATALEN];
@@ -280,7 +299,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
280299
* Connection and publication should not be specified here.
281300
*/
282301
parse_subscription_options(stmt->options,&connect,&enabled_given,
283-
&enabled,&create_slot,&slotname,&copy_data);
302+
&enabled,&create_slot,&slotname,&copy_data,
303+
&synchronous_commit);
284304

285305
/*
286306
* Since creating a replication slot is not transactional, rolling back
@@ -311,6 +331,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
311331

312332
if (slotname==NULL)
313333
slotname=stmt->subname;
334+
/* The default for synchronous_commit of subscriptions is off. */
335+
if (synchronous_commit==NULL)
336+
synchronous_commit="off";
314337

315338
conninfo=stmt->conninfo;
316339
publications=stmt->publication;
@@ -334,6 +357,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
334357
CStringGetTextDatum(conninfo);
335358
values[Anum_pg_subscription_subslotname-1]=
336359
DirectFunctionCall1(namein,CStringGetDatum(slotname));
360+
values[Anum_pg_subscription_subsynccommit-1]=
361+
CStringGetTextDatum(synchronous_commit);
337362
values[Anum_pg_subscription_subpublications-1]=
338363
publicationListToArray(publications);
339364

@@ -582,13 +607,24 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
582607
caseALTER_SUBSCRIPTION_OPTIONS:
583608
{
584609
char*slot_name;
610+
char*synchronous_commit;
585611

586612
parse_subscription_options(stmt->options,NULL,NULL,NULL,
587-
NULL,&slot_name,NULL);
613+
NULL,&slot_name,NULL,
614+
&synchronous_commit);
588615

589-
values[Anum_pg_subscription_subslotname-1]=
590-
DirectFunctionCall1(namein,CStringGetDatum(slot_name));
591-
replaces[Anum_pg_subscription_subslotname-1]= true;
616+
if (slot_name)
617+
{
618+
values[Anum_pg_subscription_subslotname-1]=
619+
DirectFunctionCall1(namein,CStringGetDatum(slot_name));
620+
replaces[Anum_pg_subscription_subslotname-1]= true;
621+
}
622+
if (synchronous_commit)
623+
{
624+
values[Anum_pg_subscription_subsynccommit-1]=
625+
CStringGetTextDatum(synchronous_commit);
626+
replaces[Anum_pg_subscription_subsynccommit-1]= true;
627+
}
592628

593629
update_tuple= true;
594630
break;
@@ -601,7 +637,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
601637

602638
parse_subscription_options(stmt->options,NULL,
603639
&enabled_given,&enabled,NULL,
604-
NULL,NULL);
640+
NULL,NULL,NULL);
605641
Assert(enabled_given);
606642

607643
values[Anum_pg_subscription_subenabled-1]=
@@ -626,7 +662,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
626662
Subscription*sub=GetSubscription(subid, false);
627663

628664
parse_subscription_options(stmt->options,NULL,NULL,NULL,
629-
NULL,NULL,&copy_data);
665+
NULL,NULL,&copy_data,NULL);
630666

631667
values[Anum_pg_subscription_subpublications-1]=
632668
publicationListToArray(stmt->publication);
@@ -652,7 +688,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
652688
Subscription*sub=GetSubscription(subid, false);
653689

654690
parse_subscription_options(stmt->options,NULL,NULL,NULL,
655-
NULL,NULL,&copy_data);
691+
NULL,NULL,&copy_data,NULL);
656692

657693
AlterSubscription_refresh(sub,copy_data);
658694

‎src/backend/replication/logical/launcher.c

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,17 +129,13 @@ get_subscription_list(void)
129129
*/
130130
oldcxt=MemoryContextSwitchTo(resultcxt);
131131

132-
sub= (Subscription*)palloc(sizeof(Subscription));
132+
sub= (Subscription*)palloc0(sizeof(Subscription));
133133
sub->oid=HeapTupleGetOid(tup);
134134
sub->dbid=subform->subdbid;
135135
sub->owner=subform->subowner;
136136
sub->enabled=subform->subenabled;
137137
sub->name=pstrdup(NameStr(subform->subname));
138-
139138
/* We don't fill fields we are not interested in. */
140-
sub->conninfo=NULL;
141-
sub->slotname=NULL;
142-
sub->publications=NIL;
143139

144140
res=lappend(res,sub);
145141
MemoryContextSwitchTo(oldcxt);

‎src/backend/replication/logical/worker.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1416,6 +1416,10 @@ reread_subscription(void)
14161416

14171417
MemoryContextSwitchTo(oldctx);
14181418

1419+
/* Change synchronous commit according to the user's wishes */
1420+
SetConfigOption("synchronous_commit",MySubscription->synccommit,
1421+
PGC_BACKEND,PGC_S_OVERRIDE);
1422+
14191423
if (started_tx)
14201424
CommitTransactionCommand();
14211425

@@ -1485,6 +1489,10 @@ ApplyWorkerMain(Datum main_arg)
14851489
MySubscriptionValid= true;
14861490
MemoryContextSwitchTo(oldctx);
14871491

1492+
/* Setup synchronous commit according to the user's wishes */
1493+
SetConfigOption("synchronous_commit",MySubscription->synccommit,
1494+
PGC_BACKEND,PGC_S_OVERRIDE);
1495+
14881496
if (!MySubscription->enabled)
14891497
{
14901498
ereport(LOG,

‎src/bin/pg_dump/pg_dump.c

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3683,6 +3683,7 @@ getSubscriptions(Archive *fout)
36833683
inti_rolname;
36843684
inti_subconninfo;
36853685
inti_subslotname;
3686+
inti_subsynccommit;
36863687
inti_subpublications;
36873688
inti,
36883689
ntups;
@@ -3714,7 +3715,8 @@ getSubscriptions(Archive *fout)
37143715
appendPQExpBuffer(query,
37153716
"SELECT s.tableoid, s.oid, s.subname,"
37163717
"(%s s.subowner) AS rolname, "
3717-
" s.subconninfo, s.subslotname, s.subpublications "
3718+
" s.subconninfo, s.subslotname, s.subsynccommit, "
3719+
" s.subpublications "
37183720
"FROM pg_catalog.pg_subscription s "
37193721
"WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database"
37203722
" WHERE datname = current_database())",
@@ -3729,6 +3731,7 @@ getSubscriptions(Archive *fout)
37293731
i_rolname = PQfnumber(res, "rolname");
37303732
i_subconninfo = PQfnumber(res, "subconninfo");
37313733
i_subslotname = PQfnumber(res, "subslotname");
3734+
i_subsynccommit = PQfnumber(res, "subsynccommit");
37323735
i_subpublications = PQfnumber(res, "subpublications");
37333736

37343737
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
@@ -3744,6 +3747,8 @@ getSubscriptions(Archive *fout)
37443747
subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
37453748
subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
37463749
subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
3750+
subinfo[i].subsynccommit =
3751+
pg_strdup(PQgetvalue(res, i, i_subsynccommit));
37473752
subinfo[i].subpublications =
37483753
pg_strdup(PQgetvalue(res, i, i_subpublications));
37493754

@@ -3810,6 +3815,10 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo)
38103815

38113816
appendPQExpBuffer(query, " PUBLICATION %s WITH (NOCONNECT, SLOT NAME = ", publications->data);
38123817
appendStringLiteralAH(query, subinfo->subslotname, fout);
3818+
3819+
if (strcmp(subinfo->subsynccommit, "off") != 0)
3820+
appendPQExpBuffer(query, ", SYNCHRONOUS_COMMIT = %s", fmtId(subinfo->subsynccommit));
3821+
38133822
appendPQExpBufferStr(query, ");\n");
38143823

38153824
appendPQExpBuffer(labelq, "SUBSCRIPTION %s", fmtId(subinfo->dobj.name));

‎src/bin/pg_dump/pg_dump.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,7 @@ typedef struct _SubscriptionInfo
616616
char*rolname;
617617
char*subconninfo;
618618
char*subslotname;
619+
char*subsynccommit;
619620
char*subpublications;
620621
}SubscriptionInfo;
621622

‎src/bin/psql/describe.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5199,7 +5199,8 @@ describeSubscriptions(const char *pattern, bool verbose)
51995199
PQExpBufferDatabuf;
52005200
PGresult*res;
52015201
printQueryOptmyopt=pset.popt;
5202-
staticconstbooltranslate_columns[]= {false, false, false, false, false};
5202+
staticconstbooltranslate_columns[]= {false, false, false, false,
5203+
false, false};
52035204

52045205
if (pset.sversion<100000)
52055206
{
@@ -5225,7 +5226,9 @@ describeSubscriptions(const char *pattern, bool verbose)
52255226
if (verbose)
52265227
{
52275228
appendPQExpBuffer(&buf,
5229+
", subsynccommit AS \"%s\"\n"
52285230
", subconninfo AS \"%s\"\n",
5231+
gettext_noop("Synchronous commit"),
52295232
gettext_noop("Conninfo"));
52305233
}
52315234

‎src/include/catalog/pg_subscription.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHE
4343
#ifdefCATALOG_VARLEN/* variable-length fields start here */
4444
textsubconninfo;/* Connection string to the publisher */
4545
NameDatasubslotname;/* Slot name on publisher */
46-
46+
textsubsynccommit;/* Synchronous commit setting for worker */
4747
textsubpublications[1];/* List of publications subscribed to */
4848
#endif
4949
}FormData_pg_subscription;
@@ -54,14 +54,15 @@ typedef FormData_pg_subscription *Form_pg_subscription;
5454
*compiler constants for pg_subscription
5555
* ----------------
5656
*/
57-
#defineNatts_pg_subscription7
57+
#defineNatts_pg_subscription8
5858
#defineAnum_pg_subscription_subdbid1
5959
#defineAnum_pg_subscription_subname2
6060
#defineAnum_pg_subscription_subowner3
6161
#defineAnum_pg_subscription_subenabled4
6262
#defineAnum_pg_subscription_subconninfo5
6363
#defineAnum_pg_subscription_subslotname6
64-
#defineAnum_pg_subscription_subpublications7
64+
#defineAnum_pg_subscription_subsynccommit7
65+
#defineAnum_pg_subscription_subpublications8
6566

6667

6768
typedefstructSubscription
@@ -73,6 +74,7 @@ typedef struct Subscription
7374
boolenabled;/* Indicates if the subscription is enabled */
7475
char*conninfo;/* Connection string to the publisher */
7576
char*slotname;/* Name of the replication slot */
77+
char*synccommit;/* Synchronous commit setting for worker */
7678
List*publications;/* List of publication names to subscribe to */
7779
}Subscription;
7880

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp