@@ -463,7 +463,6 @@ static void SignalBackends(void);
463463static void asyncQueueReadAllNotifications (void );
464464static bool asyncQueueProcessPageEntries (QueuePosition * current ,
465465QueuePosition stop ,
466- char * page_buffer ,
467466Snapshot snapshot );
468467static void asyncQueueAdvanceTail (void );
469468static void ProcessIncomingNotify (bool flush );
@@ -1917,13 +1916,6 @@ asyncQueueReadAllNotifications(void)
19171916QueuePosition head ;
19181917Snapshot snapshot ;
19191918
1920- /* page_buffer must be adequately aligned, so use a union */
1921- union
1922- {
1923- char buf [QUEUE_PAGESIZE ];
1924- AsyncQueueEntry align ;
1925- }page_buffer ;
1926-
19271919/* Fetch current state */
19281920LWLockAcquire (NotifyQueueLock ,LW_SHARED );
19291921/* Assert checks that we have a valid state entry */
@@ -2004,37 +1996,6 @@ asyncQueueReadAllNotifications(void)
20041996
20051997do
20061998{
2007- int curpage = QUEUE_POS_PAGE (pos );
2008- int curoffset = QUEUE_POS_OFFSET (pos );
2009- int slotno ;
2010- int copysize ;
2011-
2012- /*
2013- * We copy the data from SLRU into a local buffer, so as to avoid
2014- * holding the NotifySLRULock while we are examining the entries
2015- * and possibly transmitting them to our frontend. Copy only the
2016- * part of the page we will actually inspect.
2017- */
2018- slotno = SimpleLruReadPage_ReadOnly (NotifyCtl ,curpage ,
2019- InvalidTransactionId );
2020- if (curpage == QUEUE_POS_PAGE (head ))
2021- {
2022- /* we only want to read as far as head */
2023- copysize = QUEUE_POS_OFFSET (head )- curoffset ;
2024- if (copysize < 0 )
2025- copysize = 0 ;/* just for safety */
2026- }
2027- else
2028- {
2029- /* fetch all the rest of the page */
2030- copysize = QUEUE_PAGESIZE - curoffset ;
2031- }
2032- memcpy (page_buffer .buf + curoffset ,
2033- NotifyCtl -> shared -> page_buffer [slotno ]+ curoffset ,
2034- copysize );
2035- /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2036- LWLockRelease (NotifySLRULock );
2037-
20381999/*
20392000 * Process messages up to the stop position, end of page, or an
20402001 * uncommitted message.
@@ -2050,9 +2011,7 @@ asyncQueueReadAllNotifications(void)
20502011 * rewrite pages under us. Especially we don't want to hold a lock
20512012 * while sending the notifications to the frontend.
20522013 */
2053- reachedStop = asyncQueueProcessPageEntries (& pos ,head ,
2054- page_buffer .buf ,
2055- snapshot );
2014+ reachedStop = asyncQueueProcessPageEntries (& pos ,head ,snapshot );
20562015}while (!reachedStop );
20572016
20582017/* Update shared state */
@@ -2071,13 +2030,6 @@ asyncQueueReadAllNotifications(void)
20712030 * Fetch notifications from the shared queue, beginning at position current,
20722031 * and deliver relevant ones to my frontend.
20732032 *
2074- * The current page must have been fetched into page_buffer from shared
2075- * memory. (We could access the page right in shared memory, but that
2076- * would imply holding the NotifySLRULock throughout this routine.)
2077- *
2078- * We stop if we reach the "stop" position, or reach a notification from an
2079- * uncommitted transaction, or reach the end of the page.
2080- *
20812033 * The function returns true once we have reached the stop position or an
20822034 * uncommitted notification, and false if we have finished with the page.
20832035 * In other words: once it returns true there is no need to look further.
@@ -2086,16 +2038,34 @@ asyncQueueReadAllNotifications(void)
20862038static bool
20872039asyncQueueProcessPageEntries (QueuePosition * current ,
20882040QueuePosition stop ,
2089- char * page_buffer ,
20902041Snapshot snapshot )
20912042{
2043+ int64 curpage = QUEUE_POS_PAGE (* current );
2044+ int slotno ;
2045+ char * page_buffer ;
20922046bool reachedStop = false;
20932047bool reachedEndOfPage ;
2094- AsyncQueueEntry * qe ;
2048+
2049+ /*
2050+ * We copy the entries into a local buffer to avoid holding the SLRU lock
2051+ * while we transmit them to our frontend. The local buffer must be
2052+ * adequately aligned, so use a union.
2053+ */
2054+ union
2055+ {
2056+ char buf [QUEUE_PAGESIZE ];
2057+ AsyncQueueEntry align ;
2058+ }local_buf ;
2059+ char * local_buf_end = local_buf .buf ;
2060+
2061+ slotno = SimpleLruReadPage_ReadOnly (NotifyCtl ,curpage ,
2062+ InvalidTransactionId );
2063+ page_buffer = NotifyCtl -> shared -> page_buffer [slotno ];
20952064
20962065do
20972066{
20982067QueuePosition thisentry = * current ;
2068+ AsyncQueueEntry * qe ;
20992069
21002070if (QUEUE_POS_EQUAL (thisentry ,stop ))
21012071break ;
@@ -2137,18 +2107,23 @@ asyncQueueProcessPageEntries(QueuePosition *current,
21372107reachedStop = true;
21382108break ;
21392109}
2140- else if (TransactionIdDidCommit (qe -> xid ))
2141- {
2142- /* qe->data is the null-terminated channel name */
2143- char * channel = qe -> data ;
21442110
2145- if (IsListeningOn (channel ))
2146- {
2147- /* payload follows channel name */
2148- char * payload = qe -> data + strlen (channel )+ 1 ;
2111+ /*
2112+ * Quick check for the case that we're not listening on any
2113+ * channels, before calling TransactionIdDidCommit(). This makes
2114+ * that case a little faster, but more importantly, it ensures
2115+ * that if there's a bad entry in the queue for which
2116+ * TransactionIdDidCommit() fails for some reason, we can skip
2117+ * over it on the first LISTEN in a session, and not get stuck on
2118+ * it indefinitely.
2119+ */
2120+ if (listenChannels == NIL )
2121+ continue ;
21492122
2150- NotifyMyFrontEnd (channel ,payload ,qe -> srcPid );
2151- }
2123+ if (TransactionIdDidCommit (qe -> xid ))
2124+ {
2125+ memcpy (local_buf_end ,qe ,qe -> length );
2126+ local_buf_end += qe -> length ;
21522127}
21532128else
21542129{
@@ -2162,6 +2137,32 @@ asyncQueueProcessPageEntries(QueuePosition *current,
21622137/* Loop back if we're not at end of page */
21632138}while (!reachedEndOfPage );
21642139
2140+ /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2141+ LWLockRelease (NotifySLRULock );
2142+
2143+ /*
2144+ * Now that we have let go of the SLRU bank lock, send the notifications
2145+ * to our backend
2146+ */
2147+ Assert (local_buf_end - local_buf .buf <=BLCKSZ );
2148+ for (char * p = local_buf .buf ;p < local_buf_end ;)
2149+ {
2150+ AsyncQueueEntry * qe = (AsyncQueueEntry * )p ;
2151+
2152+ /* qe->data is the null-terminated channel name */
2153+ char * channel = qe -> data ;
2154+
2155+ if (IsListeningOn (channel ))
2156+ {
2157+ /* payload follows channel name */
2158+ char * payload = qe -> data + strlen (channel )+ 1 ;
2159+
2160+ NotifyMyFrontEnd (channel ,payload ,qe -> srcPid );
2161+ }
2162+
2163+ p += qe -> length ;
2164+ }
2165+
21652166if (QUEUE_POS_EQUAL (* current ,stop ))
21662167reachedStop = true;
21672168