Commit 2d44c58

Avoid memory leaks when a GatherMerge node is rescanned.
Rescanning a GatherMerge led to leaking some memory in the executor's query-lifespan context, because most of the node's working data structures were simply abandoned and rebuilt from scratch. In practice, this might never amount to much, given the cost of relaunching worker processes --- but it's still pretty messy, so let's fix it.

We can rearrange things so that the tuple arrays are simply cleared and reused, and we don't need to rebuild the TupleTableSlots either, just clear them. One small complication is that because we might get a different number of workers on each iteration, we can't keep the old convention that the leader's gm_slots[] entry is the last one; the leader might clobber a TupleTableSlot that we need for a worker in a future iteration. Hence, adjust the logic so that the leader has slot 0 always, while the active workers have slots 1..n.

Back-patch to v10 to keep all the existing versions of nodeGatherMerge.c in sync --- because of the renumbering of the slots, there would otherwise be a very large risk that any future backpatches in this module would introduce bugs.

Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us
1 parent 30833ba · commit 2d44c58
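To make the renumbering concrete: each heap entry holds an index into gm_slots[], where index 0 is now the leader and indexes 1..n are workers, while gm_tuple_buffers[] keeps its 0..n-1 indexing and so has no leader entry. The following minimal standalone C sketch (illustration only, not executor code; NUM_WORKERS and the printed labels are invented) walks the mapping:

#include <stdio.h>

#define NUM_WORKERS 3               /* hypothetical upper bound, like gm->num_workers */

int
main(void)
{
    /*
     * Slot 0 belongs to the leader and has no tuple buffer; a worker in
     * gm_slots[reader] finds its buffer at gm_tuple_buffers[reader - 1].
     */
    for (int reader = 0; reader <= NUM_WORKERS; reader++)
    {
        if (reader == 0)
            printf("gm_slots[0] -> leader (no tuple buffer)\n");
        else
            printf("gm_slots[%d] -> worker, gm_tuple_buffers[%d]\n",
                   reader, reader - 1);
    }
    return 0;
}

This is the same reader-to-buffer translation that the patched load_tuple_array and gather_merge_readnext perform with reader - 1 below.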

File tree

2 files changed (+107, -55 lines)


src/backend/executor/nodeGatherMerge.c

Lines changed: 105 additions & 54 deletions
@@ -55,8 +55,10 @@ static int32 heap_compare_slots(Datum a, Datum b, void *arg);
 static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
 static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
                    bool nowait, bool *done);
-static void gather_merge_init(GatherMergeState *gm_state);
 static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
+static void gather_merge_setup(GatherMergeState *gm_state);
+static void gather_merge_init(GatherMergeState *gm_state);
+static void gather_merge_clear_tuples(GatherMergeState *gm_state);
 static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
                    bool nowait);
 static void load_tuple_array(GatherMergeState *gm_state, int reader);
@@ -149,14 +151,17 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
     }

     /*
-     * store the tuple descriptor into gather merge state, so we can use it
-     * later while initializing the gather merge slots.
+     * Store the tuple descriptor into gather merge state, so we can use it
+     * while initializing the gather merge slots.
      */
     if (!ExecContextForcesOids(&gm_state->ps, &hasoid))
         hasoid = false;
     tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
     gm_state->tupDesc = tupDesc;

+    /* Now allocate the workspace for gather merge */
+    gather_merge_setup(gm_state);
+
     return gm_state;
 }

@@ -340,6 +345,9 @@ ExecReScanGatherMerge(GatherMergeState *node)
     /* Make sure any existing workers are gracefully shut down */
     ExecShutdownGatherMergeWorkers(node);

+    /* Free any unused tuples, so we don't leak memory across rescans */
+    gather_merge_clear_tuples(node);
+
     /* Mark node so that shared state will be rebuilt at next call */
     node->initialized = false;
     node->gm_initialized = false;
@@ -370,49 +378,93 @@ ExecReScanGatherMerge(GatherMergeState *node)
 }

 /*
- * Initialize the Gather merge tuple read.
+ * Set up the data structures that we'll need for Gather Merge.
+ *
+ * We allocate these once on the basis of gm->num_workers, which is an
+ * upper bound for the number of workers we'll actually have.  During
+ * a rescan, we reset the structures to empty.  This approach simplifies
+ * not leaking memory across rescans.
  *
- * Pull at least a single tuple from each worker + leader and set up the heap.
+ * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
+ * are for workers.  The values placed into gm_heap correspond to indexes
+ * in gm_slots[].  The gm_tuple_buffers[] array, however, is indexed from
+ * 0 to n-1; it has no entry for the leader.
  */
 static void
-gather_merge_init(GatherMergeState *gm_state)
+gather_merge_setup(GatherMergeState *gm_state)
 {
-    int         nreaders = gm_state->nreaders;
-    bool        nowait = true;
+    GatherMerge *gm = castNode(GatherMerge, gm_state->ps.plan);
+    int         nreaders = gm->num_workers;
     int         i;

     /*
      * Allocate gm_slots for the number of workers + one more slot for leader.
-     * Last slot is always for leader. Leader always calls ExecProcNode() to
-     * read the tuple which will return the TupleTableSlot. Later it will
-     * directly get assigned to gm_slot. So just initialize leader gm_slot
-     * with NULL. For other slots, code below will call
-     * ExecInitExtraTupleSlot() to create a slot for the worker's results.
+     * Slot 0 is always for the leader.  Leader always calls ExecProcNode() to
+     * read the tuple, and then stores it directly into its gm_slots entry.
+     * For other slots, code below will call ExecInitExtraTupleSlot() to
+     * create a slot for the worker's results.  Note that during any single
+     * scan, we might have fewer than num_workers available workers, in which
+     * case the extra array entries go unused.
      */
-    gm_state->gm_slots =
-        palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
-    gm_state->gm_slots[gm_state->nreaders] = NULL;
-
-    /* Initialize the tuple slot and tuple array for each worker */
-    gm_state->gm_tuple_buffers =
-        (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
-                                        gm_state->nreaders);
-    for (i = 0; i < gm_state->nreaders; i++)
+    gm_state->gm_slots = (TupleTableSlot **)
+        palloc0((nreaders + 1) * sizeof(TupleTableSlot *));
+
+    /* Allocate the tuple slot and tuple array for each worker */
+    gm_state->gm_tuple_buffers = (GMReaderTupleBuffer *)
+        palloc0(nreaders * sizeof(GMReaderTupleBuffer));
+
+    for (i = 0; i < nreaders; i++)
     {
         /* Allocate the tuple array with length MAX_TUPLE_STORE */
         gm_state->gm_tuple_buffers[i].tuple =
             (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);

-        /* Initialize slot for worker */
-        gm_state->gm_slots[i] = ExecInitExtraTupleSlot(gm_state->ps.state);
-        ExecSetSlotDescriptor(gm_state->gm_slots[i],
+        /* Initialize tuple slot for worker */
+        gm_state->gm_slots[i + 1] = ExecInitExtraTupleSlot(gm_state->ps.state);
+        ExecSetSlotDescriptor(gm_state->gm_slots[i + 1],
                               gm_state->tupDesc);
     }

     /* Allocate the resources for the merge */
-    gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1,
+    gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
                                             heap_compare_slots,
                                             gm_state);
+}
+
+/*
+ * Initialize the Gather Merge.
+ *
+ * Reset data structures to ensure they're empty.  Then pull at least one
+ * tuple from leader + each worker (or set its "done" indicator), and set up
+ * the heap.
+ */
+static void
+gather_merge_init(GatherMergeState *gm_state)
+{
+    int         nreaders = gm_state->nreaders;
+    bool        nowait = true;
+    int         i;
+
+    /* Assert that gather_merge_setup made enough space */
+    Assert(nreaders <= castNode(GatherMerge, gm_state->ps.plan)->num_workers);
+
+    /* Reset leader's tuple slot to empty */
+    gm_state->gm_slots[0] = NULL;
+
+    /* Reset the tuple slot and tuple array for each worker */
+    for (i = 0; i < nreaders; i++)
+    {
+        /* Reset tuple array to empty */
+        gm_state->gm_tuple_buffers[i].nTuples = 0;
+        gm_state->gm_tuple_buffers[i].readCounter = 0;
+        /* Reset done flag to not-done */
+        gm_state->gm_tuple_buffers[i].done = false;
+        /* Ensure output slot is empty */
+        ExecClearTuple(gm_state->gm_slots[i + 1]);
+    }
+
+    /* Reset binary heap to empty */
+    binaryheap_reset(gm_state->gm_heap);

     /*
      * First, try to read a tuple from each worker (including leader) in
@@ -422,14 +474,13 @@ gather_merge_init(GatherMergeState *gm_state)
      * least one tuple) to the heap.
      */
 reread:
-    for (i = 0; i < nreaders + 1; i++)
+    for (i = 0; i <= nreaders; i++)
     {
         CHECK_FOR_INTERRUPTS();

-        /* ignore this source if already known done */
-        if ((i < nreaders) ?
-            !gm_state->gm_tuple_buffers[i].done :
-            gm_state->need_to_scan_locally)
+        /* skip this source if already known done */
+        if ((i == 0) ? gm_state->need_to_scan_locally :
+            !gm_state->gm_tuple_buffers[i - 1].done)
         {
             if (TupIsNull(gm_state->gm_slots[i]))
             {
@@ -450,9 +501,9 @@ gather_merge_init(GatherMergeState *gm_state)
     }

     /* need not recheck leader, since nowait doesn't matter for it */
-    for (i = 0; i < nreaders; i++)
+    for (i = 1; i <= nreaders; i++)
     {
-        if (!gm_state->gm_tuple_buffers[i].done &&
+        if (!gm_state->gm_tuple_buffers[i - 1].done &&
             TupIsNull(gm_state->gm_slots[i]))
         {
             nowait = false;
@@ -467,23 +518,23 @@ gather_merge_init(GatherMergeState *gm_state)
 }

 /*
- * Clear out the tuple table slots for each gather merge input.
+ * Clear out the tuple table slot, and any unused pending tuples,
+ * for each gather merge input.
  */
 static void
-gather_merge_clear_slots(GatherMergeState *gm_state)
+gather_merge_clear_tuples(GatherMergeState *gm_state)
 {
     int         i;

     for (i = 0; i < gm_state->nreaders; i++)
     {
-        pfree(gm_state->gm_tuple_buffers[i].tuple);
-        ExecClearTuple(gm_state->gm_slots[i]);
-    }
+        GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];

-    /* Free tuple array as we don't need it any more */
-    pfree(gm_state->gm_tuple_buffers);
-    /* Free the binaryheap, which was created for sort */
-    binaryheap_free(gm_state->gm_heap);
+        while (tuple_buffer->readCounter < tuple_buffer->nTuples)
+            heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
+
+        ExecClearTuple(gm_state->gm_slots[i + 1]);
+    }
 }

 /*
@@ -526,7 +577,7 @@ gather_merge_getnext(GatherMergeState *gm_state)
     if (binaryheap_empty(gm_state->gm_heap))
     {
         /* All the queues are exhausted, and so is the heap */
-        gather_merge_clear_slots(gm_state);
+        gather_merge_clear_tuples(gm_state);
         return NULL;
     }
     else
@@ -548,10 +599,10 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
     int         i;

     /* Don't do anything if this is the leader. */
-    if (reader == gm_state->nreaders)
+    if (reader == 0)
         return;

-    tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+    tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];

     /* If there's nothing in the array, reset the counters to zero. */
     if (tuple_buffer->nTuples == tuple_buffer->readCounter)
@@ -590,7 +641,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
      * If we're being asked to generate a tuple from the leader, then we just
      * call ExecProcNode as normal to produce one.
      */
-    if (gm_state->nreaders == reader)
+    if (reader == 0)
     {
         if (gm_state->need_to_scan_locally)
         {
@@ -601,7 +652,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)

             if (!TupIsNull(outerTupleSlot))
             {
-                gm_state->gm_slots[reader] = outerTupleSlot;
+                gm_state->gm_slots[0] = outerTupleSlot;
                 return true;
             }
             /* need_to_scan_locally serves as "done" flag for leader */
@@ -611,7 +662,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
     }

     /* Otherwise, check the state of the relevant tuple buffer. */
-    tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+    tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];

     if (tuple_buffer->nTuples > tuple_buffer->readCounter)
     {
@@ -621,8 +672,8 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
     else if (tuple_buffer->done)
     {
         /* Reader is known to be exhausted. */
-        DestroyTupleQueueReader(gm_state->reader[reader]);
-        gm_state->reader[reader] = NULL;
+        DestroyTupleQueueReader(gm_state->reader[reader - 1]);
+        gm_state->reader[reader - 1] = NULL;
         return false;
     }
     else
@@ -649,14 +700,14 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
     ExecStoreTuple(tup,         /* tuple to store */
                    gm_state->gm_slots[reader],  /* slot in which to store the
                                                  * tuple */
-                   InvalidBuffer,   /* buffer associated with this tuple */
-                   true);       /* pfree this pointer if not from heap */
+                   InvalidBuffer,   /* no buffer associated with tuple */
+                   true);       /* pfree tuple when done with it */

     return true;
 }

 /*
- * Attempt to read a tuple from given reader.
+ * Attempt to read a tuple from given worker.
  */
 static HeapTuple
 gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
@@ -671,7 +722,7 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
     CHECK_FOR_INTERRUPTS();

     /* Attempt to read a tuple. */
-    reader = gm_state->reader[nreader];
+    reader = gm_state->reader[nreader - 1];

     /* Run TupleQueueReaders in per-tuple context */
     tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;

src/include/nodes/execnodes.h

Lines changed: 2 additions & 1 deletion
@@ -1958,7 +1958,8 @@ typedef struct GatherMergeState
     int         gm_nkeys;       /* number of sort columns */
     SortSupport gm_sortkeys;    /* array of length gm_nkeys */
     struct ParallelExecutorInfo *pei;
-    /* all remaining fields are reinitialized during a rescan: */
+    /* all remaining fields are reinitialized during a rescan */
+    /* (but the arrays are not reallocated, just cleared) */
     int         nworkers_launched;  /* original number of workers */
     int         nreaders;       /* number of active workers */
     TupleTableSlot **gm_slots;  /* array with nreaders+1 entries */