@@ -78,13 +78,21 @@ static SlruCtlData CommitTsCtlData;
7878#define CommitTsCtl (&CommitTsCtlData)
7979
8080/*
81- * We keep a cache of the last value set in shared memory. This is protected
82- * by CommitTsLock.
81+ * We keep a cache of the last value set in shared memory.
82+ *
83+ * This is also good place to keep the activation status. We keep this
84+ * separate from the GUC so that the standby can activate the module if the
85+ * primary has it active independently of the value of the GUC.
86+ *
87+ * This is protected by CommitTsLock. In some places, we use commitTsActive
88+ * without acquiring the lock; where this happens, a comment explains the
89+ * rationale for it.
8390 */
8491typedef struct CommitTimestampShared
8592{
8693TransactionId xidLastCommit ;
8794CommitTimestampEntry dataLastCommit ;
95+ bool commitTsActive ;
8896}CommitTimestampShared ;
8997
9098CommitTimestampShared * commitTsShared ;
@@ -93,14 +101,6 @@ CommitTimestampShared *commitTsShared;
93101/* GUC variable */
94102bool track_commit_timestamp ;
95103
96- /*
97- * When this is set, commit_ts is force-enabled during recovery. This is so
98- * that a standby can replay WAL records coming from a master with the setting
99- * enabled. (Note that this doesn't enable SQL access to the data; it's
100- * effectively write-only until the GUC itself is enabled.)
101- */
102- static bool enable_during_recovery ;
103-
104104static void SetXidCommitTsInPage (TransactionId xid ,int nsubxids ,
105105TransactionId * subxids ,TimestampTz ts ,
106106RepOriginId nodeid ,int pageno );
@@ -109,7 +109,7 @@ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
109109static int ZeroCommitTsPage (int pageno ,bool writeXlog );
110110static bool CommitTsPagePrecedes (int page1 ,int page2 );
111111static void ActivateCommitTs (void );
112- static void DeactivateCommitTs (bool do_wal );
112+ static void DeactivateCommitTs (void );
113113static void WriteZeroPageXlogRec (int pageno );
114114static void WriteTruncateXlogRec (int pageno );
115115static void WriteSetTimestampXlogRec (TransactionId mainxid ,int nsubxids ,
@@ -149,10 +149,14 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
149149TransactionId newestXact ;
150150
151151/*
152- * No-op if the module is not enabled, but allow writes in a standby
153- * during recovery.
152+ * No-op if the module is not active.
153+ *
154+ * An unlocked read here is fine, because in a standby (the only place
155+ * where the flag can change in flight) this routine is only called by
156+ * the recovery process, which is also the only process which can change
157+ * the flag.
154158 */
155- if (!track_commit_timestamp && ! enable_during_recovery )
159+ if (!commitTsShared -> commitTsActive )
156160return ;
157161
158162/*
@@ -283,30 +287,45 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
283287TransactionId oldestCommitTs ;
284288TransactionId newestCommitTs ;
285289
290+ /* error if the given Xid doesn't normally commit */
291+ if (!TransactionIdIsNormal (xid ))
292+ ereport (ERROR ,
293+ (errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
294+ errmsg ("cannot retrieve commit timestamp for transaction %u" ,xid )));
295+
296+ LWLockAcquire (CommitTsLock ,LW_SHARED );
297+
286298/* Error if module not enabled */
287- if (!track_commit_timestamp )
299+ if (!commitTsShared -> commitTsActive )
288300ereport (ERROR ,
289301(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
290302errmsg ("could not get commit timestamp data" ),
291303errhint ("Make sure the configuration parameter \"%s\" is set." ,
292304"track_commit_timestamp" )));
293305
294- /* error if the given Xid doesn't normally commit */
295- if (!TransactionIdIsNormal (xid ))
296- ereport (ERROR ,
297- (errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
298- errmsg ("cannot retrieve commit timestamp for transaction %u" ,xid )));
299-
300306/*
301- * Return empty if the requested value is outside our valid range.
307+ * If we're asked for the cached value, return that. Otherwise, fall
308+ * through to read from SLRU.
302309 */
303- LWLockAcquire (CommitTsLock ,LW_SHARED );
310+ if (commitTsShared -> xidLastCommit == xid )
311+ {
312+ * ts = commitTsShared -> dataLastCommit .time ;
313+ if (nodeid )
314+ * nodeid = commitTsShared -> dataLastCommit .nodeid ;
315+
316+ LWLockRelease (CommitTsLock );
317+ return * ts != 0 ;
318+ }
319+
304320oldestCommitTs = ShmemVariableCache -> oldestCommitTs ;
305321newestCommitTs = ShmemVariableCache -> newestCommitTs ;
306322/* neither is invalid, or both are */
307323Assert (TransactionIdIsValid (oldestCommitTs )== TransactionIdIsValid (newestCommitTs ));
308324LWLockRelease (CommitTsLock );
309325
326+ /*
327+ * Return empty if the requested value is outside our valid range.
328+ */
310329if (!TransactionIdIsValid (oldestCommitTs )||
311330TransactionIdPrecedes (xid ,oldestCommitTs )||
312331TransactionIdPrecedes (newestCommitTs ,xid ))
@@ -317,27 +336,6 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
317336return false;
318337}
319338
320- /*
321- * Use an unlocked atomic read on our cached value in shared memory; if
322- * it's a hit, acquire a lock and read the data, after verifying that it's
323- * still what we initially read. Otherwise, fall through to read from
324- * SLRU.
325- */
326- if (commitTsShared -> xidLastCommit == xid )
327- {
328- LWLockAcquire (CommitTsLock ,LW_SHARED );
329- if (commitTsShared -> xidLastCommit == xid )
330- {
331- * ts = commitTsShared -> dataLastCommit .time ;
332- if (nodeid )
333- * nodeid = commitTsShared -> dataLastCommit .nodeid ;
334-
335- LWLockRelease (CommitTsLock );
336- return * ts != 0 ;
337- }
338- LWLockRelease (CommitTsLock );
339- }
340-
341339/* lock is acquired by SimpleLruReadPage_ReadOnly */
342340slotno = SimpleLruReadPage_ReadOnly (CommitTsCtl ,pageno ,xid );
343341memcpy (& entry ,
@@ -366,15 +364,16 @@ GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
366364{
367365TransactionId xid ;
368366
367+ LWLockAcquire (CommitTsLock ,LW_SHARED );
368+
369369/* Error if module not enabled */
370- if (!track_commit_timestamp )
370+ if (!commitTsShared -> commitTsActive )
371371ereport (ERROR ,
372372(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
373373errmsg ("could not get commit timestamp data" ),
374374errhint ("Make sure the configuration parameter \"%s\" is set." ,
375375"track_commit_timestamp" )));
376376
377- LWLockAcquire (CommitTsLock ,LW_SHARED );
378377xid = commitTsShared -> xidLastCommit ;
379378if (ts )
380379* ts = commitTsShared -> dataLastCommit .time ;
@@ -493,6 +492,7 @@ CommitTsShmemInit(void)
493492commitTsShared -> xidLastCommit = InvalidTransactionId ;
494493TIMESTAMP_NOBEGIN (commitTsShared -> dataLastCommit .time );
495494commitTsShared -> dataLastCommit .nodeid = InvalidRepOriginId ;
495+ commitTsShared -> commitTsActive = false;
496496}
497497else
498498Assert (found );
@@ -566,7 +566,7 @@ CompleteCommitTsInitialization(void)
566566 * any leftover data.
567567 */
568568if (!track_commit_timestamp )
569- DeactivateCommitTs (true );
569+ DeactivateCommitTs ();
570570}
571571
572572/*
@@ -588,11 +588,11 @@ CommitTsParameterChange(bool newvalue, bool oldvalue)
588588 */
589589if (newvalue )
590590{
591- if (!track_commit_timestamp && ! oldvalue )
591+ if (!commitTsShared -> commitTsActive )
592592ActivateCommitTs ();
593593}
594- else if (! track_commit_timestamp && oldvalue )
595- DeactivateCommitTs (false );
594+ else if (commitTsShared -> commitTsActive )
595+ DeactivateCommitTs ();
596596}
597597
598598/*
@@ -645,7 +645,7 @@ ActivateCommitTs(void)
645645}
646646LWLockRelease (CommitTsLock );
647647
648- /*Finally, create the current segment file, if necessary */
648+ /*Create the current segment file, if necessary */
649649if (!SimpleLruDoesPhysicalPageExist (CommitTsCtl ,pageno ))
650650{
651651int slotno ;
@@ -657,8 +657,10 @@ ActivateCommitTs(void)
657657LWLockRelease (CommitTsControlLock );
658658}
659659
660- /* We can now replay xlog records from this module */
661- enable_during_recovery = true;
660+ /* Change the activation status in shared memory. */
661+ LWLockAcquire (CommitTsLock ,LW_EXCLUSIVE );
662+ commitTsShared -> commitTsActive = true;
663+ LWLockRelease (CommitTsLock );
662664}
663665
664666/*
@@ -672,21 +674,25 @@ ActivateCommitTs(void)
672674 * possibly-invalid data; also removes segments of old data.
673675 */
674676static void
675- DeactivateCommitTs (bool do_wal )
677+ DeactivateCommitTs (void )
676678{
677- TransactionId xid = ShmemVariableCache -> nextXid ;
678- int pageno = TransactionIdToCTsPage (xid );
679-
680679/*
681- * Re-Initialize our idea of the latest page number.
680+ * Cleanup the status in the shared memory.
681+ *
682+ * We reset everything in the commitTsShared record to prevent user from
683+ * getting confusing data about last committed transaction on the standby
684+ * when the module was activated repeatedly on the primary.
682685 */
683- LWLockAcquire (CommitTsControlLock ,LW_EXCLUSIVE );
684- CommitTsCtl -> shared -> latest_page_number = pageno ;
685- LWLockRelease (CommitTsControlLock );
686-
687686LWLockAcquire (CommitTsLock ,LW_EXCLUSIVE );
687+
688+ commitTsShared -> commitTsActive = false;
689+ commitTsShared -> xidLastCommit = InvalidTransactionId ;
690+ TIMESTAMP_NOBEGIN (commitTsShared -> dataLastCommit .time );
691+ commitTsShared -> dataLastCommit .nodeid = InvalidRepOriginId ;
692+
688693ShmemVariableCache -> oldestCommitTs = InvalidTransactionId ;
689694ShmemVariableCache -> newestCommitTs = InvalidTransactionId ;
695+
690696LWLockRelease (CommitTsLock );
691697
692698/*
@@ -697,10 +703,9 @@ DeactivateCommitTs(bool do_wal)
697703 * be overwritten anyway when we wrap around, but it seems better to be
698704 * tidy.)
699705 */
706+ LWLockAcquire (CommitTsControlLock ,LW_EXCLUSIVE );
700707(void )SlruScanDirectory (CommitTsCtl ,SlruScanDirCbDeleteAll ,NULL );
701-
702- /* No longer enabled on recovery */
703- enable_during_recovery = false;
708+ LWLockRelease (CommitTsControlLock );
704709}
705710
706711/*
@@ -739,8 +744,13 @@ ExtendCommitTs(TransactionId newestXact)
739744{
740745int pageno ;
741746
742- /* nothing to do if module not enabled */
743- if (!track_commit_timestamp && !enable_during_recovery )
747+ /*
748+ * Nothing to do if module not enabled. Note we do an unlocked read of the
749+ * flag here, which is okay because this routine is only called from
750+ * GetNewTransactionId, which is never called in a standby.
751+ */
752+ Assert (!InRecovery );
753+ if (!commitTsShared -> commitTsActive )
744754return ;
745755
746756/*
@@ -768,7 +778,7 @@ ExtendCommitTs(TransactionId newestXact)
768778 * Note that we don't need to flush XLOG here.
769779 */
770780void
771- TruncateCommitTs (TransactionId oldestXact , bool do_wal )
781+ TruncateCommitTs (TransactionId oldestXact )
772782{
773783int cutoffPage ;
774784
@@ -784,8 +794,7 @@ TruncateCommitTs(TransactionId oldestXact, bool do_wal)
784794return ;/* nothing to remove */
785795
786796/* Write XLOG record */
787- if (do_wal )
788- WriteTruncateXlogRec (cutoffPage );
797+ WriteTruncateXlogRec (cutoffPage );
789798
790799/* Now we can remove the old CommitTs segment(s) */
791800SimpleLruTruncate (CommitTsCtl ,cutoffPage );