@@ -218,6 +218,8 @@ static bool recoveryPauseAtTarget = true;
218218static TransactionId recoveryTargetXid ;
219219static TimestampTz recoveryTargetTime ;
220220static char * recoveryTargetName ;
221+ static int min_recovery_apply_delay = 0 ;
222+ static TimestampTz recoveryDelayUntilTime ;
221223
222224/* options taken from recovery.conf for XLOG streaming */
223225static bool StandbyModeRequested = false;
@@ -728,8 +730,10 @@ static bool holdingAllSlots = false;
728730
729731static void readRecoveryCommandFile (void );
730732static void exitArchiveRecovery (TimeLineID endTLI ,XLogSegNo endLogSegNo );
731- static bool recoveryStopsHere (XLogRecord * record ,bool * includeThis );
733+ static bool recoveryStopsHere (XLogRecord * record ,bool * includeThis , bool * delayThis );
732734static void recoveryPausesHere (void );
735+ static void recoveryApplyDelay (void );
736+ static bool SetRecoveryDelayUntilTime (TimestampTz xtime );
733737static void SetLatestXTime (TimestampTz xtime );
734738static void SetCurrentChunkStartTime (TimestampTz xtime );
735739static void CheckRequiredParameterValues (void );
@@ -5476,6 +5480,19 @@ readRecoveryCommandFile(void)
54765480(errmsg_internal ("trigger_file = '%s'" ,
54775481TriggerFile )));
54785482}
5483+ else if (strcmp (item -> name ,"min_recovery_apply_delay" )== 0 )
5484+ {
5485+ const char * hintmsg ;
5486+
5487+ if (!parse_int (item -> value ,& min_recovery_apply_delay ,GUC_UNIT_MS ,
5488+ & hintmsg ))
5489+ ereport (ERROR ,
5490+ (errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
5491+ errmsg ("parameter \"%s\" requires a temporal value" ,"min_recovery_apply_delay" ),
5492+ hintmsg ?errhint ("%s" ,_ (hintmsg )) :0 ));
5493+ ereport (DEBUG2 ,
5494+ (errmsg ("min_recovery_apply_delay = '%s'" ,item -> value )));
5495+ }
54795496else
54805497ereport (FATAL ,
54815498(errmsg ("unrecognized recovery parameter \"%s\"" ,
@@ -5625,10 +5642,11 @@ exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo)
56255642 * We also track the timestamp of the latest applied COMMIT/ABORT
56265643 * record in XLogCtl->recoveryLastXTime, for logging purposes.
56275644 * Also, some information is saved in recoveryStopXid et al for use in
5628- * annotating the new timeline's history file.
5645+ * annotating the new timeline's history file; and recoveryDelayUntilTime
5646+ * is updated, for time-delayed standbys.
56295647 */
56305648static bool
5631- recoveryStopsHere (XLogRecord * record ,bool * includeThis )
5649+ recoveryStopsHere (XLogRecord * record ,bool * includeThis , bool * delayThis )
56325650{
56335651bool stopsHere ;
56345652uint8 record_info ;
@@ -5645,20 +5663,31 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
56455663
56465664recordXactCommitData = (xl_xact_commit_compact * )XLogRecGetData (record );
56475665recordXtime = recordXactCommitData -> xact_time ;
5666+
5667+ * delayThis = SetRecoveryDelayUntilTime (recordXactCommitData -> xact_time );
56485668}
56495669else if (record -> xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT )
56505670{
56515671xl_xact_commit * recordXactCommitData ;
56525672
56535673recordXactCommitData = (xl_xact_commit * )XLogRecGetData (record );
56545674recordXtime = recordXactCommitData -> xact_time ;
5675+
5676+ * delayThis = SetRecoveryDelayUntilTime (recordXactCommitData -> xact_time );
56555677}
56565678else if (record -> xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT )
56575679{
56585680xl_xact_abort * recordXactAbortData ;
56595681
56605682recordXactAbortData = (xl_xact_abort * )XLogRecGetData (record );
56615683recordXtime = recordXactAbortData -> xact_time ;
5684+
5685+ /*
5686+ * We deliberately choose not to delay aborts since they have no
5687+ * effect on MVCC. We already allow replay of records that don't
5688+ * have a timestamp, so there is already opportunity for issues
5689+ * caused by early conflicts on standbys.
5690+ */
56625691}
56635692else if (record -> xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT )
56645693{
@@ -5667,6 +5696,8 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
56675696recordRestorePointData = (xl_restore_point * )XLogRecGetData (record );
56685697recordXtime = recordRestorePointData -> rp_time ;
56695698strncpy (recordRPName ,recordRestorePointData -> rp_name ,MAXFNAMELEN );
5699+
5700+ * delayThis = SetRecoveryDelayUntilTime (recordRestorePointData -> rp_time );
56705701}
56715702else
56725703return false;
@@ -5833,6 +5864,66 @@ SetRecoveryPause(bool recoveryPause)
58335864SpinLockRelease (& xlogctl -> info_lck );
58345865}
58355866
5867+ static bool
5868+ SetRecoveryDelayUntilTime (TimestampTz xtime )
5869+ {
5870+ if (min_recovery_apply_delay != 0 )
5871+ {
5872+ recoveryDelayUntilTime =
5873+ TimestampTzPlusMilliseconds (xtime ,min_recovery_apply_delay );
5874+
5875+ return true;
5876+ }
5877+
5878+ return false;
5879+ }
5880+ /*
5881+ * When min_recovery_apply_delay is set, we wait long enough to make sure
5882+ * certain record types are applied at least that interval behind the master.
5883+ * See recoveryStopsHere().
5884+ *
5885+ * Note that the delay is calculated between the WAL record log time and
5886+ * the current time on standby. We would prefer to keep track of when this
5887+ * standby received each WAL record, which would allow a more consistent
5888+ * approach and one not affected by time synchronisation issues, but that
5889+ * is significantly more effort and complexity for little actual gain in
5890+ * usability.
5891+ */
5892+ static void
5893+ recoveryApplyDelay (void )
5894+ {
5895+ while (true)
5896+ {
5897+ long secs ;
5898+ int microsecs ;
5899+
5900+ ResetLatch (& XLogCtl -> recoveryWakeupLatch );
5901+
5902+ /* might change the trigger file's location */
5903+ HandleStartupProcInterrupts ();
5904+
5905+ if (CheckForStandbyTrigger ())
5906+ break ;
5907+
5908+ /*
5909+ * Wait for difference between GetCurrentTimestamp() and
5910+ * recoveryDelayUntilTime
5911+ */
5912+ TimestampDifference (GetCurrentTimestamp (),recoveryDelayUntilTime ,
5913+ & secs ,& microsecs );
5914+
5915+ if (secs <=0 && microsecs <=0 )
5916+ break ;
5917+
5918+ elog (DEBUG2 ,"recovery apply delay %ld seconds, %d milliseconds" ,
5919+ secs ,microsecs /1000 );
5920+
5921+ WaitLatch (& XLogCtl -> recoveryWakeupLatch ,
5922+ WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH ,
5923+ secs * 1000L + microsecs /1000 );
5924+ }
5925+ }
5926+
58365927/*
58375928 * Save timestamp of latest processed commit/abort record.
58385929 *
@@ -6660,6 +6751,7 @@ StartupXLOG(void)
66606751{
66616752bool recoveryContinue = true;
66626753bool recoveryApply = true;
6754+ bool recoveryDelay = false;
66636755ErrorContextCallback errcallback ;
66646756TimestampTz xtime ;
66656757
@@ -6719,7 +6811,7 @@ StartupXLOG(void)
67196811/*
67206812 * Have we reached our recovery target?
67216813 */
6722- if (recoveryStopsHere (record ,& recoveryApply ))
6814+ if (recoveryStopsHere (record ,& recoveryApply , & recoveryDelay ))
67236815{
67246816if (recoveryPauseAtTarget )
67256817{
@@ -6734,6 +6826,25 @@ StartupXLOG(void)
67346826break ;
67356827}
67366828
6829+ /*
6830+ * If we've been asked to lag the master, wait on
6831+ * latch until enough time has passed.
6832+ */
6833+ if (recoveryDelay )
6834+ {
6835+ recoveryApplyDelay ();
6836+
6837+ /*
6838+ * We test for paused recovery again here. If
6839+ * user sets delayed apply, it may be because
6840+ * they expect to pause recovery in case of
6841+ * problems, so we must test again here otherwise
6842+ * pausing during the delay-wait wouldn't work.
6843+ */
6844+ if (xlogctl -> recoveryPause )
6845+ recoveryPausesHere ();
6846+ }
6847+
67376848/* Setup error traceback support for ereport() */
67386849errcallback .callback = rm_redo_error_callback ;
67396850errcallback .arg = (void * )record ;