@@ -202,12 +202,19 @@ typedef struct QueuePosition
202202 (x).page != (y).page ? (y) : \
203203 (x).offset < (y).offset ? (x) : (y))
204204
205+ /* choose logically larger QueuePosition (page order per asyncQueuePagePrecedes, then larger offset); evaluates its arguments more than once */
206+ #define QUEUE_POS_MAX (x ,y ) \
207+ (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
208+ (x).page != (y).page ? (x) : \
209+ (x).offset > (y).offset ? (x) : (y))
210+
205211/*
206212 * Struct describing a listening backend's status
207213 */
208214typedef struct QueueBackendStatus
209215{
210216int32 pid ;/* either a PID or InvalidPid */
217+ Oid dboid ;/* backend's database OID, or InvalidOid (set and cleared together with pid) */
211218QueuePosition pos ;/* backend has read queue up to here */
212219}QueueBackendStatus ;
213220
@@ -224,6 +231,7 @@ typedef struct QueueBackendStatus
224231 * When holding the lock in EXCLUSIVE mode, backends can inspect the entries
225232 * of other backends and also change the head and tail pointers.
226233 *
234+ * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
227235 * In order to avoid deadlocks, whenever we need both locks, we always first
228236 * get AsyncQueueLock and then AsyncCtlLock.
229237 *
@@ -234,8 +242,8 @@ typedef struct QueueBackendStatus
234242typedef struct AsyncQueueControl
235243{
236244QueuePosition head ;/* head points to the next free location */
237- QueuePosition tail ;/* the global tail is equivalent to thetail
238- *of the "slowest" backend */
245+ QueuePosition tail ;/* the global tail is equivalent to the pos of
246+ * the "slowest" backend */
239247TimestampTz lastQueueFillWarn ;/* time of last queue-full msg */
240248QueueBackendStatus backend [FLEXIBLE_ARRAY_MEMBER ];
241249/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
@@ -246,6 +254,7 @@ static AsyncQueueControl *asyncQueueControl;
246254#define QUEUE_HEAD (asyncQueueControl->head)
247255#define QUEUE_TAIL (asyncQueueControl->tail)
248256#define QUEUE_BACKEND_PID (i )(asyncQueueControl->backend[i].pid)
257+ #define QUEUE_BACKEND_DBOID (i )(asyncQueueControl->backend[i].dboid)
249258#define QUEUE_BACKEND_POS (i )(asyncQueueControl->backend[i].pos)
250259
251260/*
@@ -459,6 +468,7 @@ AsyncShmemInit(void)
459468for (i = 0 ;i <=MaxBackends ;i ++ )
460469{
461470QUEUE_BACKEND_PID (i )= InvalidPid ;
471+ QUEUE_BACKEND_DBOID (i )= InvalidOid ;
462472SET_QUEUE_POS (QUEUE_BACKEND_POS (i ),0 ,0 );
463473}
464474}
@@ -905,6 +915,10 @@ AtCommit_Notify(void)
905915static void
906916Exec_ListenPreCommit (void )
907917{
918+ QueuePosition head ;
919+ QueuePosition max ;
920+ int i ;
921+
908922/*
909923 * Nothing to do if we are already listening to something, nor if we
910924 * already ran this routine in this transaction.
@@ -932,10 +946,34 @@ Exec_ListenPreCommit(void)
932946 * over already-committed notifications. This ensures we cannot miss any
933947 * not-yet-committed notifications. We might get a few more but that
934948 * doesn't hurt.
949+ *
950+ * In some scenarios there might be a lot of committed notifications that
951+ * have not yet been pruned away (because some backend is being lazy about
952+ * reading them). To reduce our startup time, we can look at other
953+ * backends and adopt the maximum "pos" pointer of any backend that's in
954+ * our database; any notifications it's already advanced over are surely
955+ * committed and need not be re-examined by us. (We must consider only
956+ * backends connected to our DB, because others will not have bothered to
957+ * check committed-ness of notifications in our DB.) But we only bother
958+ * with that if there's more than a page worth of notifications
959+ * outstanding, otherwise scanning all the other backends isn't worth it.
960+ *
961+ * We need exclusive lock here so we can look at other backends' entries.
935962 */
936- LWLockAcquire (AsyncQueueLock ,LW_SHARED );
937- QUEUE_BACKEND_POS (MyBackendId )= QUEUE_TAIL ;
963+ LWLockAcquire (AsyncQueueLock ,LW_EXCLUSIVE );
964+ head = QUEUE_HEAD ;
965+ max = QUEUE_TAIL ;
966+ if (QUEUE_POS_PAGE (max )!= QUEUE_POS_PAGE (head ))
967+ {
968+ for (i = 1 ;i <=MaxBackends ;i ++ )
969+ {
970+ if (QUEUE_BACKEND_DBOID (i )== MyDatabaseId )
971+ max = QUEUE_POS_MAX (max ,QUEUE_BACKEND_POS (i ));
972+ }
973+ }
974+ QUEUE_BACKEND_POS (MyBackendId )= max ;
938975QUEUE_BACKEND_PID (MyBackendId )= MyProcPid ;
976+ QUEUE_BACKEND_DBOID (MyBackendId )= MyDatabaseId ;
939977LWLockRelease (AsyncQueueLock );
940978
941979/* Now we are listed in the global array, so remember we're listening */
@@ -951,7 +989,8 @@ Exec_ListenPreCommit(void)
951989 *
952990 * This will also advance the global tail pointer if possible.
953991 */
954- asyncQueueReadAllNotifications ();
992+ if (!QUEUE_POS_EQUAL (max ,head ))
993+ asyncQueueReadAllNotifications ();
955994}
956995
957996/*
@@ -1154,6 +1193,7 @@ asyncQueueUnregister(void)
11541193QUEUE_POS_EQUAL (QUEUE_BACKEND_POS (MyBackendId ),QUEUE_TAIL );
11551194/* ... then mark it invalid */
11561195QUEUE_BACKEND_PID (MyBackendId )= InvalidPid ;
1196+ QUEUE_BACKEND_DBOID (MyBackendId )= InvalidOid ;
11571197LWLockRelease (AsyncQueueLock );
11581198
11591199/* mark ourselves as no longer listed in the global array */