Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb4ada4e

Browse files
committed
Add replication command READ_REPLICATION_SLOT
The command is supported for physical slots for now, and returns thetype of slot, its restart_lsn and its restart_tli.This will be useful for an upcoming patch related to pg_receivewal, toallow the tool to be able to stream from the position of a slot, ratherthan the last WAL position flushed by the backend (as reported byIDENTIFY_SYSTEM) if the archive directory is found as empty, which wouldbe an advantage in the case of switching to a different archivelocations with the same slot used to avoid holes in WAL segmentarchives.Author: Ronan DunklauReviewed-by: Kyotaro Horiguchi, Michael Paquier, Bharath RupireddyDiscussion:https://postgr.es/m/18708360.4lzOvYHigE@aivenronan
1 parent70bef49 commitb4ada4e

File tree

9 files changed

+224
-3
lines changed

9 files changed

+224
-3
lines changed

‎doc/src/sgml/protocol.sgml

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2067,6 +2067,54 @@ The commands accepted in replication mode are:
20672067
</listitem>
20682068
</varlistentry>
20692069

2070+
<varlistentry>
2071+
<term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
2072+
<indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
2073+
</term>
2074+
<listitem>
2075+
<para>
2076+
Read some information associated to a replication slot. Returns a tuple
2077+
with <literal>NULL</literal> values if the replication slot does not
2078+
exist. This command is currently only supported for physical replication
2079+
slots.
2080+
</para>
2081+
<para>
2082+
In response to this command, the server will return a one-row result set,
2083+
containing the following fields:
2084+
<variablelist>
2085+
<varlistentry>
2086+
<term><literal>slot_type</literal> (<type>text</type>)</term>
2087+
<listitem>
2088+
<para>
2089+
The replication slot's type, either <literal>physical</literal> or
2090+
<literal>NULL</literal>.
2091+
</para>
2092+
</listitem>
2093+
</varlistentry>
2094+
2095+
<varlistentry>
2096+
<term><literal>restart_lsn</literal> (<type>text</type>)</term>
2097+
<listitem>
2098+
<para>
2099+
The replication slot's <literal>restart_lsn</literal>.
2100+
</para>
2101+
</listitem>
2102+
</varlistentry>
2103+
2104+
<varlistentry>
2105+
<term><literal>restart_tli</literal> (<type>int8</type>)</term>
2106+
<listitem>
2107+
<para>
2108+
The timeline ID associated to <literal>restart_lsn</literal>,
2109+
following the current timeline history.
2110+
</para>
2111+
</listitem>
2112+
</varlistentry>
2113+
</variablelist>
2114+
</para>
2115+
</listitem>
2116+
</varlistentry>
2117+
20702118
<varlistentry>
20712119
<term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
20722120
<indexterm><primary>START_REPLICATION</primary></indexterm>

‎src/backend/replication/repl_gram.y

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
6464
/* Keyword tokens.*/
6565
%tokenK_BASE_BACKUP
6666
%tokenK_IDENTIFY_SYSTEM
67+
%tokenK_READ_REPLICATION_SLOT
6768
%tokenK_SHOW
6869
%tokenK_START_REPLICATION
6970
%tokenK_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
9495
%type<node>command
9596
%type<node>base_backupstart_replicationstart_logical_replication
9697
create_replication_slotdrop_replication_slotidentify_system
97-
timeline_historyshowsql_cmd
98+
read_replication_slottimeline_historyshowsql_cmd
9899
%type<list>base_backup_legacy_opt_listgeneric_option_list
99100
%type<defelt>base_backup_legacy_optgeneric_option
100101
%type<uintval>opt_timeline
@@ -125,6 +126,7 @@ command:
125126
|start_logical_replication
126127
|create_replication_slot
127128
|drop_replication_slot
129+
|read_replication_slot
128130
|timeline_history
129131
|show
130132
|sql_cmd
@@ -140,6 +142,18 @@ identify_system:
140142
}
141143
;
142144

145+
/*
146+
* READ_REPLICATION_SLOT %s
147+
*/
148+
read_replication_slot:
149+
K_READ_REPLICATION_SLOTvar_name
150+
{
151+
ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
152+
n->slotname =$2;
153+
$$ = (Node *) n;
154+
}
155+
;
156+
143157
/*
144158
* SHOW setting
145159
*/

‎src/backend/replication/repl_scanner.l

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ identifier{ident_start}{ident_cont}*
8585
BASE_BACKUP{return K_BASE_BACKUP; }
8686
FAST{return K_FAST; }
8787
IDENTIFY_SYSTEM{return K_IDENTIFY_SYSTEM; }
88+
READ_REPLICATION_SLOT{return K_READ_REPLICATION_SLOT; }
8889
SHOW{return K_SHOW; }
8990
LABEL{return K_LABEL; }
9091
NOWAIT{return K_NOWAIT; }

‎src/backend/replication/walsender.c

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
232232
staticvoidWalSndDone(WalSndSendDataCallbacksend_data);
233233
staticXLogRecPtrGetStandbyFlushRecPtr(void);
234234
staticvoidIdentifySystem(void);
235+
staticvoidReadReplicationSlot(ReadReplicationSlotCmd*cmd);
235236
staticvoidCreateReplicationSlot(CreateReplicationSlotCmd*cmd);
236237
staticvoidDropReplicationSlot(DropReplicationSlotCmd*cmd);
237238
staticvoidStartReplication(StartReplicationCmd*cmd);
@@ -457,6 +458,104 @@ IdentifySystem(void)
457458
end_tup_output(tstate);
458459
}
459460

461+
/* Handle READ_REPLICATION_SLOT command */
462+
staticvoid
463+
ReadReplicationSlot(ReadReplicationSlotCmd*cmd)
464+
{
465+
#defineREAD_REPLICATION_SLOT_COLS 3
466+
ReplicationSlot*slot;
467+
DestReceiver*dest;
468+
TupOutputState*tstate;
469+
TupleDesctupdesc;
470+
Datumvalues[READ_REPLICATION_SLOT_COLS];
471+
boolnulls[READ_REPLICATION_SLOT_COLS];
472+
473+
tupdesc=CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
474+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)1,"slot_type",
475+
TEXTOID,-1,0);
476+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)2,"restart_lsn",
477+
TEXTOID,-1,0);
478+
/* TimeLineID is unsigned, so int4 is not wide enough. */
479+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)3,"restart_tli",
480+
INT8OID,-1,0);
481+
482+
MemSet(values,0,READ_REPLICATION_SLOT_COLS*sizeof(Datum));
483+
MemSet(nulls, true,READ_REPLICATION_SLOT_COLS*sizeof(bool));
484+
485+
LWLockAcquire(ReplicationSlotControlLock,LW_SHARED);
486+
slot=SearchNamedReplicationSlot(cmd->slotname, false);
487+
if (slot==NULL|| !slot->in_use)
488+
{
489+
LWLockRelease(ReplicationSlotControlLock);
490+
}
491+
else
492+
{
493+
ReplicationSlotslot_contents;
494+
inti=0;
495+
496+
/* Copy slot contents while holding spinlock */
497+
SpinLockAcquire(&slot->mutex);
498+
slot_contents=*slot;
499+
SpinLockRelease(&slot->mutex);
500+
LWLockRelease(ReplicationSlotControlLock);
501+
502+
if (OidIsValid(slot_contents.data.database))
503+
ereport(ERROR,
504+
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
505+
errmsg("cannot use \"%s\" with logical replication slot \"%s\"",
506+
"READ_REPLICATION_SLOT",
507+
NameStr(slot_contents.data.name)));
508+
509+
/* slot type */
510+
values[i]=CStringGetTextDatum("physical");
511+
nulls[i]= false;
512+
i++;
513+
514+
/* start LSN */
515+
if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
516+
{
517+
charxloc[64];
518+
519+
snprintf(xloc,sizeof(xloc),"%X/%X",
520+
LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
521+
values[i]=CStringGetTextDatum(xloc);
522+
nulls[i]= false;
523+
}
524+
i++;
525+
526+
/* timeline this WAL was produced on */
527+
if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
528+
{
529+
TimeLineIDslots_position_timeline;
530+
TimeLineIDcurrent_timeline;
531+
List*timeline_history=NIL;
532+
533+
/*
534+
* While in recovery, use as timeline the currently-replaying one
535+
* to get the LSN position's history.
536+
*/
537+
if (RecoveryInProgress())
538+
(void)GetXLogReplayRecPtr(&current_timeline);
539+
else
540+
current_timeline=ThisTimeLineID;
541+
542+
timeline_history=readTimeLineHistory(current_timeline);
543+
slots_position_timeline=tliOfPointInHistory(slot_contents.data.restart_lsn,
544+
timeline_history);
545+
values[i]=Int64GetDatum((int64)slots_position_timeline);
546+
nulls[i]= false;
547+
}
548+
i++;
549+
550+
Assert(i==READ_REPLICATION_SLOT_COLS);
551+
}
552+
553+
dest=CreateDestReceiver(DestRemoteSimple);
554+
tstate=begin_tup_output_tupdesc(dest,tupdesc,&TTSOpsVirtual);
555+
do_tup_output(tstate,values,nulls);
556+
end_tup_output(tstate);
557+
}
558+
460559

461560
/*
462561
* Handle TIMELINE_HISTORY command.
@@ -1622,6 +1721,13 @@ exec_replication_command(const char *cmd_string)
16221721
EndReplicationCommand(cmdtag);
16231722
break;
16241723

1724+
caseT_ReadReplicationSlotCmd:
1725+
cmdtag="READ_REPLICATION_SLOT";
1726+
set_ps_display(cmdtag);
1727+
ReadReplicationSlot((ReadReplicationSlotCmd*)cmd_node);
1728+
EndReplicationCommand(cmdtag);
1729+
break;
1730+
16251731
caseT_BaseBackupCmd:
16261732
cmdtag="BASE_BACKUP";
16271733
set_ps_display(cmdtag);

‎src/include/nodes/nodes.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,7 @@ typedef enum NodeTag
496496
T_BaseBackupCmd,
497497
T_CreateReplicationSlotCmd,
498498
T_DropReplicationSlotCmd,
499+
T_ReadReplicationSlotCmd,
499500
T_StartReplicationCmd,
500501
T_TimeLineHistoryCmd,
501502
T_SQLCmd,

‎src/include/nodes/replnodes.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,17 @@ typedef struct StartReplicationCmd
8787
}StartReplicationCmd;
8888

8989

90+
/* ----------------------
91+
*READ_REPLICATION_SLOT command
92+
* ----------------------
93+
*/
94+
typedefstructReadReplicationSlotCmd
95+
{
96+
NodeTagtype;
97+
char*slotname;
98+
}ReadReplicationSlotCmd;
99+
100+
90101
/* ----------------------
91102
*TIMELINE_HISTORY command
92103
* ----------------------

‎src/test/recovery/t/001_stream_rep.pl

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use warnings;
77
use PostgreSQL::Test::Cluster;
88
use PostgreSQL::Test::Utils;
9-
use Test::Moretests=>49;
9+
use Test::Moretests=>53;
1010

1111
# Initialize primary node
1212
my$node_primary = PostgreSQL::Test::Cluster->new('primary');
@@ -254,6 +254,36 @@ sub test_target_session_attrs
254254
"SHOW with superuser-settable parameter, replication role and logical replication"
255255
);
256256

257+
note"testing READ_REPLICATION_SLOT command for replication connection";
258+
259+
my$slotname ='test_read_replication_slot_physical';
260+
261+
($ret,$stdout,$stderr) =$node_primary->psql(
262+
'postgres',
263+
'READ_REPLICATION_SLOT non_existent_slot;',
264+
extra_params=> ['-d',$connstr_rep ]);
265+
ok($ret == 0,"READ_REPLICATION_SLOT exit code 0 on success");
266+
like($stdout,qr/^||$/,
267+
"READ_REPLICATION_SLOT returns NULL values if slot does not exist");
268+
269+
$node_primary->psql(
270+
'postgres',
271+
"CREATE_REPLICATION_SLOT$slotname PHYSICAL RESERVE_WAL;",
272+
extra_params=> ['-d',$connstr_rep ]);
273+
274+
($ret,$stdout,$stderr) =$node_primary->psql(
275+
'postgres',
276+
"READ_REPLICATION_SLOT$slotname;",
277+
extra_params=> ['-d',$connstr_rep ]);
278+
ok($ret == 0,"READ_REPLICATION_SLOT success with existing slot");
279+
like($stdout,qr/^physical\|[^|]*\|1$/,
280+
"READ_REPLICATION_SLOT returns tuple with slot information");
281+
282+
$node_primary->psql(
283+
'postgres',
284+
"DROP_REPLICATION_SLOT$slotname;",
285+
extra_params=> ['-d',$connstr_rep ]);
286+
257287
note"switching to physical replication slot";
258288

259289
# Switch to using a physical replication slot. We can do this without a new

‎src/test/recovery/t/006_logical_decoding.pl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
use warnings;
1111
use PostgreSQL::Test::Cluster;
1212
use PostgreSQL::Test::Utils;
13-
use Test::Moretests=>14;
13+
use Test::Moretests=>15;
1414
use Config;
1515

1616
# Initialize primary node
@@ -39,6 +39,15 @@
3939
m/replication slot "test_slot" was not created in this database/,
4040
"Logical decoding correctly fails to start");
4141

42+
($result,$stdout,$stderr) =$node_primary->psql(
43+
'template1',
44+
qq[READ_REPLICATION_SLOT test_slot;],
45+
replication=>'database');
46+
like(
47+
$stderr,
48+
qr/cannot use "READ_REPLICATION_SLOT" with logical replication slot "test_slot"/,
49+
'READ_REPLICATION_SLOT not supported for logical slots');
50+
4251
# Check case of walsender not using a database connection. Logical
4352
# decoding should not be allowed.
4453
($result,$stdout,$stderr) =$node_primary->psql(

‎src/tools/pgindent/typedefs.list

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2129,6 +2129,7 @@ ReadBufferMode
21292129
ReadBytePtrType
21302130
ReadExtraTocPtrType
21312131
ReadFunc
2132+
ReadReplicationSlotCmd
21322133
ReassignOwnedStmt
21332134
RecheckForeignScan_function
21342135
RecordCacheEntry

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp