Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit497171d

Browse files
committed
shm_mq: Have the receiver set the sender's less frequently.
Instead of marking data from the ringer buffer consumed and setting thesender's latch for every message, do it only when the amount of data wecan consume is at least 1/4 of the size of the ring buffer, or when nodata remains in the ring buffer. This is dramatically faster in mytesting; apparently, the savings from sending signals less frequentlyoutweighs the benefit of letting the sender know about available bufferspace sooner.Patch by me, reviewed by Andres Freund and tested by Rafia Sabih.Discussion:http://postgr.es/m/CA+TgmoYK7RFj6r7KLEfSGtYZCi3zqTRhAz8mcsDbUAjEmLOZ3Q@mail.gmail.com
1 parent34db06e commit497171d

File tree

1 file changed

+44
-26
lines changed

1 file changed

+44
-26
lines changed

‎src/backend/storage/ipc/shm_mq.c

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,11 @@ struct shm_mq_handle
143143
};
144144

145145
staticvoidshm_mq_detach_internal(shm_mq*mq);
146-
staticshm_mq_resultshm_mq_send_bytes(shm_mq_handle*mq,Sizenbytes,
146+
staticshm_mq_resultshm_mq_send_bytes(shm_mq_handle*mqh,Sizenbytes,
147147
constvoid*data,boolnowait,Size*bytes_written);
148-
staticshm_mq_resultshm_mq_receive_bytes(shm_mq*mq,Sizebytes_needed,
149-
boolnowait,Size*nbytesp,void**datap);
148+
staticshm_mq_resultshm_mq_receive_bytes(shm_mq_handle*mqh,
149+
Sizebytes_needed,boolnowait,Size*nbytesp,
150+
void**datap);
150151
staticboolshm_mq_counterparty_gone(shm_mq*mq,
151152
BackgroundWorkerHandle*handle);
152153
staticboolshm_mq_wait_internal(shm_mq*mq,PGPROC**ptr,
@@ -582,8 +583,14 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
582583
mqh->mqh_counterparty_attached= true;
583584
}
584585

585-
/* Consume any zero-copy data from previous receive operation. */
586-
if (mqh->mqh_consume_pending>0)
586+
/*
587+
* If we've consumed an amount of data greater than 1/4th of the ring
588+
* size, mark it consumed in shared memory. We try to avoid doing this
589+
* unnecessarily when only a small amount of data has been consumed,
590+
* because SetLatch() is fairly expensive and we don't want to do it too
591+
* often.
592+
*/
593+
if (mqh->mqh_consume_pending>mq->mq_ring_size /4)
587594
{
588595
shm_mq_inc_bytes_read(mq,mqh->mqh_consume_pending);
589596
mqh->mqh_consume_pending=0;
@@ -594,7 +601,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
594601
{
595602
/* Try to receive the message length word. */
596603
Assert(mqh->mqh_partial_bytes<sizeof(Size));
597-
res=shm_mq_receive_bytes(mq,sizeof(Size)-mqh->mqh_partial_bytes,
604+
res=shm_mq_receive_bytes(mqh,sizeof(Size)-mqh->mqh_partial_bytes,
598605
nowait,&rb,&rawdata);
599606
if (res!=SHM_MQ_SUCCESS)
600607
returnres;
@@ -614,13 +621,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
614621
needed=MAXALIGN(sizeof(Size))+MAXALIGN(nbytes);
615622
if (rb >=needed)
616623
{
617-
/*
618-
* Technically, we could consume the message length
619-
* information at this point, but the extra write to shared
620-
* memory wouldn't be free and in most cases we would reap no
621-
* benefit.
622-
*/
623-
mqh->mqh_consume_pending=needed;
624+
mqh->mqh_consume_pending+=needed;
624625
*nbytesp=nbytes;
625626
*datap= ((char*)rawdata)+MAXALIGN(sizeof(Size));
626627
returnSHM_MQ_SUCCESS;
@@ -632,7 +633,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
632633
*/
633634
mqh->mqh_expected_bytes=nbytes;
634635
mqh->mqh_length_word_complete= true;
635-
shm_mq_inc_bytes_read(mq,MAXALIGN(sizeof(Size)));
636+
mqh->mqh_consume_pending+=MAXALIGN(sizeof(Size));
636637
rb-=MAXALIGN(sizeof(Size));
637638
}
638639
else
@@ -651,15 +652,15 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
651652
}
652653
Assert(mqh->mqh_buflen >=sizeof(Size));
653654

654-
/* Copyand consumepartial length word. */
655+
/* Copy partial length word; remember to consume it. */
655656
if (mqh->mqh_partial_bytes+rb>sizeof(Size))
656657
lengthbytes=sizeof(Size)-mqh->mqh_partial_bytes;
657658
else
658659
lengthbytes=rb;
659660
memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes],rawdata,
660661
lengthbytes);
661662
mqh->mqh_partial_bytes+=lengthbytes;
662-
shm_mq_inc_bytes_read(mq,MAXALIGN(lengthbytes));
663+
mqh->mqh_consume_pending+=MAXALIGN(lengthbytes);
663664
rb-=lengthbytes;
664665

665666
/* If we now have the whole word, we're ready to read payload. */
@@ -681,13 +682,13 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
681682
* we need not copy the data and can return a pointer directly into
682683
* shared memory.
683684
*/
684-
res=shm_mq_receive_bytes(mq,nbytes,nowait,&rb,&rawdata);
685+
res=shm_mq_receive_bytes(mqh,nbytes,nowait,&rb,&rawdata);
685686
if (res!=SHM_MQ_SUCCESS)
686687
returnres;
687688
if (rb >=nbytes)
688689
{
689690
mqh->mqh_length_word_complete= false;
690-
mqh->mqh_consume_pending=MAXALIGN(nbytes);
691+
mqh->mqh_consume_pending+=MAXALIGN(nbytes);
691692
*nbytesp=nbytes;
692693
*datap=rawdata;
693694
returnSHM_MQ_SUCCESS;
@@ -727,21 +728,21 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
727728
mqh->mqh_partial_bytes+=rb;
728729

729730
/*
730-
* Update count of bytesread, with alignment padding. Note that this
731-
*will never actually insert any padding except at the end of a
732-
* message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
733-
* and each read and write is as well.
731+
* Update count of bytesthat can be consumed, accounting for
732+
*alignment padding. Note that this will never actually insert any
733+
*padding except at the end of amessage, because the buffer size is
734+
*a multiple of MAXIMUM_ALIGNOF,and each read and write is as well.
734735
*/
735736
Assert(mqh->mqh_partial_bytes==nbytes||rb==MAXALIGN(rb));
736-
shm_mq_inc_bytes_read(mq,MAXALIGN(rb));
737+
mqh->mqh_consume_pending+=MAXALIGN(rb);
737738

738739
/* If we got all the data, exit the loop. */
739740
if (mqh->mqh_partial_bytes >=nbytes)
740741
break;
741742

742743
/* Wait for some more data. */
743744
still_needed=nbytes-mqh->mqh_partial_bytes;
744-
res=shm_mq_receive_bytes(mq,still_needed,nowait,&rb,&rawdata);
745+
res=shm_mq_receive_bytes(mqh,still_needed,nowait,&rb,&rawdata);
745746
if (res!=SHM_MQ_SUCCESS)
746747
returnres;
747748
if (rb>still_needed)
@@ -1007,9 +1008,10 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
10071008
* is SHM_MQ_SUCCESS.
10081009
*/
10091010
staticshm_mq_result
1010-
shm_mq_receive_bytes(shm_mq*mq,Sizebytes_needed,boolnowait,
1011+
shm_mq_receive_bytes(shm_mq_handle*mqh,Sizebytes_needed,boolnowait,
10111012
Size*nbytesp,void**datap)
10121013
{
1014+
shm_mq*mq=mqh->mqh_queue;
10131015
Sizeringsize=mq->mq_ring_size;
10141016
uint64used;
10151017
uint64written;
@@ -1021,7 +1023,13 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
10211023

10221024
/* Get bytes written, so we can compute what's available to read. */
10231025
written=pg_atomic_read_u64(&mq->mq_bytes_written);
1024-
read=pg_atomic_read_u64(&mq->mq_bytes_read);
1026+
1027+
/*
1028+
* Get bytes read. Include bytes we could consume but have not yet
1029+
* consumed.
1030+
*/
1031+
read=pg_atomic_read_u64(&mq->mq_bytes_read)+
1032+
mqh->mqh_consume_pending;
10251033
used=written-read;
10261034
Assert(used <=ringsize);
10271035
offset=read % (uint64)ringsize;
@@ -1052,6 +1060,16 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
10521060
if (mq->mq_detached)
10531061
returnSHM_MQ_DETACHED;
10541062

1063+
/*
1064+
* We didn't get enough data to satisfy the request, so mark any data
1065+
* previously-consumed as read to make more buffer space.
1066+
*/
1067+
if (mqh->mqh_consume_pending>0)
1068+
{
1069+
shm_mq_inc_bytes_read(mq,mqh->mqh_consume_pending);
1070+
mqh->mqh_consume_pending=0;
1071+
}
1072+
10551073
/* Skip manipulation of our latch if nowait = true. */
10561074
if (nowait)
10571075
returnSHM_MQ_WOULD_BLOCK;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp