@@ -448,7 +448,6 @@ static void SignalBackends(void);
448448static void asyncQueueReadAllNotifications (void );
449449static bool asyncQueueProcessPageEntries (QueuePosition * current ,
450450QueuePosition stop ,
451- char * page_buffer ,
452451Snapshot snapshot );
453452static void asyncQueueAdvanceTail (void );
454453static void ProcessIncomingNotify (bool flush );
@@ -1854,13 +1853,6 @@ asyncQueueReadAllNotifications(void)
18541853QueuePosition head ;
18551854Snapshot snapshot ;
18561855
1857- /* page_buffer must be adequately aligned, so use a union */
1858- union
1859- {
1860- char buf [QUEUE_PAGESIZE ];
1861- AsyncQueueEntry align ;
1862- }page_buffer ;
1863-
18641856/* Fetch current state */
18651857LWLockAcquire (NotifyQueueLock ,LW_SHARED );
18661858/* Assert checks that we have a valid state entry */
@@ -1941,37 +1933,6 @@ asyncQueueReadAllNotifications(void)
19411933
19421934do
19431935{
1944- int64 curpage = QUEUE_POS_PAGE (pos );
1945- int curoffset = QUEUE_POS_OFFSET (pos );
1946- int slotno ;
1947- int copysize ;
1948-
1949- /*
1950- * We copy the data from SLRU into a local buffer, so as to avoid
1951- * holding the SLRU lock while we are examining the entries and
1952- * possibly transmitting them to our frontend. Copy only the part
1953- * of the page we will actually inspect.
1954- */
1955- slotno = SimpleLruReadPage_ReadOnly (NotifyCtl ,curpage ,
1956- InvalidTransactionId );
1957- if (curpage == QUEUE_POS_PAGE (head ))
1958- {
1959- /* we only want to read as far as head */
1960- copysize = QUEUE_POS_OFFSET (head )- curoffset ;
1961- if (copysize < 0 )
1962- copysize = 0 ;/* just for safety */
1963- }
1964- else
1965- {
1966- /* fetch all the rest of the page */
1967- copysize = QUEUE_PAGESIZE - curoffset ;
1968- }
1969- memcpy (page_buffer .buf + curoffset ,
1970- NotifyCtl -> shared -> page_buffer [slotno ]+ curoffset ,
1971- copysize );
1972- /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
1973- LWLockRelease (SimpleLruGetBankLock (NotifyCtl ,curpage ));
1974-
19751936/*
19761937 * Process messages up to the stop position, end of page, or an
19771938 * uncommitted message.
@@ -1987,9 +1948,7 @@ asyncQueueReadAllNotifications(void)
19871948 * rewrite pages under us. Especially we don't want to hold a lock
19881949 * while sending the notifications to the frontend.
19891950 */
1990- reachedStop = asyncQueueProcessPageEntries (& pos ,head ,
1991- page_buffer .buf ,
1992- snapshot );
1951+ reachedStop = asyncQueueProcessPageEntries (& pos ,head ,snapshot );
19931952}while (!reachedStop );
19941953
19951954/* Update shared state */
@@ -2008,13 +1967,6 @@ asyncQueueReadAllNotifications(void)
20081967 * Fetch notifications from the shared queue, beginning at position current,
20091968 * and deliver relevant ones to my frontend.
20101969 *
2011- * The current page must have been fetched into page_buffer from shared
2012- * memory. (We could access the page right in shared memory, but that
2013- * would imply holding the SLRU bank lock throughout this routine.)
2014- *
2015- * We stop if we reach the "stop" position, or reach a notification from an
2016- * uncommitted transaction, or reach the end of the page.
2017- *
20181970 * The function returns true once we have reached the stop position or an
20191971 * uncommitted notification, and false if we have finished with the page.
20201972 * In other words: once it returns true there is no need to look further.
@@ -2023,16 +1975,34 @@ asyncQueueReadAllNotifications(void)
20231975static bool
20241976asyncQueueProcessPageEntries (QueuePosition * current ,
20251977QueuePosition stop ,
2026- char * page_buffer ,
20271978Snapshot snapshot )
20281979{
1980+ int64 curpage = QUEUE_POS_PAGE (* current );
1981+ int slotno ;
1982+ char * page_buffer ;
20291983bool reachedStop = false;
20301984bool reachedEndOfPage ;
2031- AsyncQueueEntry * qe ;
1985+
1986+ /*
1987+ * We copy the entries into a local buffer to avoid holding the SLRU lock
1988+ * while we transmit them to our frontend. The local buffer must be
1989+ * adequately aligned, so use a union.
1990+ */
1991+ union
1992+ {
1993+ char buf [QUEUE_PAGESIZE ];
1994+ AsyncQueueEntry align ;
1995+ }local_buf ;
1996+ char * local_buf_end = local_buf .buf ;
1997+
1998+ slotno = SimpleLruReadPage_ReadOnly (NotifyCtl ,curpage ,
1999+ InvalidTransactionId );
2000+ page_buffer = NotifyCtl -> shared -> page_buffer [slotno ];
20322001
20332002do
20342003{
20352004QueuePosition thisentry = * current ;
2005+ AsyncQueueEntry * qe ;
20362006
20372007if (QUEUE_POS_EQUAL (thisentry ,stop ))
20382008break ;
@@ -2074,18 +2044,23 @@ asyncQueueProcessPageEntries(QueuePosition *current,
20742044reachedStop = true;
20752045break ;
20762046}
2077- else if (TransactionIdDidCommit (qe -> xid ))
2078- {
2079- /* qe->data is the null-terminated channel name */
2080- char * channel = qe -> data ;
20812047
2082- if (IsListeningOn (channel ))
2083- {
2084- /* payload follows channel name */
2085- char * payload = qe -> data + strlen (channel )+ 1 ;
2048+ /*
2049+ * Quick check for the case that we're not listening on any
2050+ * channels, before calling TransactionIdDidCommit(). This makes
2051+ * that case a little faster, but more importantly, it ensures
2052+ * that if there's a bad entry in the queue for which
2053+ * TransactionIdDidCommit() fails for some reason, we can skip
2054+ * over it on the first LISTEN in a session, and not get stuck on
2055+ * it indefinitely.
2056+ */
2057+ if (listenChannels == NIL )
2058+ continue ;
20862059
2087- NotifyMyFrontEnd (channel ,payload ,qe -> srcPid );
2088- }
2060+ if (TransactionIdDidCommit (qe -> xid ))
2061+ {
2062+ memcpy (local_buf_end ,qe ,qe -> length );
2063+ local_buf_end += qe -> length ;
20892064}
20902065else
20912066{
@@ -2099,6 +2074,32 @@ asyncQueueProcessPageEntries(QueuePosition *current,
20992074/* Loop back if we're not at end of page */
21002075}while (!reachedEndOfPage );
21012076
2077+ /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2078+ LWLockRelease (SimpleLruGetBankLock (NotifyCtl ,curpage ));
2079+
2080+ /*
2081+ * Now that we have let go of the SLRU bank lock, send the notifications
2082+ * to our backend
2083+ */
2084+ Assert (local_buf_end - local_buf .buf <=BLCKSZ );
2085+ for (char * p = local_buf .buf ;p < local_buf_end ;)
2086+ {
2087+ AsyncQueueEntry * qe = (AsyncQueueEntry * )p ;
2088+
2089+ /* qe->data is the null-terminated channel name */
2090+ char * channel = qe -> data ;
2091+
2092+ if (IsListeningOn (channel ))
2093+ {
2094+ /* payload follows channel name */
2095+ char * payload = qe -> data + strlen (channel )+ 1 ;
2096+
2097+ NotifyMyFrontEnd (channel ,payload ,qe -> srcPid );
2098+ }
2099+
2100+ p += qe -> length ;
2101+ }
2102+
21022103if (QUEUE_POS_EQUAL (* current ,stop ))
21032104reachedStop = true;
21042105