@@ -302,11 +302,6 @@ static bool doPageWrites;
  * so it's a plain spinlock.  The other locks are held longer (potentially
  * over I/O operations), so we use LWLocks for them.  These locks are:
  *
- * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
- * It is only held while initializing and changing the mapping.  If the
- * contents of the buffer being replaced haven't been written yet, the mapping
- * lock is released while the write is done, and reacquired afterwards.
- *
  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
  * XLogFlush).
  *
@@ -473,21 +468,32 @@ typedef struct XLogCtlData
     pg_atomic_uint64 logFlushResult;    /* last byte + 1 flushed */
 
     /*
-     * Latest initialized page in the cache (last byte position + 1).
+     * Latest page reserved for initialization in the cache (last byte
+     * position + 1).
      *
-     * To change the identity of a buffer (and InitializedUpTo), you need to
-     * hold WALBufMappingLock.  To change the identity of a buffer that's
+     * To change the identity of a buffer, you need to advance
+     * InitializeReserved first.  To change the identity of a buffer that's
      * still dirty, the old page needs to be written out first, and for that
      * you need WALWriteLock, and you need to ensure that there are no
      * in-progress insertions to the page by calling
      * WaitXLogInsertionsToFinish().
      */
-    XLogRecPtr  InitializedUpTo;
+    pg_atomic_uint64 InitializeReserved;
+
+    /*
+     * Latest initialized page in the cache (last byte position + 1).
+     *
+     * InitializedUpTo is updated after the buffer initialization.  After the
+     * update, waiters get a notification via InitializedUpToCondVar.
+     */
+    pg_atomic_uint64 InitializedUpTo;
+    ConditionVariable InitializedUpToCondVar;
 
     /*
      * These values do not change after startup, although the pointed-to pages
-     * and xlblocks values certainly do.  xlblocks values are protected by
-     * WALBufMappingLock.
+     * and xlblocks values certainly do.  xlblocks values are changed
+     * lock-free, based on a check of the xlog write position, and are
+     * accompanied by changes of InitializeReserved and InitializedUpTo.
      */
     char       *pages;          /* buffers for unwritten XLOG pages */
     pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
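
For readers less familiar with the primitives involved: the patch replaces one pointer protected by WALBufMappingLock with two atomic watermarks and a condition variable. Below is a minimal standalone sketch of that shape. It is not part of the patch and not PostgreSQL code; it uses C11 atomics and pthreads in place of pg_atomic_uint64 and ConditionVariable, and every name (WalBufSync, BLCKSZ, NBUFFERS, and so on) is made up for illustration.

/* Hypothetical stand-ins; not PostgreSQL code. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>

#define BLCKSZ   8192u          /* stand-in for XLOG_BLCKSZ */
#define NBUFFERS 16u            /* stand-in for XLOGbuffers */

typedef struct WalBufSync
{
    /* Next byte position not yet claimed for initialization
     * (plays the role of XLogCtl->InitializeReserved). */
    _Atomic uint64_t reserved;

    /* End of the contiguous prefix of initialized pages
     * (plays the role of XLogCtl->InitializedUpTo). */
    _Atomic uint64_t initialized_upto;

    /* End pointer of the page held in each buffer slot
     * (plays the role of xlblocks[]). */
    _Atomic uint64_t xlblocks[NBUFFERS];

    /* Waiters sleep here until initialized_upto passes their target
     * (plays the role of InitializedUpToCondVar). */
    pthread_mutex_t  cv_lock;
    pthread_cond_t   initialized_cv;
} WalBufSync;

/* Invariant: initialized_upto <= reserved; pages in between are being
 * initialized concurrently by the threads that reserved them. */

The key design point mirrored here is that reservation and completion are tracked separately, so several backends can initialize different buffers at once instead of serializing on a single mapping lock.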
@@ -810,9 +816,9 @@ XLogInsertRecord(XLogRecData *rdata,
  * fullPageWrites from changing until the insertion is finished.
  *
  * Step 2 can usually be done completely in parallel. If the required WAL
- * page is not initialized yet, you have to grab WALBufMappingLock to
- * initialize it, but the WAL writer tries to do that ahead of insertions
- * to avoid that from happening in the critical path.
+ * page is not initialized yet, you have to go through AdvanceXLInsertBuffer,
+ * which will ensure it is initialized. But the WAL writer tries to do that
+ * ahead of insertions to avoid that from happening in the critical path.
  *
  *----------
  */
@@ -1991,32 +1997,70 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
     XLogRecPtr  NewPageEndPtr = InvalidXLogRecPtr;
     XLogRecPtr  NewPageBeginPtr;
     XLogPageHeader NewPage;
+    XLogRecPtr  ReservedPtr;
     int         npages pg_attribute_unused() = 0;
 
-    LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-
     /*
-     * Now that we have the lock, check if someone initialized the page
-     * already.
+     * We must run the loop below inside the critical section as we expect
+     * XLogCtl->InitializedUpTo to eventually keep up.  Most of the callers
+     * already run inside a critical section, except for the WAL writer,
+     * which passes 'opportunistic == true', and therefore we don't perform
+     * operations that could error out.
+     *
+     * Start an explicit critical section anyway though.
      */
-    while (upto >= XLogCtl->InitializedUpTo || opportunistic)
+    Assert(CritSectionCount > 0 || opportunistic);
+    START_CRIT_SECTION();
+
+    /*--
+     * Loop till we get all the pages in the WAL buffer before 'upto'
+     * reserved for initialization.  Multiple processes can initialize
+     * different buffers with this loop in parallel as follows:
+     *
+     * 1. Reserve a page for initialization using XLogCtl->InitializeReserved.
+     * 2. Initialize the reserved page.
+     * 3. Attempt to advance XLogCtl->InitializedUpTo.
+     */
+    ReservedPtr = pg_atomic_read_u64(&XLogCtl->InitializeReserved);
+    while (upto >= ReservedPtr || opportunistic)
     {
-        nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
+        Assert(ReservedPtr % XLOG_BLCKSZ == 0);
 
         /*
-         * Get ending-offset of the buffer page we need to replace (this may
-         * be zero if the buffer hasn't been used yet).  Fall through if it's
-         * already written out.
+         * Get ending-offset of the buffer page we need to replace.
+         *
+         * We don't look up xlblocks, but rather calculate the position we
+         * must wait to be written.  If it was written, xlblocks will have
+         * this position (or be uninitialized).
          */
-        OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
-        if (LogwrtResult.Write < OldPageRqstPtr)
+        if (ReservedPtr + XLOG_BLCKSZ > XLOG_BLCKSZ * XLOGbuffers)
+            OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - XLOG_BLCKSZ * XLOGbuffers;
+        else
+            OldPageRqstPtr = InvalidXLogRecPtr;
+
+        if (LogwrtResult.Write < OldPageRqstPtr && opportunistic)
         {
            /*
-            * Nope, got work to do. If we just want to pre-initialize as much
-            * as we can without flushing, give up now.
+            * If we just want to pre-initialize as much as we can without
+            * flushing, give up now.
             */
-           if (opportunistic)
-               break;
+           upto = ReservedPtr - 1;
+           break;
+        }
+
+        /*
+         * Attempt to reserve the page for initialization.  Failure means
+         * that this page got reserved by another process.
+         */
+        if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReserved,
+                                            &ReservedPtr,
+                                            ReservedPtr + XLOG_BLCKSZ))
+            continue;
+
+        /* Fall through if it's already written out. */
+        if (LogwrtResult.Write < OldPageRqstPtr)
+        {
+            /* Nope, got work to do. */
 
             /* Advance shared memory write request position */
             SpinLockAcquire(&XLogCtl->info_lck);
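
Continuing the WalBufSync sketch shown after the XLogCtlData hunk (again hypothetical, not the patch itself): the reservation step boils down to a compare-and-swap loop on the `reserved` watermark, mirroring the pg_atomic_compare_exchange_u64() on InitializeReserved above. The sketch omits the part where the old buffer contents may still have to be flushed before the slot can be reused.

/* Claim the next BLCKSZ-sized page for initialization and return its start
 * position.  Builds on the WalBufSync struct from the earlier sketch. */
static uint64_t
reserve_page(WalBufSync *s)
{
    uint64_t start = atomic_load(&s->reserved);

    /* On failure, 'start' is refreshed with the current value, so we simply
     * retry from whatever another thread reserved last. */
    while (!atomic_compare_exchange_weak(&s->reserved, &start, start + BLCKSZ))
        ;

    return start;
}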
@@ -2031,14 +2075,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
             RefreshXLogWriteResult(LogwrtResult);
             if (LogwrtResult.Write < OldPageRqstPtr)
             {
-                /*
-                 * Must acquire write lock. Release WALBufMappingLock first,
-                 * to make sure that all insertions that we need to wait for
-                 * can finish (up to this same position). Otherwise we risk
-                 * deadlock.
-                 */
-                LWLockRelease(WALBufMappingLock);
-
                 WaitXLogInsertionsToFinish(OldPageRqstPtr);
 
                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
@@ -2060,20 +2096,24 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
                     PendingWalStats.wal_buffers_full++;
                     TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
                 }
-                /* Re-acquire WALBufMappingLock and retry */
-                LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-                continue;
             }
         }
 
         /*
         * Now the next buffer slot is free and we can set it up to be the
         * next output page.
         */
-        NewPageBeginPtr = XLogCtl->InitializedUpTo;
+        NewPageBeginPtr = ReservedPtr;
         NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
+        nextidx = XLogRecPtrToBufIdx(ReservedPtr);
 
-        Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
+#ifdef USE_ASSERT_CHECKING
+        {
+            XLogRecPtr  storedBound = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
+
+            Assert(storedBound == OldPageRqstPtr || storedBound == InvalidXLogRecPtr);
+        }
+#endif
 
         NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
@@ -2139,11 +2179,50 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
         pg_write_barrier();
 
         pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr);
-        XLogCtl->InitializedUpTo = NewPageEndPtr;
+
+        /*
+         * Try to advance XLogCtl->InitializedUpTo.
+         *
+         * If the CAS operation failed, then some of the previous pages are
+         * not initialized yet, and this backend gives up.
+         *
+         * Since the initializer of the next page might give up on advancing
+         * InitializedUpTo, this backend has to attempt advancing until it
+         * finds a page "in the past" or a concurrent backend succeeds at
+         * advancing.  When we finish advancing XLogCtl->InitializedUpTo, we
+         * notify all the waiters with XLogCtl->InitializedUpToCondVar.
+         */
+        while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedUpTo, &NewPageBeginPtr, NewPageEndPtr))
+        {
+            NewPageBeginPtr = NewPageEndPtr;
+            NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
+            nextidx = XLogRecPtrToBufIdx(NewPageBeginPtr);
+
+            if (pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) != NewPageEndPtr)
+            {
+                /*
+                 * The page at nextidx wasn't initialized yet, so we can't
+                 * move InitializedUpTo further.  It will be moved by the
+                 * backend which initializes nextidx.
+                 */
+                ConditionVariableBroadcast(&XLogCtl->InitializedUpToCondVar);
+                break;
+            }
+        }
 
         npages++;
     }
-    LWLockRelease(WALBufMappingLock);
+
+    END_CRIT_SECTION();
+
+    /*
+     * All the pages in the WAL buffer before 'upto' were reserved for
+     * initialization.  However, some pages might be reserved by concurrent
+     * processes.  Wait till they finish initialization.
+     */
+    while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedUpTo))
+        ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+    ConditionVariableCancelSleep();
 
 #ifdef WAL_DEBUG
     if (XLOG_DEBUG && npages > 0)
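
The publish side of the same sketch (still hypothetical, not the patch): after filling its page, a thread stores the page's end pointer into its buffer slot and then tries to drag the `initialized_upto` watermark forward over every consecutive finished page, broadcasting the condition variable when it stops, in the same spirit as the CAS loop, ConditionVariableBroadcast(), and ConditionVariableSleep() calls above.

/* Publish a filled page and try to advance the initialized watermark.
 * Builds on the WalBufSync struct and reserve_page() from the sketches above. */
static void
publish_page(WalBufSync *s, uint64_t page_start)
{
    uint64_t begin = page_start;
    uint64_t end = page_start + BLCKSZ;

    atomic_store(&s->xlblocks[(begin / BLCKSZ) % NBUFFERS], end);

    /* Advance over every consecutive finished page.  If the CAS fails, an
     * earlier page isn't done yet; its initializer will advance past us. */
    while (atomic_compare_exchange_strong(&s->initialized_upto, &begin, end))
    {
        begin = end;
        end = begin + BLCKSZ;

        if (atomic_load(&s->xlblocks[(begin / BLCKSZ) % NBUFFERS]) != end)
        {
            /* Next page not finished; wake whoever waits on our progress. */
            pthread_mutex_lock(&s->cv_lock);
            pthread_cond_broadcast(&s->initialized_cv);
            pthread_mutex_unlock(&s->cv_lock);
            break;
        }
    }
}

/* Block until every byte before 'upto' lies on an initialized page. */
static void
wait_initialized(WalBufSync *s, uint64_t upto)
{
    pthread_mutex_lock(&s->cv_lock);
    while (upto >= atomic_load(&s->initialized_upto))
        pthread_cond_wait(&s->initialized_cv, &s->cv_lock);
    pthread_mutex_unlock(&s->cv_lock);
}

Note the same "give up and let the neighbor finish" choice as in the patch: a thread whose CAS fails simply stops, because the thread that eventually performs the last successful advance is guaranteed to either keep going or broadcast.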
@@ -5044,6 +5123,10 @@ XLOGShmemInit(void)
     pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
     pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
     pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
+
+    pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr);
+    pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr);
+    ConditionVariableInit(&XLogCtl->InitializedUpToCondVar);
 }
 
 /*
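
To show that the sketch above hangs together, here is a tiny driver, again hypothetical and outside the patch: it initializes the shared state in the same spirit as this XLOGShmemInit() hunk initializes the real fields, then lets a few threads reserve and publish pages while the main thread waits for a target position. Build with something like `cc -std=c11 -pthread` together with the earlier sketch code.

/* Tiny driver for the WalBufSync sketch above; not PostgreSQL code. */
#include <stdio.h>

static WalBufSync state;

static void *
worker(void *arg)
{
    (void) arg;
    for (int i = 0; i < 4; i++)
    {
        uint64_t start = reserve_page(&state);

        /* ... a real caller would fill the buffer covering
         * [start, start + BLCKSZ) here ... */
        publish_page(&state, start);
    }
    return NULL;
}

int
main(void)
{
    pthread_t threads[4];

    /* Analogue of the pg_atomic_init_u64()/ConditionVariableInit() calls. */
    atomic_init(&state.reserved, 0);
    atomic_init(&state.initialized_upto, 0);
    for (unsigned i = 0; i < NBUFFERS; i++)
        atomic_init(&state.xlblocks[i], 0);
    pthread_mutex_init(&state.cv_lock, NULL);
    pthread_cond_init(&state.initialized_cv, NULL);

    for (int i = 0; i < 4; i++)
        pthread_create(&threads[i], NULL, worker, NULL);

    /* Wait until the first eight pages are initialized. */
    wait_initialized(&state, 8 * (uint64_t) BLCKSZ - 1);
    printf("initialized up to %llu\n",
           (unsigned long long) atomic_load(&state.initialized_upto));

    for (int i = 0; i < 4; i++)
        pthread_join(threads[i], NULL);
    return 0;
}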
@@ -6063,7 +6146,7 @@ StartupXLOG(void)
         memset(page + len, 0, XLOG_BLCKSZ - len);
 
         pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
-        XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ;
+        pg_atomic_write_u64(&XLogCtl->InitializedUpTo, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
     }
     else
     {
@@ -6072,8 +6155,9 @@ StartupXLOG(void)
         * let the first attempt to insert a log record to initialize the next
         * buffer.
         */
-        XLogCtl->InitializedUpTo = EndOfLog;
+        pg_atomic_write_u64(&XLogCtl->InitializedUpTo, EndOfLog);
     }
+    pg_atomic_write_u64(&XLogCtl->InitializeReserved, pg_atomic_read_u64(&XLogCtl->InitializedUpTo));
 
     /*
      * Update local and shared status. This is OK to do without any locks