@@ -143,10 +143,11 @@ struct shm_mq_handle
143143};
144144
145145static void shm_mq_detach_internal (shm_mq * mq );
146- static shm_mq_result shm_mq_send_bytes (shm_mq_handle * mq ,Size nbytes ,
146+ static shm_mq_result shm_mq_send_bytes (shm_mq_handle * mqh ,Size nbytes ,
147147const void * data ,bool nowait ,Size * bytes_written );
148- static shm_mq_result shm_mq_receive_bytes (shm_mq * mq ,Size bytes_needed ,
149- bool nowait ,Size * nbytesp ,void * * datap );
148+ static shm_mq_result shm_mq_receive_bytes (shm_mq_handle * mqh ,
149+ Size bytes_needed ,bool nowait ,Size * nbytesp ,
150+ void * * datap );
150151static bool shm_mq_counterparty_gone (shm_mq * mq ,
151152BackgroundWorkerHandle * handle );
152153static bool shm_mq_wait_internal (shm_mq * mq ,PGPROC * * ptr ,
@@ -582,8 +583,14 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
582583mqh -> mqh_counterparty_attached = true;
583584}
584585
585- /* Consume any zero-copy data from previous receive operation. */
586- if (mqh -> mqh_consume_pending > 0 )
586+ /*
587+ * If we've consumed an amount of data greater than 1/4th of the ring
588+ * size, mark it consumed in shared memory. We try to avoid doing this
589+ * unnecessarily when only a small amount of data has been consumed,
590+ * because SetLatch() is fairly expensive and we don't want to do it too
591+ * often.
592+ */
593+ if (mqh -> mqh_consume_pending > mq -> mq_ring_size /4 )
587594{
588595shm_mq_inc_bytes_read (mq ,mqh -> mqh_consume_pending );
589596mqh -> mqh_consume_pending = 0 ;
@@ -594,7 +601,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
594601{
595602/* Try to receive the message length word. */
596603Assert (mqh -> mqh_partial_bytes < sizeof (Size ));
597- res = shm_mq_receive_bytes (mq ,sizeof (Size )- mqh -> mqh_partial_bytes ,
604+ res = shm_mq_receive_bytes (mqh ,sizeof (Size )- mqh -> mqh_partial_bytes ,
598605nowait ,& rb ,& rawdata );
599606if (res != SHM_MQ_SUCCESS )
600607return res ;
@@ -614,13 +621,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
614621needed = MAXALIGN (sizeof (Size ))+ MAXALIGN (nbytes );
615622if (rb >=needed )
616623{
617- /*
618- * Technically, we could consume the message length
619- * information at this point, but the extra write to shared
620- * memory wouldn't be free and in most cases we would reap no
621- * benefit.
622- */
623- mqh -> mqh_consume_pending = needed ;
624+ mqh -> mqh_consume_pending += needed ;
624625* nbytesp = nbytes ;
625626* datap = ((char * )rawdata )+ MAXALIGN (sizeof (Size ));
626627return SHM_MQ_SUCCESS ;
@@ -632,7 +633,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
632633 */
633634mqh -> mqh_expected_bytes = nbytes ;
634635mqh -> mqh_length_word_complete = true;
635- shm_mq_inc_bytes_read ( mq , MAXALIGN (sizeof (Size ) ));
636+ mqh -> mqh_consume_pending += MAXALIGN (sizeof (Size ));
636637rb -= MAXALIGN (sizeof (Size ));
637638}
638639else
@@ -651,15 +652,15 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
651652}
652653Assert (mqh -> mqh_buflen >=sizeof (Size ));
653654
654- /* Copyand consume partial length word. */
655+ /* Copy partial length word; remember to consume it . */
655656if (mqh -> mqh_partial_bytes + rb > sizeof (Size ))
656657lengthbytes = sizeof (Size )- mqh -> mqh_partial_bytes ;
657658else
658659lengthbytes = rb ;
659660memcpy (& mqh -> mqh_buffer [mqh -> mqh_partial_bytes ],rawdata ,
660661lengthbytes );
661662mqh -> mqh_partial_bytes += lengthbytes ;
662- shm_mq_inc_bytes_read ( mq , MAXALIGN (lengthbytes ) );
663+ mqh -> mqh_consume_pending += MAXALIGN (lengthbytes );
663664rb -= lengthbytes ;
664665
665666/* If we now have the whole word, we're ready to read payload. */
@@ -681,13 +682,13 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
681682 * we need not copy the data and can return a pointer directly into
682683 * shared memory.
683684 */
684- res = shm_mq_receive_bytes (mq ,nbytes ,nowait ,& rb ,& rawdata );
685+ res = shm_mq_receive_bytes (mqh ,nbytes ,nowait ,& rb ,& rawdata );
685686if (res != SHM_MQ_SUCCESS )
686687return res ;
687688if (rb >=nbytes )
688689{
689690mqh -> mqh_length_word_complete = false;
690- mqh -> mqh_consume_pending = MAXALIGN (nbytes );
691+ mqh -> mqh_consume_pending + =MAXALIGN (nbytes );
691692* nbytesp = nbytes ;
692693* datap = rawdata ;
693694return SHM_MQ_SUCCESS ;
@@ -727,21 +728,21 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
727728mqh -> mqh_partial_bytes += rb ;
728729
729730/*
730- * Update count of bytesread, with alignment padding. Note that this
731- *will never actually insert any padding except at the end of a
732- * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
733- * and each read and write is as well.
731+ * Update count of bytesthat can be consumed, accounting for
732+ *alignment padding. Note that this will never actually insert any
733+ *padding except at the end of a message, because the buffer size is
734+ *a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
734735 */
735736Assert (mqh -> mqh_partial_bytes == nbytes || rb == MAXALIGN (rb ));
736- shm_mq_inc_bytes_read ( mq , MAXALIGN (rb ) );
737+ mqh -> mqh_consume_pending += MAXALIGN (rb );
737738
738739/* If we got all the data, exit the loop. */
739740if (mqh -> mqh_partial_bytes >=nbytes )
740741break ;
741742
742743/* Wait for some more data. */
743744still_needed = nbytes - mqh -> mqh_partial_bytes ;
744- res = shm_mq_receive_bytes (mq ,still_needed ,nowait ,& rb ,& rawdata );
745+ res = shm_mq_receive_bytes (mqh ,still_needed ,nowait ,& rb ,& rawdata );
745746if (res != SHM_MQ_SUCCESS )
746747return res ;
747748if (rb > still_needed )
@@ -1007,9 +1008,10 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
10071008 * is SHM_MQ_SUCCESS.
10081009 */
10091010static shm_mq_result
1010- shm_mq_receive_bytes (shm_mq * mq ,Size bytes_needed ,bool nowait ,
1011+ shm_mq_receive_bytes (shm_mq_handle * mqh ,Size bytes_needed ,bool nowait ,
10111012Size * nbytesp ,void * * datap )
10121013{
1014+ shm_mq * mq = mqh -> mqh_queue ;
10131015Size ringsize = mq -> mq_ring_size ;
10141016uint64 used ;
10151017uint64 written ;
@@ -1021,7 +1023,13 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
10211023
10221024/* Get bytes written, so we can compute what's available to read. */
10231025written = pg_atomic_read_u64 (& mq -> mq_bytes_written );
1024- read = pg_atomic_read_u64 (& mq -> mq_bytes_read );
1026+
1027+ /*
1028+ * Get bytes read. Include bytes we could consume but have not yet
1029+ * consumed.
1030+ */
1031+ read = pg_atomic_read_u64 (& mq -> mq_bytes_read )+
1032+ mqh -> mqh_consume_pending ;
10251033used = written - read ;
10261034Assert (used <=ringsize );
10271035offset = read % (uint64 )ringsize ;
@@ -1052,6 +1060,16 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
10521060if (mq -> mq_detached )
10531061return SHM_MQ_DETACHED ;
10541062
1063+ /*
1064+ * We didn't get enough data to satisfy the request, so mark any data
1065+ * previously-consumed as read to make more buffer space.
1066+ */
1067+ if (mqh -> mqh_consume_pending > 0 )
1068+ {
1069+ shm_mq_inc_bytes_read (mq ,mqh -> mqh_consume_pending );
1070+ mqh -> mqh_consume_pending = 0 ;
1071+ }
1072+
10551073/* Skip manipulation of our latch if nowait = true. */
10561074if (nowait )
10571075return SHM_MQ_WOULD_BLOCK ;