Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit af33039

Browse files
committed
Fix worst memory leaks in tqueue.c.

TupleQueueReaderNext() leaks like a sieve if it has to do any tuple
disassembly/reconstruction. While we could try to clean up its allocations
piecemeal, it seems like a better idea just to insist that it should be run
in a short-lived memory context, so that any transient space goes away
automatically. I chose to have nodeGather.c switch into its existing
per-tuple context before the call, rather than inventing a separate
context inside tqueue.c.

This is sufficient to stop all leakage in the simple case I exhibited
earlier today (see link below), but it does not deal with leaks induced
in more complex cases by tqueue.c's insistence on using TopMemoryContext
for data that it's not actually trying hard to keep track of. That issue
is intertwined with another major source of inefficiency, namely failure
to cache lookup results across calls, so it seems best to deal with it
separately.

In passing, improve some comments, and modify gather_readnext's method for
deciding when it's visited all the readers so that it's more obviously
correct. (I'm not actually convinced that the previous code *is*
correct in the case of a reader deletion; it certainly seems fragile.)

Discussion: <32763.1469821037@sss.pgh.pa.us>

1 parent bf4ae68 commit af33039

File tree

3 files changed

+45
-30
lines changed

3 files changed

+45
-30
lines changed

‎src/backend/executor/nodeGather.c

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,11 @@ ExecGather(GatherState *node)
214214
/*
215215
* Reset per-tuple memory context to free any expression evaluation
216216
* storage allocated in the previous tuple cycle. Note we can't do this
217-
* until we're done projecting.
217+
* until we're done projecting. This will also clear any previous tuple
218+
* returned by a TupleQueueReader; to make sure we don't leave a dangling
219+
* pointer around, clear the working slot first.
218220
*/
221+
ExecClearTuple(node->funnel_slot);
219222
econtext=node->ps.ps_ExprContext;
220223
ResetExprContext(econtext);
221224

@@ -274,22 +277,27 @@ gather_getnext(GatherState *gatherstate)
274277
PlanState*outerPlan=outerPlanState(gatherstate);
275278
TupleTableSlot*outerTupleSlot;
276279
TupleTableSlot*fslot=gatherstate->funnel_slot;
280+
MemoryContexttupleContext=gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
277281
HeapTupletup;
278282

279283
while (gatherstate->reader!=NULL||gatherstate->need_to_scan_locally)
280284
{
281285
if (gatherstate->reader!=NULL)
282286
{
287+
MemoryContextoldContext;
288+
289+
/* Run TupleQueueReaders in per-tuple context */
290+
oldContext=MemoryContextSwitchTo(tupleContext);
283291
tup=gather_readnext(gatherstate);
292+
MemoryContextSwitchTo(oldContext);
284293

285294
if (HeapTupleIsValid(tup))
286295
{
287296
ExecStoreTuple(tup,/* tuple to store */
288297
fslot,/* slot in which to store the tuple */
289298
InvalidBuffer,/* buffer associated with this
290299
* tuple */
291-
true);/* pfree this pointer if not from heap */
292-
300+
false);/* slot should not pfree tuple */
293301
returnfslot;
294302
}
295303
}
@@ -314,7 +322,7 @@ gather_getnext(GatherState *gatherstate)
314322
staticHeapTuple
315323
gather_readnext(GatherState*gatherstate)
316324
{
317-
intwaitpos=gatherstate->nextreader;
325+
intnvisited=0;
318326

319327
for (;;)
320328
{
@@ -335,24 +343,20 @@ gather_readnext(GatherState *gatherstate)
335343
*/
336344
if (readerdone)
337345
{
346+
Assert(!tup);
338347
DestroyTupleQueueReader(reader);
339348
--gatherstate->nreaders;
340349
if (gatherstate->nreaders==0)
341350
{
342351
ExecShutdownGatherWorkers(gatherstate);
343352
returnNULL;
344353
}
345-
else
346-
{
347-
memmove(&gatherstate->reader[gatherstate->nextreader],
348-
&gatherstate->reader[gatherstate->nextreader+1],
349-
sizeof(TupleQueueReader*)
350-
* (gatherstate->nreaders-gatherstate->nextreader));
351-
if (gatherstate->nextreader >=gatherstate->nreaders)
352-
gatherstate->nextreader=0;
353-
if (gatherstate->nextreader<waitpos)
354-
--waitpos;
355-
}
354+
memmove(&gatherstate->reader[gatherstate->nextreader],
355+
&gatherstate->reader[gatherstate->nextreader+1],
356+
sizeof(TupleQueueReader*)
357+
* (gatherstate->nreaders-gatherstate->nextreader));
358+
if (gatherstate->nextreader >=gatherstate->nreaders)
359+
gatherstate->nextreader=0;
356360
continue;
357361
}
358362

@@ -367,11 +371,13 @@ gather_readnext(GatherState *gatherstate)
367371
* every tuple, but it turns out to be much more efficient to keep
368372
* reading from the same queue until that would require blocking.
369373
*/
370-
gatherstate->nextreader=
371-
(gatherstate->nextreader+1) %gatherstate->nreaders;
374+
gatherstate->nextreader++;
375+
if (gatherstate->nextreader >=gatherstate->nreaders)
376+
gatherstate->nextreader=0;
372377

373-
/* Have we visited every TupleQueueReader? */
374-
if (gatherstate->nextreader==waitpos)
378+
/* Have we visited every (surviving) TupleQueueReader? */
379+
nvisited++;
380+
if (nvisited >=gatherstate->nreaders)
375381
{
376382
/*
377383
* If (still) running plan locally, return NULL so caller can
@@ -384,6 +390,7 @@ gather_readnext(GatherState *gatherstate)
384390
WaitLatch(MyLatch,WL_LATCH_SET,0);
385391
CHECK_FOR_INTERRUPTS();
386392
ResetLatch(MyLatch);
393+
nvisited=0;
387394
}
388395
}
389396
}

‎src/backend/executor/tqueue.c

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -524,13 +524,18 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
524524
/*
525525
* Fetch a tuple from a tuple queue reader.
526526
*
527+
* The return value is NULL if there are no remaining tuples or if
528+
* nowait = true and no tuple is ready to return. *done, if not NULL,
529+
* is set to true when there are no remaining tuples and otherwise to false.
530+
*
531+
* The returned tuple, if any, is allocated in CurrentMemoryContext.
532+
* That should be a short-lived (tuple-lifespan) context, because we are
533+
* pretty cavalier about leaking memory in that context if we have to do
534+
* tuple remapping.
535+
*
527536
* Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
528537
* accumulate bytes from a partially-read message, so it's useful to call
529538
* this with nowait = true even if nothing is returned.
530-
*
531-
* The return value is NULL if there are no remaining queues or if
532-
* nowait = true and no tuple is ready to return. *done, if not NULL,
533-
* is set to true when queue is detached and otherwise to false.
534539
*/
535540
HeapTuple
536541
TupleQueueReaderNext(TupleQueueReader*reader,boolnowait,bool*done)
@@ -565,10 +570,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
565570
* OK, we got a message. Process it.
566571
*
567572
* One-byte messages are mode switch messages, so that we can switch
568-
* between "control" and "data" mode. When in "data" mode, each
569-
* message (unless exactly one byte) is a tuple. When in "control"
570-
* mode, each message provides a transient-typmod-to-tupledesc mapping
571-
* so we can interpret future tuples.
573+
* between "control" and "data" mode. Otherwise, when in "data" mode,
574+
* each message is a tuple. When in "control" mode, each message
575+
* provides a transient-typmod-to-tupledesc mapping to let us
576+
* interpret future tuples. Both of those cases certainly require
577+
* more than one byte, so no confusion is possible.
572578
*/
573579
if (nbytes==1)
574580
{

‎src/include/executor/tqueue.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@
1717
#include"storage/shm_mq.h"
1818
#include"tcop/dest.h"
1919

20+
/* Opaque struct, only known inside tqueue.c. */
21+
typedefstructTupleQueueReaderTupleQueueReader;
22+
2023
/* Use this to send tuples to a shm_mq. */
2124
externDestReceiver*CreateTupleQueueDestReceiver(shm_mq_handle*handle);
2225

2326
/* Use these to receive tuples from a shm_mq. */
24-
typedefstructTupleQueueReaderTupleQueueReader;
2527
externTupleQueueReader*CreateTupleQueueReader(shm_mq_handle*handle,
2628
TupleDesctupledesc);
27-
externvoidDestroyTupleQueueReader(TupleQueueReader*funnel);
28-
externHeapTupleTupleQueueReaderNext(TupleQueueReader*,
29+
externvoidDestroyTupleQueueReader(TupleQueueReader*reader);
30+
externHeapTupleTupleQueueReaderNext(TupleQueueReader*reader,
2931
boolnowait,bool*done);
3032

3133
#endif/* TQUEUE_H */

0 commit comments

Comments (0)

[8]ページ先頭

©2009-2025 Movatter.jp