Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0bead9a

Browse files
author
Amit Kapila
committed
Immediately WAL-log subtransaction and top-level XID association.
The logical decoding infrastructure needs to know which top-leveltransaction the subxact belongs to, in order to decode all thechanges. Until now that might be delayed until commit, due to thecaching (GPROC_MAX_CACHED_SUBXIDS), preventing features requiringincremental decoding.So we also write the assignment info into WAL immediately, as partof the next WAL record (to minimize overhead) only when wal_level=logical.We can not remove the existing XLOG_XACT_ASSIGNMENT WAL as that isrequired for avoiding overflow in the hot standby snapshot.Bump XLOG_PAGE_MAGIC, since this introduces XLR_BLOCK_ID_TOPLEVEL_XID.Author: Tomas Vondra, Dilip Kumar, Amit KapilaReviewed-by: Amit KapilaTested-by: Neha Sharma and Mahendra Singh ThalorDiscussion:https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
1 parentd05b172 commit0bead9a

File tree

9 files changed

+108
-24
lines changed

9 files changed

+108
-24
lines changed

‎src/backend/access/transam/xact.c

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ typedef struct TransactionStateData
191191
booldidLogXid;/* has xid been included in WAL record? */
192192
intparallelModeLevel;/* Enter/ExitParallelMode counter */
193193
boolchain;/* start a new block after this one */
194+
boolassigned;/* assigned to top-level XID */
194195
structTransactionStateData*parent;/* back link to parent */
195196
}TransactionStateData;
196197

@@ -223,6 +224,7 @@ typedef struct SerializedTransactionState
223224
staticTransactionStateDataTopTransactionStateData= {
224225
.state=TRANS_DEFAULT,
225226
.blockState=TBLOCK_DEFAULT,
227+
.assigned= false,
226228
};
227229

228230
/*
@@ -5120,6 +5122,7 @@ PushTransaction(void)
51205122
GetUserIdAndSecContext(&s->prevUser,&s->prevSecContext);
51215123
s->prevXactReadOnly=XactReadOnly;
51225124
s->parallelModeLevel=0;
5125+
s->assigned= false;
51235126

51245127
CurrentTransactionState=s;
51255128

@@ -6022,3 +6025,50 @@ xact_redo(XLogReaderState *record)
60226025
else
60236026
elog(PANIC,"xact_redo: unknown op code %u",info);
60246027
}
6028+
6029+
/*
6030+
* IsSubTransactionAssignmentPending
6031+
*
6032+
* This is used to decide whether we need to WAL log the top-level XID for
6033+
* operation in a subtransaction. We require that for logical decoding, see
6034+
* LogicalDecodingProcessRecord.
6035+
*
6036+
* This returns true if wal_level >= logical and we are inside a valid
6037+
* subtransaction, for which the assignment was not yet written to any WAL
6038+
* record.
6039+
*/
6040+
bool
6041+
IsSubTransactionAssignmentPending(void)
6042+
{
6043+
/* wal_level has to be logical */
6044+
if (!XLogLogicalInfoActive())
6045+
return false;
6046+
6047+
/* we need to be in a transaction state */
6048+
if (!IsTransactionState())
6049+
return false;
6050+
6051+
/* it has to be a subtransaction */
6052+
if (!IsSubTransaction())
6053+
return false;
6054+
6055+
/* the subtransaction has to have a XID assigned */
6056+
if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
6057+
return false;
6058+
6059+
/* and it should not be already 'assigned' */
6060+
return !CurrentTransactionState->assigned;
6061+
}
6062+
6063+
/*
6064+
* MarkSubTransactionAssigned
6065+
*
6066+
* Mark the subtransaction assignment as completed.
6067+
*/
6068+
void
6069+
MarkSubTransactionAssigned(void)
6070+
{
6071+
Assert(IsSubTransactionAssignmentPending());
6072+
6073+
CurrentTransactionState->assigned= true;
6074+
}

‎src/backend/access/transam/xloginsert.c

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,13 @@ static XLogRecData hdr_rdt;
8989
staticchar*hdr_scratch=NULL;
9090

9191
#defineSizeOfXlogOrigin(sizeof(RepOriginId) + sizeof(char))
92+
#defineSizeOfXLogTransactionId(sizeof(TransactionId) + sizeof(char))
9293

9394
#defineHEADER_SCRATCH_SIZE \
9495
(SizeOfXLogRecord + \
9596
MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
96-
SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
97+
SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
98+
SizeOfXLogTransactionId)
9799

98100
/*
99101
* An array of XLogRecData structs, to hold registered data.
@@ -195,6 +197,10 @@ XLogResetInsertion(void)
195197
{
196198
inti;
197199

200+
/* reset the subxact assignment flag (if needed) */
201+
if (curinsert_flags&XLOG_INCLUDE_XID)
202+
MarkSubTransactionAssigned();
203+
198204
for (i=0;i<max_registered_block_id;i++)
199205
registered_buffers[i].in_use= false;
200206

@@ -398,7 +404,7 @@ void
398404
XLogSetRecordFlags(uint8flags)
399405
{
400406
Assert(begininsert_called);
401-
curinsert_flags=flags;
407+
curinsert_flags|=flags;
402408
}
403409

404410
/*
@@ -748,6 +754,19 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
748754
scratch+=sizeof(replorigin_session_origin);
749755
}
750756

757+
/* followed by toplevel XID, if not already included in previous record */
758+
if (IsSubTransactionAssignmentPending())
759+
{
760+
TransactionIdxid=GetTopTransactionIdIfAny();
761+
762+
/* update the flag (later used by XLogResetInsertion) */
763+
XLogSetRecordFlags(XLOG_INCLUDE_XID);
764+
765+
*(scratch++)= (char)XLR_BLOCK_ID_TOPLEVEL_XID;
766+
memcpy(scratch,&xid,sizeof(TransactionId));
767+
scratch+=sizeof(TransactionId);
768+
}
769+
751770
/* followed by main data, if any */
752771
if (mainrdata_len>0)
753772
{

‎src/backend/access/transam/xlogreader.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,6 +1197,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
11971197

11981198
state->decoded_record=record;
11991199
state->record_origin=InvalidRepOriginId;
1200+
state->toplevel_xid=InvalidTransactionId;
12001201

12011202
ptr= (char*)record;
12021203
ptr+=SizeOfXLogRecord;
@@ -1235,6 +1236,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
12351236
{
12361237
COPY_HEADER_FIELD(&state->record_origin,sizeof(RepOriginId));
12371238
}
1239+
elseif (block_id==XLR_BLOCK_ID_TOPLEVEL_XID)
1240+
{
1241+
COPY_HEADER_FIELD(&state->toplevel_xid,sizeof(TransactionId));
1242+
}
12381243
elseif (block_id <=XLR_MAX_BLOCK_ID)
12391244
{
12401245
/* XLogRecordBlockHeader */

‎src/backend/replication/logical/decode.c

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,27 @@ void
9494
LogicalDecodingProcessRecord(LogicalDecodingContext*ctx,XLogReaderState*record)
9595
{
9696
XLogRecordBufferbuf;
97+
TransactionIdtxid;
9798

9899
buf.origptr=ctx->reader->ReadRecPtr;
99100
buf.endptr=ctx->reader->EndRecPtr;
100101
buf.record=record;
101102

103+
txid=XLogRecGetTopXid(record);
104+
105+
/*
106+
* If the top-level xid is valid, we need to assign the subxact to the
107+
* top-level xact. We need to do this for all records, hence we do it
108+
* before the switch.
109+
*/
110+
if (TransactionIdIsValid(txid))
111+
{
112+
ReorderBufferAssignChild(ctx->reorder,
113+
txid,
114+
record->decoded_record->xl_xid,
115+
buf.origptr);
116+
}
117+
102118
/* cast so we get a warning when new rmgrs are added */
103119
switch ((RmgrId)XLogRecGetRmid(record))
104120
{
@@ -216,13 +232,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
216232
/*
217233
* If the snapshot isn't yet fully built, we cannot decode anything, so
218234
* bail out.
219-
*
220-
* However, it's critical to process XLOG_XACT_ASSIGNMENT records even
221-
* when the snapshot is being built: it is possible to get later records
222-
* that require subxids to be properly assigned.
223235
*/
224-
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT&&
225-
info!=XLOG_XACT_ASSIGNMENT)
236+
if (SnapBuildCurrentState(builder)<SNAPBUILD_FULL_SNAPSHOT)
226237
return;
227238

228239
switch (info)
@@ -264,22 +275,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
264275
break;
265276
}
266277
caseXLOG_XACT_ASSIGNMENT:
267-
{
268-
xl_xact_assignment*xlrec;
269-
inti;
270-
TransactionId*sub_xid;
271278

272-
xlrec= (xl_xact_assignment*)XLogRecGetData(r);
273-
274-
sub_xid=&xlrec->xsub[0];
275-
276-
for (i=0;i<xlrec->nsubxacts;i++)
277-
{
278-
ReorderBufferAssignChild(reorder,xlrec->xtop,
279-
*(sub_xid++),buf->origptr);
280-
}
281-
break;
282-
}
279+
/*
280+
* We assign subxact to the toplevel xact while processing each
281+
* record if required. So, we don't need to do anything here.
282+
* See LogicalDecodingProcessRecord.
283+
*/
284+
break;
283285
caseXLOG_XACT_PREPARE:
284286

285287
/*

‎src/include/access/xact.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg);
428428
externvoidRegisterSubXactCallback(SubXactCallbackcallback,void*arg);
429429
externvoidUnregisterSubXactCallback(SubXactCallbackcallback,void*arg);
430430

431+
externboolIsSubTransactionAssignmentPending(void);
432+
externvoidMarkSubTransactionAssigned(void);
433+
431434
externintxactGetCommittedChildren(TransactionId**ptr);
432435

433436
externXLogRecPtrXactLogCommitRecord(TimestampTzcommit_time,

‎src/include/access/xlog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ extern bool XLOG_DEBUG;
237237
*/
238238
#defineXLOG_INCLUDE_ORIGIN0x01/* include the replication origin */
239239
#defineXLOG_MARK_UNIMPORTANT0x02/* record not important for durability */
240+
#defineXLOG_INCLUDE_XID0x04/* include XID of top-level xact */
240241

241242

242243
/* Checkpoint statistics */

‎src/include/access/xlog_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
/*
3232
* Each page of XLOG file has a header like this:
3333
*/
34-
#defineXLOG_PAGE_MAGIC0xD106/* can be used as WAL version indicator */
34+
#defineXLOG_PAGE_MAGIC0xD107/* can be used as WAL version indicator */
3535

3636
typedefstructXLogPageHeaderData
3737
{

‎src/include/access/xlogreader.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ struct XLogReaderState
191191

192192
RepOriginIdrecord_origin;
193193

194+
TransactionIdtoplevel_xid;/* XID of top-level transaction */
195+
194196
/* information about blocks referenced by the record. */
195197
DecodedBkpBlockblocks[XLR_MAX_BLOCK_ID+1];
196198

@@ -304,6 +306,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
304306
#defineXLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
305307
#defineXLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
306308
#defineXLogRecGetOrigin(decoder) ((decoder)->record_origin)
309+
#defineXLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
307310
#defineXLogRecGetData(decoder) ((decoder)->main_data)
308311
#defineXLogRecGetDataLen(decoder) ((decoder)->main_data_len)
309312
#defineXLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)

‎src/include/access/xlogrecord.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong
223223
#defineXLR_BLOCK_ID_DATA_SHORT255
224224
#defineXLR_BLOCK_ID_DATA_LONG254
225225
#defineXLR_BLOCK_ID_ORIGIN253
226+
#defineXLR_BLOCK_ID_TOPLEVEL_XID252
226227

227228
#endif/* XLOGRECORD_H */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp