Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit709d003

Browse files
committed
Rework WAL-reading supporting structs
The state-tracking of WAL reading in various places was pretty messy,mostly because the ancient physical-replication WAL reading code wasn'tusing the XLogReader abstraction. This led to some untidy code. Makeit prettier by creating two additional supporting structs,WALSegmentContext and WALOpenSegment which keep track of WAL-readingstate. This makes code cleaner, as well as supports more futurecleanup.Author: Antonin HouskaReviewed-by: Álvaro Herrera and (older versions) Robert HaasDiscussion:https://postgr.es/m/14984.1554998742@spoje.net
1 parenta9ae99d commit709d003

File tree

12 files changed

+189
-179
lines changed

12 files changed

+189
-179
lines changed

‎src/backend/access/transam/twophase.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1377,7 +1377,6 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
13771377
*
13781378
* Note clearly that this function can access WAL during normal operation,
13791379
* similarly to the way WALSender or Logical Decoding would do.
1380-
*
13811380
*/
13821381
staticvoid
13831382
XlogReadTwoPhaseData(XLogRecPtrlsn,char**buf,int*len)
@@ -1386,8 +1385,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
13861385
XLogReaderState*xlogreader;
13871386
char*errormsg;
13881387

1389-
xlogreader=XLogReaderAllocate(wal_segment_size,&read_local_xlog_page,
1390-
NULL);
1388+
xlogreader=XLogReaderAllocate(wal_segment_size,NULL,
1389+
&read_local_xlog_page,NULL);
13911390
if (!xlogreader)
13921391
ereport(ERROR,
13931392
(errcode(ERRCODE_OUT_OF_MEMORY),

‎src/backend/access/transam/xlog.c

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -885,8 +885,7 @@ static intXLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
885885
intsource,boolnotfoundOk);
886886
staticintXLogFileReadAnyTLI(XLogSegNosegno,intemode,intsource);
887887
staticintXLogPageRead(XLogReaderState*xlogreader,XLogRecPtrtargetPagePtr,
888-
intreqLen,XLogRecPtrtargetRecPtr,char*readBuf,
889-
TimeLineID*readTLI);
888+
intreqLen,XLogRecPtrtargetRecPtr,char*readBuf);
890889
staticboolWaitForWALToBecomeAvailable(XLogRecPtrRecPtr,boolrandAccess,
891890
boolfetching_ckpt,XLogRecPtrtliRecPtr);
892891
staticintemode_for_corrupt_record(intemode,XLogRecPtrRecPtr);
@@ -1195,7 +1194,8 @@ XLogInsertRecord(XLogRecData *rdata,
11951194
appendBinaryStringInfo(&recordBuf,rdata->data,rdata->len);
11961195

11971196
if (!debug_reader)
1198-
debug_reader=XLogReaderAllocate(wal_segment_size,NULL,NULL);
1197+
debug_reader=XLogReaderAllocate(wal_segment_size,NULL,
1198+
NULL,NULL);
11991199

12001200
if (!debug_reader)
12011201
{
@@ -4296,7 +4296,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
42964296
XLByteToSeg(xlogreader->latestPagePtr,segno,wal_segment_size);
42974297
offset=XLogSegmentOffset(xlogreader->latestPagePtr,
42984298
wal_segment_size);
4299-
XLogFileName(fname,xlogreader->readPageTLI,segno,
4299+
XLogFileName(fname,xlogreader->seg.ws_tli,segno,
43004300
wal_segment_size);
43014301
ereport(emode_for_corrupt_record(emode,
43024302
RecPtr ?RecPtr :EndRecPtr),
@@ -6353,7 +6353,8 @@ StartupXLOG(void)
63536353

63546354
/* Set up XLOG reader facility */
63556355
MemSet(&private,0,sizeof(XLogPageReadPrivate));
6356-
xlogreader=XLogReaderAllocate(wal_segment_size,&XLogPageRead,&private);
6356+
xlogreader=XLogReaderAllocate(wal_segment_size,NULL,
6357+
&XLogPageRead,&private);
63576358
if (!xlogreader)
63586359
ereport(ERROR,
63596360
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -7355,7 +7356,7 @@ StartupXLOG(void)
73557356
* and we were reading the old WAL from a segment belonging to a higher
73567357
* timeline.
73577358
*/
7358-
EndOfLogTLI=xlogreader->readPageTLI;
7359+
EndOfLogTLI=xlogreader->seg.ws_tli;
73597360

73607361
/*
73617362
* Complain if we did not roll forward far enough to render the backup
@@ -11523,7 +11524,7 @@ CancelBackup(void)
1152311524
*/
1152411525
staticint
1152511526
XLogPageRead(XLogReaderState*xlogreader,XLogRecPtrtargetPagePtr,intreqLen,
11526-
XLogRecPtrtargetRecPtr,char*readBuf,TimeLineID*readTLI)
11527+
XLogRecPtrtargetRecPtr,char*readBuf)
1152711528
{
1152811529
XLogPageReadPrivate*private=
1152911530
(XLogPageReadPrivate*)xlogreader->private_data;
@@ -11640,7 +11641,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1164011641
Assert(targetPageOff==readOff);
1164111642
Assert(reqLen <=readLen);
1164211643

11643-
*readTLI=curFileTLI;
11644+
xlogreader->seg.ws_tli=curFileTLI;
1164411645

1164511646
/*
1164611647
* Check the page header immediately, so that we can retry immediately if

‎src/backend/access/transam/xlogreader.c

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
6868
* Returns NULL if the xlogreader couldn't be allocated.
6969
*/
7070
XLogReaderState*
71-
XLogReaderAllocate(intwal_segment_size,XLogPageReadCBpagereadfunc,
72-
void*private_data)
71+
XLogReaderAllocate(intwal_segment_size,constchar*waldir,
72+
XLogPageReadCBpagereadfunc,void*private_data)
7373
{
7474
XLogReaderState*state;
7575

@@ -96,7 +96,10 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
9696
returnNULL;
9797
}
9898

99-
state->wal_segment_size=wal_segment_size;
99+
/* Initialize segment info. */
100+
WALOpenSegmentInit(&state->seg,&state->segcxt,wal_segment_size,
101+
waldir);
102+
100103
state->read_page=pagereadfunc;
101104
/* system_identifier initialized to zeroes above */
102105
state->private_data=private_data;
@@ -198,6 +201,23 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
198201
return true;
199202
}
200203

204+
/*
205+
* Initialize the passed segment structs.
206+
*/
207+
void
208+
WALOpenSegmentInit(WALOpenSegment*seg,WALSegmentContext*segcxt,
209+
intsegsize,constchar*waldir)
210+
{
211+
seg->ws_file=-1;
212+
seg->ws_segno=0;
213+
seg->ws_off=0;
214+
seg->ws_tli=0;
215+
216+
segcxt->ws_segsize=segsize;
217+
if (waldir)
218+
snprintf(segcxt->ws_dir,MAXPGPATH,"%s",waldir);
219+
}
220+
201221
/*
202222
* Attempt to read an XLOG record.
203223
*
@@ -490,8 +510,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
490510
(record->xl_info& ~XLR_INFO_MASK)==XLOG_SWITCH)
491511
{
492512
/* Pretend it extends to end of segment */
493-
state->EndRecPtr+=state->wal_segment_size-1;
494-
state->EndRecPtr-=XLogSegmentOffset(state->EndRecPtr,state->wal_segment_size);
513+
state->EndRecPtr+=state->segcxt.ws_segsize-1;
514+
state->EndRecPtr-=XLogSegmentOffset(state->EndRecPtr,state->segcxt.ws_segsize);
495515
}
496516

497517
if (DecodeXLogRecord(state,record,errormsg))
@@ -533,12 +553,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
533553

534554
Assert((pageptr %XLOG_BLCKSZ)==0);
535555

536-
XLByteToSeg(pageptr,targetSegNo,state->wal_segment_size);
537-
targetPageOff=XLogSegmentOffset(pageptr,state->wal_segment_size);
556+
XLByteToSeg(pageptr,targetSegNo,state->segcxt.ws_segsize);
557+
targetPageOff=XLogSegmentOffset(pageptr,state->segcxt.ws_segsize);
538558

539559
/* check whether we have all the requested data already */
540-
if (targetSegNo==state->readSegNo&&targetPageOff==state->readOff&&
541-
reqLen <=state->readLen)
560+
if (targetSegNo==state->seg.ws_segno&&
561+
targetPageOff==state->seg.ws_off&&reqLen <=state->readLen)
542562
returnstate->readLen;
543563

544564
/*
@@ -553,13 +573,13 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
553573
* record is. This is so that we can check the additional identification
554574
* info that is present in the first page's "long" header.
555575
*/
556-
if (targetSegNo!=state->readSegNo&&targetPageOff!=0)
576+
if (targetSegNo!=state->seg.ws_segno&&targetPageOff!=0)
557577
{
558578
XLogRecPtrtargetSegmentPtr=pageptr-targetPageOff;
559579

560580
readLen=state->read_page(state,targetSegmentPtr,XLOG_BLCKSZ,
561581
state->currRecPtr,
562-
state->readBuf,&state->readPageTLI);
582+
state->readBuf);
563583
if (readLen<0)
564584
gotoerr;
565585

@@ -577,7 +597,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
577597
*/
578598
readLen=state->read_page(state,pageptr,Max(reqLen,SizeOfXLogShortPHD),
579599
state->currRecPtr,
580-
state->readBuf,&state->readPageTLI);
600+
state->readBuf);
581601
if (readLen<0)
582602
gotoerr;
583603

@@ -596,7 +616,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
596616
{
597617
readLen=state->read_page(state,pageptr,XLogPageHeaderSize(hdr),
598618
state->currRecPtr,
599-
state->readBuf,&state->readPageTLI);
619+
state->readBuf);
600620
if (readLen<0)
601621
gotoerr;
602622
}
@@ -608,8 +628,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
608628
gotoerr;
609629

610630
/* update read state information */
611-
state->readSegNo=targetSegNo;
612-
state->readOff=targetPageOff;
631+
state->seg.ws_segno=targetSegNo;
632+
state->seg.ws_off=targetPageOff;
613633
state->readLen=readLen;
614634

615635
returnreadLen;
@@ -625,8 +645,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
625645
staticvoid
626646
XLogReaderInvalReadState(XLogReaderState*state)
627647
{
628-
state->readSegNo=0;
629-
state->readOff=0;
648+
state->seg.ws_segno=0;
649+
state->seg.ws_off=0;
630650
state->readLen=0;
631651
}
632652

@@ -745,16 +765,16 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
745765

746766
Assert((recptr %XLOG_BLCKSZ)==0);
747767

748-
XLByteToSeg(recptr,segno,state->wal_segment_size);
749-
offset=XLogSegmentOffset(recptr,state->wal_segment_size);
768+
XLByteToSeg(recptr,segno,state->segcxt.ws_segsize);
769+
offset=XLogSegmentOffset(recptr,state->segcxt.ws_segsize);
750770

751-
XLogSegNoOffsetToRecPtr(segno,offset,state->wal_segment_size,recaddr);
771+
XLogSegNoOffsetToRecPtr(segno,offset,state->segcxt.ws_segsize,recaddr);
752772

753773
if (hdr->xlp_magic!=XLOG_PAGE_MAGIC)
754774
{
755775
charfname[MAXFNAMELEN];
756776

757-
XLogFileName(fname,state->readPageTLI,segno,state->wal_segment_size);
777+
XLogFileName(fname,state->seg.ws_tli,segno,state->segcxt.ws_segsize);
758778

759779
report_invalid_record(state,
760780
"invalid magic number %04X in log segment %s, offset %u",
@@ -768,7 +788,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
768788
{
769789
charfname[MAXFNAMELEN];
770790

771-
XLogFileName(fname,state->readPageTLI,segno,state->wal_segment_size);
791+
XLogFileName(fname,state->seg.ws_tli,segno,state->segcxt.ws_segsize);
772792

773793
report_invalid_record(state,
774794
"invalid info bits %04X in log segment %s, offset %u",
@@ -791,7 +811,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
791811
(unsigned long long)state->system_identifier);
792812
return false;
793813
}
794-
elseif (longhdr->xlp_seg_size!=state->wal_segment_size)
814+
elseif (longhdr->xlp_seg_size!=state->segcxt.ws_segsize)
795815
{
796816
report_invalid_record(state,
797817
"WAL file is from different database system: incorrect segment size in page header");
@@ -808,7 +828,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
808828
{
809829
charfname[MAXFNAMELEN];
810830

811-
XLogFileName(fname,state->readPageTLI,segno,state->wal_segment_size);
831+
XLogFileName(fname,state->seg.ws_tli,segno,state->segcxt.ws_segsize);
812832

813833
/* hmm, first page of file doesn't have a long header? */
814834
report_invalid_record(state,
@@ -828,7 +848,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
828848
{
829849
charfname[MAXFNAMELEN];
830850

831-
XLogFileName(fname,state->readPageTLI,segno,state->wal_segment_size);
851+
XLogFileName(fname,state->seg.ws_tli,segno,state->segcxt.ws_segsize);
832852

833853
report_invalid_record(state,
834854
"unexpected pageaddr %X/%X in log segment %s, offset %u",
@@ -853,7 +873,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
853873
{
854874
charfname[MAXFNAMELEN];
855875

856-
XLogFileName(fname,state->readPageTLI,segno,state->wal_segment_size);
876+
XLogFileName(fname,state->seg.ws_tli,segno,state->segcxt.ws_segsize);
857877

858878
report_invalid_record(state,
859879
"out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
@@ -997,7 +1017,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
9971017

9981018
#endif/* FRONTEND */
9991019

1000-
10011020
/* ----------------------------------------
10021021
* Functions for decoding the data and block references in a record.
10031022
* ----------------------------------------

‎src/backend/access/transam/xlogutils.c

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -802,8 +802,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
802802
void
803803
XLogReadDetermineTimeline(XLogReaderState*state,XLogRecPtrwantPage,uint32wantLength)
804804
{
805-
constXLogRecPtrlastReadPage=state->readSegNo*
806-
state->wal_segment_size+state->readOff;
805+
constXLogRecPtrlastReadPage=state->seg.ws_segno*
806+
state->segcxt.ws_segsize+state->seg.ws_off;
807807

808808
Assert(wantPage!=InvalidXLogRecPtr&&wantPage %XLOG_BLCKSZ==0);
809809
Assert(wantLength <=XLOG_BLCKSZ);
@@ -847,8 +847,8 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
847847
if (state->currTLIValidUntil!=InvalidXLogRecPtr&&
848848
state->currTLI!=ThisTimeLineID&&
849849
state->currTLI!=0&&
850-
((wantPage+wantLength) /state->wal_segment_size)<
851-
(state->currTLIValidUntil /state->wal_segment_size))
850+
((wantPage+wantLength) /state->segcxt.ws_segsize)<
851+
(state->currTLIValidUntil /state->segcxt.ws_segsize))
852852
return;
853853

854854
/*
@@ -869,12 +869,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
869869
* by a promotion or replay from a cascaded replica.
870870
*/
871871
List*timelineHistory=readTimeLineHistory(ThisTimeLineID);
872+
XLogRecPtrendOfSegment;
872873

873-
XLogRecPtrendOfSegment= (((wantPage /state->wal_segment_size)+1)
874-
*state->wal_segment_size)-1;
875-
876-
Assert(wantPage /state->wal_segment_size==
877-
endOfSegment /state->wal_segment_size);
874+
endOfSegment= ((wantPage /state->segcxt.ws_segsize)+1)*
875+
state->segcxt.ws_segsize-1;
876+
Assert(wantPage /state->segcxt.ws_segsize==
877+
endOfSegment /state->segcxt.ws_segsize);
878878

879879
/*
880880
* Find the timeline of the last LSN on the segment containing
@@ -909,8 +909,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
909909
*/
910910
int
911911
read_local_xlog_page(XLogReaderState*state,XLogRecPtrtargetPagePtr,
912-
intreqLen,XLogRecPtrtargetRecPtr,char*cur_page,
913-
TimeLineID*pageTLI)
912+
intreqLen,XLogRecPtrtargetRecPtr,char*cur_page)
914913
{
915914
XLogRecPtrread_upto,
916915
loc;
@@ -933,8 +932,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
933932
read_upto=GetFlushRecPtr();
934933
else
935934
read_upto=GetXLogReplayRecPtr(&ThisTimeLineID);
936-
937-
*pageTLI=ThisTimeLineID;
935+
state->seg.ws_tli=ThisTimeLineID;
938936

939937
/*
940938
* Check which timeline to get the record from.
@@ -984,14 +982,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
984982
read_upto=state->currTLIValidUntil;
985983

986984
/*
987-
* SettingpageTLI to our wanted record's TLI is slightly wrong;
985+
* Settingws_tli to our wanted record's TLI is slightly wrong;
988986
* the page might begin on an older timeline if it contains a
989987
* timeline switch, since its xlog segment will have been copied
990988
* from the prior timeline. This is pretty harmless though, as
991989
* nothing cares so long as the timeline doesn't go backwards. We
992990
* should read the page header instead; FIXME someday.
993991
*/
994-
*pageTLI=state->currTLI;
992+
state->seg.ws_tli=state->currTLI;
995993

996994
/* No need to wait on a historical timeline */
997995
break;
@@ -1022,7 +1020,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
10221020
* as 'count', read the whole page anyway. It's guaranteed to be
10231021
* zero-padded up to the page boundary if it's incomplete.
10241022
*/
1025-
XLogRead(cur_page,state->wal_segment_size,*pageTLI,targetPagePtr,
1023+
XLogRead(cur_page,state->segcxt.ws_segsize,state->seg.ws_tli,targetPagePtr,
10261024
XLOG_BLCKSZ);
10271025

10281026
/* number of valid bytes in the buffer */

‎src/backend/replication/logical/logical.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ StartupDecodingContext(List *output_plugin_options,
173173

174174
ctx->slot=slot;
175175

176-
ctx->reader=XLogReaderAllocate(wal_segment_size,read_page,ctx);
176+
ctx->reader=XLogReaderAllocate(wal_segment_size,NULL,read_page,ctx);
177177
if (!ctx->reader)
178178
ereport(ERROR,
179179
(errcode(ERRCODE_OUT_OF_MEMORY),

‎src/backend/replication/logical/logicalfuncs.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,10 @@ check_permissions(void)
116116

117117
int
118118
logical_read_local_xlog_page(XLogReaderState*state,XLogRecPtrtargetPagePtr,
119-
intreqLen,XLogRecPtrtargetRecPtr,char*cur_page,TimeLineID*pageTLI)
119+
intreqLen,XLogRecPtrtargetRecPtr,char*cur_page)
120120
{
121121
returnread_local_xlog_page(state,targetPagePtr,reqLen,
122-
targetRecPtr,cur_page,pageTLI);
122+
targetRecPtr,cur_page);
123123
}
124124

125125
/*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp