Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit323cbe7

Browse files
committed
Remove read_page callback from XLogReader.
Previously, the XLogReader module would fetch new input data using acallback function. Redesign the interface so that it tells the callerto insert more data with a special return value instead. This API suitslater patches for prefetching, encryption and maybe other futureprojects that would otherwise require continually extending the callbackinterface.As incidental cleanup work, move global variables readOff, readLen andreadSegNo inside XlogReaderState.Author: Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>Author: Heikki Linnakangas <hlinnaka@iki.fi> (parts of earlier version)Reviewed-by: Antonin Houska <ah@cybertec.at>Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>Reviewed-by: Takashi Menjo <takashi.menjo@gmail.com>Reviewed-by: Andres Freund <andres@anarazel.de>Reviewed-by: Thomas Munro <thomas.munro@gmail.com>Discussion:https://postgr.es/m/20190418.210257.43726183.horiguchi.kyotaro%40lab.ntt.co.jp
1 parent5ac9c43 commit323cbe7

File tree

13 files changed

+930
-677
lines changed

13 files changed

+930
-677
lines changed

‎src/backend/access/transam/twophase.c

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,19 +1330,21 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
13301330
char*errormsg;
13311331
TimeLineIDsave_currtli=ThisTimeLineID;
13321332

1333-
xlogreader=XLogReaderAllocate(wal_segment_size,NULL,
1334-
XL_ROUTINE(.page_read=&read_local_xlog_page,
1335-
.segment_open=&wal_segment_open,
1336-
.segment_close=&wal_segment_close),
1337-
NULL);
1333+
xlogreader=XLogReaderAllocate(wal_segment_size,NULL,wal_segment_close);
1334+
13381335
if (!xlogreader)
13391336
ereport(ERROR,
13401337
(errcode(ERRCODE_OUT_OF_MEMORY),
13411338
errmsg("out of memory"),
13421339
errdetail("Failed while allocating a WAL reading processor.")));
13431340

13441341
XLogBeginRead(xlogreader,lsn);
1345-
record=XLogReadRecord(xlogreader,&errormsg);
1342+
while (XLogReadRecord(xlogreader,&record,&errormsg)==
1343+
XLREAD_NEED_DATA)
1344+
{
1345+
if (!read_local_xlog_page(xlogreader))
1346+
break;
1347+
}
13461348

13471349
/*
13481350
* Restore immediately the timeline where it was previously, as

‎src/backend/access/transam/xlog.c

Lines changed: 57 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -811,17 +811,13 @@ static XLogSegNo openLogSegNo = 0;
811811
* These variables are used similarly to the ones above, but for reading
812812
* the XLOG. Note, however, that readOff generally represents the offset
813813
* of the page just read, not the seek position of the FD itself, which
814-
* will be just past that page. readLen indicates how much of the current
815-
* page has been read into readBuf, and readSource indicates where we got
816-
* the currently open file from.
814+
* will be just past that page. readSource indicates where we got the
815+
* currently open file from.
817816
* Note: we could use Reserve/ReleaseExternalFD to track consumption of
818817
* this FD too; but it doesn't currently seem worthwhile, since the XLOG is
819818
* not read by general-purpose sessions.
820819
*/
821820
staticintreadFile=-1;
822-
staticXLogSegNoreadSegNo=0;
823-
staticuint32readOff=0;
824-
staticuint32readLen=0;
825821
staticXLogSourcereadSource=XLOG_FROM_ANY;
826822

827823
/*
@@ -838,13 +834,6 @@ static XLogSource currentSource = XLOG_FROM_ANY;
838834
staticboollastSourceFailed= false;
839835
staticboolpendingWalRcvRestart= false;
840836

841-
typedefstructXLogPageReadPrivate
842-
{
843-
intemode;
844-
boolfetching_ckpt;/* are we fetching a checkpoint record? */
845-
boolrandAccess;
846-
}XLogPageReadPrivate;
847-
848837
/*
849838
* These variables track when we last obtained some WAL data to process,
850839
* and where we got it from. (XLogReceiptSource is initially the same as
@@ -920,10 +909,12 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
920909
staticintXLogFileRead(XLogSegNosegno,intemode,TimeLineIDtli,
921910
XLogSourcesource,boolnotfoundOk);
922911
staticintXLogFileReadAnyTLI(XLogSegNosegno,intemode,XLogSourcesource);
923-
staticintXLogPageRead(XLogReaderState*xlogreader,XLogRecPtrtargetPagePtr,
924-
intreqLen,XLogRecPtrtargetRecPtr,char*readBuf);
912+
staticboolXLogPageRead(XLogReaderState*state,
913+
boolfetching_ckpt,intemode,boolrandAccess);
925914
staticboolWaitForWALToBecomeAvailable(XLogRecPtrRecPtr,boolrandAccess,
926-
boolfetching_ckpt,XLogRecPtrtliRecPtr);
915+
boolfetching_ckpt,
916+
XLogRecPtrtliRecPtr,
917+
XLogSegNoreadSegNo);
927918
staticintemode_for_corrupt_record(intemode,XLogRecPtrRecPtr);
928919
staticvoidXLogFileClose(void);
929920
staticvoidPreallocXlogFiles(XLogRecPtrendptr);
@@ -1234,8 +1225,7 @@ XLogInsertRecord(XLogRecData *rdata,
12341225
appendBinaryStringInfo(&recordBuf,rdata->data,rdata->len);
12351226

12361227
if (!debug_reader)
1237-
debug_reader=XLogReaderAllocate(wal_segment_size,NULL,
1238-
XL_ROUTINE(),NULL);
1228+
debug_reader=XLogReaderAllocate(wal_segment_size,NULL,NULL);
12391229

12401230
if (!debug_reader)
12411231
{
@@ -4373,21 +4363,24 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
43734363
boolfetching_ckpt)
43744364
{
43754365
XLogRecord*record;
4376-
XLogPageReadPrivate*private= (XLogPageReadPrivate*)xlogreader->private_data;
4377-
4378-
/* Pass through parameters to XLogPageRead */
4379-
private->fetching_ckpt=fetching_ckpt;
4380-
private->emode=emode;
4381-
private->randAccess= (xlogreader->ReadRecPtr==InvalidXLogRecPtr);
4366+
boolrandAccess= (xlogreader->ReadRecPtr==InvalidXLogRecPtr);
43824367

43834368
/* This is the first attempt to read this page. */
43844369
lastSourceFailed= false;
43854370

43864371
for (;;)
43874372
{
43884373
char*errormsg;
4374+
XLogReadRecordResultresult;
4375+
4376+
while ((result=XLogReadRecord(xlogreader,&record,&errormsg))
4377+
==XLREAD_NEED_DATA)
4378+
{
4379+
if (!XLogPageRead(xlogreader,fetching_ckpt,emode,randAccess))
4380+
break;
4381+
4382+
}
43894383

4390-
record=XLogReadRecord(xlogreader,&errormsg);
43914384
ReadRecPtr=xlogreader->ReadRecPtr;
43924385
EndRecPtr=xlogreader->EndRecPtr;
43934386
if (record==NULL)
@@ -6457,7 +6450,6 @@ StartupXLOG(void)
64576450
boolbackupFromStandby= false;
64586451
DBStatedbstate_at_startup;
64596452
XLogReaderState*xlogreader;
6460-
XLogPageReadPrivateprivate;
64616453
boolpromoted= false;
64626454
structstatst;
64636455

@@ -6616,13 +6608,9 @@ StartupXLOG(void)
66166608
OwnLatch(&XLogCtl->recoveryWakeupLatch);
66176609

66186610
/* Set up XLOG reader facility */
6619-
MemSet(&private,0,sizeof(XLogPageReadPrivate));
66206611
xlogreader=
6621-
XLogReaderAllocate(wal_segment_size,NULL,
6622-
XL_ROUTINE(.page_read=&XLogPageRead,
6623-
.segment_open=NULL,
6624-
.segment_close=wal_segment_close),
6625-
&private);
6612+
XLogReaderAllocate(wal_segment_size,NULL,wal_segment_close);
6613+
66266614
if (!xlogreader)
66276615
ereport(ERROR,
66286616
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -7819,7 +7807,8 @@ StartupXLOG(void)
78197807
XLogRecPtrpageBeginPtr;
78207808

78217809
pageBeginPtr=EndOfLog- (EndOfLog %XLOG_BLCKSZ);
7822-
Assert(readOff==XLogSegmentOffset(pageBeginPtr,wal_segment_size));
7810+
Assert(XLogSegmentOffset(xlogreader->readPagePtr,wal_segment_size)==
7811+
XLogSegmentOffset(pageBeginPtr,wal_segment_size));
78237812

78247813
firstIdx=XLogRecPtrToBufIdx(EndOfLog);
78257814

@@ -12107,13 +12096,15 @@ CancelBackup(void)
1210712096
* XLogPageRead() to try fetching the record from another source, or to
1210812097
* sleep and retry.
1210912098
*/
12110-
staticint
12111-
XLogPageRead(XLogReaderState*xlogreader,XLogRecPtrtargetPagePtr,intreqLen,
12112-
XLogRecPtrtargetRecPtr,char*readBuf)
12113-
{
12114-
XLogPageReadPrivate*private=
12115-
(XLogPageReadPrivate*)xlogreader->private_data;
12116-
intemode=private->emode;
12099+
staticbool
12100+
XLogPageRead(XLogReaderState*state,
12101+
boolfetching_ckpt,intemode,boolrandAccess)
12102+
{
12103+
char*readBuf=state->readBuf;
12104+
XLogRecPtrtargetPagePtr=state->readPagePtr;
12105+
intreqLen=state->reqLen;
12106+
intreadLen=0;
12107+
XLogRecPtrtargetRecPtr=state->ReadRecPtr;
1211712108
uint32targetPageOff;
1211812109
XLogSegNotargetSegNoPG_USED_FOR_ASSERTS_ONLY;
1211912110
intr;
@@ -12126,18 +12117,18 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1212612117
* is not in the currently open one.
1212712118
*/
1212812119
if (readFile >=0&&
12129-
!XLByteInSeg(targetPagePtr,readSegNo,wal_segment_size))
12120+
!XLByteInSeg(targetPagePtr,state->seg.ws_segno,wal_segment_size))
1213012121
{
1213112122
/*
1213212123
* Request a restartpoint if we've replayed too much xlog since the
1213312124
* last one.
1213412125
*/
1213512126
if (bgwriterLaunched)
1213612127
{
12137-
if (XLogCheckpointNeeded(readSegNo))
12128+
if (XLogCheckpointNeeded(state->seg.ws_segno))
1213812129
{
1213912130
(void)GetRedoRecPtr();
12140-
if (XLogCheckpointNeeded(readSegNo))
12131+
if (XLogCheckpointNeeded(state->seg.ws_segno))
1214112132
RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
1214212133
}
1214312134
}
@@ -12147,7 +12138,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1214712138
readSource=XLOG_FROM_ANY;
1214812139
}
1214912140

12150-
XLByteToSeg(targetPagePtr,readSegNo,wal_segment_size);
12141+
XLByteToSeg(targetPagePtr,state->seg.ws_segno,wal_segment_size);
1215112142

1215212143
retry:
1215312144
/* See if we need to retrieve more data */
@@ -12156,17 +12147,15 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1215612147
flushedUpto<targetPagePtr+reqLen))
1215712148
{
1215812149
if (!WaitForWALToBecomeAvailable(targetPagePtr+reqLen,
12159-
private->randAccess,
12160-
private->fetching_ckpt,
12161-
targetRecPtr))
12150+
randAccess,fetching_ckpt,
12151+
targetRecPtr,state->seg.ws_segno))
1216212152
{
1216312153
if (readFile >=0)
1216412154
close(readFile);
1216512155
readFile=-1;
12166-
readLen=0;
1216712156
readSource=XLOG_FROM_ANY;
12168-
12169-
return-1;
12157+
XLogReaderSetInputData(state,-1);
12158+
returnfalse;
1217012159
}
1217112160
}
1217212161

@@ -12193,40 +12182,36 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1219312182
else
1219412183
readLen=XLOG_BLCKSZ;
1219512184

12196-
/* Read the requested page */
12197-
readOff=targetPageOff;
12198-
1219912185
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
12200-
r=pg_pread(readFile,readBuf,XLOG_BLCKSZ, (off_t)readOff);
12186+
r=pg_pread(readFile,readBuf,XLOG_BLCKSZ, (off_t)targetPageOff);
1220112187
if (r!=XLOG_BLCKSZ)
1220212188
{
1220312189
charfname[MAXFNAMELEN];
1220412190
intsave_errno=errno;
1220512191

1220612192
pgstat_report_wait_end();
12207-
XLogFileName(fname,curFileTLI,readSegNo,wal_segment_size);
12193+
XLogFileName(fname,curFileTLI,state->seg.ws_segno,wal_segment_size);
1220812194
if (r<0)
1220912195
{
1221012196
errno=save_errno;
1221112197
ereport(emode_for_corrupt_record(emode,targetPagePtr+reqLen),
1221212198
(errcode_for_file_access(),
1221312199
errmsg("could not read from log segment %s, offset %u: %m",
12214-
fname,readOff)));
12200+
fname,targetPageOff)));
1221512201
}
1221612202
else
1221712203
ereport(emode_for_corrupt_record(emode,targetPagePtr+reqLen),
1221812204
(errcode(ERRCODE_DATA_CORRUPTED),
1221912205
errmsg("could not read from log segment %s, offset %u: read %d of %zu",
12220-
fname,readOff,r, (Size)XLOG_BLCKSZ)));
12206+
fname,targetPageOff,r, (Size)XLOG_BLCKSZ)));
1222112207
gotonext_record_is_invalid;
1222212208
}
1222312209
pgstat_report_wait_end();
1222412210

12225-
Assert(targetSegNo==readSegNo);
12226-
Assert(targetPageOff==readOff);
12227-
Assert(reqLen <=readLen);
12211+
Assert(targetSegNo==state->seg.ws_segno);
12212+
Assert(readLen >=reqLen);
1222812213

12229-
xlogreader->seg.ws_tli=curFileTLI;
12214+
state->seg.ws_tli=curFileTLI;
1223012215

1223112216
/*
1223212217
* Check the page header immediately, so that we can retry immediately if
@@ -12254,29 +12239,31 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1225412239
* Validating the page header is cheap enough that doing it twice
1225512240
* shouldn't be a big deal from a performance point of view.
1225612241
*/
12257-
if (!XLogReaderValidatePageHeader(xlogreader,targetPagePtr,readBuf))
12242+
if (!XLogReaderValidatePageHeader(state,targetPagePtr,readBuf))
1225812243
{
12259-
/* reset any errorXLogReaderValidatePageHeader() might have set */
12260-
xlogreader->errormsg_buf[0]='\0';
12244+
/* reset any errorStateValidatePageHeader() might have set */
12245+
state->errormsg_buf[0]='\0';
1226112246
gotonext_record_is_invalid;
1226212247
}
1226312248

12264-
returnreadLen;
12249+
Assert(state->readPagePtr==targetPagePtr);
12250+
XLogReaderSetInputData(state,readLen);
12251+
return true;
1226512252

1226612253
next_record_is_invalid:
1226712254
lastSourceFailed= true;
1226812255

1226912256
if (readFile >=0)
1227012257
close(readFile);
1227112258
readFile=-1;
12272-
readLen=0;
1227312259
readSource=XLOG_FROM_ANY;
1227412260

1227512261
/* In standby-mode, keep trying */
1227612262
if (StandbyMode)
1227712263
gotoretry;
12278-
else
12279-
return-1;
12264+
12265+
XLogReaderSetInputData(state,-1);
12266+
return false;
1228012267
}
1228112268

1228212269
/*
@@ -12307,7 +12294,8 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1230712294
*/
1230812295
staticbool
1230912296
WaitForWALToBecomeAvailable(XLogRecPtrRecPtr,boolrandAccess,
12310-
boolfetching_ckpt,XLogRecPtrtliRecPtr)
12297+
boolfetching_ckpt,XLogRecPtrtliRecPtr,
12298+
XLogSegNoreadSegNo)
1231112299
{
1231212300
staticTimestampTzlast_fail_time=0;
1231312301
TimestampTznow;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp