Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6708e44

Browse files
committed
Clean up shm_mq cleanup.
The logic around shm_mq_detach was a few bricks shy of a load, because(contrary to the comments for shm_mq_attach) all it did was update theshared shm_mq state. That left us leaking a bit of process-localmemory, but much worse, the on_dsm_detach callback for shm_mq_detachwas still armed. That means that whenever we ultimately detach fromthe DSM segment, we'd run shm_mq_detach again for already-detached,possibly long-dead queues. This accidentally fails to fail today,because we only ever re-use a shm_mq's memory for another shm_mq, andmultiple detach attempts on the last such shm_mq are fairly harmless.But it's gonna bite us someday, so let's clean it up.To do that, change shm_mq_detach's API so it takes a shm_mq_handlenot the underlying shm_mq. This makes the callers simpler in mostcases anyway. Also fix a few places in parallel.c that were justpfree'ing the handle structs rather than doing proper cleanup.Back-patch to v10 because of the risk that the revenant shm_mq_detachcallbacks would cause a live bug sometime. Since this is an APIchange, it's too late to do it in 9.6. (We could make a variantpatch that preserves API, but I'm not excited enough to do that.)Discussion:https://postgr.es/m/8670.1504192177@sss.pgh.pa.us
1 parent4b1dd62 commit6708e44

File tree

5 files changed

+51
-21
lines changed

5 files changed

+51
-21
lines changed

‎src/backend/access/transam/parallel.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
480480
*/
481481
any_registrations_failed= true;
482482
pcxt->worker[i].bgwhandle=NULL;
483-
pfree(pcxt->worker[i].error_mqh);
483+
shm_mq_detach(pcxt->worker[i].error_mqh);
484484
pcxt->worker[i].error_mqh=NULL;
485485
}
486486
}
@@ -612,7 +612,7 @@ DestroyParallelContext(ParallelContext *pcxt)
612612
{
613613
TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
614614

615-
pfree(pcxt->worker[i].error_mqh);
615+
shm_mq_detach(pcxt->worker[i].error_mqh);
616616
pcxt->worker[i].error_mqh=NULL;
617617
}
618618
}
@@ -861,7 +861,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
861861

862862
case'X':/* Terminate, indicating clean exit */
863863
{
864-
pfree(pcxt->worker[i].error_mqh);
864+
shm_mq_detach(pcxt->worker[i].error_mqh);
865865
pcxt->worker[i].error_mqh=NULL;
866866
break;
867867
}

‎src/backend/executor/tqueue.c

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -578,7 +578,9 @@ tqueueShutdownReceiver(DestReceiver *self)
578578
{
579579
TQueueDestReceiver*tqueue= (TQueueDestReceiver*)self;
580580

581-
shm_mq_detach(shm_mq_get_queue(tqueue->queue));
581+
if (tqueue->queue!=NULL)
582+
shm_mq_detach(tqueue->queue);
583+
tqueue->queue=NULL;
582584
}
583585

584586
/*
@@ -589,6 +591,9 @@ tqueueDestroyReceiver(DestReceiver *self)
589591
{
590592
TQueueDestReceiver*tqueue= (TQueueDestReceiver*)self;
591593

594+
/* We probably already detached from queue, but let's be sure */
595+
if (tqueue->queue!=NULL)
596+
shm_mq_detach(tqueue->queue);
592597
if (tqueue->tmpcontext!=NULL)
593598
MemoryContextDelete(tqueue->tmpcontext);
594599
if (tqueue->recordhtab!=NULL)
@@ -650,7 +655,7 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
650655
void
651656
DestroyTupleQueueReader(TupleQueueReader*reader)
652657
{
653-
shm_mq_detach(shm_mq_get_queue(reader->queue));
658+
shm_mq_detach(reader->queue);
654659
if (reader->typmodmap!=NULL)
655660
hash_destroy(reader->typmodmap);
656661
/* Is it worth trying to free substructure of the remap tree? */

‎src/backend/libpq/pqmq.c

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
#include"tcop/tcopprot.h"
2222
#include"utils/builtins.h"
2323

24-
staticshm_mq*pq_mq;
2524
staticshm_mq_handle*pq_mq_handle;
2625
staticboolpq_mq_busy= false;
2726
staticpid_tpq_mq_parallel_master_pid=0;
@@ -56,7 +55,6 @@ void
5655
pq_redirect_to_shm_mq(dsm_segment*seg,shm_mq_handle*mqh)
5756
{
5857
PqCommMethods=&PqCommMqMethods;
59-
pq_mq=shm_mq_get_queue(mqh);
6058
pq_mq_handle=mqh;
6159
whereToSendOutput=DestRemote;
6260
FrontendProtocol=PG_PROTOCOL_LATEST;
@@ -70,7 +68,6 @@ pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
7068
staticvoid
7169
pq_cleanup_redirect_to_shm_mq(dsm_segment*seg,Datumarg)
7270
{
73-
pq_mq=NULL;
7471
pq_mq_handle=NULL;
7572
whereToSendOutput=DestNone;
7673
}
@@ -135,9 +132,8 @@ mq_putmessage(char msgtype, const char *s, size_t len)
135132
*/
136133
if (pq_mq_busy)
137134
{
138-
if (pq_mq!=NULL)
139-
shm_mq_detach(pq_mq);
140-
pq_mq=NULL;
135+
if (pq_mq_handle!=NULL)
136+
shm_mq_detach(pq_mq_handle);
141137
pq_mq_handle=NULL;
142138
returnEOF;
143139
}
@@ -148,7 +144,7 @@ mq_putmessage(char msgtype, const char *s, size_t len)
148144
* be generated late in the shutdown sequence, after all DSMs have already
149145
* been detached.
150146
*/
151-
if (pq_mq==NULL)
147+
if (pq_mq_handle==NULL)
152148
return0;
153149

154150
pq_mq_busy= true;

‎src/backend/storage/ipc/shm_mq.c

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ struct shm_mq
8383
* This structure is a backend-private handle for access to a queue.
8484
*
8585
* mqh_queue is a pointer to the queue we've attached, and mqh_segment is
86-
* a pointer to the dynamic shared memory segment that contains it.
86+
* an optional pointer to the dynamic shared memory segment that contains it.
87+
* (If mqh_segment is provided, we register an on_dsm_detach callback to
88+
* make sure we detach from the queue before detaching from DSM.)
8789
*
8890
* If this queue is intended to connect the current process with a background
8991
* worker that started it, the user can pass a pointer to the worker handle
@@ -139,6 +141,7 @@ struct shm_mq_handle
139141
MemoryContextmqh_context;
140142
};
141143

144+
staticvoidshm_mq_detach_internal(shm_mq*mq);
142145
staticshm_mq_resultshm_mq_send_bytes(shm_mq_handle*mq,Sizenbytes,
143146
constvoid*data,boolnowait,Size*bytes_written);
144147
staticshm_mq_resultshm_mq_receive_bytes(shm_mq*mq,Sizebytes_needed,
@@ -288,14 +291,15 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
288291
Assert(mq->mq_receiver==MyProc||mq->mq_sender==MyProc);
289292
mqh->mqh_queue=mq;
290293
mqh->mqh_segment=seg;
291-
mqh->mqh_buffer=NULL;
292294
mqh->mqh_handle=handle;
295+
mqh->mqh_buffer=NULL;
293296
mqh->mqh_buflen=0;
294297
mqh->mqh_consume_pending=0;
295-
mqh->mqh_context=CurrentMemoryContext;
296298
mqh->mqh_partial_bytes=0;
299+
mqh->mqh_expected_bytes=0;
297300
mqh->mqh_length_word_complete= false;
298301
mqh->mqh_counterparty_attached= false;
302+
mqh->mqh_context=CurrentMemoryContext;
299303

300304
if (seg!=NULL)
301305
on_dsm_detach(seg,shm_mq_detach_callback,PointerGetDatum(mq));
@@ -765,17 +769,42 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
765769
}
766770

767771
/*
768-
* Detach a shared message queue.
772+
* Detach from a shared message queue, and destroy the shm_mq_handle.
773+
*/
774+
void
775+
shm_mq_detach(shm_mq_handle*mqh)
776+
{
777+
/* Notify counterparty that we're outta here. */
778+
shm_mq_detach_internal(mqh->mqh_queue);
779+
780+
/* Cancel on_dsm_detach callback, if any. */
781+
if (mqh->mqh_segment)
782+
cancel_on_dsm_detach(mqh->mqh_segment,
783+
shm_mq_detach_callback,
784+
PointerGetDatum(mqh->mqh_queue));
785+
786+
/* Release local memory associated with handle. */
787+
if (mqh->mqh_buffer!=NULL)
788+
pfree(mqh->mqh_buffer);
789+
pfree(mqh);
790+
}
791+
792+
/*
793+
* Notify counterparty that we're detaching from shared message queue.
769794
*
770795
* The purpose of this function is to make sure that the process
771796
* with which we're communicating doesn't block forever waiting for us to
772797
* fill or drain the queue once we've lost interest. When the sender
773798
* detaches, the receiver can read any messages remaining in the queue;
774799
* further reads will return SHM_MQ_DETACHED. If the receiver detaches,
775800
* further attempts to send messages will likewise return SHM_MQ_DETACHED.
801+
*
802+
* This is separated out from shm_mq_detach() because if the on_dsm_detach
803+
* callback fires, we only want to do this much. We do not try to touch
804+
* the local shm_mq_handle, as it may have been pfree'd already.
776805
*/
777-
void
778-
shm_mq_detach(shm_mq*mq)
806+
staticvoid
807+
shm_mq_detach_internal(shm_mq*mq)
779808
{
780809
volatileshm_mq*vmq=mq;
781810
PGPROC*victim;
@@ -1193,5 +1222,5 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg)
11931222
{
11941223
shm_mq*mq= (shm_mq*)DatumGetPointer(arg);
11951224

1196-
shm_mq_detach(mq);
1225+
shm_mq_detach_internal(mq);
11971226
}

‎src/include/storage/shm_mq.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg,
6262
/* Associate worker handle with shm_mq. */
6363
externvoidshm_mq_set_handle(shm_mq_handle*,BackgroundWorkerHandle*);
6464

65-
/* Break connection. */
66-
externvoidshm_mq_detach(shm_mq*);
65+
/* Break connection, release handle resources. */
66+
externvoidshm_mq_detach(shm_mq_handle*mqh);
6767

6868
/* Get the shm_mq from handle. */
6969
externshm_mq*shm_mq_get_queue(shm_mq_handle*mqh);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp