7575 * list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
7676 * to every listening backend (we don't know which backend is listening on
7777 * which channel so we must signal them all). We can exclude backends that
78- * are already up to date, though. We don't bother with a self-signal
79- * either, but just process the queue directly.
78+ * are already up to date, though, and we can also exclude backends that
79+ * are in other databases (unless they are way behind and should be kicked
80+ * to make them advance their pointers). We don't bother with a
81+ * self-signal either, but just process the queue directly.
8082 *
8183 * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
8284 * sets the process's latch, which triggers the event to be processed
8991 * Inbound-notify processing consists of reading all of the notifications
9092 * that have arrived since scanning last time. We read every notification
9193 * until we reach either a notification from an uncommitted transaction or
92- * the head pointer's position. Then we check if we were the laziest
93- * backend: if our pointer is set to the same position as the global tail
94- * pointer is set, then we move the global tail pointer ahead to where the
95- * second-laziest backend is (in general, we take the MIN of the current
96- * head position and all active backends' new tail pointers). Whenever we
97- * move the global tail pointer we also truncate now-unused pages (i.e.,
98- * delete files in pg_notify/ that are no longer used).
94+ * the head pointer's position.
95+ *
96+ * 6. To avoid SLRU wraparound and limit disk space consumption, the tail
97+ * pointer needs to be advanced so that old pages can be truncated.
98+ * This is relatively expensive (notably, it requires an exclusive lock),
99+ * so we don't want to do it often. We make sending backends do this work
100+ * if they advanced the queue head into a new page, but only once every
101+ * QUEUE_CLEANUP_DELAY pages.
99102 *
100103 * An application that listens on the same channel it notifies will get
101104 * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
@@ -211,6 +214,19 @@ typedef struct QueuePosition
211214 (x).page != (y).page ? (x) : \
212215 (x).offset > (y).offset ? (x) : (y))
213216
217+ /*
218+ * Parameter determining how often we try to advance the tail pointer:
219+ * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
220+ * also the distance by which a backend in another database needs to be
221+ * behind before we'll decide we need to wake it up to advance its pointer.
222+ *
223+ * Resist the temptation to make this really large. While that would save
224+ * work in some places, it would add cost in others. In particular, this
225+ * should likely be less than NUM_ASYNC_BUFFERS, to ensure that backends
226+ * catch up before the pages they'll need to read fall out of SLRU cache.
227+ */
228+ #define QUEUE_CLEANUP_DELAY 4
229+
214230/*
215231 * Struct describing a listening backend's status
216232 */
@@ -252,8 +268,8 @@ typedef struct QueueBackendStatus
252268typedef struct AsyncQueueControl
253269{
254270QueuePosition head ;/* head points to the next free location */
255- QueuePosition tail ;/* the global tail is equivalent to the pos of
256- * the "slowest" backend */
271+ QueuePosition tail ;/* tail must be <= the queue position of every
272+ * listening backend */
257273BackendId firstListener ;/* id of first listener, or InvalidBackendId */
258274TimestampTz lastQueueFillWarn ;/* time of last queue-full msg */
259275QueueBackendStatus backend [FLEXIBLE_ARRAY_MEMBER ];
@@ -402,10 +418,14 @@ static bool amRegisteredListener = false;
402418/* has this backend sent notifications in the current transaction? */
403419static bool backendHasSentNotifications = false;
404420
421+ /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
422+ static bool backendTryAdvanceTail = false;
423+
405424/* GUC parameter */
406425bool Trace_notify = false;
407426
408427/* local function prototypes */
428+ static int asyncQueuePageDiff (int p ,int q );
409429static bool asyncQueuePagePrecedes (int p ,int q );
410430static void queue_listen (ListenActionKind action ,const char * channel );
411431static void Async_UnlistenOnExit (int code ,Datum arg );
@@ -421,7 +441,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
421441static ListCell * asyncQueueAddEntries (ListCell * nextNotify );
422442static double asyncQueueUsage (void );
423443static void asyncQueueFillWarning (void );
424- static bool SignalBackends (void );
444+ static void SignalBackends (void );
425445static void asyncQueueReadAllNotifications (void );
426446static bool asyncQueueProcessPageEntries (volatile QueuePosition * current ,
427447QueuePosition stop ,
@@ -436,10 +456,11 @@ static int notification_match(const void *key1, const void *key2, Size keysize);
436456static void ClearPendingActionsAndNotifies (void );
437457
438458/*
439- * We will work on the page range of 0..QUEUE_MAX_PAGE.
459+ * Compute the difference between two queue page numbers (i.e., p - q),
460+ * accounting for wraparound.
440461 */
441- static bool
442- asyncQueuePagePrecedes (int p ,int q )
462+ static int
463+ asyncQueuePageDiff (int p ,int q )
443464{
444465int diff ;
445466
@@ -455,7 +476,14 @@ asyncQueuePagePrecedes(int p, int q)
455476diff -= QUEUE_MAX_PAGE + 1 ;
456477else if (diff < - ((QUEUE_MAX_PAGE + 1 ) /2 ))
457478diff += QUEUE_MAX_PAGE + 1 ;
458- return diff < 0 ;
479+ return diff ;
480+ }
481+
482+ /* Is p < q, accounting for wraparound? */
483+ static bool
484+ asyncQueuePagePrecedes (int p ,int q )
485+ {
486+ return asyncQueuePageDiff (p ,q )< 0 ;
459487}
460488
461489/*
@@ -1051,8 +1079,6 @@ Exec_ListenPreCommit(void)
10511079 * notification to the frontend. Also, although our transaction might
10521080 * have executed NOTIFY, those message(s) aren't queued yet so we can't
10531081 * see them in the queue.
1054- *
1055- * This will also advance the global tail pointer if possible.
10561082 */
10571083if (!QUEUE_POS_EQUAL (max ,head ))
10581084asyncQueueReadAllNotifications ();
@@ -1138,6 +1164,8 @@ Exec_UnlistenAllCommit(void)
11381164 * of a transaction. If we issued any notifications in the just-completed
11391165 * transaction, send signals to other backends to process them, and also
11401166 * process the queue ourselves to send messages to our own frontend.
1167+ * Also, if we filled enough queue pages with new notifies, try to advance
1168+ * the queue tail pointer.
11411169 *
11421170 * The reason that this is not done in AtCommit_Notify is that there is
11431171 * a nonzero chance of errors here (for example, encoding conversion errors
@@ -1156,7 +1184,6 @@ void
11561184ProcessCompletedNotifies (void )
11571185{
11581186MemoryContext caller_context ;
1159- bool signalled ;
11601187
11611188/* Nothing to do if we didn't send any notifications */
11621189if (!backendHasSentNotifications )
@@ -1185,23 +1212,20 @@ ProcessCompletedNotifies(void)
11851212StartTransactionCommand ();
11861213
11871214/* Send signals to other backends */
1188- signalled = SignalBackends ();
1215+ SignalBackends ();
11891216
11901217if (listenChannels != NIL )
11911218{
11921219/* Read the queue ourselves, and send relevant stuff to the frontend */
11931220asyncQueueReadAllNotifications ();
11941221}
1195- else if (!signalled )
1222+
1223+ /*
1224+ * If it's time to try to advance the global tail pointer, do that.
1225+ */
1226+ if (backendTryAdvanceTail )
11961227{
1197- /*
1198- * If we found no other listening backends, and we aren't listening
1199- * ourselves, then we must execute asyncQueueAdvanceTail to flush the
1200- * queue, because ain't nobody else gonna do it. This prevents queue
1201- * overflow when we're sending useless notifies to nobody. (A new
1202- * listener could have joined since we looked, but if so this is
1203- * harmless.)
1204- */
1228+ backendTryAdvanceTail = false;
12051229asyncQueueAdvanceTail ();
12061230}
12071231
@@ -1242,8 +1266,6 @@ IsListeningOn(const char *channel)
12421266static void
12431267asyncQueueUnregister (void )
12441268{
1245- bool advanceTail ;
1246-
12471269Assert (listenChannels == NIL );/* else caller error */
12481270
12491271if (!amRegisteredListener )/* nothing to do */
@@ -1253,10 +1275,7 @@ asyncQueueUnregister(void)
12531275 * Need exclusive lock here to manipulate list links.
12541276 */
12551277LWLockAcquire (AsyncQueueLock ,LW_EXCLUSIVE );
1256- /* check if entry is valid and oldest ... */
1257- advanceTail = (MyProcPid == QUEUE_BACKEND_PID (MyBackendId ))&&
1258- QUEUE_POS_EQUAL (QUEUE_BACKEND_POS (MyBackendId ),QUEUE_TAIL );
1259- /* ... then mark it invalid */
1278+ /* Mark our entry as invalid */
12601279QUEUE_BACKEND_PID (MyBackendId )= InvalidPid ;
12611280QUEUE_BACKEND_DBOID (MyBackendId )= InvalidOid ;
12621281/* and remove it from the list */
@@ -1278,10 +1297,6 @@ asyncQueueUnregister(void)
12781297
12791298/* mark ourselves as no longer listed in the global array */
12801299amRegisteredListener = false;
1281-
1282- /* If we were the laziest backend, try to advance the tail pointer */
1283- if (advanceTail )
1284- asyncQueueAdvanceTail ();
12851300}
12861301
12871302/*
@@ -1467,6 +1482,15 @@ asyncQueueAddEntries(ListCell *nextNotify)
14671482 * page without overrunning the queue.
14681483 */
14691484slotno = SimpleLruZeroPage (AsyncCtl ,QUEUE_POS_PAGE (queue_head ));
1485+
1486+ /*
1487+ * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
1488+ * set flag to remember that we should try to advance the tail
1489+ * pointer (we don't want to actually do that right here).
1490+ */
1491+ if (QUEUE_POS_PAGE (queue_head ) %QUEUE_CLEANUP_DELAY == 0 )
1492+ backendTryAdvanceTail = true;
1493+
14701494/* And exit the loop */
14711495break ;
14721496}
@@ -1570,31 +1594,30 @@ asyncQueueFillWarning(void)
15701594}
15711595
15721596/*
1573- * Send signals to all listening backends (except our own).
1597+ * Send signals to listening backends.
15741598 *
1575- * Returns true if we sent at least one signal.
1599+ * We never signal our own process; that should be handled by our caller.
15761600 *
1577- * Since we need EXCLUSIVE lock anyway we also check the position of the other
1578- * backends and in case one is already up-to-date we don't signal it.
1579- * This can happen if concurrent notifying transactions have sent a signal and
1580- * the signaled backend has read the other notifications and ours in the same
1581- * step.
1601+ * Normally we signal only backends in our own database, since only those
1602+ * backends could be interested in notifies we send. However, if there's
1603+ * notify traffic in our database but no traffic in another database that
1604+ * does have listener(s), those listeners will fall further and further
1605+ * behind. Waken them anyway if they're far enough behind, so that they'll
1606+ * advance their queue position pointers, allowing the global tail to advance.
15821607 *
15831608 * Since we know the BackendId and the Pid the signalling is quite cheap.
15841609 */
1585- static bool
1610+ static void
15861611SignalBackends (void )
15871612{
1588- bool signalled = false;
15891613int32 * pids ;
15901614BackendId * ids ;
15911615int count ;
1592- int32 pid ;
15931616
15941617/*
1595- * Identify all backends that are listening and not already up-to-date. We
1596- * don't want to send signals while holding the AsyncQueueLock, so we just
1597- * build a list of target PIDs.
1618+ * Identify backends that we need to signal. We don't want to send
1619+ * signals while holding the AsyncQueueLock, so this loop just builds a
1620+ * list of target PIDs.
15981621 *
15991622 * XXX in principle these pallocs could fail, which would be bad. Maybe
16001623 * preallocate the arrays? But in practice this is only run in trivial
@@ -1607,26 +1630,43 @@ SignalBackends(void)
16071630LWLockAcquire (AsyncQueueLock ,LW_EXCLUSIVE );
16081631for (BackendId i = QUEUE_FIRST_LISTENER ;i > 0 ;i = QUEUE_NEXT_LISTENER (i ))
16091632{
1610- pid = QUEUE_BACKEND_PID (i );
1633+ int32 pid = QUEUE_BACKEND_PID (i );
1634+ QueuePosition pos ;
1635+
16111636Assert (pid != InvalidPid );
1612- if (pid != MyProcPid )
1637+ if (pid == MyProcPid )
1638+ continue ;/* never signal self */
1639+ pos = QUEUE_BACKEND_POS (i );
1640+ if (QUEUE_BACKEND_DBOID (i )== MyDatabaseId )
16131641{
1614- QueuePosition pos = QUEUE_BACKEND_POS (i );
1615-
1616- if (!QUEUE_POS_EQUAL (pos ,QUEUE_HEAD ))
1617- {
1618- pids [count ]= pid ;
1619- ids [count ]= i ;
1620- count ++ ;
1621- }
1642+ /*
1643+ * Always signal listeners in our own database, unless they're
1644+ * already caught up (unlikely, but possible).
1645+ */
1646+ if (QUEUE_POS_EQUAL (pos ,QUEUE_HEAD ))
1647+ continue ;
1648+ }
1649+ else
1650+ {
1651+ /*
1652+ * Listeners in other databases should be signaled only if they
1653+ * are far behind.
1654+ */
1655+ if (asyncQueuePageDiff (QUEUE_POS_PAGE (QUEUE_HEAD ),
1656+ QUEUE_POS_PAGE (pos ))< QUEUE_CLEANUP_DELAY )
1657+ continue ;
16221658}
1659+ /* OK, need to signal this one */
1660+ pids [count ]= pid ;
1661+ ids [count ]= i ;
1662+ count ++ ;
16231663}
16241664LWLockRelease (AsyncQueueLock );
16251665
16261666/* Now send signals */
16271667for (int i = 0 ;i < count ;i ++ )
16281668{
1629- pid = pids [i ];
1669+ int32 pid = pids [i ];
16301670
16311671/*
16321672 * Note: assuming things aren't broken, a signal failure here could
@@ -1636,14 +1676,10 @@ SignalBackends(void)
16361676 */
16371677if (SendProcSignal (pid ,PROCSIG_NOTIFY_INTERRUPT ,ids [i ])< 0 )
16381678elog (DEBUG3 ,"could not signal backend with PID %d: %m" ,pid );
1639- else
1640- signalled = true;
16411679}
16421680
16431681pfree (pids );
16441682pfree (ids );
1645-
1646- return signalled ;
16471683}
16481684
16491685/*
@@ -1844,7 +1880,6 @@ asyncQueueReadAllNotifications(void)
18441880QueuePosition oldpos ;
18451881QueuePosition head ;
18461882Snapshot snapshot ;
1847- bool advanceTail ;
18481883
18491884/* page_buffer must be adequately aligned, so use a union */
18501885union
@@ -1966,27 +2001,17 @@ asyncQueueReadAllNotifications(void)
19662001/* Update shared state */
19672002LWLockAcquire (AsyncQueueLock ,LW_SHARED );
19682003QUEUE_BACKEND_POS (MyBackendId )= pos ;
1969- advanceTail = QUEUE_POS_EQUAL (oldpos ,QUEUE_TAIL );
19702004LWLockRelease (AsyncQueueLock );
19712005
1972- /* If we were the laziest backend, try to advance the tail pointer */
1973- if (advanceTail )
1974- asyncQueueAdvanceTail ();
1975-
19762006PG_RE_THROW ();
19772007}
19782008PG_END_TRY ();
19792009
19802010/* Update shared state */
19812011LWLockAcquire (AsyncQueueLock ,LW_SHARED );
19822012QUEUE_BACKEND_POS (MyBackendId )= pos ;
1983- advanceTail = QUEUE_POS_EQUAL (oldpos ,QUEUE_TAIL );
19842013LWLockRelease (AsyncQueueLock );
19852014
1986- /* If we were the laziest backend, try to advance the tail pointer */
1987- if (advanceTail )
1988- asyncQueueAdvanceTail ();
1989-
19902015/* Done with snapshot */
19912016UnregisterSnapshot (snapshot );
19922017}