Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit be87200

Browse files
committed
Support invalidating replication slots due to horizon and wal_level
Needed for logical decoding on a standby. Slots need to be invalidated because of the horizon if rows required for logical decoding are removed. If the primary's wal_level is lowered from 'logical', logical slots on the standby need to be invalidated.

The new invalidation methods will be used in a subsequent commit.

Logical slots that have been invalidated can be identified via the new pg_replication_slots.conflicting column.

See 6af1793 for an overall design of logical decoding on a standby.

Bumps catversion for the addition of the new pg_replication_slots column.

Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Author: Andres Freund <andres@anarazel.de>
Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version)
Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com>
Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de
1 parent 2ed16aa commit be87200

File tree

10 files changed

+176
-37
lines changed

10 files changed

+176
-37
lines changed

‎doc/src/sgml/system-views.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2517,6 +2517,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
25172517
false for physical slots.
25182518
</para></entry>
25192519
</row>
2520+
2521+
<row>
2522+
<entry role="catalog_table_entry"><para role="column_definition">
2523+
<structfield>conflicting</structfield> <type>bool</type>
2524+
</para>
2525+
<para>
2526+
True if this logical slot conflicted with recovery (and so is now
2527+
invalidated). Always NULL for physical slots.
2528+
</para></entry>
2529+
</row>
25202530
</tbody>
25212531
</tgroup>
25222532
</table>

‎src/backend/access/transam/xlog.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6809,7 +6809,9 @@ CreateCheckPoint(int flags)
68096809
*/
68106810
XLByteToSeg(RedoRecPtr,_logSegNo,wal_segment_size);
68116811
KeepLogSeg(recptr,&_logSegNo);
6812-
if (InvalidateObsoleteReplicationSlots(_logSegNo))
6812+
if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
6813+
_logSegNo,InvalidOid,
6814+
InvalidTransactionId))
68136815
{
68146816
/*
68156817
* Some slots have been invalidated; recalculate the old-segment
@@ -7253,7 +7255,9 @@ CreateRestartPoint(int flags)
72537255
replayPtr=GetXLogReplayRecPtr(&replayTLI);
72547256
endptr= (receivePtr<replayPtr) ?replayPtr :receivePtr;
72557257
KeepLogSeg(endptr,&_logSegNo);
7256-
if (InvalidateObsoleteReplicationSlots(_logSegNo))
7258+
if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
7259+
_logSegNo,InvalidOid,
7260+
InvalidTransactionId))
72577261
{
72587262
/*
72597263
* Some slots have been invalidated; recalculate the old-segment

‎src/backend/catalog/system_views.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1000,7 +1000,8 @@ CREATE VIEW pg_replication_slots AS
10001000
L.confirmed_flush_lsn,
10011001
L.wal_status,
10021002
L.safe_wal_size,
1003-
L.two_phase
1003+
L.two_phase,
1004+
L.conflicting
10041005
FROM pg_get_replication_slots()AS L
10051006
LEFT JOIN pg_database DON (L.datoid=D.oid);
10061007

‎src/backend/replication/logical/logical.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,13 @@ CreateDecodingContext(XLogRecPtr start_lsn,
531531
NameStr(MyReplicationSlot->data.name)),
532532
errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
533533

534+
if (MyReplicationSlot->data.invalidated!=RS_INVAL_NONE)
535+
ereport(ERROR,
536+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
537+
errmsg("can no longer get changes from replication slot \"%s\"",
538+
NameStr(MyReplicationSlot->data.name)),
539+
errdetail("This slot has been invalidated because it was conflicting with recovery.")));
540+
534541
Assert(MyReplicationSlot->data.invalidated==RS_INVAL_NONE);
535542
Assert(MyReplicationSlot->data.restart_lsn!=InvalidXLogRecPtr);
536543

‎src/backend/replication/slot.c

Lines changed: 125 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,8 +1241,58 @@ ReplicationSlotReserveWal(void)
12411241
}
12421242

12431243
/*
1244-
* Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
1245-
* and mark it invalid, if necessary and possible.
1244+
* Report that replication slot needs to be invalidated
1245+
*/
1246+
staticvoid
1247+
ReportSlotInvalidation(ReplicationSlotInvalidationCausecause,
1248+
boolterminating,
1249+
intpid,
1250+
NameDataslotname,
1251+
XLogRecPtrrestart_lsn,
1252+
XLogRecPtroldestLSN,
1253+
TransactionIdsnapshotConflictHorizon)
1254+
{
1255+
StringInfoDataerr_detail;
1256+
boolhint= false;
1257+
1258+
initStringInfo(&err_detail);
1259+
1260+
switch (cause)
1261+
{
1262+
caseRS_INVAL_WAL_REMOVED:
1263+
hint= true;
1264+
appendStringInfo(&err_detail,_("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
1265+
LSN_FORMAT_ARGS(restart_lsn),
1266+
(unsigned long long) (oldestLSN-restart_lsn));
1267+
break;
1268+
caseRS_INVAL_HORIZON:
1269+
appendStringInfo(&err_detail,_("The slot conflicted with xid horizon %u."),
1270+
snapshotConflictHorizon);
1271+
break;
1272+
1273+
caseRS_INVAL_WAL_LEVEL:
1274+
appendStringInfo(&err_detail,_("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
1275+
break;
1276+
caseRS_INVAL_NONE:
1277+
pg_unreachable();
1278+
}
1279+
1280+
ereport(LOG,
1281+
terminating ?
1282+
errmsg("terminating process %d to release replication slot \"%s\"",
1283+
pid,NameStr(slotname)) :
1284+
errmsg("invalidating obsolete replication slot \"%s\"",
1285+
NameStr(slotname)),
1286+
errdetail_internal("%s",err_detail.data),
1287+
hint ?errhint("You might need to increase max_slot_wal_keep_size.") :0);
1288+
1289+
pfree(err_detail.data);
1290+
}
1291+
1292+
/*
1293+
* Helper for InvalidateObsoleteReplicationSlots
1294+
*
1295+
* Acquires the given slot and mark it invalid, if necessary and possible.
12461296
*
12471297
* Returns whether ReplicationSlotControlLock was released in the interim (and
12481298
* in that case we're not holding the lock at return, otherwise we are).
@@ -1253,7 +1303,10 @@ ReplicationSlotReserveWal(void)
12531303
* for syscalls, so caller must restart if we return true.
12541304
*/
12551305
staticbool
1256-
InvalidatePossiblyObsoleteSlot(ReplicationSlot*s,XLogRecPtroldestLSN,
1306+
InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCausecause,
1307+
ReplicationSlot*s,
1308+
XLogRecPtroldestLSN,
1309+
Oiddboid,TransactionIdsnapshotConflictHorizon,
12571310
bool*invalidated)
12581311
{
12591312
intlast_signaled_pid=0;
@@ -1264,6 +1317,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
12641317
XLogRecPtrrestart_lsn;
12651318
NameDataslotname;
12661319
intactive_pid=0;
1320+
ReplicationSlotInvalidationCauseconflict=RS_INVAL_NONE;
12671321

12681322
Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,LW_SHARED));
12691323

@@ -1286,10 +1340,44 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
12861340
restart_lsn=s->data.restart_lsn;
12871341

12881342
/*
1289-
* If the slot is already invalid or isfresh enough, we don't need to
1290-
* do anything.
1343+
* If the slot is already invalid or isa non conflicting slot, we
1344+
*don't need todo anything.
12911345
*/
1292-
if (XLogRecPtrIsInvalid(restart_lsn)||restart_lsn >=oldestLSN)
1346+
if (s->data.invalidated==RS_INVAL_NONE)
1347+
{
1348+
switch (cause)
1349+
{
1350+
caseRS_INVAL_WAL_REMOVED:
1351+
if (s->data.restart_lsn!=InvalidXLogRecPtr&&
1352+
s->data.restart_lsn<oldestLSN)
1353+
conflict=cause;
1354+
break;
1355+
caseRS_INVAL_HORIZON:
1356+
if (!SlotIsLogical(s))
1357+
break;
1358+
/* invalid DB oid signals a shared relation */
1359+
if (dboid!=InvalidOid&&dboid!=s->data.database)
1360+
break;
1361+
if (TransactionIdIsValid(s->effective_xmin)&&
1362+
TransactionIdPrecedesOrEquals(s->effective_xmin,
1363+
snapshotConflictHorizon))
1364+
conflict=cause;
1365+
elseif (TransactionIdIsValid(s->effective_catalog_xmin)&&
1366+
TransactionIdPrecedesOrEquals(s->effective_catalog_xmin,
1367+
snapshotConflictHorizon))
1368+
conflict=cause;
1369+
break;
1370+
caseRS_INVAL_WAL_LEVEL:
1371+
if (SlotIsLogical(s))
1372+
conflict=cause;
1373+
break;
1374+
caseRS_INVAL_NONE:
1375+
pg_unreachable();
1376+
}
1377+
}
1378+
1379+
/* if there's no conflict, we're done */
1380+
if (conflict==RS_INVAL_NONE)
12931381
{
12941382
SpinLockRelease(&s->mutex);
12951383
if (released_lock)
@@ -1309,13 +1397,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
13091397
{
13101398
MyReplicationSlot=s;
13111399
s->active_pid=MyProcPid;
1312-
s->data.invalidated=RS_INVAL_WAL_REMOVED;
1400+
s->data.invalidated=conflict;
13131401

13141402
/*
13151403
* XXX: We should consider not overwriting restart_lsn and instead
13161404
* just rely on .invalidated.
13171405
*/
1318-
s->data.restart_lsn=InvalidXLogRecPtr;
1406+
if (conflict==RS_INVAL_WAL_REMOVED)
1407+
s->data.restart_lsn=InvalidXLogRecPtr;
13191408

13201409
/* Let caller know */
13211410
*invalidated= true;
@@ -1349,13 +1438,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
13491438
*/
13501439
if (last_signaled_pid!=active_pid)
13511440
{
1352-
ereport(LOG,
1353-
errmsg("terminating process %d to release replication slot \"%s\"",
1354-
active_pid,NameStr(slotname)),
1355-
errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1356-
LSN_FORMAT_ARGS(restart_lsn),
1357-
(unsigned long long) (oldestLSN-restart_lsn)),
1358-
errhint("You might need to increase max_slot_wal_keep_size."));
1441+
ReportSlotInvalidation(conflict, true,active_pid,
1442+
slotname,restart_lsn,
1443+
oldestLSN,snapshotConflictHorizon);
13591444

13601445
(void)kill(active_pid,SIGTERM);
13611446
last_signaled_pid=active_pid;
@@ -1390,14 +1475,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
13901475
ReplicationSlotMarkDirty();
13911476
ReplicationSlotSave();
13921477
ReplicationSlotRelease();
1478+
pgstat_drop_replslot(s);
13931479

1394-
ereport(LOG,
1395-
errmsg("invalidating obsolete replication slot \"%s\"",
1396-
NameStr(slotname)),
1397-
errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1398-
LSN_FORMAT_ARGS(restart_lsn),
1399-
(unsigned long long) (oldestLSN-restart_lsn)),
1400-
errhint("You might need to increase max_slot_wal_keep_size."));
1480+
ReportSlotInvalidation(conflict, false,active_pid,
1481+
slotname,restart_lsn,
1482+
oldestLSN,snapshotConflictHorizon);
14011483

14021484
/* done with this slot for now */
14031485
break;
@@ -1410,19 +1492,34 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
14101492
}
14111493

14121494
/*
1413-
* Mark any slot that points to an LSN older than the given segment
1414-
* as invalid; it requires WAL that's about to be removed.
1495+
* Invalidate slots that require resources about to be removed.
14151496
*
14161497
* Returns true when any slot have got invalidated.
14171498
*
1499+
* Whether a slot needs to be invalidated depends on the cause. A slot is
1500+
* removed if it:
1501+
* - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
1502+
* - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
1503+
* db; dboid may be InvalidOid for shared relations
1504+
* - RS_INVAL_WAL_LEVEL: is logical
1505+
*
14181506
* NB - this runs as part of checkpoint, so avoid raising errors if possible.
14191507
*/
14201508
bool
1421-
InvalidateObsoleteReplicationSlots(XLogSegNooldestSegno)
1509+
InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCausecause,
1510+
XLogSegNooldestSegno,Oiddboid,
1511+
TransactionIdsnapshotConflictHorizon)
14221512
{
14231513
XLogRecPtroldestLSN;
14241514
boolinvalidated= false;
14251515

1516+
Assert(cause!=RS_INVAL_HORIZON||TransactionIdIsValid(snapshotConflictHorizon));
1517+
Assert(cause!=RS_INVAL_WAL_REMOVED||oldestSegno>0);
1518+
Assert(cause!=RS_INVAL_NONE);
1519+
1520+
if (max_replication_slots==0)
1521+
returninvalidated;
1522+
14261523
XLogSegNoOffsetToRecPtr(oldestSegno,0,wal_segment_size,oldestLSN);
14271524

14281525
restart:
@@ -1434,7 +1531,9 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
14341531
if (!s->in_use)
14351532
continue;
14361533

1437-
if (InvalidatePossiblyObsoleteSlot(s,oldestLSN,&invalidated))
1534+
if (InvalidatePossiblyObsoleteSlot(cause,s,oldestLSN,dboid,
1535+
snapshotConflictHorizon,
1536+
&invalidated))
14381537
{
14391538
/* if the lock was released, start from scratch */
14401539
gotorestart;

‎src/backend/replication/slotfuncs.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
232232
Datum
233233
pg_get_replication_slots(PG_FUNCTION_ARGS)
234234
{
235-
#definePG_GET_REPLICATION_SLOTS_COLS14
235+
#definePG_GET_REPLICATION_SLOTS_COLS15
236236
ReturnSetInfo*rsinfo= (ReturnSetInfo*)fcinfo->resultinfo;
237237
XLogRecPtrcurrlsn;
238238
intslotno;
@@ -402,6 +402,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
402402

403403
values[i++]=BoolGetDatum(slot_contents.data.two_phase);
404404

405+
if (slot_contents.data.database==InvalidOid)
406+
nulls[i++]= true;
407+
else
408+
{
409+
if (slot_contents.data.invalidated!=RS_INVAL_NONE)
410+
values[i++]=BoolGetDatum(true);
411+
else
412+
values[i++]=BoolGetDatum(false);
413+
}
414+
405415
Assert(i==PG_GET_REPLICATION_SLOTS_COLS);
406416

407417
tuplestore_putvalues(rsinfo->setResult,rsinfo->setDesc,

‎src/include/catalog/catversion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,6 @@
5757
*/
5858

5959
/*yyyymmddN */
60-
#defineCATALOG_VERSION_NO202304072
60+
#defineCATALOG_VERSION_NO202304073
6161

6262
#endif

‎src/include/catalog/pg_proc.dat

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11077,9 +11077,9 @@
1107711077
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
1107811078
proretset => 't', provolatile => 's', prorettype => 'record',
1107911079
proargtypes => '',
11080-
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
11081-
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
11082-
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
11080+
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
11081+
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
11082+
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}',
1108311083
prosrc => 'pg_get_replication_slots' },
1108411084
{ oid => '3786', descr => 'set up a logical replication slot',
1108511085
proname => 'pg_create_logical_replication_slot', provolatile => 'v',

‎src/include/replication/slot.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ typedef enum ReplicationSlotInvalidationCause
4646
RS_INVAL_NONE,
4747
/* required WAL has been removed */
4848
RS_INVAL_WAL_REMOVED,
49+
/* required rows have been removed */
50+
RS_INVAL_HORIZON,
51+
/* wal_level insufficient for slot */
52+
RS_INVAL_WAL_LEVEL,
4953
}ReplicationSlotInvalidationCause;
5054

5155
/*
@@ -226,7 +230,10 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
226230
externXLogRecPtrReplicationSlotsComputeLogicalRestartLSN(void);
227231
externboolReplicationSlotsCountDBSlots(Oiddboid,int*nslots,int*nactive);
228232
externvoidReplicationSlotsDropDBSlots(Oiddboid);
229-
externboolInvalidateObsoleteReplicationSlots(XLogSegNooldestSegno);
233+
externboolInvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCausecause,
234+
XLogSegNooldestSegno,
235+
Oiddboid,
236+
TransactionIdsnapshotConflictHorizon);
230237
externReplicationSlot*SearchNamedReplicationSlot(constchar*name,boolneed_lock);
231238
externintReplicationSlotIndex(ReplicationSlot*slot);
232239
externboolReplicationSlotName(intindex,Namename);

‎src/test/regress/expected/rules.out

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1472,8 +1472,9 @@ pg_replication_slots| SELECT l.slot_name,
14721472
l.confirmed_flush_lsn,
14731473
l.wal_status,
14741474
l.safe_wal_size,
1475-
l.two_phase
1476-
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
1475+
l.two_phase,
1476+
l.conflicting
1477+
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting)
14771478
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
14781479
pg_roles| SELECT pg_authid.rolname,
14791480
pg_authid.rolsuper,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp