Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitaf275a1

Browse files
committed
Follow TLI of last replayed record, not recovery target TLI, in walsenders.
Most of the time, the last replayed record comes from the recovery targettimeline, but there is a corner case where it makes a difference. Whenthe startup process scans for a new timeline, and decides to change recoverytarget timeline, there is a window where the recovery target TLI has alreadybeen bumped, but there are no WAL segments from the new timeline in pg_xlogyet. For example, if we have just replayed up to point 0/30002D8, ontimeline 1, there is a WAL file called 000000010000000000000003 in pg_xlogthat contains the WAL up to that point. When recovery switches recoverytarget timeline to 2, a walsender can immediately try to read WAL from0/30002D8, from timeline 2, so it will try to open WAL file000000020000000000000003. However, that doesn't exist yet - the startupprocess hasn't copied that file from the archive yet nor has the walreceiverstreamed it yet, so walsender fails with error "requested WAL segment000000020000000000000003 has already been removed". That's harmless, in thatthe standby will try to reconnect later and by that time the segment isalready created, but error messages that should be ignored are not good.To fix that, have walsender track the TLI of the last replayed record,instead of the recovery target timeline. That way walsender will not try toread anything from timeline 2, until the WAL segment has been created and atleast one record has been replayed from it. The recovery target timeline isnow xlog.c's internal affair, it doesn't need to be exposed in shared memoryanymore.This fixes the error reported by Thom Brown. depesz the same error message,but I'm not sure if this fixes his scenario.
1 parent1a11d46 commitaf275a1

File tree

6 files changed

+92
-82
lines changed

6 files changed

+92
-82
lines changed

‎src/backend/access/transam/xlog.c

Lines changed: 27 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,7 @@ typedef struct XLogCtlData
453453
* replayed, otherwise it's equal to lastReplayedEndRecPtr.
454454
*/
455455
XLogRecPtrlastReplayedEndRecPtr;
456+
TimeLineIDlastReplayedTLI;
456457
XLogRecPtrreplayEndRecPtr;
457458
TimeLineIDreplayEndTLI;
458459
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
@@ -3829,7 +3830,6 @@ rescanLatestTimeLine(void)
38293830
TimeLineIDnewtarget;
38303831
TimeLineHistoryEntry*currentTle=NULL;
38313832
/* use volatile pointer to prevent code rearrangement */
3832-
volatileXLogCtlData*xlogctl=XLogCtl;
38333833

38343834
newtarget=findNewestTimeLine(recoveryTargetTLI);
38353835
if (newtarget==recoveryTargetTLI)
@@ -3888,20 +3888,10 @@ rescanLatestTimeLine(void)
38883888
list_free_deep(expectedTLEs);
38893889
expectedTLEs=newExpectedTLEs;
38903890

3891-
SpinLockAcquire(&xlogctl->info_lck);
3892-
xlogctl->RecoveryTargetTLI=recoveryTargetTLI;
3893-
SpinLockRelease(&xlogctl->info_lck);
3894-
38953891
ereport(LOG,
38963892
(errmsg("new target timeline is %u",
38973893
recoveryTargetTLI)));
38983894

3899-
/*
3900-
* Wake up any walsenders to notice that we have a new target timeline.
3901-
*/
3902-
if (AllowCascadeReplication())
3903-
WalSndWakeup();
3904-
39053895
return true;
39063896
}
39073897

@@ -5389,11 +5379,9 @@ StartupXLOG(void)
53895379
ControlFile->minRecoveryPointTLI)));
53905380

53915381
/*
5392-
* Save the selected recovery target timeline ID and
5393-
* archive_cleanup_command in shared memory so that other processes can
5394-
* see them
5382+
* Save archive_cleanup_command in shared memory so that other processes
5383+
* can see it.
53955384
*/
5396-
XLogCtl->RecoveryTargetTLI=recoveryTargetTLI;
53975385
strncpy(XLogCtl->archiveCleanupCommand,
53985386
archiveCleanupCommand ?archiveCleanupCommand :"",
53995387
sizeof(XLogCtl->archiveCleanupCommand));
@@ -5770,6 +5758,7 @@ StartupXLOG(void)
57705758
xlogctl->replayEndRecPtr=ReadRecPtr;
57715759
xlogctl->replayEndTLI=ThisTimeLineID;
57725760
xlogctl->lastReplayedEndRecPtr=EndRecPtr;
5761+
xlogctl->lastReplayedEndRecPtr=ThisTimeLineID;
57735762
xlogctl->recoveryLastXTime=0;
57745763
xlogctl->currentChunkStartTime=0;
57755764
xlogctl->recoveryPause= false;
@@ -5837,6 +5826,7 @@ StartupXLOG(void)
58375826
*/
58385827
do
58395828
{
5829+
boolswitchedTLI= false;
58405830
#ifdefWAL_DEBUG
58415831
if (XLOG_DEBUG||
58425832
(rmid==RM_XACT_ID&&trace_recovery_messages <=DEBUG2)||
@@ -5942,6 +5932,7 @@ StartupXLOG(void)
59425932

59435933
/* Following WAL records should be run with new TLI */
59445934
ThisTimeLineID=newTLI;
5935+
switchedTLI= true;
59455936
}
59465937
}
59475938

@@ -5974,6 +5965,7 @@ StartupXLOG(void)
59745965
*/
59755966
SpinLockAcquire(&xlogctl->info_lck);
59765967
xlogctl->lastReplayedEndRecPtr=EndRecPtr;
5968+
xlogctl->lastReplayedTLI=ThisTimeLineID;
59775969
SpinLockRelease(&xlogctl->info_lck);
59785970

59795971
/* Remember this record as the last-applied one */
@@ -5982,6 +5974,13 @@ StartupXLOG(void)
59825974
/* Allow read-only connections if we're consistent now */
59835975
CheckRecoveryConsistency();
59845976

5977+
/*
5978+
* If this record was a timeline switch, wake up any
5979+
* walsenders to notice that we are on a new timeline.
5980+
*/
5981+
if (switchedTLI&&AllowCascadeReplication())
5982+
WalSndWakeup();
5983+
59855984
/* Exit loop if we reached inclusive recovery target */
59865985
if (!recoveryContinue)
59875986
break;
@@ -6822,23 +6821,6 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
68226821
*epoch=ckptXidEpoch;
68236822
}
68246823

6825-
/*
6826-
* GetRecoveryTargetTLI - get the current recovery target timeline ID
6827-
*/
6828-
TimeLineID
6829-
GetRecoveryTargetTLI(void)
6830-
{
6831-
/* use volatile pointer to prevent code rearrangement */
6832-
volatileXLogCtlData*xlogctl=XLogCtl;
6833-
TimeLineIDresult;
6834-
6835-
SpinLockAcquire(&xlogctl->info_lck);
6836-
result=xlogctl->RecoveryTargetTLI;
6837-
SpinLockRelease(&xlogctl->info_lck);
6838-
6839-
returnresult;
6840-
}
6841-
68426824
/*
68436825
* This must be called ONCE during postmaster or standalone-backend shutdown
68446826
*/
@@ -7642,10 +7624,16 @@ CreateRestartPoint(int flags)
76427624
*/
76437625
if (_logSegNo)
76447626
{
7627+
XLogRecPtrreceivePtr;
7628+
XLogRecPtrreplayPtr;
76457629
XLogRecPtrendptr;
76467630

7647-
/* Get the current (or recent) end of xlog */
7648-
endptr=GetStandbyFlushRecPtr();
7631+
/*
7632+
* Get the current end of xlog replayed or received, whichever is later.
7633+
*/
7634+
receivePtr=GetWalRcvWriteRecPtr(NULL,NULL);
7635+
replayPtr=GetXLogReplayRecPtr(NULL);
7636+
endptr= (receivePtr<replayPtr) ?replayPtr :receivePtr;
76497637

76507638
KeepLogSeg(endptr,&_logSegNo);
76517639
_logSegNo--;
@@ -9109,38 +9097,23 @@ do_pg_abort_backup(void)
91099097
* Exported to allow WALReceiver to read the pointer directly.
91109098
*/
91119099
XLogRecPtr
9112-
GetXLogReplayRecPtr(void)
9100+
GetXLogReplayRecPtr(TimeLineID*replayTLI)
91139101
{
91149102
/* use volatile pointer to prevent code rearrangement */
91159103
volatileXLogCtlData*xlogctl=XLogCtl;
91169104
XLogRecPtrrecptr;
9105+
TimeLineIDtli;
91179106

91189107
SpinLockAcquire(&xlogctl->info_lck);
91199108
recptr=xlogctl->lastReplayedEndRecPtr;
9109+
tli=xlogctl->lastReplayedTLI;
91209110
SpinLockRelease(&xlogctl->info_lck);
91219111

9112+
if (replayTLI)
9113+
*replayTLI=tli;
91229114
returnrecptr;
91239115
}
91249116

9125-
/*
9126-
* Get current standby flush position, ie, the last WAL position
9127-
* known to be fsync'd to disk in standby.
9128-
*/
9129-
XLogRecPtr
9130-
GetStandbyFlushRecPtr(void)
9131-
{
9132-
XLogRecPtrreceivePtr;
9133-
XLogRecPtrreplayPtr;
9134-
9135-
receivePtr=GetWalRcvWriteRecPtr(NULL,NULL);
9136-
replayPtr=GetXLogReplayRecPtr();
9137-
9138-
if (XLByteLT(receivePtr,replayPtr))
9139-
returnreplayPtr;
9140-
else
9141-
returnreceivePtr;
9142-
}
9143-
91449117
/*
91459118
* Get latest WAL insert pointer
91469119
*/

‎src/backend/access/transam/xlogfuncs.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
248248
XLogRecPtrrecptr;
249249
charlocation[MAXFNAMELEN];
250250

251-
recptr=GetXLogReplayRecPtr();
251+
recptr=GetXLogReplayRecPtr(NULL);
252252

253253
if (recptr==0)
254254
PG_RETURN_NULL();

‎src/backend/replication/walreceiver.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ WalReceiverMain(void)
370370
first_stream= false;
371371

372372
/* Initialize LogstreamResult and buffers for processing messages */
373-
LogstreamResult.Write=LogstreamResult.Flush=GetXLogReplayRecPtr();
373+
LogstreamResult.Write=LogstreamResult.Flush=GetXLogReplayRecPtr(NULL);
374374
initStringInfo(&reply_message);
375375
initStringInfo(&incoming_message);
376376

@@ -1026,7 +1026,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
10261026
/* Construct a new message */
10271027
writePtr=LogstreamResult.Write;
10281028
flushPtr=LogstreamResult.Flush;
1029-
applyPtr=GetXLogReplayRecPtr();
1029+
applyPtr=GetXLogReplayRecPtr(NULL);
10301030

10311031
resetStringInfo(&reply_message);
10321032
pq_sendbyte(&reply_message,'r');

‎src/backend/replication/walreceiverfuncs.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ GetReplicationApplyDelay(void)
324324
receivePtr=walrcv->receivedUpto;
325325
SpinLockRelease(&walrcv->mutex);
326326

327-
replayPtr=GetXLogReplayRecPtr();
327+
replayPtr=GetXLogReplayRecPtr(NULL);
328328

329329
if (XLByteEQ(receivePtr,replayPtr))
330330
return0;

‎src/backend/replication/walsender.c

Lines changed: 60 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ static void WalSndLoop(void);
169169
staticvoidInitWalSenderSlot(void);
170170
staticvoidWalSndKill(intcode,Datumarg);
171171
staticvoidXLogSend(bool*caughtup);
172+
staticXLogRecPtrGetStandbyFlushRecPtr(TimeLineIDcurrentTLI);
172173
staticvoidIdentifySystem(void);
173174
staticvoidStartReplication(StartReplicationCmd*cmd);
174175
staticvoidProcessStandbyMessage(void);
@@ -190,12 +191,6 @@ InitWalSender(void)
190191
/* Set up resource owner */
191192
CurrentResourceOwner=ResourceOwnerCreate(NULL,"walsender top-level resource owner");
192193

193-
/*
194-
* Use the recovery target timeline ID during recovery
195-
*/
196-
if (am_cascading_walsender)
197-
ThisTimeLineID=GetRecoveryTargetTLI();
198-
199194
/*
200195
* Let postmaster know that we're a WAL sender. Once we've declared us as
201196
* a WAL sender process, postmaster will let us outlive the bgwriter and
@@ -254,8 +249,8 @@ IdentifySystem(void)
254249
am_cascading_walsender=RecoveryInProgress();
255250
if (am_cascading_walsender)
256251
{
257-
logptr=GetStandbyFlushRecPtr();
258-
ThisTimeLineID=GetRecoveryTargetTLI();
252+
/* this also updates ThisTimeLineID */
253+
logptr=GetStandbyFlushRecPtr(0);
259254
}
260255
else
261256
logptr=GetInsertRecPtr();
@@ -409,6 +404,7 @@ static void
409404
StartReplication(StartReplicationCmd*cmd)
410405
{
411406
StringInfoDatabuf;
407+
XLogRecPtrFlushPtr;
412408

413409
/*
414410
* We assume here that we're logging enough information in the WAL for
@@ -421,8 +417,17 @@ StartReplication(StartReplicationCmd *cmd)
421417

422418
/*
423419
* Select the timeline. If it was given explicitly by the client, use
424-
* that. Otherwise use the current ThisTimeLineID.
420+
* that. Otherwise use the timeline of the last replayed record, which
421+
* is kept in ThisTimeLineID.
425422
*/
423+
if (am_cascading_walsender)
424+
{
425+
/* this also updates ThisTimeLineID */
426+
FlushPtr=GetStandbyFlushRecPtr(0);
427+
}
428+
else
429+
FlushPtr=GetFlushRecPtr();
430+
426431
if (cmd->timeline!=0)
427432
{
428433
XLogRecPtrswitchpoint;
@@ -494,7 +499,6 @@ StartReplication(StartReplicationCmd *cmd)
494499
if (!sendTimeLineIsHistoric||
495500
XLByteLT(cmd->startpoint,sendTimeLineValidUpto))
496501
{
497-
XLogRecPtrFlushPtr;
498502
/*
499503
* When we first start replication the standby will be behind the primary.
500504
* For some applications, for example, synchronous replication, it is
@@ -516,10 +520,6 @@ StartReplication(StartReplicationCmd *cmd)
516520
* Don't allow a request to stream from a future point in WAL that
517521
* hasn't been flushed to disk in this server yet.
518522
*/
519-
if (am_cascading_walsender)
520-
FlushPtr=GetStandbyFlushRecPtr();
521-
else
522-
FlushPtr=GetFlushRecPtr();
523523
if (XLByteLT(FlushPtr,cmd->startpoint))
524524
{
525525
ereport(ERROR,
@@ -1330,7 +1330,7 @@ XLogSend(bool *caughtup)
13301330
* that gets lost on the master.
13311331
*/
13321332
if (am_cascading_walsender)
1333-
FlushPtr=GetStandbyFlushRecPtr();
1333+
FlushPtr=GetStandbyFlushRecPtr(sendTimeLine);
13341334
else
13351335
FlushPtr=GetFlushRecPtr();
13361336

@@ -1347,27 +1347,23 @@ XLogSend(bool *caughtup)
13471347
if (!sendTimeLineIsHistoric&&am_cascading_walsender)
13481348
{
13491349
boolbecameHistoric= false;
1350-
TimeLineIDtargetTLI;
13511350

13521351
if (!RecoveryInProgress())
13531352
{
13541353
/*
13551354
* We have been promoted. RecoveryInProgress() updated
13561355
* ThisTimeLineID to the new current timeline.
13571356
*/
1358-
targetTLI=ThisTimeLineID;
13591357
am_cascading_walsender= false;
13601358
becameHistoric= true;
13611359
}
13621360
else
13631361
{
13641362
/*
13651363
* Still a cascading standby. But is the timeline we're sending
1366-
* still the recoverytarget timeline?
1364+
* still theonerecoveryis recovering from?
13671365
*/
1368-
targetTLI=GetRecoveryTargetTLI();
1369-
1370-
if (targetTLI!=sendTimeLine)
1366+
if (sendTimeLine!=ThisTimeLineID)
13711367
becameHistoric= true;
13721368
}
13731369

@@ -1380,7 +1376,7 @@ XLogSend(bool *caughtup)
13801376
*/
13811377
List*history;
13821378

1383-
history=readTimeLineHistory(targetTLI);
1379+
history=readTimeLineHistory(ThisTimeLineID);
13841380
sendTimeLineValidUpto=tliSwitchPoint(sendTimeLine,history);
13851381
Assert(XLByteLE(sentPtr,sendTimeLineValidUpto));
13861382
list_free_deep(history);
@@ -1521,6 +1517,48 @@ XLogSend(bool *caughtup)
15211517
return;
15221518
}
15231519

1520+
/*
1521+
* Returns the latest point in WAL that has been safely flushed to disk, and
1522+
* can be sent to the standby. This should only be called when in recovery,
1523+
* ie. we're streaming to a cascaded standby.
1524+
*
1525+
* If currentTLI is non-zero, the function returns the point that the WAL on
1526+
* the given timeline has been flushed upto. If recovery has already switched
1527+
* to a different timeline, InvalidXLogRecPtr is returned.
1528+
*
1529+
* As a side-effect, ThisTimeLineID is updated to the TLI of the last
1530+
* replayed WAL record.
1531+
*/
1532+
staticXLogRecPtr
1533+
GetStandbyFlushRecPtr(TimeLineIDcurrentTLI)
1534+
{
1535+
XLogRecPtrreplayPtr;
1536+
TimeLineIDreplayTLI;
1537+
XLogRecPtrreceivePtr;
1538+
TimeLineIDreceiveTLI;
1539+
XLogRecPtrresult;
1540+
1541+
/*
1542+
* We can safely send what's already been replayed. Also, if walreceiver
1543+
* is streaming WAL from the same timeline, we can send anything that
1544+
* it has streamed, but hasn't been replayed yet.
1545+
*/
1546+
1547+
receivePtr=GetWalRcvWriteRecPtr(NULL,&receiveTLI);
1548+
replayPtr=GetXLogReplayRecPtr(&replayTLI);
1549+
1550+
ThisTimeLineID=replayTLI;
1551+
1552+
if (currentTLI!=replayTLI&&currentTLI!=0)
1553+
returnInvalidXLogRecPtr;
1554+
1555+
result=replayPtr;
1556+
if (receiveTLI==currentTLI&&receivePtr>replayPtr)
1557+
result=receivePtr;
1558+
1559+
returnresult;
1560+
}
1561+
15241562
/*
15251563
* Request walsenders to reload the currently-open WAL file
15261564
*/

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp