@@ -118,10 +118,14 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
118118 * Helper function for creating a new logical replication slot with
119119 * given arguments. Note that this function doesn't release the created
120120 * slot.
121+ *
122+ * When find_startpoint is false, the slot's confirmed_flush is not set; it's
123+ * caller's responsibility to ensure it's set to something sensible.
121124 */
122125static void
123126create_logical_replication_slot (char * name ,char * plugin ,
124- bool temporary ,XLogRecPtr restart_lsn )
127+ bool temporary ,XLogRecPtr restart_lsn ,
128+ bool find_startpoint )
125129{
126130LogicalDecodingContext * ctx = NULL ;
127131
@@ -139,16 +143,24 @@ create_logical_replication_slot(char *name, char *plugin,
139143temporary ?RS_TEMPORARY :RS_EPHEMERAL );
140144
141145/*
142- * Create logical decoding context, to build the initial snapshot.
146+ * Create logical decoding context to find start point or, if we don't
147+ * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
148+ *
149+ * Note: when !find_startpoint this is still important, because it's at
150+ * this point that the output plugin is validated.
143151 */
144152ctx = CreateInitDecodingContext (plugin ,NIL ,
145- false,/*do not build snapshot */
153+ false,/*just catalogs is OK */
146154restart_lsn ,
147155logical_read_local_xlog_page ,NULL ,NULL ,
148156NULL );
149157
150- /* build initial snapshot, might take a while */
151- DecodingContextFindStartpoint (ctx );
158+ /*
159+ * If caller needs us to determine the decoding start point, do so now.
160+ * This might take a while.
161+ */
162+ if (find_startpoint )
163+ DecodingContextFindStartpoint (ctx );
152164
153165/* don't need the decoding context anymore */
154166FreeDecodingContext (ctx );
@@ -179,7 +191,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
179191create_logical_replication_slot (NameStr (* name ),
180192NameStr (* plugin ),
181193temporary ,
182- InvalidXLogRecPtr );
194+ InvalidXLogRecPtr ,
195+ true);
183196
184197values [0 ]= NameGetDatum (& MyReplicationSlot -> data .name );
185198values [1 ]= LSNGetDatum (MyReplicationSlot -> data .confirmed_flush );
@@ -691,10 +704,18 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
691704
692705/* Create new slot and acquire it */
693706if (logical_slot )
707+ {
708+ /*
709+ * We must not try to read WAL, since we haven't reserved it yet --
710+ * hence pass find_startpoint false. confirmed_flush will be set
711+ * below, by copying from the source slot.
712+ */
694713create_logical_replication_slot (NameStr (* dst_name ),
695714plugin ,
696715temporary ,
697- src_restart_lsn );
716+ src_restart_lsn ,
717+ false);
718+ }
698719else
699720create_physical_replication_slot (NameStr (* dst_name ),
700721 true,
@@ -711,6 +732,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
711732TransactionId copy_xmin ;
712733TransactionId copy_catalog_xmin ;
713734XLogRecPtr copy_restart_lsn ;
735+ XLogRecPtr copy_confirmed_flush ;
714736bool copy_islogical ;
715737char * copy_name ;
716738
@@ -722,6 +744,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
722744copy_xmin = src -> data .xmin ;
723745copy_catalog_xmin = src -> data .catalog_xmin ;
724746copy_restart_lsn = src -> data .restart_lsn ;
747+ copy_confirmed_flush = src -> data .confirmed_flush ;
725748
726749/* for existence check */
727750copy_name = pstrdup (NameStr (src -> data .name ));
@@ -746,6 +769,14 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
746769NameStr (* src_name )),
747770errdetail ("The source replication slot was modified incompatibly during the copy operation." )));
748771
772+ /* The source slot must have a consistent snapshot */
773+ if (src_islogical && XLogRecPtrIsInvalid (copy_confirmed_flush ))
774+ ereport (ERROR ,
775+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
776+ errmsg ("cannot copy unfinished logical replication slot \"%s\"" ,
777+ NameStr (* src_name )),
778+ errhint ("Retry when the source replication slot's confirmed_flush_lsn is valid." )));
779+
749780/* Install copied values again */
750781SpinLockAcquire (& MyReplicationSlot -> mutex );
751782MyReplicationSlot -> effective_xmin = copy_effective_xmin ;
@@ -754,6 +785,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
754785MyReplicationSlot -> data .xmin = copy_xmin ;
755786MyReplicationSlot -> data .catalog_xmin = copy_catalog_xmin ;
756787MyReplicationSlot -> data .restart_lsn = copy_restart_lsn ;
788+ MyReplicationSlot -> data .confirmed_flush = copy_confirmed_flush ;
757789SpinLockRelease (& MyReplicationSlot -> mutex );
758790
759791ReplicationSlotMarkDirty ();