@@ -97,6 +97,7 @@ extern slock_t *ShmemLock;
 
 #define LW_FLAG_HAS_WAITERS			((uint32) 1 << 30)
 #define LW_FLAG_RELEASE_OK			((uint32) 1 << 29)
+#define LW_FLAG_LOCKED				((uint32) 1 << 28)
 
 #define LW_VAL_EXCLUSIVE			((uint32) 1 << 24)
 #define LW_VAL_SHARED				1
@@ -711,7 +712,6 @@ RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
 void
 LWLockInitialize(LWLock *lock, int tranche_id)
 {
-	SpinLockInit(&lock->mutex);
 	pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
 #ifdef LOCK_DEBUG
 	pg_atomic_init_u32(&lock->nwaiters, 0);
@@ -842,6 +842,74 @@ LWLockAttemptLock(LWLock *lock, LWLockMode mode)
 	pg_unreachable();
 }
 
+/*
+ * Lock the LWLock's wait list against concurrent activity.
+ *
+ * NB: even though the wait list is locked, non-conflicting lock operations
+ * may still happen concurrently.
+ *
+ * Time spent holding mutex should be short!
+ */
+static void
+LWLockWaitListLock(LWLock *lock)
+{
+	uint32		old_state;
+#ifdef LWLOCK_STATS
+	lwlock_stats *lwstats;
+	uint32		delays = 0;
+
+	lwstats = get_lwlock_stats_entry(lock);
+#endif
+
+	while (true)
+	{
+		/* always try once to acquire lock directly */
+		old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
+		if (!(old_state & LW_FLAG_LOCKED))
+			break;				/* got lock */
+
+		/* and then spin without atomic operations until lock is released */
+		{
+			SpinDelayStatus delayStatus = init_spin_delay(&lock->state);
+
+			while (old_state & LW_FLAG_LOCKED)
+			{
+				perform_spin_delay(&delayStatus);
+				old_state = pg_atomic_read_u32(&lock->state);
+			}
+#ifdef LWLOCK_STATS
+			delays += delayStatus.delays;
+#endif
+			finish_spin_delay(&delayStatus);
+		}
+
+		/*
+		 * Retry. The lock might obviously already be re-acquired by the time
+		 * we're attempting to get it again.
+		 */
+	}
+
+#ifdef LWLOCK_STATS
+	lwstats->spin_delay_count += delays;
+#endif
+}
+
+/*
+ * Unlock the LWLock's wait list.
+ *
+ * Note that it can be more efficient to manipulate flags and release the
+ * locks in a single atomic operation.
+ */
+static void
+LWLockWaitListUnlock(LWLock *lock)
+{
+	uint32		old_state PG_USED_FOR_ASSERTS_ONLY;
+
+	old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
+
+	Assert(old_state & LW_FLAG_LOCKED);
+}
+
 /*
  * Wakeup all the lockers that currently have a chance to acquire the lock.
  */
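
As an aside on the locking technique introduced above: here is a minimal standalone sketch of the same protocol written with C11 <stdatomic.h> rather than PostgreSQL's pg_atomic_* API. FLAG_LOCKED, wait_list_lock and wait_list_unlock are illustrative names, and a plain relaxed-load busy-wait stands in for perform_spin_delay().

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define FLAG_LOCKED		((uint32_t) 1 << 28)

static void
wait_list_lock(_Atomic uint32_t *state)
{
	uint32_t	old_state;

	while (true)
	{
		/* try to grab the bit with an atomic read-modify-write */
		old_state = atomic_fetch_or(state, FLAG_LOCKED);
		if (!(old_state & FLAG_LOCKED))
			return;				/* bit was clear before: we own the lock */

		/* otherwise spin on plain loads until the bit is observed clear */
		while (old_state & FLAG_LOCKED)
			old_state = atomic_load_explicit(state, memory_order_relaxed);

		/* another backend may have re-taken it meanwhile, so retry */
	}
}

static void
wait_list_unlock(_Atomic uint32_t *state)
{
	/* clearing the bit releases the lock; all other flag bits survive */
	atomic_fetch_and(state, ~FLAG_LOCKED);
}

Spinning on plain loads instead of repeating the fetch_or keeps the cache line shared between waiters until the lock actually looks free, which is the point of the inner loop in LWLockWaitListLock().
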
@@ -852,22 +920,13 @@ LWLockWakeup(LWLock *lock)
 	bool		wokeup_somebody = false;
 	dlist_head	wakeup;
 	dlist_mutable_iter iter;
-#ifdef LWLOCK_STATS
-	lwlock_stats *lwstats;
-
-	lwstats = get_lwlock_stats_entry(lock);
-#endif
 
 	dlist_init(&wakeup);
 
 	new_release_ok = true;
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-#ifdef LWLOCK_STATS
-	lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
-#else
-	SpinLockAcquire(&lock->mutex);
-#endif
+	/* lock wait list while collecting backends to wake up */
+	LWLockWaitListLock(lock);
 
 	dlist_foreach_modify(iter, &lock->waiters)
 	{
@@ -904,19 +963,33 @@ LWLockWakeup(LWLock *lock)
 
 	Assert(dlist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
 
-	/* Unset both flags at once if required */
-	if (!new_release_ok && dlist_is_empty(&wakeup))
-		pg_atomic_fetch_and_u32(&lock->state,
-								~(LW_FLAG_RELEASE_OK | LW_FLAG_HAS_WAITERS));
-	else if (!new_release_ok)
-		pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_RELEASE_OK);
-	else if (dlist_is_empty(&wakeup))
-		pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
-	else if (new_release_ok)
-		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
+	/* unset required flags, and release lock, in one fell swoop */
+	{
+		uint32		old_state;
+		uint32		desired_state;
+
+		old_state = pg_atomic_read_u32(&lock->state);
+		while (true)
+		{
+			desired_state = old_state;
+
+			/* compute desired flags */
+
+			if (new_release_ok)
+				desired_state |= LW_FLAG_RELEASE_OK;
+			else
+				desired_state &= ~LW_FLAG_RELEASE_OK;
 
-	/* We are done updating the shared state of the lock queue. */
-	SpinLockRelease(&lock->mutex);
+			if (dlist_is_empty(&wakeup))
+				desired_state &= ~LW_FLAG_HAS_WAITERS;
+
+			desired_state &= ~LW_FLAG_LOCKED;	/* release lock */
+
+			if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
+											   desired_state))
+				break;
+		}
+	}
 
 	/* Awaken any waiters I removed from the queue. */
 	dlist_foreach_modify(iter, &wakeup)
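
The new LWLockWakeup() folds its flag maintenance into the unlock itself: a compare-and-exchange loop computes the desired RELEASE_OK / HAS_WAITERS bits and clears LW_FLAG_LOCKED in one atomic transition. A rough standalone equivalent, continuing the earlier C11 sketch (illustrative names only):

#define FLAG_HAS_WAITERS	((uint32_t) 1 << 30)
#define FLAG_RELEASE_OK		((uint32_t) 1 << 29)

static void
unlock_and_update_flags(_Atomic uint32_t *state, bool release_ok, bool have_waiters)
{
	uint32_t	old_state = atomic_load(state);
	uint32_t	desired_state;

	do
	{
		desired_state = old_state;

		if (release_ok)
			desired_state |= FLAG_RELEASE_OK;
		else
			desired_state &= ~FLAG_RELEASE_OK;

		if (!have_waiters)
			desired_state &= ~FLAG_HAS_WAITERS;

		desired_state &= ~FLAG_LOCKED;	/* this is what releases the lock */

		/* on failure old_state is refreshed and the flags are recomputed */
	} while (!atomic_compare_exchange_weak(state, &old_state, desired_state));
}
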
@@ -933,7 +1006,7 @@ LWLockWakeup(LWLock *lock)
 		 * that happens before the list unlink happens, the list would end up
 		 * being corrupted.
 		 *
-		 * The barrier pairs with the SpinLockAcquire() when enqueing for
+		 * The barrier pairs with the LWLockWaitListLock() when enqueing for
 		 * another lock.
 		 */
 		pg_write_barrier();
@@ -950,12 +1023,6 @@ LWLockWakeup(LWLock *lock)
 static void
 LWLockQueueSelf(LWLock *lock, LWLockMode mode)
 {
-#ifdef LWLOCK_STATS
-	lwlock_stats *lwstats;
-
-	lwstats = get_lwlock_stats_entry(lock);
-#endif
-
 	/*
 	 * If we don't have a PGPROC structure, there's no way to wait. This
 	 * should never occur, since MyProc should only be null during shared
@@ -967,11 +1034,7 @@ LWLockQueueSelf(LWLock *lock, LWLockMode mode)
 	if (MyProc->lwWaiting)
 		elog(PANIC, "queueing for lock while waiting on another one");
 
-#ifdef LWLOCK_STATS
-	lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
-#else
-	SpinLockAcquire(&lock->mutex);
-#endif
+	LWLockWaitListLock(lock);
 
 	/* setting the flag is protected by the spinlock */
 	pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
@@ -986,7 +1049,7 @@ LWLockQueueSelf(LWLock *lock, LWLockMode mode)
 		dlist_push_tail(&lock->waiters, &MyProc->lwWaitLink);
 
 	/* Can release the mutex now */
-	SpinLockRelease(&lock->mutex);
+	LWLockWaitListUnlock(lock);
 
 #ifdef LOCK_DEBUG
 	pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
@@ -1015,11 +1078,7 @@ LWLockDequeueSelf(LWLock *lock)
 	lwstats->dequeue_self_count++;
 #endif
 
-#ifdef LWLOCK_STATS
-	lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
-#else
-	SpinLockAcquire(&lock->mutex);
-#endif
+	LWLockWaitListLock(lock);
 
 	/*
 	 * Can't just remove ourselves from the list, but we need to iterate over
@@ -1043,7 +1102,8 @@ LWLockDequeueSelf(LWLock *lock)
 		pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
 	}
 
-	SpinLockRelease(&lock->mutex);
+	/* XXX: combine with fetch_and above? */
+	LWLockWaitListUnlock(lock);
 
 	/* clear waiting state again, nice for debugging */
 	if (found)
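
Regarding the XXX above: since LWLockWaitListUnlock() is itself a fetch_and, the HAS_WAITERS clear and the unlock could presumably be merged into a single atomic op when the queue turns out empty. In terms of the earlier C11 sketch that would be something like the following, which is purely hypothetical and not what the patch does:

	/* hypothetical: drop HAS_WAITERS and the wait list lock in one go */
	atomic_fetch_and(state, ~(FLAG_HAS_WAITERS | FLAG_LOCKED));
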
@@ -1460,11 +1520,6 @@ LWLockConflictsWithVar(LWLock *lock,
 {
 	bool		mustwait;
 	uint64		value;
-#ifdef LWLOCK_STATS
-	lwlock_stats *lwstats;
-
-	lwstats = get_lwlock_stats_entry(lock);
-#endif
 
 	/*
 	 * Test first to see if the slot is free right now.
@@ -1484,17 +1539,13 @@ LWLockConflictsWithVar(LWLock *lock,
 	*result = false;
 
 	/*
-	 * Read value using spinlock as we can't rely on atomic 64 bit
-	 * reads/stores. TODO: On platforms with a way to do atomic 64 bit
-	 * reads/writes the spinlock could be optimized away.
+	 * Read value using the lwlock's wait list lock, as we can't generally
+	 * rely on atomic 64 bit reads/stores. TODO: On platforms with a way to
+	 * do atomic 64 bit reads/writes the spinlock should be optimized away.
 	 */
-#ifdef LWLOCK_STATS
-	lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
-#else
-	SpinLockAcquire(&lock->mutex);
-#endif
+	LWLockWaitListLock(lock);
 	value = *valptr;
-	SpinLockRelease(&lock->mutex);
+	LWLockWaitListUnlock(lock);
 
 	if (value != oldval)
 	{
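
The guarded 64-bit read above exists because not every platform has atomic 8-byte loads: holding the wait list lock around both this read and the stores in LWLockUpdateVar()/LWLockReleaseClearVar() means a torn value can never be observed. In terms of the earlier standalone sketch, the reader side looks roughly like this (illustrative names, reusing wait_list_lock()/wait_list_unlock() from above):

static uint64_t
read_value_locked(_Atomic uint32_t *state, const uint64_t *valptr)
{
	uint64_t	value;

	wait_list_lock(state);
	value = *valptr;			/* plain read, safe while the flag lock is held */
	wait_list_unlock(state);

	return value;
}
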
@@ -1668,22 +1719,12 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 {
 	dlist_head	wakeup;
 	dlist_mutable_iter iter;
-#ifdef LWLOCK_STATS
-	lwlock_stats *lwstats;
-
-	lwstats = get_lwlock_stats_entry(lock);
-#endif
 
 	PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
 
 	dlist_init(&wakeup);
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-#ifdef LWLOCK_STATS
-	lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
-#else
-	SpinLockAcquire(&lock->mutex);
-#endif
+	LWLockWaitListLock(lock);
 
 	Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
 
@@ -1706,7 +1747,7 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 	}
 
 	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
+	LWLockWaitListUnlock(lock);
 
 	/*
 	 * Awaken any waiters I removed from the queue.
@@ -1804,21 +1845,15 @@ LWLockRelease(LWLock *lock)
 void
 LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
 {
-#ifdef LWLOCK_STATS
-	lwlock_stats *lwstats;
+	LWLockWaitListLock(lock);
 
-	lwstats = get_lwlock_stats_entry(lock);
-	lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
-#else
-	SpinLockAcquire(&lock->mutex);
-#endif
 	/*
 	 * Set the variable's value before releasing the lock.  That prevents a
 	 * race condition wherein a new locker acquires the lock, but hasn't yet
 	 * set the variable's value.
 	 */
 	*valptr = val;
-	SpinLockRelease(&lock->mutex);
+	LWLockWaitListUnlock(lock);
 
 	LWLockRelease(lock);
 }
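
And the matching writer side of that protocol, corresponding to LWLockReleaseClearVar() storing *valptr under the wait list lock before the LWLock itself is released (same standalone C11 sketch, illustrative names only):

static void
write_value_locked(_Atomic uint32_t *state, uint64_t *valptr, uint64_t val)
{
	wait_list_lock(state);
	*valptr = val;				/* store while holding the wait list lock */
	wait_list_unlock(state);
}
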