Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb060dbe

Browse files
committed
Rework XLogReader callback system
Code review for0dc8ead, prompted by a bug closed by91c4054.XLogReader's system for opening and closing segments had gotten toocomplicated, with callbacks being passed at both the XLogReaderAllocatelevel (read_page) as well as at the WALRead level (segment_open). Thiswas confusing and hard to follow, so restructure things so that thesecallbacks are passed together at XLogReaderAllocate time, and addanother callback to the set (segment_close) to make it a coherent whole.Also, ensure XLogReaderState is an argument to all the callbacks, sothat they can grab at the ->private data if necessary.Document the whole arrangement more clearly.Author: Álvaro Herrera <alvherre@alvh.no-ip.org>Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>Discussion:https://postgr.es/m/20200422175754.GA19858@alvherre.pgsql
1 parent871696b commitb060dbe

File tree

13 files changed

+214
-113
lines changed

13 files changed

+214
-113
lines changed

‎src/backend/access/transam/twophase.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1331,7 +1331,10 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
13311331
char*errormsg;
13321332

13331333
xlogreader=XLogReaderAllocate(wal_segment_size,NULL,
1334-
&read_local_xlog_page,NULL);
1334+
XL_ROUTINE(.page_read=&read_local_xlog_page,
1335+
.segment_open=&wal_segment_open,
1336+
.segment_close=&wal_segment_close),
1337+
NULL);
13351338
if (!xlogreader)
13361339
ereport(ERROR,
13371340
(errcode(ERRCODE_OUT_OF_MEMORY),

‎src/backend/access/transam/xlog.c

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,7 +1223,7 @@ XLogInsertRecord(XLogRecData *rdata,
12231223

12241224
if (!debug_reader)
12251225
debug_reader=XLogReaderAllocate(wal_segment_size,NULL,
1226-
NULL,NULL);
1226+
XL_ROUTINE(),NULL);
12271227

12281228
if (!debug_reader)
12291229
{
@@ -6478,8 +6478,12 @@ StartupXLOG(void)
64786478

64796479
/* Set up XLOG reader facility */
64806480
MemSet(&private,0,sizeof(XLogPageReadPrivate));
6481-
xlogreader=XLogReaderAllocate(wal_segment_size,NULL,
6482-
&XLogPageRead,&private);
6481+
xlogreader=
6482+
XLogReaderAllocate(wal_segment_size,NULL,
6483+
XL_ROUTINE(.page_read=&XLogPageRead,
6484+
.segment_open=NULL,
6485+
.segment_close=wal_segment_close),
6486+
&private);
64836487
if (!xlogreader)
64846488
ereport(ERROR,
64856489
(errcode(ERRCODE_OUT_OF_MEMORY),

‎src/backend/access/transam/xlogreader.c

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
7171
*/
7272
XLogReaderState*
7373
XLogReaderAllocate(intwal_segment_size,constchar*waldir,
74-
XLogPageReadCBpagereadfunc,void*private_data)
74+
XLogReaderRoutine*routine,void*private_data)
7575
{
7676
XLogReaderState*state;
7777

@@ -81,6 +81,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
8181
if (!state)
8282
returnNULL;
8383

84+
/* initialize caller-provided support functions */
85+
state->routine=*routine;
86+
8487
state->max_block_id=-1;
8588

8689
/*
@@ -102,7 +105,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
102105
WALOpenSegmentInit(&state->seg,&state->segcxt,wal_segment_size,
103106
waldir);
104107

105-
state->read_page=pagereadfunc;
106108
/* system_identifier initialized to zeroes above */
107109
state->private_data=private_data;
108110
/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
@@ -137,7 +139,7 @@ XLogReaderFree(XLogReaderState *state)
137139
intblock_id;
138140

139141
if (state->seg.ws_file!=-1)
140-
close(state->seg.ws_file);
142+
state->routine.segment_close(state);
141143

142144
for (block_id=0;block_id <=XLR_MAX_BLOCK_ID;block_id++)
143145
{
@@ -250,7 +252,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
250252
* XLogBeginRead() or XLogFindNextRecord() must be called before the first call
251253
* to XLogReadRecord().
252254
*
253-
* If theread_page callback fails to read the requested data, NULL is
255+
* If thepage_read callback fails to read the requested data, NULL is
254256
* returned. The callback is expected to have reported the error; errormsg
255257
* is set to NULL.
256258
*
@@ -559,10 +561,10 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
559561

560562
/*
561563
* Read a single xlog page including at least [pageptr, reqLen] of valid data
562-
* via theread_page() callback.
564+
* via thepage_read() callback.
563565
*
564566
* Returns -1 if the required page cannot be read for some reason; errormsg_buf
565-
* is set in that case (unless the error occurs in theread_page callback).
567+
* is set in that case (unless the error occurs in thepage_read callback).
566568
*
567569
* We fetch the page from a reader-local cache if we know we have the required
568570
* data and if there hasn't been any error since caching the data.
@@ -589,7 +591,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
589591
* Data is not in our buffer.
590592
*
591593
* Every time we actually read the segment, even if we looked at parts of
592-
* it before, we need to do verification as theread_page callback might
594+
* it before, we need to do verification as thepage_read callback might
593595
* now be rereading data from a different source.
594596
*
595597
* Whenever switching to a new WAL segment, we read the first page of the
@@ -601,9 +603,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
601603
{
602604
XLogRecPtrtargetSegmentPtr=pageptr-targetPageOff;
603605

604-
readLen=state->read_page(state,targetSegmentPtr,XLOG_BLCKSZ,
605-
state->currRecPtr,
606-
state->readBuf);
606+
readLen=state->routine.page_read(state,targetSegmentPtr,XLOG_BLCKSZ,
607+
state->currRecPtr,
608+
state->readBuf);
607609
if (readLen<0)
608610
gotoerr;
609611

@@ -619,9 +621,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
619621
* First, read the requested data length, but at least a short page header
620622
* so that we can validate it.
621623
*/
622-
readLen=state->read_page(state,pageptr,Max(reqLen,SizeOfXLogShortPHD),
623-
state->currRecPtr,
624-
state->readBuf);
624+
readLen=state->routine.page_read(state,pageptr,Max(reqLen,SizeOfXLogShortPHD),
625+
state->currRecPtr,
626+
state->readBuf);
625627
if (readLen<0)
626628
gotoerr;
627629

@@ -638,9 +640,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
638640
/* still not enough */
639641
if (readLen<XLogPageHeaderSize(hdr))
640642
{
641-
readLen=state->read_page(state,pageptr,XLogPageHeaderSize(hdr),
642-
state->currRecPtr,
643-
state->readBuf);
643+
readLen=state->routine.page_read(state,pageptr,XLogPageHeaderSize(hdr),
644+
state->currRecPtr,
645+
state->readBuf);
644646
if (readLen<0)
645647
gotoerr;
646648
}
@@ -1041,11 +1043,14 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
10411043
#endif/* FRONTEND */
10421044

10431045
/*
1046+
* Helper function to ease writing of XLogRoutine->page_read callbacks.
1047+
* If this function is used, caller must supply an open_segment callback in
1048+
* 'state', as that is used here.
1049+
*
10441050
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
10451051
* fetched from timeline 'tli'.
10461052
*
1047-
* 'seg/segcxt' identify the last segment used. 'openSegment' is a callback
1048-
* to open the next segment, if necessary.
1053+
* 'seg/segcxt' identify the last segment used.
10491054
*
10501055
* Returns true if succeeded, false if an error occurs, in which case
10511056
* 'errinfo' receives error details.
@@ -1054,9 +1059,10 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
10541059
* WAL buffers when possible.
10551060
*/
10561061
bool
1057-
WALRead(char*buf,XLogRecPtrstartptr,Sizecount,TimeLineIDtli,
1062+
WALRead(XLogReaderState*state,
1063+
char*buf,XLogRecPtrstartptr,Sizecount,TimeLineIDtli,
10581064
WALOpenSegment*seg,WALSegmentContext*segcxt,
1059-
WALSegmentOpenopenSegment,WALReadError*errinfo)
1065+
WALReadError*errinfo)
10601066
{
10611067
char*p;
10621068
XLogRecPtrrecptr;
@@ -1086,10 +1092,11 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
10861092
XLogSegNonextSegNo;
10871093

10881094
if (seg->ws_file >=0)
1089-
close(seg->ws_file);
1095+
state->routine.segment_close(state);
10901096

10911097
XLByteToSeg(recptr,nextSegNo,segcxt->ws_segsize);
1092-
seg->ws_file=openSegment(nextSegNo,segcxt,&tli);
1098+
seg->ws_file=state->routine.segment_open(state,nextSegNo,
1099+
segcxt,&tli);
10931100

10941101
/* Update the current segment info. */
10951102
seg->ws_tli=tli;

‎src/backend/access/transam/xlogutils.c

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -783,10 +783,10 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
783783
}
784784
}
785785

786-
/*openSegment callback forWALRead */
787-
staticint
788-
wal_segment_open(XLogSegNonextSegNo,WALSegmentContext*segcxt,
789-
TimeLineID*tli_p)
786+
/*XLogReaderRoutine->segment_open callback forlocal pg_wal files */
787+
int
788+
wal_segment_open(XLogReaderState*state,XLogSegNonextSegNo,
789+
WALSegmentContext*segcxt,TimeLineID*tli_p)
790790
{
791791
TimeLineIDtli=*tli_p;
792792
charpath[MAXPGPATH];
@@ -811,8 +811,17 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
811811
return-1;/* keep compiler quiet */
812812
}
813813

814+
/* stock XLogReaderRoutine->segment_close callback */
815+
void
816+
wal_segment_close(XLogReaderState*state)
817+
{
818+
close(state->seg.ws_file);
819+
/* need to check errno? */
820+
state->seg.ws_file=-1;
821+
}
822+
814823
/*
815-
*read_page callback for reading local xlog files
824+
*XLogReaderRoutine->page_read callback for reading local xlog files
816825
*
817826
* Public because it would likely be very helpful for someone writing another
818827
* output method outside walsender, e.g. in a bgworker.
@@ -937,8 +946,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
937946
* as 'count', read the whole page anyway. It's guaranteed to be
938947
* zero-padded up to the page boundary if it's incomplete.
939948
*/
940-
if (!WALRead(cur_page,targetPagePtr,XLOG_BLCKSZ,tli,&state->seg,
941-
&state->segcxt,wal_segment_open,&errinfo))
949+
if (!WALRead(state,cur_page,targetPagePtr,XLOG_BLCKSZ,tli,
950+
&state->seg,&state->segcxt,
951+
&errinfo))
942952
WALReadRaiseError(&errinfo);
943953

944954
/* number of valid bytes in the buffer */

‎src/backend/replication/logical/logical.c

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ StartupDecodingContext(List *output_plugin_options,
120120
TransactionIdxmin_horizon,
121121
boolneed_full_snapshot,
122122
boolfast_forward,
123-
XLogPageReadCBread_page,
123+
XLogReaderRoutine*xl_routine,
124124
LogicalOutputPluginWriterPrepareWriteprepare_write,
125125
LogicalOutputPluginWriterWritedo_write,
126126
LogicalOutputPluginWriterUpdateProgressupdate_progress)
@@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options,
169169

170170
ctx->slot=slot;
171171

172-
ctx->reader=XLogReaderAllocate(wal_segment_size,NULL,read_page,ctx);
172+
ctx->reader=XLogReaderAllocate(wal_segment_size,NULL,xl_routine,ctx);
173173
if (!ctx->reader)
174174
ereport(ERROR,
175175
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -215,7 +215,8 @@ StartupDecodingContext(List *output_plugin_options,
215215
*Otherwise, we set for decoding to start from the given LSN without
216216
*marking WAL reserved beforehand. In that scenario, it's up to the
217217
*caller to guarantee that WAL remains available.
218-
* read_page, prepare_write, do_write, update_progress --
218+
* xl_routine -- XLogReaderRoutine for underlying XLogReader
219+
* prepare_write, do_write, update_progress --
219220
*callbacks that perform the use-case dependent, actual, work.
220221
*
221222
* Needs to be called while in a memory context that's at least as long lived
@@ -230,7 +231,7 @@ CreateInitDecodingContext(char *plugin,
230231
List*output_plugin_options,
231232
boolneed_full_snapshot,
232233
XLogRecPtrrestart_lsn,
233-
XLogPageReadCBread_page,
234+
XLogReaderRoutine*xl_routine,
234235
LogicalOutputPluginWriterPrepareWriteprepare_write,
235236
LogicalOutputPluginWriterWritedo_write,
236237
LogicalOutputPluginWriterUpdateProgressupdate_progress)
@@ -327,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
327328

328329
ctx=StartupDecodingContext(NIL,restart_lsn,xmin_horizon,
329330
need_full_snapshot, false,
330-
read_page,prepare_write,do_write,
331+
xl_routine,prepare_write,do_write,
331332
update_progress);
332333

333334
/* call output plugin initialization callback */
@@ -357,7 +358,10 @@ CreateInitDecodingContext(char *plugin,
357358
* fast_forward
358359
*bypass the generation of logical changes.
359360
*
360-
* read_page, prepare_write, do_write, update_progress
361+
* xl_routine
362+
*XLogReaderRoutine used by underlying xlogreader
363+
*
364+
* prepare_write, do_write, update_progress
361365
*callbacks that have to be filled to perform the use-case dependent,
362366
*actual work.
363367
*
@@ -372,7 +376,7 @@ LogicalDecodingContext *
372376
CreateDecodingContext(XLogRecPtrstart_lsn,
373377
List*output_plugin_options,
374378
boolfast_forward,
375-
XLogPageReadCBread_page,
379+
XLogReaderRoutine*xl_routine,
376380
LogicalOutputPluginWriterPrepareWriteprepare_write,
377381
LogicalOutputPluginWriterWritedo_write,
378382
LogicalOutputPluginWriterUpdateProgressupdate_progress)
@@ -425,7 +429,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
425429

426430
ctx=StartupDecodingContext(output_plugin_options,
427431
start_lsn,InvalidTransactionId, false,
428-
fast_forward,read_page,prepare_write,
432+
fast_forward,xl_routine,prepare_write,
429433
do_write,update_progress);
430434

431435
/* call output plugin initialization callback */

‎src/backend/replication/logical/logicalfuncs.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
233233
ctx=CreateDecodingContext(InvalidXLogRecPtr,
234234
options,
235235
false,
236-
read_local_xlog_page,
236+
XL_ROUTINE(.page_read=read_local_xlog_page,
237+
.segment_open=wal_segment_open,
238+
.segment_close=wal_segment_close),
237239
LogicalOutputPrepareWrite,
238240
LogicalOutputWrite,NULL);
239241

‎src/backend/replication/slotfuncs.c

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,10 @@ create_logical_replication_slot(char *name, char *plugin,
152152
ctx=CreateInitDecodingContext(plugin,NIL,
153153
false,/* just catalogs is OK */
154154
restart_lsn,
155-
read_local_xlog_page,NULL,NULL,
156-
NULL);
155+
XL_ROUTINE(.page_read=read_local_xlog_page,
156+
.segment_open=wal_segment_open,
157+
.segment_close=wal_segment_close),
158+
NULL,NULL,NULL);
157159

158160
/*
159161
* If caller needs us to determine the decoding start point, do so now.
@@ -464,7 +466,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
464466
ctx=CreateDecodingContext(InvalidXLogRecPtr,
465467
NIL,
466468
true,/* fast_forward */
467-
read_local_xlog_page,
469+
XL_ROUTINE(.page_read=read_local_xlog_page,
470+
.segment_open=wal_segment_open,
471+
.segment_close=wal_segment_close),
468472
NULL,NULL,NULL);
469473

470474
/*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp