Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc393308

Browse files
author
Amit Kapila
committed
Allow to enable failover property for replication slots via SQL API.
This commit adds the failover property to the replication slot. Thefailover property indicates whether the slot will be synced to the standbyservers, enabling the resumption of corresponding logical replicationafter failover. But note that this commit does not yet include thecapability to sync the replication slot; the subsequent commits will addthat capability.A new optional parameter 'failover' is added to thepg_create_logical_replication_slot() function. We will also enable to set'failover' option for slots via the subscription commands in thesubsequent commits.The value of the 'failover' flag is displayed as part ofpg_replication_slots view.Author: Hou Zhijie, Shveta Malik, Ajin CherianReviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda, Hayato, Amit KapilaDiscussion:https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
1 parent86232a4 commitc393308

File tree

16 files changed

+141
-25
lines changed

16 files changed

+141
-25
lines changed

‎contrib/test_decoding/expected/slot.out

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,3 +406,61 @@ SELECT pg_drop_replication_slot('copied_slot2_notemp');
406406

407407
(1 row)
408408

409+
-- Test failover option of slots.
410+
SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true);
411+
?column?
412+
----------
413+
init
414+
(1 row)
415+
416+
SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false);
417+
?column?
418+
----------
419+
init
420+
(1 row)
421+
422+
SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false);
423+
?column?
424+
----------
425+
init
426+
(1 row)
427+
428+
SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
429+
?column?
430+
----------
431+
init
432+
(1 row)
433+
434+
SELECT slot_name, slot_type, failover FROM pg_replication_slots;
435+
slot_name | slot_type | failover
436+
-----------------------+-----------+----------
437+
failover_true_slot | logical | t
438+
failover_false_slot | logical | f
439+
failover_default_slot | logical | f
440+
physical_slot | physical | f
441+
(4 rows)
442+
443+
SELECT pg_drop_replication_slot('failover_true_slot');
444+
pg_drop_replication_slot
445+
--------------------------
446+
447+
(1 row)
448+
449+
SELECT pg_drop_replication_slot('failover_false_slot');
450+
pg_drop_replication_slot
451+
--------------------------
452+
453+
(1 row)
454+
455+
SELECT pg_drop_replication_slot('failover_default_slot');
456+
pg_drop_replication_slot
457+
--------------------------
458+
459+
(1 row)
460+
461+
SELECT pg_drop_replication_slot('physical_slot');
462+
pg_drop_replication_slot
463+
--------------------------
464+
465+
(1 row)
466+

‎contrib/test_decoding/sql/slot.sql

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,3 +176,16 @@ ORDER BY o.slot_name, c.slot_name;
176176
SELECT pg_drop_replication_slot('orig_slot2');
177177
SELECT pg_drop_replication_slot('copied_slot2_no_change');
178178
SELECT pg_drop_replication_slot('copied_slot2_notemp');
179+
180+
-- Test failover option of slots.
181+
SELECT'init'FROM pg_create_logical_replication_slot('failover_true_slot','test_decoding', false, false, true);
182+
SELECT'init'FROM pg_create_logical_replication_slot('failover_false_slot','test_decoding', false, false, false);
183+
SELECT'init'FROM pg_create_logical_replication_slot('failover_default_slot','test_decoding', false, false);
184+
SELECT'init'FROM pg_create_physical_replication_slot('physical_slot');
185+
186+
SELECT slot_name, slot_type, failoverFROM pg_replication_slots;
187+
188+
SELECT pg_drop_replication_slot('failover_true_slot');
189+
SELECT pg_drop_replication_slot('failover_false_slot');
190+
SELECT pg_drop_replication_slot('failover_default_slot');
191+
SELECT pg_drop_replication_slot('physical_slot');

‎doc/src/sgml/func.sgml

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27707,7 +27707,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
2770727707
<indexterm>
2770827708
<primary>pg_create_logical_replication_slot</primary>
2770927709
</indexterm>
27710-
<function>pg_create_logical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type>, <parameter>twophase</parameter> <type>boolean</type> </optional> )
27710+
<function>pg_create_logical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type>, <parameter>twophase</parameter> <type>boolean</type>, <parameter>failover</parameter> <type>boolean</type> </optional> )
2771127711
<returnvalue>record</returnvalue>
2771227712
( <parameter>slot_name</parameter> <type>name</type>,
2771327713
<parameter>lsn</parameter> <type>pg_lsn</type> )
@@ -27722,8 +27722,13 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
2772227722
released upon any error. The optional fourth parameter,
2772327723
<parameter>twophase</parameter>, when set to true, specifies
2772427724
that the decoding of prepared transactions is enabled for this
27725-
slot. A call to this function has the same effect as the replication
27726-
protocol command <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
27725+
slot. The optional fifth parameter,
27726+
<parameter>failover</parameter>, when set to true,
27727+
specifies that this slot is enabled to be synced to the
27728+
standbys so that logical replication can be resumed after
27729+
failover. A call to this function has the same effect as
27730+
the replication protocol command
27731+
<literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
2772727732
</para></entry>
2772827733
</row>
2772927734

‎doc/src/sgml/system-views.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2555,6 +2555,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
25552555
</itemizedlist>
25562556
</para></entry>
25572557
</row>
2558+
2559+
<row>
2560+
<entry role="catalog_table_entry"><para role="column_definition">
2561+
<structfield>failover</structfield> <type>bool</type>
2562+
</para>
2563+
<para>
2564+
True if this is a logical slot enabled to be synced to the standbys.
2565+
Always false for physical slots.
2566+
</para></entry>
2567+
</row>
25582568
</tbody>
25592569
</tgroup>
25602570
</table>

‎src/backend/catalog/system_functions.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
479479
IN slot_name name,IN plugin name,
480480
IN temporaryboolean DEFAULT false,
481481
IN twophaseboolean DEFAULT false,
482+
IN failoverboolean DEFAULT false,
482483
OUT slot_name name, OUT lsn pg_lsn)
483484
RETURNS RECORD
484485
LANGUAGE INTERNAL

‎src/backend/catalog/system_views.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1023,7 +1023,8 @@ CREATE VIEW pg_replication_slots AS
10231023
L.wal_status,
10241024
L.safe_wal_size,
10251025
L.two_phase,
1026-
L.conflict_reason
1026+
L.conflict_reason,
1027+
L.failover
10271028
FROM pg_get_replication_slots()AS L
10281029
LEFT JOIN pg_database DON (L.datoid=D.oid);
10291030

‎src/backend/replication/slot.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ typedef struct ReplicationSlotOnDisk
9090
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
9191

9292
#defineSLOT_MAGIC0x1051CA1/* format identifier */
93-
#defineSLOT_VERSION3/* version for new files */
93+
#defineSLOT_VERSION4/* version for new files */
9494

9595
/* Control array for replication slot management */
9696
ReplicationSlotCtlData*ReplicationSlotCtl=NULL;
@@ -248,10 +248,13 @@ ReplicationSlotValidateName(const char *name, int elevel)
248248
* during getting changes, if the two_phase option is enabled it can skip
249249
* prepare because by that time start decoding point has been moved. So the
250250
* user will only get commit prepared.
251+
* failover: If enabled, allows the slot to be synced to standbys so
252+
* that logical replication can be resumed after failover.
251253
*/
252254
void
253255
ReplicationSlotCreate(constchar*name,booldb_specific,
254-
ReplicationSlotPersistencypersistency,booltwo_phase)
256+
ReplicationSlotPersistencypersistency,
257+
booltwo_phase,boolfailover)
255258
{
256259
ReplicationSlot*slot=NULL;
257260
inti;
@@ -311,6 +314,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
311314
slot->data.persistency=persistency;
312315
slot->data.two_phase=two_phase;
313316
slot->data.two_phase_at=InvalidXLogRecPtr;
317+
slot->data.failover=failover;
314318

315319
/* and then data only present in shared memory */
316320
slot->just_dirtied= false;

‎src/backend/replication/slotfuncs.c

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
4242

4343
/* acquire replication slot, this will check for conflicting names */
4444
ReplicationSlotCreate(name, false,
45-
temporary ?RS_TEMPORARY :RS_PERSISTENT, false);
45+
temporary ?RS_TEMPORARY :RS_PERSISTENT, false,
46+
false);
4647

4748
if (immediately_reserve)
4849
{
@@ -117,6 +118,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
117118
staticvoid
118119
create_logical_replication_slot(char*name,char*plugin,
119120
booltemporary,booltwo_phase,
121+
boolfailover,
120122
XLogRecPtrrestart_lsn,
121123
boolfind_startpoint)
122124
{
@@ -133,7 +135,8 @@ create_logical_replication_slot(char *name, char *plugin,
133135
* error as well.
134136
*/
135137
ReplicationSlotCreate(name, true,
136-
temporary ?RS_TEMPORARY :RS_EPHEMERAL,two_phase);
138+
temporary ?RS_TEMPORARY :RS_EPHEMERAL,two_phase,
139+
failover);
137140

138141
/*
139142
* Create logical decoding context to find start point or, if we don't
@@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
171174
Nameplugin=PG_GETARG_NAME(1);
172175
booltemporary=PG_GETARG_BOOL(2);
173176
booltwo_phase=PG_GETARG_BOOL(3);
177+
boolfailover=PG_GETARG_BOOL(4);
174178
Datumresult;
175179
TupleDesctupdesc;
176180
HeapTupletuple;
@@ -188,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
188192
NameStr(*plugin),
189193
temporary,
190194
two_phase,
195+
failover,
191196
InvalidXLogRecPtr,
192197
true);
193198

@@ -232,7 +237,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
232237
Datum
233238
pg_get_replication_slots(PG_FUNCTION_ARGS)
234239
{
235-
#definePG_GET_REPLICATION_SLOTS_COLS15
240+
#definePG_GET_REPLICATION_SLOTS_COLS16
236241
ReturnSetInfo*rsinfo= (ReturnSetInfo*)fcinfo->resultinfo;
237242
XLogRecPtrcurrlsn;
238243
intslotno;
@@ -426,6 +431,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
426431
}
427432
}
428433

434+
values[i++]=BoolGetDatum(slot_contents.data.failover);
435+
429436
Assert(i==PG_GET_REPLICATION_SLOTS_COLS);
430437

431438
tuplestore_putvalues(rsinfo->setResult,rsinfo->setDesc,
@@ -693,6 +700,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
693700
XLogRecPtrsrc_restart_lsn;
694701
boolsrc_islogical;
695702
booltemporary;
703+
boolfailover;
696704
char*plugin;
697705
Datumvalues[2];
698706
boolnulls[2];
@@ -748,6 +756,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
748756
src_islogical=SlotIsLogical(&first_slot_contents);
749757
src_restart_lsn=first_slot_contents.data.restart_lsn;
750758
temporary= (first_slot_contents.data.persistency==RS_TEMPORARY);
759+
failover=first_slot_contents.data.failover;
751760
plugin=logical_slot ?NameStr(first_slot_contents.data.plugin) :NULL;
752761

753762
/* Check type of replication slot */
@@ -787,6 +796,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
787796
plugin,
788797
temporary,
789798
false,
799+
failover,
790800
src_restart_lsn,
791801
false);
792802
}

‎src/backend/replication/walsender.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,7 +1212,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
12121212
{
12131213
ReplicationSlotCreate(cmd->slotname, false,
12141214
cmd->temporary ?RS_TEMPORARY :RS_PERSISTENT,
1215-
false);
1215+
false, false);
12161216

12171217
if (reserve_wal)
12181218
{
@@ -1243,7 +1243,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
12431243
*/
12441244
ReplicationSlotCreate(cmd->slotname, true,
12451245
cmd->temporary ?RS_TEMPORARY :RS_EPHEMERAL,
1246-
two_phase);
1246+
two_phase, false);
12471247

12481248
/*
12491249
* Do options check early so that we can bail before calling the

‎src/bin/pg_upgrade/info.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
666666
* started and stopped several times causing any temporary slots to be
667667
* removed.
668668
*/
669-
res=executeQueryOrDie(conn,"SELECT slot_name, plugin, two_phase, "
669+
res=executeQueryOrDie(conn,"SELECT slot_name, plugin, two_phase,failover,"
670670
"%s as caught_up, conflict_reason IS NOT NULL as invalid "
671671
"FROM pg_catalog.pg_replication_slots "
672672
"WHERE slot_type = 'logical' AND "
@@ -684,6 +684,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
684684
inti_slotname;
685685
inti_plugin;
686686
inti_twophase;
687+
inti_failover;
687688
inti_caught_up;
688689
inti_invalid;
689690

@@ -692,6 +693,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
692693
i_slotname=PQfnumber(res,"slot_name");
693694
i_plugin=PQfnumber(res,"plugin");
694695
i_twophase=PQfnumber(res,"two_phase");
696+
i_failover=PQfnumber(res,"failover");
695697
i_caught_up=PQfnumber(res,"caught_up");
696698
i_invalid=PQfnumber(res,"invalid");
697699

@@ -702,6 +704,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
702704
curr->slotname=pg_strdup(PQgetvalue(res,slotnum,i_slotname));
703705
curr->plugin=pg_strdup(PQgetvalue(res,slotnum,i_plugin));
704706
curr->two_phase= (strcmp(PQgetvalue(res,slotnum,i_twophase),"t")==0);
707+
curr->failover= (strcmp(PQgetvalue(res,slotnum,i_failover),"t")==0);
705708
curr->caught_up= (strcmp(PQgetvalue(res,slotnum,i_caught_up),"t")==0);
706709
curr->invalid= (strcmp(PQgetvalue(res,slotnum,i_invalid),"t")==0);
707710
}

‎src/bin/pg_upgrade/pg_upgrade.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -916,8 +916,10 @@ create_logical_replication_slots(void)
916916
appendStringLiteralConn(query,slot_info->slotname,conn);
917917
appendPQExpBuffer(query,", ");
918918
appendStringLiteralConn(query,slot_info->plugin,conn);
919-
appendPQExpBuffer(query,", false, %s);",
920-
slot_info->two_phase ?"true" :"false");
919+
920+
appendPQExpBuffer(query,", false, %s, %s);",
921+
slot_info->two_phase ?"true" :"false",
922+
slot_info->failover ?"true" :"false");
921923

922924
PQclear(executeQueryOrDie(conn,"%s",query->data));
923925

‎src/bin/pg_upgrade/pg_upgrade.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ typedef struct
160160
booltwo_phase;/* can the slot decode 2PC? */
161161
boolcaught_up;/* has the slot caught up to latest changes? */
162162
boolinvalid;/* if true, the slot is unusable */
163+
boolfailover;/* is the slot designated to be synced to the
164+
* physical standby? */
163165
}LogicalSlotInfo;
164166

165167
typedefstruct

‎src/include/catalog/catversion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,6 @@
5757
*/
5858

5959
/*yyyymmddN */
60-
#defineCATALOG_VERSION_NO202401251
60+
#defineCATALOG_VERSION_NO202401252
6161

6262
#endif

‎src/include/catalog/pg_proc.dat

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11127,17 +11127,17 @@
1112711127
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
1112811128
proretset => 't', provolatile => 's', prorettype => 'record',
1112911129
proargtypes => '',
11130-
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text}',
11131-
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
11132-
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason}',
11130+
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}',
11131+
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
11132+
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover}',
1113311133
prosrc => 'pg_get_replication_slots' },
1113411134
{ oid => '3786', descr => 'set up a logical replication slot',
1113511135
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
1113611136
proparallel => 'u', prorettype => 'record',
11137-
proargtypes => 'name name bool bool',
11138-
proallargtypes => '{name,name,bool,bool,name,pg_lsn}',
11139-
proargmodes => '{i,i,i,i,o,o}',
11140-
proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}',
11137+
proargtypes => 'name name bool bool bool',
11138+
proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}',
11139+
proargmodes => '{i,i,i,i,i,o,o}',
11140+
proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}',
1114111141
prosrc => 'pg_create_logical_replication_slot' },
1114211142
{ oid => '4222',
1114311143
descr => 'copy a logical replication slot, changing temporality and plugin',

‎src/include/replication/slot.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,12 @@ typedef struct ReplicationSlotPersistentData
111111

112112
/* plugin name */
113113
NameDataplugin;
114+
115+
/*
116+
* Is this a failover slot (sync candidate for standbys)? Only relevant
117+
* for logical slots on the primary server.
118+
*/
119+
boolfailover;
114120
}ReplicationSlotPersistentData;
115121

116122
/*
@@ -218,7 +224,7 @@ extern void ReplicationSlotsShmemInit(void);
218224
/* management of individual slots */
219225
externvoidReplicationSlotCreate(constchar*name,booldb_specific,
220226
ReplicationSlotPersistencypersistency,
221-
booltwo_phase);
227+
booltwo_phase,boolfailover);
222228
externvoidReplicationSlotPersist(void);
223229
externvoidReplicationSlotDrop(constchar*name,boolnowait);
224230

‎src/test/regress/expected/rules.out

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1473,8 +1473,9 @@ pg_replication_slots| SELECT l.slot_name,
14731473
l.wal_status,
14741474
l.safe_wal_size,
14751475
l.two_phase,
1476-
l.conflict_reason
1477-
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason)
1476+
l.conflict_reason,
1477+
l.failover
1478+
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover)
14781479
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
14791480
pg_roles| SELECT pg_authid.rolname,
14801481
pg_authid.rolsuper,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp