Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4867f8a

Browse files
author
Amit Kapila
committed
Drop pre-existing subscriptions from the converted subscriber.
We don't need the pre-existing subscriptions on the newly formedsubscriber by using pg_createsubscriber. The apply workers correspondingto these subscriptions can connect to other publisher nodes and either getsome unwarranted data or can lead to ERRORs in connecting to such nodes.Author: Kuroda HayatoReviewed-by: Amit Kapila, Shlok Kyal, Vignesh CBackpatch-through: 17Discussion:https://postgr.es/m/OSBPR01MB25526A30A1FBF863ACCDDA3AF5C92@OSBPR01MB2552.jpnprd01.prod.outlook.com
1 parent8f8bcb8 commit4867f8a

File tree

2 files changed

+120
-5
lines changed

2 files changed

+120
-5
lines changed

‎src/bin/pg_basebackup/pg_createsubscriber.c

Lines changed: 106 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
9292
constchar*slot_name);
9393
staticvoidpg_ctl_status(constchar*pg_ctl_cmd,intrc);
9494
staticvoidstart_standby_server(conststructCreateSubscriberOptions*opt,
95-
boolrestricted_access);
95+
boolrestricted_access,
96+
boolrestrict_logical_worker);
9697
staticvoidstop_standby_server(constchar*datadir);
9798
staticvoidwait_for_end_recovery(constchar*conninfo,
9899
conststructCreateSubscriberOptions*opt);
@@ -102,6 +103,10 @@ static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinf
102103
staticvoidset_replication_progress(PGconn*conn,conststructLogicalRepInfo*dbinfo,
103104
constchar*lsn);
104105
staticvoidenable_subscription(PGconn*conn,conststructLogicalRepInfo*dbinfo);
106+
staticvoidcheck_and_drop_existing_subscriptions(PGconn*conn,
107+
conststructLogicalRepInfo*dbinfo);
108+
staticvoiddrop_existing_subscriptions(PGconn*conn,constchar*subname,
109+
constchar*dbname);
105110

106111
#defineUSEC_PER_SEC1000000
107112
#defineWAIT_INTERVAL1/* 1 second */
@@ -1025,6 +1030,87 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
10251030
exit(1);
10261031
}
10271032

1033+
/*
1034+
* Drop a specified subscription. This is to avoid duplicate subscriptions on
1035+
* the primary (publisher node) and the newly created subscriber. We
1036+
* shouldn't drop the associated slot as that would be used by the publisher
1037+
* node.
1038+
*/
1039+
staticvoid
1040+
drop_existing_subscriptions(PGconn*conn,constchar*subname,constchar*dbname)
1041+
{
1042+
PQExpBufferquery=createPQExpBuffer();
1043+
PGresult*res;
1044+
1045+
Assert(conn!=NULL);
1046+
1047+
/*
1048+
* Construct a query string. These commands are allowed to be executed
1049+
* within a transaction.
1050+
*/
1051+
appendPQExpBuffer(query,"ALTER SUBSCRIPTION %s DISABLE;",
1052+
subname);
1053+
appendPQExpBuffer(query," ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1054+
subname);
1055+
appendPQExpBuffer(query," DROP SUBSCRIPTION %s;",subname);
1056+
1057+
pg_log_info("dropping subscription \"%s\" on database \"%s\"",
1058+
subname,dbname);
1059+
1060+
if (!dry_run)
1061+
{
1062+
res=PQexec(conn,query->data);
1063+
1064+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
1065+
{
1066+
pg_log_error("could not drop a subscription \"%s\" settings: %s",
1067+
subname,PQresultErrorMessage(res));
1068+
disconnect_database(conn, true);
1069+
}
1070+
1071+
PQclear(res);
1072+
}
1073+
1074+
destroyPQExpBuffer(query);
1075+
}
1076+
1077+
/*
1078+
* Retrieve and drop the pre-existing subscriptions.
1079+
*/
1080+
staticvoid
1081+
check_and_drop_existing_subscriptions(PGconn*conn,
1082+
conststructLogicalRepInfo*dbinfo)
1083+
{
1084+
PQExpBufferquery=createPQExpBuffer();
1085+
char*dbname;
1086+
PGresult*res;
1087+
1088+
Assert(conn!=NULL);
1089+
1090+
dbname=PQescapeLiteral(conn,dbinfo->dbname,strlen(dbinfo->dbname));
1091+
1092+
appendPQExpBuffer(query,
1093+
"SELECT s.subname FROM pg_catalog.pg_subscription s "
1094+
"INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1095+
"WHERE d.datname = %s",
1096+
dbname);
1097+
res=PQexec(conn,query->data);
1098+
1099+
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
1100+
{
1101+
pg_log_error("could not obtain pre-existing subscriptions: %s",
1102+
PQresultErrorMessage(res));
1103+
disconnect_database(conn, true);
1104+
}
1105+
1106+
for (inti=0;i<PQntuples(res);i++)
1107+
drop_existing_subscriptions(conn,PQgetvalue(res,i,0),
1108+
dbinfo->dbname);
1109+
1110+
PQclear(res);
1111+
destroyPQExpBuffer(query);
1112+
}
1113+
10281114
/*
10291115
* Create the subscriptions, adjust the initial location for logical
10301116
* replication and enable the subscriptions. That's the last step for logical
@@ -1040,6 +1126,14 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
10401126
/* Connect to subscriber. */
10411127
conn=connect_database(dbinfo[i].subconninfo, true);
10421128

1129+
/*
1130+
* We don't need the pre-existing subscriptions on the newly formed
1131+
* subscriber. They can connect to other publisher nodes and either
1132+
* get some unwarranted data or can lead to ERRORs in connecting to
1133+
* such nodes.
1134+
*/
1135+
check_and_drop_existing_subscriptions(conn,&dbinfo[i]);
1136+
10431137
/*
10441138
* Since the publication was created before the consistent LSN, it is
10451139
* available on the subscriber when the physical replica is promoted.
@@ -1314,7 +1408,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
13141408
}
13151409

13161410
staticvoid
1317-
start_standby_server(conststructCreateSubscriberOptions*opt,boolrestricted_access)
1411+
start_standby_server(conststructCreateSubscriberOptions*opt,boolrestricted_access,
1412+
boolrestrict_logical_worker)
13181413
{
13191414
PQExpBufferpg_ctl_cmd=createPQExpBuffer();
13201415
intrc;
@@ -1343,6 +1438,11 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
13431438
if (opt->config_file!=NULL)
13441439
appendPQExpBuffer(pg_ctl_cmd," -o \"-c config_file=%s\"",
13451440
opt->config_file);
1441+
1442+
/* Suppress to start logical replication if requested */
1443+
if (restrict_logical_worker)
1444+
appendPQExpBuffer(pg_ctl_cmd," -o \"-c max_logical_replication_workers=0\"");
1445+
13461446
pg_log_debug("pg_ctl command is: %s",pg_ctl_cmd->data);
13471447
rc=system(pg_ctl_cmd->data);
13481448
pg_ctl_status(pg_ctl_cmd->data,rc);
@@ -2067,7 +2167,7 @@ main(int argc, char **argv)
20672167
* transformation steps.
20682168
*/
20692169
pg_log_info("starting the standby with command-line options");
2070-
start_standby_server(&opt, true);
2170+
start_standby_server(&opt, true, false);
20712171

20722172
/* Check if the standby server is ready for logical replication */
20732173
check_subscriber(dbinfo);
@@ -2098,10 +2198,11 @@ main(int argc, char **argv)
20982198

20992199
/*
21002200
* Start subscriber so the recovery parameters will take effect. Wait
2101-
* until accepting connections.
2201+
* until accepting connections. We don't want to start logical replication
2202+
* during setup.
21022203
*/
21032204
pg_log_info("starting the subscriber");
2104-
start_standby_server(&opt, true);
2205+
start_standby_server(&opt, true, true);
21052206

21062207
/* Waiting the subscriber to be promoted */
21072208
wait_for_end_recovery(dbinfo[0].subconninfo,&opt);

‎src/bin/pg_basebackup/t/040_pg_createsubscriber.pl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,13 @@ sub generate_db
298298
"SELECT slot_name FROM pg_replication_slots WHERE slot_name = '$fslotname' AND synced AND NOT temporary"
299299
);
300300
is($result,'failover_slot','failover slot is synced');
301+
302+
# Create subscription to test its removal
303+
my$dummy_sub ='regress_sub_dummy';
304+
$node_p->safe_psql($db1,
305+
"CREATE SUBSCRIPTION$dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
306+
);
307+
$node_p->wait_for_replay_catchup($node_s);
301308
$node_s->stop;
302309

303310
# dry run mode on node S
@@ -372,6 +379,13 @@ sub generate_db
372379
# Start subscriber
373380
$node_s->start;
374381

382+
# Confirm the pre-existing subscription has been removed
383+
$result =$node_s->safe_psql(
384+
'postgres',qq(
385+
SELECT count(*) FROM pg_subscription WHERE subname = '$dummy_sub'
386+
));
387+
is($result,qq(0),'pre-existing subscription was dropped');
388+
375389
# Get subscription names
376390
$result =$node_s->safe_psql(
377391
'postgres',qq(

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp