Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit24c5f1a

Browse files
committed
Enable logical slots to follow timeline switches
When decoding from a logical slot, it's necessary for xlog reading to beable to read xlog from historical (i.e. not current) timelines;otherwise, decoding fails after failover, because the archives are inthe historical timeline. This is required to make "failover logicalslots" possible; it currently has no other use, although theoreticallyit could be used by an extension that creates a slot on a standby andcontinues to replay from the slot when the standby is promoted.This commit includes a module in src/test/modules with functions tomanipulate the slots (which is not otherwise possible in SQL code) inorder to enable testing, and a new test in src/test/recovery to ensurethat the behavior is as expected.Author: Craig RingerReviewed-By: Oleksii Kliukin, Andres Freund, Petr Jelínek
1 parent3b02ea4 commit24c5f1a

File tree

16 files changed

+790
-31
lines changed

16 files changed

+790
-31
lines changed

‎src/backend/access/transam/xlogreader.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
118118
returnNULL;
119119
}
120120

121+
#ifndefFRONTEND
122+
/* Will be loaded on first read */
123+
state->timelineHistory=NIL;
124+
#endif
125+
121126
returnstate;
122127
}
123128

@@ -137,6 +142,10 @@ XLogReaderFree(XLogReaderState *state)
137142
pfree(state->errormsg_buf);
138143
if (state->readRecordBuf)
139144
pfree(state->readRecordBuf);
145+
#ifndefFRONTEND
146+
if (state->timelineHistory)
147+
list_free_deep(state->timelineHistory);
148+
#endif
140149
pfree(state->readBuf);
141150
pfree(state);
142151
}

‎src/backend/access/transam/xlogutils.c

Lines changed: 221 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include<unistd.h>
2121

22+
#include"access/timeline.h"
2223
#include"access/xlog.h"
2324
#include"access/xlog_internal.h"
2425
#include"access/xlogutils.h"
@@ -659,6 +660,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
659660
/* state maintained across calls */
660661
staticintsendFile=-1;
661662
staticXLogSegNosendSegNo=0;
663+
staticTimeLineIDsendTLI=0;
662664
staticuint32sendOff=0;
663665

664666
p=buf;
@@ -674,7 +676,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
674676
startoff=recptr %XLogSegSize;
675677

676678
/* Do we need to switch to a different xlog segment? */
677-
if (sendFile<0|| !XLByteInSeg(recptr,sendSegNo))
679+
if (sendFile<0|| !XLByteInSeg(recptr,sendSegNo)||
680+
sendTLI!=tli)
678681
{
679682
charpath[MAXPGPATH];
680683

@@ -701,6 +704,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
701704
path)));
702705
}
703706
sendOff=0;
707+
sendTLI=tli;
704708
}
705709

706710
/* Need to seek in the file? */
@@ -748,6 +752,147 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
748752
}
749753
}
750754

755+
/*
756+
* Determine XLogReaderState->currTLI and ->currTLIValidUntil;
757+
* XLogReaderState->EndRecPtr, ->currRecPtr and ThisTimeLineID affect the
758+
* decision. This may later be used to determine which xlog segment file to
759+
* open, etc.
760+
*
761+
* We switch to an xlog segment from the new timeline eagerly when on a
762+
* historical timeline, as soon as we reach the start of the xlog segment
763+
* containing the timeline switch. The server copied the segment to the new
764+
* timeline so all the data up to the switch point is the same, but there's no
765+
* guarantee the old segment will still exist. It may have been deleted or
766+
* renamed with a .partial suffix so we can't necessarily keep reading from
767+
* the old TLI even though tliSwitchPoint says it's OK.
768+
*
769+
* Because of this, callers MAY NOT assume that currTLI is the timeline that
770+
* will be in a page's xlp_tli; the page may begin on an older timeline or we
771+
* might be reading from historical timeline data on a segment that's been
772+
* copied to a new timeline.
773+
*/
774+
staticvoid
775+
XLogReadDetermineTimeline(XLogReaderState*state)
776+
{
777+
/* Read the history on first time through */
778+
if (state->timelineHistory==NIL)
779+
state->timelineHistory=readTimeLineHistory(ThisTimeLineID);
780+
781+
/*
782+
* Are we reading the record immediately following the one we read last
783+
* time? If not, then don't use the cached timeline info.
784+
*/
785+
if (state->currRecPtr!=state->EndRecPtr)
786+
{
787+
state->currTLI=0;
788+
state->currTLIValidUntil=InvalidXLogRecPtr;
789+
}
790+
791+
/*
792+
* Are we reading a timeline that used to be the latest one, but became
793+
* historical?This can happen in a replica that gets promoted, and in a
794+
* cascading replica whose upstream gets promoted. In either case,
795+
* re-read the timeline history data. We cannot read past the timeline
796+
* switch point, because either the records in the old timeline might be
797+
* invalid, or worse, they may valid but *different* from the ones we
798+
* should be reading.
799+
*/
800+
if (state->currTLIValidUntil==InvalidXLogRecPtr&&
801+
state->currTLI!=ThisTimeLineID&&
802+
state->currTLI!=0)
803+
{
804+
/* re-read timeline history */
805+
list_free_deep(state->timelineHistory);
806+
state->timelineHistory=readTimeLineHistory(ThisTimeLineID);
807+
808+
elog(DEBUG2,"timeline %u became historical during decoding",
809+
state->currTLI);
810+
811+
/* then invalidate the cached timeline info */
812+
state->currTLI=0;
813+
state->currTLIValidUntil=InvalidXLogRecPtr;
814+
}
815+
816+
/*
817+
* Are we reading a record immediately following a timeline switch? If
818+
* so, we must follow the switch too.
819+
*/
820+
if (state->currRecPtr==state->EndRecPtr&&
821+
state->currTLI!=0&&
822+
state->currTLIValidUntil!=InvalidXLogRecPtr&&
823+
state->currRecPtr >=state->currTLIValidUntil)
824+
{
825+
elog(DEBUG2,
826+
"requested record %X/%X is on segment containing end of timeline %u valid until %X/%X, switching to next timeline",
827+
(uint32) (state->currRecPtr >>32),
828+
(uint32)state->currRecPtr,
829+
state->currTLI,
830+
(uint32) (state->currTLIValidUntil >>32),
831+
(uint32) (state->currTLIValidUntil));
832+
833+
/* invalidate TLI info so we look up the next TLI */
834+
state->currTLI=0;
835+
state->currTLIValidUntil=InvalidXLogRecPtr;
836+
}
837+
838+
if (state->currTLI==0)
839+
{
840+
/*
841+
* Something changed; work out what timeline this record is on. We
842+
* might read it from the segment on this TLI or, if the segment is
843+
* also contained by newer timelines, the copy from a newer TLI.
844+
*/
845+
state->currTLI=tliOfPointInHistory(state->currRecPtr,
846+
state->timelineHistory);
847+
848+
/*
849+
* Look for the most recent timeline that's on the same xlog segment
850+
* as this record, since that's the only one we can assume is still
851+
* readable.
852+
*/
853+
while (state->currTLI!=ThisTimeLineID&&
854+
state->currTLIValidUntil==InvalidXLogRecPtr)
855+
{
856+
XLogRecPtrtliSwitch;
857+
TimeLineIDnextTLI;
858+
859+
CHECK_FOR_INTERRUPTS();
860+
861+
tliSwitch=tliSwitchPoint(state->currTLI,state->timelineHistory,
862+
&nextTLI);
863+
864+
/* round ValidUntil down to start of seg containing the switch */
865+
state->currTLIValidUntil=
866+
((tliSwitch /XLogSegSize)*XLogSegSize);
867+
868+
if (state->currRecPtr >=state->currTLIValidUntil)
869+
{
870+
/*
871+
* The new currTLI ends on this WAL segment so check the next
872+
* TLI to see if it's the last one on the segment.
873+
*
874+
* If that's the current TLI we'll stop searching.
875+
*/
876+
state->currTLI=nextTLI;
877+
state->currTLIValidUntil=InvalidXLogRecPtr;
878+
}
879+
}
880+
881+
/*
882+
* We're now either reading from the first xlog segment in the current
883+
* server's timeline or the most recent historical timeline that
884+
* exists on the target segment.
885+
*/
886+
elog(DEBUG2,"XLog read ptr %X/%X is on segment with TLI %u valid until %X/%X, server current TLI is %u",
887+
(uint32) (state->currRecPtr >>32),
888+
(uint32)state->currRecPtr,
889+
state->currTLI,
890+
(uint32) (state->currTLIValidUntil >>32),
891+
(uint32) (state->currTLIValidUntil),
892+
ThisTimeLineID);
893+
}
894+
}
895+
751896
/*
752897
* read_page callback for reading local xlog files
753898
*
@@ -761,48 +906,101 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
761906
*/
762907
int
763908
read_local_xlog_page(XLogReaderState*state,XLogRecPtrtargetPagePtr,
764-
intreqLen,XLogRecPtrtargetRecPtr,char*cur_page,TimeLineID*pageTLI)
909+
intreqLen,XLogRecPtrtargetRecPtr,char*cur_page,
910+
TimeLineID*pageTLI)
765911
{
766-
XLogRecPtrflushptr,
912+
XLogRecPtrread_upto,
767913
loc;
768914
intcount;
769915

770916
loc=targetPagePtr+reqLen;
917+
918+
/* Make sure enough xlog is available... */
771919
while (1)
772920
{
773921
/*
774-
*TODO: we're going tohave to do something more intelligent about
775-
* timelines on standbys. Use readTimeLineHistory() and
776-
*tliOfPointInHistory()toget the proper LSN? For nowwe'll catch
777-
*that case earlier, butthecode and TODO is left in here for when
778-
*that changes.
922+
*Check which timeline toget the record from.
923+
*
924+
*We havetodo it each time through the loop because ifwe're in
925+
*recovery as a cascading standby,thecurrent timeline might've
926+
*become historical.
779927
*/
780-
if (!RecoveryInProgress())
928+
XLogReadDetermineTimeline(state);
929+
930+
if (state->currTLI==ThisTimeLineID)
781931
{
782-
*pageTLI=ThisTimeLineID;
783-
flushptr=GetFlushRecPtr();
932+
/*
933+
* We're reading from the current timeline so we might have to
934+
* wait for the desired record to be generated (or, for a standby,
935+
* received & replayed)
936+
*/
937+
if (!RecoveryInProgress())
938+
{
939+
*pageTLI=ThisTimeLineID;
940+
read_upto=GetFlushRecPtr();
941+
}
942+
else
943+
read_upto=GetXLogReplayRecPtr(pageTLI);
944+
945+
if (loc <=read_upto)
946+
break;
947+
948+
CHECK_FOR_INTERRUPTS();
949+
pg_usleep(1000L);
784950
}
785951
else
786-
flushptr=GetXLogReplayRecPtr(pageTLI);
787-
788-
if (loc <=flushptr)
952+
{
953+
/*
954+
* We're on a historical timeline, so limit reading to the switch
955+
* point where we moved to the next timeline.
956+
*
957+
* We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
958+
* about the new timeline, so we must've received past the end of
959+
* it.
960+
*/
961+
read_upto=state->currTLIValidUntil;
962+
963+
/*
964+
* Setting pageTLI to our wanted record's TLI is slightly wrong;
965+
* the page might begin on an older timeline if it contains a
966+
* timeline switch, since its xlog segment will have been copied
967+
* from the prior timeline. This is pretty harmless though, as
968+
* nothing cares so long as the timeline doesn't go backwards. We
969+
* should read the page header instead; FIXME someday.
970+
*/
971+
*pageTLI=state->currTLI;
972+
973+
/* No need to wait on a historical timeline */
789974
break;
790-
791-
CHECK_FOR_INTERRUPTS();
792-
pg_usleep(1000L);
975+
}
793976
}
794977

795-
/* more than one block available */
796-
if (targetPagePtr+XLOG_BLCKSZ <=flushptr)
978+
if (targetPagePtr+XLOG_BLCKSZ <=read_upto)
979+
{
980+
/*
981+
* more than one block available; read only that block, have caller
982+
* come back if they need more.
983+
*/
797984
count=XLOG_BLCKSZ;
798-
/* not enough data there */
799-
elseif (targetPagePtr+reqLen>flushptr)
985+
}
986+
elseif (targetPagePtr+reqLen>read_upto)
987+
{
988+
/* not enough data there */
800989
return-1;
801-
/* part of the page available */
990+
}
802991
else
803-
count=flushptr-targetPagePtr;
992+
{
993+
/* enough bytes available to satisfy the request */
994+
count=read_upto-targetPagePtr;
995+
}
804996

997+
/*
998+
* Even though we just determined how much of the page can be validly read
999+
* as 'count', read the whole page anyway. It's guaranteed to be
1000+
* zero-padded up to the page boundary if it's incomplete.
1001+
*/
8051002
XLogRead(cur_page,*pageTLI,targetPagePtr,XLOG_BLCKSZ);
8061003

1004+
/* number of valid bytes in the buffer */
8071005
returncount;
8081006
}

‎src/backend/replication/logical/logicalfuncs.c

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -231,12 +231,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
231231
rsinfo->setResult=p->tupstore;
232232
rsinfo->setDesc=p->tupdesc;
233233

234-
/* compute the current end-of-wal */
235-
if (!RecoveryInProgress())
236-
end_of_wal=GetFlushRecPtr();
237-
else
238-
end_of_wal=GetXLogReplayRecPtr(NULL);
239-
240234
ReplicationSlotAcquire(NameStr(*name));
241235

242236
PG_TRY();
@@ -273,7 +267,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
273267
* slot's confirmed_flush. This means we might read xlog we don't
274268
* actually decode rows from, but the snapshot builder might need it
275269
* to get to a consistent point. The point we start returning data to
276-
* *users* at is the candidate restart lsn from the decoding context.
270+
* *users* at is the confirmed_flush lsn set up in the decoding
271+
* context.
277272
*/
278273
startptr=MyReplicationSlot->data.restart_lsn;
279274

@@ -282,8 +277,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
282277
/* invalidate non-timetravel entries */
283278
InvalidateSystemCaches();
284279

280+
if (!RecoveryInProgress())
281+
end_of_wal=GetFlushRecPtr();
282+
else
283+
end_of_wal=GetXLogReplayRecPtr(NULL);
284+
285+
/* Decode until we run out of records */
285286
while ((startptr!=InvalidXLogRecPtr&&startptr<end_of_wal)||
286-
(ctx->reader->EndRecPtr&&ctx->reader->EndRecPtr<end_of_wal))
287+
(ctx->reader->EndRecPtr!=InvalidXLogRecPtr&&ctx->reader->EndRecPtr<end_of_wal))
287288
{
288289
XLogRecord*record;
289290
char*errm=NULL;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp