1515 * Portions Copyright (c) 1994, Regents of the University of California
1616 *
1717 * IDENTIFICATION
18- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lwlock.c,v 1.7 2001/12/29 21:30:32 momjian Exp $
18+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lwlock.c,v 1.8 2002/01/07 16:33:00 tgl Exp $
1919 *
2020 *-------------------------------------------------------------------------
2121 */
3030typedef struct LWLock
3131{
3232slock_t mutex ;/* Protects LWLock and queue of PROCs */
33+ bool releaseOK ;/* T if ok to release waiters */
3334char exclusive ;/* # of exclusive holders (0 or 1) */
3435int shared ;/* # of shared holders (0..MaxBackends) */
3536PROC * head ;/* head of list of waiting PROCs */
@@ -67,9 +68,10 @@ inline static void
6768PRINT_LWDEBUG (const char * where ,LWLockId lockid ,const volatile LWLock * lock )
6869{
6970if (Trace_lwlocks )
70- elog (DEBUG ,"%s(%d): excl %d shared %d head %p" ,
71+ elog (DEBUG ,"%s(%d): excl %d shared %d head %p rOK %d " ,
7172where , (int )lockid ,
72- (int )lock -> exclusive ,lock -> shared ,lock -> head );
73+ (int )lock -> exclusive ,lock -> shared ,lock -> head ,
74+ (int )lock -> releaseOK );
7375}
7476
7577inline static void
@@ -153,6 +155,7 @@ CreateLWLocks(void)
153155for (id = 0 ,lock = LWLockArray ;id < numLocks ;id ++ ,lock ++ )
154156{
155157SpinLockInit (& lock -> mutex );
158+ lock -> releaseOK = true;
156159lock -> exclusive = 0 ;
157160lock -> shared = 0 ;
158161lock -> head = NULL ;
195198LWLockAcquire (LWLockId lockid ,LWLockMode mode )
196199{
197200volatile LWLock * lock = LWLockArray + lockid ;
198- bool mustwait ;
201+ PROC * proc = MyProc ;
202+ bool retry = false;
203+ int extraWaits = 0 ;
199204
200205PRINT_LWDEBUG ("LWLockAcquire" ,lockid ,lock );
201206
@@ -206,43 +211,62 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
206211 */
207212HOLD_INTERRUPTS ();
208213
209- /* Acquire mutex. Time spent holding mutex should be short! */
210- SpinLockAcquire_NoHoldoff (& lock -> mutex );
211-
212- /* If I can get the lock, do so quickly. */
213- if (mode == LW_EXCLUSIVE )
214+ /*
215+ * Loop here to try to acquire lock after each time we are signaled
216+ * by LWLockRelease.
217+ *
218+ * NOTE: it might seem better to have LWLockRelease actually grant us
219+ * the lock, rather than retrying and possibly having to go back to
220+ * sleep. But in practice that is no good because it means a process
221+ * swap for every lock acquisition when two or more processes are
222+ * contending for the same lock. Since LWLocks are normally used to
223+ * protect not-very-long sections of computation, a process needs to
224+ * be able to acquire and release the same lock many times during a
225+ * single CPU time slice, even in the presence of contention. The
226+ * efficiency of being able to do that outweighs the inefficiency of
227+ * sometimes wasting a process dispatch cycle because the lock is not
228+ * free when a released waiter finally gets to run. See pgsql-hackers
229+ * archives for 29-Dec-01.
230+ */
231+ for (;;)
214232{
215- if (lock -> exclusive == 0 && lock -> shared == 0 )
233+ bool mustwait ;
234+
235+ /* Acquire mutex. Time spent holding mutex should be short! */
236+ SpinLockAcquire_NoHoldoff (& lock -> mutex );
237+
238+ /* If retrying, allow LWLockRelease to release waiters again */
239+ if (retry )
240+ lock -> releaseOK = true;
241+
242+ /* If I can get the lock, do so quickly. */
243+ if (mode == LW_EXCLUSIVE )
216244{
217- lock -> exclusive ++ ;
218- mustwait = false;
245+ if (lock -> exclusive == 0 && lock -> shared == 0 )
246+ {
247+ lock -> exclusive ++ ;
248+ mustwait = false;
249+ }
250+ else
251+ mustwait = true;
219252}
220253else
221- mustwait = true;
222- }
223- else
224- {
225- /*
226- * If there is someone waiting (presumably for exclusive access),
227- * queue up behind him even though I could get the lock. This
228- * prevents a stream of read locks from starving a writer.
229- */
230- if (lock -> exclusive == 0 && lock -> head == NULL )
231254{
232- lock -> shared ++ ;
233- mustwait = false;
255+ if (lock -> exclusive == 0 )
256+ {
257+ lock -> shared ++ ;
258+ mustwait = false;
259+ }
260+ else
261+ mustwait = true;
234262}
235- else
236- mustwait = true;
237- }
238263
239- if (mustwait )
240- {
241- /* Add myself to wait queue */
242- PROC * proc = MyProc ;
243- int extraWaits = 0 ;
264+ if (!mustwait )
265+ break ;/* got the lock */
244266
245267/*
268+ * Add myself to wait queue.
269+ *
246270 * If we don't have a PROC structure, there's no way to wait. This
247271 * should never occur, since MyProc should only be null during
248272 * shared memory initialization.
@@ -267,9 +291,9 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
267291 *
268292 * Since we share the process wait semaphore with the regular lock
269293 * manager and ProcWaitForSignal, and we may need to acquire an
270- * LWLock while one of those is pending, it is possible that we
271- *get awakened for a reason other than beinggranted the LWLock .
272- * If so, loop back and wait again. Once we've gotten thelock ,
294+ * LWLock while one of those is pending, it is possible that we get
295+ * awakened for a reason other than beingsignaled by LWLockRelease .
296+ * If so, loop back and wait again. Once we've gotten theLWLock ,
273297 * re-increment the sema by the number of additional signals
274298 * received, so that the lock manager or signal manager will see
275299 * the received signal when it next waits.
@@ -287,23 +311,22 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
287311
288312LOG_LWDEBUG ("LWLockAcquire" ,lockid ,"awakened" );
289313
290- /*
291- * The awakener already updated the lock struct's state, so we
292- * don't need to do anything more to it. Just need to fix the
293- * semaphore count.
294- */
295- while (extraWaits -- > 0 )
296- IpcSemaphoreUnlock (proc -> sem .semId ,proc -> sem .semNum );
297- }
298- else
299- {
300- /* Got the lock without waiting */
301- SpinLockRelease_NoHoldoff (& lock -> mutex );
314+ /* Now loop back and try to acquire lock again. */
315+ retry = true;
302316}
303317
318+ /* We are done updating shared state of the lock itself. */
319+ SpinLockRelease_NoHoldoff (& lock -> mutex );
320+
304321/* Add lock to list of locks held by this backend */
305322Assert (num_held_lwlocks < MAX_SIMUL_LWLOCKS );
306323held_lwlocks [num_held_lwlocks ++ ]= lockid ;
324+
325+ /*
326+ * Fix the process wait semaphore's count for any absorbed wakeups.
327+ */
328+ while (extraWaits -- > 0 )
329+ IpcSemaphoreUnlock (proc -> sem .semId ,proc -> sem .semNum );
307330}
308331
309332/*
@@ -344,12 +367,7 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
344367}
345368else
346369{
347- /*
348- * If there is someone waiting (presumably for exclusive access),
349- * queue up behind him even though I could get the lock. This
350- * prevents a stream of read locks from starving a writer.
351- */
352- if (lock -> exclusive == 0 && lock -> head == NULL )
370+ if (lock -> exclusive == 0 )
353371{
354372lock -> shared ++ ;
355373mustwait = false;
@@ -419,33 +437,34 @@ LWLockRelease(LWLockId lockid)
419437
420438/*
421439 * See if I need to awaken any waiters. If I released a non-last
422- * shared hold, there cannot be anything to do.
440+ * shared hold, there cannot be anything to do. Also, do not awaken
441+ * any waiters if someone has already awakened waiters that haven't
442+ * yet acquired the lock.
423443 */
424444head = lock -> head ;
425445if (head != NULL )
426446{
427- if (lock -> exclusive == 0 && lock -> shared == 0 )
447+ if (lock -> exclusive == 0 && lock -> shared == 0 && lock -> releaseOK )
428448{
429449/*
430- * Remove the to-be-awakened PROCs from the queue, and update
431- * the lock state to show them as holding the lock.
450+ * Remove the to-be-awakened PROCs from the queue. If the
451+ * front waiter wants exclusive lock, awaken him only.
452+ * Otherwise awaken as many waiters as want shared access.
432453 */
433454proc = head ;
434- if (proc -> lwExclusive )
435- lock -> exclusive ++ ;
436- else
455+ if (!proc -> lwExclusive )
437456{
438- lock -> shared ++ ;
439457while (proc -> lwWaitLink != NULL &&
440458 !proc -> lwWaitLink -> lwExclusive )
441459{
442460proc = proc -> lwWaitLink ;
443- lock -> shared ++ ;
444461}
445462}
446463/* proc is now the last PROC to be released */
447464lock -> head = proc -> lwWaitLink ;
448465proc -> lwWaitLink = NULL ;
466+ /* prevent additional wakeups until retryer gets to run */
467+ lock -> releaseOK = false;
449468}
450469else
451470{