Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7c03078

Browse files
Add pg_recvlogical —-endpos=LSN
Allow pg_recvlogical to specify an ending LSN, complementingthe existing -—startpos=LSN option.Craig Ringer, reviewed by Euler Taveira and Naoki Okano
1 parent698127a commit7c03078

File tree

2 files changed

+164
-15
lines changed

2 files changed

+164
-15
lines changed

‎doc/src/sgml/ref/pg_recvlogical.sgml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@ PostgreSQL documentation
3838
constraints as <xref linkend="app-pgreceivexlog">, plus those for logical
3939
replication (see <xref linkend="logicaldecoding">).
4040
</para>
41+
42+
<para>
43+
<command>pg_recvlogical</> has no equivalent to the logical decoding
44+
SQL interface's peek and get modes. It sends replay confirmations for
45+
data lazily as it receives it and on clean exit. To examine pending data on
46+
a slot without consuming it, use
47+
<link linkend="functions-replication"><function>pg_logical_slot_peek_changes</></>.
48+
</para>
4149
</refsect1>
4250

4351
<refsect1>
@@ -154,6 +162,32 @@ PostgreSQL documentation
154162
</listitem>
155163
</varlistentry>
156164

165+
<varlistentry>
166+
<term><option>-E <replaceable>lsn</replaceable></option></term>
167+
<term><option>--endpos=<replaceable>lsn</replaceable></option></term>
168+
<listitem>
169+
<para>
170+
In <option>--start</option> mode, automatically stop replication
171+
and exit with normal exit status 0 when receiving reaches the
172+
specified LSN. If specified when not in <option>--start</option>
173+
mode, an error is raised.
174+
</para>
175+
176+
<para>
177+
If there's a record with LSN exactly equal to <replaceable>lsn</>,
178+
the record will be output.
179+
</para>
180+
181+
<para>
182+
The <option>--endpos</option> option is not aware of transaction
183+
boundaries and may truncate output partway through a transaction.
184+
Any partially output transaction will not be consumed and will be
185+
replayed again when the slot is next read from. Individual messages
186+
are never truncated.
187+
</para>
188+
</listitem>
189+
</varlistentry>
190+
157191
<varlistentry>
158192
<term><option>--if-not-exists</option></term>
159193
<listitem>

‎src/bin/pg_basebackup/pg_recvlogical.c

Lines changed: 130 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ static intnoloop = 0;
4040
staticintstandby_message_timeout=10*1000;/* 10 sec = default */
4141
staticintfsync_interval=10*1000;/* 10 sec = default */
4242
staticXLogRecPtrstartpos=InvalidXLogRecPtr;
43+
staticXLogRecPtrendpos=InvalidXLogRecPtr;
4344
staticbooldo_create_slot= false;
4445
staticboolslot_exists_ok= false;
4546
staticbooldo_start_slot= false;
@@ -63,6 +64,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
6364
staticvoidusage(void);
6465
staticvoidStreamLogicalLog(void);
6566
staticvoiddisconnect_and_exit(intcode);
67+
staticboolflushAndSendFeedback(PGconn*conn,TimestampTz*now);
68+
staticvoidprepareToTerminate(PGconn*conn,XLogRecPtrendpos,
69+
boolkeepalive,XLogRecPtrlsn);
6670

6771
staticvoid
6872
usage(void)
@@ -81,6 +85,7 @@ usage(void)
8185
" time between fsyncs to the output file (default: %d)\n"), (fsync_interval /1000));
8286
printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
8387
printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
88+
printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
8489
printf(_(" -n, --no-loop do not loop on connection lost\n"));
8590
printf(_(" -o, --option=NAME[=VALUE]\n"
8691
" pass option NAME with optional value VALUE to the\n"
@@ -281,6 +286,7 @@ StreamLogicalLog(void)
281286
intbytes_written;
282287
int64now;
283288
inthdr_len;
289+
XLogRecPtrcur_record_lsn=InvalidXLogRecPtr;
284290

285291
if (copybuf!=NULL)
286292
{
@@ -454,6 +460,7 @@ StreamLogicalLog(void)
454460
intpos;
455461
boolreplyRequested;
456462
XLogRecPtrwalEnd;
463+
boolendposReached= false;
457464

458465
/*
459466
* Parse the keepalive message, enclosed in the CopyData message.
@@ -476,18 +483,32 @@ StreamLogicalLog(void)
476483
}
477484
replyRequested=copybuf[pos];
478485

479-
/* If the server requested an immediate reply, send one. */
480-
if (replyRequested)
486+
if (endpos!=InvalidXLogRecPtr&&walEnd >=endpos)
481487
{
482-
/* fsync data, so we send a recent flush pointer */
483-
if (!OutputFsync(now))
484-
gotoerror;
488+
/*
489+
* If there's nothing to read on the socket until a keepalive
490+
* we know that the server has nothing to send us; and if
491+
* walEnd has passed endpos, we know nothing else can have
492+
* committed before endpos. So we can bail out now.
493+
*/
494+
endposReached= true;
495+
}
485496

486-
now=feGetCurrentTimestamp();
487-
if (!sendFeedback(conn,now, true, false))
497+
/* Send a reply, if necessary */
498+
if (replyRequested||endposReached)
499+
{
500+
if (!flushAndSendFeedback(conn,&now))
488501
gotoerror;
489502
last_status=now;
490503
}
504+
505+
if (endposReached)
506+
{
507+
prepareToTerminate(conn,endpos, true,InvalidXLogRecPtr);
508+
time_to_abort= true;
509+
break;
510+
}
511+
491512
continue;
492513
}
493514
elseif (copybuf[0]!='w')
@@ -497,7 +518,6 @@ StreamLogicalLog(void)
497518
gotoerror;
498519
}
499520

500-
501521
/*
502522
* Read the header of the XLogData message, enclosed in the CopyData
503523
* message. We only need the WAL location field (dataStart), the rest
@@ -515,12 +535,23 @@ StreamLogicalLog(void)
515535
}
516536

517537
/* Extract WAL location for this block */
518-
{
519-
XLogRecPtrtemp=fe_recvint64(&copybuf[1]);
538+
cur_record_lsn=fe_recvint64(&copybuf[1]);
520539

521-
output_written_lsn=Max(temp,output_written_lsn);
540+
if (endpos!=InvalidXLogRecPtr&&cur_record_lsn>endpos)
541+
{
542+
/*
543+
* We've read past our endpoint, so prepare to go away being
544+
* cautious about what happens to our output data.
545+
*/
546+
if (!flushAndSendFeedback(conn,&now))
547+
gotoerror;
548+
prepareToTerminate(conn,endpos, false,cur_record_lsn);
549+
time_to_abort= true;
550+
break;
522551
}
523552

553+
output_written_lsn=Max(cur_record_lsn,output_written_lsn);
554+
524555
bytes_left=r-hdr_len;
525556
bytes_written=0;
526557

@@ -557,10 +588,29 @@ StreamLogicalLog(void)
557588
strerror(errno));
558589
gotoerror;
559590
}
591+
592+
if (endpos!=InvalidXLogRecPtr&&cur_record_lsn==endpos)
593+
{
594+
/* endpos was exactly the record we just processed, we're done */
595+
if (!flushAndSendFeedback(conn,&now))
596+
gotoerror;
597+
prepareToTerminate(conn,endpos, false,cur_record_lsn);
598+
time_to_abort= true;
599+
break;
600+
}
560601
}
561602

562603
res=PQgetResult(conn);
563-
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
604+
if (PQresultStatus(res)==PGRES_COPY_OUT)
605+
{
606+
/*
607+
* We're doing a client-initiated clean exit and have sent CopyDone to
608+
* the server. We've already sent replay confirmation and fsync'd so
609+
* we can just clean up the connection now.
610+
*/
611+
gotoerror;
612+
}
613+
elseif (PQresultStatus(res)!=PGRES_COMMAND_OK)
564614
{
565615
fprintf(stderr,
566616
_("%s: unexpected termination of replication stream: %s"),
@@ -638,6 +688,7 @@ main(int argc, char **argv)
638688
{"password",no_argument,NULL,'W'},
639689
/* replication options */
640690
{"startpos",required_argument,NULL,'I'},
691+
{"endpos",required_argument,NULL,'E'},
641692
{"option",required_argument,NULL,'o'},
642693
{"plugin",required_argument,NULL,'P'},
643694
{"status-interval",required_argument,NULL,'s'},
@@ -673,7 +724,7 @@ main(int argc, char **argv)
673724
}
674725
}
675726

676-
while ((c=getopt_long(argc,argv,"f:F:nvd:h:p:U:wWI:o:P:s:S:",
727+
while ((c=getopt_long(argc,argv,"f:F:nvd:h:p:U:wWI:E:o:P:s:S:",
677728
long_options,&option_index))!=-1)
678729
{
679730
switch (c)
@@ -733,6 +784,16 @@ main(int argc, char **argv)
733784
}
734785
startpos= ((uint64)hi) <<32 |lo;
735786
break;
787+
case'E':
788+
if (sscanf(optarg,"%X/%X",&hi,&lo)!=2)
789+
{
790+
fprintf(stderr,
791+
_("%s: could not parse end position \"%s\"\n"),
792+
progname,optarg);
793+
exit(1);
794+
}
795+
endpos= ((uint64)hi) <<32 |lo;
796+
break;
736797
case'o':
737798
{
738799
char*data=pg_strdup(optarg);
@@ -857,6 +918,16 @@ main(int argc, char **argv)
857918
exit(1);
858919
}
859920

921+
if (endpos!=InvalidXLogRecPtr&& !do_start_slot)
922+
{
923+
fprintf(stderr,
924+
_("%s: --endpos may only be specified with --start\n"),
925+
progname);
926+
fprintf(stderr,_("Try \"%s --help\" for more information.\n"),
927+
progname);
928+
exit(1);
929+
}
930+
860931
#ifndefWIN32
861932
pqsignal(SIGINT,sigint_handler);
862933
pqsignal(SIGHUP,sighup_handler);
@@ -923,8 +994,8 @@ main(int argc, char **argv)
923994
if (time_to_abort)
924995
{
925996
/*
926-
* We've been Ctrl-C'ed. That's not anerror, soexitwithout an
927-
* errorcode.
997+
* We've been Ctrl-C'ed or reached an exitlimit condition. That's
998+
*not an error, so exit without anerrorcode.
928999
*/
9291000
disconnect_and_exit(0);
9301001
}
@@ -943,3 +1014,47 @@ main(int argc, char **argv)
9431014
}
9441015
}
9451016
}
1017+
1018+
/*
1019+
* Fsync our output data, and send a feedback message to the server. Returns
1020+
* true if successful, false otherwise.
1021+
*
1022+
* If successful, *now is updated to the current timestamp just before sending
1023+
* feedback.
1024+
*/
1025+
staticbool
1026+
flushAndSendFeedback(PGconn*conn,TimestampTz*now)
1027+
{
1028+
/* flush data to disk, so that we send a recent flush pointer */
1029+
if (!OutputFsync(*now))
1030+
return false;
1031+
*now=feGetCurrentTimestamp();
1032+
if (!sendFeedback(conn,*now, true, false))
1033+
return false;
1034+
1035+
return true;
1036+
}
1037+
1038+
/*
1039+
* Try to inform the server about of upcoming demise, but don't wait around or
1040+
* retry on failure.
1041+
*/
1042+
staticvoid
1043+
prepareToTerminate(PGconn*conn,XLogRecPtrendpos,boolkeepalive,XLogRecPtrlsn)
1044+
{
1045+
(void)PQputCopyEnd(conn,NULL);
1046+
(void)PQflush(conn);
1047+
1048+
if (verbose)
1049+
{
1050+
if (keepalive)
1051+
fprintf(stderr,"%s: endpos %X/%X reached by keepalive\n",
1052+
progname,
1053+
(uint32) (endpos >>32), (uint32)endpos);
1054+
else
1055+
fprintf(stderr,"%s: endpos %X/%X reached by record at %X/%X\n",
1056+
progname, (uint32) (endpos >>32), (uint32) (endpos),
1057+
(uint32) (lsn >>32), (uint32)lsn);
1058+
1059+
}
1060+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp