@@ -1241,8 +1241,58 @@ ReplicationSlotReserveWal(void)
12411241}
12421242
12431243/*
1244- * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
1245- * and mark it invalid, if necessary and possible.
1244+ * Report that replication slot needs to be invalidated
1245+ */
1246+ static void
1247+ ReportSlotInvalidation (ReplicationSlotInvalidationCause cause ,
1248+ bool terminating ,
1249+ int pid ,
1250+ NameData slotname ,
1251+ XLogRecPtr restart_lsn ,
1252+ XLogRecPtr oldestLSN ,
1253+ TransactionId snapshotConflictHorizon )
1254+ {
1255+ StringInfoData err_detail ;
1256+ bool hint = false;
1257+
1258+ initStringInfo (& err_detail );
1259+
1260+ switch (cause )
1261+ {
1262+ case RS_INVAL_WAL_REMOVED :
1263+ hint = true;
1264+ appendStringInfo (& err_detail ,_ ("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes." ),
1265+ LSN_FORMAT_ARGS (restart_lsn ),
1266+ (unsigned long long ) (oldestLSN - restart_lsn ));
1267+ break ;
1268+ case RS_INVAL_HORIZON :
1269+ appendStringInfo (& err_detail ,_ ("The slot conflicted with xid horizon %u." ),
1270+ snapshotConflictHorizon );
1271+ break ;
1272+
1273+ case RS_INVAL_WAL_LEVEL :
1274+ appendStringInfo (& err_detail ,_ ("Logical decoding on standby requires wal_level to be at least logical on the primary server" ));
1275+ break ;
1276+ case RS_INVAL_NONE :
1277+ pg_unreachable ();
1278+ }
1279+
1280+ ereport (LOG ,
1281+ terminating ?
1282+ errmsg ("terminating process %d to release replication slot \"%s\"" ,
1283+ pid ,NameStr (slotname )) :
1284+ errmsg ("invalidating obsolete replication slot \"%s\"" ,
1285+ NameStr (slotname )),
1286+ errdetail_internal ("%s" ,err_detail .data ),
1287+ hint ?errhint ("You might need to increase max_slot_wal_keep_size." ) :0 );
1288+
1289+ pfree (err_detail .data );
1290+ }
1291+
1292+ /*
1293+ * Helper for InvalidateObsoleteReplicationSlots
1294+ *
1295+ * Acquires the given slot and mark it invalid, if necessary and possible.
12461296 *
12471297 * Returns whether ReplicationSlotControlLock was released in the interim (and
12481298 * in that case we're not holding the lock at return, otherwise we are).
@@ -1253,7 +1303,10 @@ ReplicationSlotReserveWal(void)
12531303 * for syscalls, so caller must restart if we return true.
12541304 */
12551305static bool
1256- InvalidatePossiblyObsoleteSlot (ReplicationSlot * s ,XLogRecPtr oldestLSN ,
1306+ InvalidatePossiblyObsoleteSlot (ReplicationSlotInvalidationCause cause ,
1307+ ReplicationSlot * s ,
1308+ XLogRecPtr oldestLSN ,
1309+ Oid dboid ,TransactionId snapshotConflictHorizon ,
12571310bool * invalidated )
12581311{
12591312int last_signaled_pid = 0 ;
@@ -1264,6 +1317,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
12641317XLogRecPtr restart_lsn ;
12651318NameData slotname ;
12661319int active_pid = 0 ;
1320+ ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE ;
12671321
12681322Assert (LWLockHeldByMeInMode (ReplicationSlotControlLock ,LW_SHARED ));
12691323
@@ -1286,10 +1340,44 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
12861340restart_lsn = s -> data .restart_lsn ;
12871341
12881342/*
1289- * If the slot is already invalid or isfresh enough, we don't need to
1290- * do anything.
1343+ * If the slot is already invalid or isa non conflicting slot, we
1344+ *don't need to do anything.
12911345 */
1292- if (XLogRecPtrIsInvalid (restart_lsn )|| restart_lsn >=oldestLSN )
1346+ if (s -> data .invalidated == RS_INVAL_NONE )
1347+ {
1348+ switch (cause )
1349+ {
1350+ case RS_INVAL_WAL_REMOVED :
1351+ if (s -> data .restart_lsn != InvalidXLogRecPtr &&
1352+ s -> data .restart_lsn < oldestLSN )
1353+ conflict = cause ;
1354+ break ;
1355+ case RS_INVAL_HORIZON :
1356+ if (!SlotIsLogical (s ))
1357+ break ;
1358+ /* invalid DB oid signals a shared relation */
1359+ if (dboid != InvalidOid && dboid != s -> data .database )
1360+ break ;
1361+ if (TransactionIdIsValid (s -> effective_xmin )&&
1362+ TransactionIdPrecedesOrEquals (s -> effective_xmin ,
1363+ snapshotConflictHorizon ))
1364+ conflict = cause ;
1365+ else if (TransactionIdIsValid (s -> effective_catalog_xmin )&&
1366+ TransactionIdPrecedesOrEquals (s -> effective_catalog_xmin ,
1367+ snapshotConflictHorizon ))
1368+ conflict = cause ;
1369+ break ;
1370+ case RS_INVAL_WAL_LEVEL :
1371+ if (SlotIsLogical (s ))
1372+ conflict = cause ;
1373+ break ;
1374+ case RS_INVAL_NONE :
1375+ pg_unreachable ();
1376+ }
1377+ }
1378+
1379+ /* if there's no conflict, we're done */
1380+ if (conflict == RS_INVAL_NONE )
12931381{
12941382SpinLockRelease (& s -> mutex );
12951383if (released_lock )
@@ -1309,13 +1397,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
13091397{
13101398MyReplicationSlot = s ;
13111399s -> active_pid = MyProcPid ;
1312- s -> data .invalidated = RS_INVAL_WAL_REMOVED ;
1400+ s -> data .invalidated = conflict ;
13131401
13141402/*
13151403 * XXX: We should consider not overwriting restart_lsn and instead
13161404 * just rely on .invalidated.
13171405 */
1318- s -> data .restart_lsn = InvalidXLogRecPtr ;
1406+ if (conflict == RS_INVAL_WAL_REMOVED )
1407+ s -> data .restart_lsn = InvalidXLogRecPtr ;
13191408
13201409/* Let caller know */
13211410* invalidated = true;
@@ -1349,13 +1438,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
13491438 */
13501439if (last_signaled_pid != active_pid )
13511440{
1352- ereport (LOG ,
1353- errmsg ("terminating process %d to release replication slot \"%s\"" ,
1354- active_pid ,NameStr (slotname )),
1355- errdetail ("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes." ,
1356- LSN_FORMAT_ARGS (restart_lsn ),
1357- (unsigned long long ) (oldestLSN - restart_lsn )),
1358- errhint ("You might need to increase max_slot_wal_keep_size." ));
1441+ ReportSlotInvalidation (conflict , true,active_pid ,
1442+ slotname ,restart_lsn ,
1443+ oldestLSN ,snapshotConflictHorizon );
13591444
13601445(void )kill (active_pid ,SIGTERM );
13611446last_signaled_pid = active_pid ;
@@ -1390,14 +1475,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
13901475ReplicationSlotMarkDirty ();
13911476ReplicationSlotSave ();
13921477ReplicationSlotRelease ();
1478+ pgstat_drop_replslot (s );
13931479
1394- ereport (LOG ,
1395- errmsg ("invalidating obsolete replication slot \"%s\"" ,
1396- NameStr (slotname )),
1397- errdetail ("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes." ,
1398- LSN_FORMAT_ARGS (restart_lsn ),
1399- (unsigned long long ) (oldestLSN - restart_lsn )),
1400- errhint ("You might need to increase max_slot_wal_keep_size." ));
1480+ ReportSlotInvalidation (conflict , false,active_pid ,
1481+ slotname ,restart_lsn ,
1482+ oldestLSN ,snapshotConflictHorizon );
14011483
14021484/* done with this slot for now */
14031485break ;
@@ -1410,19 +1492,34 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
14101492}
14111493
14121494/*
1413- * Mark any slot that points to an LSN older than the given segment
1414- * as invalid; it requires WAL that's about to be removed.
1495+ * Invalidate slots that require resources about to be removed.
14151496 *
14161497 * Returns true when any slot have got invalidated.
14171498 *
1499+ * Whether a slot needs to be invalidated depends on the cause. A slot is
1500+ * removed if it:
1501+ * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
1502+ * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
1503+ * db; dboid may be InvalidOid for shared relations
1504+ * - RS_INVAL_WAL_LEVEL: is logical
1505+ *
14181506 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
14191507 */
14201508bool
1421- InvalidateObsoleteReplicationSlots (XLogSegNo oldestSegno )
1509+ InvalidateObsoleteReplicationSlots (ReplicationSlotInvalidationCause cause ,
1510+ XLogSegNo oldestSegno ,Oid dboid ,
1511+ TransactionId snapshotConflictHorizon )
14221512{
14231513XLogRecPtr oldestLSN ;
14241514bool invalidated = false;
14251515
1516+ Assert (cause != RS_INVAL_HORIZON || TransactionIdIsValid (snapshotConflictHorizon ));
1517+ Assert (cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0 );
1518+ Assert (cause != RS_INVAL_NONE );
1519+
1520+ if (max_replication_slots == 0 )
1521+ return invalidated ;
1522+
14261523XLogSegNoOffsetToRecPtr (oldestSegno ,0 ,wal_segment_size ,oldestLSN );
14271524
14281525restart :
@@ -1434,7 +1531,9 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
14341531if (!s -> in_use )
14351532continue ;
14361533
1437- if (InvalidatePossiblyObsoleteSlot (s ,oldestLSN ,& invalidated ))
1534+ if (InvalidatePossiblyObsoleteSlot (cause ,s ,oldestLSN ,dboid ,
1535+ snapshotConflictHorizon ,
1536+ & invalidated ))
14381537{
14391538/* if the lock was released, start from scratch */
14401539gotorestart ;