Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7bb0e97

Browse files
committed
Extend shm_mq API with new functions shm_mq_sendv, shm_mq_set_handle.
shm_mq_sendv sends a message to the queue assembled from multiplelocations. This is expected to be used by forthcoming patches toallow frontend/backend protocol messages to be sent via shm_mq, butmight be useful for other purposes as well.shm_mq_set_handle associates a BackgroundWorkerHandle with analready-existing shm_mq_handle. This solves a timing problem whencreating a shm_mq to communicate with a newly-launched backgroundworker: if you attach to the queue first, and the background workerfails to start, you might block forever trying to do I/O on the queue;but if you start the background worker first, but then die beforeattaching to the queue, the background worrker might block forevertrying to do I/O on the queue. This lets you attach before startingthe worker (so that the worker is protected) and then associate theBackgroundWorkerHandle later (so that you are also protected).Patch by me, reviewed by Stephen Frost.
1 parentdf630b0 commit7bb0e97

File tree

2 files changed

+124
-16
lines changed

2 files changed

+124
-16
lines changed

‎src/backend/storage/ipc/shm_mq.c

Lines changed: 111 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ struct shm_mq_handle
139139
};
140140

141141
staticshm_mq_resultshm_mq_send_bytes(shm_mq_handle*mq,Sizenbytes,
142-
void*data,boolnowait,Size*bytes_written);
142+
constvoid*data,boolnowait,Size*bytes_written);
143143
staticshm_mq_resultshm_mq_receive_bytes(shm_mq*mq,Sizebytes_needed,
144144
boolnowait,Size*nbytesp,void**datap);
145145
staticboolshm_mq_wait_internal(volatileshm_mq*mq,PGPROC*volatile*ptr,
@@ -300,8 +300,34 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
300300
returnmqh;
301301
}
302302

303+
/*
304+
* Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
305+
* been passed to shm_mq_attach.
306+
*/
307+
void
308+
shm_mq_set_handle(shm_mq_handle*mqh,BackgroundWorkerHandle*handle)
309+
{
310+
Assert(mqh->mqh_handle==NULL);
311+
mqh->mqh_handle=handle;
312+
}
313+
303314
/*
304315
* Write a message into a shared message queue.
316+
*/
317+
shm_mq_result
318+
shm_mq_send(shm_mq_handle*mqh,Sizenbytes,constvoid*data,boolnowait)
319+
{
320+
shm_mq_ioveciov;
321+
322+
iov.data=data;
323+
iov.len=nbytes;
324+
325+
returnshm_mq_sendv(mqh,&iov,1,nowait);
326+
}
327+
328+
/*
329+
* Write a message into a shared message queue, gathered from multiple
330+
* addresses.
305331
*
306332
* When nowait = false, we'll wait on our process latch when the ring buffer
307333
* fills up, and then continue writing once the receiver has drained some data.
@@ -315,14 +341,22 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
315341
* the length or payload will corrupt the queue.)
316342
*/
317343
shm_mq_result
318-
shm_mq_send(shm_mq_handle*mqh,Sizenbytes,void*data,boolnowait)
344+
shm_mq_sendv(shm_mq_handle*mqh,shm_mq_iovec*iov,intiovcnt,boolnowait)
319345
{
320346
shm_mq_resultres;
321347
shm_mq*mq=mqh->mqh_queue;
348+
Sizenbytes=0;
322349
Sizebytes_written;
350+
inti;
351+
intwhich_iov=0;
352+
Sizeoffset;
323353

324354
Assert(mq->mq_sender==MyProc);
325355

356+
/* Compute total size of write. */
357+
for (i=0;i<iovcnt;++i)
358+
nbytes+=iov[i].len;
359+
326360
/* Try to write, or finish writing, the length word into the buffer. */
327361
while (!mqh->mqh_length_word_complete)
328362
{
@@ -348,18 +382,80 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
348382

349383
/* Write the actual data bytes into the buffer. */
350384
Assert(mqh->mqh_partial_bytes <=nbytes);
351-
res=shm_mq_send_bytes(mqh,nbytes-mqh->mqh_partial_bytes,
352-
((char*)data)+mqh->mqh_partial_bytes,
353-
nowait,&bytes_written);
354-
if (res==SHM_MQ_WOULD_BLOCK)
355-
mqh->mqh_partial_bytes+=bytes_written;
356-
else
385+
offset=mqh->mqh_partial_bytes;
386+
do
357387
{
358-
mqh->mqh_partial_bytes=0;
359-
mqh->mqh_length_word_complete= false;
360-
}
361-
if (res!=SHM_MQ_SUCCESS)
362-
returnres;
388+
Sizechunksize;
389+
390+
/* Figure out which bytes need to be sent next. */
391+
if (offset >=iov[which_iov].len)
392+
{
393+
offset-=iov[which_iov].len;
394+
++which_iov;
395+
if (which_iov >=iovcnt)
396+
break;
397+
continue;
398+
}
399+
400+
/*
401+
* We want to avoid copying the data if at all possible, but every
402+
* chunk of bytes we write into the queue has to be MAXALIGN'd,
403+
* except the last. Thus, if a chunk other than the last one ends
404+
* on a non-MAXALIGN'd boundary, we have to combine the tail end of
405+
* its data with data from one or more following chunks until we
406+
* either reach the last chunk or accumulate a number of bytes which
407+
* is MAXALIGN'd.
408+
*/
409+
if (which_iov+1<iovcnt&&
410+
offset+MAXIMUM_ALIGNOF>iov[which_iov].len)
411+
{
412+
chartmpbuf[MAXIMUM_ALIGNOF];
413+
intj=0;
414+
415+
for (;;)
416+
{
417+
if (offset<iov[which_iov].len)
418+
{
419+
tmpbuf[j]=iov[which_iov].data[offset];
420+
j++;
421+
offset++;
422+
if (j==MAXIMUM_ALIGNOF)
423+
break;
424+
}
425+
else
426+
{
427+
offset-=iov[which_iov].len;
428+
which_iov++;
429+
if (which_iov >=iovcnt)
430+
break;
431+
}
432+
}
433+
res=shm_mq_send_bytes(mqh,j,tmpbuf,nowait,&bytes_written);
434+
mqh->mqh_partial_bytes+=bytes_written;
435+
if (res!=SHM_MQ_SUCCESS)
436+
returnres;
437+
continue;
438+
}
439+
440+
/*
441+
* If this is the last chunk, we can write all the data, even if it
442+
* isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
443+
* MAXALIGN_DOWN the write size.
444+
*/
445+
chunksize=iov[which_iov].len-offset;
446+
if (which_iov+1<iovcnt)
447+
chunksize=MAXALIGN_DOWN(chunksize);
448+
res=shm_mq_send_bytes(mqh,chunksize,&iov[which_iov].data[offset],
449+
nowait,&bytes_written);
450+
mqh->mqh_partial_bytes+=bytes_written;
451+
offset+=bytes_written;
452+
if (res!=SHM_MQ_SUCCESS)
453+
returnres;
454+
}while (mqh->mqh_partial_bytes<nbytes);
455+
456+
/* Reset for next message. */
457+
mqh->mqh_partial_bytes=0;
458+
mqh->mqh_length_word_complete= false;
363459

364460
/* Notify receiver of the newly-written data, and return. */
365461
returnshm_mq_notify_receiver(mq);
@@ -653,8 +749,8 @@ shm_mq_detach(shm_mq *mq)
653749
* Write bytes into a shared message queue.
654750
*/
655751
staticshm_mq_result
656-
shm_mq_send_bytes(shm_mq_handle*mqh,Sizenbytes,void*data,boolnowait,
657-
Size*bytes_written)
752+
shm_mq_send_bytes(shm_mq_handle*mqh,Sizenbytes,constvoid*data,
753+
boolnowait,Size*bytes_written)
658754
{
659755
shm_mq*mq=mqh->mqh_queue;
660756
Sizesent=0;

‎src/include/storage/shm_mq.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@ typedef struct shm_mq shm_mq;
2525
structshm_mq_handle;
2626
typedefstructshm_mq_handleshm_mq_handle;
2727

28+
/* Descriptors for a single write spanning multiple locations. */
29+
typedefstruct
30+
{
31+
constchar*data;
32+
Sizelen;
33+
}shm_mq_iovec;
34+
2835
/* Possible results of a send or receive operation. */
2936
typedefenum
3037
{
@@ -52,12 +59,17 @@ extern PGPROC *shm_mq_get_sender(shm_mq *);
5259
externshm_mq_handle*shm_mq_attach(shm_mq*mq,dsm_segment*seg,
5360
BackgroundWorkerHandle*handle);
5461

62+
/* Associate worker handle with shm_mq. */
63+
externvoidshm_mq_set_handle(shm_mq_handle*,BackgroundWorkerHandle*);
64+
5565
/* Break connection. */
5666
externvoidshm_mq_detach(shm_mq*);
5767

5868
/* Send or receive messages. */
5969
externshm_mq_resultshm_mq_send(shm_mq_handle*mqh,
60-
Sizenbytes,void*data,boolnowait);
70+
Sizenbytes,constvoid*data,boolnowait);
71+
externshm_mq_resultshm_mq_sendv(shm_mq_handle*mqh,
72+
shm_mq_iovec*iov,intiovcnt,boolnowait);
6173
externshm_mq_resultshm_mq_receive(shm_mq_handle*mqh,
6274
Size*nbytesp,void**datap,boolnowait);
6375

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp