@@ -40,6 +40,7 @@ static intnoloop = 0;
4040static int standby_message_timeout = 10 * 1000 ;/* 10 sec = default */
4141static int fsync_interval = 10 * 1000 ;/* 10 sec = default */
4242static XLogRecPtr startpos = InvalidXLogRecPtr ;
43+ static XLogRecPtr endpos = InvalidXLogRecPtr ;
4344static bool do_create_slot = false;
4445static bool slot_exists_ok = false;
4546static bool do_start_slot = false;
@@ -63,6 +64,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
6364static void usage (void );
6465static void StreamLogicalLog (void );
6566static void disconnect_and_exit (int code );
67+ static bool flushAndSendFeedback (PGconn * conn ,TimestampTz * now );
68+ static void prepareToTerminate (PGconn * conn ,XLogRecPtr endpos ,
69+ bool keepalive ,XLogRecPtr lsn );
6670
6771static void
6872usage (void )
@@ -81,6 +85,7 @@ usage(void)
8185" time between fsyncs to the output file (default: %d)\n" ), (fsync_interval /1000 ));
8286printf (_ (" --if-not-exists do not error if slot already exists when creating a slot\n" ));
8387printf (_ (" -I, --startpos=LSN where in an existing slot should the streaming start\n" ));
88+ printf (_ (" -E, --endpos=LSN exit after receiving the specified LSN\n" ));
8489printf (_ (" -n, --no-loop do not loop on connection lost\n" ));
8590printf (_ (" -o, --option=NAME[=VALUE]\n"
8691" pass option NAME with optional value VALUE to the\n"
@@ -281,6 +286,7 @@ StreamLogicalLog(void)
281286int bytes_written ;
282287int64 now ;
283288int hdr_len ;
289+ XLogRecPtr cur_record_lsn = InvalidXLogRecPtr ;
284290
285291if (copybuf != NULL )
286292{
@@ -454,6 +460,7 @@ StreamLogicalLog(void)
454460int pos ;
455461bool replyRequested ;
456462XLogRecPtr walEnd ;
463+ bool endposReached = false;
457464
458465/*
459466 * Parse the keepalive message, enclosed in the CopyData message.
@@ -476,18 +483,32 @@ StreamLogicalLog(void)
476483}
477484replyRequested = copybuf [pos ];
478485
479- /* If the server requested an immediate reply, send one. */
480- if (replyRequested )
486+ if (endpos != InvalidXLogRecPtr && walEnd >=endpos )
481487{
482- /* fsync data, so we send a recent flush pointer */
483- if (!OutputFsync (now ))
484- gotoerror ;
488+ /*
489+ * If there's nothing to read on the socket until a keepalive
490+ * we know that the server has nothing to send us; and if
491+ * walEnd has passed endpos, we know nothing else can have
492+ * committed before endpos. So we can bail out now.
493+ */
494+ endposReached = true;
495+ }
485496
486- now = feGetCurrentTimestamp ();
487- if (!sendFeedback (conn ,now , true, false))
497+ /* Send a reply, if necessary */
498+ if (replyRequested || endposReached )
499+ {
500+ if (!flushAndSendFeedback (conn ,& now ))
488501gotoerror ;
489502last_status = now ;
490503}
504+
505+ if (endposReached )
506+ {
507+ prepareToTerminate (conn ,endpos , true,InvalidXLogRecPtr );
508+ time_to_abort = true;
509+ break ;
510+ }
511+
491512continue ;
492513}
493514else if (copybuf [0 ]!= 'w' )
@@ -497,7 +518,6 @@ StreamLogicalLog(void)
497518gotoerror ;
498519}
499520
500-
501521/*
502522 * Read the header of the XLogData message, enclosed in the CopyData
503523 * message. We only need the WAL location field (dataStart), the rest
@@ -515,12 +535,23 @@ StreamLogicalLog(void)
515535}
516536
517537/* Extract WAL location for this block */
518- {
519- XLogRecPtr temp = fe_recvint64 (& copybuf [1 ]);
538+ cur_record_lsn = fe_recvint64 (& copybuf [1 ]);
520539
521- output_written_lsn = Max (temp ,output_written_lsn );
540+ if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos )
541+ {
542+ /*
543+ * We've read past our endpoint, so prepare to go away being
544+ * cautious about what happens to our output data.
545+ */
546+ if (!flushAndSendFeedback (conn ,& now ))
547+ gotoerror ;
548+ prepareToTerminate (conn ,endpos , false,cur_record_lsn );
549+ time_to_abort = true;
550+ break ;
522551}
523552
553+ output_written_lsn = Max (cur_record_lsn ,output_written_lsn );
554+
524555bytes_left = r - hdr_len ;
525556bytes_written = 0 ;
526557
@@ -557,10 +588,29 @@ StreamLogicalLog(void)
557588strerror (errno ));
558589gotoerror ;
559590}
591+
592+ if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos )
593+ {
594+ /* endpos was exactly the record we just processed, we're done */
595+ if (!flushAndSendFeedback (conn ,& now ))
596+ gotoerror ;
597+ prepareToTerminate (conn ,endpos , false,cur_record_lsn );
598+ time_to_abort = true;
599+ break ;
600+ }
560601}
561602
562603res = PQgetResult (conn );
563- if (PQresultStatus (res )!= PGRES_COMMAND_OK )
604+ if (PQresultStatus (res )== PGRES_COPY_OUT )
605+ {
606+ /*
607+ * We're doing a client-initiated clean exit and have sent CopyDone to
608+ * the server. We've already sent replay confirmation and fsync'd so
609+ * we can just clean up the connection now.
610+ */
611+ gotoerror ;
612+ }
613+ else if (PQresultStatus (res )!= PGRES_COMMAND_OK )
564614{
565615fprintf (stderr ,
566616_ ("%s: unexpected termination of replication stream: %s" ),
@@ -638,6 +688,7 @@ main(int argc, char **argv)
638688{"password" ,no_argument ,NULL ,'W' },
639689/* replication options */
640690{"startpos" ,required_argument ,NULL ,'I' },
691+ {"endpos" ,required_argument ,NULL ,'E' },
641692{"option" ,required_argument ,NULL ,'o' },
642693{"plugin" ,required_argument ,NULL ,'P' },
643694{"status-interval" ,required_argument ,NULL ,'s' },
@@ -673,7 +724,7 @@ main(int argc, char **argv)
673724}
674725}
675726
676- while ((c = getopt_long (argc ,argv ,"f:F:nvd:h:p:U:wWI:o:P:s:S:" ,
727+ while ((c = getopt_long (argc ,argv ,"f:F:nvd:h:p:U:wWI:E: o:P:s:S:" ,
677728long_options ,& option_index ))!= -1 )
678729{
679730switch (c )
@@ -733,6 +784,16 @@ main(int argc, char **argv)
733784}
734785startpos = ((uint64 )hi ) <<32 |lo ;
735786break ;
787+ case 'E' :
788+ if (sscanf (optarg ,"%X/%X" ,& hi ,& lo )!= 2 )
789+ {
790+ fprintf (stderr ,
791+ _ ("%s: could not parse end position \"%s\"\n" ),
792+ progname ,optarg );
793+ exit (1 );
794+ }
795+ endpos = ((uint64 )hi ) <<32 |lo ;
796+ break ;
736797case 'o' :
737798{
738799char * data = pg_strdup (optarg );
@@ -857,6 +918,16 @@ main(int argc, char **argv)
857918exit (1 );
858919}
859920
921+ if (endpos != InvalidXLogRecPtr && !do_start_slot )
922+ {
923+ fprintf (stderr ,
924+ _ ("%s: --endpos may only be specified with --start\n" ),
925+ progname );
926+ fprintf (stderr ,_ ("Try \"%s --help\" for more information.\n" ),
927+ progname );
928+ exit (1 );
929+ }
930+
860931#ifndef WIN32
861932pqsignal (SIGINT ,sigint_handler );
862933pqsignal (SIGHUP ,sighup_handler );
@@ -923,8 +994,8 @@ main(int argc, char **argv)
923994if (time_to_abort )
924995{
925996/*
926- * We've been Ctrl-C'ed. That's not anerror, so exitwithout an
927- * errorcode.
997+ * We've been Ctrl-C'ed or reached an exitlimit condition. That's
998+ *not an error, so exit without an errorcode.
928999 */
9291000disconnect_and_exit (0 );
9301001}
@@ -943,3 +1014,47 @@ main(int argc, char **argv)
9431014}
9441015}
9451016}
1017+
1018+ /*
1019+ * Fsync our output data, and send a feedback message to the server. Returns
1020+ * true if successful, false otherwise.
1021+ *
1022+ * If successful, *now is updated to the current timestamp just before sending
1023+ * feedback.
1024+ */
1025+ static bool
1026+ flushAndSendFeedback (PGconn * conn ,TimestampTz * now )
1027+ {
1028+ /* flush data to disk, so that we send a recent flush pointer */
1029+ if (!OutputFsync (* now ))
1030+ return false;
1031+ * now = feGetCurrentTimestamp ();
1032+ if (!sendFeedback (conn ,* now , true, false))
1033+ return false;
1034+
1035+ return true;
1036+ }
1037+
1038+ /*
1039+ * Try to inform the server about of upcoming demise, but don't wait around or
1040+ * retry on failure.
1041+ */
1042+ static void
1043+ prepareToTerminate (PGconn * conn ,XLogRecPtr endpos ,bool keepalive ,XLogRecPtr lsn )
1044+ {
1045+ (void )PQputCopyEnd (conn ,NULL );
1046+ (void )PQflush (conn );
1047+
1048+ if (verbose )
1049+ {
1050+ if (keepalive )
1051+ fprintf (stderr ,"%s: endpos %X/%X reached by keepalive\n" ,
1052+ progname ,
1053+ (uint32 ) (endpos >>32 ), (uint32 )endpos );
1054+ else
1055+ fprintf (stderr ,"%s: endpos %X/%X reached by record at %X/%X\n" ,
1056+ progname , (uint32 ) (endpos >>32 ), (uint32 ) (endpos ),
1057+ (uint32 ) (lsn >>32 ), (uint32 )lsn );
1058+
1059+ }
1060+ }