@@ -667,7 +667,8 @@ static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn,
667667XLogRecPtr pagePtr ,
668668TimeLineID newTLI );
669669static void CheckPointGuts (XLogRecPtr checkPointRedo ,int flags );
670- static void KeepLogSeg (XLogRecPtr recptr ,XLogSegNo * logSegNo );
670+ static void KeepLogSeg (XLogRecPtr recptr ,XLogRecPtr slotsMinLSN ,
671+ XLogSegNo * logSegNo );
671672static XLogRecPtr XLogGetReplicationSlotMinimumLSN (void );
672673
673674static void AdvanceXLInsertBuffer (XLogRecPtr upto ,TimeLineID tli ,
@@ -6891,6 +6892,7 @@ CreateCheckPoint(int flags)
68916892VirtualTransactionId * vxids ;
68926893int nvxids ;
68936894int oldXLogAllowed = 0 ;
6895+ XLogRecPtr slotsMinReqLSN ;
68946896
68956897/*
68966898 * An end-of-recovery checkpoint is really a shutdown checkpoint, just
@@ -7119,6 +7121,15 @@ CreateCheckPoint(int flags)
71197121 */
71207122END_CRIT_SECTION ();
71217123
7124+ /*
7125+ * Get the current minimum LSN to be used later in the WAL segment
7126+ * cleanup. We may clean up only WAL segments, which are not needed
7127+ * according to synchronized LSNs of replication slots. The slot's LSN
7128+ * might be advanced concurrently, so we call this before
7129+ * CheckPointReplicationSlots() synchronizes replication slots.
7130+ */
7131+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
7132+
71227133/*
71237134 * In some cases there are groups of actions that must all occur on one
71247135 * side or the other of a checkpoint record. Before flushing the
@@ -7307,17 +7318,25 @@ CreateCheckPoint(int flags)
73077318 * prevent the disk holding the xlog from growing full.
73087319 */
73097320XLByteToSeg (RedoRecPtr ,_logSegNo ,wal_segment_size );
7310- KeepLogSeg (recptr ,& _logSegNo );
7321+ KeepLogSeg (recptr ,slotsMinReqLSN , & _logSegNo );
73117322if (InvalidateObsoleteReplicationSlots (RS_INVAL_WAL_REMOVED ,
73127323_logSegNo ,InvalidOid ,
73137324InvalidTransactionId ))
73147325{
7326+ /*
7327+ * Recalculate the current minimum LSN to be used in the WAL segment
7328+ * cleanup. Then, we must synchronize the replication slots again in
7329+ * order to make this LSN safe to use.
7330+ */
7331+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
7332+ CheckPointReplicationSlots (shutdown );
7333+
73157334/*
73167335 * Some slots have been invalidated; recalculate the old-segment
73177336 * horizon, starting again from RedoRecPtr.
73187337 */
73197338XLByteToSeg (RedoRecPtr ,_logSegNo ,wal_segment_size );
7320- KeepLogSeg (recptr ,& _logSegNo );
7339+ KeepLogSeg (recptr ,slotsMinReqLSN , & _logSegNo );
73217340}
73227341_logSegNo -- ;
73237342RemoveOldXlogFiles (_logSegNo ,RedoRecPtr ,recptr ,
@@ -7590,6 +7609,7 @@ CreateRestartPoint(int flags)
75907609XLogRecPtr endptr ;
75917610XLogSegNo _logSegNo ;
75927611TimestampTz xtime ;
7612+ XLogRecPtr slotsMinReqLSN ;
75937613
75947614/* Concurrent checkpoint/restartpoint cannot happen */
75957615Assert (!IsUnderPostmaster || MyBackendType == B_CHECKPOINTER );
@@ -7672,6 +7692,15 @@ CreateRestartPoint(int flags)
76727692MemSet (& CheckpointStats ,0 ,sizeof (CheckpointStats ));
76737693CheckpointStats .ckpt_start_t = GetCurrentTimestamp ();
76747694
7695+ /*
7696+ * Get the current minimum LSN to be used later in the WAL segment
7697+ * cleanup. We may clean up only WAL segments, which are not needed
7698+ * according to synchronized LSNs of replication slots. The slot's LSN
7699+ * might be advanced concurrently, so we call this before
7700+ * CheckPointReplicationSlots() synchronizes replication slots.
7701+ */
7702+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
7703+
76757704if (log_checkpoints )
76767705LogCheckpointStart (flags , true);
76777706
@@ -7760,17 +7789,25 @@ CreateRestartPoint(int flags)
77607789receivePtr = GetWalRcvFlushRecPtr (NULL ,NULL );
77617790replayPtr = GetXLogReplayRecPtr (& replayTLI );
77627791endptr = (receivePtr < replayPtr ) ?replayPtr :receivePtr ;
7763- KeepLogSeg (endptr ,& _logSegNo );
7792+ KeepLogSeg (endptr ,slotsMinReqLSN , & _logSegNo );
77647793if (InvalidateObsoleteReplicationSlots (RS_INVAL_WAL_REMOVED ,
77657794_logSegNo ,InvalidOid ,
77667795InvalidTransactionId ))
77677796{
7797+ /*
7798+ * Recalculate the current minimum LSN to be used in the WAL segment
7799+ * cleanup. Then, we must synchronize the replication slots again in
7800+ * order to make this LSN safe to use.
7801+ */
7802+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
7803+ CheckPointReplicationSlots (flags & CHECKPOINT_IS_SHUTDOWN );
7804+
77687805/*
77697806 * Some slots have been invalidated; recalculate the old-segment
77707807 * horizon, starting again from RedoRecPtr.
77717808 */
77727809XLByteToSeg (RedoRecPtr ,_logSegNo ,wal_segment_size );
7773- KeepLogSeg (endptr ,& _logSegNo );
7810+ KeepLogSeg (endptr ,slotsMinReqLSN , & _logSegNo );
77747811}
77757812_logSegNo -- ;
77767813
@@ -7865,6 +7902,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
78657902XLogSegNo oldestSegMaxWalSize ;/* oldest segid kept by max_wal_size */
78667903XLogSegNo oldestSlotSeg ;/* oldest segid kept by slot */
78677904uint64 keepSegs ;
7905+ XLogRecPtr slotsMinReqLSN ;
78687906
78697907/*
78707908 * slot does not reserve WAL. Either deactivated, or has never been active
@@ -7878,8 +7916,9 @@ GetWALAvailability(XLogRecPtr targetLSN)
78787916 * oldestSlotSeg to the current segment.
78797917 */
78807918currpos = GetXLogWriteRecPtr ();
7919+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
78817920XLByteToSeg (currpos ,oldestSlotSeg ,wal_segment_size );
7882- KeepLogSeg (currpos ,& oldestSlotSeg );
7921+ KeepLogSeg (currpos ,slotsMinReqLSN , & oldestSlotSeg );
78837922
78847923/*
78857924 * Find the oldest extant segment file. We get 1 until checkpoint removes
@@ -7940,7 +7979,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
79407979 * invalidation is optionally done here, instead.
79417980 */
79427981static void
7943- KeepLogSeg (XLogRecPtr recptr ,XLogSegNo * logSegNo )
7982+ KeepLogSeg (XLogRecPtr recptr ,XLogRecPtr slotsMinReqLSN , XLogSegNo * logSegNo )
79447983{
79457984XLogSegNo currSegNo ;
79467985XLogSegNo segno ;
@@ -7953,7 +7992,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
79537992 * Calculate how many segments are kept by slots first, adjusting for
79547993 * max_slot_wal_keep_size.
79557994 */
7956- keep = XLogGetReplicationSlotMinimumLSN () ;
7995+ keep = slotsMinReqLSN ;
79577996if (keep != InvalidXLogRecPtr && keep < recptr )
79587997{
79597998XLByteToSeg (keep ,segno ,wal_segment_size );