@@ -99,6 +99,9 @@ ReplicationSlot *MyReplicationSlot = NULL;
9999int max_replication_slots = 0 ;/* the maximum number of replication
100100 * slots */
101101
102+ static ReplicationSlot * SearchNamedReplicationSlot (const char * name );
103+ static int ReplicationSlotAcquireInternal (ReplicationSlot * slot ,
104+ const char * name ,SlotAcquireBehavior behavior );
102105static void ReplicationSlotDropAcquired (void );
103106static void ReplicationSlotDropPtr (ReplicationSlot * slot );
104107
@@ -322,102 +325,142 @@ ReplicationSlotCreate(const char *name, bool db_specific,
322325}
323326
324327/*
325- *Find a previously created slot and mark it as used by this backend .
328+ *Search for the named replication slot .
326329 *
327- * The return value is only useful if behavior is SAB_Inquire, in which
328- * it's zero if we successfully acquired the slot, or the PID of the
329- * owning process otherwise. If behavior is SAB_Error, then trying to
330- * acquire an owned slot is an error. If SAB_Block, we sleep until the
331- * slot is released by the owning process.
330+ * Return the first in-use slot whose name matches exactly, otherwise NULL.
331+ *
332+ * The caller must hold ReplicationSlotControlLock in shared mode.
332333 */
333- int
334- ReplicationSlotAcquire (const char * name , SlotAcquireBehavior behavior )
334+ static ReplicationSlot *
335+ SearchNamedReplicationSlot (const char * name )
335336{
336- ReplicationSlot * slot ;
337- int active_pid ;
338337int i ;
338+ ReplicationSlot * slot = NULL ;
339339
340- retry :
341- Assert ( MyReplicationSlot == NULL );
340+ Assert ( LWLockHeldByMeInMode ( ReplicationSlotControlLock ,
341+ LW_SHARED ) );
342342
343- /*
344- * Search for the named slot and mark it active if we find it. If the
345- * slot is already active, we exit the loop with active_pid set to the PID
346- * of the backend that owns it.
347- */
348- active_pid = 0 ;
349- slot = NULL ;
350- LWLockAcquire (ReplicationSlotControlLock ,LW_SHARED );
351343for (i = 0 ;i < max_replication_slots ;i ++ )
352344{
353345ReplicationSlot * s = & ReplicationSlotCtl -> replication_slots [i ];
354346
355347if (s -> in_use && strcmp (name ,NameStr (s -> data .name ))== 0 )
356348{
357- /*
358- * This is the slot we want; check if it's active under some other
359- * process. In single user mode, we don't need this check.
360- */
361- if (IsUnderPostmaster )
362- {
363- /*
364- * Get ready to sleep on it in case it is active. (We may end
365- * up not sleeping, but we don't want to do this while holding
366- * the spinlock.)
367- */
368- ConditionVariablePrepareToSleep (& s -> active_cv );
369-
370- SpinLockAcquire (& s -> mutex );
371-
372- active_pid = s -> active_pid ;
373- if (active_pid == 0 )
374- active_pid = s -> active_pid = MyProcPid ;
375-
376- SpinLockRelease (& s -> mutex );
377- }
378- else
379- active_pid = MyProcPid ;
380349slot = s ;
381-
382350break ;
383351}
384352}
385- LWLockRelease (ReplicationSlotControlLock );
386353
387- /* If we did not find the slot, error out. */
388- if (slot == NULL )
354+ return slot ;
355+ }
356+
357+ /*
358+ * Find a previously created slot by name and mark it as used by this process.
359+ *
360+ * The return value is only useful if behavior is SAB_Inquire, in which case
361+ * it's zero if we successfully acquired the slot, -1 if the slot no longer
362+ * exists, or the PID of the owning process otherwise. If behavior is
363+ * SAB_Error, then trying to acquire an owned slot is an error. If SAB_Block,
364+ * we sleep until the owner releases it. A missing slot is an error unless SAB_Inquire.
365+ */
366+ int
367+ ReplicationSlotAcquire (const char * name ,SlotAcquireBehavior behavior )
368+ {
369+ return ReplicationSlotAcquireInternal (NULL ,name ,behavior );
370+ }
371+
372+ /*
373+ * Mark the specified slot as used by this process.
374+ *
375+ * Exactly one of slot and name must be specified; the other must be NULL.
376+ * If slot == NULL, search for the slot with the given name.
377+ *
378+ * See comments about the return value in ReplicationSlotAcquire().
379+ */
380+ static int
381+ ReplicationSlotAcquireInternal (ReplicationSlot * slot ,const char * name ,
382+ SlotAcquireBehavior behavior )
383+ {
384+ ReplicationSlot * s ;
385+ int active_pid ;
386+
387+ AssertArg ((slot == NULL ) ^ (name == NULL ));
388+
389+ retry :
390+ Assert (MyReplicationSlot == NULL );
391+
392+ LWLockAcquire (ReplicationSlotControlLock ,LW_SHARED );
393+
394+ /*
395+ * Search for the slot with the specified name if the slot to acquire is
396+ * not given. If the slot is not found, we either return -1 or error out.
397+ */
398+ s = slot ?slot :SearchNamedReplicationSlot (name );
399+ if (s == NULL || !s -> in_use )
400+ {
401+ LWLockRelease (ReplicationSlotControlLock );
402+
403+ if (behavior == SAB_Inquire )
404+ return -1 ;
389405ereport (ERROR ,
390406(errcode (ERRCODE_UNDEFINED_OBJECT ),
391- errmsg ("replication slot \"%s\" does not exist" ,name )));
407+ errmsg ("replication slot \"%s\" does not exist" ,
408+ name ?name :NameStr (slot -> data .name ))));
409+ }
392410
393411/*
394- * If we found the slot but it's already active in another backend, we
395- * either error out or retry after a short wait, as caller specified.
412+ * This is the slot we want; check if it's active under some other
413+ * process. In single user mode, we don't need this check.
414+ */
415+ if (IsUnderPostmaster )
416+ {
417+ /*
418+ * Get ready to sleep on the slot in case it is active if SAB_Block.
419+ * (We may end up not sleeping, but we don't want to do this while
420+ * holding the spinlock.)
421+ */
422+ if (behavior == SAB_Block )
423+ ConditionVariablePrepareToSleep (& s -> active_cv );
424+
425+ SpinLockAcquire (& s -> mutex );
426+ if (s -> active_pid == 0 )
427+ s -> active_pid = MyProcPid ;
428+ active_pid = s -> active_pid ;
429+ SpinLockRelease (& s -> mutex );
430+ }
431+ else
432+ active_pid = MyProcPid ;
433+ LWLockRelease (ReplicationSlotControlLock );
434+
435+ /*
436+ * If we found the slot but it's already active in another process, we
437+ * either error out, return the PID of the owning process, or retry
438+ * after a short wait, as caller specified.
396439 */
397440if (active_pid != MyProcPid )
398441{
399442if (behavior == SAB_Error )
400443ereport (ERROR ,
401444(errcode (ERRCODE_OBJECT_IN_USE ),
402445errmsg ("replication slot \"%s\" is active for PID %d" ,
403- name ,active_pid )));
446+ NameStr ( s -> data . name ) ,active_pid )));
404447else if (behavior == SAB_Inquire )
405448return active_pid ;
406449
407450/* Wait here until we get signaled, and then restart */
408- ConditionVariableSleep (& slot -> active_cv ,
451+ ConditionVariableSleep (& s -> active_cv ,
409452WAIT_EVENT_REPLICATION_SLOT_DROP );
410453ConditionVariableCancelSleep ();
411454gotoretry ;
412455}
413- else
414- ConditionVariableCancelSleep ();/* no sleep needed after all */
456+ else if ( behavior == SAB_Block )
457+ ConditionVariableCancelSleep ();/* no sleep needed after all */
415458
416459/* Let everybody know we've modified this slot */
417- ConditionVariableBroadcast (& slot -> active_cv );
460+ ConditionVariableBroadcast (& s -> active_cv );
418461
419462/* We made this slot active, so it's ours now. */
420- MyReplicationSlot = slot ;
463+ MyReplicationSlot = s ;
421464
422465/* success */
423466return 0 ;
@@ -1100,43 +1143,82 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
11001143ReplicationSlot * s = & ReplicationSlotCtl -> replication_slots [i ];
11011144XLogRecPtr restart_lsn = InvalidXLogRecPtr ;
11021145NameData slotname ;
1146+ int wspid ;
1147+ int last_signaled_pid = 0 ;
11031148
11041149if (!s -> in_use )
11051150continue ;
11061151
11071152SpinLockAcquire (& s -> mutex );
1108- if (s -> data .restart_lsn == InvalidXLogRecPtr ||
1109- s -> data .restart_lsn >=oldestLSN )
1110- {
1111- SpinLockRelease (& s -> mutex );
1112- continue ;
1113- }
1114-
11151153slotname = s -> data .name ;
11161154restart_lsn = s -> data .restart_lsn ;
1117-
11181155SpinLockRelease (& s -> mutex );
1156+
1157+ if (XLogRecPtrIsInvalid (restart_lsn )|| restart_lsn >=oldestLSN )
1158+ continue ;
11191159LWLockRelease (ReplicationSlotControlLock );
11201160
1161+ /* Get ready to sleep on the slot in case it is active */
1162+ ConditionVariablePrepareToSleep (& s -> active_cv );
1163+
11211164for (;;)
11221165{
1123- int wspid = ReplicationSlotAcquire (NameStr (slotname ),
1124- SAB_Inquire );
1166+ /*
1167+ * Try to mark this slot as used by this process.
1168+ *
1169+ * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
1170+ * must not cancel the prepared condition variable sleep
1171+ * if this slot is active in another process, because in that
1172+ * case we have to wait on that CV later for the process owning
1173+ * the slot to be terminated.
1174+ */
1175+ wspid = ReplicationSlotAcquireInternal (s ,NULL ,SAB_Inquire );
11251176
1126- /* no walsender? success! */
1127- if (wspid == 0 )
1177+ /*
1178+ * Exit the loop if we successfully acquired the slot or
1179+ * the slot was dropped while we were waiting for the owning
1180+ * process to be terminated. For example, the latter case is likely
1181+ * to happen when the slot is temporary, because it's automatically
1182+ * dropped by the termination of the owning process.
1183+ */
1184+ if (wspid <=0 )
11281185break ;
11291186
1130- ereport (LOG ,
1131- (errmsg ("terminating walsender %d because replication slot \"%s\" is too far behind" ,
1132- wspid ,NameStr (slotname ))));
1133- (void )kill (wspid ,SIGTERM );
1187+ /*
1188+ * Signal to terminate the process that owns the slot.
1189+ *
1190+ * There is a race condition where another process may acquire
1191+ * the slot after the process using it was terminated and before
1192+ * this process acquires it. To handle this case, we signal again
1193+ * if the PID of the owning process differs from the last one signaled.
1194+ *
1195+ * XXX This logic assumes that the same PID is not reused
1196+ * very quickly.
1197+ */
1198+ if (last_signaled_pid != wspid )
1199+ {
1200+ ereport (LOG ,
1201+ (errmsg ("terminating process %d because replication slot \"%s\" is too far behind" ,
1202+ wspid ,NameStr (slotname ))));
1203+ (void )kill (wspid ,SIGTERM );
1204+ last_signaled_pid = wspid ;
1205+ }
11341206
11351207ConditionVariableTimedSleep (& s -> active_cv ,10 ,
11361208WAIT_EVENT_REPLICATION_SLOT_DROP );
11371209}
11381210ConditionVariableCancelSleep ();
11391211
1212+ /*
1213+ * Do nothing here and start from scratch if the slot has
1214+ * already been dropped.
1215+ */
1216+ if (wspid == -1 )
1217+ {
1218+ CHECK_FOR_INTERRUPTS ();
1219+ gotorestart ;
1220+ }
1221+
11401222ereport (LOG ,
11411223(errmsg ("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size" ,
11421224NameStr (slotname ),