Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7fea40d

Browse files
committed
Rewrite storing origin in prepared record
1 parent7c8d160 commit7fea40d

File tree

3 files changed

+24
-59
lines changed

3 files changed

+24
-59
lines changed

‎contrib/mmts/multimaster.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1165,7 +1165,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11651165
:ts->status==TRANSACTION_STATUS_COMMITTED ?"committed" :"not prepared",
11661166
ts->xid,ts->gid);
11671167
}
1168-
//Assert(ts->xid != 10000);
1168+
Assert(ts->xid!=10000);
11691169
if (x->csn>ts->csn||Mtm->status==MTM_RECOVERY) {
11701170
Assert(x->csn!=INVALID_CSN);
11711171
ts->csn=x->csn;

‎src/backend/access/transam/twophase.c‎

Lines changed: 15 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,7 @@ typedef struct TwoPhaseFileHeader
871871
int32ninvalmsgs;/* number of cache invalidation messages */
872872
boolinitfileinval;/* does relcache init file need invalidation? */
873873
uint16gidlen;/* length of the GID - GID follows the header */
874+
xl_xact_originxl_origin;/* replication origin information */
874875
}TwoPhaseFileHeader;
875876

876877
/*
@@ -1022,12 +1023,10 @@ EndPrepare(GlobalTransaction gxact)
10221023
{
10231024
TwoPhaseFileHeader*hdr;
10241025
StateFileChunk*record;
1025-
xl_xact_originxl_origin;
1026-
xl_xact_xinfoxl_xinfo;
10271026
uint8info=XLOG_XACT_PREPARE;
1028-
boolreplorigin;
1027+
boolreplorigin;
10291028

1030-
replorigin= (replorigin_session_origin!=InvalidRepOriginId&&
1029+
replorigin= (replorigin_session_origin!=InvalidRepOriginId&&
10311030
replorigin_session_origin!=DoNotReplicateId);
10321031

10331032
/* Add the end sentinel to the list of 2PC records */
@@ -1039,6 +1038,15 @@ EndPrepare(GlobalTransaction gxact)
10391038
Assert(hdr->magic==TWOPHASE_MAGIC);
10401039
hdr->total_len=records.total_len+sizeof(pg_crc32c);
10411040

1041+
if (replorigin) {
1042+
Assert(replorigin_session_origin_lsn!=InvalidXLogRecPtr);
1043+
hdr->xl_origin.origin_lsn=replorigin_session_origin_lsn;
1044+
hdr->xl_origin.origin_timestamp=replorigin_session_origin_timestamp;
1045+
}else {
1046+
hdr->xl_origin.origin_lsn=InvalidXLogRecPtr;
1047+
hdr->xl_origin.origin_timestamp=0;
1048+
}
1049+
10421050
/*
10431051
* If the data size exceeds MaxAllocSize, we won't be able to read it in
10441052
* ReadTwoPhaseFile. Check for that now, rather than fail in the case
@@ -1065,31 +1073,12 @@ EndPrepare(GlobalTransaction gxact)
10651073
XLogEnsureRecordSpace(0,records.num_chunks);
10661074

10671075
START_CRIT_SECTION();
1068-
xl_xinfo.xinfo=0;
1069-
/* dump transaction origin information */
1070-
if (replorigin)
1071-
{
1072-
xl_xinfo.xinfo |=XACT_XINFO_HAS_ORIGIN;
1073-
Assert(replorigin_session_origin_lsn!=InvalidXLogRecPtr);
1074-
xl_origin.origin_lsn=replorigin_session_origin_lsn;
1075-
xl_origin.origin_timestamp=replorigin_session_origin_timestamp;
1076-
}
1077-
1078-
if (xl_xinfo.xinfo!=0)
1079-
info |=XLOG_XACT_HAS_INFO;
1080-
10811076
MyPgXact->delayChkpt= true;
10821077

10831078
XLogBeginInsert();
10841079
for (record=records.head;record!=NULL;record=record->next)
10851080
XLogRegisterData(record->data,record->len);
10861081

1087-
if (xl_xinfo.xinfo!=0)
1088-
XLogRegisterData((char*) (&xl_xinfo.xinfo),sizeof(xl_xinfo.xinfo));
1089-
1090-
if (xl_xinfo.xinfo&XACT_XINFO_HAS_ORIGIN)
1091-
XLogRegisterData((char*) (&xl_origin),sizeof(xl_xact_origin));
1092-
10931082
XLogIncludeOrigin();
10941083

10951084
gxact->prepare_end_lsn=XLogInsert(RM_XACT_ID,info);
@@ -1285,6 +1274,9 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
12851274
hdr= (TwoPhaseFileHeader*)xlrec;
12861275
bufptr=xlrec+MAXALIGN(sizeof(TwoPhaseFileHeader));
12871276

1277+
parsed->origin_lsn=hdr->xl_origin.origin_lsn;
1278+
parsed->origin_timestamp=hdr->xl_origin.origin_timestamp;
1279+
12881280
strncpy(parsed->twophase_gid,bufptr,hdr->gidlen);
12891281
bufptr+=MAXALIGN(hdr->gidlen);
12901282

@@ -1305,33 +1297,6 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
13051297

13061298
parsed->msgs= (SharedInvalidationMessage*)bufptr;
13071299
bufptr+=MAXALIGN(hdr->ninvalmsgs*sizeof(SharedInvalidationMessage));
1308-
1309-
/*
1310-
* Parse xinfo now.
1311-
*/
1312-
1313-
parsed->xinfo=0;
1314-
bufptr=xlrec+hdr->total_len-sizeof(pg_crc32c);
1315-
1316-
if (info&XLOG_XACT_HAS_INFO)
1317-
{
1318-
xl_xact_xinfo*xl_xinfo= (xl_xact_xinfo*)bufptr;
1319-
parsed->xinfo=xl_xinfo->xinfo;
1320-
bufptr+=sizeof(xl_xact_xinfo);
1321-
}
1322-
1323-
1324-
if (parsed->xinfo&XACT_XINFO_HAS_ORIGIN)
1325-
{
1326-
xl_xact_originxl_origin;
1327-
/* we're only guaranteed 4 byte alignment, so copy onto stack */
1328-
memcpy(&xl_origin,bufptr,sizeof(xl_origin));
1329-
parsed->origin_lsn=xl_origin.origin_lsn;
1330-
Assert(parsed->origin_lsn!=0);
1331-
parsed->origin_timestamp=xl_origin.origin_timestamp;
1332-
bufptr+=sizeof(xl_xact_origin);
1333-
}
1334-
13351300
}
13361301

13371302

‎src/backend/access/transam/xact.c‎

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5657,20 +5657,20 @@ xact_redo(XLogReaderState *record)
56575657
}
56585658
elseif (info==XLOG_XACT_PREPARE)
56595659
{
5660-
xl_xact_parsed_prepareparsed;
5661-
5662-
ParsePrepareRecord(XLogRecGetXid(record),XLogRecGetData(record),&parsed);
5660+
RepOriginIdoriginId=XLogRecGetOrigin(record);
56635661

56645662
/* the record contents are exactly the 2PC file */
56655663
RecreateTwoPhaseFile(XLogRecGetXid(record),
5666-
XLogRecGetData(record),XLogRecGetDataLen(record));
5664+
XLogRecGetData(record),XLogRecGetDataLen(record));
56675665

5668-
if (parsed.xinfo&XACT_XINFO_HAS_ORIGIN)
5666+
if (originId!=InvalidRepOriginId&&originId!=DoNotReplicateId)
56695667
{
5670-
Assert(XLogRecGetOrigin(record)!=InvalidRepOriginId);
5668+
xl_xact_parsed_prepareparsed;
5669+
ParsePrepareRecord(XLogRecGetXid(record),XLogRecGetData(record),&parsed);
5670+
Assert(parsed.origin_lsn!=InvalidXLogRecPtr);
56715671
/* recover apply progress */
5672-
replorigin_advance(XLogRecGetOrigin(record),parsed.origin_lsn,
5673-
record->EndRecPtr, false/* backward */ , false/* WAL */ );
5672+
replorigin_advance(originId,parsed.origin_lsn,
5673+
record->EndRecPtr, false/* backward */ , false/* WAL */ );
56745674
}
56755675
}
56765676
elseif (info==XLOG_XACT_ASSIGNMENT)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp