Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7329240

Browse files
author
Amit Kapila
committed
Allow setting failover property in the replication command.
This commit implements a new replication command calledALTER_REPLICATION_SLOT and a corresponding walreceiver API function namedwalrcv_alter_slot. Additionally, the CREATE_REPLICATION_SLOT command hasbeen extended to support the failover option.These new additions allow the modification of the failover property of areplication slot on the publisher. A subsequent commit will make use ofthese commands in subscription commands and will add the tests as well tocover the functionality added/changed by this commit.Author: Hou Zhijie, Shveta MalikReviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda, Hayato, Amit KapilaDiscussion:https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
1 parent08e6344 commit7329240

File tree

13 files changed

+230
-11
lines changed

13 files changed

+230
-11
lines changed

‎doc/src/sgml/protocol.sgml

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2060,6 +2060,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
20602060
</para>
20612061
</listitem>
20622062
</varlistentry>
2063+
2064+
<varlistentry>
2065+
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
2066+
<listitem>
2067+
<para>
2068+
If true, the slot is enabled to be synced to the standbys.
2069+
The default is false.
2070+
</para>
2071+
</listitem>
2072+
</varlistentry>
20632073
</variablelist>
20642074

20652075
<para>
@@ -2124,6 +2134,46 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
21242134
</listitem>
21252135
</varlistentry>
21262136

2137+
<varlistentry id="protocol-replication-alter-replication-slot" xreflabel="ALTER_REPLICATION_SLOT">
2138+
<term><literal>ALTER_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ( <replaceable class="parameter">option</replaceable> [, ...] )
2139+
<indexterm><primary>ALTER_REPLICATION_SLOT</primary></indexterm>
2140+
</term>
2141+
<listitem>
2142+
<para>
2143+
Change the definition of a replication slot.
2144+
See <xref linkend="streaming-replication-slots"/> for more about
2145+
replication slots. This command is currently only supported for logical
2146+
replication slots.
2147+
</para>
2148+
2149+
<variablelist>
2150+
<varlistentry>
2151+
<term><replaceable class="parameter">slot_name</replaceable></term>
2152+
<listitem>
2153+
<para>
2154+
The name of the slot to alter. Must be a valid replication slot
2155+
name (see <xref linkend="streaming-replication-slots-manipulation"/>).
2156+
</para>
2157+
</listitem>
2158+
</varlistentry>
2159+
</variablelist>
2160+
2161+
<para>The following option is supported:</para>
2162+
2163+
<variablelist>
2164+
<varlistentry>
2165+
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
2166+
<listitem>
2167+
<para>
2168+
If true, the slot is enabled to be synced to the standbys.
2169+
</para>
2170+
</listitem>
2171+
</varlistentry>
2172+
</variablelist>
2173+
2174+
</listitem>
2175+
</varlistentry>
2176+
21272177
<varlistentry id="protocol-replication-read-replication-slot">
21282178
<term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
21292179
<indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>

‎src/backend/commands/subscriptioncmds.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -807,7 +807,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
807807
twophase_enabled= true;
808808

809809
walrcv_create_slot(wrconn,opts.slot_name, false,twophase_enabled,
810-
CRS_NOEXPORT_SNAPSHOT,NULL);
810+
false,CRS_NOEXPORT_SNAPSHOT,NULL);
811811

812812
if (twophase_enabled)
813813
UpdateTwoPhaseState(subid,LOGICALREP_TWOPHASE_STATE_ENABLED);

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,11 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
7373
constchar*slotname,
7474
booltemporary,
7575
booltwo_phase,
76+
boolfailover,
7677
CRSSnapshotActionsnapshot_action,
7778
XLogRecPtr*lsn);
79+
staticvoidlibpqrcv_alter_slot(WalReceiverConn*conn,constchar*slotname,
80+
boolfailover);
7881
staticpid_tlibpqrcv_get_backend_pid(WalReceiverConn*conn);
7982
staticWalRcvExecResult*libpqrcv_exec(WalReceiverConn*conn,
8083
constchar*query,
@@ -95,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
9598
.walrcv_receive=libpqrcv_receive,
9699
.walrcv_send=libpqrcv_send,
97100
.walrcv_create_slot=libpqrcv_create_slot,
101+
.walrcv_alter_slot=libpqrcv_alter_slot,
98102
.walrcv_get_backend_pid=libpqrcv_get_backend_pid,
99103
.walrcv_exec=libpqrcv_exec,
100104
.walrcv_disconnect=libpqrcv_disconnect
@@ -938,8 +942,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
938942
*/
939943
staticchar*
940944
libpqrcv_create_slot(WalReceiverConn*conn,constchar*slotname,
941-
booltemporary,booltwo_phase,CRSSnapshotActionsnapshot_action,
942-
XLogRecPtr*lsn)
945+
booltemporary,booltwo_phase,boolfailover,
946+
CRSSnapshotActionsnapshot_action,XLogRecPtr*lsn)
943947
{
944948
PGresult*res;
945949
StringInfoDatacmd;
@@ -969,6 +973,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
969973
appendStringInfoChar(&cmd,' ');
970974
}
971975

976+
if (failover)
977+
{
978+
appendStringInfoString(&cmd,"FAILOVER");
979+
if (use_new_options_syntax)
980+
appendStringInfoString(&cmd,", ");
981+
else
982+
appendStringInfoChar(&cmd,' ');
983+
}
984+
972985
if (use_new_options_syntax)
973986
{
974987
switch (snapshot_action)
@@ -1037,6 +1050,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
10371050
returnsnapshot;
10381051
}
10391052

1053+
/*
1054+
* Change the definition of the replication slot.
1055+
*/
1056+
staticvoid
1057+
libpqrcv_alter_slot(WalReceiverConn*conn,constchar*slotname,
1058+
boolfailover)
1059+
{
1060+
StringInfoDatacmd;
1061+
PGresult*res;
1062+
1063+
initStringInfo(&cmd);
1064+
appendStringInfo(&cmd,"ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
1065+
quote_identifier(slotname),
1066+
failover ?"true" :"false");
1067+
1068+
res=libpqrcv_PQexec(conn->streamConn,cmd.data);
1069+
pfree(cmd.data);
1070+
1071+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
1072+
ereport(ERROR,
1073+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
1074+
errmsg("could not alter replication slot \"%s\": %s",
1075+
slotname,pchomp(PQerrorMessage(conn->streamConn)))));
1076+
1077+
PQclear(res);
1078+
}
1079+
10401080
/*
10411081
* Return PID of remote backend process.
10421082
*/

‎src/backend/replication/logical/tablesync.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1430,6 +1430,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
14301430
*/
14311431
walrcv_create_slot(LogRepWorkerWalRcvConn,
14321432
slotname, false/* permanent */ , false/* two_phase */ ,
1433+
false,
14331434
CRS_USE_SNAPSHOT,origin_startpos);
14341435

14351436
/*

‎src/backend/replication/repl_gram.y

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ Node *replication_parse_result;
6464
%tokenK_START_REPLICATION
6565
%tokenK_CREATE_REPLICATION_SLOT
6666
%tokenK_DROP_REPLICATION_SLOT
67+
%tokenK_ALTER_REPLICATION_SLOT
6768
%tokenK_TIMELINE_HISTORY
6869
%tokenK_WAIT
6970
%tokenK_TIMELINE
@@ -80,8 +81,9 @@ Node *replication_parse_result;
8081

8182
%type<node>command
8283
%type<node>base_backupstart_replicationstart_logical_replication
83-
create_replication_slotdrop_replication_slotidentify_system
84-
read_replication_slottimeline_historyshowupload_manifest
84+
create_replication_slotdrop_replication_slot
85+
alter_replication_slotidentify_systemread_replication_slot
86+
timeline_historyshowupload_manifest
8587
%type<list>generic_option_list
8688
%type<defelt>generic_option
8789
%type<uintval>opt_timeline
@@ -112,6 +114,7 @@ command:
112114
|start_logical_replication
113115
|create_replication_slot
114116
|drop_replication_slot
117+
|alter_replication_slot
115118
|read_replication_slot
116119
|timeline_history
117120
|show
@@ -259,6 +262,18 @@ drop_replication_slot:
259262
}
260263
;
261264

265+
/* ALTER_REPLICATION_SLOT slot (options)*/
266+
alter_replication_slot:
267+
K_ALTER_REPLICATION_SLOTIDENT'('generic_option_list')'
268+
{
269+
AlterReplicationSlotCmd *cmd;
270+
cmd = makeNode(AlterReplicationSlotCmd);
271+
cmd->slotname =$2;
272+
cmd->options =$4;
273+
$$ = (Node *) cmd;
274+
}
275+
;
276+
262277
/*
263278
* START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
264279
*/
@@ -410,6 +425,7 @@ ident_or_keyword:
410425
|K_START_REPLICATION{$$ ="start_replication"; }
411426
|K_CREATE_REPLICATION_SLOT{$$ ="create_replication_slot"; }
412427
|K_DROP_REPLICATION_SLOT{$$ ="drop_replication_slot"; }
428+
|K_ALTER_REPLICATION_SLOT{$$ ="alter_replication_slot"; }
413429
|K_TIMELINE_HISTORY{$$ ="timeline_history"; }
414430
|K_WAIT{$$ ="wait"; }
415431
|K_TIMELINE{$$ ="timeline"; }

‎src/backend/replication/repl_scanner.l

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ TIMELINE{ return K_TIMELINE; }
125125
START_REPLICATION{return K_START_REPLICATION; }
126126
CREATE_REPLICATION_SLOT{return K_CREATE_REPLICATION_SLOT; }
127127
DROP_REPLICATION_SLOT{return K_DROP_REPLICATION_SLOT; }
128+
ALTER_REPLICATION_SLOT{return K_ALTER_REPLICATION_SLOT; }
128129
TIMELINE_HISTORY{return K_TIMELINE_HISTORY; }
129130
PHYSICAL{return K_PHYSICAL; }
130131
RESERVE_WAL{return K_RESERVE_WAL; }
@@ -302,6 +303,7 @@ replication_scanner_is_replication_command(void)
302303
case K_START_REPLICATION:
303304
case K_CREATE_REPLICATION_SLOT:
304305
case K_DROP_REPLICATION_SLOT:
306+
case K_ALTER_REPLICATION_SLOT:
305307
case K_READ_REPLICATION_SLOT:
306308
case K_TIMELINE_HISTORY:
307309
case K_UPLOAD_MANIFEST:

‎src/backend/replication/slot.c

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,31 @@ ReplicationSlotDrop(const char *name, bool nowait)
683683
ReplicationSlotDropAcquired();
684684
}
685685

686+
/*
687+
* Change the definition of the slot identified by the specified name.
688+
*/
689+
void
690+
ReplicationSlotAlter(constchar*name,boolfailover)
691+
{
692+
Assert(MyReplicationSlot==NULL);
693+
694+
ReplicationSlotAcquire(name, false);
695+
696+
if (SlotIsPhysical(MyReplicationSlot))
697+
ereport(ERROR,
698+
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
699+
errmsg("cannot use %s with a physical replication slot",
700+
"ALTER_REPLICATION_SLOT"));
701+
702+
SpinLockAcquire(&MyReplicationSlot->mutex);
703+
MyReplicationSlot->data.failover=failover;
704+
SpinLockRelease(&MyReplicationSlot->mutex);
705+
706+
ReplicationSlotMarkDirty();
707+
ReplicationSlotSave();
708+
ReplicationSlotRelease();
709+
}
710+
686711
/*
687712
* Permanently drop the currently acquired replication slot.
688713
*/

‎src/backend/replication/walreceiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ WalReceiverMain(void)
387387
"pg_walreceiver_%lld",
388388
(long longint)walrcv_get_backend_pid(wrconn));
389389

390-
walrcv_create_slot(wrconn,slotname, true, false,0,NULL);
390+
walrcv_create_slot(wrconn,slotname, true, false,false,0,NULL);
391391

392392
SpinLockAcquire(&walrcv->mutex);
393393
strlcpy(walrcv->slotname,slotname,NAMEDATALEN);

‎src/backend/replication/walsender.c

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,12 +1126,13 @@ static void
11261126
parseCreateReplSlotOptions(CreateReplicationSlotCmd*cmd,
11271127
bool*reserve_wal,
11281128
CRSSnapshotAction*snapshot_action,
1129-
bool*two_phase)
1129+
bool*two_phase,bool*failover)
11301130
{
11311131
ListCell*lc;
11321132
boolsnapshot_action_given= false;
11331133
boolreserve_wal_given= false;
11341134
booltwo_phase_given= false;
1135+
boolfailover_given= false;
11351136

11361137
/* Parse options */
11371138
foreach(lc,cmd->options)
@@ -1181,6 +1182,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
11811182
two_phase_given= true;
11821183
*two_phase=defGetBoolean(defel);
11831184
}
1185+
elseif (strcmp(defel->defname,"failover")==0)
1186+
{
1187+
if (failover_given||cmd->kind!=REPLICATION_KIND_LOGICAL)
1188+
ereport(ERROR,
1189+
(errcode(ERRCODE_SYNTAX_ERROR),
1190+
errmsg("conflicting or redundant options")));
1191+
failover_given= true;
1192+
*failover=defGetBoolean(defel);
1193+
}
11841194
else
11851195
elog(ERROR,"unrecognized option: %s",defel->defname);
11861196
}
@@ -1197,6 +1207,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
11971207
char*slot_name;
11981208
boolreserve_wal= false;
11991209
booltwo_phase= false;
1210+
boolfailover= false;
12001211
CRSSnapshotActionsnapshot_action=CRS_EXPORT_SNAPSHOT;
12011212
DestReceiver*dest;
12021213
TupOutputState*tstate;
@@ -1206,7 +1217,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
12061217

12071218
Assert(!MyReplicationSlot);
12081219

1209-
parseCreateReplSlotOptions(cmd,&reserve_wal,&snapshot_action,&two_phase);
1220+
parseCreateReplSlotOptions(cmd,&reserve_wal,&snapshot_action,&two_phase,
1221+
&failover);
12101222

12111223
if (cmd->kind==REPLICATION_KIND_PHYSICAL)
12121224
{
@@ -1243,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
12431255
*/
12441256
ReplicationSlotCreate(cmd->slotname, true,
12451257
cmd->temporary ?RS_TEMPORARY :RS_EPHEMERAL,
1246-
two_phase,false);
1258+
two_phase,failover);
12471259

12481260
/*
12491261
* Do options check early so that we can bail before calling the
@@ -1398,6 +1410,43 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
13981410
ReplicationSlotDrop(cmd->slotname, !cmd->wait);
13991411
}
14001412

1413+
/*
1414+
* Process extra options given to ALTER_REPLICATION_SLOT.
1415+
*/
1416+
staticvoid
1417+
ParseAlterReplSlotOptions(AlterReplicationSlotCmd*cmd,bool*failover)
1418+
{
1419+
boolfailover_given= false;
1420+
1421+
/* Parse options */
1422+
foreach_ptr(DefElem,defel,cmd->options)
1423+
{
1424+
if (strcmp(defel->defname,"failover")==0)
1425+
{
1426+
if (failover_given)
1427+
ereport(ERROR,
1428+
(errcode(ERRCODE_SYNTAX_ERROR),
1429+
errmsg("conflicting or redundant options")));
1430+
failover_given= true;
1431+
*failover=defGetBoolean(defel);
1432+
}
1433+
else
1434+
elog(ERROR,"unrecognized option: %s",defel->defname);
1435+
}
1436+
}
1437+
1438+
/*
1439+
* Change the definition of a replication slot.
1440+
*/
1441+
staticvoid
1442+
AlterReplicationSlot(AlterReplicationSlotCmd*cmd)
1443+
{
1444+
boolfailover= false;
1445+
1446+
ParseAlterReplSlotOptions(cmd,&failover);
1447+
ReplicationSlotAlter(cmd->slotname,failover);
1448+
}
1449+
14011450
/*
14021451
* Load previously initiated logical slot and prepare for sending data (via
14031452
* WalSndLoop).
@@ -1971,6 +2020,13 @@ exec_replication_command(const char *cmd_string)
19712020
EndReplicationCommand(cmdtag);
19722021
break;
19732022

2023+
caseT_AlterReplicationSlotCmd:
2024+
cmdtag="ALTER_REPLICATION_SLOT";
2025+
set_ps_display(cmdtag);
2026+
AlterReplicationSlot((AlterReplicationSlotCmd*)cmd_node);
2027+
EndReplicationCommand(cmdtag);
2028+
break;
2029+
19742030
caseT_StartReplicationCmd:
19752031
{
19762032
StartReplicationCmd*cmd= (StartReplicationCmd*)cmd_node;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp