Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit422a55a

Browse files
Refactor to create generic WAL page read callback
Previously we didn’t have a generic WAL page read callback function,surprisingly. Logical decoding has logical_read_local_xlog_page(), which wasactually generic, so move that to xlogfunc.c and rename toread_local_xlog_page().Maintain logical_read_local_xlog_page() so existing callers still work.As requested by Michael Paquier, Alvaro Herrera and Andres Freund
1 parent45be99f commit422a55a

File tree

3 files changed

+172
-155
lines changed

3 files changed

+172
-155
lines changed

‎src/backend/access/transam/xlogutils.c

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@
1717
*/
1818
#include"postgres.h"
1919

20+
#include<unistd.h>
21+
22+
#include"miscadmin.h"
23+
2024
#include"access/xlog.h"
25+
#include"access/xlog_internal.h"
2126
#include"access/xlogutils.h"
2227
#include"catalog/catalog.h"
2328
#include"storage/smgr.h"
@@ -631,3 +636,164 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
631636
{
632637
forget_invalid_pages(rnode,forkNum,nblocks);
633638
}
639+
640+
/*
641+
* TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
642+
* we currently don't have the infrastructure (elog!) to share it.
643+
*/
644+
staticvoid
645+
XLogRead(char*buf,TimeLineIDtli,XLogRecPtrstartptr,Sizecount)
646+
{
647+
char*p;
648+
XLogRecPtrrecptr;
649+
Sizenbytes;
650+
651+
staticintsendFile=-1;
652+
staticXLogSegNosendSegNo=0;
653+
staticuint32sendOff=0;
654+
655+
p=buf;
656+
recptr=startptr;
657+
nbytes=count;
658+
659+
while (nbytes>0)
660+
{
661+
uint32startoff;
662+
intsegbytes;
663+
intreadbytes;
664+
665+
startoff=recptr %XLogSegSize;
666+
667+
if (sendFile<0|| !XLByteInSeg(recptr,sendSegNo))
668+
{
669+
charpath[MAXPGPATH];
670+
671+
/* Switch to another logfile segment */
672+
if (sendFile >=0)
673+
close(sendFile);
674+
675+
XLByteToSeg(recptr,sendSegNo);
676+
677+
XLogFilePath(path,tli,sendSegNo);
678+
679+
sendFile=BasicOpenFile(path,O_RDONLY |PG_BINARY,0);
680+
681+
if (sendFile<0)
682+
{
683+
if (errno==ENOENT)
684+
ereport(ERROR,
685+
(errcode_for_file_access(),
686+
errmsg("requested WAL segment %s has already been removed",
687+
path)));
688+
else
689+
ereport(ERROR,
690+
(errcode_for_file_access(),
691+
errmsg("could not open file \"%s\": %m",
692+
path)));
693+
}
694+
sendOff=0;
695+
}
696+
697+
/* Need to seek in the file? */
698+
if (sendOff!=startoff)
699+
{
700+
if (lseek(sendFile, (off_t)startoff,SEEK_SET)<0)
701+
{
702+
charpath[MAXPGPATH];
703+
704+
XLogFilePath(path,tli,sendSegNo);
705+
706+
ereport(ERROR,
707+
(errcode_for_file_access(),
708+
errmsg("could not seek in log segment %s to offset %u: %m",
709+
path,startoff)));
710+
}
711+
sendOff=startoff;
712+
}
713+
714+
/* How many bytes are within this segment? */
715+
if (nbytes> (XLogSegSize-startoff))
716+
segbytes=XLogSegSize-startoff;
717+
else
718+
segbytes=nbytes;
719+
720+
readbytes=read(sendFile,p,segbytes);
721+
if (readbytes <=0)
722+
{
723+
charpath[MAXPGPATH];
724+
725+
XLogFilePath(path,tli,sendSegNo);
726+
727+
ereport(ERROR,
728+
(errcode_for_file_access(),
729+
errmsg("could not read from log segment %s, offset %u, length %lu: %m",
730+
path,sendOff, (unsigned long)segbytes)));
731+
}
732+
733+
/* Update state for read */
734+
recptr+=readbytes;
735+
736+
sendOff+=readbytes;
737+
nbytes-=readbytes;
738+
p+=readbytes;
739+
}
740+
}
741+
742+
/*
743+
* read_page callback for reading local xlog files
744+
*
745+
* Public because it would likely be very helpful for someone writing another
746+
* output method outside walsender, e.g. in a bgworker.
747+
*
748+
* TODO: The walsender has it's own version of this, but it relies on the
749+
* walsender's latch being set whenever WAL is flushed. No such infrastructure
750+
* exists for normal backends, so we have to do a check/sleep/repeat style of
751+
* loop for now.
752+
*/
753+
int
754+
read_local_xlog_page(XLogReaderState*state,XLogRecPtrtargetPagePtr,
755+
intreqLen,XLogRecPtrtargetRecPtr,char*cur_page,TimeLineID*pageTLI)
756+
{
757+
XLogRecPtrflushptr,
758+
loc;
759+
intcount;
760+
761+
loc=targetPagePtr+reqLen;
762+
while (1)
763+
{
764+
/*
765+
* TODO: we're going to have to do something more intelligent about
766+
* timelines on standbys. Use readTimeLineHistory() and
767+
* tliOfPointInHistory() to get the proper LSN? For now we'll catch
768+
* that case earlier, but the code and TODO is left in here for when
769+
* that changes.
770+
*/
771+
if (!RecoveryInProgress())
772+
{
773+
*pageTLI=ThisTimeLineID;
774+
flushptr=GetFlushRecPtr();
775+
}
776+
else
777+
flushptr=GetXLogReplayRecPtr(pageTLI);
778+
779+
if (loc <=flushptr)
780+
break;
781+
782+
CHECK_FOR_INTERRUPTS();
783+
pg_usleep(1000L);
784+
}
785+
786+
/* more than one block available */
787+
if (targetPagePtr+XLOG_BLCKSZ <=flushptr)
788+
count=XLOG_BLCKSZ;
789+
/* not enough data there */
790+
elseif (targetPagePtr+reqLen>flushptr)
791+
return-1;
792+
/* part of the page available */
793+
else
794+
count=flushptr-targetPagePtr;
795+
796+
XLogRead(cur_page,*pageTLI,targetPagePtr,XLOG_BLCKSZ);
797+
798+
returncount;
799+
}

‎src/backend/replication/logical/logicalfuncs.c

Lines changed: 3 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include"miscadmin.h"
2323

2424
#include"access/xlog_internal.h"
25+
#include"access/xlogutils.h"
2526

2627
#include"catalog/pg_type.h"
2728

@@ -100,108 +101,6 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
100101
p->returned_rows++;
101102
}
102103

103-
/*
104-
* TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
105-
* we currently don't have the infrastructure (elog!) to share it.
106-
*/
107-
staticvoid
108-
XLogRead(char*buf,TimeLineIDtli,XLogRecPtrstartptr,Sizecount)
109-
{
110-
char*p;
111-
XLogRecPtrrecptr;
112-
Sizenbytes;
113-
114-
staticintsendFile=-1;
115-
staticXLogSegNosendSegNo=0;
116-
staticuint32sendOff=0;
117-
118-
p=buf;
119-
recptr=startptr;
120-
nbytes=count;
121-
122-
while (nbytes>0)
123-
{
124-
uint32startoff;
125-
intsegbytes;
126-
intreadbytes;
127-
128-
startoff=recptr %XLogSegSize;
129-
130-
if (sendFile<0|| !XLByteInSeg(recptr,sendSegNo))
131-
{
132-
charpath[MAXPGPATH];
133-
134-
/* Switch to another logfile segment */
135-
if (sendFile >=0)
136-
close(sendFile);
137-
138-
XLByteToSeg(recptr,sendSegNo);
139-
140-
XLogFilePath(path,tli,sendSegNo);
141-
142-
sendFile=BasicOpenFile(path,O_RDONLY |PG_BINARY,0);
143-
144-
if (sendFile<0)
145-
{
146-
if (errno==ENOENT)
147-
ereport(ERROR,
148-
(errcode_for_file_access(),
149-
errmsg("requested WAL segment %s has already been removed",
150-
path)));
151-
else
152-
ereport(ERROR,
153-
(errcode_for_file_access(),
154-
errmsg("could not open file \"%s\": %m",
155-
path)));
156-
}
157-
sendOff=0;
158-
}
159-
160-
/* Need to seek in the file? */
161-
if (sendOff!=startoff)
162-
{
163-
if (lseek(sendFile, (off_t)startoff,SEEK_SET)<0)
164-
{
165-
charpath[MAXPGPATH];
166-
167-
XLogFilePath(path,tli,sendSegNo);
168-
169-
ereport(ERROR,
170-
(errcode_for_file_access(),
171-
errmsg("could not seek in log segment %s to offset %u: %m",
172-
path,startoff)));
173-
}
174-
sendOff=startoff;
175-
}
176-
177-
/* How many bytes are within this segment? */
178-
if (nbytes> (XLogSegSize-startoff))
179-
segbytes=XLogSegSize-startoff;
180-
else
181-
segbytes=nbytes;
182-
183-
readbytes=read(sendFile,p,segbytes);
184-
if (readbytes <=0)
185-
{
186-
charpath[MAXPGPATH];
187-
188-
XLogFilePath(path,tli,sendSegNo);
189-
190-
ereport(ERROR,
191-
(errcode_for_file_access(),
192-
errmsg("could not read from log segment %s, offset %u, length %lu: %m",
193-
path,sendOff, (unsigned long)segbytes)));
194-
}
195-
196-
/* Update state for read */
197-
recptr+=readbytes;
198-
199-
sendOff+=readbytes;
200-
nbytes-=readbytes;
201-
p+=readbytes;
202-
}
203-
}
204-
205104
staticvoid
206105
check_permissions(void)
207106
{
@@ -211,63 +110,12 @@ check_permissions(void)
211110
(errmsg("must be superuser or replication role to use replication slots"))));
212111
}
213112

214-
/*
215-
* read_page callback for logical decoding contexts.
216-
*
217-
* Public because it would likely be very helpful for someone writing another
218-
* output method outside walsender, e.g. in a bgworker.
219-
*
220-
* TODO: The walsender has it's own version of this, but it relies on the
221-
* walsender's latch being set whenever WAL is flushed. No such infrastructure
222-
* exists for normal backends, so we have to do a check/sleep/repeat style of
223-
* loop for now.
224-
*/
225113
int
226114
logical_read_local_xlog_page(XLogReaderState*state,XLogRecPtrtargetPagePtr,
227115
intreqLen,XLogRecPtrtargetRecPtr,char*cur_page,TimeLineID*pageTLI)
228116
{
229-
XLogRecPtrflushptr,
230-
loc;
231-
intcount;
232-
233-
loc=targetPagePtr+reqLen;
234-
while (1)
235-
{
236-
/*
237-
* TODO: we're going to have to do something more intelligent about
238-
* timelines on standbys. Use readTimeLineHistory() and
239-
* tliOfPointInHistory() to get the proper LSN? For now we'll catch
240-
* that case earlier, but the code and TODO is left in here for when
241-
* that changes.
242-
*/
243-
if (!RecoveryInProgress())
244-
{
245-
*pageTLI=ThisTimeLineID;
246-
flushptr=GetFlushRecPtr();
247-
}
248-
else
249-
flushptr=GetXLogReplayRecPtr(pageTLI);
250-
251-
if (loc <=flushptr)
252-
break;
253-
254-
CHECK_FOR_INTERRUPTS();
255-
pg_usleep(1000L);
256-
}
257-
258-
/* more than one block available */
259-
if (targetPagePtr+XLOG_BLCKSZ <=flushptr)
260-
count=XLOG_BLCKSZ;
261-
/* not enough data there */
262-
elseif (targetPagePtr+reqLen>flushptr)
263-
return-1;
264-
/* part of the page available */
265-
else
266-
count=flushptr-targetPagePtr;
267-
268-
XLogRead(cur_page,*pageTLI,targetPagePtr,XLOG_BLCKSZ);
269-
270-
returncount;
117+
returnread_local_xlog_page(state,targetPagePtr,reqLen,
118+
targetRecPtr,cur_page,pageTLI);
271119
}
272120

273121
/*

‎src/include/access/xlogutils.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
4747
externRelationCreateFakeRelcacheEntry(RelFileNodernode);
4848
externvoidFreeFakeRelcacheEntry(Relationfakerel);
4949

50+
externintread_local_xlog_page(XLogReaderState*state,XLogRecPtrtargetPagePtr,
51+
intreqLen,XLogRecPtrtargetRecPtr,char*cur_page,TimeLineID*pageTLI);
52+
5053
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp