@@ -118,10 +118,14 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
118118 * Helper function for creating a new logical replication slot with
119119 * given arguments. Note that this function doesn't release the created
120120 * slot.
121+ *
122+ * When find_startpoint is false, the slot's confirmed_flush is not set; it's
123+ * caller's responsibility to ensure it's set to something sensible.
121124 */
122125static void
123126create_logical_replication_slot (char * name ,char * plugin ,
124- bool temporary ,XLogRecPtr restart_lsn )
127+ bool temporary ,XLogRecPtr restart_lsn ,
128+ bool find_startpoint )
125129{
126130LogicalDecodingContext * ctx = NULL ;
127131
@@ -139,16 +143,24 @@ create_logical_replication_slot(char *name, char *plugin,
139143temporary ?RS_TEMPORARY :RS_EPHEMERAL );
140144
141145/*
142- * Create logical decoding context, to build the initial snapshot.
146+ * Create logical decoding context to find start point or, if we don't
147+ * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
148+ *
149+ * Note: when !find_startpoint this is still important, because it's at
150+ * this point that the output plugin is validated.
143151 */
144152ctx = CreateInitDecodingContext (plugin ,NIL ,
145- false,/*do not build snapshot */
153+ false,/*just catalogs is OK */
146154restart_lsn ,
147155logical_read_local_xlog_page ,NULL ,NULL ,
148156NULL );
149157
150- /* build initial snapshot, might take a while */
151- DecodingContextFindStartpoint (ctx );
158+ /*
159+ * If caller needs us to determine the decoding start point, do so now.
160+ * This might take a while.
161+ */
162+ if (find_startpoint )
163+ DecodingContextFindStartpoint (ctx );
152164
153165/* don't need the decoding context anymore */
154166FreeDecodingContext (ctx );
@@ -179,7 +191,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
179191create_logical_replication_slot (NameStr (* name ),
180192NameStr (* plugin ),
181193temporary ,
182- InvalidXLogRecPtr );
194+ InvalidXLogRecPtr ,
195+ true);
183196
184197values [0 ]= NameGetDatum (& MyReplicationSlot -> data .name );
185198values [1 ]= LSNGetDatum (MyReplicationSlot -> data .confirmed_flush );
@@ -683,10 +696,18 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
683696
684697/* Create new slot and acquire it */
685698if (logical_slot )
699+ {
700+ /*
701+ * We must not try to read WAL, since we haven't reserved it yet --
702+ * hence pass find_startpoint false. confirmed_flush will be set
703+ * below, by copying from the source slot.
704+ */
686705create_logical_replication_slot (NameStr (* dst_name ),
687706plugin ,
688707temporary ,
689- src_restart_lsn );
708+ src_restart_lsn ,
709+ false);
710+ }
690711else
691712create_physical_replication_slot (NameStr (* dst_name ),
692713 true,
@@ -703,6 +724,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
703724TransactionId copy_xmin ;
704725TransactionId copy_catalog_xmin ;
705726XLogRecPtr copy_restart_lsn ;
727+ XLogRecPtr copy_confirmed_flush ;
706728bool copy_islogical ;
707729char * copy_name ;
708730
@@ -714,6 +736,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
714736copy_xmin = src -> data .xmin ;
715737copy_catalog_xmin = src -> data .catalog_xmin ;
716738copy_restart_lsn = src -> data .restart_lsn ;
739+ copy_confirmed_flush = src -> data .confirmed_flush ;
717740
718741/* for existence check */
719742copy_name = pstrdup (NameStr (src -> data .name ));
@@ -738,6 +761,14 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
738761NameStr (* src_name )),
739762errdetail ("The source replication slot was modified incompatibly during the copy operation." )));
740763
764+ /* The source slot must have a consistent snapshot */
765+ if (src_islogical && XLogRecPtrIsInvalid (copy_confirmed_flush ))
766+ ereport (ERROR ,
767+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
768+ errmsg ("cannot copy unfinished logical replication slot \"%s\"" ,
769+ NameStr (* src_name )),
770+ errhint ("Retry when the source replication slot's confirmed_flush_lsn is valid." )));
771+
741772/* Install copied values again */
742773SpinLockAcquire (& MyReplicationSlot -> mutex );
743774MyReplicationSlot -> effective_xmin = copy_effective_xmin ;
@@ -746,6 +777,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
746777MyReplicationSlot -> data .xmin = copy_xmin ;
747778MyReplicationSlot -> data .catalog_xmin = copy_catalog_xmin ;
748779MyReplicationSlot -> data .restart_lsn = copy_restart_lsn ;
780+ MyReplicationSlot -> data .confirmed_flush = copy_confirmed_flush ;
749781SpinLockRelease (& MyReplicationSlot -> mutex );
750782
751783ReplicationSlotMarkDirty ();