Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commiteb4da3e

Browse files
committed
Add option to control snapshot export to CREATE_REPLICATION_SLOT
We used to export snapshots unconditionally in CREATE_REPLICATION_SLOTin the replication protocol, but several upcoming patches want morecontrol over what happens.Suppress snapshot export in pg_recvlogical, which neither needs nor canuse the exported snapshot. Since snapshot exporting can fail thisimproves reliability.This also paves the way for allowing the creation of replication slotson standbys, which cannot export snapshots because they cannot allocatenew XIDs.Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
1 parent7150402 commiteb4da3e

File tree

10 files changed

+141
-26
lines changed

10 files changed

+141
-26
lines changed

‎doc/src/sgml/logicaldecoding.sgml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,9 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
271271
<sect2>
272272
<title>Exported Snapshots</title>
273273
<para>
274-
When a new replication slot is created using the streaming replication interface,
275-
a snapshot is exported
274+
When a new replication slot is created using the streaming replication
275+
interface (see <xref linkend="protocol-replication-create-slot">), a
276+
snapshot is exported
276277
(see <xref linkend="functions-snapshot-synchronization">), which will show
277278
exactly the state of the database after which all changes will be
278279
included in the change stream. This can be used to create a new replica by
@@ -282,6 +283,12 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
282283
database's state at that point in time, which afterwards can be updated
283284
using the slot's contents without losing any changes.
284285
</para>
286+
<para>
287+
Creation of a snapshot is not always possible. In particular, it will
288+
fail when connected to a hot standby. Applications that do not require
289+
snapshot export may suppress it with the <literal>NOEXPORT_SNAPSHOT</>
290+
option.
291+
</para>
285292
</sect2>
286293
</sect1>
287294

‎doc/src/sgml/protocol.sgml

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1486,8 +1486,8 @@ The commands accepted in walsender mode are:
14861486
</listitem>
14871487
</varlistentry>
14881488

1489-
<varlistentry>
1490-
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> [ <literal>TEMPORARY</> ] { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> }
1489+
<varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
1490+
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> [ <literal>TEMPORARY</> ] { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</>[ <literal>EXPORT_SNAPSHOT</> | <literal>NOEXPORT_SNAPSHOT</> ]}
14911491
<indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
14921492
</term>
14931493
<listitem>
@@ -1538,6 +1538,21 @@ The commands accepted in walsender mode are:
15381538
</para>
15391539
</listitem>
15401540
</varlistentry>
1541+
1542+
<varlistentry>
1543+
<term><literal>EXPORT_SNAPSHOT</></term>
1544+
<term><literal>NOEXPORT_SNAPSHOT</></term>
1545+
<listitem>
1546+
<para>
1547+
Decides what to do with the snapshot created during logical slot
1548+
initialization. <literal>EXPORT_SNAPSHOT</>, which is the default,
1549+
will export the snapshot for use in other sessions. This option can't
1550+
be used inside a transaction. <literal>NOEXPORT_SNAPSHOT</> will
1551+
just use the snapshot for logical decoding as normal but won't do
1552+
anything else with it.
1553+
</para>
1554+
</listitem>
1555+
</varlistentry>
15411556
</variablelist>
15421557

15431558
<para>

‎src/backend/commands/subscriptioncmds.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
314314

