Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1eb6d65

Browse files
Store 2PC GID in commit/abort WAL recs for logical decoding
Store GID of 2PC in commit/abort WAL records when wal_level = logical.This allows logical decoding to send the SAME gid to subscribersacross restarts of logical replication.Track relica origin replay progress for 2PC.(Edited from patch 0003 in the logical decoding 2PC series.)Authors: Nikhil Sontakke, Stas KelvichReviewed-by: Simon Riggs, Andres Freund
1 parent75e95dd commit1eb6d65

File tree

5 files changed

+230
-24
lines changed

5 files changed

+230
-24
lines changed

‎src/backend/access/rmgrdesc/xactdesc.c

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,14 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
102102
parsed->twophase_xid=xl_twophase->xid;
103103

104104
data+=sizeof(xl_xact_twophase);
105+
106+
if (parsed->xinfo&XACT_XINFO_HAS_GID)
107+
{
108+
intgidlen;
109+
strcpy(parsed->twophase_gid,data);
110+
gidlen=strlen(parsed->twophase_gid)+1;
111+
data+=MAXALIGN(gidlen);
112+
}
105113
}
106114

107115
if (parsed->xinfo&XACT_XINFO_HAS_ORIGIN)
@@ -139,6 +147,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
139147
data+=sizeof(xl_xact_xinfo);
140148
}
141149

150+
if (parsed->xinfo&XACT_XINFO_HAS_DBINFO)
151+
{
152+
xl_xact_dbinfo*xl_dbinfo= (xl_xact_dbinfo*)data;
153+
154+
parsed->dbId=xl_dbinfo->dbId;
155+
parsed->tsId=xl_dbinfo->tsId;
156+
157+
data+=sizeof(xl_xact_dbinfo);
158+
}
159+
142160
if (parsed->xinfo&XACT_XINFO_HAS_SUBXACTS)
143161
{
144162
xl_xact_subxacts*xl_subxacts= (xl_xact_subxacts*)data;
@@ -168,6 +186,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
168186
parsed->twophase_xid=xl_twophase->xid;
169187

170188
data+=sizeof(xl_xact_twophase);
189+
190+
if (parsed->xinfo&XACT_XINFO_HAS_GID)
191+
{
192+
intgidlen;
193+
strcpy(parsed->twophase_gid,data);
194+
gidlen=strlen(parsed->twophase_gid)+1;
195+
data+=MAXALIGN(gidlen);
196+
}
197+
}
198+
199+
if (parsed->xinfo&XACT_XINFO_HAS_ORIGIN)
200+
{
201+
xl_xact_originxl_origin;
202+
203+
/* we're only guaranteed 4 byte alignment, so copy onto stack */
204+
memcpy(&xl_origin,data,sizeof(xl_origin));
205+
206+
parsed->origin_lsn=xl_origin.origin_lsn;
207+
parsed->origin_timestamp=xl_origin.origin_timestamp;
208+
209+
data+=sizeof(xl_xact_origin);
171210
}
172211
}
173212

‎src/backend/access/transam/twophase.c

Lines changed: 90 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,7 @@ intmax_prepared_xacts = 0;
144144
*
145145
* typedef struct GlobalTransactionData *GlobalTransaction appears in
146146
* twophase.h
147-
*
148-
* Note that the max value of GIDSIZE must fit in the uint16 gidlen,
149-
* specified in TwoPhaseFileHeader.
150147
*/
151-
#defineGIDSIZE 200
152148

153149
typedefstructGlobalTransactionData
154150
{
@@ -211,12 +207,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
211207
RelFileNode*rels,
212208
intninvalmsgs,
213209
SharedInvalidationMessage*invalmsgs,
214-
boolinitfileinval);
210+
boolinitfileinval,
211+
constchar*gid);
215212
staticvoidRecordTransactionAbortPrepared(TransactionIdxid,
216213
intnchildren,
217214
TransactionId*children,
218215
intnrels,
219-
RelFileNode*rels);
216+
RelFileNode*rels,
217+
constchar*gid);
220218
staticvoidProcessRecords(char*bufptr,TransactionIdxid,
221219
constTwoPhaseCallbackcallbacks[]);
222220
staticvoidRemoveGXact(GlobalTransactiongxact);
@@ -898,7 +896,7 @@ TwoPhaseGetDummyProc(TransactionId xid)
898896
/*
899897
* Header for a 2PC state file
900898
*/
901-
#defineTWOPHASE_MAGIC0x57F94533/* format identifier */
899+
#defineTWOPHASE_MAGIC0x57F94534/* format identifier */
902900

903901
typedefstructTwoPhaseFileHeader
904902
{
@@ -914,6 +912,8 @@ typedef struct TwoPhaseFileHeader
914912
int32ninvalmsgs;/* number of cache invalidation messages */
915913
boolinitfileinval;/* does relcache init file need invalidation? */
916914
uint16gidlen;/* length of the GID - GID follows the header */
915+
XLogRecPtrorigin_lsn;/* lsn of this record at origin node */
916+
TimestampTzorigin_timestamp;/* time of prepare at origin node */
917917
}TwoPhaseFileHeader;
918918

919919
/*
@@ -1065,6 +1065,7 @@ EndPrepare(GlobalTransaction gxact)
10651065
{
10661066
TwoPhaseFileHeader*hdr;
10671067
StateFileChunk*record;
1068+
boolreplorigin;
10681069

10691070
/* Add the end sentinel to the list of 2PC records */
10701071
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID,0,
@@ -1075,6 +1076,21 @@ EndPrepare(GlobalTransaction gxact)
10751076
Assert(hdr->magic==TWOPHASE_MAGIC);
10761077
hdr->total_len=records.total_len+sizeof(pg_crc32c);
10771078

1079+
replorigin= (replorigin_session_origin!=InvalidRepOriginId&&
1080+
replorigin_session_origin!=DoNotReplicateId);
1081+
1082+
if (replorigin)
1083+
{
1084+
Assert(replorigin_session_origin_lsn!=InvalidXLogRecPtr);
1085+
hdr->origin_lsn=replorigin_session_origin_lsn;
1086+
hdr->origin_timestamp=replorigin_session_origin_timestamp;
1087+
}
1088+
else
1089+
{
1090+
hdr->origin_lsn=InvalidXLogRecPtr;
1091+
hdr->origin_timestamp=0;
1092+
}
1093+
10781094
/*
10791095
* If the data size exceeds MaxAllocSize, we won't be able to read it in
10801096
* ReadTwoPhaseFile. Check for that now, rather than fail in the case
@@ -1107,7 +1123,16 @@ EndPrepare(GlobalTransaction gxact)
11071123
XLogBeginInsert();
11081124
for (record=records.head;record!=NULL;record=record->next)
11091125
XLogRegisterData(record->data,record->len);
1126+
1127+
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
1128+
11101129
gxact->prepare_end_lsn=XLogInsert(RM_XACT_ID,XLOG_XACT_PREPARE);
1130+
1131+
if (replorigin)
1132+
/* Move LSNs forward for this replication origin */
1133+
replorigin_session_advance(replorigin_session_origin_lsn,
1134+
gxact->prepare_end_lsn);
1135+
11111136
XLogFlush(gxact->prepare_end_lsn);
11121137

11131138
/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1283,6 +1308,44 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
12831308
returnbuf;
12841309
}
12851310

1311+
/*
1312+
* ParsePrepareRecord
1313+
*/
1314+
void
1315+
ParsePrepareRecord(uint8info,char*xlrec,xl_xact_parsed_prepare*parsed)
1316+
{
1317+
TwoPhaseFileHeader*hdr;
1318+
char*bufptr;
1319+
1320+
hdr= (TwoPhaseFileHeader*)xlrec;
1321+
bufptr=xlrec+MAXALIGN(sizeof(TwoPhaseFileHeader));
1322+
1323+
parsed->origin_lsn=hdr->origin_lsn;
1324+
parsed->origin_timestamp=hdr->origin_timestamp;
1325+
parsed->twophase_xid=hdr->xid;
1326+
parsed->dbId=hdr->database;
1327+
parsed->nsubxacts=hdr->nsubxacts;
1328+
parsed->nrels=hdr->ncommitrels;
1329+
parsed->nabortrels=hdr->nabortrels;
1330+
parsed->nmsgs=hdr->ninvalmsgs;
1331+
1332+
strncpy(parsed->twophase_gid,bufptr,hdr->gidlen);
1333+
bufptr+=MAXALIGN(hdr->gidlen);
1334+
1335+
parsed->subxacts= (TransactionId*)bufptr;
1336+
bufptr+=MAXALIGN(hdr->nsubxacts*sizeof(TransactionId));
1337+
1338+
parsed->xnodes= (RelFileNode*)bufptr;
1339+
bufptr+=MAXALIGN(hdr->ncommitrels*sizeof(RelFileNode));
1340+
1341+
parsed->abortnodes= (RelFileNode*)bufptr;
1342+
bufptr+=MAXALIGN(hdr->nabortrels*sizeof(RelFileNode));
1343+
1344+
parsed->msgs= (SharedInvalidationMessage*)bufptr;
1345+
bufptr+=MAXALIGN(hdr->ninvalmsgs*sizeof(SharedInvalidationMessage));
1346+
}
1347+
1348+
12861349

12871350
/*
12881351
* Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1435,11 +1498,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
14351498
hdr->nsubxacts,children,
14361499
hdr->ncommitrels,commitrels,
14371500
hdr->ninvalmsgs,invalmsgs,
1438-
hdr->initfileinval);
1501+
hdr->initfileinval,gid);
14391502
else
14401503
RecordTransactionAbortPrepared(xid,
14411504
hdr->nsubxacts,children,
1442-
hdr->nabortrels,abortrels);
1505+
hdr->nabortrels,abortrels,
1506+
gid);
14431507

14441508
ProcArrayRemove(proc,latestXid);
14451509

@@ -1752,7 +1816,8 @@ restoreTwoPhaseData(void)
17521816
if (buf==NULL)
17531817
continue;
17541818

1755-
PrepareRedoAdd(buf,InvalidXLogRecPtr,InvalidXLogRecPtr);
1819+
PrepareRedoAdd(buf,InvalidXLogRecPtr,
1820+
InvalidXLogRecPtr,InvalidRepOriginId);
17561821
}
17571822
}
17581823
LWLockRelease(TwoPhaseStateLock);
@@ -2165,7 +2230,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
21652230
RelFileNode*rels,
21662231
intninvalmsgs,
21672232
SharedInvalidationMessage*invalmsgs,
2168-
boolinitfileinval)
2233+
boolinitfileinval,
2234+
constchar*gid)
21692235
{
21702236
XLogRecPtrrecptr;
21712237
TimestampTzcommitts=GetCurrentTimestamp();
@@ -2193,7 +2259,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
21932259
ninvalmsgs,invalmsgs,
21942260
initfileinval, false,
21952261
MyXactFlags |XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2196-
xid);
2262+
xid,gid);
21972263

21982264

21992265
if (replorigin)
@@ -2255,7 +2321,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
22552321
intnchildren,
22562322
TransactionId*children,
22572323
intnrels,
2258-
RelFileNode*rels)
2324+
RelFileNode*rels,
2325+
constchar*gid)
22592326
{
22602327
XLogRecPtrrecptr;
22612328

@@ -2278,7 +2345,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
22782345
nchildren,children,
22792346
nrels,rels,
22802347
MyXactFlags |XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2281-
xid);
2348+
xid,gid);
22822349

22832350
/* Always flush, since we're about to remove the 2PC state file */
22842351
XLogFlush(recptr);
@@ -2309,7 +2376,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
23092376
* data, the entry is marked as located on disk.
23102377
*/
23112378
void
2312-
PrepareRedoAdd(char*buf,XLogRecPtrstart_lsn,XLogRecPtrend_lsn)
2379+
PrepareRedoAdd(char*buf,XLogRecPtrstart_lsn,
2380+
XLogRecPtrend_lsn,RepOriginIdorigin_id)
23132381
{
23142382
TwoPhaseFileHeader*hdr= (TwoPhaseFileHeader*)buf;
23152383
char*bufptr;
@@ -2358,6 +2426,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
23582426
Assert(TwoPhaseState->numPrepXacts<max_prepared_xacts);
23592427
TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++]=gxact;
23602428

2429+
if (origin_id!=InvalidRepOriginId)
2430+
{
2431+
/* recover apply progress */
2432+
replorigin_advance(origin_id,hdr->origin_lsn,end_lsn,
2433+
false/* backward */ , false/* WAL */ );
2434+
}
2435+
23612436
elog(DEBUG2,"added 2PC data in shared memory for transaction %u",gxact->xid);
23622437
}
23632438

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp