@@ -287,7 +287,7 @@ recvint64(char *buf)
287287 * Send a Standby Status Update message to server.
288288 */
289289static bool
290- sendFeedback (PGconn * conn ,XLogRecPtr blockpos ,int64 now )
290+ sendFeedback (PGconn * conn ,XLogRecPtr blockpos ,int64 now , bool replyRequested )
291291{
292292char replybuf [1 + 8 + 8 + 8 + 8 + 1 ];
293293int len = 0 ;
@@ -302,7 +302,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now)
302302len += 8 ;
303303sendint64 (now ,& replybuf [len ]);/* sendTime */
304304len += 8 ;
305- replybuf [len ]= 0 ; /* replyRequested */
305+ replybuf [len ]= replyRequested ? 1 : 0 ; /* replyRequested */
306306len += 1 ;
307307
308308if (PQputCopyData (conn ,replybuf ,len ) <=0 || PQflush (conn ))
@@ -413,6 +413,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
413413int bytes_left ;
414414int bytes_written ;
415415int64 now ;
416+ int hdr_len ;
416417
417418if (copybuf != NULL )
418419{
@@ -441,7 +442,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
441442standby_message_timeout ))
442443{
443444/* Time to send feedback! */
444- if (!sendFeedback (conn ,blockpos ,now ))
445+ if (!sendFeedback (conn ,blockpos ,now , false ))
445446gotoerror ;
446447last_status = now ;
447448}
@@ -520,10 +521,34 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
520521/* Check the message type. */
521522if (copybuf [0 ]== 'k' )
522523{
524+ int pos ;
525+ bool replyRequested ;
526+
523527/*
524- * keepalive message, sent in 9.2 and newer. We just ignore this
525- * message completely, but need to skip past it in the stream.
528+ * Parse the keepalive message, enclosed in the CopyData message.
529+ * We just check if the server requested a reply, and ignore the
530+ * rest.
526531 */
532+ pos = 1 ;/* skip msgtype 'k' */
533+ pos += 8 ;/* skip walEnd */
534+ pos += 8 ;/* skip sendTime */
535+
536+ if (r < pos + 1 )
537+ {
538+ fprintf (stderr ,_ ("%s: streaming header too small: %d\n" ),
539+ progname ,r );
540+ gotoerror ;
541+ }
542+ replyRequested = copybuf [pos ];
543+
544+ /* If the server requested an immediate reply, send one. */
545+ if (replyRequested )
546+ {
547+ now = localGetCurrentTimestamp ();
548+ if (!sendFeedback (conn ,blockpos ,now , false))
549+ gotoerror ;
550+ last_status = now ;
551+ }
527552continue ;
528553}
529554else if (copybuf [0 ]!= 'w' )
@@ -538,8 +563,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
538563 * message. We only need the WAL location field (dataStart), the rest
539564 * of the header is ignored.
540565 */
541- #define STREAMING_HEADER_SIZE (1/* msgtype */ + 8 /* dataStart */ + 8 /* walEnd */ + 8 /* sendTime */ )
542- if (r < STREAMING_HEADER_SIZE + 1 )
566+ hdr_len = 1 ;/* msgtype 'w' */
567+ hdr_len += 8 ;/* dataStart */
568+ hdr_len += 8 ;/* walEnd */
569+ hdr_len += 8 ;/* sendTime */
570+ if (r < hdr_len + 1 )
543571{
544572fprintf (stderr ,_ ("%s: streaming header too small: %d\n" ),
545573progname ,r );
@@ -578,7 +606,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
578606}
579607}
580608
581- bytes_left = r - STREAMING_HEADER_SIZE ;
609+ bytes_left = r - hdr_len ;
582610bytes_written = 0 ;
583611
584612while (bytes_left )
@@ -604,7 +632,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
604632}
605633
606634if (write (walfile ,
607- copybuf + STREAMING_HEADER_SIZE + bytes_written ,
635+ copybuf + hdr_len + bytes_written ,
608636bytes_to_write )!= bytes_to_write )
609637{
610638fprintf (stderr ,