315315
PG_TRY();
316316
{
317-
walrcv_create_slot(wrconn,slotname, false,&lsn);
317+
/*
318+
* Create permanent slot for the subscription. We won't use the
319+
* initial snapshot for anything, so no need to export it.
320+
*/
321+
walrcv_create_slot(wrconn,slotname, false, false,&lsn);
318322
ereport(NOTICE,
319323
(errmsg("created replication slot \"%s\" on publisher",
320324
slotname)));

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
6868
staticchar*libpqrcv_create_slot(WalReceiverConn*conn,
6969
constchar*slotname,
7070
booltemporary,
71+
boolexport_snapshot,
7172
XLogRecPtr*lsn);
7273
staticboollibpqrcv_command(WalReceiverConn*conn,
7374
constchar*cmd,char**err);
@@ -720,21 +721,27 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
720721
*/
721722
staticchar*
722723
libpqrcv_create_slot(WalReceiverConn*conn,constchar*slotname,
723-
booltemporary,XLogRecPtr*lsn)
724+
booltemporary,boolexport_snapshot,XLogRecPtr*lsn)
724725
{
725726
PGresult*res;
726727
StringInfoDatacmd;
727728
char*snapshot;
728729

729730
initStringInfo(&cmd);
730731

731-
appendStringInfo(&cmd,"CREATE_REPLICATION_SLOT \"%s\"",slotname);
732+
appendStringInfo(&cmd,"CREATE_REPLICATION_SLOT \"%s\"",slotname);
732733

733734
if (temporary)
734-
appendStringInfo(&cmd,"TEMPORARY");
735+
appendStringInfo(&cmd," TEMPORARY");
735736

736737
if (conn->logical)
737-
appendStringInfo(&cmd,"LOGICAL pgoutput");
738+
{
739+
appendStringInfo(&cmd," LOGICAL pgoutput");
740+
if (export_snapshot)
741+
appendStringInfo(&cmd," EXPORT_SNAPSHOT");
742+
else
743+
appendStringInfo(&cmd," NOEXPORT_SNAPSHOT");
744+
}
738745

739746
res=libpqrcv_PQexec(conn->streamConn,cmd.data);
740747
pfree(cmd.data);

‎src/backend/replication/repl_gram.y

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ Node *replication_parse_result;
7979
%tokenK_SLOT
8080
%tokenK_RESERVE_WAL
8181
%tokenK_TEMPORARY
82+
%tokenK_EXPORT_SNAPSHOT
83+
%tokenK_NOEXPORT_SNAPSHOT
8284

8385
%type<node>command
8486
%type<node>base_backupstart_replicationstart_logical_replication
@@ -91,7 +93,9 @@ Node *replication_parse_result;
9193
%type<defelt>plugin_opt_elem
9294
%type<node>plugin_opt_arg
9395
%type<str>opt_slotvar_name
94-
%type<boolval>opt_reserve_walopt_temporary
96+
%type<boolval>opt_temporary
97+
%type<list>create_slot_opt_list
98+
%type<defelt>create_slot_opt
9599

96100
%%
97101

@@ -202,29 +206,55 @@ base_backup_opt:
202206

203207
create_replication_slot:
204208
/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL*/
205-
K_CREATE_REPLICATION_SLOTIDENTopt_temporaryK_PHYSICALopt_reserve_wal
209+
K_CREATE_REPLICATION_SLOTIDENTopt_temporaryK_PHYSICALcreate_slot_opt_list
206210
{
207211
CreateReplicationSlotCmd *cmd;
208212
cmd = makeNode(CreateReplicationSlotCmd);
209213
cmd->kind = REPLICATION_KIND_PHYSICAL;
210214
cmd->slotname =$2;
211215
cmd->temporary =$3;
212-
cmd->reserve_wal =$5;
216+
cmd->options =$5;
213217
$$ = (Node *) cmd;
214218
}
215219
/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin*/
216-
|K_CREATE_REPLICATION_SLOTIDENTopt_temporaryK_LOGICALIDENT
220+
|K_CREATE_REPLICATION_SLOTIDENTopt_temporaryK_LOGICALIDENTcreate_slot_opt_list
217221
{
218222
CreateReplicationSlotCmd *cmd;
219223
cmd = makeNode(CreateReplicationSlotCmd);
220224
cmd->kind = REPLICATION_KIND_LOGICAL;
221225
cmd->slotname =$2;
222226
cmd->temporary =$3;
223227
cmd->plugin =$5;
228+
cmd->options =$6;
224229
$$ = (Node *) cmd;
225230
}
226231
;
227232

233+
create_slot_opt_list:
234+
create_slot_opt_listcreate_slot_opt
235+
{$$ = lappend($1,$2); }
236+
|/* EMPTY*/
237+
{$$ = NIL; }
238+
;
239+
240+
create_slot_opt:
241+
K_EXPORT_SNAPSHOT
242+
{
243+
$$ = makeDefElem("export_snapshot",
244+
(Node *)makeInteger(TRUE), -1);
245+
}
246+
|K_NOEXPORT_SNAPSHOT
247+
{
248+
$$ = makeDefElem("export_snapshot",
249+
(Node *)makeInteger(FALSE), -1);
250+
}
251+
|K_RESERVE_WAL
252+
{
253+
$$ = makeDefElem("reserve_wal",
254+
(Node *)makeInteger(TRUE), -1);
255+
}
256+
;
257+
228258
/* DROP_REPLICATION_SLOT slot*/
229259
drop_replication_slot:
230260
K_DROP_REPLICATION_SLOTIDENT
@@ -291,11 +321,6 @@ opt_physical:
291321
|/* EMPTY*/
292322
;
293323

294-
opt_reserve_wal:
295-
K_RESERVE_WAL{$$ =true; }
296-
|/* EMPTY*/{$$ =false; }
297-
;
298-
299324
opt_temporary:
300325
K_TEMPORARY{$$ =true; }
301326
|/* EMPTY*/{$$ =false; }

‎src/backend/replication/repl_scanner.l

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ RESERVE_WAL{ return K_RESERVE_WAL; }
100100
LOGICAL{return K_LOGICAL; }
101101
SLOT{return K_SLOT; }
102102
TEMPORARY{return K_TEMPORARY; }
103+
EXPORT_SNAPSHOT{return K_EXPORT_SNAPSHOT; }
104+
NOEXPORT_SNAPSHOT{return K_NOEXPORT_SNAPSHOT; }
103105

104106
","{return','; }
105107
";"{return';'; }

‎src/backend/replication/walsender.c

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151

5252
#include"catalog/pg_type.h"
5353
#include"commands/dbcommands.h"
54+
#include"commands/defrem.h"
5455
#include"funcapi.h"
5556
#include"libpq/libpq.h"
5657
#include"libpq/pqformat.h"
@@ -737,6 +738,48 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
737738
returncount;
738739
}
739740

741+
/*
742+
* Process extra options given to CREATE_REPLICATION_SLOT.
743+
*/
744+
staticvoid
745+
parseCreateReplSlotOptions(CreateReplicationSlotCmd*cmd,
746+
bool*reserve_wal,
747+
bool*export_snapshot)
748+
{
749+
ListCell*lc;
750+
boolsnapshot_action_given= false;
751+
boolreserve_wal_given= false;
752+
753+
/* Parse options */
754+
foreach (lc,cmd->options)
755+
{
756+
DefElem*defel= (DefElem*)lfirst(lc);
757+
758+
if (strcmp(defel->defname,"export_snapshot")==0)
759+
{
760+
if (snapshot_action_given||cmd->kind!=REPLICATION_KIND_LOGICAL)
761+
ereport(ERROR,
762+
(errcode(ERRCODE_SYNTAX_ERROR),
763+
errmsg("conflicting or redundant options")));
764+
765+
snapshot_action_given= true;
766+
*export_snapshot=defGetBoolean(defel);
767+
}
768+
elseif (strcmp(defel->defname,"reserve_wal")==0)
769+
{
770+
if (reserve_wal_given||cmd->kind!=REPLICATION_KIND_PHYSICAL)
771+
ereport(ERROR,
772+
(errcode(ERRCODE_SYNTAX_ERROR),
773+
errmsg("conflicting or redundant options")));
774+
775+
reserve_wal_given= true;
776+
*reserve_wal= true;
777+
}
778+
else
779+
elog(ERROR,"unrecognized option: %s",defel->defname);
780+
}
781+
}
782+
740783
/*
741784
* Create a new replication slot.
742785
*/
@@ -746,6 +789,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
746789
constchar*snapshot_name=NULL;
747790
charxpos[MAXFNAMELEN];
748791
char*slot_name;
792+
boolreserve_wal= false;
793+
boolexport_snapshot= true;
749794
DestReceiver*dest;
750795
TupOutputState*tstate;
751796
TupleDesctupdesc;
@@ -754,6 +799,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
754799

755800
Assert(!MyReplicationSlot);
756801

802+
parseCreateReplSlotOptions(cmd,&reserve_wal,&export_snapshot);
803+
757804
/* setup state for XLogReadPage */
758805
sendTimeLineIsHistoric= false;
759806
sendTimeLine=ThisTimeLineID;
@@ -799,18 +846,21 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
799846
DecodingContextFindStartpoint(ctx);
800847

801848
/*
802-
* Export a plain (not of the snapbuild.c type) snapshot to the user
803-
* that can be imported into another session.
849+
* Export the snapshot if we've been asked to do so.
850+
*
851+
* NB. We will convert the snapbuild.c kind of snapshot to normal
852+
* snapshot when doing this.
804853
*/
805-
snapshot_name=SnapBuildExportSnapshot(ctx->snapshot_builder);
854+
if (export_snapshot)
855+
snapshot_name=SnapBuildExportSnapshot(ctx->snapshot_builder);
806856

807857
/* don't need the decoding context anymore */
808858
FreeDecodingContext(ctx);
809859

810860
if (!cmd->temporary)
811861
ReplicationSlotPersist();
812862
}
813-
elseif (cmd->kind==REPLICATION_KIND_PHYSICAL&&cmd->reserve_wal)
863+
elseif (cmd->kind==REPLICATION_KIND_PHYSICAL&&reserve_wal)
814864
{
815865
ReplicationSlotReserveWal();
816866

‎src/bin/pg_basebackup/streamutil.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,8 +338,13 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
338338
appendPQExpBuffer(query,"CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
339339
slot_name);
340340
else
341+
{
341342
appendPQExpBuffer(query,"CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
342343
slot_name,plugin);
344+
if (PQserverVersion(conn) >=100000)
345+
/* pg_recvlogical doesn't use an exported snapshot, so suppress */
346+
appendPQExpBuffer(query," NOEXPORT_SNAPSHOT");
347+
}
343348

344349
res=PQexec(conn,query->data);
345350
if (PQresultStatus(res)!=PGRES_TUPLES_OK)

‎src/include/nodes/replnodes.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ typedef struct CreateReplicationSlotCmd
5656
ReplicationKindkind;
5757
char*plugin;
5858
booltemporary;
59-
boolreserve_wal;
59+
List*options;
6060
}CreateReplicationSlotCmd;
6161

6262

‎src/include/replication/walreceiver.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
183183
intnbytes);
184184
typedefchar*(*walrcv_create_slot_fn) (WalReceiverConn*conn,
185185
constchar*slotname,booltemporary,
186-
XLogRecPtr*lsn);
186+
boolexport_snapshot,XLogRecPtr*lsn);
187187
typedefbool (*walrcv_command_fn) (WalReceiverConn*conn,constchar*cmd,
188188
char**err);
189189
typedefvoid (*walrcv_disconnect_fn) (WalReceiverConn*conn);
@@ -224,8 +224,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
224224
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
225225
#definewalrcv_send(conn,buffer,nbytes) \
226226
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
227-
#definewalrcv_create_slot(conn,slotname,temporary,lsn) \
228-
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, lsn)
227+
#definewalrcv_create_slot(conn,slotname,temporary,export_snapshot,lsn) \
228+
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary,export_snapshot,lsn)
229229
#definewalrcv_command(conn,cmd,err) \
230230
WalReceiverFunctions->walrcv_command(conn, cmd, err)
231231
#definewalrcv_disconnect(conn) \

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp