Commit 0b63291

Make pg_receivexlog and pg_basebackup -X stream work across timeline switches.

This mirrors the changes done earlier to the server in standby mode. When receivelog reaches the end of a timeline, as reported by the server, it fetches the timeline history file of the next timeline, and restarts streaming from the new timeline by issuing a new START_STREAMING command.

When pg_receivexlog crosses a timeline, it leaves the .partial suffix on the last segment on the old timeline. This helps you tell apart a partial segment left in the directory because of a timeline switch from a completed segment. If you just follow a single server, it won't make a difference, but it can be significant in more complicated scenarios where new WAL is still generated on the old timeline.

This includes two small changes to the streaming replication protocol. First, when you reach the end of a timeline while streaming, the server now sends the TLI of the next timeline in the server's history to the client. pg_receivexlog uses that as the next timeline, so that it doesn't need to parse the timeline history file like a standby server does. Second, when the BASE_BACKUP command sends the begin and end WAL positions, it now also sends the timeline IDs corresponding to the positions.

1 parent 8ae35e9, commit 0b63291
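
To make the first protocol change concrete, here is a minimal client-side sketch of validating the next-timeline value before restarting streaming. It is not the code from receivelog.c: `parse_next_tli` is a hypothetical helper, the `TimeLineID` typedef is a stand-in for PostgreSQL's, and the rule that the next timeline must be greater than the current one is an assumption based on the `Assert(sendTimeLine < sendTimeLineNextTLI)` added to walsender.c in this commit.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t TimeLineID;	/* stand-in for PostgreSQL's typedef */

/*
 * Hypothetical helper: parse the text value of the single-column result set
 * the server sends at the end of a historic timeline.  Returns true and
 * stores the timeline in *next only if the value is a plain decimal number
 * that fits in 32 bits and is greater than the timeline just streamed.
 */
static bool
parse_next_tli(const char *value, TimeLineID current, TimeLineID *next)
{
	char	   *end;
	unsigned long long parsed;

	assert(next != NULL);

	/* Reject NULL, empty strings, signs and leading whitespace. */
	if (value == NULL || *value < '0' || *value > '9')
		return false;

	errno = 0;
	parsed = strtoull(value, &end, 10);
	if (errno != 0 || *end != '\0' || parsed > UINT32_MAX)
		return false;

	/* Assumption: a server never reports a next timeline <= the current one. */
	if (parsed <= current)
		return false;

	*next = (TimeLineID) parsed;
	return true;
}
```

A caller would pass the column value of the result set and the timeline it has just finished streaming, then issue the next streaming command on `*next` if the function returns true.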

12 files changed: +677 -290 lines changed


doc/src/sgml/protocol.sgml

Lines changed: 10 additions & 4 deletions

@@ -1418,8 +1418,10 @@ The commands accepted in walsender mode are:
      <para>
       After streaming all the WAL on a timeline that is not the latest one,
       the server will end streaming by exiting the COPY mode. When the client
-      acknowledges this by also exiting COPY mode, the server responds with a
-      CommandComplete message, and is ready to accept a new command.
+      acknowledges this by also exiting COPY mode, the server sends a
+      single-row, single-column result set indicating the next timeline in
+      this server's history. That is followed by a CommandComplete message,
+      and the server is ready to accept a new command.
      </para>

      <para>
@@ -1784,7 +1786,9 @@ The commands accepted in walsender mode are:
      </para>
      <para>
       The first ordinary result set contains the starting position of the
-      backup, given in XLogRecPtr format as a single column in a single row.
+      backup, in a single row with two columns. The first column contains
+      the start position given in XLogRecPtr format, and the second column
+      contains the corresponding timeline ID.
      </para>
      <para>
       The second ordinary result set has one row for each tablespace.
@@ -1827,7 +1831,9 @@ The commands accepted in walsender mode are:
       <quote>ustar interchange format</> specified in the POSIX 1003.1-2008
       standard) dump of the tablespace contents, except that the two trailing
       blocks of zeroes specified in the standard are omitted.
-      After the tar data is complete, a final ordinary result set will be sent.
+      After the tar data is complete, a final ordinary result set will be sent,
+      containing the WAL end position of the backup, in the same format as
+      the start position.
      </para>

      <para>
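
As an illustration of the two-column row documented above, the following sketch parses a start or end position and its timeline ID from their text forms. `parse_backup_position` is a hypothetical helper, not part of pg_basebackup; the typedefs are stand-ins, and the `%X/%X` layout is taken from the formatting code in basebackup.c in this same commit. The parsing is deliberately lenient (it relies on `sscanf`) and assumes `unsigned int` is 32 bits wide.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;	/* stand-in for PostgreSQL's typedef */
typedef uint32_t TimeLineID;	/* stand-in for PostgreSQL's typedef */

/*
 * Hypothetical helper: convert the two text columns of the BASE_BACKUP
 * position row ("recptr" as hi/lo hex words, "tli" as decimal) into binary
 * values.  Returns false if either column has unexpected content.
 */
static bool
parse_backup_position(const char *recptr, const char *tli,
					  XLogRecPtr *pos, TimeLineID *timeline)
{
	unsigned int hi;
	unsigned int lo;
	unsigned int t;
	char		junk;

	assert(pos != NULL && timeline != NULL);

	/* The trailing %c only matches if garbage follows the expected value. */
	if (sscanf(recptr, "%X/%X%c", &hi, &lo, &junk) != 2)
		return false;
	if (sscanf(tli, "%u%c", &t, &junk) != 1)
		return false;

	*pos = ((uint64_t) hi << 32) | lo;
	*timeline = t;
	return true;
}
```

For example, the columns `16/B374D848` and `3` would yield the position `0x16B374D848` on timeline 3.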

src/backend/access/transam/timeline.c

Lines changed: 9 additions & 5 deletions

@@ -545,22 +545,26 @@ tliOfPointInHistory(XLogRecPtr ptr, List *history)
 }

 /*
- * Returns the point in history where we branched off the given timeline.
- * Returns InvalidXLogRecPtr if the timeline is current (= we have not
- * branched off from it), and throws an error if the timeline is not part of
- * this server's history.
+ * Returns the point in history where we branched off the given timeline,
+ * and the timeline we branched to (*nextTLI). Returns InvalidXLogRecPtr if
+ * the timeline is current, ie. we have not branched off from it, and throws
+ * an error if the timeline is not part of this server's history.
  */
 XLogRecPtr
-tliSwitchPoint(TimeLineID tli, List *history)
+tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
 {
     ListCell   *cell;

+    if (nextTLI)
+        *nextTLI = 0;
     foreach (cell, history)
     {
         TimeLineHistoryEntry *tle = (TimeLineHistoryEntry *) lfirst(cell);

         if (tle->tli == tli)
             return tle->end;
+        if (nextTLI)
+            *nextTLI = tle->tli;
     }

     ereport(ERROR,
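
The way `*nextTLI` ends up holding the timeline that was branched to depends on the order of the history list. The standalone sketch below reproduces the loop over a plain array, assuming the history is ordered newest timeline first, so the entry visited just before the match is the child timeline. `HistoryEntry` and `switch_point` are illustrative names, the struct is trimmed to the two fields the loop uses, and a return value of 0 stands in for the `ereport(ERROR, ...)` of the real function.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TimeLineID;	/* stand-in for PostgreSQL's typedef */
typedef uint64_t XLogRecPtr;	/* stand-in for PostgreSQL's typedef */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Trimmed-down, hypothetical version of a timeline history entry. */
typedef struct
{
	TimeLineID	tli;
	XLogRecPtr	end;			/* InvalidXLogRecPtr for the current timeline */
} HistoryEntry;

/*
 * Array-based sketch of the lookup.  'history' is assumed to be ordered
 * newest first.  Returns 1 and sets *switchpoint if 'tli' is found, 0 if it
 * is not part of the history.  If nextTLI is not NULL, it receives the
 * timeline visited just before the match, or 0 if 'tli' is the newest.
 */
static int
switch_point(TimeLineID tli, const HistoryEntry *history, size_t n,
			 XLogRecPtr *switchpoint, TimeLineID *nextTLI)
{
	size_t		i;

	assert(switchpoint != NULL);

	if (nextTLI)
		*nextTLI = 0;
	for (i = 0; i < n; i++)
	{
		if (history[i].tli == tli)
		{
			*switchpoint = history[i].end;
			return 1;
		}
		if (nextTLI)
			*nextTLI = history[i].tli;
	}
	return 0;
}
```

With a history of timelines 3, 2, 1 (newest first), looking up timeline 1 reports timeline 2 as the next one, and looking up timeline 3 reports 0 because nothing has branched off it yet.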

src/backend/access/transam/xlog.c

Lines changed: 21 additions & 3 deletions

@@ -4930,7 +4930,7 @@ StartupXLOG(void)
          * tliSwitchPoint will throw an error if the checkpoint's timeline
          * is not in expectedTLEs at all.
          */
-        switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs);
+        switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs, NULL);
         ereport(FATAL,
                 (errmsg("requested timeline %u is not a child of this server's history",
                         recoveryTargetTLI),
@@ -7870,16 +7870,21 @@ XLogFileNameP(TimeLineID tli, XLogSegNo segno)
  * non-exclusive backups active at the same time, and they don't conflict
  * with an exclusive backup either.
  *
+ * Returns the minimum WAL position that must be present to restore from this
+ * backup, and the corresponding timeline ID in *starttli_p.
+ *
  * Every successfully started non-exclusive backup must be stopped by calling
  * do_pg_stop_backup() or do_pg_abort_backup().
  */
 XLogRecPtr
-do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
+do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
+                   char **labelfile)
 {
     bool        exclusive = (labelfile == NULL);
     bool        backup_started_in_recovery = false;
     XLogRecPtr  checkpointloc;
     XLogRecPtr  startpoint;
+    TimeLineID  starttli;
     pg_time_t   stamp_time;
     char        strfbuf[128];
     char        xlogfilename[MAXFNAMELEN];
@@ -8021,6 +8026,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
             LWLockAcquire(ControlFileLock, LW_SHARED);
             checkpointloc = ControlFile->checkPoint;
             startpoint = ControlFile->checkPointCopy.redo;
+            starttli = ControlFile->checkPointCopy.ThisTimeLineID;
             checkpointfpw = ControlFile->checkPointCopy.fullPageWrites;
             LWLockRelease(ControlFileLock);

@@ -8154,6 +8160,8 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
     /*
      * We're done.  As a convenience, return the starting WAL location.
      */
+    if (starttli_p)
+        *starttli_p = starttli;
     return startpoint;
 }

@@ -8190,14 +8198,18 @@ pg_start_backup_callback(int code, Datum arg)

  * If labelfile is NULL, this stops an exclusive backup. Otherwise this stops
  * the non-exclusive backup specified by 'labelfile'.
+ *
+ * Returns the last WAL position that must be present to restore from this
+ * backup, and the corresponding timeline ID in *stoptli_p.
  */
 XLogRecPtr
-do_pg_stop_backup(char *labelfile, bool waitforarchive)
+do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
 {
     bool        exclusive = (labelfile == NULL);
     bool        backup_started_in_recovery = false;
     XLogRecPtr  startpoint;
     XLogRecPtr  stoppoint;
+    TimeLineID  stoptli;
     XLogRecData rdata;
     pg_time_t   stamp_time;
     char        strfbuf[128];
@@ -8401,8 +8413,11 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)

         LWLockAcquire(ControlFileLock, LW_SHARED);
         stoppoint = ControlFile->minRecoveryPoint;
+        stoptli = ControlFile->minRecoveryPointTLI;
         LWLockRelease(ControlFileLock);

+        if (stoptli_p)
+            *stoptli_p = stoptli;
         return stoppoint;
     }

@@ -8414,6 +8429,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
     rdata.buffer = InvalidBuffer;
     rdata.next = NULL;
     stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END, &rdata);
+    stoptli = ThisTimeLineID;

     /*
      * Force a switch to a new xlog segment file, so that the backup is valid
@@ -8529,6 +8545,8 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
     /*
      * We're done.  As a convenience, return the ending WAL location.
      */
+    if (stoptli_p)
+        *stoptli_p = stoptli;
     return stoppoint;
 }
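
Both backup functions report the timeline through an optional output parameter, so existing callers that only need the WAL position can pass NULL. The pattern can be sketched in isolation as follows; `CheckpointInfo` and `backup_start_position` are hypothetical names used only for this illustration, not PostgreSQL structures or functions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;	/* stand-in for PostgreSQL's typedef */
typedef uint32_t TimeLineID;	/* stand-in for PostgreSQL's typedef */

/* Hypothetical subset of the checkpoint data the backup code reads. */
typedef struct
{
	XLogRecPtr	redo;
	TimeLineID	timeline;
} CheckpointInfo;

/*
 * Returns the start position.  The timeline is written to *starttli_p only
 * when the caller supplied a non-NULL pointer, mirroring the
 * "if (starttli_p) *starttli_p = starttli;" idiom in the diff above.
 */
static XLogRecPtr
backup_start_position(const CheckpointInfo *cp, TimeLineID *starttli_p)
{
	assert(cp != NULL);

	if (starttli_p)
		*starttli_p = cp->timeline;
	return cp->redo;
}
```

A caller interested in both values passes the address of a `TimeLineID`; a caller like the SQL-level pg_start_backup(), which only prints the position, passes NULL.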

src/backend/access/transam/xlogfuncs.c

Lines changed: 2 additions & 2 deletions

@@ -56,7 +56,7 @@ pg_start_backup(PG_FUNCTION_ARGS)

     backupidstr = text_to_cstring(backupid);

-    startpoint = do_pg_start_backup(backupidstr, fast, NULL);
+    startpoint = do_pg_start_backup(backupidstr, fast, NULL, NULL);

     snprintf(startxlogstr, sizeof(startxlogstr), "%X/%X",
              (uint32) (startpoint >> 32), (uint32) startpoint);
@@ -82,7 +82,7 @@ pg_stop_backup(PG_FUNCTION_ARGS)
     XLogRecPtr  stoppoint;
     char        stopxlogstr[MAXFNAMELEN];

-    stoppoint = do_pg_stop_backup(NULL, true);
+    stoppoint = do_pg_stop_backup(NULL, true, NULL);

     snprintf(stopxlogstr, sizeof(stopxlogstr), "%X/%X",
              (uint32) (stoppoint >> 32), (uint32) stoppoint);

src/backend/replication/basebackup.c

Lines changed: 30 additions & 11 deletions

@@ -55,7 +55,7 @@ static void SendBackupHeader(List *tablespaces);
 static void base_backup_cleanup(int code, Datum arg);
 static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
-static void SendXlogRecPtrResult(XLogRecPtr ptr);
+static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
 static int  compareWalFileNames(const void *a, const void *b);

 /* Was the backup currently in-progress initiated in recovery mode? */
@@ -94,13 +94,16 @@ static void
 perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 {
     XLogRecPtr  startptr;
+    TimeLineID  starttli;
     XLogRecPtr  endptr;
+    TimeLineID  endtli;
     char       *labelfile;

     backup_started_in_recovery = RecoveryInProgress();

-    startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile);
-    SendXlogRecPtrResult(startptr);
+    startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
+                                  &labelfile);
+    SendXlogRecPtrResult(startptr, starttli);

     PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
     {
@@ -218,7 +221,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
     }
     PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);

-    endptr = do_pg_stop_backup(labelfile, !opt->nowait);
+    endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);

     if (opt->includewal)
     {
@@ -426,7 +429,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
         /* Send CopyDone message for the last tar file */
         pq_putemptymessage('c');
     }
-    SendXlogRecPtrResult(endptr);
+    SendXlogRecPtrResult(endptr, endtli);
 }

 /*
@@ -635,29 +638,45 @@ SendBackupHeader(List *tablespaces)
  * XlogRecPtr record (in text format)
  */
 static void
-SendXlogRecPtrResult(XLogRecPtr ptr)
+SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 {
     StringInfoData buf;
     char        str[MAXFNAMELEN];

-    snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr);
-
     pq_beginmessage(&buf, 'T');     /* RowDescription */
-    pq_sendint(&buf, 1, 2);         /* 1 field */
+    pq_sendint(&buf, 2, 2);         /* 2 fields */

-    /* Field header */
+    /* Field headers */
     pq_sendstring(&buf, "recptr");
     pq_sendint(&buf, 0, 4);         /* table oid */
     pq_sendint(&buf, 0, 2);         /* attnum */
     pq_sendint(&buf, TEXTOID, 4);   /* type oid */
     pq_sendint(&buf, -1, 2);
     pq_sendint(&buf, 0, 4);
     pq_sendint(&buf, 0, 2);
+
+    pq_sendstring(&buf, "tli");
+    pq_sendint(&buf, 0, 4);         /* table oid */
+    pq_sendint(&buf, 0, 2);         /* attnum */
+    /*
+     * int8 may seem like a surprising data type for this, but in theory int4
+     * would not be wide enough for this, as TimeLineID is unsigned.
+     */
+    pq_sendint(&buf, INT8OID, 4);   /* type oid */
+    pq_sendint(&buf, -1, 2);
+    pq_sendint(&buf, 0, 4);
+    pq_sendint(&buf, 0, 2);
     pq_endmessage(&buf);

     /* Data row */
     pq_beginmessage(&buf, 'D');
-    pq_sendint(&buf, 1, 2);         /* number of columns */
+    pq_sendint(&buf, 2, 2);         /* number of columns */
+
+    snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr);
+    pq_sendint(&buf, strlen(str), 4);   /* length */
+    pq_sendbytes(&buf, str, strlen(str));
+
+    snprintf(str, sizeof(str), "%u", tli);
     pq_sendint(&buf, strlen(str), 4);   /* length */
     pq_sendbytes(&buf, str, strlen(str));
     pq_endmessage(&buf);
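
The text encoding of the two columns can be tried outside the server with a small sketch. `BackupPositionRow` and `format_backup_position` are hypothetical names; the format strings `%X/%X` and `%u` are the ones used in `SendXlogRecPtrResult` above, and the stored lengths correspond to the `strlen(str)` values that precede each column in the data row. The sketch assumes `unsigned int` is 32 bits wide.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;	/* stand-in for PostgreSQL's typedef */
typedef uint32_t TimeLineID;	/* stand-in for PostgreSQL's typedef */

/* Hypothetical container for the two text columns and their lengths. */
typedef struct
{
	char		recptr[32];
	size_t		recptr_len;
	char		tli[16];
	size_t		tli_len;
} BackupPositionRow;

/*
 * Format a WAL position as two hex words separated by a slash, and the
 * timeline ID as an unsigned decimal number.
 */
static void
format_backup_position(XLogRecPtr ptr, TimeLineID tli, BackupPositionRow *row)
{
	assert(row != NULL);

	snprintf(row->recptr, sizeof(row->recptr), "%X/%X",
			 (unsigned int) (ptr >> 32), (unsigned int) ptr);
	row->recptr_len = strlen(row->recptr);

	snprintf(row->tli, sizeof(row->tli), "%u", (unsigned int) tli);
	row->tli_len = strlen(row->tli);
}
```

The largest possible timeline ID formats as the ten-digit string `4294967295`, which is larger than any signed 32-bit value; that is the reason given in the comment above for declaring the column as int8 rather than int4.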

src/backend/replication/walsender.c

Lines changed: 44 additions & 6 deletions

@@ -117,6 +117,7 @@ static uint32 sendOff = 0;
  * history forked off from that timeline at sendTimeLineValidUpto.
  */
 static TimeLineID sendTimeLine = 0;
+static TimeLineID sendTimeLineNextTLI = 0;
 static bool sendTimeLineIsHistoric = false;
 static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;

@@ -449,7 +450,8 @@ StartReplication(StartReplicationCmd *cmd)
          * requested start location is on that timeline.
          */
         timeLineHistory = readTimeLineHistory(ThisTimeLineID);
-        switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory);
+        switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
+                                     &sendTimeLineNextTLI);
         list_free_deep(timeLineHistory);

         /*
@@ -496,8 +498,7 @@ StartReplication(StartReplicationCmd *cmd)
     streamingDoneSending = streamingDoneReceiving = false;

     /* If there is nothing to stream, don't even enter COPY mode */
-    if (!sendTimeLineIsHistoric ||
-        cmd->startpoint < sendTimeLineValidUpto)
+    if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
     {
         /*
          * When we first start replication the standby will be behind the primary.
@@ -554,10 +555,46 @@ StartReplication(StartReplicationCmd *cmd)
         if (walsender_ready_to_stop)
             proc_exit(0);
         WalSndSetState(WALSNDSTATE_STARTUP);
+
+        Assert(streamingDoneSending && streamingDoneReceiving);
+    }
+
+    /*
+     * Copy is finished now. Send a single-row result set indicating the next
+     * timeline.
+     */
+    if (sendTimeLineIsHistoric)
+    {
+        char        str[11];
+        snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI);
+
+        pq_beginmessage(&buf, 'T');     /* RowDescription */
+        pq_sendint(&buf, 1, 2);         /* 1 field */
+
+        /* Field header */
+        pq_sendstring(&buf, "next_tli");
+        pq_sendint(&buf, 0, 4);         /* table oid */
+        pq_sendint(&buf, 0, 2);         /* attnum */
+        /*
+         * int8 may seem like a surprising data type for this, but in theory
+         * int4 would not be wide enough for this, as TimeLineID is unsigned.
+         */
+        pq_sendint(&buf, INT8OID, 4);   /* type oid */
+        pq_sendint(&buf, -1, 2);
+        pq_sendint(&buf, 0, 4);
+        pq_sendint(&buf, 0, 2);
+        pq_endmessage(&buf);
+
+        /* Data row */
+        pq_beginmessage(&buf, 'D');
+        pq_sendint(&buf, 1, 2);         /* number of columns */
+        pq_sendint(&buf, strlen(str), 4);   /* length */
+        pq_sendbytes(&buf, str, strlen(str));
+        pq_endmessage(&buf);
     }

-    /* Get out of COPY mode (CommandComplete). */
-    EndCommand("COPY 0", DestRemote);
+    /* Send CommandComplete message */
+    pq_puttextmessage('C', "START_STREAMING");
 }

 /*
@@ -1377,8 +1414,9 @@ XLogSend(bool *caughtup)
             List       *history;

             history = readTimeLineHistory(ThisTimeLineID);
-            sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history);
+            sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
             Assert(sentPtr <= sendTimeLineValidUpto);
+            Assert(sendTimeLine < sendTimeLineNextTLI);
             list_free_deep(history);

             /* the current send pointer should be <= the switchpoint */
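
For readers unfamiliar with the wire format, the data row built in `StartReplication` above can be approximated with plain byte operations. The sketch below is not the pq_* API: `build_next_tli_row`, `put_be16` and `put_be32` are hypothetical helpers that write a DataRow message (type byte `D`, a big-endian 32-bit length that counts itself but not the type byte, a 16-bit column count, then one length-prefixed text value) into a caller-supplied buffer. It also shows why an 11-byte string buffer is enough: ten digits for the largest 32-bit value plus the terminating NUL.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Write a 16-bit value in network (big-endian) byte order. */
static void
put_be16(unsigned char *p, uint16_t v)
{
	p[0] = (unsigned char) (v >> 8);
	p[1] = (unsigned char) v;
}

/* Write a 32-bit value in network (big-endian) byte order. */
static void
put_be32(unsigned char *p, uint32_t v)
{
	p[0] = (unsigned char) (v >> 24);
	p[1] = (unsigned char) (v >> 16);
	p[2] = (unsigned char) (v >> 8);
	p[3] = (unsigned char) v;
}

/*
 * Hypothetical helper: encode a one-column DataRow carrying next_tli as
 * decimal text.  Returns the number of bytes written, or 0 if 'out' is too
 * small.
 */
static size_t
build_next_tli_row(uint32_t next_tli, unsigned char *out, size_t outlen)
{
	char		str[11];		/* 10 digits for UINT32_MAX plus NUL */
	size_t		vlen;
	size_t		body;

	assert(out != NULL);

	snprintf(str, sizeof(str), "%u", (unsigned int) next_tli);
	vlen = strlen(str);

	/* length word + column count + value length + value bytes */
	body = 4 + 2 + 4 + vlen;
	if (outlen < 1 + body)
		return 0;

	out[0] = 'D';
	put_be32(out + 1, (uint32_t) body);
	put_be16(out + 5, 1);		/* number of columns */
	put_be32(out + 7, (uint32_t) vlen);
	memcpy(out + 11, str, vlen);
	return 1 + body;
}
```

Encoding timeline 2 produces a 12-byte message; the largest timeline ID produces 21 bytes.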
