Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc55040c

Browse files
author
Amit Kapila
committed
WAL Log invalidations at command end with wal_level=logical.
When wal_level=logical, write invalidations at command end into WAL sothat decoding can use this information.This patch is required to allow the streaming of in-progress transactionsin logical decoding.  The actual work to allow streaming will be committedas a separate patch.We still add the invalidations to the cache and write them to WAL atcommit time in RecordTransactionCommit(). This uses the existingXLOG_INVALIDATIONS xlog record type, from the RM_STANDBY_ID resourcemanager (see LogStandbyInvalidations for details).So existing code relying on those invalidations (e.g. redo) does not needto be changed.The invalidations written at command end uses a new xlog record typeXLOG_XACT_INVALIDATIONS, from RM_XACT_ID resource manager. SeeLogLogicalInvalidations for details.These new xlog records are ignored by existing redo procedures, whichstill rely on the invalidations written to commit records.The invalidations are decoded and accumulated in top-transaction, and thenexecuted during replay.  This obviates the need to decode theinvalidations as part of a commit record.Bump XLOG_PAGE_MAGIC, since this introduces XLOG_XACT_INVALIDATIONS.Author: Dilip Kumar, Tomas Vondra, Amit KapilaReviewed-by: Amit KapilaTested-by: Neha Sharma and Mahendra Singh ThalorDiscussion:https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
1 parent38f60f1 commitc55040c

File tree

9 files changed

+166
-34
lines changed

9 files changed

+166
-34
lines changed

‎src/backend/access/rmgrdesc/xactdesc.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,13 @@ xact_desc(StringInfo buf, XLogReaderState *record)
396396
appendStringInfo(buf,"xtop %u: ",xlrec->xtop);
397397
xact_desc_assignment(buf,xlrec);
398398
}
399+
elseif (info==XLOG_XACT_INVALIDATIONS)
400+
{
401+
xl_xact_invals*xlrec= (xl_xact_invals*)rec;
402+
403+
standby_desc_invalidations(buf,xlrec->nmsgs,xlrec->msgs,InvalidOid,
404+
InvalidOid, false);
405+
}
399406
}
400407

401408
constchar*
@@ -423,6 +430,9 @@ xact_identify(uint8 info)
423430
caseXLOG_XACT_ASSIGNMENT:
424431
id="ASSIGNMENT";
425432
break;
433+
caseXLOG_XACT_INVALIDATIONS:
434+
id="INVALIDATION";
435+
break;
426436
}
427437

428438
returnid;

‎src/backend/access/transam/xact.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,6 +1224,16 @@ RecordTransactionCommit(void)
12241224
boolRelcacheInitFileInval= false;
12251225
boolwrote_xlog;
12261226

1227+
/*
1228+
* Log pending invalidations for logical decoding of in-progress
1229+
* transactions. Normally for DDLs, we log this at each command end,
1230+
* however, for certain cases where we directly update the system table
1231+
* without a transaction block, the invalidations are not logged till this
1232+
* time.
1233+
*/
1234+
if (XLogLogicalInfoActive())
1235+
LogLogicalInvalidations();
1236+
12271237
/* Get data needed for commit record */
12281238
nrels=smgrGetPendingDeletes(true,&rels);
12291239
nchildren=xactGetCommittedChildren(&children);
@@ -6022,6 +6032,13 @@ xact_redo(XLogReaderState *record)
60226032
ProcArrayApplyXidAssignment(xlrec->xtop,
60236033
xlrec->nsubxacts,xlrec->xsub);
60246034
}
6035+
elseif (info==XLOG_XACT_INVALIDATIONS)
6036+
{
6037+
/*
6038+
* XXX we do ignore this for now, what matters are invalidations
6039+
* written into the commit record.
6040+
*/
6041+
}
60256042
else
60266043
elog(PANIC,"xact_redo: unknown op code %u",info);
60276044
}

‎src/backend/replication/logical/decode.c

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -278,10 +278,39 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
278278

279279
/*
280280
* We assign subxact to the toplevel xact while processing each
281-
* record if required. So, we don't need to do anything here.
282-
*SeeLogicalDecodingProcessRecord.
281+
* record if required. So, we don't need to do anything here. See
282+
* LogicalDecodingProcessRecord.
283283
*/
284284
break;
285+
caseXLOG_XACT_INVALIDATIONS:
286+
{
287+
TransactionIdxid;
288+
xl_xact_invals*invals;
289+
290+
xid=XLogRecGetXid(r);
291+
invals= (xl_xact_invals*)XLogRecGetData(r);
292+
293+
/*
294+
* Execute the invalidations for xid-less transactions,
295+
* otherwise, accumulate them so that they can be processed at
296+
* the commit time.
297+
*/
298+
if (TransactionIdIsValid(xid))
299+
{
300+
if (!ctx->fast_forward)
301+
ReorderBufferAddInvalidations(reorder,xid,
302+
buf->origptr,
303+
invals->nmsgs,
304+
invals->msgs);
305+
ReorderBufferXidSetCatalogChanges(ctx->reorder,xid,
306+
buf->origptr);
307+
}
308+
elseif ((!ctx->fast_forward))
309+
ReorderBufferImmediateInvalidation(ctx->reorder,
310+
invals->nmsgs,
311+
invals->msgs);
312+
}
313+
break;
285314
caseXLOG_XACT_PREPARE:
286315

287316
/*
@@ -334,15 +363,11 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
334363
caseXLOG_STANDBY_LOCK:
335364
break;
336365
caseXLOG_INVALIDATIONS:
337-
{
338-
xl_invalidations*invalidations=
339-
(xl_invalidations*)XLogRecGetData(r);
340366

341-
if (!ctx->fast_forward)
342-
ReorderBufferImmediateInvalidation(ctx->reorder,
343-
invalidations->nmsgs,
344-
invalidations->msgs);
345-
}
367+
/*
368+
* We are processing the invalidations at the command level via
369+
* XLOG_XACT_INVALIDATIONS. So we don't need to do anything here.
370+
*/
346371
break;
347372
default:
348373
elog(ERROR,"unexpected RM_STANDBY_ID record type: %u",info);
@@ -573,19 +598,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
573598
commit_time=parsed->origin_timestamp;
574599
}
575600

576-
/*
577-
* Process invalidation messages, even if we're not interested in the
578-
* transaction's contents, since the various caches need to always be
579-
* consistent.
580-
*/
581-
if (parsed->nmsgs>0)
582-
{
583-
if (!ctx->fast_forward)
584-
ReorderBufferAddInvalidations(ctx->reorder,xid,buf->origptr,
585-
parsed->nmsgs,parsed->msgs);
586-
ReorderBufferXidSetCatalogChanges(ctx->reorder,xid,buf->origptr);
587-
}
588-
589601
SnapBuildCommitTxn(ctx->snapshot_builder,buf->origptr,xid,
590602
parsed->nsubxacts,parsed->subxacts);
591603

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
856856
subtxn->toplevel_xid=xid;
857857
Assert(subtxn->nsubtxns==0);
858858

859+
/* set the reference to top-level transaction */
860+
subtxn->toptxn=txn;
861+
859862
/* add to subtransaction list */
860863
dlist_push_tail(&txn->subtxns,&subtxn->node);
861864
txn->nsubtxns++;
@@ -2201,7 +2204,11 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
22012204
/*
22022205
* Setup the invalidation of the toplevel transaction.
22032206
*
2204-
* This needs to be done before ReorderBufferCommit is called!
2207+
* This needs to be called for each XLOG_XACT_INVALIDATIONS message and
2208+
* accumulates all the invalidation messages in the toplevel transaction.
2209+
* This is required because in some cases where we skip processing the
2210+
* transaction (see ReorderBufferForget), we need to execute all the
2211+
* invalidations together.
22052212
*/
22062213
void
22072214
ReorderBufferAddInvalidations(ReorderBuffer*rb,TransactionIdxid,
@@ -2212,17 +2219,35 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
22122219

22132220
txn=ReorderBufferTXNByXid(rb,xid, true,NULL,lsn, true);
22142221

2215-
if (txn->ninvalidations!=0)
2216-
elog(ERROR,"only ever add one set of invalidations");
2222+
/*
2223+
* We collect all the invalidations under the top transaction so that we
2224+
* can execute them all together.
2225+
*/
2226+
if (txn->toptxn)
2227+
txn=txn->toptxn;
22172228

22182229
Assert(nmsgs>0);
22192230

2220-
txn->ninvalidations=nmsgs;
2221-
txn->invalidations= (SharedInvalidationMessage*)
2222-
MemoryContextAlloc(rb->context,
2223-
sizeof(SharedInvalidationMessage)*nmsgs);
2224-
memcpy(txn->invalidations,msgs,
2225-
sizeof(SharedInvalidationMessage)*nmsgs);
2231+
/* Accumulate invalidations. */
2232+
if (txn->ninvalidations==0)
2233+
{
2234+
txn->ninvalidations=nmsgs;
2235+
txn->invalidations= (SharedInvalidationMessage*)
2236+
MemoryContextAlloc(rb->context,
2237+
sizeof(SharedInvalidationMessage)*nmsgs);
2238+
memcpy(txn->invalidations,msgs,
2239+
sizeof(SharedInvalidationMessage)*nmsgs);
2240+
}
2241+
else
2242+
{
2243+
txn->invalidations= (SharedInvalidationMessage*)
2244+
repalloc(txn->invalidations,sizeof(SharedInvalidationMessage)*
2245+
(txn->ninvalidations+nmsgs));
2246+
2247+
memcpy(txn->invalidations+txn->ninvalidations,msgs,
2248+
nmsgs*sizeof(SharedInvalidationMessage));
2249+
txn->ninvalidations+=nmsgs;
2250+
}
22262251
}
22272252

22282253
/*
@@ -2250,6 +2275,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
22502275
txn=ReorderBufferTXNByXid(rb,xid, true,NULL,lsn, true);
22512276

22522277
txn->txn_flags |=RBTXN_HAS_CATALOG_CHANGES;
2278+
2279+
/*
2280+
* Mark top-level transaction as having catalog changes too if one of its
2281+
* children has so that the ReorderBufferBuildTupleCidHash can
2282+
* conveniently check just top-level transaction and decide whether to
2283+
* build the hash table or not.
2284+
*/
2285+
if (txn->toptxn!=NULL)
2286+
txn->toptxn->txn_flags |=RBTXN_HAS_CATALOG_CHANGES;
22532287
}
22542288

22552289
/*

‎src/backend/utils/cache/inval.c

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@
8585
*worth trying to avoid sending such inval traffic in the future, if those
8686
*problems can be overcome cheaply.
8787
*
88+
*When wal_level=logical, write invalidations into WAL at each command end to
89+
*support the decoding of the in-progress transactions. See
90+
*CommandEndInvalidationMessages.
8891
*
8992
* Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
9093
* Portions Copyright (c) 1994, Regents of the University of California
@@ -1094,6 +1097,11 @@ CommandEndInvalidationMessages(void)
10941097

10951098
ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs,
10961099
LocalExecuteInvalidationMessage);
1100+
1101+
/* WAL Log per-command invalidation messages for wal_level=logical */
1102+
if (XLogLogicalInfoActive())
1103+
LogLogicalInvalidations();
1104+
10971105
AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
10981106
&transInvalInfo->CurrentCmdInvalidMsgs);
10991107
}
@@ -1501,3 +1509,49 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
15011509
i=ccitem->link-1;
15021510
}
15031511
}
1512+
1513+
/*
1514+
* LogLogicalInvalidations
1515+
*
1516+
* Emit WAL for invalidations. This is currently only used for logging
1517+
* invalidations at the command end or at commit time if any invalidations
1518+
* are pending.
1519+
*/
1520+
void
1521+
LogLogicalInvalidations()
1522+
{
1523+
xl_xact_invalsxlrec;
1524+
SharedInvalidationMessage*invalMessages;
1525+
intnmsgs=0;
1526+
1527+
/* Quick exit if we haven't done anything with invalidation messages. */
1528+
if (transInvalInfo==NULL)
1529+
return;
1530+
1531+
ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
1532+
MakeSharedInvalidMessagesArray);
1533+
1534+
Assert(!(numSharedInvalidMessagesArray>0&&
1535+
SharedInvalidMessagesArray==NULL));
1536+
1537+
invalMessages=SharedInvalidMessagesArray;
1538+
nmsgs=numSharedInvalidMessagesArray;
1539+
SharedInvalidMessagesArray=NULL;
1540+
numSharedInvalidMessagesArray=0;
1541+
1542+
if (nmsgs>0)
1543+
{
1544+
/* prepare record */
1545+
memset(&xlrec,0,MinSizeOfXactInvals);
1546+
xlrec.nmsgs=nmsgs;
1547+
1548+
/* perform insertion */
1549+
XLogBeginInsert();
1550+
XLogRegisterData((char*) (&xlrec),MinSizeOfXactInvals);
1551+
XLogRegisterData((char*)invalMessages,
1552+
nmsgs*sizeof(SharedInvalidationMessage));
1553+
XLogInsert(RM_XACT_ID,XLOG_XACT_INVALIDATIONS);
1554+
1555+
pfree(invalMessages);
1556+
}
1557+
}

‎src/include/access/xact.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
146146
#defineXLOG_XACT_COMMIT_PREPARED0x30
147147
#defineXLOG_XACT_ABORT_PREPARED0x40
148148
#defineXLOG_XACT_ASSIGNMENT0x50
149-
/* free opcode0x60 */
149+
#defineXLOG_XACT_INVALIDATIONS0x60
150150
/* free opcode 0x70 */
151151

152152
/* mask for filtering opcodes out of xl_info */

‎src/include/access/xlog_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
/*
3232
* Each page of XLOG file has a header like this:
3333
*/
34-
#defineXLOG_PAGE_MAGIC0xD107/* can be used as WAL version indicator */
34+
#defineXLOG_PAGE_MAGIC0xD108/* can be used as WAL version indicator */
3535

3636
typedefstructXLogPageHeaderData
3737
{

‎src/include/replication/reorderbuffer.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,9 @@ typedef struct ReorderBufferTXN
220220
*/
221221
XLogRecPtrend_lsn;
222222

223+
/* Toplevel transaction for this subxact (NULL for top-level). */
224+
structReorderBufferTXN*toptxn;
225+
223226
/*
224227
* LSN of the last lsn at which snapshot information reside, so we can
225228
* restart decoding from there and fully recover this transaction from

‎src/include/utils/inval.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,6 @@ extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
6161
externvoidCallSyscacheCallbacks(intcacheid,uint32hashvalue);
6262

6363
externvoidInvalidateSystemCaches(void);
64+
65+
externvoidLogLogicalInvalidations(void);
6466
#endif/* INVAL_H */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp