@@ -302,11 +302,6 @@ static bool doPageWrites;
  * so it's a plain spinlock.  The other locks are held longer (potentially
  * over I/O operations), so we use LWLocks for them.  These locks are:
  *
- * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
- * It is only held while initializing and changing the mapping.  If the
- * contents of the buffer being replaced haven't been written yet, the mapping
- * lock is released while the write is done, and reacquired afterwards.
- *
  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
  * XLogFlush).
  *
@@ -473,21 +468,37 @@ typedef struct XLogCtlData
     pg_atomic_uint64 logFlushResult;    /* last byte + 1 flushed */
 
     /*
-     * Latest initialized page in the cache (last byte position + 1).
+     * First initialized page in the cache (first byte position).
+     */
+    XLogRecPtr  InitializedFrom;
+
+    /*
+     * Latest page reserved for initialization in the cache (last byte
+     * position + 1).
      *
-     * To change the identity of a buffer (and InitializedUpTo), you need to
-     * hold WALBufMappingLock.  To change the identity of a buffer that's
+     * To change the identity of a buffer, you need to advance
+     * InitializeReserved first.  To change the identity of a buffer that's
      * still dirty, the old page needs to be written out first, and for that
      * you need WALWriteLock, and you need to ensure that there are no
      * in-progress insertions to the page by calling
      * WaitXLogInsertionsToFinish().
      */
-    XLogRecPtr  InitializedUpTo;
+    pg_atomic_uint64 InitializeReserved;
+
+    /*
+     * Latest initialized page in the cache (last byte position + 1).
+     *
+     * InitializedUpTo is advanced after a buffer has been initialized.
+     * After each advance, waiters are notified via InitializedUpToCondVar.
+     */
+    pg_atomic_uint64 InitializedUpTo;
+    ConditionVariable InitializedUpToCondVar;
 
     /*
      * These values do not change after startup, although the pointed-to pages
-     * and xlblocks values certainly do.  xlblocks values are protected by
-     * WALBufMappingLock.
+     * and xlblocks values certainly do.  xlblocks values are changed
+     * lock-free, based on checks against the xlog write position, and in
+     * step with changes of InitializeReserved and InitializedUpTo.
      */
     char       *pages;          /* buffers for unwritten XLOG pages */
     pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
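
For orientation: the buffer index used throughout this patch comes from a
simple modular mapping of WAL locations onto buffer slots. A minimal
standalone sketch of that scheme (not the committed code; XLogRecPtrToBufIdx
in xlog.c implements the same idea, and n_buffers here stands in for
XLOGbuffers):

    #include <stdint.h>

    typedef uint64_t XLogRecPtr;
    #define XLOG_BLCKSZ 8192

    /*
     * Consecutive WAL pages occupy consecutive buffer slots, modulo the
     * number of buffers, so a slot is recycled exactly every n_buffers
     * pages.
     */
    static inline int
    BufIdxForRecPtr(XLogRecPtr ptr, int n_buffers)
    {
        return (int) ((ptr / XLOG_BLCKSZ) % n_buffers);
    }

Because of this fixed stride, AdvanceXLInsertBuffer below can compute the end
position of the page previously occupying a slot (OldPageRqstPtr)
arithmetically, exactly XLOGbuffers pages back, instead of reading xlblocks
under a lock.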
@@ -810,9 +821,9 @@ XLogInsertRecord(XLogRecData *rdata,
      * fullPageWrites from changing until the insertion is finished.
      *
      * Step 2 can usually be done completely in parallel. If the required WAL
-     * page is not initialized yet, you have to grab WALBufMappingLock to
-     * initialize it, but the WAL writer tries to do that ahead of insertions
-     * to avoid that from happening in the critical path.
+     * page is not initialized yet, you have to go through
+     * AdvanceXLInsertBuffer, which will ensure it is initialized.  But the
+     * WAL writer tries to do that ahead of insertions, to keep that work
+     * out of the critical path.
      *
      *----------
      */
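
The two-step protocol above can be modeled in miniature. A toy sketch under
assumed names (insertpos_lck, insert_record, and wal_buffers are all
hypothetical; in PostgreSQL itself the reservation happens in
ReserveXLogInsertLocation and the copy in CopyXLogRecordToWAL):

    #include <pthread.h>
    #include <stdint.h>
    #include <string.h>

    static pthread_mutex_t insertpos_lck = PTHREAD_MUTEX_INITIALIZER;
    static uint64_t CurrBytePos;        /* next free byte of WAL */
    static char wal_buffers[1 << 20];   /* stand-in for the buffer cache */

    static uint64_t
    insert_record(const char *data, size_t size)
    {
        uint64_t start;

        /* Step 1: reservation -- short and strictly serialized. */
        pthread_mutex_lock(&insertpos_lck);
        start = CurrBytePos;
        CurrBytePos += size;
        pthread_mutex_unlock(&insertpos_lck);

        /*
         * Step 2: the copy -- runs in parallel with other inserters.
         * (Records wrapping around the ring are ignored in this toy.)
         */
        memcpy(wal_buffers + start % sizeof(wal_buffers), data, size);
        return start;
    }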
@@ -1991,32 +2002,79 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
     XLogRecPtr  NewPageEndPtr = InvalidXLogRecPtr;
     XLogRecPtr  NewPageBeginPtr;
     XLogPageHeader NewPage;
+    XLogRecPtr  ReservedPtr;
     int         npages pg_attribute_unused() = 0;
 
-    LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-
     /*
-     * Now that we have the lock, check if someone initialized the page
-     * already.
+     * We must run the loop below inside a critical section, as we expect
+     * XLogCtl->InitializedUpTo to eventually catch up.  Most callers are
+     * already inside a critical section; the exception is the WAL writer,
+     * which passes 'opportunistic == true', and for which we don't perform
+     * operations that could error out.
+     *
+     * Start an explicit critical section anyway though.
+     */
+    Assert(CritSectionCount > 0 || opportunistic);
+    START_CRIT_SECTION();
+
+    /*--
+     * Loop until all the pages in the WAL buffer before 'upto' are reserved
+     * for initialization.  Multiple processes can run this loop in parallel,
+     * each initializing a different buffer, as follows (step 1 is sketched
+     * just after this hunk):
+     *
+     * 1. Reserve a page for initialization using XLogCtl->InitializeReserved.
+     * 2. Initialize the reserved page.
+     * 3. Attempt to advance XLogCtl->InitializedUpTo.
      */
-    while (upto >= XLogCtl->InitializedUpTo || opportunistic)
+    ReservedPtr = pg_atomic_read_u64(&XLogCtl->InitializeReserved);
+    while (upto >= ReservedPtr || opportunistic)
     {
-        nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
+        Assert(ReservedPtr % XLOG_BLCKSZ == 0);
 
         /*
-         * Get ending-offset of the buffer page we need to replace (this may
-         * be zero if the buffer hasn't been used yet).  Fall through if it's
-         * already written out.
+         * Get the ending-offset of the buffer page we need to replace.
+         *
+         * We don't look it up in xlblocks; instead we compute the position
+         * that must already be written out before the buffer can be reused.
+         * Once the old page is in place, xlblocks will hold exactly this
+         * position (or an invalid value if the buffer was never used).
          */
-        OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
-        if (LogwrtResult.Write < OldPageRqstPtr)
+        if (ReservedPtr + XLOG_BLCKSZ > XLogCtl->InitializedFrom + XLOG_BLCKSZ * XLOGbuffers)
+            OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - (XLogRecPtr) XLOG_BLCKSZ * XLOGbuffers;
+        else
+            OldPageRqstPtr = InvalidXLogRecPtr;
+
+        if (LogwrtResult.Write < OldPageRqstPtr && opportunistic)
         {
             /*
-             * Nope, got work to do. If we just want to pre-initialize as much
-             * as we can without flushing, give up now.
+             * If we just want to pre-initialize as much as we can without
+             * flushing, give up now.
              */
-            if (opportunistic)
-                break;
+            upto = ReservedPtr - 1;
+            break;
+        }
+
+        /*
+         * Attempt to reserve the page for initialization.  Failure means
+         * that this page got reserved by another process.
+         */
+        if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReserved,
+                                            &ReservedPtr,
+                                            ReservedPtr + XLOG_BLCKSZ))
+            continue;
+
+        /*
+         * Wait until all pages up to OldPageRqstPtr have been initialized.
+         */
+        nextidx = XLogRecPtrToBufIdx(ReservedPtr);
+        while (pg_atomic_read_u64(&XLogCtl->InitializedUpTo) < OldPageRqstPtr)
+            ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+        ConditionVariableCancelSleep();
+        Assert(pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) == OldPageRqstPtr);
+
+        /* Fall through if it's already written out. */
+        if (LogwrtResult.Write < OldPageRqstPtr)
+        {
+            /* Nope, got work to do. */
 
             /* Advance shared memory write request position */
             SpinLockAcquire(&XLogCtl->info_lck);
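
Isolating step 1 of the loop above: the reservation is a plain
compare-and-swap. A self-contained sketch using C11 atomics in place of
pg_atomic_compare_exchange_u64 (names illustrative; unlike this sketch, the
patch re-checks 'upto' after a failed CAS instead of retrying
unconditionally):

    #include <stdatomic.h>
    #include <stdint.h>

    #define XLOG_BLCKSZ 8192

    static _Atomic uint64_t InitializeReserved;

    /* Returns the start position of the page this caller now owns. */
    static uint64_t
    reserve_next_page(void)
    {
        uint64_t ptr = atomic_load(&InitializeReserved);

        /*
         * On failure, compare_exchange reloads 'ptr' with the current
         * value, so each retry starts from whatever page is now next.
         */
        while (!atomic_compare_exchange_weak(&InitializeReserved,
                                             &ptr, ptr + XLOG_BLCKSZ))
            ;
        return ptr;
    }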
@@ -2031,14 +2089,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
             RefreshXLogWriteResult(LogwrtResult);
             if (LogwrtResult.Write < OldPageRqstPtr)
             {
-                /*
-                 * Must acquire write lock. Release WALBufMappingLock first,
-                 * to make sure that all insertions that we need to wait for
-                 * can finish (up to this same position). Otherwise we risk
-                 * deadlock.
-                 */
-                LWLockRelease(WALBufMappingLock);
-
                 WaitXLogInsertionsToFinish(OldPageRqstPtr);
 
                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
@@ -2060,21 +2110,16 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
                     pgWalUsage.wal_buffers_full++;
                     TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
                 }
-                /* Re-acquire WALBufMappingLock and retry */
-                LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-                continue;
             }
         }
 
         /*
          * Now the next buffer slot is free and we can set it up to be the
          * next output page.
          */
-        NewPageBeginPtr = XLogCtl->InitializedUpTo;
+        NewPageBeginPtr = ReservedPtr;
         NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
 
-        Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
-
         NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
 
         /*
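
The unchanged code elided between this hunk and the next fills in the new
page's header. For orientation, a sketch of what it does (field names follow
XLogPageHeaderData in access/xlog_internal.h; the magic constant is
illustrative, and long-header and continuation-record handling are omitted):

    #include <stdint.h>
    #include <string.h>

    typedef uint64_t XLogRecPtr;
    typedef uint32_t TimeLineID;

    #define XLOG_BLCKSZ 8192

    typedef struct XLogPageHeaderData
    {
        uint16_t    xlp_magic;      /* magic value for correctness checks */
        uint16_t    xlp_info;       /* flag bits */
        TimeLineID  xlp_tli;        /* TimeLineID of first record on page */
        XLogRecPtr  xlp_pageaddr;   /* XLOG address of this page */
        uint32_t    xlp_rem_len;    /* bytes of a continued record */
    } XLogPageHeaderData;

    /* 'page' must point at a full XLOG_BLCKSZ buffer. */
    static void
    init_new_page(char *page, XLogRecPtr NewPageBeginPtr, TimeLineID tli)
    {
        XLogPageHeaderData *hdr = (XLogPageHeaderData *) page;

        memset(page, 0, XLOG_BLCKSZ);   /* zero the whole page */
        hdr->xlp_magic = 0xD118;        /* illustrative; the real
                                         * XLOG_PAGE_MAGIC is
                                         * version-specific */
        hdr->xlp_tli = tli;
        hdr->xlp_pageaddr = NewPageBeginPtr;
        /* xlp_rem_len stays 0 unless a record continues onto this page */
    }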
@@ -2138,12 +2183,100 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
          */
         pg_write_barrier();
 
+        /*-----
+         * Update the value of XLogCtl->xlblocks[nextidx] and try to advance
+         * XLogCtl->InitializedUpTo in a lock-less manner.
+         *
+         * First, let's provide a formal proof of the algorithm.  Let there
+         * be 'n' processes with the following variables in shared memory:
+         *  f - an array of 'n' boolean flags,
+         *  v - an atomic integer variable.
+         *
+         * Also, let
+         *  i - the number of a process,
+         *  j - a local integer variable,
+         *  CAS(var, oldval, newval) - a compare-and-swap atomic operation
+         *    returning true on success,
+         *  write_barrier()/read_barrier() - memory barriers.
+         *
+         * The pseudocode for each process is the following.
+         *
+         *  j := i
+         *  f[i] := true
+         *  write_barrier()
+         *  while CAS(v, j, j + 1):
+         *      j := j + 1
+         *      read_barrier()
+         *      if not f[j]:
+         *          break
+         *
+         * Let's prove that v eventually reaches the value of n.
+         * 1. Prove by contradiction.  Assume v doesn't reach n and gets
+         *    stuck at k, where k < n.
+         * 2. Process k attempts CAS(v, k, k + 1).  If, as we assumed, v
+         *    gets stuck at k, then this CAS operation must fail.
+         *    Therefore, v < k when process k attempts CAS(v, k, k + 1).
+         * 3. If, as we assumed, v gets stuck at k, then the value k of v
+         *    must be achieved by some process m, where m < k.  The process
+         *    m must observe f[k] == false; otherwise, it would later
+         *    attempt CAS(v, k, k + 1) with success.
+         * 4. Therefore, the corresponding read_barrier() (while j == k) on
+         *    process m happened before the write_barrier() of process k.
+         *    But then process k attempts CAS(v, k, k + 1) after process m
+         *    has successfully incremented v to k, and that CAS operation
+         *    must succeed.  That is a contradiction.  So, there is no k
+         *    (k < n) at which v gets stuck.  Q.E.D.
+         *
+         * To apply this proof to the code below, XLogCtl->InitializedUpTo
+         * plays the role of v, with XLOG_BLCKSZ granularity.  Setting
+         * XLogCtl->xlblocks[nextidx] to NewPageEndPtr plays the role of
+         * setting f[i] to true.  Also, note that processes can't
+         * concurrently map different xlog locations to the same nextidx,
+         * because we previously ensured that
+         * XLogCtl->InitializedUpTo >= OldPageRqstPtr.  So, an xlog buffer
+         * can be taken for initialization only once the previous
+         * initialization has taken effect on XLogCtl->InitializedUpTo.
+         */
+
         pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr);
-        XLogCtl->InitializedUpTo = NewPageEndPtr;
+
+        pg_write_barrier();
+
+        while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedUpTo, &NewPageBeginPtr, NewPageEndPtr))
+        {
+            NewPageBeginPtr = NewPageEndPtr;
+            NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
+            nextidx = XLogRecPtrToBufIdx(NewPageBeginPtr);
+
+            pg_read_barrier();
+
+            if (pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) != NewPageEndPtr)
+            {
+                /*
+                 * The page at nextidx wasn't initialized yet, so we can't
+                 * move InitializedUpTo any further.  It will be moved by
+                 * the backend that initializes nextidx.
+                 */
+                ConditionVariableBroadcast(&XLogCtl->InitializedUpToCondVar);
+                break;
+            }
+        }
 
         npages++;
     }
-    LWLockRelease(WALBufMappingLock);
+
+    END_CRIT_SECTION();
+
+    /*
+     * All the pages in the WAL buffer before 'upto' are now reserved for
+     * initialization.  However, some of them might have been reserved by
+     * concurrent processes.  Wait until they finish the initialization.
+     */
+    while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedUpTo))
+        ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+    ConditionVariableCancelSleep();
+
+    pg_read_barrier();
 
 #ifdef WAL_DEBUG
     if (XLOG_DEBUG && npages > 0)
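
The proof above can also be exercised empirically. Below is a runnable toy
model of the pseudocode (C11 atomics plus pthreads, not PostgreSQL code):
each of N threads publishes its flag and then tries to advance v past every
published slot. On any interleaving, v ends at N:

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    #define N 8

    static atomic_bool f[N];    /* f[i]: slot i published */
    static atomic_uint v;       /* advances from 0 towards N */

    static void *
    worker(void *arg)
    {
        unsigned    i = (unsigned) (uintptr_t) arg;
        unsigned    j = i;

        /* "f[i] := true"; seq_cst ops subsume the explicit barriers */
        atomic_store(&f[i], true);

        /* "while CAS(v, j, j + 1)": each success advances v by one */
        while (atomic_compare_exchange_strong(&v, &j, j + 1))
        {
            j = j + 1;
            /* "if not f[j]: break" -- slot j's owner will advance v */
            if (j >= N || !atomic_load(&f[j]))
                break;
        }
        return NULL;
    }

    int
    main(void)
    {
        pthread_t   tid[N];

        for (unsigned i = 0; i < N; i++)
            pthread_create(&tid[i], NULL, worker, (void *) (uintptr_t) i);
        for (unsigned i = 0; i < N; i++)
            pthread_join(tid[i], NULL);

        printf("v = %u (expected %u)\n", atomic_load(&v), N);
        return 0;
    }

Mapping back to the patch: v is InitializedUpTo (in XLOG_BLCKSZ units),
setting f[i] corresponds to writing NewPageEndPtr into xlblocks[nextidx], and
the ConditionVariableBroadcast replaces busy waiting for processes sleeping
in AdvanceXLInsertBuffer.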
@@ -5071,6 +5204,10 @@ XLOGShmemInit(void)
     pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
     pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
     pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
+
+    pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr);
+    pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr);
+    ConditionVariableInit(&XLogCtl->InitializedUpToCondVar);
 }
 
 /*
@@ -6090,7 +6227,8 @@ StartupXLOG(void)
         memset(page + len, 0, XLOG_BLCKSZ - len);
 
         pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
-        XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ;
+        pg_atomic_write_u64(&XLogCtl->InitializedUpTo, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
+        XLogCtl->InitializedFrom = endOfRecoveryInfo->lastPageBeginPtr;
     }
     else
     {
@@ -6099,8 +6237,10 @@ StartupXLOG(void)
          * let the first attempt to insert a log record to initialize the next
          * buffer.
          */
-        XLogCtl->InitializedUpTo = EndOfLog;
+        pg_atomic_write_u64(&XLogCtl->InitializedUpTo, EndOfLog);
+        XLogCtl->InitializedFrom = EndOfLog;
     }
+    pg_atomic_write_u64(&XLogCtl->InitializeReserved, pg_atomic_read_u64(&XLogCtl->InitializedUpTo));
 
     /*
      * Update local and shared status. This is OK to do without any locks