Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc6ff84b

Browse files
committed
Emit invalidations to standby for transactions without xid.
So far, when a transaction with pending invalidations, but without anassigned xid, committed, we simply ignored those invalidationmessages. That's problematic, because those are actually sent for areason.Known symptoms of this include that existing sessions on a hot-standbyreplica sometimes fail to notice new concurrently built indexes andvisibility map updates.The solution is to WAL log such invalidations in transactions without anxid. We considered to alternatively force-assign an xid, but that'd beproblematic for vacuum, which might be run in systems with few xids.Important: This adds a new WAL record, but as the patch has to beback-patched, we can't bump the WAL page magic. This means that standbyshave to be updated before primaries; otherwise"PANIC: standby_redo: unknown op code 32" errors can be encountered.XXX:Reported-By: Васильев Дмитрий, Masahiko SawadaDiscussion: CAB-SwXY6oH=9twBkXJtgR4UC1NqT-vpYAtxCseME62ADwyK5OA@mail.gmail.com CAD21AoDpZ6Xjg=gFrGPnSn4oTRRcwK1EBrWCq9OqOHuAcMMC=w@mail.gmail.com
1 parent2ac3be2 commitc6ff84b

File tree

10 files changed

+181
-48
lines changed

10 files changed

+181
-48
lines changed

‎src/backend/access/rmgrdesc/standbydesc.c

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ standby_desc(StringInfo buf, XLogReaderState *record)
5858

5959
standby_desc_running_xacts(buf,xlrec);
6060
}
61+
elseif (info==XLOG_INVALIDATIONS)
62+
{
63+
xl_invalidations*xlrec= (xl_invalidations*)rec;
64+
65+
standby_desc_invalidations(buf,xlrec->nmsgs,xlrec->msgs,
66+
xlrec->dbId,xlrec->tsId,
67+
xlrec->relcacheInitFileInval);
68+
}
6169
}
6270

6371
constchar*
@@ -73,7 +81,53 @@ standby_identify(uint8 info)
7381
caseXLOG_RUNNING_XACTS:
7482
id="RUNNING_XACTS";
7583
break;
84+
caseXLOG_INVALIDATIONS:
85+
id="INVALIDATIONS";
86+
break;
7687
}
7788

7889
returnid;
7990
}
91+
92+
/*
93+
* This routine is used by both standby_desc and xact_desc, because
94+
* transaction commits and XLOG_INVALIDATIONS messages contain invalidations;
95+
* it seems pointless to duplicate the code.
96+
*/
97+
void
98+
standby_desc_invalidations(StringInfobuf,
99+
intnmsgs,SharedInvalidationMessage*msgs,
100+
OiddbId,OidtsId,
101+
boolrelcacheInitFileInval)
102+
{
103+
inti;
104+
105+
if (relcacheInitFileInval)
106+
appendStringInfo(buf,"; relcache init file inval dbid %u tsid %u",
107+
dbId,tsId);
108+
109+
appendStringInfoString(buf,"; inval msgs:");
110+
for (i=0;i<nmsgs;i++)
111+
{
112+
SharedInvalidationMessage*msg=&msgs[i];
113+
114+
if (msg->id >=0)
115+
appendStringInfo(buf," catcache %d",msg->id);
116+
elseif (msg->id==SHAREDINVALCATALOG_ID)
117+
appendStringInfo(buf," catalog %u",msg->cat.catId);
118+
elseif (msg->id==SHAREDINVALRELCACHE_ID)
119+
appendStringInfo(buf," relcache %u",msg->rc.relId);
120+
/* not expected, but print something anyway */
121+
elseif (msg->id==SHAREDINVALSMGR_ID)
122+
appendStringInfoString(buf," smgr");
123+
/* not expected, but print something anyway */
124+
elseif (msg->id==SHAREDINVALRELMAP_ID)
125+
appendStringInfoString(buf," relmap");
126+
elseif (msg->id==SHAREDINVALRELMAP_ID)
127+
appendStringInfo(buf," relmap db %u",msg->rm.dbId);
128+
elseif (msg->id==SHAREDINVALSNAPSHOT_ID)
129+
appendStringInfo(buf," snapshot %u",msg->sn.relId);
130+
else
131+
appendStringInfo(buf," unknown id %d",msg->id);
132+
}
133+
}

‎src/backend/access/rmgrdesc/xactdesc.c

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include"access/xact.h"
1919
#include"catalog/catalog.h"
2020
#include"storage/sinval.h"
21+
#include"storage/standbydefs.h"
2122
#include"utils/timestamp.h"
2223

2324
/*
@@ -203,32 +204,9 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
203204
}
204205
if (parsed.nmsgs>0)
205206
{
206-
if (XactCompletionRelcacheInitFileInval(parsed.xinfo))
207-
appendStringInfo(buf,"; relcache init file inval dbid %u tsid %u",
208-
parsed.dbId,parsed.tsId);
209-
210-
appendStringInfoString(buf,"; inval msgs:");
211-
for (i=0;i<parsed.nmsgs;i++)
212-
{
213-
SharedInvalidationMessage*msg=&parsed.msgs[i];
214-
215-
if (msg->id >=0)
216-
appendStringInfo(buf," catcache %d",msg->id);
217-
elseif (msg->id==SHAREDINVALCATALOG_ID)
218-
appendStringInfo(buf," catalog %u",msg->cat.catId);
219-
elseif (msg->id==SHAREDINVALRELCACHE_ID)
220-
appendStringInfo(buf," relcache %u",msg->rc.relId);
221-
/* not expected, but print something anyway */
222-
elseif (msg->id==SHAREDINVALSMGR_ID)
223-
appendStringInfoString(buf," smgr");
224-
/* not expected, but print something anyway */
225-
elseif (msg->id==SHAREDINVALRELMAP_ID)
226-
appendStringInfoString(buf," relmap");
227-
elseif (msg->id==SHAREDINVALSNAPSHOT_ID)
228-
appendStringInfo(buf," snapshot %u",msg->sn.relId);
229-
else
230-
appendStringInfo(buf," unknown id %d",msg->id);
231-
}
207+
standby_desc_invalidations(
208+
buf,parsed.nmsgs,parsed.msgs,parsed.dbId,parsed.tsId,
209+
XactCompletionRelcacheInitFileInval(parsed.xinfo));
232210
}
233211

234212
if (XactCompletionForceSyncCommit(parsed.xinfo))

‎src/backend/access/transam/xact.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,6 +1163,24 @@ RecordTransactionCommit(void)
11631163
/* Can't have child XIDs either; AssignTransactionId enforces this */
11641164
Assert(nchildren==0);
11651165

1166+
/*
1167+
* Transactions without an assigned xid can contain invalidation
1168+
* messages (e.g. explicit relcache invalidations or catcache
1169+
* invalidations for inplace updates); standbys need to process
1170+
* those. We can't emit a commit record without an xid, and we don't
1171+
* want to force assigning an xid, because that'd be problematic for
1172+
* e.g. vacuum. Hence we emit a bespoke record for the
1173+
* invalidations. We don't want to use that in case a commit record is
1174+
* emitted, so they happen synchronously with commits (besides not
1175+
* wanting to emit more WAL recoreds).
1176+
*/
1177+
if (nmsgs!=0)
1178+
{
1179+
LogStandbyInvalidations(nmsgs,invalMessages,
1180+
RelcacheInitFileInval);
1181+
wrote_xlog= true;/* not strictly necessary */
1182+
}
1183+
11661184
/*
11671185
* If we didn't create XLOG entries, we're done here; otherwise we
11681186
* should trigger flushing those entries the same as a commit record

‎src/backend/replication/logical/decode.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,15 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
327327
break;
328328
caseXLOG_STANDBY_LOCK:
329329
break;
330+
caseXLOG_INVALIDATIONS:
331+
{
332+
xl_invalidations*invalidations=
333+
(xl_invalidations*)XLogRecGetData(r);
334+
335+
ReorderBufferImmediateInvalidation(
336+
ctx->reorder,invalidations->nmsgs,invalidations->msgs);
337+
}
338+
break;
330339
default:
331340
elog(ERROR,"unexpected RM_STANDBY_ID record type: %u",info);
332341
}

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1810,33 +1810,46 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
18101810
* catalog and we need to update the caches according to that.
18111811
*/
18121812
if (txn->base_snapshot!=NULL&&txn->ninvalidations>0)
1813-
{
1814-
booluse_subtxn=IsTransactionOrTransactionBlock();
1815-
1816-
if (use_subtxn)
1817-
BeginInternalSubTransaction("replay");
1818-
1819-
/*
1820-
* Force invalidations to happen outside of a valid transaction - that
1821-
* way entries will just be marked as invalid without accessing the
1822-
* catalog. That's advantageous because we don't need to setup the
1823-
* full state necessary for catalog access.
1824-
*/
1825-
if (use_subtxn)
1826-
AbortCurrentTransaction();
1827-
1828-
ReorderBufferExecuteInvalidations(rb,txn);
1829-
1830-
if (use_subtxn)
1831-
RollbackAndReleaseCurrentSubTransaction();
1832-
}
1813+
ReorderBufferImmediateInvalidation(rb,txn->ninvalidations,
1814+
txn->invalidations);
18331815
else
18341816
Assert(txn->ninvalidations==0);
18351817

18361818
/* remove potential on-disk data, and deallocate */
18371819
ReorderBufferCleanupTXN(rb,txn);
18381820
}
18391821

1822+
/*
1823+
* Execute invalidations happening outside the context of a decoded
1824+
* transaction. That currently happens either for xid-less commits
1825+
* (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
1826+
* transactions (via ReorderBufferForget()).
1827+
*/
1828+
void
1829+
ReorderBufferImmediateInvalidation(ReorderBuffer*rb,uint32ninvalidations,
1830+
SharedInvalidationMessage*invalidations)
1831+
{
1832+
booluse_subtxn=IsTransactionOrTransactionBlock();
1833+
inti;
1834+
1835+
if (use_subtxn)
1836+
BeginInternalSubTransaction("replay");
1837+
1838+
/*
1839+
* Force invalidations to happen outside of a valid transaction - that
1840+
* way entries will just be marked as invalid without accessing the
1841+
* catalog. That's advantageous because we don't need to setup the
1842+
* full state necessary for catalog access.
1843+
*/
1844+
if (use_subtxn)
1845+
AbortCurrentTransaction();
1846+
1847+
for (i=0;i<ninvalidations;i++)
1848+
LocalExecuteInvalidationMessage(&invalidations[i]);
1849+
1850+
if (use_subtxn)
1851+
RollbackAndReleaseCurrentSubTransaction();
1852+
}
18401853

18411854
/*
18421855
* Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at

‎src/backend/storage/ipc/standby.c

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -825,6 +825,16 @@ standby_redo(XLogReaderState *record)
825825

826826
ProcArrayApplyRecoveryInfo(&running);
827827
}
828+
elseif (info==XLOG_INVALIDATIONS)
829+
{
830+
xl_invalidations*xlrec= (xl_invalidations*)XLogRecGetData(record);
831+
832+
ProcessCommittedInvalidationMessages(xlrec->msgs,
833+
xlrec->nmsgs,
834+
xlrec->relcacheInitFileInval,
835+
xlrec->dbId,
836+
xlrec->tsId);
837+
}
828838
else
829839
elog(PANIC,"standby_redo: unknown op code %u",info);
830840
}
@@ -1068,3 +1078,28 @@ LogAccessExclusiveLockPrepare(void)
10681078
*/
10691079
(void)GetTopTransactionId();
10701080
}
1081+
1082+
/*
1083+
* Emit WAL for invalidations. This currently is only used for commits without
1084+
* an xid but which contain invalidations.
1085+
*/
1086+
void
1087+
LogStandbyInvalidations(intnmsgs,SharedInvalidationMessage*msgs,
1088+
boolrelcacheInitFileInval)
1089+
{
1090+
xl_invalidationsxlrec;
1091+
1092+
/* prepare record */
1093+
memset(&xlrec,0,sizeof(xlrec));
1094+
xlrec.dbId=MyDatabaseId;
1095+
xlrec.tsId=MyDatabaseTableSpace;
1096+
xlrec.relcacheInitFileInval=relcacheInitFileInval;
1097+
xlrec.nmsgs=nmsgs;
1098+
1099+
/* perform insertion */
1100+
XLogBeginInsert();
1101+
XLogRegisterData((char*) (&xlrec),MinSizeOfInvalidations);
1102+
XLogRegisterData((char*)msgs,
1103+
nmsgs*sizeof(SharedInvalidationMessage));
1104+
XLogInsert(RM_STANDBY_ID,XLOG_INVALIDATIONS);
1105+
}

‎src/backend/utils/cache/inval.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -842,8 +842,9 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
842842
}
843843

844844
/*
845-
* ProcessCommittedInvalidationMessages is executed by xact_redo_commit()
846-
* to process invalidation messages added to commit records.
845+
* ProcessCommittedInvalidationMessages is executed by xact_redo_commit() or
846+
* standby_redo() to process invalidation messages. Currently that happens
847+
* only at end-of-xact.
847848
*
848849
* Relcache init file invalidation requires processing both
849850
* before and after we send the SI messages. See AtEOXact_Inval()

‎src/include/replication/reorderbuffer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,8 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn
391391
CommandIdcmin,CommandIdcmax,CommandIdcombocid);
392392
voidReorderBufferAddInvalidations(ReorderBuffer*,TransactionId,XLogRecPtrlsn,
393393
Sizenmsgs,SharedInvalidationMessage*msgs);
394+
voidReorderBufferImmediateInvalidation(ReorderBuffer*,uint32ninvalidations,
395+
SharedInvalidationMessage*invalidations);
394396
voidReorderBufferProcessXid(ReorderBuffer*,TransactionIdxid,XLogRecPtrlsn);
395397
voidReorderBufferXidSetCatalogChanges(ReorderBuffer*,TransactionIdxid,XLogRecPtrlsn);
396398
boolReorderBufferXidHasCatalogChanges(ReorderBuffer*,TransactionIdxid);

‎src/include/storage/standby.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,5 +85,7 @@ extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
8585
externvoidLogAccessExclusiveLockPrepare(void);
8686

8787
externXLogRecPtrLogStandbySnapshot(void);
88+
externvoidLogStandbyInvalidations(intnmsgs,SharedInvalidationMessage*msgs,
89+
boolrelcacheInitFileInval);
8890

8991
#endif/* STANDBY_H */

‎src/include/storage/standbydefs.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,23 @@
1717
#include"access/xlogreader.h"
1818
#include"lib/stringinfo.h"
1919
#include"storage/lockdefs.h"
20+
#include"storage/sinval.h"
2021

2122
/* Recovery handlers for the Standby Rmgr (RM_STANDBY_ID) */
2223
externvoidstandby_redo(XLogReaderState*record);
2324
externvoidstandby_desc(StringInfobuf,XLogReaderState*record);
2425
externconstchar*standby_identify(uint8info);
26+
externvoidstandby_desc_invalidations(StringInfobuf,
27+
intnmsgs,SharedInvalidationMessage*msgs,
28+
OiddbId,OidtsId,
29+
boolrelcacheInitFileInval);
2530

2631
/*
2732
* XLOG message types
2833
*/
2934
#defineXLOG_STANDBY_LOCK0x00
3035
#defineXLOG_RUNNING_XACTS0x10
36+
#defineXLOG_INVALIDATIONS0x20
3137

3238
typedefstructxl_standby_locks
3339
{
@@ -50,4 +56,19 @@ typedef struct xl_running_xacts
5056
TransactionIdxids[FLEXIBLE_ARRAY_MEMBER];
5157
}xl_running_xacts;
5258

59+
/*
60+
* Invalidations for standby, currently only when transactions without an
61+
* assigned xid commit.
62+
*/
63+
typedefstructxl_invalidations
64+
{
65+
OiddbId;/* MyDatabaseId */
66+
OidtsId;/* MyDatabaseTableSpace */
67+
boolrelcacheInitFileInval;/* invalidate relcache init file */
68+
intnmsgs;/* number of shared inval msgs */
69+
SharedInvalidationMessagemsgs[FLEXIBLE_ARRAY_MEMBER];
70+
}xl_invalidations;
71+
72+
#defineMinSizeOfInvalidations offsetof(xl_invalidations, msgs)
73+
5374
#endif/* STANDBYDEFS_H */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp