@@ -198,12 +198,19 @@ typedef struct QueuePosition
198
198
(x).page != (y).page ? (y) : \
199
199
(x).offset < (y).offset ? (x) : (y))
200
200
201
+ /* choose logically larger QueuePosition */
202
+ #define QUEUE_POS_MAX (x ,y ) \
203
+ (asyncQueuePagePrecedesLogically((x).page, (y).page) ? (y) : \
204
+ (x).page != (y).page ? (x) : \
205
+ (x).offset > (y).offset ? (x) : (y))
206
+
201
207
/*
202
208
* Struct describing a listening backend's status
203
209
*/
204
210
typedef struct QueueBackendStatus
205
211
{
206
212
int32 pid ;/* either a PID or InvalidPid */
213
+ Oid dboid ;/* backend's database OID, or InvalidOid */
207
214
QueuePosition pos ;/* backend has read queue up to here */
208
215
}QueueBackendStatus ;
209
216
@@ -222,6 +229,7 @@ typedef struct QueueBackendStatus
222
229
* When holding the lock in EXCLUSIVE mode, backends can inspect the entries
223
230
* of other backends and also change the head and tail pointers.
224
231
*
232
+ * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
225
233
* In order to avoid deadlocks, whenever we need both locks, we always first
226
234
* get AsyncQueueLock and then AsyncCtlLock.
227
235
*
@@ -232,8 +240,8 @@ typedef struct QueueBackendStatus
232
240
typedef struct AsyncQueueControl
233
241
{
234
242
QueuePosition head ;/* head points to the next free location */
235
- QueuePosition tail ;/* the global tail is equivalent to thetail
236
- *of the "slowest" backend */
243
+ QueuePosition tail ;/* the global tail is equivalent to thepos of
244
+ * the "slowest" backend */
237
245
TimestampTz lastQueueFillWarn ;/* time of last queue-full msg */
238
246
QueueBackendStatus backend [1 ];/* actually of length MaxBackends+1 */
239
247
/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
@@ -244,6 +252,7 @@ static AsyncQueueControl *asyncQueueControl;
244
252
#define QUEUE_HEAD (asyncQueueControl->head)
245
253
#define QUEUE_TAIL (asyncQueueControl->tail)
246
254
#define QUEUE_BACKEND_PID (i )(asyncQueueControl->backend[i].pid)
255
+ #define QUEUE_BACKEND_DBOID (i )(asyncQueueControl->backend[i].dboid)
247
256
#define QUEUE_BACKEND_POS (i )(asyncQueueControl->backend[i].pos)
248
257
249
258
/*
@@ -477,6 +486,7 @@ AsyncShmemInit(void)
477
486
for (i = 0 ;i <=MaxBackends ;i ++ )
478
487
{
479
488
QUEUE_BACKEND_PID (i )= InvalidPid ;
489
+ QUEUE_BACKEND_DBOID (i )= InvalidOid ;
480
490
SET_QUEUE_POS (QUEUE_BACKEND_POS (i ),0 ,0 );
481
491
}
482
492
}
@@ -929,6 +939,10 @@ AtCommit_Notify(void)
929
939
static void
930
940
Exec_ListenPreCommit (void )
931
941
{
942
+ QueuePosition head ;
943
+ QueuePosition max ;
944
+ int i ;
945
+
932
946
/*
933
947
* Nothing to do if we are already listening to something, nor if we
934
948
* already ran this routine in this transaction.
@@ -956,10 +970,34 @@ Exec_ListenPreCommit(void)
956
970
* over already-committed notifications. This ensures we cannot miss any
957
971
* not-yet-committed notifications. We might get a few more but that
958
972
* doesn't hurt.
973
+ *
974
+ * In some scenarios there might be a lot of committed notifications that
975
+ * have not yet been pruned away (because some backend is being lazy about
976
+ * reading them). To reduce our startup time, we can look at other
977
+ * backends and adopt the maximum "pos" pointer of any backend that's in
978
+ * our database; any notifications it's already advanced over are surely
979
+ * committed and need not be re-examined by us. (We must consider only
980
+ * backends connected to our DB, because others will not have bothered to
981
+ * check committed-ness of notifications in our DB.) But we only bother
982
+ * with that if there's more than a page worth of notifications
983
+ * outstanding, otherwise scanning all the other backends isn't worth it.
984
+ *
985
+ * We need exclusive lock here so we can look at other backends' entries.
959
986
*/
960
- LWLockAcquire (AsyncQueueLock ,LW_SHARED );
961
- QUEUE_BACKEND_POS (MyBackendId )= QUEUE_TAIL ;
987
+ LWLockAcquire (AsyncQueueLock ,LW_EXCLUSIVE );
988
+ head = QUEUE_HEAD ;
989
+ max = QUEUE_TAIL ;
990
+ if (QUEUE_POS_PAGE (max )!= QUEUE_POS_PAGE (head ))
991
+ {
992
+ for (i = 1 ;i <=MaxBackends ;i ++ )
993
+ {
994
+ if (QUEUE_BACKEND_DBOID (i )== MyDatabaseId )
995
+ max = QUEUE_POS_MAX (max ,QUEUE_BACKEND_POS (i ));
996
+ }
997
+ }
998
+ QUEUE_BACKEND_POS (MyBackendId )= max ;
962
999
QUEUE_BACKEND_PID (MyBackendId )= MyProcPid ;
1000
+ QUEUE_BACKEND_DBOID (MyBackendId )= MyDatabaseId ;
963
1001
LWLockRelease (AsyncQueueLock );
964
1002
965
1003
/* Now we are listed in the global array, so remember we're listening */
@@ -975,7 +1013,8 @@ Exec_ListenPreCommit(void)
975
1013
*
976
1014
* This will also advance the global tail pointer if possible.
977
1015
*/
978
- asyncQueueReadAllNotifications ();
1016
+ if (!QUEUE_POS_EQUAL (max ,head ))
1017
+ asyncQueueReadAllNotifications ();
979
1018
}
980
1019
981
1020
/*
@@ -1178,6 +1217,7 @@ asyncQueueUnregister(void)
1178
1217
QUEUE_POS_EQUAL (QUEUE_BACKEND_POS (MyBackendId ),QUEUE_TAIL );
1179
1218
/* ... then mark it invalid */
1180
1219
QUEUE_BACKEND_PID (MyBackendId )= InvalidPid ;
1220
+ QUEUE_BACKEND_DBOID (MyBackendId )= InvalidOid ;
1181
1221
LWLockRelease (AsyncQueueLock );
1182
1222
1183
1223
/* mark ourselves as no longer listed in the global array */