5555
5656/* GUC variables */
5757int wal_receiver_status_interval ;
58+ int wal_receiver_timeout ;
5859bool hot_standby_feedback ;
5960
6061/* libpqreceiver hooks to these when loaded */
@@ -121,7 +122,7 @@ static void WalRcvDie(int code, Datum arg);
121122static void XLogWalRcvProcessMsg (unsignedchar type ,char * buf ,Size len );
122123static void XLogWalRcvWrite (char * buf ,Size nbytes ,XLogRecPtr recptr );
123124static void XLogWalRcvFlush (bool dying );
124- static void XLogWalRcvSendReply (void );
125+ static void XLogWalRcvSendReply (bool force , bool requestReply );
125126static void XLogWalRcvSendHSFeedback (void );
126127static void ProcessWalSndrMessage (XLogRecPtr walEnd ,TimestampTz sendTime );
127128
@@ -170,9 +171,10 @@ WalReceiverMain(void)
170171{
171172char conninfo [MAXCONNINFO ];
172173XLogRecPtr startpoint ;
173-
174174/* use volatile pointer to prevent code rearrangement */
175175volatile WalRcvData * walrcv = WalRcv ;
176+ TimestampTz last_recv_timestamp ;
177+ bool ping_sent ;
176178
177179/*
178180 * WalRcv should be set up already (if we are a backend, we inherit this
@@ -282,6 +284,10 @@ WalReceiverMain(void)
282284MemSet (& reply_message ,0 ,sizeof (reply_message ));
283285MemSet (& feedback_message ,0 ,sizeof (feedback_message ));
284286
287+ /* Initialize the last recv timestamp */
288+ last_recv_timestamp = GetCurrentTimestamp ();
289+ ping_sent = false;
290+
285291/* Loop until end-of-streaming or error */
286292for (;;)
287293{
@@ -316,15 +322,23 @@ WalReceiverMain(void)
316322/* Wait a while for data to arrive */
317323if (walrcv_receive (NAPTIME_PER_CYCLE ,& type ,& buf ,& len ))
318324{
325+ /* Something was received from master, so reset timeout */
326+ last_recv_timestamp = GetCurrentTimestamp ();
327+ ping_sent = false;
328+
319329/* Accept the received data, and process it */
320330XLogWalRcvProcessMsg (type ,buf ,len );
321331
322332/* Receive any more data we can without sleeping */
323333while (walrcv_receive (0 ,& type ,& buf ,& len ))
334+ {
335+ last_recv_timestamp = GetCurrentTimestamp ();
336+ ping_sent = false;
324337XLogWalRcvProcessMsg (type ,buf ,len );
338+ }
325339
326340/* Let the master know that we received some data. */
327- XLogWalRcvSendReply ();
341+ XLogWalRcvSendReply (false, false );
328342
329343/*
330344 * If we've written some records, flush them to disk and let the
@@ -335,10 +349,48 @@ WalReceiverMain(void)
335349else
336350{
337351/*
338- * We didn't receive anything new, but send a status update to the
339- * master anyway, to report any progress in applying WAL.
352+ * We didn't receive anything new. If we haven't heard anything
353+ * from the server for more than wal_receiver_timeout / 2,
354+ * ping the server. Also, if it's been longer than
355+ * wal_receiver_status_interval since the last update we sent,
356+ * send a status update to the master anyway, to report any
357+ * progress in applying WAL.
358+ */
359+ bool requestReply = false;
360+
361+ /*
362+ * Check if time since last receive from primary has reached the
363+ * configured limit.
340364 */
341- XLogWalRcvSendReply ();
365+ if (wal_receiver_timeout > 0 )
366+ {
367+ TimestampTz now = GetCurrentTimestamp ();
368+ TimestampTz timeout ;
369+
370+ timeout = TimestampTzPlusMilliseconds (last_recv_timestamp ,
371+ wal_receiver_timeout );
372+
373+ if (now >=timeout )
374+ ereport (ERROR ,
375+ (errmsg ("terminating walreceiver due to timeout" )));
376+
377+ /*
378+ * If we haven't received anything new for half of
379+ * wal_receiver_timeout, ping the server (once per idle period).
380+ */
381+ if (!ping_sent )
382+ {
383+ timeout = TimestampTzPlusMilliseconds (last_recv_timestamp ,
384+ (wal_receiver_timeout /2 ));
385+ if (now >=timeout )
386+ {
387+ requestReply = true;
388+ ping_sent = true;
389+ }
390+ }
391+ }
392+
393+ XLogWalRcvSendReply (requestReply ,requestReply );
342394XLogWalRcvSendHSFeedback ();
343395}
344396}
@@ -460,6 +512,10 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
460512memcpy (& keepalive ,buf ,sizeof (PrimaryKeepaliveMessage ));
461513
462514ProcessWalSndrMessage (keepalive .walEnd ,keepalive .sendTime );
515+
516+ /* If the primary requested a reply, send one immediately */
517+ if (keepalive .replyRequested )
518+ XLogWalRcvSendReply (true, false);
463519break ;
464520}
465521default :
@@ -609,19 +665,25 @@ XLogWalRcvFlush(bool dying)
609665
610666/* Also let the master know that we made some progress */
611667if (!dying )
612- {
613- XLogWalRcvSendReply ();
614- XLogWalRcvSendHSFeedback ();
615- }
668+ XLogWalRcvSendReply (false, false);
616669}
617670}
618671
619672/*
620- * Send reply message to primary, indicating our current XLOG positions and
621- * the current time.
673+ * Send reply message to primary, indicating our current XLOG positions, oldest
674+ * xmin and the current time.
675+ *
676+ * If 'force' is not set, the message is only sent if enough time has
677+ * passed since last status update to reach wal_receiver_status_interval.
678+ * If wal_receiver_status_interval is disabled altogether and 'force' is
679+ * false, this is a no-op.
680+ *
681+ * If 'requestReply' is true, requests the server to reply immediately upon
682+ * receiving this message. This is used for heartbeats, when approaching
683+ * wal_receiver_timeout.
622684 */
623685static void
624- XLogWalRcvSendReply (void )
686+ XLogWalRcvSendReply (bool force , bool requestReply )
625687{
626688char buf [sizeof (StandbyReplyMessage )+ 1 ];
627689TimestampTz now ;
@@ -630,7 +692,7 @@ XLogWalRcvSendReply(void)
630692 * If the user doesn't want status to be reported to the master, be sure
631693 * to exit before doing anything at all.
632694 */
633- if (wal_receiver_status_interval <=0 )
695+ if (! force && wal_receiver_status_interval <=0 )
634696return ;
635697
636698/* Get current timestamp. */
@@ -645,7 +707,8 @@ XLogWalRcvSendReply(void)
645707 * this is only for reporting purposes and only on idle systems, that's
646708 * probably OK.
647709 */
648- if (XLByteEQ (reply_message .write ,LogstreamResult .Write )
710+ if (!force
711+ && XLByteEQ (reply_message .write ,LogstreamResult .Write )
649712&& XLByteEQ (reply_message .flush ,LogstreamResult .Flush )
650713&& !TimestampDifferenceExceeds (reply_message .sendTime ,now ,
651714wal_receiver_status_interval * 1000 ))
@@ -656,6 +719,7 @@ XLogWalRcvSendReply(void)
656719reply_message .flush = LogstreamResult .Flush ;
657720reply_message .apply = GetXLogReplayRecPtr (NULL );
658721reply_message .sendTime = now ;
722+ reply_message .replyRequested = requestReply ;
659723
660724elog (DEBUG2 ,"sending write %X/%X flush %X/%X apply %X/%X" ,
661725 (uint32 ) (reply_message .write >>32 ), (uint32 )reply_message .write ,