Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 7610547

Browse files
committed
Avoid memory leaks when a GatherMerge node is rescanned.
Rescanning a GatherMerge led to leaking some memory in the executor's query-lifespan context, because most of the node's working data structures were simply abandoned and rebuilt from scratch. In practice, this might never amount to much, given the cost of relaunching worker processes --- but it's still pretty messy, so let's fix it.

We can rearrange things so that the tuple arrays are simply cleared and reused, and we don't need to rebuild the TupleTableSlots either, just clear them. One small complication is that because we might get a different number of workers on each iteration, we can't keep the old convention that the leader's gm_slots[] entry is the last one; the leader might clobber a TupleTableSlot that we need for a worker in a future iteration. Hence, adjust the logic so that the leader has slot 0 always, while the active workers have slots 1..n.

Back-patch to v10 to keep all the existing versions of nodeGatherMerge.c in sync --- because of the renumbering of the slots, there would otherwise be a very large risk that any future backpatches in this module would introduce bugs.

Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us
1 parent b4fa938 commit 7610547

File tree

2 files changed

+107
-55
lines changed

2 files changed

+107
-55
lines changed

‎src/backend/executor/nodeGatherMerge.c

Lines changed: 105 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,10 @@ static int32 heap_compare_slots(Datum a, Datum b, void *arg);
5555
staticTupleTableSlot*gather_merge_getnext(GatherMergeState*gm_state);
5656
staticHeapTuplegm_readnext_tuple(GatherMergeState*gm_state,intnreader,
5757
boolnowait,bool*done);
58-
staticvoidgather_merge_init(GatherMergeState*gm_state);
5958
staticvoidExecShutdownGatherMergeWorkers(GatherMergeState*node);
59+
staticvoidgather_merge_setup(GatherMergeState*gm_state);
60+
staticvoidgather_merge_init(GatherMergeState*gm_state);
61+
staticvoidgather_merge_clear_tuples(GatherMergeState*gm_state);
6062
staticboolgather_merge_readnext(GatherMergeState*gm_state,intreader,
6163
boolnowait);
6264
staticvoidload_tuple_array(GatherMergeState*gm_state,intreader);
@@ -148,14 +150,17 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
148150
}
149151

150152
/*
151-
*store the tuple descriptor into gather merge state, so we can use it
152-
*laterwhile initializing the gather merge slots.
153+
*Store the tuple descriptor into gather merge state, so we can use it
154+
* while initializing the gather merge slots.
153155
*/
154156
if (!ExecContextForcesOids(&gm_state->ps,&hasoid))
155157
hasoid= false;
156158
tupDesc=ExecTypeFromTL(outerNode->targetlist,hasoid);
157159
gm_state->tupDesc=tupDesc;
158160

161+
/* Now allocate the workspace for gather merge */
162+
gather_merge_setup(gm_state);
163+
159164
returngm_state;
160165
}
161166

@@ -338,6 +343,9 @@ ExecReScanGatherMerge(GatherMergeState *node)
338343
/* Make sure any existing workers are gracefully shut down */
339344
ExecShutdownGatherMergeWorkers(node);
340345

346+
/* Free any unused tuples, so we don't leak memory across rescans */
347+
gather_merge_clear_tuples(node);
348+
341349
/* Mark node so that shared state will be rebuilt at next call */
342350
node->initialized= false;
343351
node->gm_initialized= false;
@@ -368,49 +376,93 @@ ExecReScanGatherMerge(GatherMergeState *node)
368376
}
369377

370378
/*
371-
* Initialize the Gather merge tuple read.
379+
* Set up the data structures that we'll need for Gather Merge.
380+
*
381+
* We allocate these once on the basis of gm->num_workers, which is an
382+
* upper bound for the number of workers we'll actually have. During
383+
* a rescan, we reset the structures to empty. This approach simplifies
384+
* not leaking memory across rescans.
372385
*
373-
* Pull at least a single tuple from each worker + leader and set up the heap.
386+
* In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
387+
* are for workers. The values placed into gm_heap correspond to indexes
388+
* in gm_slots[]. The gm_tuple_buffers[] array, however, is indexed from
389+
* 0 to n-1; it has no entry for the leader.
374390
*/
375391
staticvoid
376-
gather_merge_init(GatherMergeState*gm_state)
392+
gather_merge_setup(GatherMergeState*gm_state)
377393
{
378-
intnreaders=gm_state->nreaders;
379-
boolnowait=true;
394+
GatherMerge*gm=castNode(GatherMerge,gm_state->ps.plan);
395+
intnreaders=gm->num_workers;
380396
inti;
381397

382398
/*
383399
* Allocate gm_slots for the number of workers + one more slot for leader.
384-
* Last slot is always for leader. Leader always calls ExecProcNode() to
385-
* read the tuple which will return the TupleTableSlot. Later it will
386-
* directly get assigned to gm_slot. So just initialize leader gm_slot
387-
* with NULL. For other slots, code below will call
388-
* ExecInitExtraTupleSlot() to create a slot for the worker's results.
400+
* Slot 0 is always for the leader. Leader always calls ExecProcNode() to
401+
* read the tuple, and then stores it directly into its gm_slots entry.
402+
* For other slots, code below will call ExecInitExtraTupleSlot() to
403+
* create a slot for the worker's results. Note that during any single
404+
* scan, we might have fewer than num_workers available workers, in which
405+
* case the extra array entries go unused.
389406
*/
390-
gm_state->gm_slots=
391-
palloc((gm_state->nreaders+1)*sizeof(TupleTableSlot*));
392-
gm_state->gm_slots[gm_state->nreaders]=NULL;
393-
394-
/* Initialize the tuple slot and tuple array for each worker */
395-
gm_state->gm_tuple_buffers=
396-
(GMReaderTupleBuffer*)palloc0(sizeof(GMReaderTupleBuffer)*
397-
gm_state->nreaders);
398-
for (i=0;i<gm_state->nreaders;i++)
407+
gm_state->gm_slots= (TupleTableSlot**)
408+
palloc0((nreaders+1)*sizeof(TupleTableSlot*));
409+
410+
/* Allocate the tuple slot and tuple array for each worker */
411+
gm_state->gm_tuple_buffers= (GMReaderTupleBuffer*)
412+
palloc0(nreaders*sizeof(GMReaderTupleBuffer));
413+
414+
for (i=0;i<nreaders;i++)
399415
{
400416
/* Allocate the tuple array with length MAX_TUPLE_STORE */
401417
gm_state->gm_tuple_buffers[i].tuple=
402418
(HeapTuple*)palloc0(sizeof(HeapTuple)*MAX_TUPLE_STORE);
403419

404-
/* Initialize slot for worker */
405-
gm_state->gm_slots[i]=ExecInitExtraTupleSlot(gm_state->ps.state);
406-
ExecSetSlotDescriptor(gm_state->gm_slots[i],
420+
/* Initializetupleslot for worker */
421+
gm_state->gm_slots[i+1]=ExecInitExtraTupleSlot(gm_state->ps.state);
422+
ExecSetSlotDescriptor(gm_state->gm_slots[i+1],
407423
gm_state->tupDesc);
408424
}
409425

410426
/* Allocate the resources for the merge */
411-
gm_state->gm_heap=binaryheap_allocate(gm_state->nreaders+1,
427+
gm_state->gm_heap=binaryheap_allocate(nreaders+1,
412428
heap_compare_slots,
413429
gm_state);
430+
}
431+
432+
/*
433+
* Initialize the Gather Merge.
434+
*
435+
* Reset data structures to ensure they're empty. Then pull at least one
436+
* tuple from leader + each worker (or set its "done" indicator), and set up
437+
* the heap.
438+
*/
439+
staticvoid
440+
gather_merge_init(GatherMergeState*gm_state)
441+
{
442+
intnreaders=gm_state->nreaders;
443+
boolnowait= true;
444+
inti;
445+
446+
/* Assert that gather_merge_setup made enough space */
447+
Assert(nreaders <=castNode(GatherMerge,gm_state->ps.plan)->num_workers);
448+
449+
/* Reset leader's tuple slot to empty */
450+
gm_state->gm_slots[0]=NULL;
451+
452+
/* Reset the tuple slot and tuple array for each worker */
453+
for (i=0;i<nreaders;i++)
454+
{
455+
/* Reset tuple array to empty */
456+
gm_state->gm_tuple_buffers[i].nTuples=0;
457+
gm_state->gm_tuple_buffers[i].readCounter=0;
458+
/* Reset done flag to not-done */
459+
gm_state->gm_tuple_buffers[i].done= false;
460+
/* Ensure output slot is empty */
461+
ExecClearTuple(gm_state->gm_slots[i+1]);
462+
}
463+
464+
/* Reset binary heap to empty */
465+
binaryheap_reset(gm_state->gm_heap);
414466

415467
/*
416468
* First, try to read a tuple from each worker (including leader) in
@@ -420,14 +472,13 @@ gather_merge_init(GatherMergeState *gm_state)
420472
* least one tuple) to the heap.
421473
*/
422474
reread:
423-
for (i=0;i<nreaders+1;i++)
475+
for (i=0;i <=nreaders;i++)
424476
{
425477
CHECK_FOR_INTERRUPTS();
426478

427-
/* ignore this source if already known done */
428-
if ((i<nreaders) ?
429-
!gm_state->gm_tuple_buffers[i].done :
430-
gm_state->need_to_scan_locally)
479+
/* skip this source if already known done */
480+
if ((i==0) ?gm_state->need_to_scan_locally :
481+
!gm_state->gm_tuple_buffers[i-1].done)
431482
{
432483
if (TupIsNull(gm_state->gm_slots[i]))
433484
{
@@ -448,9 +499,9 @@ gather_merge_init(GatherMergeState *gm_state)
448499
}
449500

450501
/* need not recheck leader, since nowait doesn't matter for it */
451-
for (i=0;i<nreaders;i++)
502+
for (i=1;i <=nreaders;i++)
452503
{
453-
if (!gm_state->gm_tuple_buffers[i].done&&
504+
if (!gm_state->gm_tuple_buffers[i-1].done&&
454505
TupIsNull(gm_state->gm_slots[i]))
455506
{
456507
nowait= false;
@@ -465,23 +516,23 @@ gather_merge_init(GatherMergeState *gm_state)
465516
}
466517

467518
/*
468-
* Clear out the tuple table slots for each gather merge input.
519+
* Clear out the tuple table slot, and any unused pending tuples,
520+
* for each gather merge input.
469521
*/
470522
staticvoid
471-
gather_merge_clear_slots(GatherMergeState*gm_state)
523+
gather_merge_clear_tuples(GatherMergeState*gm_state)
472524
{
473525
inti;
474526

475527
for (i=0;i<gm_state->nreaders;i++)
476528
{
477-
pfree(gm_state->gm_tuple_buffers[i].tuple);
478-
ExecClearTuple(gm_state->gm_slots[i]);
479-
}
529+
GMReaderTupleBuffer*tuple_buffer=&gm_state->gm_tuple_buffers[i];
480530

481-
/* Free tuple array as we don't need it any more */
482-
pfree(gm_state->gm_tuple_buffers);
483-
/* Free the binaryheap, which was created for sort */
484-
binaryheap_free(gm_state->gm_heap);
531+
while (tuple_buffer->readCounter<tuple_buffer->nTuples)
532+
heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
533+
534+
ExecClearTuple(gm_state->gm_slots[i+1]);
535+
}
485536
}
486537

487538
/*
@@ -524,7 +575,7 @@ gather_merge_getnext(GatherMergeState *gm_state)
524575
if (binaryheap_empty(gm_state->gm_heap))
525576
{
526577
/* All the queues are exhausted, and so is the heap */
527-
gather_merge_clear_slots(gm_state);
578+
gather_merge_clear_tuples(gm_state);
528579
returnNULL;
529580
}
530581
else
@@ -546,10 +597,10 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
546597
inti;
547598

548599
/* Don't do anything if this is the leader. */
549-
if (reader==gm_state->nreaders)
600+
if (reader==0)
550601
return;
551602

552-
tuple_buffer=&gm_state->gm_tuple_buffers[reader];
603+
tuple_buffer=&gm_state->gm_tuple_buffers[reader-1];
553604

554605
/* If there's nothing in the array, reset the counters to zero. */
555606
if (tuple_buffer->nTuples==tuple_buffer->readCounter)
@@ -588,7 +639,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
588639
* If we're being asked to generate a tuple from the leader, then we just
589640
* call ExecProcNode as normal to produce one.
590641
*/
591-
if (gm_state->nreaders==reader)
642+
if (reader==0)
592643
{
593644
if (gm_state->need_to_scan_locally)
594645
{
@@ -599,7 +650,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
599650

600651
if (!TupIsNull(outerTupleSlot))
601652
{
602-
gm_state->gm_slots[reader]=outerTupleSlot;
653+
gm_state->gm_slots[0]=outerTupleSlot;
603654
return true;
604655
}
605656
/* need_to_scan_locally serves as "done" flag for leader */
@@ -609,7 +660,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
609660
}
610661

611662
/* Otherwise, check the state of the relevant tuple buffer. */
612-
tuple_buffer=&gm_state->gm_tuple_buffers[reader];
663+
tuple_buffer=&gm_state->gm_tuple_buffers[reader-1];
613664

614665
if (tuple_buffer->nTuples>tuple_buffer->readCounter)
615666
{
@@ -619,8 +670,8 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
619670
elseif (tuple_buffer->done)
620671
{
621672
/* Reader is known to be exhausted. */
622-
DestroyTupleQueueReader(gm_state->reader[reader]);
623-
gm_state->reader[reader]=NULL;
673+
DestroyTupleQueueReader(gm_state->reader[reader-1]);
674+
gm_state->reader[reader-1]=NULL;
624675
return false;
625676
}
626677
else
@@ -647,14 +698,14 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
647698
ExecStoreTuple(tup,/* tuple to store */
648699
gm_state->gm_slots[reader],/* slot in which to store the
649700
* tuple */
650-
InvalidBuffer,/* buffer associated with this tuple */
651-
true);/* pfreethis pointer if not from heap */
701+
InvalidBuffer,/*nobuffer associated with tuple */
702+
true);/* pfreetuple when done with it */
652703

653704
return true;
654705
}
655706

656707
/*
657-
* Attempt to read a tuple from givenreader.
708+
* Attempt to read a tuple from givenworker.
658709
*/
659710
staticHeapTuple
660711
gm_readnext_tuple(GatherMergeState*gm_state,intnreader,boolnowait,
@@ -669,7 +720,7 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
669720
CHECK_FOR_INTERRUPTS();
670721

671722
/* Attempt to read a tuple. */
672-
reader=gm_state->reader[nreader];
723+
reader=gm_state->reader[nreader-1];
673724

674725
/* Run TupleQueueReaders in per-tuple context */
675726
tupleContext=gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;

‎src/include/nodes/execnodes.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1944,7 +1944,8 @@ typedef struct GatherMergeState
19441944
intgm_nkeys;/* number of sort columns */
19451945
SortSupportgm_sortkeys;/* array of length gm_nkeys */
19461946
structParallelExecutorInfo*pei;
1947-
/* all remaining fields are reinitialized during a rescan: */
1947+
/* all remaining fields are reinitialized during a rescan */
1948+
/* (but the arrays are not reallocated, just cleared) */
19481949
intnworkers_launched;/* original number of workers */
19491950
intnreaders;/* number of active workers */
19501951
TupleTableSlot**gm_slots;/* array with nreaders+1 entries */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp