@@ -1124,117 +1124,184 @@ ReplicationSlotReserveWal(void)
11241124}
11251125
11261126/*
1127- *Mark any slot that points to an LSN older than the givensegment
1128- *as invalid; itrequires WAL that's about to be removed .
1127+ *Helper for InvalidateObsoleteReplicationSlots -- acquires the givenslot
1128+ *and mark itinvalid, if necessary and possible .
11291129 *
1130- * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1130+ * Returns whether ReplicationSlotControlLock was released in the interim (and
1131+ * in that case we're not holding the lock at return, otherwise we are).
1132+ *
1133+ * This is inherently racy, because we release the LWLock
1134+ * for syscalls, so caller must restart if we return true.
11311135 */
1132- void
1133- InvalidateObsoleteReplicationSlots ( XLogSegNo oldestSegno )
1136+ static bool
1137+ InvalidatePossiblyObsoleteSlot ( ReplicationSlot * s , XLogRecPtr oldestLSN )
11341138{
1135- XLogRecPtr oldestLSN ;
1136-
1137- XLogSegNoOffsetToRecPtr (oldestSegno ,0 ,wal_segment_size ,oldestLSN );
1139+ int last_signaled_pid = 0 ;
1140+ bool released_lock = false;
11381141
1139- restart :
1140- LWLockAcquire (ReplicationSlotControlLock ,LW_SHARED );
1141- for (int i = 0 ;i < max_replication_slots ;i ++ )
1142+ for (;;)
11421143{
1143- ReplicationSlot * s = & ReplicationSlotCtl -> replication_slots [i ];
1144- XLogRecPtr restart_lsn = InvalidXLogRecPtr ;
1144+ XLogRecPtr restart_lsn ;
11451145NameData slotname ;
1146- int wspid ;
1147- int last_signaled_pid = 0 ;
1146+ int active_pid = 0 ;
1147+
1148+ Assert (LWLockHeldByMeInMode (ReplicationSlotControlLock ,LW_SHARED ));
11481149
11491150if (!s -> in_use )
1150- continue ;
1151+ {
1152+ if (released_lock )
1153+ LWLockRelease (ReplicationSlotControlLock );
1154+ break ;
1155+ }
11511156
1157+ /*
1158+ * Check if the slot needs to be invalidated. If it needs to be
1159+ * invalidated, and is not currently acquired, acquire it and mark it
1160+ * as having been invalidated. We do this with the spinlock held to
1161+ * avoid race conditions -- for example the restart_lsn could move
1162+ * forward, or the slot could be dropped.
1163+ */
11521164SpinLockAcquire (& s -> mutex );
1153- slotname = s -> data . name ;
1165+
11541166restart_lsn = s -> data .restart_lsn ;
1155- SpinLockRelease (& s -> mutex );
11561167
1168+ /*
1169+ * If the slot is already invalid or is fresh enough, we don't need to
1170+ * do anything.
1171+ */
11571172if (XLogRecPtrIsInvalid (restart_lsn )|| restart_lsn >=oldestLSN )
1158- continue ;
1159- LWLockRelease (ReplicationSlotControlLock );
1160- CHECK_FOR_INTERRUPTS ();
1173+ {
1174+ SpinLockRelease (& s -> mutex );
1175+ if (released_lock )
1176+ LWLockRelease (ReplicationSlotControlLock );
1177+ break ;
1178+ }
1179+
1180+ slotname = s -> data .name ;
1181+ active_pid = s -> active_pid ;
1182+
1183+ /*
1184+ * If the slot can be acquired, do so and mark it invalidated
1185+ * immediately. Otherwise we'll signal the owning process, below, and
1186+ * retry.
1187+ */
1188+ if (active_pid == 0 )
1189+ {
1190+ MyReplicationSlot = s ;
1191+ s -> active_pid = MyProcPid ;
1192+ s -> data .invalidated_at = restart_lsn ;
1193+ s -> data .restart_lsn = InvalidXLogRecPtr ;
1194+ }
11611195
1162- /* Get ready to sleep on the slot in case it is active */
1163- ConditionVariablePrepareToSleep (& s -> active_cv );
1196+ SpinLockRelease (& s -> mutex );
11641197
1165- for (;; )
1198+ if ( active_pid != 0 )
11661199{
11671200/*
1168- * Try to mark this slot as used by this process.
1169- *
1170- * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
1171- * should not cancel the prepared condition variable
1172- * if this slot is active in other process. Because in this case
1173- * we have to wait on that CV for the process owning
1174- * the slot to be terminated, later.
1201+ * Prepare the sleep on the slot's condition variable before
1202+ * releasing the lock, to close a possible race condition if the
1203+ * slot is released before the sleep below.
11751204 */
1176- wspid = ReplicationSlotAcquireInternal ( s , NULL , SAB_Inquire );
1205+ ConditionVariablePrepareToSleep ( & s -> active_cv );
11771206
1178- /*
1179- * Exit the loop if we successfully acquired the slot or
1180- * the slot was dropped during waiting for the owning process
1181- * to be terminated. For example, the latter case is likely to
1182- * happen when the slot is temporary because it's automatically
1183- * dropped by the termination of the owning process.
1184- */
1185- if (wspid <=0 )
1186- break ;
1207+ LWLockRelease (ReplicationSlotControlLock );
1208+ released_lock = true;
11871209
11881210/*
1189- * Signal to terminate the process that owns the slot.
1211+ * Signal to terminate the process that owns the slot, if we
1212+ * haven't already signalled it. (Avoidance of repeated
1213+ * signalling is the only reason for there to be a loop in this
1214+ * routine; otherwise we could rely on caller's restart loop.)
11901215 *
1191- * There is the race condition where other process may own
1192- * the slot after the process using it was terminated and before
1193- * this process owns it. To handle this case, we signal again
1194- * if the PID of the owning process is changed than the last.
1195- *
1196- * XXX This logic assumes that the same PID is not reused
1197- * very quickly.
1216+ * There is the race condition that other process may own the slot
1217+ * after its current owner process is terminated and before this
1218+ * process owns it. To handle that, we signal only if the PID of
1219+ * the owning process has changed from the previous time. (This
1220+ * logic assumes that the same PID is not reused very quickly.)
11981221 */
1199- if (last_signaled_pid != wspid )
1222+ if (last_signaled_pid != active_pid )
12001223{
12011224ereport (LOG ,
1202- (errmsg ("terminating process %d because replication slot \"%s\" is too far behind" ,
1203- wspid ,NameStr (slotname ))));
1204- (void )kill (wspid ,SIGTERM );
1205- last_signaled_pid = wspid ;
1225+ (errmsg ("terminating process %d to release replication slot \"%s\"" ,
1226+ active_pid ,NameStr (slotname ))));
1227+
1228+ (void )kill (active_pid ,SIGTERM );
1229+ last_signaled_pid = active_pid ;
12061230}
12071231
1208- ConditionVariableTimedSleep (& s -> active_cv ,10 ,
1209- WAIT_EVENT_REPLICATION_SLOT_DROP );
1232+ /* Wait until the slot is released. */
1233+ ConditionVariableSleep (& s -> active_cv ,
1234+ WAIT_EVENT_REPLICATION_SLOT_DROP );
1235+
1236+ /*
1237+ * Re-acquire lock and start over; we expect to invalidate the slot
1238+ * next time (unless another process acquires the slot in the
1239+ * meantime).
1240+ */
1241+ LWLockAcquire (ReplicationSlotControlLock ,LW_SHARED );
1242+ continue ;
12101243}
1211- ConditionVariableCancelSleep ();
1244+ else
1245+ {
1246+ /*
1247+ * We hold the slot now and have already invalidated it; flush it
1248+ * to ensure that state persists.
1249+ *
1250+ * Don't want to hold ReplicationSlotControlLock across file
1251+ * system operations, so release it now but be sure to tell caller
1252+ * to restart from scratch.
1253+ */
1254+ LWLockRelease (ReplicationSlotControlLock );
1255+ released_lock = true;
12121256
1213- /*
1214- * Do nothing here and start from scratch if the slot has
1215- * already been dropped.
1216- */
1217- if (wspid == -1 )
1218- gotorestart ;
1257+ /* Make sure the invalidated state persists across server restart */
1258+ ReplicationSlotMarkDirty ();
1259+ ReplicationSlotSave ();
1260+ ReplicationSlotRelease ();
12191261
1220- ereport (LOG ,
1221- (errmsg ("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size" ,
1222- NameStr (slotname ),
1223- (uint32 ) (restart_lsn >>32 ),
1224- (uint32 )restart_lsn )));
1262+ ereport (LOG ,
1263+ (errmsg ("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size" ,
1264+ NameStr (slotname ),
1265+ (uint32 ) (restart_lsn >>32 ),
1266+ (uint32 )restart_lsn )));
12251267
1226- SpinLockAcquire ( & s -> mutex );
1227- s -> data . invalidated_at = s -> data . restart_lsn ;
1228- s -> data . restart_lsn = InvalidXLogRecPtr ;
1229- SpinLockRelease ( & s -> mutex );
1268+ /* done with this slot for now */
1269+ break ;
1270+ }
1271+ }
12301272
1231- /* Make sure the invalidated state persists across server restart */
1232- ReplicationSlotMarkDirty ();
1233- ReplicationSlotSave ();
1234- ReplicationSlotRelease ();
1273+ Assert (released_lock == !LWLockHeldByMe (ReplicationSlotControlLock ));
12351274
1236- /* if we did anything, start from scratch */
1237- gotorestart ;
1275+ return released_lock ;
1276+ }
1277+
1278+ /*
1279+ * Mark any slot that points to an LSN older than the given segment
1280+ * as invalid; it requires WAL that's about to be removed.
1281+ *
1282+ * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1283+ */
1284+ void
1285+ InvalidateObsoleteReplicationSlots (XLogSegNo oldestSegno )
1286+ {
1287+ XLogRecPtr oldestLSN ;
1288+
1289+ XLogSegNoOffsetToRecPtr (oldestSegno ,0 ,wal_segment_size ,oldestLSN );
1290+
1291+ restart :
1292+ LWLockAcquire (ReplicationSlotControlLock ,LW_SHARED );
1293+ for (int i = 0 ;i < max_replication_slots ;i ++ )
1294+ {
1295+ ReplicationSlot * s = & ReplicationSlotCtl -> replication_slots [i ];
1296+
1297+ if (!s -> in_use )
1298+ continue ;
1299+
1300+ if (InvalidatePossiblyObsoleteSlot (s ,oldestLSN ))
1301+ {
1302+ /* if the lock was released, start from scratch */
1303+ gotorestart ;
1304+ }
12381305}
12391306LWLockRelease (ReplicationSlotControlLock );
12401307}