@@ -221,13 +221,13 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
221221static void RemoveGXact (GlobalTransaction gxact );
222222
223223static void XlogReadTwoPhaseData (XLogRecPtr lsn ,char * * buf ,int * len );
224- static char * ProcessTwoPhaseBuffer (TransactionId xid ,
224+ static char * ProcessTwoPhaseBuffer (FullTransactionId xid ,
225225XLogRecPtr prepare_start_lsn ,
226226bool fromdisk ,bool setParent ,bool setNextXid );
227227static void MarkAsPreparingGuts (GlobalTransaction gxact ,TransactionId xid ,
228228const char * gid ,TimestampTz prepared_at ,Oid owner ,
229229Oid databaseid );
230- static void RemoveTwoPhaseFile (TransactionId xid ,bool giveWarning );
230+ static void RemoveTwoPhaseFile (FullTransactionId fxid ,bool giveWarning );
231231static void RecreateTwoPhaseFile (TransactionId xid ,void * content ,int len );
232232
233233/*
@@ -927,41 +927,26 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
927927/************************************************************************/
928928
929929/*
930- * Compute the FullTransactionId for the given TransactionId.
931- *
932- * The wrap logic is safe here because the span of active xids cannot exceed one
933- * epoch at any given time.
930+ * Compute FullTransactionId for the given TransactionId, using the current
931+ * epoch.
934932 */
935933static inline FullTransactionId
936- AdjustToFullTransactionId (TransactionId xid )
934+ FullTransactionIdFromCurrentEpoch (TransactionId xid )
937935{
936+ FullTransactionId fxid ;
938937FullTransactionId nextFullXid ;
939- TransactionId nextXid ;
940938uint32 epoch ;
941939
942- Assert (TransactionIdIsValid (xid ));
943-
944- LWLockAcquire (XidGenLock ,LW_SHARED );
945- nextFullXid = TransamVariables -> nextXid ;
946- LWLockRelease (XidGenLock );
947-
948- nextXid = XidFromFullTransactionId (nextFullXid );
940+ nextFullXid = ReadNextFullTransactionId ();
949941epoch = EpochFromFullTransactionId (nextFullXid );
950- if (unlikely (xid > nextXid ))
951- {
952- /* Wraparound occurred, must be from a prev epoch. */
953- Assert (epoch > 0 );
954- epoch -- ;
955- }
956942
957- return FullTransactionIdFromEpochAndXid (epoch ,xid );
943+ fxid = FullTransactionIdFromEpochAndXid (epoch ,xid );
944+ return fxid ;
958945}
959946
960947static inline int
961- TwoPhaseFilePath (char * path ,TransactionId xid )
948+ TwoPhaseFilePath (char * path ,FullTransactionId fxid )
962949{
963- FullTransactionId fxid = AdjustToFullTransactionId (xid );
964-
965950return snprintf (path ,MAXPGPATH ,TWOPHASE_DIR "/%08X%08X" ,
966951EpochFromFullTransactionId (fxid ),
967952XidFromFullTransactionId (fxid ));
@@ -1297,7 +1282,8 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
12971282 * If it looks OK (has a valid magic number and CRC), return the palloc'd
12981283 * contents of the file, issuing an error when finding corrupted data. If
12991284 * missing_ok is true, which indicates that missing files can be safely
1300- * ignored, then return NULL. This state can be reached when doing recovery.
1285+ * ignored, then return NULL. This state can be reached when doing recovery
1286+ * after discarding two-phase files from other epochs.
13011287 */
13021288static char *
13031289ReadTwoPhaseFile (TransactionId xid ,bool missing_ok )
@@ -1311,8 +1297,10 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
13111297pg_crc32c calc_crc ,
13121298file_crc ;
13131299int r ;
1300+ FullTransactionId fxid ;
13141301
1315- TwoPhaseFilePath (path ,xid );
1302+ fxid = FullTransactionIdFromCurrentEpoch (xid );
1303+ TwoPhaseFilePath (path ,fxid );
13161304
13171305fd = OpenTransientFile (path ,O_RDONLY |PG_BINARY );
13181306if (fd < 0 )
@@ -1677,10 +1665,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
16771665AtEOXact_PgStat (isCommit , false);
16781666
16791667/*
1680- * And now we can clean up any files we may have left.
1668+ * And now we can clean up any files we may have left. These should be
1669+ * from the current epoch.
16811670 */
16821671if (ondisk )
1683- RemoveTwoPhaseFile (xid , true);
1672+ {
1673+ FullTransactionId fxid ;
1674+
1675+ fxid = FullTransactionIdFromCurrentEpoch (xid );
1676+ RemoveTwoPhaseFile (fxid , true);
1677+ }
16841678
16851679MyLockedGxact = NULL ;
16861680
@@ -1718,13 +1712,17 @@ ProcessRecords(char *bufptr, TransactionId xid,
17181712 *
17191713 * If giveWarning is false, do not complain about file-not-present;
17201714 * this is an expected case during WAL replay.
1715+ *
1716+ * This routine is used at early stages at recovery where future and
1717+ * past orphaned files are checked, hence the FullTransactionId to build
1718+ * a complete file name fit for the removal.
17211719 */
17221720static void
1723- RemoveTwoPhaseFile (TransactionId xid ,bool giveWarning )
1721+ RemoveTwoPhaseFile (FullTransactionId fxid ,bool giveWarning )
17241722{
17251723char path [MAXPGPATH ];
17261724
1727- TwoPhaseFilePath (path ,xid );
1725+ TwoPhaseFilePath (path ,fxid );
17281726if (unlink (path ))
17291727if (errno != ENOENT || giveWarning )
17301728ereport (WARNING ,
@@ -1744,13 +1742,16 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
17441742char path [MAXPGPATH ];
17451743pg_crc32c statefile_crc ;
17461744int fd ;
1745+ FullTransactionId fxid ;
17471746
17481747/* Recompute CRC */
17491748INIT_CRC32C (statefile_crc );
17501749COMP_CRC32C (statefile_crc ,content ,len );
17511750FIN_CRC32C (statefile_crc );
17521751
1753- TwoPhaseFilePath (path ,xid );
1752+ /* Use current epoch */
1753+ fxid = FullTransactionIdFromCurrentEpoch (xid );
1754+ TwoPhaseFilePath (path ,fxid );
17541755
17551756fd = OpenTransientFile (path ,
17561757O_CREAT |O_TRUNC |O_WRONLY |PG_BINARY );
@@ -1898,7 +1899,9 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
18981899 * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
18991900 * This is called once at the beginning of recovery, saving any extra
19001901 * lookups in the future. Two-phase files that are newer than the
1901- * minimum XID horizon are discarded on the way.
1902+ * minimum XID horizon are discarded on the way. Two-phase files with
1903+ * an epoch older or newer than the current checkpoint's record epoch
1904+ * are also discarded.
19021905 */
19031906void
19041907restoreTwoPhaseData (void )
@@ -1913,14 +1916,11 @@ restoreTwoPhaseData(void)
19131916if (strlen (clde -> d_name )== 16 &&
19141917strspn (clde -> d_name ,"0123456789ABCDEF" )== 16 )
19151918{
1916- TransactionId xid ;
19171919FullTransactionId fxid ;
19181920char * buf ;
19191921
19201922fxid = FullTransactionIdFromU64 (strtou64 (clde -> d_name ,NULL ,16 ));
1921- xid = XidFromFullTransactionId (fxid );
1922-
1923- buf = ProcessTwoPhaseBuffer (xid ,InvalidXLogRecPtr ,
1923+ buf = ProcessTwoPhaseBuffer (fxid ,InvalidXLogRecPtr ,
19241924true, false, false);
19251925if (buf == NULL )
19261926continue ;
@@ -1971,6 +1971,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
19711971TransactionId origNextXid = XidFromFullTransactionId (nextXid );
19721972TransactionId result = origNextXid ;
19731973TransactionId * xids = NULL ;
1974+ uint32 epoch = EpochFromFullTransactionId (nextXid );
19741975int nxids = 0 ;
19751976int allocsize = 0 ;
19761977int i ;
@@ -1979,14 +1980,20 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
19791980for (i = 0 ;i < TwoPhaseState -> numPrepXacts ;i ++ )
19801981{
19811982TransactionId xid ;
1983+ FullTransactionId fxid ;
19821984char * buf ;
19831985GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
19841986
19851987Assert (gxact -> inredo );
19861988
19871989xid = gxact -> xid ;
19881990
1989- buf = ProcessTwoPhaseBuffer (xid ,
1991+ /*
1992+ * All two-phase files with past and future epoch in pg_twophase are
1993+ * gone at this point, so we're OK to rely on only the current epoch.
1994+ */
1995+ fxid = FullTransactionIdFromEpochAndXid (epoch ,xid );
1996+ buf = ProcessTwoPhaseBuffer (fxid ,
19901997gxact -> prepare_start_lsn ,
19911998gxact -> ondisk , false, true);
19921999
@@ -2048,19 +2055,31 @@ void
20482055StandbyRecoverPreparedTransactions (void )
20492056{
20502057int i ;
2058+ uint32 epoch ;
2059+ FullTransactionId nextFullXid ;
2060+
2061+ /* get current epoch */
2062+ nextFullXid = ReadNextFullTransactionId ();
2063+ epoch = EpochFromFullTransactionId (nextFullXid );
20512064
20522065LWLockAcquire (TwoPhaseStateLock ,LW_EXCLUSIVE );
20532066for (i = 0 ;i < TwoPhaseState -> numPrepXacts ;i ++ )
20542067{
20552068TransactionId xid ;
2069+ FullTransactionId fxid ;
20562070char * buf ;
20572071GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
20582072
20592073Assert (gxact -> inredo );
20602074
20612075xid = gxact -> xid ;
20622076
2063- buf = ProcessTwoPhaseBuffer (xid ,
2077+ /*
2078+ * At this stage, we're OK to work with the current epoch as all past
2079+ * and future files have been already discarded.
2080+ */
2081+ fxid = FullTransactionIdFromEpochAndXid (epoch ,xid );
2082+ buf = ProcessTwoPhaseBuffer (fxid ,
20642083gxact -> prepare_start_lsn ,
20652084gxact -> ondisk , true, false);
20662085if (buf != NULL )
@@ -2089,18 +2108,29 @@ void
20892108RecoverPreparedTransactions (void )
20902109{
20912110int i ;
2111+ uint32 epoch ;
2112+ FullTransactionId nextFullXid ;
2113+
2114+ /* get current epoch */
2115+ nextFullXid = ReadNextFullTransactionId ();
2116+ epoch = EpochFromFullTransactionId (nextFullXid );
20922117
20932118LWLockAcquire (TwoPhaseStateLock ,LW_EXCLUSIVE );
20942119for (i = 0 ;i < TwoPhaseState -> numPrepXacts ;i ++ )
20952120{
20962121TransactionId xid ;
2122+ FullTransactionId fxid ;
20972123char * buf ;
20982124GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
20992125char * bufptr ;
21002126TwoPhaseFileHeader * hdr ;
21012127TransactionId * subxids ;
21022128const char * gid ;
21032129
2130+ /*
2131+ * At this stage, we're OK to work with the current epoch as all past
2132+ * and future files have been already discarded.
2133+ */
21042134xid = gxact -> xid ;
21052135
21062136/*
@@ -2112,7 +2142,8 @@ RecoverPreparedTransactions(void)
21122142 * SubTransSetParent has been set before, if the prepared transaction
21132143 * generated xid assignment records.
21142144 */
2115- buf = ProcessTwoPhaseBuffer (xid ,
2145+ fxid = FullTransactionIdFromEpochAndXid (epoch ,xid );
2146+ buf = ProcessTwoPhaseBuffer (fxid ,
21162147gxact -> prepare_start_lsn ,
21172148gxact -> ondisk , true, false);
21182149if (buf == NULL )
@@ -2180,7 +2211,7 @@ RecoverPreparedTransactions(void)
21802211/*
21812212 * ProcessTwoPhaseBuffer
21822213 *
2183- * Given atransaction id , read it either from disk or read it directly
2214+ * Given aFullTransactionId , read it either from disk or read it directly
21842215 * via shmem xlog record pointer using the provided "prepare_start_lsn".
21852216 *
21862217 * If setParent is true, set up subtransaction parent linkages.
@@ -2189,32 +2220,35 @@ RecoverPreparedTransactions(void)
21892220 * value scanned.
21902221 */
21912222static char *
2192- ProcessTwoPhaseBuffer (TransactionId xid ,
2223+ ProcessTwoPhaseBuffer (FullTransactionId fxid ,
21932224XLogRecPtr prepare_start_lsn ,
21942225bool fromdisk ,
21952226bool setParent ,bool setNextXid )
21962227{
21972228FullTransactionId nextXid = TransamVariables -> nextXid ;
2198- TransactionId origNextXid = XidFromFullTransactionId (nextXid );
21992229TransactionId * subxids ;
22002230char * buf ;
22012231TwoPhaseFileHeader * hdr ;
22022232int i ;
2233+ TransactionId xid = XidFromFullTransactionId (fxid );
22032234
22042235Assert (LWLockHeldByMeInMode (TwoPhaseStateLock ,LW_EXCLUSIVE ));
22052236
22062237if (!fromdisk )
22072238Assert (prepare_start_lsn != InvalidXLogRecPtr );
22082239
2209- /* Reject XID if too new */
2210- if (TransactionIdFollowsOrEquals (xid ,origNextXid ))
2240+ /*
2241+ * Reject full XID if too new. Note that this discards files from future
2242+ * epochs.
2243+ */
2244+ if (FullTransactionIdFollowsOrEquals (fxid ,nextXid ))
22112245{
22122246if (fromdisk )
22132247{
22142248ereport (WARNING ,
2215- (errmsg ("removing future two-phase state file for transaction %u" ,
2216- xid )));
2217- RemoveTwoPhaseFile (xid , true);
2249+ (errmsg ("removing future two-phase state fileof epoch %u for transaction %u" ,
2250+ EpochFromFullTransactionId ( fxid ), xid )));
2251+ RemoveTwoPhaseFile (fxid , true);
22182252}
22192253else
22202254{
@@ -2226,6 +2260,26 @@ ProcessTwoPhaseBuffer(TransactionId xid,
22262260return NULL ;
22272261}
22282262
2263+ /* Discard files from past epochs */
2264+ if (EpochFromFullTransactionId (fxid )< EpochFromFullTransactionId (nextXid ))
2265+ {
2266+ if (fromdisk )
2267+ {
2268+ ereport (WARNING ,
2269+ (errmsg ("removing past two-phase state file of epoch %u for transaction %u" ,
2270+ EpochFromFullTransactionId (fxid ),xid )));
2271+ RemoveTwoPhaseFile (fxid , true);
2272+ }
2273+ else
2274+ {
2275+ ereport (WARNING ,
2276+ (errmsg ("removing past two-phase state from memory for transaction %u" ,
2277+ xid )));
2278+ PrepareRedoRemove (xid , true);
2279+ }
2280+ return NULL ;
2281+ }
2282+
22292283/* Already processed? */
22302284if (TransactionIdDidCommit (xid )|| TransactionIdDidAbort (xid ))
22312285{
@@ -2234,7 +2288,7 @@ ProcessTwoPhaseBuffer(TransactionId xid,
22342288ereport (WARNING ,
22352289(errmsg ("removing stale two-phase state file for transaction %u" ,
22362290xid )));
2237- RemoveTwoPhaseFile (xid , true);
2291+ RemoveTwoPhaseFile (fxid , true);
22382292}
22392293else
22402294{
@@ -2520,8 +2574,11 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
25202574if (!XLogRecPtrIsInvalid (start_lsn ))
25212575{
25222576char path [MAXPGPATH ];
2577+ FullTransactionId fxid ;
25232578
2524- TwoPhaseFilePath (path ,hdr -> xid );
2579+ /* Use current epoch */
2580+ fxid = FullTransactionIdFromCurrentEpoch (hdr -> xid );
2581+ TwoPhaseFilePath (path ,fxid );
25252582
25262583if (access (path ,F_OK )== 0 )
25272584{
@@ -2616,7 +2673,15 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
26162673 */
26172674elog (DEBUG2 ,"removing 2PC data for transaction %u" ,xid );
26182675if (gxact -> ondisk )
2619- RemoveTwoPhaseFile (xid ,giveWarning );
2676+ {
2677+ FullTransactionId fxid ;
2678+
2679+ /*
2680+ * We should deal with a file at the current epoch here.
2681+ */
2682+ fxid = FullTransactionIdFromCurrentEpoch (xid );
2683+ RemoveTwoPhaseFile (fxid ,giveWarning );
2684+ }
26202685RemoveGXact (gxact );
26212686}
26222687