Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2d4336c

Browse files
committed
Improve LISTEN startup time when there are many unread notifications.
If some existing listener is far behind, incoming new listener sessionswould start from that session's read pointer and then need to advance overmany already-committed notification messages, which they have no interestin. This was expensive in itself and also thrashed the pg_notify SLRUbuffers a lot more than necessary. We can improve matters considerablyin typical scenarios, without much added cost, by starting from thefurthest-ahead read pointer, not the furthest-behind one. We do have toconsider only sessions in our own database when doing this, which requiresan extra field in the data structure, but that's a pretty small cost.Back-patch to 9.0 where the current LISTEN/NOTIFY logic was introduced.Matt Newell, slightly adjusted by me
1 parent47ae6bc commit2d4336c

File tree

1 file changed

+45
-5
lines changed

1 file changed

+45
-5
lines changed

‎src/backend/commands/async.c

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -198,12 +198,19 @@ typedef struct QueuePosition
198198
(x).page != (y).page ? (y) : \
199199
(x).offset < (y).offset ? (x) : (y))
200200

201+
/* choose logically larger QueuePosition */
202+
#defineQUEUE_POS_MAX(x,y) \
203+
(asyncQueuePagePrecedesLogically((x).page, (y).page) ? (y) : \
204+
(x).page != (y).page ? (x) : \
205+
(x).offset > (y).offset ? (x) : (y))
206+
201207
/*
202208
* Struct describing a listening backend's status
203209
*/
204210
typedefstructQueueBackendStatus
205211
{
206212
int32pid;/* either a PID or InvalidPid */
213+
Oiddboid;/* backend's database OID, or InvalidOid */
207214
QueuePositionpos;/* backend has read queue up to here */
208215
}QueueBackendStatus;
209216

@@ -222,6 +229,7 @@ typedef struct QueueBackendStatus
222229
* When holding the lock in EXCLUSIVE mode, backends can inspect the entries
223230
* of other backends and also change the head and tail pointers.
224231
*
232+
* AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
225233
* In order to avoid deadlocks, whenever we need both locks, we always first
226234
* get AsyncQueueLock and then AsyncCtlLock.
227235
*
@@ -232,8 +240,8 @@ typedef struct QueueBackendStatus
232240
typedefstructAsyncQueueControl
233241
{
234242
QueuePositionhead;/* head points to the next free location */
235-
QueuePositiontail;/* the global tail is equivalent to thetail
236-
*ofthe "slowest" backend */
243+
QueuePositiontail;/* the global tail is equivalent to thepos of
244+
* the "slowest" backend */
237245
TimestampTzlastQueueFillWarn;/* time of last queue-full msg */
238246
QueueBackendStatusbackend[1];/* actually of length MaxBackends+1 */
239247
/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
@@ -244,6 +252,7 @@ static AsyncQueueControl *asyncQueueControl;
244252
#defineQUEUE_HEAD(asyncQueueControl->head)
245253
#defineQUEUE_TAIL(asyncQueueControl->tail)
246254
#defineQUEUE_BACKEND_PID(i)(asyncQueueControl->backend[i].pid)
255+
#defineQUEUE_BACKEND_DBOID(i)(asyncQueueControl->backend[i].dboid)
247256
#defineQUEUE_BACKEND_POS(i)(asyncQueueControl->backend[i].pos)
248257

249258
/*
@@ -477,6 +486,7 @@ AsyncShmemInit(void)
477486
for (i=0;i <=MaxBackends;i++)
478487
{
479488
QUEUE_BACKEND_PID(i)=InvalidPid;
489+
QUEUE_BACKEND_DBOID(i)=InvalidOid;
480490
SET_QUEUE_POS(QUEUE_BACKEND_POS(i),0,0);
481491
}
482492
}
@@ -929,6 +939,10 @@ AtCommit_Notify(void)
929939
staticvoid
930940
Exec_ListenPreCommit(void)
931941
{
942+
QueuePositionhead;
943+
QueuePositionmax;
944+
inti;
945+
932946
/*
933947
* Nothing to do if we are already listening to something, nor if we
934948
* already ran this routine in this transaction.
@@ -956,10 +970,34 @@ Exec_ListenPreCommit(void)
956970
* over already-committed notifications. This ensures we cannot miss any
957971
* not-yet-committed notifications. We might get a few more but that
958972
* doesn't hurt.
973+
*
974+
* In some scenarios there might be a lot of committed notifications that
975+
* have not yet been pruned away (because some backend is being lazy about
976+
* reading them). To reduce our startup time, we can look at other
977+
* backends and adopt the maximum "pos" pointer of any backend that's in
978+
* our database; any notifications it's already advanced over are surely
979+
* committed and need not be re-examined by us. (We must consider only
980+
* backends connected to our DB, because others will not have bothered to
981+
* check committed-ness of notifications in our DB.) But we only bother
982+
* with that if there's more than a page worth of notifications
983+
* outstanding, otherwise scanning all the other backends isn't worth it.
984+
*
985+
* We need exclusive lock here so we can look at other backends' entries.
959986
*/
960-
LWLockAcquire(AsyncQueueLock,LW_SHARED);
961-
QUEUE_BACKEND_POS(MyBackendId)=QUEUE_TAIL;
987+
LWLockAcquire(AsyncQueueLock,LW_EXCLUSIVE);
988+
head=QUEUE_HEAD;
989+
max=QUEUE_TAIL;
990+
if (QUEUE_POS_PAGE(max)!=QUEUE_POS_PAGE(head))
991+
{
992+
for (i=1;i <=MaxBackends;i++)
993+
{
994+
if (QUEUE_BACKEND_DBOID(i)==MyDatabaseId)
995+
max=QUEUE_POS_MAX(max,QUEUE_BACKEND_POS(i));
996+
}
997+
}
998+
QUEUE_BACKEND_POS(MyBackendId)=max;
962999
QUEUE_BACKEND_PID(MyBackendId)=MyProcPid;
1000+
QUEUE_BACKEND_DBOID(MyBackendId)=MyDatabaseId;
9631001
LWLockRelease(AsyncQueueLock);
9641002

9651003
/* Now we are listed in the global array, so remember we're listening */
@@ -975,7 +1013,8 @@ Exec_ListenPreCommit(void)
9751013
*
9761014
* This will also advance the global tail pointer if possible.
9771015
*/
978-
asyncQueueReadAllNotifications();
1016+
if (!QUEUE_POS_EQUAL(max,head))
1017+
asyncQueueReadAllNotifications();
9791018
}
9801019

9811020
/*
@@ -1178,6 +1217,7 @@ asyncQueueUnregister(void)
11781217
QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId),QUEUE_TAIL);
11791218
/* ... then mark it invalid */
11801219
QUEUE_BACKEND_PID(MyBackendId)=InvalidPid;
1220+
QUEUE_BACKEND_DBOID(MyBackendId)=InvalidOid;
11811221
LWLockRelease(AsyncQueueLock);
11821222

11831223
/* mark ourselves as no longer listed in the global array */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp