@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *    $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.71 2008/06/19 21:32:56 tgl Exp $
+ *    $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.72 2008/06/20 00:24:53 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
@@ -23,6 +23,7 @@
 #include "storage/proc.h"
 #include "storage/shmem.h"
 #include "storage/sinvaladt.h"
+#include "storage/spin.h"


 /*
@@ -84,10 +85,20 @@
  * can operate in parallel with one or more readers, because the writer
  * has no need to touch anyone's ProcState, except in the infrequent cases
  * when SICleanupQueue is needed.  The only point of overlap is that
- * the writer might change maxMsgNum while readers are looking at it.
- * This should be okay: we are assuming that fetching or storing an int
- * is atomic, an assumption also made elsewhere in Postgres.  However
- * readers mustn't assume that maxMsgNum isn't changing under them.
+ * the writer wants to change maxMsgNum while readers need to read it.
+ * We deal with that by having a spinlock that readers must take for just
+ * long enough to read maxMsgNum, while writers take it for just long enough
+ * to write maxMsgNum.  (The exact rule is that you need the spinlock to
+ * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
+ * spinlock to write maxMsgNum unless you are holding both locks.)
+ *
+ * Note: since maxMsgNum is an int and hence presumably atomically readable/
+ * writable, the spinlock might seem unnecessary.  The reason it is needed
+ * is to provide a memory barrier: we need to be sure that messages written
+ * to the array are actually there before maxMsgNum is increased, and that
+ * readers will see that data after fetching maxMsgNum.  Multiprocessors
+ * that have weak memory-ordering guarantees can fail without the memory
+ * barrier instructions that are included in the spinlock sequences.
 */


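For readers unfamiliar with the idiom, here is a minimal standalone sketch of the publish/consume rule the new comment describes; it is not part of the patch. POSIX spinlocks stand in for SpinLockAcquire/SpinLockRelease, and the names ring_write, ring_read, and RING_SIZE are illustrative. The writer fills buffer slots first and only then publishes the counter under the spinlock; a reader takes the spinlock just long enough to fetch the counter, so the lock's barrier semantics guarantee it sees the slot contents that counter promises.

/*
 * Illustrative sketch only -- not PostgreSQL code.  The pthread spinlock
 * plays the role of msgnumLock; the lock/unlock pair supplies the memory
 * barrier discussed in the comment above.
 */
#include <pthread.h>

#define RING_SIZE 4096

static int  ring[RING_SIZE];
static int  maxMsgNum;          /* next unused slot, published under lock */
static pthread_spinlock_t msgnumLock;

void
ring_init(void)
{
    pthread_spin_init(&msgnumLock, PTHREAD_PROCESS_PRIVATE);
}

/* Writer side: assumed to already hold an exclusive "write" lock. */
void
ring_write(const int *data, int n)
{
    int         max = maxMsgNum;

    while (n-- > 0)
        ring[max++ % RING_SIZE] = *data++;

    pthread_spin_lock(&msgnumLock);     /* barrier: slots visible before counter */
    maxMsgNum = max;
    pthread_spin_unlock(&msgnumLock);
}

/* Reader side: fetch the counter under the lock, then read slots up to it. */
int
ring_read(int *dst, int datasize, int *nextMsgNum)
{
    int         max;
    int         n = 0;

    pthread_spin_lock(&msgnumLock);     /* barrier: counter fetched before slots */
    max = maxMsgNum;
    pthread_spin_unlock(&msgnumLock);

    while (n < datasize && *nextMsgNum < max)
        dst[n++] = ring[(*nextMsgNum)++ % RING_SIZE];

    return n;
}

On strongly ordered hardware the sketch would still appear to work with the lock pair removed, which is exactly why the weak-memory-ordering failure the comment warns about is easy to miss.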
@@ -153,6 +164,8 @@ typedef struct SISeg
     int         lastBackend;    /* index of last active procState entry, +1 */
     int         maxBackends;    /* size of procState array */

+    slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */
+
     /*
      * Circular buffer holding shared-inval messages
      */
@@ -209,12 +222,13 @@ CreateSharedInvalidationState(void)
     if (found)
         return;

-    /* Clear message counters, save size of procState array */
+    /* Clear message counters, save size of procState array, init spinlock */
     shmInvalBuffer->minMsgNum = 0;
     shmInvalBuffer->maxMsgNum = 0;
     shmInvalBuffer->nextThreshold = CLEANUP_MIN;
     shmInvalBuffer->lastBackend = 0;
     shmInvalBuffer->maxBackends = MaxBackends;
+    SpinLockInit(&shmInvalBuffer->msgnumLock);

     /* The buffer[] array is initially all unused, so we need not fill it */

@@ -365,6 +379,7 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
     {
         int         nthistime = Min(n, WRITE_QUANTUM);
         int         numMsgs;
+        int         max;

         n -= nthistime;

@@ -388,10 +403,21 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
         /*
          * Insert new message(s) into proper slot of circular buffer
          */
+        max = segP->maxMsgNum;
         while (nthistime-- > 0)
         {
-            segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data++;
-            segP->maxMsgNum++;
+            segP->buffer[max % MAXNUMMESSAGES] = *data++;
+            max++;
+        }
+
+        /* Update current value of maxMsgNum using spinlock */
+        {
+            /* use volatile pointer to prevent code rearrangement */
+            volatile SISeg *vsegP = segP;
+
+            SpinLockAcquire(&vsegP->msgnumLock);
+            vsegP->maxMsgNum = max;
+            SpinLockRelease(&vsegP->msgnumLock);
         }

         LWLockRelease(SInvalWriteLock);
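The "volatile pointer" comment above is doing real work: accessing the shared struct only through a volatile-qualified alias keeps the compiler from caching the fields in registers or moving the store across the SpinLockAcquire/SpinLockRelease calls. Below is a hedged, backend-style sketch of just that idiom; the PublishedCounter struct and publish_counter helper are hypothetical, while slock_t, SpinLockAcquire, and SpinLockRelease come from storage/spin.h as in the patch.

/* Hypothetical helper, shown only to isolate the volatile-pointer idiom. */
#include "postgres.h"
#include "storage/spin.h"

typedef struct PublishedCounter
{
    slock_t     lock;           /* protects value and acts as memory barrier */
    int         value;
} PublishedCounter;

static void
publish_counter(PublishedCounter *c, int newval)
{
    /* use volatile pointer to prevent code rearrangement */
    volatile PublishedCounter *vc = c;

    SpinLockAcquire(&vc->lock);
    vc->value = newval;
    SpinLockRelease(&vc->lock);
}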
@@ -431,21 +457,32 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
 {
     SISeg      *segP;
     ProcState  *stateP;
+    int         max;
     int         n;

     LWLockAcquire(SInvalReadLock, LW_SHARED);

     segP = shmInvalBuffer;
     stateP = &segP->procState[MyBackendId - 1];

+    /* Fetch current value of maxMsgNum using spinlock */
+    {
+        /* use volatile pointer to prevent code rearrangement */
+        volatile SISeg *vsegP = segP;
+
+        SpinLockAcquire(&vsegP->msgnumLock);
+        max = vsegP->maxMsgNum;
+        SpinLockRelease(&vsegP->msgnumLock);
+    }
+
     if (stateP->resetState)
     {
         /*
          * Force reset.  We can say we have dealt with any messages added
          * since the reset, as well; and that means we should clear the
          * signaled flag, too.
          */
-        stateP->nextMsgNum = segP->maxMsgNum;
+        stateP->nextMsgNum = max;
         stateP->resetState = false;
         stateP->signaled = false;
         LWLockRelease(SInvalReadLock);
@@ -459,13 +496,9 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
      * There may be other backends that haven't read the message(s), so we
      * cannot delete them here.  SICleanupQueue() will eventually remove them
      * from the queue.
-     *
-     * Note: depending on the compiler, we might read maxMsgNum only once
-     * here, or each time through the loop.  It doesn't matter (as long as
-     * each fetch is atomic).
      */
     n = 0;
-    while (n < datasize && stateP->nextMsgNum < segP->maxMsgNum)
+    while (n < datasize && stateP->nextMsgNum < max)
     {
         data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
         stateP->nextMsgNum++;
@@ -474,7 +507,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
     /*
      * Reset our "signaled" flag whenever we have caught up completely.
      */
-    if (stateP->nextMsgNum >= segP->maxMsgNum)
+    if (stateP->nextMsgNum >= max)
         stateP->signaled = false;

     LWLockRelease(SInvalReadLock);