Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 2c0a485

Browse files
committed
Prevent WAL files created by pg_basebackup -x/X from being archived again.
WAL (and timeline history) files created by pg_basebackup did not maintain the new base backup's archive status. That's currently not a problem if the new node is used as a standby - but if that node is promoted all still existing files can get archived again. With a high wal_keep_segment settings that can happen a significant time later - which is quite confusing. Change both the backend (for the -x/-X fetch case) and pg_basebackup (for -X stream) itself to always mark WAL/timeline files included in the base backup as .done. That's in line with walreceiver.c doing so. The verbosity of the pg_basebackup changes show pretty clearly that it needs some refactoring, but that'd result in not be backpatchable changes. Backpatch to 9.1 where pg_basebackup was introduced. Discussion: 20141205002854.GE21964@awork2.anarazel.de
1 parent ccb161b commit 2c0a485

File tree

5 files changed

+127
-32
lines changed

5 files changed

+127
-32
lines changed

‎src/backend/replication/basebackup.c

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
471471
errmsg("unexpected WAL file size \"%s\"",walFiles[i])));
472472
}
473473

474+
/* send the WAL file itself */
474475
_tarWriteHeader(pathbuf,NULL,&statbuf);
475476

476477
while ((cnt=fread(buf,1,Min(sizeof(buf),XLogSegSize-len),fp))>0)
@@ -497,7 +498,17 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
497498
}
498499

499500
/* XLogSegSize is a multiple of 512, so no need for padding */
501+
500502
FreeFile(fp);
503+
504+
/*
505+
* Mark file as archived, otherwise files can get archived again
506+
* after promotion of a new node. This is in line with
507+
* walreceiver.c always doing a XLogArchiveForceDone() after a
508+
* complete segment.
509+
*/
510+
StatusFilePath(pathbuf,walFiles[i],".done");
511+
sendFileWithContent(pathbuf,"");
501512
}
502513

503514
/*
@@ -521,6 +532,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
521532
errmsg("could not stat file \"%s\": %m",pathbuf)));
522533

523534
sendFile(pathbuf,pathbuf,&statbuf, false);
535+
536+
/* unconditionally mark file as archived */
537+
StatusFilePath(pathbuf,fname,".done");
538+
sendFileWithContent(pathbuf,"");
524539
}
525540

526541
/* Send CopyDone message for the last tar file */
@@ -1021,6 +1036,15 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
10211036
_tarWriteHeader(pathbuf+basepathlen+1,NULL,&statbuf);
10221037
}
10231038
size+=512;/* Size of the header just added */
1039+
1040+
/*
1041+
* Also send archive_status directory (by hackishly reusing
1042+
* statbuf from above ...).
1043+
*/
1044+
if (!sizeonly)
1045+
_tarWriteHeader("./pg_xlog/archive_status",NULL,&statbuf);
1046+
size+=512;/* Size of the header just added */
1047+
10241048
continue;/* don't recurse into pg_xlog */
10251049
}
10261050

‎src/bin/pg_basebackup/pg_basebackup.c

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include<zlib.h>
2626
#endif
2727

28+
#include"common/string.h"
2829
#include"getopt_long.h"
2930
#include"libpq-fe.h"
3031
#include"pqexpbuffer.h"
@@ -370,7 +371,7 @@ LogStreamerMain(logstreamer_param *param)
370371
if (!ReceiveXlogStream(param->bgconn,param->startptr,param->timeline,
371372
param->sysidentifier,param->xlogdir,
372373
reached_end_position,standby_message_timeout,
373-
NULL, false))
374+
NULL, false, true))
374375

375376
/*
376377
* Any errors will already have been reported in the function process,
@@ -394,6 +395,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
394395
logstreamer_param*param;
395396
uint32hi,
396397
lo;
398+
charstatusdir[MAXPGPATH];
397399

398400
param=pg_malloc0(sizeof(logstreamer_param));
399401
param->timeline=timeline;
@@ -428,13 +430,23 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
428430
/* Error message already written in GetConnection() */
429431
exit(1);
430432

433+
snprintf(param->xlogdir,sizeof(param->xlogdir),"%s/pg_xlog",basedir);
434+
431435
/*
432-
*Always in plain format,so we can write to basedir/pg_xlog. But the
433-
* directory entry in the tar file may arrive later, so make sure it's
434-
*created before we start.
436+
* Create pg_xlog/archive_status (and thus pg_xlog) so we can write to
437+
* basedir/pg_xlog as the directory entry in the tar file may arrive
438+
* later.
435439
*/
436-
snprintf(param->xlogdir,sizeof(param->xlogdir),"%s/pg_xlog",basedir);
437-
verify_dir_is_empty_or_create(param->xlogdir);
440+
snprintf(statusdir,sizeof(statusdir),"%s/pg_xlog/archive_status",
441+
basedir);
442+
443+
if (pg_mkdir_p(statusdir,S_IRWXU)!=0&&errno!=EEXIST)
444+
{
445+
fprintf(stderr,
446+
_("%s: could not create directory \"%s\": %s\n"),
447+
progname,statusdir,strerror(errno));
448+
disconnect_and_exit(1);
449+
}
438450

439451
/*
440452
* Start a child process and tell it to start streaming. On Unix, this is
@@ -1236,11 +1248,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
12361248
* by the wal receiver process. Also, when transaction
12371249
* log directory location was specified, pg_xlog has
12381250
* already been created as a symbolic link before
1239-
* starting the actual backup. So just ignore failure
1240-
* on them.
1251+
* starting the actual backup. So just ignore creation
1252+
* failures on related directories.
12411253
*/
1242-
if ((!streamwal&& (strcmp(xlog_dir,"")==0))
1243-
||strcmp(filename+strlen(filename)-8,"/pg_xlog")!=0)
1254+
if (!((pg_str_endswith(filename,"/pg_xlog")||
1255+
pg_str_endswith(filename,"/archive_status"))&&
1256+
errno==EEXIST))
12441257
{
12451258
fprintf(stderr,
12461259
_("%s: could not create directory \"%s\": %s\n"),

‎src/bin/pg_basebackup/pg_receivexlog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ StreamLog(void)
342342

343343
ReceiveXlogStream(conn,startpos,starttli,NULL,basedir,
344344
stop_streaming,standby_message_timeout,".partial",
345-
synchronous);
345+
synchronous, false);
346346

347347
PQfinish(conn);
348348
conn=NULL;

‎src/bin/pg_basebackup/receivelog.c

Lines changed: 77 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,58 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
3737
uint32timeline,char*basedir,
3838
stream_stop_callbackstream_stop,intstandby_message_timeout,
3939
char*partial_suffix,XLogRecPtr*stoppos,
40-
boolsynchronous);
40+
boolsynchronous,boolmark_done);
4141
staticintCopyStreamPoll(PGconn*conn,longtimeout_ms);
4242
staticintCopyStreamReceive(PGconn*conn,longtimeout,char**buffer);
4343
staticboolProcessKeepaliveMsg(PGconn*conn,char*copybuf,intlen,
4444
XLogRecPtrblockpos,int64*last_status);
4545
staticboolProcessXLogDataMsg(PGconn*conn,char*copybuf,intlen,
4646
XLogRecPtr*blockpos,uint32timeline,
4747
char*basedir,stream_stop_callbackstream_stop,
48-
char*partial_suffix);
48+
char*partial_suffix,boolmark_done);
4949
staticPGresult*HandleEndOfCopyStream(PGconn*conn,char*copybuf,
5050
XLogRecPtrblockpos,char*basedir,char*partial_suffix,
51-
XLogRecPtr*stoppos);
51+
XLogRecPtr*stoppos,boolmark_done);
5252
staticboolCheckCopyStreamStop(PGconn*conn,XLogRecPtrblockpos,
5353
uint32timeline,char*basedir,
5454
stream_stop_callbackstream_stop,
55-
char*partial_suffix,XLogRecPtr*stoppos);
55+
char*partial_suffix,XLogRecPtr*stoppos,
56+
boolmark_done);
5657
staticlongCalculateCopyStreamSleeptime(int64now,intstandby_message_timeout,
5758
int64last_status);
5859

5960
staticboolReadEndOfStreamingResult(PGresult*res,XLogRecPtr*startpos,
6061
uint32*timeline);
6162

63+
staticbool
64+
mark_file_as_archived(constchar*basedir,constchar*fname)
65+
{
66+
intfd;
67+
staticchartmppath[MAXPGPATH];
68+
69+
snprintf(tmppath,sizeof(tmppath),"%s/archive_status/%s.done",
70+
basedir,fname);
71+
72+
fd=open(tmppath,O_WRONLY |O_CREAT |PG_BINARY,S_IRUSR |S_IWUSR);
73+
if (fd<0)
74+
{
75+
fprintf(stderr,_("%s: could not create archive status file \"%s\": %s\n"),
76+
progname,tmppath,strerror(errno));
77+
return false;
78+
}
79+
80+
if (fsync(fd)!=0)
81+
{
82+
fprintf(stderr,_("%s: could not fsync file \"%s\": %s\n"),
83+
progname,tmppath,strerror(errno));
84+
return false;
85+
}
86+
87+
close(fd);
88+
89+
return true;
90+
}
91+
6292
/*
6393
* Open a new WAL file in the specified directory.
6494
*
@@ -152,7 +182,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
152182
* and returns false, otherwise returns true.
153183
*/
154184
staticbool
155-
close_walfile(char*basedir,char*partial_suffix,XLogRecPtrpos)
185+
close_walfile(char*basedir,char*partial_suffix,XLogRecPtrpos,boolmark_done)
156186
{
157187
off_tcurrpos;
158188

@@ -206,6 +236,19 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
206236
_("%s: not renaming \"%s%s\", segment is not complete\n"),
207237
progname,current_walfile_name,partial_suffix);
208238

239+
/*
240+
* Mark file as archived if requested by the caller - pg_basebackup needs
241+
* to do so as files can otherwise get archived again after promotion of a
242+
* new node. This is in line with walreceiver.c always doing a
243+
* XLogArchiveForceDone() after a complete segment.
244+
*/
245+
if (currpos==XLOG_SEG_SIZE&&mark_done)
246+
{
247+
/* writes error message if failed */
248+
if (!mark_file_as_archived(basedir,current_walfile_name))
249+
return false;
250+
}
251+
209252
lastFlushPosition=pos;
210253
return true;
211254
}
@@ -248,7 +291,8 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
248291
}
249292

250293
staticbool
251-
writeTimeLineHistoryFile(char*basedir,TimeLineIDtli,char*filename,char*content)
294+
writeTimeLineHistoryFile(char*basedir,TimeLineIDtli,char*filename,
295+
char*content,boolmark_done)
252296
{
253297
intsize=strlen(content);
254298
charpath[MAXPGPATH];
@@ -327,6 +371,14 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
327371
return false;
328372
}
329373

374+
/* Maintain archive_status, check close_walfile() for details. */
375+
if (mark_done)
376+
{
377+
/* writes error message if failed */
378+
if (!mark_file_as_archived(basedir,histfname))
379+
return false;
380+
}
381+
330382
return true;
331383
}
332384

@@ -447,7 +499,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
447499
char*sysidentifier,char*basedir,
448500
stream_stop_callbackstream_stop,
449501
intstandby_message_timeout,char*partial_suffix,
450-
boolsynchronous)
502+
boolsynchronous,boolmark_done)
451503
{
452504
charquery[128];
453505
charslotcmd[128];
@@ -562,7 +614,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
562614
/* Write the history file to disk */
563615
writeTimeLineHistoryFile(basedir,timeline,
564616
PQgetvalue(res,0,0),
565-
PQgetvalue(res,0,1));
617+
PQgetvalue(res,0,1),
618+
mark_done);
566619

567620
PQclear(res);
568621
}
@@ -592,7 +645,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
592645
/* Stream the WAL */
593646
res=HandleCopyStream(conn,startpos,timeline,basedir,stream_stop,
594647
standby_message_timeout,partial_suffix,
595-
&stoppos,synchronous);
648+
&stoppos,synchronous,mark_done);
596649
if (res==NULL)
597650
gotoerror;
598651

@@ -757,7 +810,7 @@ static PGresult *
757810
HandleCopyStream(PGconn*conn,XLogRecPtrstartpos,uint32timeline,
758811
char*basedir,stream_stop_callbackstream_stop,
759812
intstandby_message_timeout,char*partial_suffix,
760-
XLogRecPtr*stoppos,boolsynchronous)
813+
XLogRecPtr*stoppos,boolsynchronous,boolmark_done)
761814
{
762815
char*copybuf=NULL;
763816
int64last_status=-1;
@@ -775,7 +828,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
775828
* Check if we should continue streaming, or abort at this point.
776829
*/
777830
if (!CheckCopyStreamStop(conn,blockpos,timeline,basedir,
778-
stream_stop,partial_suffix,stoppos))
831+
stream_stop,partial_suffix,stoppos,
832+
mark_done))
779833
gotoerror;
780834

781835
now=feGetCurrentTimestamp();
@@ -830,7 +884,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
830884
if (r==-2)
831885
{
832886
PGresult*res=HandleEndOfCopyStream(conn,copybuf,blockpos,
833-
basedir,partial_suffix,stoppos);
887+
basedir,partial_suffix,
888+
stoppos,mark_done);
834889
if (res==NULL)
835890
gotoerror;
836891
else
@@ -847,14 +902,16 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
847902
elseif (copybuf[0]=='w')
848903
{
849904
if (!ProcessXLogDataMsg(conn,copybuf,r,&blockpos,
850-
timeline,basedir,stream_stop,partial_suffix))
905+
timeline,basedir,stream_stop,
906+
partial_suffix, true))
851907
gotoerror;
852908

853909
/*
854910
* Check if we should continue streaming, or abort at this point.
855911
*/
856912
if (!CheckCopyStreamStop(conn,blockpos,timeline,basedir,
857-
stream_stop,partial_suffix,stoppos))
913+
stream_stop,partial_suffix,stoppos,
914+
mark_done))
858915
gotoerror;
859916
}
860917
else
@@ -1055,7 +1112,7 @@ static bool
10551112
ProcessXLogDataMsg(PGconn*conn,char*copybuf,intlen,
10561113
XLogRecPtr*blockpos,uint32timeline,
10571114
char*basedir,stream_stop_callbackstream_stop,
1058-
char*partial_suffix)
1115+
char*partial_suffix,boolmark_done)
10591116
{
10601117
intxlogoff;
10611118
intbytes_left;
@@ -1163,7 +1220,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
11631220
/* Did we reach the end of a WAL segment? */
11641221
if (*blockpos %XLOG_SEG_SIZE==0)
11651222
{
1166-
if (!close_walfile(basedir,partial_suffix,*blockpos))
1223+
if (!close_walfile(basedir,partial_suffix,*blockpos,mark_done))
11671224
/* Error message written in close_walfile() */
11681225
return false;
11691226

@@ -1193,7 +1250,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
11931250
staticPGresult*
11941251
HandleEndOfCopyStream(PGconn*conn,char*copybuf,
11951252
XLogRecPtrblockpos,char*basedir,char*partial_suffix,
1196-
XLogRecPtr*stoppos)
1253+
XLogRecPtr*stoppos,boolmark_done)
11971254
{
11981255
PGresult*res=PQgetResult(conn);
11991256

@@ -1204,7 +1261,7 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
12041261
*/
12051262
if (still_sending)
12061263
{
1207-
if (!close_walfile(basedir,partial_suffix,blockpos))
1264+
if (!close_walfile(basedir,partial_suffix,blockpos,mark_done))
12081265
{
12091266
/* Error message written in close_walfile() */
12101267
PQclear(res);
@@ -1236,11 +1293,11 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
12361293
staticbool
12371294
CheckCopyStreamStop(PGconn*conn,XLogRecPtrblockpos,uint32timeline,
12381295
char*basedir,stream_stop_callbackstream_stop,
1239-
char*partial_suffix,XLogRecPtr*stoppos)
1296+
char*partial_suffix,XLogRecPtr*stoppos,boolmark_done)
12401297
{
12411298
if (still_sending&&stream_stop(blockpos,timeline, false))
12421299
{
1243-
if (!close_walfile(basedir,partial_suffix,blockpos))
1300+
if (!close_walfile(basedir,partial_suffix,blockpos,mark_done))
12441301
{
12451302
/* Potential error message is written by close_walfile */
12461303
return false;

‎src/bin/pg_basebackup/receivelog.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ extern bool ReceiveXlogStream(PGconn *conn,
3131
stream_stop_callbackstream_stop,
3232
intstandby_message_timeout,
3333
char*partial_suffix,
34-
boolsynchronous);
34+
boolsynchronous,
35+
boolmark_done);
3536

3637
#endif/* RECEIVELOG_H */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp