@@ -221,13 +221,13 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
221221static void RemoveGXact (GlobalTransaction gxact );
222222
223223static void XlogReadTwoPhaseData (XLogRecPtr lsn ,char * * buf ,int * len );
224- static char * ProcessTwoPhaseBuffer (TransactionId xid ,
224+ static char * ProcessTwoPhaseBuffer (FullTransactionId xid ,
225225XLogRecPtr prepare_start_lsn ,
226226bool fromdisk ,bool setParent ,bool setNextXid );
227227static void MarkAsPreparingGuts (GlobalTransaction gxact ,TransactionId xid ,
228228const char * gid ,TimestampTz prepared_at ,Oid owner ,
229229Oid databaseid );
230- static void RemoveTwoPhaseFile (TransactionId xid ,bool giveWarning );
230+ static void RemoveTwoPhaseFile (FullTransactionId fxid ,bool giveWarning );
231231static void RecreateTwoPhaseFile (TransactionId xid ,void * content ,int len );
232232
233233/*
@@ -927,41 +927,26 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
927927/************************************************************************/
928928
929929/*
930- * Compute the FullTransactionId for the given TransactionId.
931- *
932- * The wrap logic is safe here because the span of active xids cannot exceed one
933- * epoch at any given time.
930+ * Compute FullTransactionId for the given TransactionId, using the current
931+ * epoch.
934932 */
935933static inline FullTransactionId
936- AdjustToFullTransactionId (TransactionId xid )
934+ FullTransactionIdFromCurrentEpoch (TransactionId xid )
937935{
936+ FullTransactionId fxid ;
938937FullTransactionId nextFullXid ;
939- TransactionId nextXid ;
940938uint32 epoch ;
941939
942- Assert (TransactionIdIsValid (xid ));
943-
944- LWLockAcquire (XidGenLock ,LW_SHARED );
945- nextFullXid = TransamVariables -> nextXid ;
946- LWLockRelease (XidGenLock );
947-
948- nextXid = XidFromFullTransactionId (nextFullXid );
940+ nextFullXid = ReadNextFullTransactionId ();
949941epoch = EpochFromFullTransactionId (nextFullXid );
950- if (unlikely (xid > nextXid ))
951- {
952- /* Wraparound occurred, must be from a prev epoch. */
953- Assert (epoch > 0 );
954- epoch -- ;
955- }
956942
957- return FullTransactionIdFromEpochAndXid (epoch ,xid );
943+ fxid = FullTransactionIdFromEpochAndXid (epoch ,xid );
944+ return fxid ;
958945}
959946
960947static inline int
961- TwoPhaseFilePath (char * path ,TransactionId xid )
948+ TwoPhaseFilePath (char * path ,FullTransactionId fxid )
962949{
963- FullTransactionId fxid = AdjustToFullTransactionId (xid );
964-
965950return snprintf (path ,MAXPGPATH ,TWOPHASE_DIR "/%08X%08X" ,
966951EpochFromFullTransactionId (fxid ),
967952XidFromFullTransactionId (fxid ));
@@ -1297,7 +1282,8 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
12971282 * If it looks OK (has a valid magic number and CRC), return the palloc'd
12981283 * contents of the file, issuing an error when finding corrupted data. If
12991284 * missing_ok is true, which indicates that missing files can be safely
1300- * ignored, then return NULL. This state can be reached when doing recovery.
1285+ * ignored, then return NULL. This state can be reached when doing recovery
1286+ * after discarding two-phase files from other epochs.
13011287 */
13021288static char *
13031289ReadTwoPhaseFile (TransactionId xid ,bool missing_ok )
@@ -1311,8 +1297,10 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
13111297pg_crc32c calc_crc ,
13121298file_crc ;
13131299int r ;
1300+ FullTransactionId fxid ;
13141301
1315- TwoPhaseFilePath (path ,xid );
1302+ fxid = FullTransactionIdFromCurrentEpoch (xid );
1303+ TwoPhaseFilePath (path ,fxid );
13161304
13171305fd = OpenTransientFile (path ,O_RDONLY |PG_BINARY );
13181306if (fd < 0 )
@@ -1677,10 +1665,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
16771665AtEOXact_PgStat (isCommit , false);
16781666
16791667/*
1680- * And now we can clean up any files we may have left.
1668+ * And now we can clean up any files we may have left. These should be
1669+ * from the current epoch.
16811670 */
16821671if (ondisk )
1683- RemoveTwoPhaseFile (xid , true);
1672+ {
1673+ FullTransactionId fxid ;
1674+
1675+ fxid = FullTransactionIdFromCurrentEpoch (xid );
1676+ RemoveTwoPhaseFile (fxid , true);
1677+ }
16841678
16851679MyLockedGxact = NULL ;
16861680
@@ -1719,13 +1713,17 @@ ProcessRecords(char *bufptr, TransactionId xid,
17191713 *
17201714 * If giveWarning is false, do not complain about file-not-present;
17211715 * this is an expected case during WAL replay.
1716+ *
1717+ * This routine is used at early stages at recovery where future and
1718+ * past orphaned files are checked, hence the FullTransactionId to build
1719+ * a complete file name fit for the removal.
17221720 */
17231721static void
1724- RemoveTwoPhaseFile (TransactionId xid ,bool giveWarning )
1722+ RemoveTwoPhaseFile (FullTransactionId fxid ,bool giveWarning )
17251723{
17261724char path [MAXPGPATH ];
17271725
1728- TwoPhaseFilePath (path ,xid );
1726+ TwoPhaseFilePath (path ,fxid );
17291727if (unlink (path ))
17301728if (errno != ENOENT || giveWarning )
17311729ereport (WARNING ,
@@ -1745,13 +1743,16 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
17451743char path [MAXPGPATH ];
17461744pg_crc32c statefile_crc ;
17471745int fd ;
1746+ FullTransactionId fxid ;
17481747
17491748/* Recompute CRC */
17501749INIT_CRC32C (statefile_crc );
17511750COMP_CRC32C (statefile_crc ,content ,len );
17521751FIN_CRC32C (statefile_crc );
17531752
1754- TwoPhaseFilePath (path ,xid );
1753+ /* Use current epoch */
1754+ fxid = FullTransactionIdFromCurrentEpoch (xid );
1755+ TwoPhaseFilePath (path ,fxid );
17551756
17561757fd = OpenTransientFile (path ,
17571758O_CREAT |O_TRUNC |O_WRONLY |PG_BINARY );
@@ -1899,7 +1900,9 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
18991900 * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
19001901 * This is called once at the beginning of recovery, saving any extra
19011902 * lookups in the future. Two-phase files that are newer than the
1902- * minimum XID horizon are discarded on the way.
1903+ * minimum XID horizon are discarded on the way. Two-phase files with
1904+ * an epoch older or newer than the current checkpoint's record epoch
1905+ * are also discarded.
19031906 */
19041907void
19051908restoreTwoPhaseData (void )
@@ -1914,14 +1917,11 @@ restoreTwoPhaseData(void)
19141917if (strlen (clde -> d_name )== 16 &&
19151918strspn (clde -> d_name ,"0123456789ABCDEF" )== 16 )
19161919{
1917- TransactionId xid ;
19181920FullTransactionId fxid ;
19191921char * buf ;
19201922
19211923fxid = FullTransactionIdFromU64 (strtou64 (clde -> d_name ,NULL ,16 ));
1922- xid = XidFromFullTransactionId (fxid );
1923-
1924- buf = ProcessTwoPhaseBuffer (xid ,InvalidXLogRecPtr ,
1924+ buf = ProcessTwoPhaseBuffer (fxid ,InvalidXLogRecPtr ,
19251925true, false, false);
19261926if (buf == NULL )
19271927continue ;
@@ -1972,6 +1972,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
19721972TransactionId origNextXid = XidFromFullTransactionId (nextXid );
19731973TransactionId result = origNextXid ;
19741974TransactionId * xids = NULL ;
1975+ uint32 epoch = EpochFromFullTransactionId (nextXid );
19751976int nxids = 0 ;
19761977int allocsize = 0 ;
19771978int i ;
@@ -1980,14 +1981,20 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
19801981for (i = 0 ;i < TwoPhaseState -> numPrepXacts ;i ++ )
19811982{
19821983TransactionId xid ;
1984+ FullTransactionId fxid ;
19831985char * buf ;
19841986GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
19851987
19861988Assert (gxact -> inredo );
19871989
19881990xid = gxact -> xid ;
19891991
1990- buf = ProcessTwoPhaseBuffer (xid ,
1992+ /*
1993+ * All two-phase files with past and future epoch in pg_twophase are
1994+ * gone at this point, so we're OK to rely on only the current epoch.
1995+ */
1996+ fxid = FullTransactionIdFromEpochAndXid (epoch ,xid );
1997+ buf = ProcessTwoPhaseBuffer (fxid ,
19911998gxact -> prepare_start_lsn ,
19921999gxact -> ondisk , false, true);
19932000
@@ -2049,19 +2056,31 @@ void
20492056StandbyRecoverPreparedTransactions (void )
20502057{
20512058int i ;
2059+ uint32 epoch ;
2060+ FullTransactionId nextFullXid ;
2061+
2062+ /* get current epoch */
2063+ nextFullXid = ReadNextFullTransactionId ();
2064+ epoch = EpochFromFullTransactionId (nextFullXid );
20522065
20532066LWLockAcquire (TwoPhaseStateLock ,LW_EXCLUSIVE );
20542067for (i = 0 ;i < TwoPhaseState -> numPrepXacts ;i ++ )
20552068{
20562069TransactionId xid ;
2070+ FullTransactionId fxid ;
20572071char * buf ;
20582072GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
20592073
20602074Assert (gxact -> inredo );
20612075
20622076xid = gxact -> xid ;
20632077
2064- buf = ProcessTwoPhaseBuffer (xid ,
2078+ /*
2079+ * At this stage, we're OK to work with the current epoch as all past
2080+ * and future files have been already discarded.
2081+ */
2082+ fxid = FullTransactionIdFromEpochAndXid (epoch ,xid );
2083+ buf = ProcessTwoPhaseBuffer (fxid ,
20652084gxact -> prepare_start_lsn ,
20662085gxact -> ondisk , true, false);
20672086if (buf != NULL )
@@ -2090,18 +2109,29 @@ void
20902109RecoverPreparedTransactions (void )
20912110{
20922111int i ;
2112+ uint32 epoch ;
2113+ FullTransactionId nextFullXid ;
2114+
2115+ /* get current epoch */
2116+ nextFullXid = ReadNextFullTransactionId ();
2117+ epoch = EpochFromFullTransactionId (nextFullXid );
20932118
20942119LWLockAcquire (TwoPhaseStateLock ,LW_EXCLUSIVE );
20952120for (i = 0 ;i < TwoPhaseState -> numPrepXacts ;i ++ )
20962121{
20972122TransactionId xid ;
2123+ FullTransactionId fxid ;
20982124char * buf ;
20992125GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
21002126char * bufptr ;
21012127TwoPhaseFileHeader * hdr ;
21022128TransactionId * subxids ;
21032129const char * gid ;
21042130
2131+ /*
2132+ * At this stage, we're OK to work with the current epoch as all past
2133+ * and future files have been already discarded.
2134+ */
21052135xid = gxact -> xid ;
21062136
21072137/*
@@ -2113,7 +2143,8 @@ RecoverPreparedTransactions(void)
21132143 * SubTransSetParent has been set before, if the prepared transaction
21142144 * generated xid assignment records.
21152145 */
2116- buf = ProcessTwoPhaseBuffer (xid ,
2146+ fxid = FullTransactionIdFromEpochAndXid (epoch ,xid );
2147+ buf = ProcessTwoPhaseBuffer (fxid ,
21172148gxact -> prepare_start_lsn ,
21182149gxact -> ondisk , true, false);
21192150if (buf == NULL )
@@ -2181,7 +2212,7 @@ RecoverPreparedTransactions(void)
21812212/*
21822213 * ProcessTwoPhaseBuffer
21832214 *
2184- * Given atransaction id , read it either from disk or read it directly
2215+ * Given aFullTransactionId , read it either from disk or read it directly
21852216 * via shmem xlog record pointer using the provided "prepare_start_lsn".
21862217 *
21872218 * If setParent is true, set up subtransaction parent linkages.
@@ -2190,32 +2221,35 @@ RecoverPreparedTransactions(void)
21902221 * value scanned.
21912222 */
21922223static char *
2193- ProcessTwoPhaseBuffer (TransactionId xid ,
2224+ ProcessTwoPhaseBuffer (FullTransactionId fxid ,
21942225XLogRecPtr prepare_start_lsn ,
21952226bool fromdisk ,
21962227bool setParent ,bool setNextXid )
21972228{
21982229FullTransactionId nextXid = TransamVariables -> nextXid ;
2199- TransactionId origNextXid = XidFromFullTransactionId (nextXid );
22002230TransactionId * subxids ;
22012231char * buf ;
22022232TwoPhaseFileHeader * hdr ;
22032233int i ;
2234+ TransactionId xid = XidFromFullTransactionId (fxid );
22042235
22052236Assert (LWLockHeldByMeInMode (TwoPhaseStateLock ,LW_EXCLUSIVE ));
22062237
22072238if (!fromdisk )
22082239Assert (prepare_start_lsn != InvalidXLogRecPtr );
22092240
2210- /* Reject XID if too new */
2211- if (TransactionIdFollowsOrEquals (xid ,origNextXid ))
2241+ /*
2242+ * Reject full XID if too new. Note that this discards files from future
2243+ * epochs.
2244+ */
2245+ if (FullTransactionIdFollowsOrEquals (fxid ,nextXid ))
22122246{
22132247if (fromdisk )
22142248{
22152249ereport (WARNING ,
2216- (errmsg ("removing future two-phase state file for transaction %u" ,
2217- xid )));
2218- RemoveTwoPhaseFile (xid , true);
2250+ (errmsg ("removing future two-phase state fileof epoch %u for transaction %u" ,
2251+ EpochFromFullTransactionId ( fxid ), xid )));
2252+ RemoveTwoPhaseFile (fxid , true);
22192253}
22202254else
22212255{
@@ -2227,6 +2261,26 @@ ProcessTwoPhaseBuffer(TransactionId xid,
22272261return NULL ;
22282262}
22292263
2264+ /* Discard files from past epochs */
2265+ if (EpochFromFullTransactionId (fxid )< EpochFromFullTransactionId (nextXid ))
2266+ {
2267+ if (fromdisk )
2268+ {
2269+ ereport (WARNING ,
2270+ (errmsg ("removing past two-phase state file of epoch %u for transaction %u" ,
2271+ EpochFromFullTransactionId (fxid ),xid )));
2272+ RemoveTwoPhaseFile (fxid , true);
2273+ }
2274+ else
2275+ {
2276+ ereport (WARNING ,
2277+ (errmsg ("removing past two-phase state from memory for transaction %u" ,
2278+ xid )));
2279+ PrepareRedoRemove (xid , true);
2280+ }
2281+ return NULL ;
2282+ }
2283+
22302284/* Already processed? */
22312285if (TransactionIdDidCommit (xid )|| TransactionIdDidAbort (xid ))
22322286{
@@ -2235,7 +2289,7 @@ ProcessTwoPhaseBuffer(TransactionId xid,
22352289ereport (WARNING ,
22362290(errmsg ("removing stale two-phase state file for transaction %u" ,
22372291xid )));
2238- RemoveTwoPhaseFile (xid , true);
2292+ RemoveTwoPhaseFile (fxid , true);
22392293}
22402294else
22412295{
@@ -2521,8 +2575,11 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
25212575if (!XLogRecPtrIsInvalid (start_lsn ))
25222576{
25232577char path [MAXPGPATH ];
2578+ FullTransactionId fxid ;
25242579
2525- TwoPhaseFilePath (path ,hdr -> xid );
2580+ /* Use current epoch */
2581+ fxid = FullTransactionIdFromCurrentEpoch (hdr -> xid );
2582+ TwoPhaseFilePath (path ,fxid );
25262583
25272584if (access (path ,F_OK )== 0 )
25282585{
@@ -2617,7 +2674,15 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
26172674 */
26182675elog (DEBUG2 ,"removing 2PC data for transaction %u" ,xid );
26192676if (gxact -> ondisk )
2620- RemoveTwoPhaseFile (xid ,giveWarning );
2677+ {
2678+ FullTransactionId fxid ;
2679+
2680+ /*
2681+ * We should deal with a file at the current epoch here.
2682+ */
2683+ fxid = FullTransactionIdFromCurrentEpoch (xid );
2684+ RemoveTwoPhaseFile (fxid ,giveWarning );
2685+ }
26212686RemoveGXact (gxact );
26222687}
26232688