Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite117cfb

Browse files
author
Amit Kapila
committed
Add two-phase option in pg_createsubscriber.
This patch introduces the '--enable-two-phase' option to the'pg_createsubscriber' utility, allowing users to enable two-phase commitfor all subscriptions during their creation.Note that even without this option users can enable the two_phase optionfor the subscriptions created by pg_createsubscriber. However, it requiresthe subscription to be disabled first which could be inconvenient forusers.When two-phase commit is enabled, prepared transactions are sent to thesubscriber at the time of 'PREPARE TRANSACTION', and they are processed astwo-phase transactions on the subscriber as well. If disabled, preparedtransactions are sent only when committed and are processed immediately bythe subscriber.Author: Shubham Khanna <khannashubham1197@gmail.com>Reviewed-by: vignesh C <vignesh21@gmail.com>Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>Reviewed-by: Peter Smith <smithpb2250@gmail.com>Reviewed-by: Ajin Cherian <itsajin@gmail.com>Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>Discussion:https://postgr.es/m/CAHv8RjLPdFP=kA5LNSmWZ=+GMXmO+LczvV6p9HJjsXxZz10KGA@mail.gmail.com
1 parentadc6032 commite117cfb

File tree

3 files changed

+83
-30
lines changed

3 files changed

+83
-30
lines changed

‎doc/src/sgml/ref/pg_createsubscriber.sgml

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,19 @@ PostgreSQL documentation
165165
</listitem>
166166
</varlistentry>
167167

168+
<varlistentry>
169+
<term><option>-T</option></term>
170+
<term><option>--enable-two-phase</option></term>
171+
<listitem>
172+
<para>
173+
Enables <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
174+
commit for the subscription. When multiple databases are specified, this
175+
option applies uniformly to all subscriptions created on those databases.
176+
The default is <literal>false</literal>.
177+
</para>
178+
</listitem>
179+
</varlistentry>
180+
168181
<varlistentry>
169182
<term><option>-U <replaceable class="parameter">username</replaceable></option></term>
170183
<term><option>--subscriber-username=<replaceable class="parameter">username</replaceable></option></term>
@@ -300,7 +313,9 @@ PostgreSQL documentation
300313
greater than or equal to the number of specified databases. The target
301314
server must have <xref linkend="guc-max-worker-processes"/> configured to a
302315
value greater than the number of specified databases. The target server
303-
must accept local connections.
316+
must accept local connections. If you are planning to use the
317+
<option>--enable-two-phase</option> switch then you will also need to set
318+
the <xref linkend="guc-max-prepared-transactions"/> appropriately.
304319
</para>
305320

306321
<para>
@@ -360,6 +375,7 @@ PostgreSQL documentation
360375
</para>
361376

362377
<para>
378+
Unless the <option>--enable-two-phase</option> switch is specified,
363379
<application>pg_createsubscriber</application> sets up logical
364380
replication with two-phase commit disabled. This means that any
365381
prepared transactions will be replicated at the time

‎src/bin/pg_basebackup/pg_createsubscriber.c

Lines changed: 55 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,15 @@ struct CreateSubscriberOptions
3838
char*socket_dir;/* directory for Unix-domain socket, if any */
3939
char*sub_port;/* subscriber port number */
4040
constchar*sub_username;/* subscriber username */
41+
booltwo_phase;/* enable-two-phase option */
4142
SimpleStringListdatabase_names;/* list of database names */
4243
SimpleStringListpub_names;/* list of publication names */
4344
SimpleStringListsub_names;/* list of subscription names */
4445
SimpleStringListreplslot_names;/* list of replication slot names */
4546
intrecovery_timeout;/* stop recovery after this time */
4647
};
4748

49+
/* per-database publication/subscription info */
4850
structLogicalRepInfo
4951
{
5052
char*dbname;/* database name */
@@ -58,6 +60,16 @@ struct LogicalRepInfo
5860
boolmade_publication;/* publication was created */
5961
};
6062

63+
/*
64+
* Information shared across all the databases (or publications and
65+
* subscriptions).
66+
*/
67+
structLogicalRepInfos
68+
{
69+
structLogicalRepInfo*dbinfo;
70+
booltwo_phase;/* enable-two-phase option */
71+
};
72+
6173
staticvoidcleanup_objects_atexit(void);
6274
staticvoidusage();
6375
staticchar*get_base_conninfo(constchar*conninfo,char**dbname);
@@ -117,7 +129,7 @@ static bool dry_run = false;
117129

118130
staticboolsuccess= false;
119131

120-
staticstructLogicalRepInfo*dbinfo;
132+
staticstructLogicalRepInfosdbinfos;
121133
staticintnum_dbs=0;/* number of specified databases */
122134
staticintnum_pubs=0;/* number of specified publications */
123135
staticintnum_subs=0;/* number of specified subscriptions */
@@ -172,17 +184,17 @@ cleanup_objects_atexit(void)
172184

173185
for (inti=0;i<num_dbs;i++)
174186
{
175-
if (dbinfo[i].made_publication||dbinfo[i].made_replslot)
187+
if (dbinfos.dbinfo[i].made_publication||dbinfos.dbinfo[i].made_replslot)
176188
{
177189
PGconn*conn;
178190

179-
conn=connect_database(dbinfo[i].pubconninfo, false);
191+
conn=connect_database(dbinfos.dbinfo[i].pubconninfo, false);
180192
if (conn!=NULL)
181193
{
182-
if (dbinfo[i].made_publication)
183-
drop_publication(conn,&dbinfo[i]);
184-
if (dbinfo[i].made_replslot)
185-
drop_replication_slot(conn,&dbinfo[i],dbinfo[i].replslotname);
194+
if (dbinfos.dbinfo[i].made_publication)
195+
drop_publication(conn,&dbinfos.dbinfo[i]);
196+
if (dbinfos.dbinfo[i].made_replslot)
197+
drop_replication_slot(conn,&dbinfos.dbinfo[i],dbinfos.dbinfo[i].replslotname);
186198
disconnect_database(conn, false);
187199
}
188200
else
@@ -192,16 +204,18 @@ cleanup_objects_atexit(void)
192204
* that some objects were left on primary and should be
193205
* removed before trying again.
194206
*/
195-
if (dbinfo[i].made_publication)
207+
if (dbinfos.dbinfo[i].made_publication)
196208
{
197209
pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
198-
dbinfo[i].pubname,dbinfo[i].dbname);
210+
dbinfos.dbinfo[i].pubname,
211+
dbinfos.dbinfo[i].dbname);
199212
pg_log_warning_hint("Drop this publication before trying again.");
200213
}
201-
if (dbinfo[i].made_replslot)
214+
if (dbinfos.dbinfo[i].made_replslot)
202215
{
203216
pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
204-
dbinfo[i].replslotname,dbinfo[i].dbname);
217+
dbinfos.dbinfo[i].replslotname,
218+
dbinfos.dbinfo[i].dbname);
205219
pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
206220
}
207221
}
@@ -227,6 +241,7 @@ usage(void)
227241
printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
228242
printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
229243
printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
244+
printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
230245
printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
231246
printf(_(" -v, --verbose output verbose messages\n"));
232247
printf(_(" --config-file=FILENAME use specified main server configuration\n"
@@ -479,9 +494,10 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
479494
dbinfo[i].pubname ?dbinfo[i].pubname :"(auto)",
480495
dbinfo[i].replslotname ?dbinfo[i].replslotname :"(auto)",
481496
dbinfo[i].pubconninfo);
482-
pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s",i,
497+
pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s",i,
483498
dbinfo[i].subname ?dbinfo[i].subname :"(auto)",
484-
dbinfo[i].subconninfo);
499+
dbinfo[i].subconninfo,
500+
dbinfos.two_phase ?"true" :"false");
485501

486502
if (num_pubs>0)
487503
pubcell=pubcell->next;
@@ -938,11 +954,12 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
938954
failed= true;
939955
}
940956

941-
if (max_prepared_transactions!=0)
957+
if (max_prepared_transactions!=0&& !dbinfos.two_phase)
942958
{
943959
pg_log_warning("two_phase option will not be enabled for replication slots");
944960
pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
945961
"Prepared transactions will be replicated at COMMIT PREPARED.");
962+
pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");
946963
}
947964

948965
/*
@@ -1345,8 +1362,9 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
13451362
slot_name_esc=PQescapeLiteral(conn,slot_name,strlen(slot_name));
13461363

13471364
appendPQExpBuffer(str,
1348-
"SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
1349-
slot_name_esc);
1365+
"SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1366+
slot_name_esc,
1367+
dbinfos.two_phase ?"true" :"false");
13501368

13511369
PQfreemem(slot_name_esc);
13521370

@@ -1722,8 +1740,9 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
17221740
appendPQExpBuffer(str,
17231741
"CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
17241742
"WITH (create_slot = false, enabled = false, "
1725-
"slot_name = %s, copy_data = false)",
1726-
subname_esc,pubconninfo_esc,pubname_esc,replslotname_esc);
1743+
"slot_name = %s, copy_data = false, two_phase = %s)",
1744+
subname_esc,pubconninfo_esc,pubname_esc,replslotname_esc,
1745+
dbinfos.two_phase ?"true" :"false");
17271746

17281747
PQfreemem(pubname_esc);
17291748
PQfreemem(subname_esc);
@@ -1895,6 +1914,7 @@ main(int argc, char **argv)
18951914
{"publisher-server",required_argument,NULL,'P'},
18961915
{"socketdir",required_argument,NULL,'s'},
18971916
{"recovery-timeout",required_argument,NULL,'t'},
1917+
{"enable-two-phase",no_argument,NULL,'T'},
18981918
{"subscriber-username",required_argument,NULL,'U'},
18991919
{"verbose",no_argument,NULL,'v'},
19001920
{"version",no_argument,NULL,'V'},
@@ -1950,6 +1970,7 @@ main(int argc, char **argv)
19501970
opt.socket_dir=NULL;
19511971
opt.sub_port=DEFAULT_SUB_PORT;
19521972
opt.sub_username=NULL;
1973+
opt.two_phase= false;
19531974
opt.database_names= (SimpleStringList)
19541975
{
19551976
0
@@ -1972,7 +1993,7 @@ main(int argc, char **argv)
19721993

19731994
get_restricted_token();
19741995

1975-
while ((c=getopt_long(argc,argv,"d:D:np:P:s:t:U:v",
1996+
while ((c=getopt_long(argc,argv,"d:D:np:P:s:t:TU:v",
19761997
long_options,&option_index))!=-1)
19771998
{
19781999
switch (c)
@@ -2009,6 +2030,9 @@ main(int argc, char **argv)
20092030
case't':
20102031
opt.recovery_timeout=atoi(optarg);
20112032
break;
2033+
case'T':
2034+
opt.two_phase= true;
2035+
break;
20122036
case'U':
20132037
opt.sub_username=pg_strdup(optarg);
20142038
break;
@@ -2170,12 +2194,14 @@ main(int argc, char **argv)
21702194
/* Rudimentary check for a data directory */
21712195
check_data_directory(subscriber_dir);
21722196

2197+
dbinfos.two_phase=opt.two_phase;
2198+
21732199
/*
21742200
* Store database information for publisher and subscriber. It should be
21752201
* called before atexit() because its return is used in the
21762202
* cleanup_objects_atexit().
21772203
*/
2178-
dbinfo=store_pub_sub_info(&opt,pub_base_conninfo,sub_base_conninfo);
2204+
dbinfos.dbinfo=store_pub_sub_info(&opt,pub_base_conninfo,sub_base_conninfo);
21792205

21802206
/* Register a function to clean up objects in case of failure */
21812207
atexit(cleanup_objects_atexit);
@@ -2184,7 +2210,7 @@ main(int argc, char **argv)
21842210
* Check if the subscriber data directory has the same system identifier
21852211
* than the publisher data directory.
21862212
*/
2187-
pub_sysid=get_primary_sysid(dbinfo[0].pubconninfo);
2213+
pub_sysid=get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
21882214
sub_sysid=get_standby_sysid(subscriber_dir);
21892215
if (pub_sysid!=sub_sysid)
21902216
pg_fatal("subscriber data directory is not a copy of the source database cluster");
@@ -2214,10 +2240,10 @@ main(int argc, char **argv)
22142240
start_standby_server(&opt, true, false);
22152241

22162242
/* Check if the standby server is ready for logical replication */
2217-
check_subscriber(dbinfo);
2243+
check_subscriber(dbinfos.dbinfo);
22182244

22192245
/* Check if the primary server is ready for logical replication */
2220-
check_publisher(dbinfo);
2246+
check_publisher(dbinfos.dbinfo);
22212247

22222248
/*
22232249
* Stop the target server. The recovery process requires that the server
@@ -2230,10 +2256,10 @@ main(int argc, char **argv)
22302256
stop_standby_server(subscriber_dir);
22312257

22322258
/* Create the required objects for each database on publisher */
2233-
consistent_lsn=setup_publisher(dbinfo);
2259+
consistent_lsn=setup_publisher(dbinfos.dbinfo);
22342260

22352261
/* Write the required recovery parameters */
2236-
setup_recovery(dbinfo,subscriber_dir,consistent_lsn);
2262+
setup_recovery(dbinfos.dbinfo,subscriber_dir,consistent_lsn);
22372263

22382264
/*
22392265
* Start subscriber so the recovery parameters will take effect. Wait
@@ -2244,21 +2270,21 @@ main(int argc, char **argv)
22442270
start_standby_server(&opt, true, true);
22452271

22462272
/* Waiting the subscriber to be promoted */
2247-
wait_for_end_recovery(dbinfo[0].subconninfo,&opt);
2273+
wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo,&opt);
22482274

22492275
/*
22502276
* Create the subscription for each database on subscriber. It does not
22512277
* enable it immediately because it needs to adjust the replication start
22522278
* point to the LSN reported by setup_publisher(). It also cleans up
22532279
* publications created by this tool and replication to the standby.
22542280
*/
2255-
setup_subscriber(dbinfo,consistent_lsn);
2281+
setup_subscriber(dbinfos.dbinfo,consistent_lsn);
22562282

22572283
/* Remove primary_slot_name if it exists on primary */
2258-
drop_primary_replication_slot(dbinfo,primary_slot_name);
2284+
drop_primary_replication_slot(dbinfos.dbinfo,primary_slot_name);
22592285

22602286
/* Remove failover replication slots if they exist on subscriber */
2261-
drop_failover_replication_slots(dbinfo);
2287+
drop_failover_replication_slots(dbinfos.dbinfo);
22622288

22632289
/* Stop the subscriber */
22642290
pg_log_info("stopping the subscriber");

‎src/bin/pg_basebackup/t/040_pg_createsubscriber.pl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,7 @@ sub generate_db
373373

374374
# Run pg_createsubscriber on node S. --verbose is used twice
375375
# to show more information.
376+
# In passing, also test the --enable-two-phase option
376377
command_ok(
377378
[
378379
'pg_createsubscriber',
@@ -388,6 +389,7 @@ sub generate_db
388389
'--replication-slot'=>'replslot2',
389390
'--database'=>$db1,
390391
'--database'=>$db2,
392+
'--enable-two-phase'
391393
],
392394
'run pg_createsubscriber on node S');
393395

@@ -406,6 +408,15 @@ sub generate_db
406408
# Start subscriber
407409
$node_s->start;
408410

411+
# Verify that all subtwophase states are pending or enabled,
412+
# e.g. there are no subscriptions where subtwophase is disabled ('d')
413+
is($node_s->safe_psql(
414+
'postgres',
415+
"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate = 'd'"
416+
),
417+
't',
418+
'subscriptions are created with the two-phase option enabled');
419+
409420
# Confirm the pre-existing subscription has been removed
410421
$result =$node_s->safe_psql(
411422
'postgres',qq(

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp