Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd140f2f

Browse files
committed
Rationalize GetWalRcv{Write,Flush}RecPtr().
GetWalRcvWriteRecPtr() previously reported the latest *flushed*location. Adopt the conventional terminology used elsewhere in the treeby renaming it to GetWalRcvFlushRecPtr(), and likewise for some relatedvariables that used the term "received".Add a new definition of GetWalRcvWriteRecPtr(), which returns the latest*written* value. This will allow later patches to use the value fornon-data-integrity purposes, without having to wait for the flushpointer to advance.Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>Reviewed-by: Andres Freund <andres@anarazel.de>Discussion:https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
1 parent83fd453 commitd140f2f

File tree

7 files changed

+55
-28
lines changed

7 files changed

+55
-28
lines changed

‎src/backend/access/transam/xlog.c

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,8 @@ HotStandbyState standbyState = STANDBY_DISABLED;
208208

209209
staticXLogRecPtrLastRec;
210210

211-
/* Local copy of WalRcv->receivedUpto */
212-
staticXLogRecPtrreceivedUpto=0;
211+
/* Local copy of WalRcv->flushedUpto */
212+
staticXLogRecPtrflushedUpto=0;
213213
staticTimeLineIDreceiveTLI=0;
214214

215215
/*
@@ -9363,7 +9363,7 @@ CreateRestartPoint(int flags)
93639363
* Retreat _logSegNo using the current end of xlog replayed or received,
93649364
* whichever is later.
93659365
*/
9366-
receivePtr=GetWalRcvWriteRecPtr(NULL,NULL);
9366+
receivePtr=GetWalRcvFlushRecPtr(NULL,NULL);
93679367
replayPtr=GetXLogReplayRecPtr(&replayTLI);
93689368
endptr= (receivePtr<replayPtr) ?replayPtr :receivePtr;
93699369
KeepLogSeg(endptr,&_logSegNo);
@@ -11856,7 +11856,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1185611856
/* See if we need to retrieve more data */
1185711857
if (readFile<0||
1185811858
(readSource==XLOG_FROM_STREAM&&
11859-
receivedUpto<targetPagePtr+reqLen))
11859+
flushedUpto<targetPagePtr+reqLen))
1186011860
{
1186111861
if (!WaitForWALToBecomeAvailable(targetPagePtr+reqLen,
1186211862
private->randAccess,
@@ -11887,10 +11887,10 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
1188711887
*/
1188811888
if (readSource==XLOG_FROM_STREAM)
1188911889
{
11890-
if (((targetPagePtr) /XLOG_BLCKSZ)!= (receivedUpto /XLOG_BLCKSZ))
11890+
if (((targetPagePtr) /XLOG_BLCKSZ)!= (flushedUpto /XLOG_BLCKSZ))
1189111891
readLen=XLOG_BLCKSZ;
1189211892
else
11893-
readLen=XLogSegmentOffset(receivedUpto,wal_segment_size)-
11893+
readLen=XLogSegmentOffset(flushedUpto,wal_segment_size)-
1189411894
targetPageOff;
1189511895
}
1189611896
else
@@ -12305,7 +12305,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
1230512305
RequestXLogStreaming(tli,ptr,PrimaryConnInfo,
1230612306
PrimarySlotName,
1230712307
wal_receiver_create_temp_slot);
12308-
receivedUpto=0;
12308+
flushedUpto=0;
1230912309
}
1231012310

1231112311
/*
@@ -12329,14 +12329,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
1232912329
* XLogReceiptTime will not advance, so the grace time
1233012330
* allotted to conflicting queries will decrease.
1233112331
*/
12332-
if (RecPtr<receivedUpto)
12332+
if (RecPtr<flushedUpto)
1233312333
havedata= true;
1233412334
else
1233512335
{
1233612336
XLogRecPtrlatestChunkStart;
1233712337

12338-
receivedUpto=GetWalRcvWriteRecPtr(&latestChunkStart,&receiveTLI);
12339-
if (RecPtr<receivedUpto&&receiveTLI==curFileTLI)
12338+
flushedUpto=GetWalRcvFlushRecPtr(&latestChunkStart,&receiveTLI);
12339+
if (RecPtr<flushedUpto&&receiveTLI==curFileTLI)
1234012340
{
1234112341
havedata= true;
1234212342
if (latestChunkStart <=RecPtr)

‎src/backend/access/transam/xlogfuncs.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ pg_last_wal_receive_lsn(PG_FUNCTION_ARGS)
398398
{
399399
XLogRecPtrrecptr;
400400

401-
recptr=GetWalRcvWriteRecPtr(NULL,NULL);
401+
recptr=GetWalRcvFlushRecPtr(NULL,NULL);
402402

403403
if (recptr==0)
404404
PG_RETURN_NULL();

‎src/backend/replication/README

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ and WalRcvData->slotname, and initializes the starting point in
5454
WalRcvData->receiveStart.
5555

5656
As walreceiver receives WAL from the master server, and writes and flushes
57-
it to disk (in pg_wal), it updates WalRcvData->receivedUpto and signals
57+
it to disk (in pg_wal), it updates WalRcvData->flushedUpto and signals
5858
the startup process to know how far WAL replay can advance.
5959

6060
Walreceiver sends information about replication progress to the master server

‎src/backend/replication/walreceiver.c

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
* in the primary server), and then keeps receiving XLOG records and
1313
* writing them to the disk as long as the connection is alive. As XLOG
1414
* records are received and flushed to disk, it updates the
15-
* WalRcv->receivedUpto variable in shared memory, to inform the startup
15+
* WalRcv->flushedUpto variable in shared memory, to inform the startup
1616
* process of how far it can proceed with XLOG replay.
1717
*
1818
* A WAL receiver cannot directly load GUC parameters used when establishing
@@ -261,6 +261,8 @@ WalReceiverMain(void)
261261

262262
SpinLockRelease(&walrcv->mutex);
263263

264+
pg_atomic_init_u64(&WalRcv->writtenUpto,0);
265+
264266
/* Arrange to clean up at walreceiver exit */
265267
on_shmem_exit(WalRcvDie,0);
266268

@@ -984,6 +986,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
984986

985987
LogstreamResult.Write=recptr;
986988
}
989+
990+
/* Update shared-memory status */
991+
pg_atomic_write_u64(&WalRcv->writtenUpto,LogstreamResult.Write);
987992
}
988993

989994
/*
@@ -1005,10 +1010,10 @@ XLogWalRcvFlush(bool dying)
10051010

10061011
/* Update shared-memory status */
10071012
SpinLockAcquire(&walrcv->mutex);
1008-
if (walrcv->receivedUpto<LogstreamResult.Flush)
1013+
if (walrcv->flushedUpto<LogstreamResult.Flush)
10091014
{
1010-
walrcv->latestChunkStart=walrcv->receivedUpto;
1011-
walrcv->receivedUpto=LogstreamResult.Flush;
1015+
walrcv->latestChunkStart=walrcv->flushedUpto;
1016+
walrcv->flushedUpto=LogstreamResult.Flush;
10121017
walrcv->receivedTLI=ThisTimeLineID;
10131018
}
10141019
SpinLockRelease(&walrcv->mutex);
@@ -1361,7 +1366,7 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
13611366
state=WalRcv->walRcvState;
13621367
receive_start_lsn=WalRcv->receiveStart;
13631368
receive_start_tli=WalRcv->receiveStartTLI;
1364-
received_lsn=WalRcv->receivedUpto;
1369+
received_lsn=WalRcv->flushedUpto;
13651370
received_tli=WalRcv->receivedTLI;
13661371
last_send_time=WalRcv->lastMsgSendTime;
13671372
last_receipt_time=WalRcv->lastMsgReceiptTime;

‎src/backend/replication/walreceiverfuncs.c

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -282,11 +282,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
282282

283283
/*
284284
* If this is the first startup of walreceiver (on this timeline),
285-
* initializereceivedUpto and latestChunkStart to the starting point.
285+
* initializeflushedUpto and latestChunkStart to the starting point.
286286
*/
287287
if (walrcv->receiveStart==0||walrcv->receivedTLI!=tli)
288288
{
289-
walrcv->receivedUpto=recptr;
289+
walrcv->flushedUpto=recptr;
290290
walrcv->receivedTLI=tli;
291291
walrcv->latestChunkStart=recptr;
292292
}
@@ -304,21 +304,21 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
304304
}
305305

306306
/*
307-
* Returns the last+1 byte position that walreceiver haswritten.
307+
* Returns the last+1 byte position that walreceiver hasflushed.
308308
*
309309
* Optionally, returns the previous chunk start, that is the first byte
310310
* written in the most recent walreceiver flush cycle. Callers not
311311
* interested in that value may pass NULL for latestChunkStart. Same for
312312
* receiveTLI.
313313
*/
314314
XLogRecPtr
315-
GetWalRcvWriteRecPtr(XLogRecPtr*latestChunkStart,TimeLineID*receiveTLI)
315+
GetWalRcvFlushRecPtr(XLogRecPtr*latestChunkStart,TimeLineID*receiveTLI)
316316
{
317317
WalRcvData*walrcv=WalRcv;
318318
XLogRecPtrrecptr;
319319

320320
SpinLockAcquire(&walrcv->mutex);
321-
recptr=walrcv->receivedUpto;
321+
recptr=walrcv->flushedUpto;
322322
if (latestChunkStart)
323323
*latestChunkStart=walrcv->latestChunkStart;
324324
if (receiveTLI)
@@ -328,6 +328,18 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
328328
returnrecptr;
329329
}
330330

331+
/*
332+
* Returns the last+1 byte position that walreceiver has written.
333+
* This returns a recently written value without taking a lock.
334+
*/
335+
XLogRecPtr
336+
GetWalRcvWriteRecPtr(void)
337+
{
338+
WalRcvData*walrcv=WalRcv;
339+
340+
returnpg_atomic_read_u64(&walrcv->writtenUpto);
341+
}
342+
331343
/*
332344
* Returns the replication apply delay in ms or -1
333345
* if the apply delay info is not available
@@ -345,7 +357,7 @@ GetReplicationApplyDelay(void)
345357
TimestampTzchunkReplayStartTime;
346358

347359
SpinLockAcquire(&walrcv->mutex);
348-
receivePtr=walrcv->receivedUpto;
360+
receivePtr=walrcv->flushedUpto;
349361
SpinLockRelease(&walrcv->mutex);
350362

351363
replayPtr=GetXLogReplayRecPtr(NULL);

‎src/backend/replication/walsender.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2949,7 +2949,7 @@ GetStandbyFlushRecPtr(void)
29492949
* has streamed, but hasn't been replayed yet.
29502950
*/
29512951

2952-
receivePtr=GetWalRcvWriteRecPtr(NULL,&receiveTLI);
2952+
receivePtr=GetWalRcvFlushRecPtr(NULL,&receiveTLI);
29532953
replayPtr=GetXLogReplayRecPtr(&replayTLI);
29542954

29552955
ThisTimeLineID=replayTLI;

‎src/include/replication/walreceiver.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include"access/xlogdefs.h"
1717
#include"getaddrinfo.h"/* for NI_MAXHOST */
1818
#include"pgtime.h"
19+
#include"port/atomics.h"
1920
#include"replication/logicalproto.h"
2021
#include"replication/walsender.h"
2122
#include"storage/latch.h"
@@ -73,19 +74,19 @@ typedef struct
7374
TimeLineIDreceiveStartTLI;
7475

7576
/*
76-
*receivedUpto-1 is the last byte position that has already been
77+
*flushedUpto-1 is the last byte position that has already been
7778
* received, and receivedTLI is the timeline it came from. At the first
7879
* startup of walreceiver, these are set to receiveStart and
7980
* receiveStartTLI. After that, walreceiver updates these whenever it
8081
* flushes the received WAL to disk.
8182
*/
82-
XLogRecPtrreceivedUpto;
83+
XLogRecPtrflushedUpto;
8384
TimeLineIDreceivedTLI;
8485

8586
/*
8687
* latestChunkStart is the starting byte position of the current "batch"
8788
* of received WAL. It's actually the same as the previous value of
88-
*receivedUpto before the last flush to disk. Startup process can use
89+
*flushedUpto before the last flush to disk. Startup process can use
8990
* this to detect whether it's keeping up or not.
9091
*/
9192
XLogRecPtrlatestChunkStart;
@@ -141,6 +142,14 @@ typedef struct
141142

142143
slock_tmutex;/* locks shared variables shown above */
143144

145+
/*
146+
* Like flushedUpto, but advanced after writing and before flushing,
147+
* without the need to acquire the spin lock. Data can be read by another
148+
* process up to this point, but shouldn't be used for data integrity
149+
* purposes.
150+
*/
151+
pg_atomic_uint64writtenUpto;
152+
144153
/*
145154
* force walreceiver reply? This doesn't need to be locked; memory
146155
* barriers for ordering are sufficient. But we do need atomic fetch and
@@ -322,7 +331,8 @@ extern bool WalRcvRunning(void);
322331
externvoidRequestXLogStreaming(TimeLineIDtli,XLogRecPtrrecptr,
323332
constchar*conninfo,constchar*slotname,
324333
boolcreate_temp_slot);
325-
externXLogRecPtrGetWalRcvWriteRecPtr(XLogRecPtr*latestChunkStart,TimeLineID*receiveTLI);
334+
externXLogRecPtrGetWalRcvFlushRecPtr(XLogRecPtr*latestChunkStart,TimeLineID*receiveTLI);
335+
externXLogRecPtrGetWalRcvWriteRecPtr(void);
326336
externintGetReplicationApplyDelay(void);
327337
externintGetReplicationTransferLatency(void);
328338
externvoidWalRcvForceReply(void);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp