Commit 51004c7

Make some efficiency improvements in LISTEN/NOTIFY.
Move the responsibility for advancing the NOTIFY queue tail pointer
from the listener(s) to the notification sender, and only have the
sender do it once every few queue pages, rather than after every batch
of notifications as at present.  This reduces the number of times we
execute asyncQueueAdvanceTail, and reduces contention when there are
multiple listeners (since that function requires exclusive lock).

This change relies on the observation that we don't really need the tail
pointer to be exactly up-to-date.  It's certainly not necessary to
attempt to release disk space more often than once per SLRU segment.
The only other usage of the tail pointer is that an incoming listener,
if it's the only listener in its database, will need to scan the queue
forward from the tail; but that's surely a less performance-critical
path than routine sending and receiving of notifies.  We compromise by
advancing the tail pointer after every 4 pages of output, so that it
shouldn't get more than a few pages behind.

Also, when sending signals to other backends after adding notify
message(s) to the queue, recognize that only backends in our own
database are going to care about those messages, so only such
backends really need to be awakened promptly.  Backends in other
databases should get kicked if they're well behind on reading the
queue, else they'll hold back the global tail pointer; but wakening
them for every single message is pointless.  This change can
substantially reduce signal traffic if listeners are spread among
many databases.  It won't help for the common case of only a single
active database, but the extra check costs very little.

Martijn van Oosterhout, with some adjustments by me

Discussion: https://postgr.es/m/CADWG95vtRBFDdrx1JdT1_9nhOFw48KaeTev6F_LtDQAFVpSPhA@mail.gmail.com
Discussion: https://postgr.es/m/CADWG95uFj8rLM52Er80JnhRsTbb_AqPP1ANHS8XQRGbqLrU+jA@mail.gmail.com
1 parent 72c48c3 commit 51004c7
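The cadence described in the commit message can be illustrated with a small standalone sketch. It is not PostgreSQL code: the `sketch_` names are hypothetical stand-ins, the "commit after every page" loop is an assumption made only to keep the simulation short, and the counter merely stands in for a call to asyncQueueAdvanceTail. Only the value 4 and the "page number is a multiple of the delay" test come from the patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Same value as QUEUE_CLEANUP_DELAY in the patch. */
#define SKETCH_CLEANUP_DELAY 4

/* Hypothetical stand-ins for backend-local state. */
static bool sketch_try_advance_tail = false;
static int	sketch_tail_advances = 0;

/* Called when the queue head moves onto a freshly zeroed page. */
static void
sketch_on_new_head_page(int page)
{
	assert(page >= 0);
	if (page % SKETCH_CLEANUP_DELAY == 0)
		sketch_try_advance_tail = true;	/* remember; do the work later */
}

/* Called once per sending transaction, after signals have been sent. */
static void
sketch_after_commit(void)
{
	if (sketch_try_advance_tail)
	{
		sketch_try_advance_tail = false;
		sketch_tail_advances++;	/* stands in for asyncQueueAdvanceTail() */
	}
}

/*
 * Fill pages first_page..last_page, committing after each page (an
 * assumption of this sketch), and return how many tail advances happened.
 */
static int
sketch_simulate(int first_page, int last_page)
{
	sketch_try_advance_tail = false;
	sketch_tail_advances = 0;
	for (int page = first_page; page <= last_page; page++)
	{
		sketch_on_new_head_page(page);
		sketch_after_commit();
	}
	return sketch_tail_advances;
}
```

Under these assumptions, sketch_simulate(1, 16) performs four tail advances (at pages 4, 8, 12 and 16), whereas the pre-patch scheme could attempt one after every batch of notifications.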

1 file changed: +103 -78 lines changed

‎src/backend/commands/async.c

Lines changed: 103 additions & 78 deletions
Original file line number | Diff line number | Diff line change
@@ -75,8 +75,10 @@
7575
* list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
7676
* to every listening backend (we don't know which backend is listening on
7777
* which channel so we must signal them all). We can exclude backends that
78-
* are already up to date, though. We don't bother with a self-signal
79-
* either, but just process the queue directly.
78+
* are already up to date, though, and we can also exclude backends that
79+
* are in other databases (unless they are way behind and should be kicked
80+
* to make them advance their pointers). We don't bother with a
81+
* self-signal either, but just process the queue directly.
8082
*
8183
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
8284
* sets the process's latch, which triggers the event to be processed
@@ -89,13 +91,14 @@
8991
* Inbound-notify processing consists of reading all of the notifications
9092
* that have arrived since scanning last time. We read every notification
9193
* until we reach either a notification from an uncommitted transaction or
92-
* the head pointer's position. Then we check if we were the laziest
93-
* backend: if our pointer is set to the same position as the global tail
94-
* pointer is set, then we move the global tail pointer ahead to where the
95-
* second-laziest backend is (in general, we take the MIN of the current
96-
* head position and all active backends' new tail pointers). Whenever we
97-
* move the global tail pointer we also truncate now-unused pages (i.e.,
98-
* delete files in pg_notify/ that are no longer used).
94+
* the head pointer's position.
95+
*
96+
* 6. To avoid SLRU wraparound and limit disk space consumption, the tail
97+
* pointer needs to be advanced so that old pages can be truncated.
98+
* This is relatively expensive (notably, it requires an exclusive lock),
99+
* so we don't want to do it often. We make sending backends do this work
100+
* if they advanced the queue head into a new page, but only once every
101+
* QUEUE_CLEANUP_DELAY pages.
99102
*
100103
* An application that listens on the same channel it notifies will get
101104
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
@@ -211,6 +214,19 @@ typedef struct QueuePosition
211214
(x).page != (y).page ? (x) : \
212215
(x).offset > (y).offset ? (x) : (y))
213216

217+
/*
218+
* Parameter determining how often we try to advance the tail pointer:
219+
* we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
220+
* also the distance by which a backend in another database needs to be
221+
* behind before we'll decide we need to wake it up to advance its pointer.
222+
*
223+
* Resist the temptation to make this really large. While that would save
224+
* work in some places, it would add cost in others. In particular, this
225+
* should likely be less than NUM_ASYNC_BUFFERS, to ensure that backends
226+
* catch up before the pages they'll need to read fall out of SLRU cache.
227+
*/
228+
#define QUEUE_CLEANUP_DELAY 4
229+
214230
/*
215231
* Struct describing a listening backend's status
216232
*/
@@ -252,8 +268,8 @@ typedef struct QueueBackendStatus
252268
typedef struct AsyncQueueControl
253269
{
254270
QueuePosition head; /* head points to the next free location */
255-
QueuePosition tail; /* the global tail is equivalent to the pos of
256-
* the "slowest" backend */
271+
QueuePosition tail; /* tail must be <= the queue position of every
272+
* listening backend */
257273
BackendId firstListener; /* id of first listener, or InvalidBackendId */
258274
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
259275
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
@@ -402,10 +418,14 @@ static bool amRegisteredListener = false;
402418
/* has this backend sent notifications in the current transaction? */
403419
static bool backendHasSentNotifications = false;
404420

421+
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
422+
static bool backendTryAdvanceTail = false;
423+
405424
/* GUC parameter */
406425
bool Trace_notify = false;
407426

408427
/* local function prototypes */
428+
static int asyncQueuePageDiff(int p, int q);
409429
static bool asyncQueuePagePrecedes(int p, int q);
410430
static void queue_listen(ListenActionKind action, const char *channel);
411431
static void Async_UnlistenOnExit(int code, Datum arg);
@@ -421,7 +441,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
421441
static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
422442
static double asyncQueueUsage(void);
423443
static void asyncQueueFillWarning(void);
424-
static bool SignalBackends(void);
444+
static void SignalBackends(void);
425445
static void asyncQueueReadAllNotifications(void);
426446
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
427447
QueuePosition stop,
@@ -436,10 +456,11 @@ static int notification_match(const void *key1, const void *key2, Size keysize);
436456
static void ClearPendingActionsAndNotifies(void);
437457

438458
/*
439-
* We will work on the page range of 0..QUEUE_MAX_PAGE.
459+
* Compute the difference between two queue page numbers (i.e., p - q),
460+
* accounting for wraparound.
440461
*/
441-
static bool
442-
asyncQueuePagePrecedes(int p, int q)
462+
static int
463+
asyncQueuePageDiff(int p, int q)
443464
{
444465
int diff;
445466

@@ -455,7 +476,14 @@ asyncQueuePagePrecedes(int p, int q)
455476
diff -= QUEUE_MAX_PAGE + 1;
456477
else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
457478
diff += QUEUE_MAX_PAGE + 1;
458-
return diff < 0;
479+
return diff;
480+
}
481+
482+
/* Is p < q, accounting for wraparound? */
483+
static bool
484+
asyncQueuePagePrecedes(int p, int q)
485+
{
486+
return asyncQueuePageDiff(p, q) < 0;
459487
}
460488

461489
/*
@@ -1051,8 +1079,6 @@ Exec_ListenPreCommit(void)
10511079
* notification to the frontend. Also, although our transaction might
10521080
* have executed NOTIFY, those message(s) aren't queued yet so we can't
10531081
* see them in the queue.
1054-
*
1055-
* This will also advance the global tail pointer if possible.
10561082
*/
10571083
if (!QUEUE_POS_EQUAL(max, head))
10581084
asyncQueueReadAllNotifications();
@@ -1138,6 +1164,8 @@ Exec_UnlistenAllCommit(void)
11381164
* of a transaction. If we issued any notifications in the just-completed
11391165
* transaction, send signals to other backends to process them, and also
11401166
* process the queue ourselves to send messages to our own frontend.
1167+
* Also, if we filled enough queue pages with new notifies, try to advance
1168+
* the queue tail pointer.
11411169
*
11421170
* The reason that this is not done in AtCommit_Notify is that there is
11431171
* a nonzero chance of errors here (for example, encoding conversion errors
@@ -1156,7 +1184,6 @@ void
11561184
ProcessCompletedNotifies(void)
11571185
{
11581186
MemoryContext caller_context;
1159-
bool signalled;
11601187

11611188
/* Nothing to do if we didn't send any notifications */
11621189
if (!backendHasSentNotifications)
@@ -1185,23 +1212,20 @@ ProcessCompletedNotifies(void)
11851212
StartTransactionCommand();
11861213

11871214
/* Send signals to other backends */
1188-
signalled = SignalBackends();
1215+
SignalBackends();
11891216

11901217
if (listenChannels != NIL)
11911218
{
11921219
/* Read the queue ourselves, and send relevant stuff to the frontend */
11931220
asyncQueueReadAllNotifications();
11941221
}
1195-
else if (!signalled)
1222+
1223+
/*
1224+
* If it's time to try to advance the global tail pointer, do that.
1225+
*/
1226+
if (backendTryAdvanceTail)
11961227
{
1197-
/*
1198-
* If we found no other listening backends, and we aren't listening
1199-
* ourselves, then we must execute asyncQueueAdvanceTail to flush the
1200-
* queue, because ain't nobody else gonna do it. This prevents queue
1201-
* overflow when we're sending useless notifies to nobody. (A new
1202-
* listener could have joined since we looked, but if so this is
1203-
* harmless.)
1204-
*/
1228+
backendTryAdvanceTail = false;
12051229
asyncQueueAdvanceTail();
12061230
}
12071231

@@ -1242,8 +1266,6 @@ IsListeningOn(const char *channel)
12421266
static void
12431267
asyncQueueUnregister(void)
12441268
{
1245-
bool advanceTail;
1246-
12471269
Assert(listenChannels == NIL); /* else caller error */
12481270

12491271
if (!amRegisteredListener) /* nothing to do */
@@ -1253,10 +1275,7 @@ asyncQueueUnregister(void)
12531275
* Need exclusive lock here to manipulate list links.
12541276
*/
12551277
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
1256-
/* check if entry is valid and oldest ... */
1257-
advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
1258-
QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
1259-
/* ... then mark it invalid */
1278+
/* Mark our entry as invalid */
12601279
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
12611280
QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
12621281
/* and remove it from the list */
@@ -1278,10 +1297,6 @@ asyncQueueUnregister(void)
12781297

12791298
/* mark ourselves as no longer listed in the global array */
12801299
amRegisteredListener = false;
1281-
1282-
/* If we were the laziest backend, try to advance the tail pointer */
1283-
if (advanceTail)
1284-
asyncQueueAdvanceTail();
12851300
}
12861301

12871302
/*
@@ -1467,6 +1482,15 @@ asyncQueueAddEntries(ListCell *nextNotify)
14671482
* page without overrunning the queue.
14681483
*/
14691484
slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
1485+
1486+
/*
1487+
* If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
1488+
* set flag to remember that we should try to advance the tail
1489+
* pointer (we don't want to actually do that right here).
1490+
*/
1491+
if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
1492+
backendTryAdvanceTail = true;
1493+
14701494
/* And exit the loop */
14711495
break;
14721496
}
@@ -1570,31 +1594,30 @@ asyncQueueFillWarning(void)
15701594
}
15711595

15721596
/*
1573-
* Send signals to all listening backends (except our own).
1597+
* Send signals to listening backends.
15741598
*
1575-
* Returns true if we sent at least one signal.
1599+
* We never signal our own process; that should be handled by our caller.
15761600
*
1577-
* Since we need EXCLUSIVE lock anyway we also check the position of the other
1578-
* backends and in case one is already up-to-date we don't signal it.
1579-
* This can happen if concurrent notifying transactions have sent a signal and
1580-
* the signaled backend has read the other notifications and ours in the same
1581-
* step.
1601+
* Normally we signal only backends in our own database, since only those
1602+
* backends could be interested in notifies we send. However, if there's
1603+
* notify traffic in our database but no traffic in another database that
1604+
* does have listener(s), those listeners will fall further and further
1605+
* behind. Waken them anyway if they're far enough behind, so that they'll
1606+
* advance their queue position pointers, allowing the global tail to advance.
15821607
*
15831608
* Since we know the BackendId and the Pid the signalling is quite cheap.
15841609
*/
1585-
static bool
1610+
static void
15861611
SignalBackends(void)
15871612
{
1588-
bool signalled = false;
15891613
int32 *pids;
15901614
BackendId *ids;
15911615
int count;
1592-
int32 pid;
15931616

15941617
/*
1595-
* Identify all backends that are listening and not already up-to-date. We
1596-
* don't want to send signals while holding the AsyncQueueLock, so we just
1597-
* build a list of target PIDs.
1618+
* Identify backends that we need to signal. We don't want to send
1619+
* signals while holding the AsyncQueueLock, so this loop just builds a
1620+
* list of target PIDs.
15981621
*
15991622
* XXX in principle these pallocs could fail, which would be bad. Maybe
16001623
* preallocate the arrays? But in practice this is only run in trivial
@@ -1607,26 +1630,43 @@ SignalBackends(void)
16071630
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
16081631
for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
16091632
{
1610-
pid = QUEUE_BACKEND_PID(i);
1633+
int32 pid = QUEUE_BACKEND_PID(i);
1634+
QueuePosition pos;
1635+
16111636
Assert(pid != InvalidPid);
1612-
if (pid != MyProcPid)
1637+
if (pid == MyProcPid)
1638+
continue; /* never signal self */
1639+
pos = QUEUE_BACKEND_POS(i);
1640+
if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
16131641
{
1614-
QueuePosition pos = QUEUE_BACKEND_POS(i);
1615-
1616-
if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
1617-
{
1618-
pids[count] = pid;
1619-
ids[count] = i;
1620-
count++;
1621-
}
1642+
/*
1643+
* Always signal listeners in our own database, unless they're
1644+
* already caught up (unlikely, but possible).
1645+
*/
1646+
if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
1647+
continue;
1648+
}
1649+
else
1650+
{
1651+
/*
1652+
* Listeners in other databases should be signaled only if they
1653+
* are far behind.
1654+
*/
1655+
if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
1656+
QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
1657+
continue;
16221658
}
1659+
/* OK, need to signal this one */
1660+
pids[count] = pid;
1661+
ids[count] = i;
1662+
count++;
16231663
}
16241664
LWLockRelease(AsyncQueueLock);
16251665

16261666
/* Now send signals */
16271667
for (int i = 0; i < count; i++)
16281668
{
1629-
pid = pids[i];
1669+
int32 pid = pids[i];
16301670

16311671
/*
16321672
* Note: assuming things aren't broken, a signal failure here could
@@ -1636,14 +1676,10 @@ SignalBackends(void)
16361676
*/
16371677
if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
16381678
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
1639-
else
1640-
signalled = true;
16411679
}
16421680

16431681
pfree(pids);
16441682
pfree(ids);
1645-
1646-
return signalled;
16471683
}
16481684

16491685
/*
@@ -1844,7 +1880,6 @@ asyncQueueReadAllNotifications(void)
18441880
QueuePosition oldpos;
18451881
QueuePosition head;
18461882
Snapshot snapshot;
1847-
bool advanceTail;
18481883

18491884
/* page_buffer must be adequately aligned, so use a union */
18501885
union
@@ -1966,27 +2001,17 @@ asyncQueueReadAllNotifications(void)
19662001
/* Update shared state */
19672002
LWLockAcquire(AsyncQueueLock, LW_SHARED);
19682003
QUEUE_BACKEND_POS(MyBackendId) = pos;
1969-
advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
19702004
LWLockRelease(AsyncQueueLock);
19712005

1972-
/* If we were the laziest backend, try to advance the tail pointer */
1973-
if (advanceTail)
1974-
asyncQueueAdvanceTail();
1975-
19762006
PG_RE_THROW();
19772007
}
19782008
PG_END_TRY();
19792009

19802010
/* Update shared state */
19812011
LWLockAcquire(AsyncQueueLock, LW_SHARED);
19822012
QUEUE_BACKEND_POS(MyBackendId) = pos;
1983-
advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
19842013
LWLockRelease(AsyncQueueLock);
19852014

1986-
/* If we were the laziest backend, try to advance the tail pointer */
1987-
if (advanceTail)
1988-
asyncQueueAdvanceTail();
1989-
19902015
/* Done with snapshot */
19912016
UnregisterSnapshot(snapshot);
19922017
}
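The wraparound-aware page difference and the new per-backend signaling rule in SignalBackends can be tried out in isolation with the following sketch. It is a simplified model rather than the PostgreSQL implementation: all `sketch_` names are hypothetical, queue positions are reduced to plain page and offset integers, and SKETCH_MAX_PAGE is set to 15 purely so that wraparound is easy to see (the real QUEUE_MAX_PAGE is much larger).

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed small page range 0..15, chosen only to make wraparound visible. */
#define SKETCH_MAX_PAGE 15
/* Same value as QUEUE_CLEANUP_DELAY in the patch. */
#define SKETCH_CLEANUP_DELAY 4

/* Compute p - q on a circular page range, as asyncQueuePageDiff does. */
static int
sketch_page_diff(int p, int q)
{
	int			diff;

	assert(p >= 0 && p <= SKETCH_MAX_PAGE);
	assert(q >= 0 && q <= SKETCH_MAX_PAGE);

	diff = p - q;
	if (diff >= ((SKETCH_MAX_PAGE + 1) / 2))
		diff -= SKETCH_MAX_PAGE + 1;
	else if (diff < -((SKETCH_MAX_PAGE + 1) / 2))
		diff += SKETCH_MAX_PAGE + 1;
	return diff;
}

/*
 * Decide whether a listener (other than ourselves) should be signaled.
 * Same database: signal unless it has already read up to the head.
 * Other database: signal only if it is at least SKETCH_CLEANUP_DELAY
 * pages behind the head.
 */
static bool
sketch_should_signal(bool same_database,
					 int head_page, int head_offset,
					 int pos_page, int pos_offset)
{
	if (same_database)
		return !(pos_page == head_page && pos_offset == head_offset);

	return sketch_page_diff(head_page, pos_page) >= SKETCH_CLEANUP_DELAY;
}
```

With this model, a listener in another database sitting on page 14 is left alone while the head is on page 1 (three pages ahead after wraparound) and is woken once the head reaches page 2.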
