Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf3ff7bf

Browse files
committed
Add XLogCtl->logInsertResult
This tracks the position of WAL that's been fully copied into WALbuffers by all processes emitting WAL. (For some reason we call that"WAL insertion"). This is updated using atomic monotonic advance duringWaitXLogInsertionsToFinish, which is not when the insertions actuallyoccur, but it's the only place where we know where have all theinsertions have completed.This value is useful in WALReadFromBuffers, which can verify thatcallers don't try to read past what has been inserted. (However, moreinfrastructure is needed in order to actually use WAL after the flushpoint, since it could be lost.)The value is also useful in WaitXLogInsertionsToFinish() itself, sincewe can now exit quickly when all WAL has been already inserted, withouteven having to take any locks.
1 parent29f6a95 commitf3ff7bf

File tree

2 files changed

+75
-1
lines changed

2 files changed

+75
-1
lines changed

‎src/backend/access/transam/xlog.c

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,7 @@ typedef struct XLogCtlData
469469
XLogRecPtrlastSegSwitchLSN;
470470

471471
/* These are accessed using atomics -- info_lck not needed */
472+
pg_atomic_uint64logInsertResult;/* last byte + 1 inserted to buffers */
472473
pg_atomic_uint64logWriteResult;/* last byte + 1 written out */
473474
pg_atomic_uint64logFlushResult;/* last byte + 1 flushed */
474475

@@ -1499,6 +1500,7 @@ static XLogRecPtr
14991500
WaitXLogInsertionsToFinish(XLogRecPtrupto)
15001501
{
15011502
uint64bytepos;
1503+
XLogRecPtrinserted;
15021504
XLogRecPtrreservedUpto;
15031505
XLogRecPtrfinishedUpto;
15041506
XLogCtlInsert*Insert=&XLogCtl->Insert;
@@ -1507,6 +1509,14 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
15071509
if (MyProc==NULL)
15081510
elog(PANIC,"cannot wait without a PGPROC structure");
15091511

1512+
/*
1513+
* Check if there's any work to do. Use a barrier to ensure we get the
1514+
* freshest value.
1515+
*/
1516+
inserted=pg_atomic_read_membarrier_u64(&XLogCtl->logInsertResult);
1517+
if (upto <=inserted)
1518+
returninserted;
1519+
15101520
/* Read the current insert position */
15111521
SpinLockAcquire(&Insert->insertpos_lck);
15121522
bytepos=Insert->CurrBytePos;
@@ -1586,6 +1596,15 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
15861596
if (insertingat!=InvalidXLogRecPtr&&insertingat<finishedUpto)
15871597
finishedUpto=insertingat;
15881598
}
1599+
1600+
/*
1601+
* Advance the limit we know to have been inserted and return the freshest
1602+
* value we know of, which might be beyond what we requested if somebody
1603+
* is concurrently doing this with an 'upto' pointer ahead of us.
1604+
*/
1605+
finishedUpto=pg_atomic_monotonic_advance_u64(&XLogCtl->logInsertResult,
1606+
finishedUpto);
1607+
15891608
returnfinishedUpto;
15901609
}
15911610

@@ -1727,13 +1746,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
17271746
{
17281747
char*pdst=dstbuf;
17291748
XLogRecPtrrecptr=startptr;
1749+
XLogRecPtrinserted;
17301750
Sizenbytes=count;
17311751

17321752
if (RecoveryInProgress()||tli!=GetWALInsertionTimeLine())
17331753
return0;
17341754

17351755
Assert(!XLogRecPtrIsInvalid(startptr));
1736-
Assert(startptr+count <=LogwrtResult.Write);
1756+
1757+
/*
1758+
* Caller should ensure that the requested data has been inserted into WAL
1759+
* buffers before we try to read it.
1760+
*/
1761+
inserted=pg_atomic_read_u64(&XLogCtl->logInsertResult);
1762+
if (startptr+count>inserted)
1763+
ereport(ERROR,
1764+
errmsg("cannot read past end of generated WAL: requested %X/%X, current position %X/%X",
1765+
LSN_FORMAT_ARGS(startptr+count),
1766+
LSN_FORMAT_ARGS(inserted)));
17371767

17381768
/*
17391769
* Loop through the buffers without a lock. For each buffer, atomically
@@ -2571,13 +2601,19 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
25712601
{
25722602
XLogRecPtrFlush;
25732603
XLogRecPtrWrite;
2604+
XLogRecPtrInsert;
25742605

25752606
Flush=pg_atomic_read_u64(&XLogCtl->logFlushResult);
25762607
pg_read_barrier();
25772608
Write=pg_atomic_read_u64(&XLogCtl->logWriteResult);
2609+
pg_read_barrier();
2610+
Insert=pg_atomic_read_u64(&XLogCtl->logInsertResult);
25782611

25792612
/* WAL written to disk is always ahead of WAL flushed */
25802613
Assert(Write >=Flush);
2614+
2615+
/* WAL inserted to buffers is always ahead of WAL written */
2616+
Assert(Insert >=Write);
25812617
}
25822618
#endif
25832619
}
@@ -4951,6 +4987,7 @@ XLOGShmemInit(void)
49514987

49524988
SpinLockInit(&XLogCtl->Insert.insertpos_lck);
49534989
SpinLockInit(&XLogCtl->info_lck);
4990+
pg_atomic_init_u64(&XLogCtl->logInsertResult,InvalidXLogRecPtr);
49544991
pg_atomic_init_u64(&XLogCtl->logWriteResult,InvalidXLogRecPtr);
49554992
pg_atomic_init_u64(&XLogCtl->logFlushResult,InvalidXLogRecPtr);
49564993
pg_atomic_init_u64(&XLogCtl->unloggedLSN,InvalidXLogRecPtr);
@@ -5979,6 +6016,7 @@ StartupXLOG(void)
59796016
* because no other process can be reading or writing WAL yet.
59806017
*/
59816018
LogwrtResult.Write=LogwrtResult.Flush=EndOfLog;
6019+
pg_atomic_write_u64(&XLogCtl->logInsertResult,EndOfLog);
59826020
pg_atomic_write_u64(&XLogCtl->logWriteResult,EndOfLog);
59836021
pg_atomic_write_u64(&XLogCtl->logFlushResult,EndOfLog);
59846022
XLogCtl->LogwrtRqst.Write=EndOfLog;

‎src/include/port/atomics.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,42 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
570570
returnpg_atomic_sub_fetch_u64_impl(ptr,sub_);
571571
}
572572

573+
/*
574+
* Monotonically advance the given variable using only atomic operations until
575+
* it's at least the target value. Returns the latest value observed, which
576+
* may or may not be the target value.
577+
*
578+
* Full barrier semantics (even when value is unchanged).
579+
*/
580+
staticinlineuint64
581+
pg_atomic_monotonic_advance_u64(volatilepg_atomic_uint64*ptr,uint64target_)
582+
{
583+
uint64currval;
584+
585+
#ifndefPG_HAVE_ATOMIC_U64_SIMULATION
586+
AssertPointerAlignment(ptr,8);
587+
#endif
588+
589+
currval=pg_atomic_read_u64_impl(ptr);
590+
if (currval >=target_)
591+
{
592+
pg_memory_barrier();
593+
returncurrval;
594+
}
595+
596+
#ifndefPG_HAVE_ATOMIC_U64_SIMULATION
597+
AssertPointerAlignment(&currval,8);
598+
#endif
599+
600+
while (currval<target_)
601+
{
602+
if (pg_atomic_compare_exchange_u64_impl(ptr,&currval,target_))
603+
break;
604+
}
605+
606+
returnMax(target_,currval);
607+
}
608+
573609
#undef INSIDE_ATOMICS_H
574610

575611
#endif/* ATOMICS_H */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp