Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit797e9ea

Browse files
committed
Fix remaining race condition with CLOG truncation and LISTEN/NOTIFY
Previous commit fixed a bug where VACUUM would truncate the CLOGthat's still needed to check the commit status of XIDs in the asyncnotify queue, but as mentioned in the commit message, it wasn't a fullfix. If a backend is executing asyncQueueReadAllNotifications() andhas just made a local copy of an async SLRU page which contains oldXIDs, vacuum can concurrently truncate the CLOG covering those XIDs,and the backend still gets an error when it callsTransactionIdDidCommit() on those XIDs in the local copy. This commitfixes that race condition.To fix, hold the SLRU bank lock across the TransactionIdDidCommit()calls in NOTIFY processing.Per Tom Lane's idea. Backpatch to all supported versions.Reviewed-by: Joel Jacobson <joel@compiler.org>Reviewed-by: Arseniy Mukhin <arseniy.mukhin.dev@gmail.com>Discussion:https://www.postgresql.org/message-id/2759499.1761756503@sss.pgh.pa.usBackpatch-through: 14
1 parent8eeb4a0 commit797e9ea

File tree

1 file changed

+62
-61
lines changed

1 file changed

+62
-61
lines changed

‎src/backend/commands/async.c‎

Lines changed: 62 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,6 @@ static void SignalBackends(void);
448448
staticvoidasyncQueueReadAllNotifications(void);
449449
staticboolasyncQueueProcessPageEntries(QueuePosition*current,
450450
QueuePositionstop,
451-
char*page_buffer,
452451
Snapshotsnapshot);
453452
staticvoidasyncQueueAdvanceTail(void);
454453
staticvoidProcessIncomingNotify(boolflush);
@@ -1854,13 +1853,6 @@ asyncQueueReadAllNotifications(void)
18541853
QueuePositionhead;
18551854
Snapshotsnapshot;
18561855

1857-
/* page_buffer must be adequately aligned, so use a union */
1858-
union
1859-
{
1860-
charbuf[QUEUE_PAGESIZE];
1861-
AsyncQueueEntryalign;
1862-
}page_buffer;
1863-
18641856
/* Fetch current state */
18651857
LWLockAcquire(NotifyQueueLock,LW_SHARED);
18661858
/* Assert checks that we have a valid state entry */
@@ -1941,37 +1933,6 @@ asyncQueueReadAllNotifications(void)
19411933

19421934
do
19431935
{
1944-
int64curpage=QUEUE_POS_PAGE(pos);
1945-
intcuroffset=QUEUE_POS_OFFSET(pos);
1946-
intslotno;
1947-
intcopysize;
1948-
1949-
/*
1950-
* We copy the data from SLRU into a local buffer, so as to avoid
1951-
* holding the SLRU lock while we are examining the entries and
1952-
* possibly transmitting them to our frontend. Copy only the part
1953-
* of the page we will actually inspect.
1954-
*/
1955-
slotno=SimpleLruReadPage_ReadOnly(NotifyCtl,curpage,
1956-
InvalidTransactionId);
1957-
if (curpage==QUEUE_POS_PAGE(head))
1958-
{
1959-
/* we only want to read as far as head */
1960-
copysize=QUEUE_POS_OFFSET(head)-curoffset;
1961-
if (copysize<0)
1962-
copysize=0;/* just for safety */
1963-
}
1964-
else
1965-
{
1966-
/* fetch all the rest of the page */
1967-
copysize=QUEUE_PAGESIZE-curoffset;
1968-
}
1969-
memcpy(page_buffer.buf+curoffset,
1970-
NotifyCtl->shared->page_buffer[slotno]+curoffset,
1971-
copysize);
1972-
/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
1973-
LWLockRelease(SimpleLruGetBankLock(NotifyCtl,curpage));
1974-
19751936
/*
19761937
* Process messages up to the stop position, end of page, or an
19771938
* uncommitted message.
@@ -1987,9 +1948,7 @@ asyncQueueReadAllNotifications(void)
19871948
* rewrite pages under us. Especially we don't want to hold a lock
19881949
* while sending the notifications to the frontend.
19891950
*/
1990-
reachedStop=asyncQueueProcessPageEntries(&pos,head,
1991-
page_buffer.buf,
1992-
snapshot);
1951+
reachedStop=asyncQueueProcessPageEntries(&pos,head,snapshot);
19931952
}while (!reachedStop);
19941953

19951954
/* Update shared state */
@@ -2008,13 +1967,6 @@ asyncQueueReadAllNotifications(void)
20081967
* Fetch notifications from the shared queue, beginning at position current,
20091968
* and deliver relevant ones to my frontend.
20101969
*
2011-
* The current page must have been fetched into page_buffer from shared
2012-
* memory. (We could access the page right in shared memory, but that
2013-
* would imply holding the SLRU bank lock throughout this routine.)
2014-
*
2015-
* We stop if we reach the "stop" position, or reach a notification from an
2016-
* uncommitted transaction, or reach the end of the page.
2017-
*
20181970
* The function returns true once we have reached the stop position or an
20191971
* uncommitted notification, and false if we have finished with the page.
20201972
* In other words: once it returns true there is no need to look further.
@@ -2023,16 +1975,34 @@ asyncQueueReadAllNotifications(void)
20231975
staticbool
20241976
asyncQueueProcessPageEntries(QueuePosition*current,
20251977
QueuePositionstop,
2026-
char*page_buffer,
20271978
Snapshotsnapshot)
20281979
{
1980+
int64curpage=QUEUE_POS_PAGE(*current);
1981+
intslotno;
1982+
char*page_buffer;
20291983
boolreachedStop= false;
20301984
boolreachedEndOfPage;
2031-
AsyncQueueEntry*qe;
1985+
1986+
/*
1987+
* We copy the entries into a local buffer to avoid holding the SLRU lock
1988+
* while we transmit them to our frontend. The local buffer must be
1989+
* adequately aligned, so use a union.
1990+
*/
1991+
union
1992+
{
1993+
charbuf[QUEUE_PAGESIZE];
1994+
AsyncQueueEntryalign;
1995+
}local_buf;
1996+
char*local_buf_end=local_buf.buf;
1997+
1998+
slotno=SimpleLruReadPage_ReadOnly(NotifyCtl,curpage,
1999+
InvalidTransactionId);
2000+
page_buffer=NotifyCtl->shared->page_buffer[slotno];
20322001

20332002
do
20342003
{
20352004
QueuePositionthisentry=*current;
2005+
AsyncQueueEntry*qe;
20362006

20372007
if (QUEUE_POS_EQUAL(thisentry,stop))
20382008
break;
@@ -2074,18 +2044,23 @@ asyncQueueProcessPageEntries(QueuePosition *current,
20742044
reachedStop= true;
20752045
break;
20762046
}
2077-
elseif (TransactionIdDidCommit(qe->xid))
2078-
{
2079-
/* qe->data is the null-terminated channel name */
2080-
char*channel=qe->data;
20812047

2082-
if (IsListeningOn(channel))
2083-
{
2084-
/* payload follows channel name */
2085-
char*payload=qe->data+strlen(channel)+1;
2048+
/*
2049+
* Quick check for the case that we're not listening on any
2050+
* channels, before calling TransactionIdDidCommit(). This makes
2051+
* that case a little faster, but more importantly, it ensures
2052+
* that if there's a bad entry in the queue for which
2053+
* TransactionIdDidCommit() fails for some reason, we can skip
2054+
* over it on the first LISTEN in a session, and not get stuck on
2055+
* it indefinitely.
2056+
*/
2057+
if (listenChannels==NIL)
2058+
continue;
20862059

2087-
NotifyMyFrontEnd(channel,payload,qe->srcPid);
2088-
}
2060+
if (TransactionIdDidCommit(qe->xid))
2061+
{
2062+
memcpy(local_buf_end,qe,qe->length);
2063+
local_buf_end+=qe->length;
20892064
}
20902065
else
20912066
{
@@ -2099,6 +2074,32 @@ asyncQueueProcessPageEntries(QueuePosition *current,
20992074
/* Loop back if we're not at end of page */
21002075
}while (!reachedEndOfPage);
21012076

2077+
/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2078+
LWLockRelease(SimpleLruGetBankLock(NotifyCtl,curpage));
2079+
2080+
/*
2081+
* Now that we have let go of the SLRU bank lock, send the notifications
2082+
* to our backend
2083+
*/
2084+
Assert(local_buf_end-local_buf.buf <=BLCKSZ);
2085+
for (char*p=local_buf.buf;p<local_buf_end;)
2086+
{
2087+
AsyncQueueEntry*qe= (AsyncQueueEntry*)p;
2088+
2089+
/* qe->data is the null-terminated channel name */
2090+
char*channel=qe->data;
2091+
2092+
if (IsListeningOn(channel))
2093+
{
2094+
/* payload follows channel name */
2095+
char*payload=qe->data+strlen(channel)+1;
2096+
2097+
NotifyMyFrontEnd(channel,payload,qe->srcPid);
2098+
}
2099+
2100+
p+=qe->length;
2101+
}
2102+
21022103
if (QUEUE_POS_EQUAL(*current,stop))
21032104
reachedStop= true;
21042105

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp