Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit c2e58c0

Browse files
committed
Fix remaining race condition with CLOG truncation and LISTEN/NOTIFY
Previous commit fixed a bug where VACUUM would truncate the CLOG that's still needed to check the commit status of XIDs in the async notify queue, but as mentioned in the commit message, it wasn't a full fix. If a backend is executing asyncQueueReadAllNotifications() and has just made a local copy of an async SLRU page which contains old XIDs, vacuum can concurrently truncate the CLOG covering those XIDs, and the backend still gets an error when it calls TransactionIdDidCommit() on those XIDs in the local copy. This commit fixes that race condition.

To fix, hold the SLRU bank lock across the TransactionIdDidCommit() calls in NOTIFY processing.

Per Tom Lane's idea. Backpatch to all supported versions.

Reviewed-by: Joel Jacobson <joel@compiler.org>
Reviewed-by: Arseniy Mukhin <arseniy.mukhin.dev@gmail.com>
Discussion: https://www.postgresql.org/message-id/2759499.1761756503@sss.pgh.pa.us
Backpatch-through: 14
1 parent eba917d, commit c2e58c0

File tree

1 file changed

+62
-61
lines changed

1 file changed

+62
-61
lines changed

‎src/backend/commands/async.c‎

Lines changed: 62 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,6 @@ static void SignalBackends(void);
463463
staticvoidasyncQueueReadAllNotifications(void);
464464
staticboolasyncQueueProcessPageEntries(QueuePosition*current,
465465
QueuePositionstop,
466-
char*page_buffer,
467466
Snapshotsnapshot);
468467
staticvoidasyncQueueAdvanceTail(void);
469468
staticvoidProcessIncomingNotify(boolflush);
@@ -1917,13 +1916,6 @@ asyncQueueReadAllNotifications(void)
19171916
QueuePositionhead;
19181917
Snapshotsnapshot;
19191918

1920-
/* page_buffer must be adequately aligned, so use a union */
1921-
union
1922-
{
1923-
charbuf[QUEUE_PAGESIZE];
1924-
AsyncQueueEntryalign;
1925-
}page_buffer;
1926-
19271919
/* Fetch current state */
19281920
LWLockAcquire(NotifyQueueLock,LW_SHARED);
19291921
/* Assert checks that we have a valid state entry */
@@ -2004,37 +1996,6 @@ asyncQueueReadAllNotifications(void)
20041996

20051997
do
20061998
{
2007-
intcurpage=QUEUE_POS_PAGE(pos);
2008-
intcuroffset=QUEUE_POS_OFFSET(pos);
2009-
intslotno;
2010-
intcopysize;
2011-
2012-
/*
2013-
* We copy the data from SLRU into a local buffer, so as to avoid
2014-
* holding the NotifySLRULock while we are examining the entries
2015-
* and possibly transmitting them to our frontend. Copy only the
2016-
* part of the page we will actually inspect.
2017-
*/
2018-
slotno=SimpleLruReadPage_ReadOnly(NotifyCtl,curpage,
2019-
InvalidTransactionId);
2020-
if (curpage==QUEUE_POS_PAGE(head))
2021-
{
2022-
/* we only want to read as far as head */
2023-
copysize=QUEUE_POS_OFFSET(head)-curoffset;
2024-
if (copysize<0)
2025-
copysize=0;/* just for safety */
2026-
}
2027-
else
2028-
{
2029-
/* fetch all the rest of the page */
2030-
copysize=QUEUE_PAGESIZE-curoffset;
2031-
}
2032-
memcpy(page_buffer.buf+curoffset,
2033-
NotifyCtl->shared->page_buffer[slotno]+curoffset,
2034-
copysize);
2035-
/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2036-
LWLockRelease(NotifySLRULock);
2037-
20381999
/*
20392000
* Process messages up to the stop position, end of page, or an
20402001
* uncommitted message.
@@ -2050,9 +2011,7 @@ asyncQueueReadAllNotifications(void)
20502011
* rewrite pages under us. Especially we don't want to hold a lock
20512012
* while sending the notifications to the frontend.
20522013
*/
2053-
reachedStop=asyncQueueProcessPageEntries(&pos,head,
2054-
page_buffer.buf,
2055-
snapshot);
2014+
reachedStop=asyncQueueProcessPageEntries(&pos,head,snapshot);
20562015
}while (!reachedStop);
20572016

20582017
/* Update shared state */
@@ -2071,13 +2030,6 @@ asyncQueueReadAllNotifications(void)
20712030
* Fetch notifications from the shared queue, beginning at position current,
20722031
* and deliver relevant ones to my frontend.
20732032
*
2074-
* The current page must have been fetched into page_buffer from shared
2075-
* memory. (We could access the page right in shared memory, but that
2076-
* would imply holding the NotifySLRULock throughout this routine.)
2077-
*
2078-
* We stop if we reach the "stop" position, or reach a notification from an
2079-
* uncommitted transaction, or reach the end of the page.
2080-
*
20812033
* The function returns true once we have reached the stop position or an
20822034
* uncommitted notification, and false if we have finished with the page.
20832035
* In other words: once it returns true there is no need to look further.
@@ -2086,16 +2038,34 @@ asyncQueueReadAllNotifications(void)
20862038
staticbool
20872039
asyncQueueProcessPageEntries(QueuePosition*current,
20882040
QueuePositionstop,
2089-
char*page_buffer,
20902041
Snapshotsnapshot)
20912042
{
2043+
int64curpage=QUEUE_POS_PAGE(*current);
2044+
intslotno;
2045+
char*page_buffer;
20922046
boolreachedStop= false;
20932047
boolreachedEndOfPage;
2094-
AsyncQueueEntry*qe;
2048+
2049+
/*
2050+
* We copy the entries into a local buffer to avoid holding the SLRU lock
2051+
* while we transmit them to our frontend. The local buffer must be
2052+
* adequately aligned, so use a union.
2053+
*/
2054+
union
2055+
{
2056+
charbuf[QUEUE_PAGESIZE];
2057+
AsyncQueueEntryalign;
2058+
}local_buf;
2059+
char*local_buf_end=local_buf.buf;
2060+
2061+
slotno=SimpleLruReadPage_ReadOnly(NotifyCtl,curpage,
2062+
InvalidTransactionId);
2063+
page_buffer=NotifyCtl->shared->page_buffer[slotno];
20952064

20962065
do
20972066
{
20982067
QueuePositionthisentry=*current;
2068+
AsyncQueueEntry*qe;
20992069

21002070
if (QUEUE_POS_EQUAL(thisentry,stop))
21012071
break;
@@ -2137,18 +2107,23 @@ asyncQueueProcessPageEntries(QueuePosition *current,
21372107
reachedStop= true;
21382108
break;
21392109
}
2140-
elseif (TransactionIdDidCommit(qe->xid))
2141-
{
2142-
/* qe->data is the null-terminated channel name */
2143-
char*channel=qe->data;
21442110

2145-
if (IsListeningOn(channel))
2146-
{
2147-
/* payload follows channel name */
2148-
char*payload=qe->data+strlen(channel)+1;
2111+
/*
2112+
* Quick check for the case that we're not listening on any
2113+
* channels, before calling TransactionIdDidCommit(). This makes
2114+
* that case a little faster, but more importantly, it ensures
2115+
* that if there's a bad entry in the queue for which
2116+
* TransactionIdDidCommit() fails for some reason, we can skip
2117+
* over it on the first LISTEN in a session, and not get stuck on
2118+
* it indefinitely.
2119+
*/
2120+
if (listenChannels==NIL)
2121+
continue;
21492122

2150-
NotifyMyFrontEnd(channel,payload,qe->srcPid);
2151-
}
2123+
if (TransactionIdDidCommit(qe->xid))
2124+
{
2125+
memcpy(local_buf_end,qe,qe->length);
2126+
local_buf_end+=qe->length;
21522127
}
21532128
else
21542129
{
@@ -2162,6 +2137,32 @@ asyncQueueProcessPageEntries(QueuePosition *current,
21622137
/* Loop back if we're not at end of page */
21632138
}while (!reachedEndOfPage);
21642139

2140+
/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2141+
LWLockRelease(NotifySLRULock);
2142+
2143+
/*
2144+
* Now that we have let go of the SLRU bank lock, send the notifications
2145+
* to our backend
2146+
*/
2147+
Assert(local_buf_end-local_buf.buf <=BLCKSZ);
2148+
for (char*p=local_buf.buf;p<local_buf_end;)
2149+
{
2150+
AsyncQueueEntry*qe= (AsyncQueueEntry*)p;
2151+
2152+
/* qe->data is the null-terminated channel name */
2153+
char*channel=qe->data;
2154+
2155+
if (IsListeningOn(channel))
2156+
{
2157+
/* payload follows channel name */
2158+
char*payload=qe->data+strlen(channel)+1;
2159+
2160+
NotifyMyFrontEnd(channel,payload,qe->srcPid);
2161+
}
2162+
2163+
p+=qe->length;
2164+
}
2165+
21652166
if (QUEUE_POS_EQUAL(*current,stop))
21662167
reachedStop= true;
21672168

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp