Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite7cc843

Browse files
committed
Pre-pad WAL files when streaming transaction log
Instead of filling files as they appear, pre-pad theWAL files received when streaming xlog the same waythat the server does. Data is streamed into a .partialfile which is then renamed()d into palce when it's complete,but it will always be 16MB.This also means that the starting position for pg_receivexlogis now simply right after the last complete segment, and wenever need to deal with partial segments there.Patch by me, review by Fujii Masao
1 parent4429f6a commite7cc843

File tree

2 files changed

+125
-79
lines changed

2 files changed

+125
-79
lines changed

‎src/bin/pg_basebackup/pg_receivexlog.c

Lines changed: 11 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -71,33 +71,10 @@ usage(void)
7171
staticbool
7272
segment_callback(XLogRecPtrsegendpos,uint32timeline)
7373
{
74-
charfn[MAXPGPATH];
75-
structstatstatbuf;
76-
7774
if (verbose)
7875
fprintf(stderr,_("%s: finished segment at %X/%X (timeline %u)\n"),
7976
progname,segendpos.xlogid,segendpos.xrecoff,timeline);
8077

81-
/*
82-
* Check if there is a partial file for the name we just finished, and if
83-
* there is, remove it under the assumption that we have now got all the
84-
* data we need.
85-
*/
86-
segendpos.xrecoff /=XLOG_SEG_SIZE;
87-
PrevLogSeg(segendpos.xlogid,segendpos.xrecoff);
88-
snprintf(fn,sizeof(fn),"%s/%08X%08X%08X.partial",
89-
basedir,timeline,
90-
segendpos.xlogid,
91-
segendpos.xrecoff);
92-
if (stat(fn,&statbuf)==0)
93-
{
94-
/* File existed, get rid of it */
95-
if (verbose)
96-
fprintf(stderr,_("%s: removing file \"%s\"\n"),
97-
progname,fn);
98-
unlink(fn);
99-
}
100-
10178
/*
10279
* Never abort from this - we handle all aborting in continue_streaming()
10380
*/
@@ -119,9 +96,8 @@ continue_streaming(void)
11996
/*
12097
* Determine starting location for streaming, based on:
12198
* 1. If there are existing xlog segments, start at the end of the last one
122-
* 2. If the last one is a partial segment, rename it and start over, since
123-
* we don't sync after every write.
124-
* 3. If no existing xlog exists, start from the beginning of the current
99+
* that is complete (size matches XLogSegSize)
100+
* 2. If no valid xlog exists, start from the beginning of the current
125101
* WAL segment.
126102
*/
127103
staticXLogRecPtr
@@ -133,7 +109,6 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
133109
boolb;
134110
uint32high_log=0;
135111
uint32high_seg=0;
136-
boolpartial= false;
137112

138113
dir=opendir(basedir);
139114
if (dir==NULL)
@@ -195,7 +170,7 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
195170
disconnect_and_exit(1);
196171
}
197172

198-
if (statbuf.st_size==16*1024*1024)
173+
if (statbuf.st_size==XLOG_SEG_SIZE)
199174
{
200175
/* Completed segment */
201176
if (log>high_log||
@@ -208,37 +183,9 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
208183
}
209184
else
210185
{
211-
/*
212-
* This is a partial file. Rename it out of the way.
213-
*/
214-
charnewfn[MAXPGPATH];
215-
216-
fprintf(stderr,_("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
217-
progname,dirent->d_name,dirent->d_name);
218-
219-
snprintf(newfn,sizeof(newfn),"%s/%s.partial",
220-
basedir,dirent->d_name);
221-
222-
if (stat(newfn,&statbuf)==0)
223-
{
224-
/*
225-
* XXX: perhaps we should only error out if the existing file
226-
* is larger?
227-
*/
228-
fprintf(stderr,_("%s: file \"%s\" already exists. Check and clean up manually.\n"),
229-
progname,newfn);
230-
disconnect_and_exit(1);
231-
}
232-
if (rename(fullpath,newfn)!=0)
233-
{
234-
fprintf(stderr,_("%s: could not rename \"%s\" to \"%s\": %s\n"),
235-
progname,fullpath,newfn,strerror(errno));
236-
disconnect_and_exit(1);
237-
}
238-
239-
/* Don't continue looking for more, we assume this is the last */
240-
partial= true;
241-
break;
186+
fprintf(stderr,_("%s: segment file '%s' is incorrect size %d, skipping\n"),
187+
progname,dirent->d_name, (int)statbuf.st_size);
188+
continue;
242189
}
243190
}
244191

@@ -247,17 +194,11 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
247194
if (high_log>0||high_seg>0)
248195
{
249196
XLogRecPtrhigh_ptr;
250-
251-
if (!partial)
252-
{
253-
/*
254-
* If the segment was partial, the pointer is already at the right
255-
* location since we want to re-transmit that segment. If it was
256-
* not, we need to move it to the next segment, since we are
257-
* tracking the last one that was complete.
258-
*/
259-
NextLogSeg(high_log,high_seg);
260-
}
197+
/*
198+
* Move the starting pointer to the start of the next segment,
199+
* since the highest one we've seen was completed.
200+
*/
201+
NextLogSeg(high_log,high_seg);
261202

262203
high_ptr.xlogid=high_log;
263204
high_ptr.xrecoff=high_seg*XLOG_SEG_SIZE;

‎src/bin/pg_basebackup/receivelog.c

Lines changed: 114 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include"receivelog.h"
2828
#include"streamutil.h"
2929

30+
#include<sys/stat.h>
3031
#include<sys/time.h>
3132
#include<sys/types.h>
3233
#include<unistd.h>
@@ -41,24 +42,128 @@ const XLogRecPtr InvalidXLogRecPtr = {0, 0};
4142
* Open a new WAL file in the specified directory. Store the name
4243
* (not including the full directory) in namebuf. Assumes there is
4344
* enough room in this buffer...
45+
*
46+
* The file will be padded to 16Mb with zeroes.
4447
*/
4548
staticint
4649
open_walfile(XLogRecPtrstartpoint,uint32timeline,char*basedir,char*namebuf)
4750
{
4851
intf;
4952
charfn[MAXPGPATH];
53+
structstatstatbuf;
54+
char*zerobuf;
55+
intbytes;
5056

5157
XLogFileName(namebuf,timeline,startpoint.xlogid,
5258
startpoint.xrecoff /XLOG_SEG_SIZE);
5359

54-
snprintf(fn,sizeof(fn),"%s/%s",basedir,namebuf);
55-
f=open(fn,O_WRONLY |O_CREAT |O_EXCL |PG_BINARY,0666);
60+
snprintf(fn,sizeof(fn),"%s/%s.partial",basedir,namebuf);
61+
f=open(fn,O_WRONLY |O_CREAT |PG_BINARY,S_IRUSR |S_IWUSR);
5662
if (f==-1)
63+
{
5764
fprintf(stderr,_("%s: Could not open WAL segment %s: %s\n"),
58-
progname,namebuf,strerror(errno));
65+
progname,fn,strerror(errno));
66+
return-1;
67+
}
68+
69+
/*
70+
* Verify that the file is either empty (just created), or a complete
71+
* XLogSegSize segment. Anything in between indicates a corrupt file.
72+
*/
73+
if (fstat(f,&statbuf)!=0)
74+
{
75+
fprintf(stderr,_("%s: could not stat WAL segment %s: %s\n"),
76+
progname,fn,strerror(errno));
77+
close(f);
78+
return-1;
79+
}
80+
if (statbuf.st_size==XLogSegSize)
81+
returnf;/* File is open and ready to use */
82+
if (statbuf.st_size!=0)
83+
{
84+
fprintf(stderr,_("%s: WAL segment %s is %d bytes, should be 0 or %d\n"),
85+
progname,fn, (int)statbuf.st_size,XLogSegSize);
86+
close(f);
87+
return-1;
88+
}
89+
90+
/* New, empty, file. So pad it to 16Mb with zeroes */
91+
zerobuf=xmalloc0(XLOG_BLCKSZ);
92+
for (bytes=0;bytes<XLogSegSize;bytes+=XLOG_BLCKSZ)
93+
{
94+
if (write(f,zerobuf,XLOG_BLCKSZ)!=XLOG_BLCKSZ)
95+
{
96+
fprintf(stderr,_("%s: could not pad WAL segment %s: %s\n"),
97+
progname,fn,strerror(errno));
98+
close(f);
99+
unlink(fn);
100+
return-1;
101+
}
102+
}
103+
free(zerobuf);
104+
105+
if (lseek(f,SEEK_SET,0)!=0)
106+
{
107+
fprintf(stderr,_("%s: could not seek back to beginning of WAL segment %s: %s\n"),
108+
progname,fn,strerror(errno));
109+
close(f);
110+
return-1;
111+
}
59112
returnf;
60113
}
61114

115+
staticbool
116+
close_walfile(intwalfile,char*basedir,char*walname)
117+
{
118+
off_tcurrpos=lseek(walfile,0,SEEK_CUR);
119+
120+
if (currpos==-1)
121+
{
122+
fprintf(stderr,_("%s: could not get current position in file %s: %s\n"),
123+
progname,walname,strerror(errno));
124+
return false;
125+
}
126+
127+
if (fsync(walfile)!=0)
128+
{
129+
fprintf(stderr,_("%s: could not fsync file %s: %s\n"),
130+
progname,walname,strerror(errno));
131+
return false;
132+
}
133+
134+
if (close(walfile)!=0)
135+
{
136+
fprintf(stderr,_("%s: could not close file %s: %s\n"),
137+
progname,walname,strerror(errno));
138+
return false;
139+
}
140+
141+
/*
142+
* Rename the .partial file only if we've completed writing the
143+
* whole segment.
144+
*/
145+
if (currpos==XLOG_SEG_SIZE)
146+
{
147+
charoldfn[MAXPGPATH];
148+
charnewfn[MAXPGPATH];
149+
150+
snprintf(oldfn,sizeof(oldfn),"%s/%s.partial",basedir,walname);
151+
snprintf(newfn,sizeof(newfn),"%s/%s",basedir,walname);
152+
if (rename(oldfn,newfn)!=0)
153+
{
154+
fprintf(stderr,_("%s: could not rename file %s: %s\n"),
155+
progname,walname,strerror(errno));
156+
return false;
157+
}
158+
}
159+
else
160+
fprintf(stderr,_("%s: not renaming %s, segment is not complete.\n"),
161+
progname,walname);
162+
163+
return true;
164+
}
165+
166+
62167
/*
63168
* Local version of GetCurrentTimestamp(), since we are not linked with
64169
* backend code.
@@ -178,10 +283,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
178283
if (stream_continue&&stream_continue())
179284
{
180285
if (walfile!=-1)
181-
{
182-
fsync(walfile);
183-
close(walfile);
184-
}
286+
/* Potential error message is written by close_walfile */
287+
returnclose_walfile(walfile,basedir,current_walfile_name);
185288
return true;
186289
}
187290

@@ -360,8 +463,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
360463
/* Did we reach the end of a WAL segment? */
361464
if (blockpos.xrecoff %XLOG_SEG_SIZE==0)
362465
{
363-
fsync(walfile);
364-
close(walfile);
466+
if (!close_walfile(walfile,basedir,current_walfile_name))
467+
/* Error message written in close_walfile() */
468+
return false;
469+
365470
walfile=-1;
366471
xlogoff=0;
367472

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp