@@ -144,11 +144,7 @@ intmax_prepared_xacts = 0;
144144 *
145145 * typedef struct GlobalTransactionData *GlobalTransaction appears in
146146 * twophase.h
147- *
148- * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
149- * specified in TwoPhaseFileHeader.
150147 */
151- #define GIDSIZE 200
152148
153149typedef struct GlobalTransactionData
154150{
@@ -211,12 +207,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
211207RelFileNode * rels ,
212208int ninvalmsgs ,
213209SharedInvalidationMessage * invalmsgs ,
214- bool initfileinval );
210+ bool initfileinval ,
211+ const char * gid );
215212static void RecordTransactionAbortPrepared (TransactionId xid ,
216213int nchildren ,
217214TransactionId * children ,
218215int nrels ,
219- RelFileNode * rels );
216+ RelFileNode * rels ,
217+ const char * gid );
220218static void ProcessRecords (char * bufptr ,TransactionId xid ,
221219const TwoPhaseCallback callbacks []);
222220static void RemoveGXact (GlobalTransaction gxact );
@@ -898,7 +896,7 @@ TwoPhaseGetDummyProc(TransactionId xid)
898896/*
899897 * Header for a 2PC state file
900898 */
901- #define TWOPHASE_MAGIC 0x57F94533 /* format identifier */
899+ #define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
902900
903901typedef struct TwoPhaseFileHeader
904902{
@@ -914,6 +912,8 @@ typedef struct TwoPhaseFileHeader
914912int32 ninvalmsgs ;/* number of cache invalidation messages */
915913bool initfileinval ;/* does relcache init file need invalidation? */
916914uint16 gidlen ;/* length of the GID - GID follows the header */
915+ XLogRecPtr origin_lsn ;/* lsn of this record at origin node */
916+ TimestampTz origin_timestamp ;/* time of prepare at origin node */
917917}TwoPhaseFileHeader ;
918918
919919/*
@@ -1065,6 +1065,7 @@ EndPrepare(GlobalTransaction gxact)
10651065{
10661066TwoPhaseFileHeader * hdr ;
10671067StateFileChunk * record ;
1068+ bool replorigin ;
10681069
10691070/* Add the end sentinel to the list of 2PC records */
10701071RegisterTwoPhaseRecord (TWOPHASE_RM_END_ID ,0 ,
@@ -1075,6 +1076,21 @@ EndPrepare(GlobalTransaction gxact)
10751076Assert (hdr -> magic == TWOPHASE_MAGIC );
10761077hdr -> total_len = records .total_len + sizeof (pg_crc32c );
10771078
1079+ replorigin = (replorigin_session_origin != InvalidRepOriginId &&
1080+ replorigin_session_origin != DoNotReplicateId );
1081+
1082+ if (replorigin )
1083+ {
1084+ Assert (replorigin_session_origin_lsn != InvalidXLogRecPtr );
1085+ hdr -> origin_lsn = replorigin_session_origin_lsn ;
1086+ hdr -> origin_timestamp = replorigin_session_origin_timestamp ;
1087+ }
1088+ else
1089+ {
1090+ hdr -> origin_lsn = InvalidXLogRecPtr ;
1091+ hdr -> origin_timestamp = 0 ;
1092+ }
1093+
10781094/*
10791095 * If the data size exceeds MaxAllocSize, we won't be able to read it in
10801096 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
@@ -1107,7 +1123,16 @@ EndPrepare(GlobalTransaction gxact)
11071123XLogBeginInsert ();
11081124for (record = records .head ;record != NULL ;record = record -> next )
11091125XLogRegisterData (record -> data ,record -> len );
1126+
1127+ XLogSetRecordFlags (XLOG_INCLUDE_ORIGIN );
1128+
11101129gxact -> prepare_end_lsn = XLogInsert (RM_XACT_ID ,XLOG_XACT_PREPARE );
1130+
1131+ if (replorigin )
1132+ /* Move LSNs forward for this replication origin */
1133+ replorigin_session_advance (replorigin_session_origin_lsn ,
1134+ gxact -> prepare_end_lsn );
1135+
11111136XLogFlush (gxact -> prepare_end_lsn );
11121137
11131138/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1283,6 +1308,44 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
12831308return buf ;
12841309}
12851310
1311+ /*
1312+ * ParsePrepareRecord
1313+ */
1314+ void
1315+ ParsePrepareRecord (uint8 info ,char * xlrec ,xl_xact_parsed_prepare * parsed )
1316+ {
1317+ TwoPhaseFileHeader * hdr ;
1318+ char * bufptr ;
1319+
1320+ hdr = (TwoPhaseFileHeader * )xlrec ;
1321+ bufptr = xlrec + MAXALIGN (sizeof (TwoPhaseFileHeader ));
1322+
1323+ parsed -> origin_lsn = hdr -> origin_lsn ;
1324+ parsed -> origin_timestamp = hdr -> origin_timestamp ;
1325+ parsed -> twophase_xid = hdr -> xid ;
1326+ parsed -> dbId = hdr -> database ;
1327+ parsed -> nsubxacts = hdr -> nsubxacts ;
1328+ parsed -> nrels = hdr -> ncommitrels ;
1329+ parsed -> nabortrels = hdr -> nabortrels ;
1330+ parsed -> nmsgs = hdr -> ninvalmsgs ;
1331+
1332+ strncpy (parsed -> twophase_gid ,bufptr ,hdr -> gidlen );
1333+ bufptr += MAXALIGN (hdr -> gidlen );
1334+
1335+ parsed -> subxacts = (TransactionId * )bufptr ;
1336+ bufptr += MAXALIGN (hdr -> nsubxacts * sizeof (TransactionId ));
1337+
1338+ parsed -> xnodes = (RelFileNode * )bufptr ;
1339+ bufptr += MAXALIGN (hdr -> ncommitrels * sizeof (RelFileNode ));
1340+
1341+ parsed -> abortnodes = (RelFileNode * )bufptr ;
1342+ bufptr += MAXALIGN (hdr -> nabortrels * sizeof (RelFileNode ));
1343+
1344+ parsed -> msgs = (SharedInvalidationMessage * )bufptr ;
1345+ bufptr += MAXALIGN (hdr -> ninvalmsgs * sizeof (SharedInvalidationMessage ));
1346+ }
1347+
1348+
12861349
12871350/*
12881351 * Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1435,11 +1498,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
14351498hdr -> nsubxacts ,children ,
14361499hdr -> ncommitrels ,commitrels ,
14371500hdr -> ninvalmsgs ,invalmsgs ,
1438- hdr -> initfileinval );
1501+ hdr -> initfileinval , gid );
14391502else
14401503RecordTransactionAbortPrepared (xid ,
14411504hdr -> nsubxacts ,children ,
1442- hdr -> nabortrels ,abortrels );
1505+ hdr -> nabortrels ,abortrels ,
1506+ gid );
14431507
14441508ProcArrayRemove (proc ,latestXid );
14451509
@@ -1752,7 +1816,8 @@ restoreTwoPhaseData(void)
17521816if (buf == NULL )
17531817continue ;
17541818
1755- PrepareRedoAdd (buf ,InvalidXLogRecPtr ,InvalidXLogRecPtr );
1819+ PrepareRedoAdd (buf ,InvalidXLogRecPtr ,
1820+ InvalidXLogRecPtr ,InvalidRepOriginId );
17561821}
17571822}
17581823LWLockRelease (TwoPhaseStateLock );
@@ -2165,7 +2230,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
21652230RelFileNode * rels ,
21662231int ninvalmsgs ,
21672232SharedInvalidationMessage * invalmsgs ,
2168- bool initfileinval )
2233+ bool initfileinval ,
2234+ const char * gid )
21692235{
21702236XLogRecPtr recptr ;
21712237TimestampTz committs = GetCurrentTimestamp ();
@@ -2193,7 +2259,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
21932259ninvalmsgs ,invalmsgs ,
21942260initfileinval , false,
21952261MyXactFlags |XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK ,
2196- xid );
2262+ xid , gid );
21972263
21982264
21992265if (replorigin )
@@ -2255,7 +2321,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
22552321int nchildren ,
22562322TransactionId * children ,
22572323int nrels ,
2258- RelFileNode * rels )
2324+ RelFileNode * rels ,
2325+ const char * gid )
22592326{
22602327XLogRecPtr recptr ;
22612328
@@ -2278,7 +2345,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
22782345nchildren ,children ,
22792346nrels ,rels ,
22802347MyXactFlags |XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK ,
2281- xid );
2348+ xid , gid );
22822349
22832350/* Always flush, since we're about to remove the 2PC state file */
22842351XLogFlush (recptr );
@@ -2309,7 +2376,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
23092376 * data, the entry is marked as located on disk.
23102377 */
23112378void
2312- PrepareRedoAdd (char * buf ,XLogRecPtr start_lsn ,XLogRecPtr end_lsn )
2379+ PrepareRedoAdd (char * buf ,XLogRecPtr start_lsn ,
2380+ XLogRecPtr end_lsn ,RepOriginId origin_id )
23132381{
23142382TwoPhaseFileHeader * hdr = (TwoPhaseFileHeader * )buf ;
23152383char * bufptr ;
@@ -2358,6 +2426,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
23582426Assert (TwoPhaseState -> numPrepXacts < max_prepared_xacts );
23592427TwoPhaseState -> prepXacts [TwoPhaseState -> numPrepXacts ++ ]= gxact ;
23602428
2429+ if (origin_id != InvalidRepOriginId )
2430+ {
2431+ /* recover apply progress */
2432+ replorigin_advance (origin_id ,hdr -> origin_lsn ,end_lsn ,
2433+ false/* backward */ , false/* WAL */ );
2434+ }
2435+
23612436elog (DEBUG2 ,"added 2PC data in shared memory for transaction %u" ,gxact -> xid );
23622437}
23632438