Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit218b101

Browse files
committed
Fix race condition in invalidating obsolete replication slots
The code added to mark replication slots invalid in commitc655077had the race condition that a slot can be dropped or advancedconcurrently with checkpointer trying to invalidate it. Rewrite thecode to close those races.The changes to ReplicationSlotAcquire's API added withc655077 arenot necessary anymore. To avoid an ABI break in released branches, thiscommit leaves that unchanged; it'll be changed in a master-only commitseparately.Backpatch to 13, where this code first appeared.Reported-by: Andres Freund <andres@anarazel.de>Author: Andres Freund <andres@anarazel.de>Author: Álvaro Herrera <alvherre@alvh.no-ip.org>Discussion:https://postgr.es/m/20210408001037.wfmk6jud36auhfqm@alap3.anarazel.de
1 parent6e43f1c commit218b101

File tree

1 file changed

+145
-78
lines changed

1 file changed

+145
-78
lines changed

‎src/backend/replication/slot.c

Lines changed: 145 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,117 +1124,184 @@ ReplicationSlotReserveWal(void)
11241124
}
11251125

11261126
/*
1127-
*Mark any slot that points to an LSN older thanthe givensegment
1128-
*as invalid; itrequires WAL that's about to be removed.
1127+
*Helper for InvalidateObsoleteReplicationSlots -- acquiresthe givenslot
1128+
*and mark itinvalid, if necessary and possible.
11291129
*
1130-
* NB - this runs as part of checkpoint, so avoid raising errors if possible.
1130+
* Returns whether ReplicationSlotControlLock was released in the interim (and
1131+
* in that case we're not holding the lock at return, otherwise we are).
1132+
*
1133+
* This is inherently racy, because we release the LWLock
1134+
* for syscalls, so caller must restart if we return true.
11311135
*/
1132-
void
1133-
InvalidateObsoleteReplicationSlots(XLogSegNooldestSegno)
1136+
staticbool
1137+
InvalidatePossiblyObsoleteSlot(ReplicationSlot*s,XLogRecPtroldestLSN)
11341138
{
1135-
XLogRecPtroldestLSN;
1136-
1137-
XLogSegNoOffsetToRecPtr(oldestSegno,0,wal_segment_size,oldestLSN);
1139+
intlast_signaled_pid=0;
1140+
boolreleased_lock= false;
11381141

1139-
restart:
1140-
LWLockAcquire(ReplicationSlotControlLock,LW_SHARED);
1141-
for (inti=0;i<max_replication_slots;i++)
1142+
for (;;)
11421143
{
1143-
ReplicationSlot*s=&ReplicationSlotCtl->replication_slots[i];
1144-
XLogRecPtrrestart_lsn=InvalidXLogRecPtr;
1144+
XLogRecPtrrestart_lsn;
11451145
NameDataslotname;
1146-
intwspid;
1147-
intlast_signaled_pid=0;
1146+
intactive_pid=0;
1147+
1148+
Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,LW_SHARED));
11481149

11491150
if (!s->in_use)
1150-
continue;
1151+
{
1152+
if (released_lock)
1153+
LWLockRelease(ReplicationSlotControlLock);
1154+
break;
1155+
}
11511156

1157+
/*
1158+
* Check if the slot needs to be invalidated. If it needs to be
1159+
* invalidated, and is not currently acquired, acquire it and mark it
1160+
* as having been invalidated. We do this with the spinlock held to
1161+
* avoid race conditions -- for example the restart_lsn could move
1162+
* forward, or the slot could be dropped.
1163+
*/
11521164
SpinLockAcquire(&s->mutex);
1153-
slotname=s->data.name;
1165+
11541166
restart_lsn=s->data.restart_lsn;
1155-
SpinLockRelease(&s->mutex);
11561167

1168+
/*
1169+
* If the slot is already invalid or is fresh enough, we don't need to
1170+
* do anything.
1171+
*/
11571172
if (XLogRecPtrIsInvalid(restart_lsn)||restart_lsn >=oldestLSN)
1158-
continue;
1159-
LWLockRelease(ReplicationSlotControlLock);
1160-
CHECK_FOR_INTERRUPTS();
1173+
{
1174+
SpinLockRelease(&s->mutex);
1175+
if (released_lock)
1176+
LWLockRelease(ReplicationSlotControlLock);
1177+
break;
1178+
}
1179+
1180+
slotname=s->data.name;
1181+
active_pid=s->active_pid;
1182+
1183+
/*
1184+
* If the slot can be acquired, do so and mark it invalidated
1185+
* immediately. Otherwise we'll signal the owning process, below, and
1186+
* retry.
1187+
*/
1188+
if (active_pid==0)
1189+
{
1190+
MyReplicationSlot=s;
1191+
s->active_pid=MyProcPid;
1192+
s->data.invalidated_at=restart_lsn;
1193+
s->data.restart_lsn=InvalidXLogRecPtr;
1194+
}
11611195

1162-
/* Get ready to sleep on the slot in case it is active */
1163-
ConditionVariablePrepareToSleep(&s->active_cv);
1196+
SpinLockRelease(&s->mutex);
11641197

1165-
for (;;)
1198+
if (active_pid!=0)
11661199
{
11671200
/*
1168-
* Try to mark this slot as used by this process.
1169-
*
1170-
* Note that ReplicationSlotAcquireInternal(SAB_Inquire)
1171-
* should not cancel the prepared condition variable
1172-
* if this slot is active in other process. Because in this case
1173-
* we have to wait on that CV for the process owning
1174-
* the slot to be terminated, later.
1201+
* Prepare the sleep on the slot's condition variable before
1202+
* releasing the lock, to close a possible race condition if the
1203+
* slot is released before the sleep below.
11751204
*/
1176-
wspid=ReplicationSlotAcquireInternal(s,NULL,SAB_Inquire);
1205+
ConditionVariablePrepareToSleep(&s->active_cv);
11771206

1178-
/*
1179-
* Exit the loop if we successfully acquired the slot or
1180-
* the slot was dropped during waiting for the owning process
1181-
* to be terminated. For example, the latter case is likely to
1182-
* happen when the slot is temporary because it's automatically
1183-
* dropped by the termination of the owning process.
1184-
*/
1185-
if (wspid <=0)
1186-
break;
1207+
LWLockRelease(ReplicationSlotControlLock);
1208+
released_lock= true;
11871209

11881210
/*
1189-
* Signal to terminate the process that owns the slot.
1211+
* Signal to terminate the process that owns the slot, if we
1212+
* haven't already signalled it. (Avoidance of repeated
1213+
* signalling is the only reason for there to be a loop in this
1214+
* routine; otherwise we could rely on caller's restart loop.)
11901215
*
1191-
* There is the race condition where other process may own
1192-
* the slot after the process using it was terminated and before
1193-
* this process owns it. To handle this case, we signal again
1194-
* if the PID of the owning process is changed than the last.
1195-
*
1196-
* XXX This logic assumes that the same PID is not reused
1197-
* very quickly.
1216+
* There is the race condition that other process may own the slot
1217+
* after its current owner process is terminated and before this
1218+
* process owns it. To handle that, we signal only if the PID of
1219+
* the owning process has changed from the previous time. (This
1220+
* logic assumes that the same PID is not reused very quickly.)
11981221
*/
1199-
if (last_signaled_pid!=wspid)
1222+
if (last_signaled_pid!=active_pid)
12001223
{
12011224
ereport(LOG,
1202-
(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
1203-
wspid,NameStr(slotname))));
1204-
(void)kill(wspid,SIGTERM);
1205-
last_signaled_pid=wspid;
1225+
(errmsg("terminating process %d to release replication slot \"%s\"",
1226+
active_pid,NameStr(slotname))));
1227+
1228+
(void)kill(active_pid,SIGTERM);
1229+
last_signaled_pid=active_pid;
12061230
}
12071231

1208-
ConditionVariableTimedSleep(&s->active_cv,10,
1209-
WAIT_EVENT_REPLICATION_SLOT_DROP);
1232+
/* Wait until the slot is released. */
1233+
ConditionVariableSleep(&s->active_cv,
1234+
WAIT_EVENT_REPLICATION_SLOT_DROP);
1235+
1236+
/*
1237+
* Re-acquire lock and start over; we expect to invalidate the slot
1238+
* next time (unless another process acquires the slot in the
1239+
* meantime).
1240+
*/
1241+
LWLockAcquire(ReplicationSlotControlLock,LW_SHARED);
1242+
continue;
12101243
}
1211-
ConditionVariableCancelSleep();
1244+
else
1245+
{
1246+
/*
1247+
* We hold the slot now and have already invalidated it; flush it
1248+
* to ensure that state persists.
1249+
*
1250+
* Don't want to hold ReplicationSlotControlLock across file
1251+
* system operations, so release it now but be sure to tell caller
1252+
* to restart from scratch.
1253+
*/
1254+
LWLockRelease(ReplicationSlotControlLock);
1255+
released_lock= true;
12121256

1213-
/*
1214-
* Do nothing here and start from scratch if the slot has
1215-
* already been dropped.
1216-
*/
1217-
if (wspid==-1)
1218-
gotorestart;
1257+
/* Make sure the invalidated state persists across server restart */
1258+
ReplicationSlotMarkDirty();
1259+
ReplicationSlotSave();
1260+
ReplicationSlotRelease();
12191261

1220-
ereport(LOG,
1221-
(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
1222-
NameStr(slotname),
1223-
(uint32) (restart_lsn >>32),
1224-
(uint32)restart_lsn)));
1262+
ereport(LOG,
1263+
(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
1264+
NameStr(slotname),
1265+
(uint32) (restart_lsn >>32),
1266+
(uint32)restart_lsn)));
12251267

1226-
SpinLockAcquire(&s->mutex);
1227-
s->data.invalidated_at=s->data.restart_lsn;
1228-
s->data.restart_lsn=InvalidXLogRecPtr;
1229-
SpinLockRelease(&s->mutex);
1268+
/* done with this slot for now */
1269+
break;
1270+
}
1271+
}
12301272

1231-
/* Make sure the invalidated state persists across server restart */
1232-
ReplicationSlotMarkDirty();
1233-
ReplicationSlotSave();
1234-
ReplicationSlotRelease();
1273+
Assert(released_lock== !LWLockHeldByMe(ReplicationSlotControlLock));
12351274

1236-
/* if we did anything, start from scratch */
1237-
gotorestart;
1275+
returnreleased_lock;
1276+
}
1277+
1278+
/*
1279+
* Mark any slot that points to an LSN older than the given segment
1280+
* as invalid; it requires WAL that's about to be removed.
1281+
*
1282+
* NB - this runs as part of checkpoint, so avoid raising errors if possible.
1283+
*/
1284+
void
1285+
InvalidateObsoleteReplicationSlots(XLogSegNooldestSegno)
1286+
{
1287+
XLogRecPtroldestLSN;
1288+
1289+
XLogSegNoOffsetToRecPtr(oldestSegno,0,wal_segment_size,oldestLSN);
1290+
1291+
restart:
1292+
LWLockAcquire(ReplicationSlotControlLock,LW_SHARED);
1293+
for (inti=0;i<max_replication_slots;i++)
1294+
{
1295+
ReplicationSlot*s=&ReplicationSlotCtl->replication_slots[i];
1296+
1297+
if (!s->in_use)
1298+
continue;
1299+
1300+
if (InvalidatePossiblyObsoleteSlot(s,oldestLSN))
1301+
{
1302+
/* if the lock was released, start from scratch */
1303+
gotorestart;
1304+
}
12381305
}
12391306
LWLockRelease(ReplicationSlotControlLock);
12401307
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp