Commit 1148e22

Teach xlogreader to follow timeline switches
Uses page-based mechanism to ensure we're using the correct timeline.

Tests are included to exercise the functionality using a cold disk-level copy
of the master that's started up as a replica with slots intact, but the
intended use of the functionality is with later features.

Craig Ringer, reviewed by Simon Riggs and Andres Freund

1 parent 9ca2dd5, commit 1148e22

7 files changed

+365
-20
lines changed


‎src/backend/access/transam/xlogutils.c

Lines changed: 201 additions & 14 deletions
@@ -19,6 +19,7 @@
 
 #include <unistd.h>
 
+#include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
@@ -662,6 +663,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 	/* state maintained across calls */
 	static int	sendFile = -1;
 	static XLogSegNo sendSegNo = 0;
+	static TimeLineID sendTLI = 0;
 	static uint32 sendOff = 0;
 
 	p = buf;
@@ -677,7 +679,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 		startoff = recptr % XLogSegSize;
 
 		/* Do we need to switch to a different xlog segment? */
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
+			sendTLI != tli)
 		{
 			char		path[MAXPGPATH];
 
@@ -704,6 +707,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 									path)));
 			}
 			sendOff = 0;
+			sendTLI = tli;
 		}
 
 		/* Need to seek in the file? */
@@ -753,6 +757,133 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 	}
 }
 
+/*
+ * Determine which timeline to read an xlog page from and set the
+ * XLogReaderState's currTLI to that timeline ID.
+ *
+ * We care about timelines in xlogreader when we might be reading xlog
+ * generated prior to a promotion, either if we're currently a standby in
+ * recovery or if we're a promoted master reading xlogs generated by the old
+ * master before our promotion.
+ *
+ * wantPage must be set to the start address of the page to read and
+ * wantLength to the amount of the page that will be read, up to
+ * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
+ *
+ * We switch to an xlog segment from the new timeline eagerly when on a
+ * historical timeline, as soon as we reach the start of the xlog segment
+ * containing the timeline switch.  The server copied the segment to the new
+ * timeline so all the data up to the switch point is the same, but there's no
+ * guarantee the old segment will still exist. It may have been deleted or
+ * renamed with a .partial suffix so we can't necessarily keep reading from
+ * the old TLI even though tliSwitchPoint says it's OK.
+ *
+ * We can't just check the timeline when we read a page on a different segment
+ * to the last page. We could've received a timeline switch from a cascading
+ * upstream, so the current segment ends abruptly (possibly getting renamed to
+ * .partial) and we have to switch to a new one.  Even in the middle of reading
+ * a page we could have to dump the cached page and switch to a new TLI.
+ *
+ * Because of this, callers MAY NOT assume that currTLI is the timeline that
+ * will be in a page's xlp_tli; the page may begin on an older timeline or we
+ * might be reading from historical timeline data on a segment that's been
+ * copied to a new timeline.
+ *
+ * The caller must also make sure it doesn't read past the current replay
+ * position (using GetWalRcvWriteRecPtr) if executing in recovery, so it
+ * doesn't fail to notice that the current timeline became historical. The
+ * caller must also update ThisTimeLineID with the result of
+ * GetWalRcvWriteRecPtr and must check RecoveryInProgress().
+ */
+void
+XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
+{
+	const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff;
+
+	Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
+	Assert(wantLength <= XLOG_BLCKSZ);
+	Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
+
+	/*
+	 * If the desired page is currently read in and valid, we have nothing to do.
+	 *
+	 * The caller should've ensured that it didn't previously advance readOff
+	 * past the valid limit of this timeline, so it doesn't matter if the current
+	 * TLI has since become historical.
+	 */
+	if (lastReadPage == wantPage &&
+		state->readLen != 0 &&
+		lastReadPage + state->readLen >= wantPage + Min(wantLength, XLOG_BLCKSZ - 1))
+		return;
+
+	/*
+	 * If we're reading from the current timeline, it hasn't become historical
+	 * and the page we're reading is after the last page read, we can again
+	 * just carry on. (Seeking backwards requires a check to make sure the older
+	 * page isn't on a prior timeline).
+	 *
+	 * ThisTimeLineID might've become historical since we last looked, but the
+	 * caller is required not to read past the flush limit it saw at the time
+	 * it looked up the timeline. There's nothing we can do about it if
+	 * StartupXLOG() renames it to .partial concurrently.
+	 */
+	if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
+	{
+		Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
+		return;
+	}
+
+	/*
+	 * If we're just reading pages from a previously validated historical
+	 * timeline and the timeline we're reading from is valid until the
+	 * end of the current segment we can just keep reading.
+	 */
+	if (state->currTLIValidUntil != InvalidXLogRecPtr &&
+		state->currTLI != ThisTimeLineID &&
+		state->currTLI != 0 &&
+		(wantPage + wantLength) / XLogSegSize < state->currTLIValidUntil / XLogSegSize)
+		return;
+
+	/*
+	 * If we reach this point we're either looking up a page for random access,
+	 * the current timeline just became historical, or we're reading from a new
+	 * segment containing a timeline switch. In all cases we need to determine
+	 * the newest timeline on the segment.
+	 *
+	 * If it's the current timeline we can just keep reading from here unless
+	 * we detect a timeline switch that makes the current timeline historical.
+	 * If it's a historical timeline we can read all the segment on the newest
+	 * timeline because it contains all the old timelines' data too. So only
+	 * one switch check is required.
+	 */
+	{
+		/*
+		 * We need to re-read the timeline history in case it's been changed
+		 * by a promotion or replay from a cascaded replica.
+		 */
+		List	   *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+		XLogRecPtr	endOfSegment = (((wantPage / XLogSegSize) + 1) * XLogSegSize) - 1;
+
+		Assert(wantPage / XLogSegSize == endOfSegment / XLogSegSize);
+
+		/* Find the timeline of the last LSN on the segment containing wantPage. */
+		state->currTLI = tliOfPointInHistory(endOfSegment, timelineHistory);
+		state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory,
+												  &state->nextTLI);
+
+		Assert(state->currTLIValidUntil == InvalidXLogRecPtr ||
+			   wantPage + wantLength < state->currTLIValidUntil);
+
+		list_free_deep(timelineHistory);
+
+		elog(DEBUG3, "switched to timeline %u valid until %X/%X",
+			 state->currTLI,
+			 (uint32) (state->currTLIValidUntil >> 32),
+			 (uint32) (state->currTLIValidUntil));
+	}
+}
+
 /*
  * read_page callback for reading local xlog files
  *
@@ -774,28 +905,84 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	int			count;
 
 	loc = targetPagePtr + reqLen;
+
+	/* Loop waiting for xlog to be available if necessary */
 	while (1)
 	{
 		/*
-		 * TODO: we're going to have to do something more intelligent about
-		 * timelines on standbys. Use readTimeLineHistory() and
-		 * tliOfPointInHistory() to get the proper LSN? For now we'll catch
-		 * that case earlier, but the code and TODO is left in here for when
-		 * that changes.
+		 * Determine the limit of xlog we can currently read to, and what the
+		 * most recent timeline is.
+		 *
+		 * RecoveryInProgress() will update ThisTimeLineID when it first
+		 * notices recovery finishes, so we only have to maintain it for the
+		 * local process until recovery ends.
 		 */
 		if (!RecoveryInProgress())
-		{
-			*pageTLI = ThisTimeLineID;
 			read_upto = GetFlushRecPtr();
-		}
 		else
-			read_upto = GetXLogReplayRecPtr(pageTLI);
+			read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-		if (loc <= read_upto)
-			break;
+		*pageTLI = ThisTimeLineID;
+
+		/*
+		 * Check which timeline to get the record from.
+		 *
+		 * We have to do it each time through the loop because if we're in
+		 * recovery as a cascading standby, the current timeline might've
+		 * become historical. We can't rely on RecoveryInProgress() because
+		 * in a standby configuration like
+		 *
+		 *    A => B => C
+		 *
+		 * if we're a logical decoding session on C, and B gets promoted, our
+		 * timeline will change while we remain in recovery.
+		 *
+		 * We can't just keep reading from the old timeline as the last WAL
+		 * archive in the timeline will get renamed to .partial by StartupXLOG().
+		 *
+		 * If that happens after our caller updated ThisTimeLineID but before
+		 * we actually read the xlog page, we might still try to read from the
+		 * old (now renamed) segment and fail. There's not much we can do about
+		 * this, but it can only happen when we're a leaf of a cascading
+		 * standby whose master gets promoted while we're decoding, so a
+		 * one-off ERROR isn't too bad.
+		 */
+		XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+
+		if (state->currTLI == ThisTimeLineID)
+		{
 
-		CHECK_FOR_INTERRUPTS();
-		pg_usleep(1000L);
+			if (loc <= read_upto)
+				break;
+
+			CHECK_FOR_INTERRUPTS();
+			pg_usleep(1000L);
+		}
+		else
+		{
+			/*
+			 * We're on a historical timeline, so limit reading to the switch
+			 * point where we moved to the next timeline.
+			 *
+			 * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
+			 * about the new timeline, so we must've received past the end of
+			 * it.
+			 */
+			read_upto = state->currTLIValidUntil;
+
+			/*
+			 * Setting pageTLI to our wanted record's TLI is slightly wrong;
+			 * the page might begin on an older timeline if it contains a
+			 * timeline switch, since its xlog segment will have been copied
+			 * from the prior timeline. This is pretty harmless though, as
+			 * nothing cares so long as the timeline doesn't go backwards.  We
+			 * should read the page header instead; FIXME someday.
+			 */
+			*pageTLI = state->currTLI;
+
+			/* No need to wait on a historical timeline */
+			break;
+		}
 	}
 
 	if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
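
Reviewer note: the final block of XLogReadDetermineTimeline picks the timeline that owns the last byte of the segment containing wantPage, which is what makes the switch "eager". The sketch below models that lookup outside PostgreSQL. Everything in it is an assumption made for illustration: the LSN and TLI typedefs, the HistoryEntry array standing in for the List returned by readTimeLineHistory(), the 16 MB segment size, the 8 kB page size, and the helper names. It is not the server's implementation.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t LSN;	/* stand-in for XLogRecPtr */
typedef uint32_t TLI;	/* stand-in for TimeLineID */

#define INVALID_LSN ((LSN) 0)
#define SEG_SIZE	((LSN) 16 * 1024 * 1024)	/* assumed 16 MB segments */
#define PAGE_SIZE	((LSN) 8192)				/* assumed 8 kB xlog pages */

/* Hypothetical history entry: timeline tli covers [begin, end). */
typedef struct
{
	TLI			tli;
	LSN			begin;	/* INVALID_LSN: valid from the start */
	LSN			end;	/* INVALID_LSN: still the current timeline */
} HistoryEntry;

/* Last byte of the segment that contains page. */
static LSN
end_of_segment(LSN page)
{
	return ((page / SEG_SIZE) + 1) * SEG_SIZE - 1;
}

/* Timeline that owns ptr, or 0 if the history does not cover it. */
static TLI
tli_of_point(const HistoryEntry *hist, size_t n, LSN ptr)
{
	for (size_t i = 0; i < n; i++)
	{
		int			after_begin = hist[i].begin == INVALID_LSN || hist[i].begin <= ptr;
		int			before_end = hist[i].end == INVALID_LSN || ptr < hist[i].end;

		if (after_begin && before_end)
			return hist[i].tli;
	}
	return 0;
}

/* LSN at which tli stops being valid, or INVALID_LSN if it is current. */
static LSN
switch_point(const HistoryEntry *hist, size_t n, TLI tli)
{
	for (size_t i = 0; i < n; i++)
		if (hist[i].tli == tli)
			return hist[i].end;
	return INVALID_LSN;
}

/* Newest timeline present on the segment containing want_page. */
static TLI
timeline_for_page(const HistoryEntry *hist, size_t n, LSN want_page,
				  LSN *valid_until)
{
	TLI			tli;

	assert(want_page % PAGE_SIZE == 0);
	tli = tli_of_point(hist, n, end_of_segment(want_page));
	*valid_until = switch_point(hist, n, tli);
	return tli;
}
```

With a hypothetical switch from timeline 1 to timeline 2 at 0x1800000 (the middle of the second segment), a request for page 0x1000000 resolves to timeline 2 even though that page precedes the switch point, because the whole segment is read from the newest timeline's copy.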

‎src/backend/replication/logical/logicalfuncs.c

Lines changed: 6 additions & 2 deletions
@@ -235,11 +235,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	rsinfo->setResult = p->tupstore;
 	rsinfo->setDesc = p->tupdesc;
 
-	/* compute the current end-of-wal */
+	/*
+	 * Compute the current end-of-wal and maintain ThisTimeLineID.
+	 * RecoveryInProgress() will update ThisTimeLineID on promotion.
+	 */
 	if (!RecoveryInProgress())
 		end_of_wal = GetFlushRecPtr();
 	else
-		end_of_wal = GetXLogReplayRecPtr(NULL);
+		end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
 	ReplicationSlotAcquire(NameStr(*name));
 
@@ -280,6 +283,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		/* invalidate non-timetravel entries */
 		InvalidateSystemCaches();
 
+		/* Decode until we run out of records */
 		while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
 			   (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
 		{

‎src/backend/replication/walsender.c

Lines changed: 7 additions & 4 deletions
@@ -48,6 +48,7 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogutils.h"
 
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
@@ -721,6 +722,12 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	XLogRecPtr	flushptr;
 	int			count;
 
+	XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+	sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
+	sendTimeLine = state->currTLI;
+	sendTimeLineValidUpto = state->currTLIValidUntil;
+	sendTimeLineNextTLI = state->nextTLI;
+
 	/* make sure we have enough WAL available */
 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
@@ -974,10 +981,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	pq_endmessage(&buf);
 	pq_flush();
 
-	/* setup state for XLogReadPage */
-	sendTimeLineIsHistoric = false;
-	sendTimeLine = ThisTimeLineID;
-
 	/*
 	 * Initialize position to the last ack'ed one, then the xlog records begin
 	 * to be shipped from that position.
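
Reviewer note: walsender now derives its send state from the reader on every page read instead of once at startup. The sketch below pairs those assignments with the wait-or-read decision that read_local_xlog_page makes in the same commit, as one way to see how the two fit together. The SendState struct, both function names, and the LSN/TLI typedefs are hypothetical; only the field meanings are taken from the diff.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t LSN;	/* stand-in for XLogRecPtr */
typedef uint32_t TLI;	/* stand-in for TimeLineID */

/* Hypothetical bundle of the walsender globals set in the hunk above. */
typedef struct
{
	bool		historic;	/* cf. sendTimeLineIsHistoric */
	TLI			tli;		/* cf. sendTimeLine */
	LSN			valid_upto;	/* cf. sendTimeLineValidUpto */
	TLI			next_tli;	/* cf. sendTimeLineNextTLI */
} SendState;

/* Mirror of the four assignments made after XLogReadDetermineTimeline(). */
static SendState
derive_send_state(TLI curr_tli, LSN curr_valid_until, TLI next_tli,
				  TLI this_tli)
{
	SendState	s;

	s.historic = (curr_tli != this_tli);
	s.tli = curr_tli;
	s.valid_upto = curr_valid_until;
	s.next_tli = next_tli;
	return s;
}

/*
 * Decide whether a read ending at loc can proceed now.
 *
 * On a historical timeline the limit is the switch point and there is
 * nothing to wait for.  On the current timeline the limit is the flushed
 * (or replayed) position, and the caller has to wait if loc is beyond it.
 */
static bool
can_read_now(const SendState *s, LSN loc, LSN flushed, LSN *read_upto)
{
	if (s->historic)
	{
		*read_upto = s->valid_upto;
		return true;
	}
	*read_upto = flushed;
	return loc <= flushed;
}
```

A caller would loop on can_read_now(), sleeping between attempts, which corresponds to the CHECK_FOR_INTERRUPTS()/pg_usleep() branch in read_local_xlog_page.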

‎src/include/access/xlogreader.h

Lines changed: 16 additions & 0 deletions
@@ -161,6 +161,22 @@ struct XLogReaderState
 
 	/* beginning of the WAL record being read. */
 	XLogRecPtr	currRecPtr;
+	/* timeline to read it from, 0 if a lookup is required */
+	TimeLineID	currTLI;
+	/*
+	 * Safe point to read to in currTLI if current TLI is historical
+	 * (tliSwitchPoint) or InvalidXLogRecPtr if on current timeline.
+	 *
+	 * Actually set to the start of the segment containing the timeline
+	 * switch that ends currTLI's validity, not the LSN of the switch
+	 * itself, since we can't assume the old segment will be present.
+	 */
+	XLogRecPtr	currTLIValidUntil;
+	/*
+	 * If currTLI is not the most recent known timeline, the next timeline to
+	 * read from when currTLIValidUntil is reached.
+	 */
+	TimeLineID	nextTLI;
 
 	/* Buffer for current ReadRecord result (expandable) */
 	char	   *readRecordBuf;
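
Reviewer note: the three new fields exist so that most page reads can skip re-reading the timeline history. The sketch below isolates the historical-timeline fast path from XLogReadDetermineTimeline as a standalone predicate. The ReaderTimelineState struct, the function name, the typedefs, and the 16 MB segment size are assumptions for illustration; the condition itself is copied from the diff.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t LSN;	/* stand-in for XLogRecPtr */
typedef uint32_t TLI;	/* stand-in for TimeLineID */

#define INVALID_LSN ((LSN) 0)
#define SEG_SIZE	((LSN) 16 * 1024 * 1024)	/* assumed 16 MB segments */

/* Hypothetical subset of XLogReaderState holding only the new fields. */
typedef struct
{
	TLI			curr_tli;				/* 0 if a lookup is required */
	LSN			curr_tli_valid_until;	/* INVALID_LSN on current timeline */
	TLI			next_tli;				/* timeline to read after the limit */
} ReaderTimelineState;

/*
 * True if the cached historical timeline is still good for this read,
 * i.e. the read ends on a segment strictly before the one that holds
 * the end of the timeline's validity.
 */
static bool
can_keep_reading_historical(const ReaderTimelineState *st, TLI this_tli,
							LSN want_page, uint32_t want_length)
{
	return st->curr_tli_valid_until != INVALID_LSN &&
		st->curr_tli != this_tli &&
		st->curr_tli != 0 &&
		(want_page + want_length) / SEG_SIZE <
		st->curr_tli_valid_until / SEG_SIZE;
}
```

One consequence of comparing segment numbers of wantPage + wantLength: a full-page read of the last page in the segment just before the limit's segment lands exactly on the boundary and therefore takes the slow path, which re-reads the history.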

‎src/include/access/xlogutils.h

Lines changed: 3 additions & 0 deletions
@@ -52,4 +52,7 @@ extern int read_local_xlog_page(XLogReaderState *state,
 					 XLogRecPtr targetRecPtr, char *cur_page,
 					 TimeLineID *pageTLI);
 
+extern void XLogReadDetermineTimeline(XLogReaderState *state,
+						  XLogRecPtr wantPage, uint32 wantLength);
+
 #endif

‎src/test/recovery/Makefile

Lines changed: 2 additions & 0 deletions
@@ -9,6 +9,8 @@
 #
 #-------------------------------------------------------------------------
 
+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/test/recovery
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
