@@ -218,6 +218,7 @@ typedef struct QueueBackendStatus
218218{
219219int32 pid ;/* either a PID or InvalidPid */
220220Oid dboid ;/* backend's database OID, or InvalidOid */
221+ BackendId nextListener ;/* id of next listener, or InvalidBackendId */
221222QueuePosition pos ;/* backend has read queue up to here */
222223}QueueBackendStatus ;
223224
@@ -241,12 +242,19 @@ typedef struct QueueBackendStatus
241242 * Each backend uses the backend[] array entry with index equal to its
242243 * BackendId (which can range from 1 to MaxBackends). We rely on this to make
243244 * SendProcSignal fast.
245+ *
246+ * The backend[] array entries for actively-listening backends are threaded
247+ * together using firstListener and the nextListener links, so that we can
248+ * scan them without having to iterate over inactive entries. We keep this
249+ * list in order by BackendId so that the scan is cache-friendly when there
250+ * are many active entries.
244251 */
245252typedef struct AsyncQueueControl
246253{
247254QueuePosition head ;/* head points to the next free location */
248255QueuePosition tail ;/* the global tail is equivalent to the pos of
249256 * the "slowest" backend */
257+ BackendId firstListener ;/* id of first listener, or InvalidBackendId */
250258TimestampTz lastQueueFillWarn ;/* time of last queue-full msg */
251259QueueBackendStatus backend [FLEXIBLE_ARRAY_MEMBER ];
252260/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
@@ -256,8 +264,10 @@ static AsyncQueueControl *asyncQueueControl;
256264
257265#define QUEUE_HEAD (asyncQueueControl->head)
258266#define QUEUE_TAIL (asyncQueueControl->tail)
267+ #define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)
259268#define QUEUE_BACKEND_PID (i )(asyncQueueControl->backend[i].pid)
260269#define QUEUE_BACKEND_DBOID (i )(asyncQueueControl->backend[i].dboid)
270+ #define QUEUE_NEXT_LISTENER (i )(asyncQueueControl->backend[i].nextListener)
261271#define QUEUE_BACKEND_POS (i )(asyncQueueControl->backend[i].pos)
262272
263273/*
@@ -490,16 +500,16 @@ AsyncShmemInit(void)
490500if (!found )
491501{
492502/* First time through, so initialize it */
493- int i ;
494-
495503SET_QUEUE_POS (QUEUE_HEAD ,0 ,0 );
496504SET_QUEUE_POS (QUEUE_TAIL ,0 ,0 );
505+ QUEUE_FIRST_LISTENER = InvalidBackendId ;
497506asyncQueueControl -> lastQueueFillWarn = 0 ;
498507/* zero'th entry won't be used, but let's initialize it anyway */
499- for (i = 0 ;i <=MaxBackends ;i ++ )
508+ for (int i = 0 ;i <=MaxBackends ;i ++ )
500509{
501510QUEUE_BACKEND_PID (i )= InvalidPid ;
502511QUEUE_BACKEND_DBOID (i )= InvalidOid ;
512+ QUEUE_NEXT_LISTENER (i )= InvalidBackendId ;
503513SET_QUEUE_POS (QUEUE_BACKEND_POS (i ),0 ,0 );
504514}
505515}
@@ -959,7 +969,7 @@ Exec_ListenPreCommit(void)
959969{
960970QueuePosition head ;
961971QueuePosition max ;
962- int i ;
972+ BackendId prevListener ;
963973
964974/*
965975 * Nothing to do if we are already listening to something, nor if we
@@ -996,26 +1006,37 @@ Exec_ListenPreCommit(void)
9961006 * our database; any notifications it's already advanced over are surely
9971007 * committed and need not be re-examined by us. (We must consider only
9981008 * backends connected to our DB, because others will not have bothered to
999- * check committed-ness of notifications in our DB.) But we only bother
1000- * with that if there's more than a page worth of notifications
1001- * outstanding, otherwise scanning all the other backends isn't worth it.
1009+ * check committed-ness of notifications in our DB.)
10021010 *
1003- * We need exclusive lock here so we can look at other backends' entries.
1011+ * We need exclusive lock here so we can look at other backends' entries
1012+ * and manipulate the list links.
10041013 */
10051014LWLockAcquire (AsyncQueueLock ,LW_EXCLUSIVE );
10061015head = QUEUE_HEAD ;
10071016max = QUEUE_TAIL ;
1008- if (QUEUE_POS_PAGE (max )!= QUEUE_POS_PAGE (head ))
1017+ prevListener = InvalidBackendId ;
1018+ for (BackendId i = QUEUE_FIRST_LISTENER ;i > 0 ;i = QUEUE_NEXT_LISTENER (i ))
10091019{
1010- for ( i = 1 ; i <= MaxBackends ; i ++ )
1011- {
1012- if ( QUEUE_BACKEND_DBOID ( i ) == MyDatabaseId )
1013- max = QUEUE_POS_MAX ( max , QUEUE_BACKEND_POS ( i ));
1014- }
1020+ if ( QUEUE_BACKEND_DBOID ( i ) == MyDatabaseId )
1021+ max = QUEUE_POS_MAX ( max , QUEUE_BACKEND_POS ( i ));
1022+ /* Also find last listening backend before this one */
1023+ if ( i < MyBackendId )
1024+ prevListener = i ;
10151025}
10161026QUEUE_BACKEND_POS (MyBackendId )= max ;
10171027QUEUE_BACKEND_PID (MyBackendId )= MyProcPid ;
10181028QUEUE_BACKEND_DBOID (MyBackendId )= MyDatabaseId ;
1029+ /* Insert backend into list of listeners at correct position */
1030+ if (prevListener > 0 )
1031+ {
1032+ QUEUE_NEXT_LISTENER (MyBackendId )= QUEUE_NEXT_LISTENER (prevListener );
1033+ QUEUE_NEXT_LISTENER (prevListener )= MyBackendId ;
1034+ }
1035+ else
1036+ {
1037+ QUEUE_NEXT_LISTENER (MyBackendId )= QUEUE_FIRST_LISTENER ;
1038+ QUEUE_FIRST_LISTENER = MyBackendId ;
1039+ }
10191040LWLockRelease (AsyncQueueLock );
10201041
10211042/* Now we are listed in the global array, so remember we're listening */
@@ -1228,13 +1249,31 @@ asyncQueueUnregister(void)
12281249if (!amRegisteredListener )/* nothing to do */
12291250return ;
12301251
1231- LWLockAcquire (AsyncQueueLock ,LW_SHARED );
1252+ /*
1253+ * Need exclusive lock here to manipulate list links.
1254+ */
1255+ LWLockAcquire (AsyncQueueLock ,LW_EXCLUSIVE );
12321256/* check if entry is valid and oldest ... */
12331257advanceTail = (MyProcPid == QUEUE_BACKEND_PID (MyBackendId ))&&
12341258QUEUE_POS_EQUAL (QUEUE_BACKEND_POS (MyBackendId ),QUEUE_TAIL );
12351259/* ... then mark it invalid */
12361260QUEUE_BACKEND_PID (MyBackendId )= InvalidPid ;
12371261QUEUE_BACKEND_DBOID (MyBackendId )= InvalidOid ;
1262+ /* and remove it from the list */
1263+ if (QUEUE_FIRST_LISTENER == MyBackendId )
1264+ QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER (MyBackendId );
1265+ else
1266+ {
1267+ for (BackendId i = QUEUE_FIRST_LISTENER ;i > 0 ;i = QUEUE_NEXT_LISTENER (i ))
1268+ {
1269+ if (QUEUE_NEXT_LISTENER (i )== MyBackendId )
1270+ {
1271+ QUEUE_NEXT_LISTENER (i )= QUEUE_NEXT_LISTENER (MyBackendId );
1272+ break ;
1273+ }
1274+ }
1275+ }
1276+ QUEUE_NEXT_LISTENER (MyBackendId )= InvalidBackendId ;
12381277LWLockRelease (AsyncQueueLock );
12391278
12401279/* mark ourselves as no longer listed in the global array */
@@ -1508,16 +1547,13 @@ asyncQueueFillWarning(void)
15081547{
15091548QueuePosition min = QUEUE_HEAD ;
15101549int32 minPid = InvalidPid ;
1511- int i ;
15121550
1513- for (i = 1 ;i <= MaxBackends ;i ++ )
1551+ for (BackendId i = QUEUE_FIRST_LISTENER ;i > 0 ;i = QUEUE_NEXT_LISTENER ( i ) )
15141552{
1515- if (QUEUE_BACKEND_PID (i )!= InvalidPid )
1516- {
1517- min = QUEUE_POS_MIN (min ,QUEUE_BACKEND_POS (i ));
1518- if (QUEUE_POS_EQUAL (min ,QUEUE_BACKEND_POS (i )))
1519- minPid = QUEUE_BACKEND_PID (i );
1520- }
1553+ Assert (QUEUE_BACKEND_PID (i )!= InvalidPid );
1554+ min = QUEUE_POS_MIN (min ,QUEUE_BACKEND_POS (i ));
1555+ if (QUEUE_POS_EQUAL (min ,QUEUE_BACKEND_POS (i )))
1556+ minPid = QUEUE_BACKEND_PID (i );
15211557}
15221558
15231559ereport (WARNING ,
@@ -1553,7 +1589,6 @@ SignalBackends(void)
15531589int32 * pids ;
15541590BackendId * ids ;
15551591int count ;
1556- int i ;
15571592int32 pid ;
15581593
15591594/*
@@ -1570,10 +1605,11 @@ SignalBackends(void)
15701605count = 0 ;
15711606
15721607LWLockAcquire (AsyncQueueLock ,LW_EXCLUSIVE );
1573- for (i = 1 ;i <= MaxBackends ;i ++ )
1608+ for (BackendId i = QUEUE_FIRST_LISTENER ;i > 0 ;i = QUEUE_NEXT_LISTENER ( i ) )
15741609{
15751610pid = QUEUE_BACKEND_PID (i );
1576- if (pid != InvalidPid && pid != MyProcPid )
1611+ Assert (pid != InvalidPid );
1612+ if (pid != MyProcPid )
15771613{
15781614QueuePosition pos = QUEUE_BACKEND_POS (i );
15791615
@@ -1588,7 +1624,7 @@ SignalBackends(void)
15881624LWLockRelease (AsyncQueueLock );
15891625
15901626/* Now send signals */
1591- for (i = 0 ;i < count ;i ++ )
1627+ for (int i = 0 ;i < count ;i ++ )
15921628{
15931629pid = pids [i ];
15941630
@@ -2064,17 +2100,16 @@ static void
20642100asyncQueueAdvanceTail (void )
20652101{
20662102QueuePosition min ;
2067- int i ;
20682103int oldtailpage ;
20692104int newtailpage ;
20702105int boundary ;
20712106
20722107LWLockAcquire (AsyncQueueLock ,LW_EXCLUSIVE );
20732108min = QUEUE_HEAD ;
2074- for (i = 1 ;i <= MaxBackends ;i ++ )
2109+ for (BackendId i = QUEUE_FIRST_LISTENER ;i > 0 ;i = QUEUE_NEXT_LISTENER ( i ) )
20752110{
2076- if (QUEUE_BACKEND_PID (i )!= InvalidPid )
2077- min = QUEUE_POS_MIN (min ,QUEUE_BACKEND_POS (i ));
2111+ Assert (QUEUE_BACKEND_PID (i )!= InvalidPid );
2112+ min = QUEUE_POS_MIN (min ,QUEUE_BACKEND_POS (i ));
20782113}
20792114oldtailpage = QUEUE_POS_PAGE (QUEUE_TAIL );
20802115QUEUE_TAIL = min ;