Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3b02ea4

Browse files
committed
XLogReader general code cleanup
Some minor tweaks and comment additions, for cleanliness sake and toavoid having the upcoming timeline-following patch be polluted withunrelated cleanup.Extracted from a larger patch by Craig Ringer, reviewed by AndresFreund, with some additions by myself.
1 parent50861cd commit3b02ea4

File tree

5 files changed

+86
-30
lines changed

5 files changed

+86
-30
lines changed

‎src/backend/access/transam/xlogreader.c

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
*
1111
* NOTES
1212
*See xlogreader.h for more notes on this facility.
13+
*
14+
*This file is compiled as both front-end and backend code, so it
15+
*may not use ereport, server-defined static variables, etc.
1316
*-------------------------------------------------------------------------
1417
*/
15-
1618
#include"postgres.h"
1719

1820
#include"access/transam.h"
@@ -192,14 +194,21 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
192194
{
193195
XLogRecord*record;
194196
XLogRecPtrtargetPagePtr;
195-
boolrandAccess= false;
197+
boolrandAccess;
196198
uint32len,
197199
total_len;
198200
uint32targetRecOff;
199201
uint32pageHeaderSize;
200202
boolgotheader;
201203
intreadOff;
202204

205+
/*
206+
* randAccess indicates whether to verify the previous-record pointer of
207+
* the record we're reading. We only do this if we're reading
208+
* sequentially, which is what we initially assume.
209+
*/
210+
randAccess= false;
211+
203212
/* reset error state */
204213
*errormsg=NULL;
205214
state->errormsg_buf[0]='\0';
@@ -208,6 +217,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
208217

209218
if (RecPtr==InvalidXLogRecPtr)
210219
{
220+
/* No explicit start point; read the record after the one we just read */
211221
RecPtr=state->EndRecPtr;
212222

213223
if (state->ReadRecPtr==InvalidXLogRecPtr)
@@ -223,11 +233,13 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
223233
else
224234
{
225235
/*
236+
* Caller supplied a position to start at.
237+
*
226238
* In this case, the passed-in record pointer should already be
227239
* pointing to a valid record starting position.
228240
*/
229241
Assert(XRecOffIsValid(RecPtr));
230-
randAccess= true;/* allow readPageTLI to go backwards too */
242+
randAccess= true;
231243
}
232244

233245
state->currRecPtr=RecPtr;
@@ -309,8 +321,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
309321
/* XXX: more validation should be done here */
310322
if (total_len<SizeOfXLogRecord)
311323
{
312-
report_invalid_record(state,"invalid record length at %X/%X",
313-
(uint32) (RecPtr >>32), (uint32)RecPtr);
324+
report_invalid_record(state,
325+
"invalid record length at %X/%X: wanted %lu, got %u",
326+
(uint32) (RecPtr >>32), (uint32)RecPtr,
327+
SizeOfXLogRecord,total_len);
314328
gotoerr;
315329
}
316330
gotheader= false;
@@ -463,12 +477,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
463477
err:
464478

465479
/*
466-
* Invalidate thexlog page we've cached. We might read from a different
467-
*source afterfailure.
480+
* Invalidate theread state. We might read from a different source after
481+
* failure.
468482
*/
469-
state->readSegNo=0;
470-
state->readOff=0;
471-
state->readLen=0;
483+
XLogReaderInvalReadState(state);
472484

473485
if (state->errormsg_buf[0]!='\0')
474486
*errormsg=state->errormsg_buf;
@@ -572,18 +584,27 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
572584
if (!ValidXLogPageHeader(state,pageptr,hdr))
573585
gotoerr;
574586

575-
/* updatecache information */
587+
/* updateread state information */
576588
state->readSegNo=targetSegNo;
577589
state->readOff=targetPageOff;
578590
state->readLen=readLen;
579591

580592
returnreadLen;
581593

582594
err:
595+
XLogReaderInvalReadState(state);
596+
return-1;
597+
}
598+
599+
/*
600+
* Invalidate the xlogreader's read state to force a re-read.
601+
*/
602+
void
603+
XLogReaderInvalReadState(XLogReaderState*state)
604+
{
583605
state->readSegNo=0;
584606
state->readOff=0;
585607
state->readLen=0;
586-
return-1;
587608
}
588609

589610
/*
@@ -600,8 +621,9 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
600621
if (record->xl_tot_len<SizeOfXLogRecord)
601622
{
602623
report_invalid_record(state,
603-
"invalid record length at %X/%X",
604-
(uint32) (RecPtr >>32), (uint32)RecPtr);
624+
"invalid record length at %X/%X: wanted %lu, got %u",
625+
(uint32) (RecPtr >>32), (uint32)RecPtr,
626+
SizeOfXLogRecord,record->xl_tot_len);
605627
return false;
606628
}
607629
if (record->xl_rmid>RM_MAX_ID)
@@ -907,11 +929,9 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
907929
err:
908930
out:
909931
/* Reset state to what we had before finding the record */
910-
state->readSegNo=0;
911-
state->readOff=0;
912-
state->readLen=0;
913932
state->ReadRecPtr=saved_state.ReadRecPtr;
914933
state->EndRecPtr=saved_state.EndRecPtr;
934+
XLogReaderInvalReadState(state);
915935

916936
returnfound;
917937
}

‎src/backend/access/transam/xlogutils.c

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@
1919

2020
#include<unistd.h>
2121

22-
#include"miscadmin.h"
23-
2422
#include"access/xlog.h"
2523
#include"access/xlog_internal.h"
2624
#include"access/xlogutils.h"
2725
#include"catalog/catalog.h"
26+
#include"miscadmin.h"
2827
#include"storage/smgr.h"
2928
#include"utils/guc.h"
3029
#include"utils/hsearch.h"
@@ -638,8 +637,17 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
638637
}
639638

640639
/*
641-
* TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
642-
* we currently don't have the infrastructure (elog!) to share it.
640+
* Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
641+
* in timeline 'tli'.
642+
*
643+
* Will open, and keep open, one WAL segment stored in the static file
644+
* descriptor 'sendFile'. This means if XLogRead is used once, there will
645+
* always be one descriptor left open until the process ends, but never
646+
* more than one.
647+
*
648+
* XXX This is very similar to pg_xlogdump's XLogDumpXLogRead and to XLogRead
649+
* in walsender.c but for small differences (such as lack of elog() in
650+
* frontend). Probably these should be merged at some point.
643651
*/
644652
staticvoid
645653
XLogRead(char*buf,TimeLineIDtli,XLogRecPtrstartptr,Sizecount)
@@ -648,6 +656,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
648656
XLogRecPtrrecptr;
649657
Sizenbytes;
650658

659+
/* state maintained across calls */
651660
staticintsendFile=-1;
652661
staticXLogSegNosendSegNo=0;
653662
staticuint32sendOff=0;
@@ -664,11 +673,11 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
664673

665674
startoff=recptr %XLogSegSize;
666675

676+
/* Do we need to switch to a different xlog segment? */
667677
if (sendFile<0|| !XLByteInSeg(recptr,sendSegNo))
668678
{
669679
charpath[MAXPGPATH];
670680

671-
/* Switch to another logfile segment */
672681
if (sendFile >=0)
673682
close(sendFile);
674683

@@ -745,7 +754,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
745754
* Public because it would likely be very helpful for someone writing another
746755
* output method outside walsender, e.g. in a bgworker.
747756
*
748-
* TODO: The walsender hasit's own version of this, but it relies on the
757+
* TODO: The walsender hasits own version of this, but it relies on the
749758
* walsender's latch being set whenever WAL is flushed. No such infrastructure
750759
* exists for normal backends, so we have to do a check/sleep/repeat style of
751760
* loop for now.

‎src/backend/replication/logical/logicalfuncs.c

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
115115
intreqLen,XLogRecPtrtargetRecPtr,char*cur_page,TimeLineID*pageTLI)
116116
{
117117
returnread_local_xlog_page(state,targetPagePtr,reqLen,
118-
targetRecPtr,cur_page,pageTLI);
118+
targetRecPtr,cur_page,pageTLI);
119119
}
120120

121121
/*
@@ -241,6 +241,10 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
241241

242242
PG_TRY();
243243
{
244+
/*
245+
* Passing InvalidXLogRecPtr here causes replay to start at the slot's
246+
* confirmed_flush.
247+
*/
244248
ctx=CreateDecodingContext(InvalidXLogRecPtr,
245249
options,
246250
logical_read_local_xlog_page,
@@ -263,6 +267,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
263267

264268
ctx->output_writer_private=p;
265269

270+
/*
271+
* We start reading xlog from the restart lsn, even though in
272+
* CreateDecodingContext we set the snapshot builder up using the
273+
* slot's confirmed_flush. This means we might read xlog we don't
274+
* actually decode rows from, but the snapshot builder might need it
275+
* to get to a consistent point. The point we start returning data to
276+
* *users* at is the candidate restart lsn from the decoding context.
277+
*/
266278
startptr=MyReplicationSlot->data.restart_lsn;
267279

268280
CurrentResourceOwner=ResourceOwnerCreate(CurrentResourceOwner,"logical decoding");
@@ -280,6 +292,10 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
280292
if (errm)
281293
elog(ERROR,"%s",errm);
282294

295+
/*
296+
* Now that we've set up the xlog reader state, subsequent calls
297+
* pass InvalidXLogRecPtr to say "continue from last record"
298+
*/
283299
startptr=InvalidXLogRecPtr;
284300

285301
/*

‎src/include/access/xlogreader.h

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,16 +139,22 @@ struct XLogReaderState
139139
* ----------------------------------------
140140
*/
141141

142-
/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
142+
/*
143+
* Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least
144+
* readLen bytes)
145+
*/
143146
char*readBuf;
147+
uint32readLen;
144148

145-
/* last read segment, segment offset,read length, TLI */
149+
/* last read segment, segment offset,TLI for data currently in readBuf */
146150
XLogSegNoreadSegNo;
147151
uint32readOff;
148-
uint32readLen;
149152
TimeLineIDreadPageTLI;
150153

151-
/* beginning of last page read, and its TLI */
154+
/*
155+
* beginning of prior page read, and its TLI. Doesn't necessarily
156+
* correspond to what's in readBuf; used for timeline sanity checks.
157+
*/
152158
XLogRecPtrlatestPagePtr;
153159
TimeLineIDlatestPageTLI;
154160

@@ -174,6 +180,9 @@ extern void XLogReaderFree(XLogReaderState *state);
174180
externstructXLogRecord*XLogReadRecord(XLogReaderState*state,
175181
XLogRecPtrrecptr,char**errormsg);
176182

183+
/* Invalidate read state */
184+
externvoidXLogReaderInvalReadState(XLogReaderState*state);
185+
177186
#ifdefFRONTEND
178187
externXLogRecPtrXLogFindNextRecord(XLogReaderState*state,XLogRecPtrRecPtr);
179188
#endif/* FRONTEND */

‎src/include/access/xlogutils.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
4747
externRelationCreateFakeRelcacheEntry(RelFileNodernode);
4848
externvoidFreeFakeRelcacheEntry(Relationfakerel);
4949

50-
externintread_local_xlog_page(XLogReaderState*state,XLogRecPtrtargetPagePtr,
51-
intreqLen,XLogRecPtrtargetRecPtr,char*cur_page,TimeLineID*pageTLI);
50+
externintread_local_xlog_page(XLogReaderState*state,
51+
XLogRecPtrtargetPagePtr,intreqLen,
52+
XLogRecPtrtargetRecPtr,char*cur_page,
53+
TimeLineID*pageTLI);
5254

5355
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp