Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4684643

Browse files
committed
shm_mq: Update mq_bytes_written less often.
Do not update shm_mq's mq_bytes_written until we have writtenan amount of data greater than 1/4th of the ring size, unlessthe caller of shm_mq_send(v) requests a flush at the end ofthe message. This reduces the number of calls to SetLatch(),and also the number of CPU cache misses, considerably, and thusmakes shm_mq significantly faster.Dilip Kumar, reviewed by Zhihong Yu and Tomas Vondra. Someminor cosmetic changes by me.Discussion:http://postgr.es/m/CAFiTN-tVXqn_OG7tHNeSkBbN+iiCZTiQ83uakax43y1sQb2OBA@mail.gmail.com
1 parent7821a0b commit4684643

File tree

6 files changed

+69
-19
lines changed

6 files changed

+69
-19
lines changed

‎src/backend/executor/tqueue.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
6060

6161
/* Send the tuple itself. */
6262
tuple=ExecFetchSlotMinimalTuple(slot,&should_free);
63-
result=shm_mq_send(tqueue->queue,tuple->t_len,tuple, false);
63+
result=shm_mq_send(tqueue->queue,tuple->t_len,tuple, false, false);
6464

6565
if (should_free)
6666
pfree(tuple);

‎src/backend/libpq/pqmq.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,12 @@ mq_putmessage(char msgtype, const char *s, size_t len)
154154

155155
for (;;)
156156
{
157-
result=shm_mq_sendv(pq_mq_handle,iov,2, true);
157+
/*
158+
* Immediately notify the receiver by passing force_flush as true so
159+
* that the shared memory value is updated before we send the parallel
160+
* message signal right after this.
161+
*/
162+
result=shm_mq_sendv(pq_mq_handle,iov,2, true, true);
158163

159164
if (pq_mq_parallel_leader_pid!=0)
160165
SendProcSignal(pq_mq_parallel_leader_pid,

‎src/backend/storage/ipc/shm_mq.c

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,12 @@ struct shm_mq
109109
* locally by copying the chunks into a backend-local buffer. mqh_buffer is
110110
* the buffer, and mqh_buflen is the number of bytes allocated for it.
111111
*
112+
* mqh_send_pending, is number of bytes that is written to the queue but not
113+
* yet updated in the shared memory. We will not update it until the written
114+
* data is 1/4th of the ring size or the tuple queue is full. This will
115+
* prevent frequent CPU cache misses, and it will also avoid frequent
116+
* SetLatch() calls, which are quite expensive.
117+
*
112118
* mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
113119
* are used to track the state of non-blocking operations. When the caller
114120
* attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
@@ -137,6 +143,7 @@ struct shm_mq_handle
137143
char*mqh_buffer;
138144
Sizemqh_buflen;
139145
Sizemqh_consume_pending;
146+
Sizemqh_send_pending;
140147
Sizemqh_partial_bytes;
141148
Sizemqh_expected_bytes;
142149
boolmqh_length_word_complete;
@@ -292,6 +299,7 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
292299
mqh->mqh_buffer=NULL;
293300
mqh->mqh_buflen=0;
294301
mqh->mqh_consume_pending=0;
302+
mqh->mqh_send_pending=0;
295303
mqh->mqh_partial_bytes=0;
296304
mqh->mqh_expected_bytes=0;
297305
mqh->mqh_length_word_complete= false;
@@ -319,14 +327,15 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
319327
* Write a message into a shared message queue.
320328
*/
321329
shm_mq_result
322-
shm_mq_send(shm_mq_handle*mqh,Sizenbytes,constvoid*data,boolnowait)
330+
shm_mq_send(shm_mq_handle*mqh,Sizenbytes,constvoid*data,boolnowait,
331+
boolforce_flush)
323332
{
324333
shm_mq_ioveciov;
325334

326335
iov.data=data;
327336
iov.len=nbytes;
328337

329-
returnshm_mq_sendv(mqh,&iov,1,nowait);
338+
returnshm_mq_sendv(mqh,&iov,1,nowait,force_flush);
330339
}
331340

332341
/*
@@ -343,9 +352,15 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
343352
* arguments, each time the process latch is set. (Once begun, the sending
344353
* of a message cannot be aborted except by detaching from the queue; changing
345354
* the length or payload will corrupt the queue.)
355+
*
356+
* When force_flush = true, we immediately update the shm_mq's mq_bytes_written
357+
* and notify the receiver (if it is already attached). Otherwise, we don't
358+
* update it until we have written an amount of data greater than 1/4th of the
359+
* ring size.
346360
*/
347361
shm_mq_result
348-
shm_mq_sendv(shm_mq_handle*mqh,shm_mq_iovec*iov,intiovcnt,boolnowait)
362+
shm_mq_sendv(shm_mq_handle*mqh,shm_mq_iovec*iov,intiovcnt,boolnowait,
363+
boolforce_flush)
349364
{
350365
shm_mq_resultres;
351366
shm_mq*mq=mqh->mqh_queue;
@@ -518,8 +533,18 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
518533
mqh->mqh_counterparty_attached= true;
519534
}
520535

521-
/* Notify receiver of the newly-written data, and return. */
522-
SetLatch(&receiver->procLatch);
536+
/*
537+
* If the caller has requested force flush or we have written more than 1/4
538+
* of the ring size, mark it as written in shared memory and notify the
539+
* receiver.
540+
*/
541+
if (force_flush||mqh->mqh_send_pending> (mq->mq_ring_size >>2))
542+
{
543+
shm_mq_inc_bytes_written(mq,mqh->mqh_send_pending);
544+
SetLatch(&receiver->procLatch);
545+
mqh->mqh_send_pending=0;
546+
}
547+
523548
returnSHM_MQ_SUCCESS;
524549
}
525550

@@ -816,6 +841,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
816841
void
817842
shm_mq_detach(shm_mq_handle*mqh)
818843
{
844+
/* Before detaching, notify the receiver about any already-written data. */
845+
if (mqh->mqh_send_pending>0)
846+
{
847+
shm_mq_inc_bytes_written(mqh->mqh_queue,mqh->mqh_send_pending);
848+
mqh->mqh_send_pending=0;
849+
}
850+
819851
/* Notify counterparty that we're outta here. */
820852
shm_mq_detach_internal(mqh->mqh_queue);
821853

@@ -894,7 +926,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
894926

895927
/* Compute number of ring buffer bytes used and available. */
896928
rb=pg_atomic_read_u64(&mq->mq_bytes_read);
897-
wb=pg_atomic_read_u64(&mq->mq_bytes_written);
929+
wb=pg_atomic_read_u64(&mq->mq_bytes_written)+mqh->mqh_send_pending;
898930
Assert(wb >=rb);
899931
used=wb-rb;
900932
Assert(used <=ringsize);
@@ -951,6 +983,9 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
951983
}
952984
elseif (available==0)
953985
{
986+
/* Update the pending send bytes in the shared memory. */
987+
shm_mq_inc_bytes_written(mq,mqh->mqh_send_pending);
988+
954989
/*
955990
* Since mq->mqh_counterparty_attached is known to be true at this
956991
* point, mq_receiver has been set, and it can't change once set.
@@ -959,6 +994,12 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
959994
Assert(mqh->mqh_counterparty_attached);
960995
SetLatch(&mq->mq_receiver->procLatch);
961996

997+
/*
998+
* We have just updated the mqh_send_pending bytes in the shared
999+
* memory so reset it.
1000+
*/
1001+
mqh->mqh_send_pending=0;
1002+
9621003
/* Skip manipulation of our latch if nowait = true. */
9631004
if (nowait)
9641005
{
@@ -1009,13 +1050,14 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
10091050
* MAXIMUM_ALIGNOF, and each read is as well.
10101051
*/
10111052
Assert(sent==nbytes||sendnow==MAXALIGN(sendnow));
1012-
shm_mq_inc_bytes_written(mq,MAXALIGN(sendnow));
10131053

10141054
/*
1015-
* For efficiency, we don't set the reader's latch here. We'll do
1016-
* that only when the buffer fills up or after writing an entire
1017-
* message.
1055+
* For efficiency, we don't update the bytes written in the shared
1056+
* memory and also don't set the reader's latch here. Refer to
1057+
* the comments atop the shm_mq_handle structure for more
1058+
* information.
10181059
*/
1060+
mqh->mqh_send_pending+=MAXALIGN(sendnow);
10191061
}
10201062
}
10211063

‎src/include/storage/shm_mq.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,13 @@ extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
7070

7171
/* Send or receive messages. */
7272
externshm_mq_resultshm_mq_send(shm_mq_handle*mqh,
73-
Sizenbytes,constvoid*data,boolnowait);
74-
externshm_mq_resultshm_mq_sendv(shm_mq_handle*mqh,
75-
shm_mq_iovec*iov,intiovcnt,boolnowait);
73+
Sizenbytes,constvoid*data,boolnowait,
74+
boolforce_flush);
75+
externshm_mq_resultshm_mq_sendv(shm_mq_handle*mqh,shm_mq_iovec*iov,
76+
intiovcnt,boolnowait,boolforce_flush);
7677
externshm_mq_resultshm_mq_receive(shm_mq_handle*mqh,
7778
Size*nbytesp,void**datap,boolnowait);
79+
externvoidshm_mq_flush(shm_mq_handle*mqh);
7880

7981
/* Wait for our counterparty to attach to the queue. */
8082
externshm_mq_resultshm_mq_wait_for_attach(shm_mq_handle*mqh);

‎src/test/modules/test_shm_mq/test.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ test_shm_mq(PG_FUNCTION_ARGS)
7373
test_shm_mq_setup(queue_size,nworkers,&seg,&outqh,&inqh);
7474

7575
/* Send the initial message. */
76-
res=shm_mq_send(outqh,message_size,message_contents, false);
76+
res=shm_mq_send(outqh,message_size,message_contents, false, true);
7777
if (res!=SHM_MQ_SUCCESS)
7878
ereport(ERROR,
7979
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -97,7 +97,7 @@ test_shm_mq(PG_FUNCTION_ARGS)
9797
break;
9898

9999
/* Send it back out. */
100-
res=shm_mq_send(outqh,len,data, false);
100+
res=shm_mq_send(outqh,len,data, false, true);
101101
if (res!=SHM_MQ_SUCCESS)
102102
ereport(ERROR,
103103
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -177,7 +177,8 @@ test_shm_mq_pipelined(PG_FUNCTION_ARGS)
177177
*/
178178
if (send_count<loop_count)
179179
{
180-
res=shm_mq_send(outqh,message_size,message_contents, true);
180+
res=shm_mq_send(outqh,message_size,message_contents, true,
181+
true);
181182
if (res==SHM_MQ_SUCCESS)
182183
{
183184
++send_count;

‎src/test/modules/test_shm_mq/worker.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh)
190190
break;
191191

192192
/* Send it back out. */
193-
res=shm_mq_send(outqh,len,data, false);
193+
res=shm_mq_send(outqh,len,data, false, true);
194194
if (res!=SHM_MQ_SUCCESS)
195195
break;
196196
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp