@@ -55,8 +55,10 @@ static int32 heap_compare_slots(Datum a, Datum b, void *arg);
5555static TupleTableSlot * gather_merge_getnext (GatherMergeState * gm_state );
5656static HeapTuple gm_readnext_tuple (GatherMergeState * gm_state ,int nreader ,
5757bool nowait ,bool * done );
58- static void gather_merge_init (GatherMergeState * gm_state );
5958static void ExecShutdownGatherMergeWorkers (GatherMergeState * node );
59+ static void gather_merge_setup (GatherMergeState * gm_state );
60+ static void gather_merge_init (GatherMergeState * gm_state );
61+ static void gather_merge_clear_tuples (GatherMergeState * gm_state );
6062static bool gather_merge_readnext (GatherMergeState * gm_state ,int reader ,
6163bool nowait );
6264static void load_tuple_array (GatherMergeState * gm_state ,int reader );
@@ -149,14 +151,17 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
149151}
150152
151153/*
152- *store the tuple descriptor into gather merge state, so we can use it
153- *later while initializing the gather merge slots.
154+ *Store the tuple descriptor into gather merge state, so we can use it
155+ * while initializing the gather merge slots.
154156 */
155157if (!ExecContextForcesOids (& gm_state -> ps ,& hasoid ))
156158hasoid = false;
157159tupDesc = ExecTypeFromTL (outerNode -> targetlist ,hasoid );
158160gm_state -> tupDesc = tupDesc ;
159161
162+ /* Now allocate the workspace for gather merge */
163+ gather_merge_setup (gm_state );
164+
160165return gm_state ;
161166}
162167
@@ -340,6 +345,9 @@ ExecReScanGatherMerge(GatherMergeState *node)
340345/* Make sure any existing workers are gracefully shut down */
341346ExecShutdownGatherMergeWorkers (node );
342347
348+ /* Free any unused tuples, so we don't leak memory across rescans */
349+ gather_merge_clear_tuples (node );
350+
343351/* Mark node so that shared state will be rebuilt at next call */
344352node -> initialized = false;
345353node -> gm_initialized = false;
@@ -370,49 +378,93 @@ ExecReScanGatherMerge(GatherMergeState *node)
370378}
371379
372380/*
373- * Initialize the Gather merge tuple read.
381+ * Set up the data structures that we'll need for Gather Merge.
382+ *
383+ * We allocate these once on the basis of gm->num_workers, which is an
384+ * upper bound for the number of workers we'll actually have. During
385+ * a rescan, we reset the structures to empty. This approach simplifies
386+ * not leaking memory across rescans.
374387 *
375- * Pull at least a single tuple from each worker + leader and set up the heap.
388+ * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
389+ * are for workers. The values placed into gm_heap correspond to indexes
390+ * in gm_slots[]. The gm_tuple_buffers[] array, however, is indexed from
391+ * 0 to n-1; it has no entry for the leader.
376392 */
377393static void
378- gather_merge_init (GatherMergeState * gm_state )
394+ gather_merge_setup (GatherMergeState * gm_state )
379395{
380- int nreaders = gm_state -> nreaders ;
381- bool nowait = true ;
396+ GatherMerge * gm = castNode ( GatherMerge , gm_state -> ps . plan ) ;
397+ int nreaders = gm -> num_workers ;
382398int i ;
383399
384400/*
385401 * Allocate gm_slots for the number of workers + one more slot for leader.
386- * Last slot is always for leader. Leader always calls ExecProcNode() to
387- * read the tuple which will return the TupleTableSlot. Later it will
388- * directly get assigned to gm_slot. So just initialize leader gm_slot
389- * with NULL. For other slots, code below will call
390- * ExecInitExtraTupleSlot() to create a slot for the worker's results.
402+ * Slot 0 is always for the leader. Leader always calls ExecProcNode() to
403+ * read the tuple, and then stores it directly into its gm_slots entry.
404+ * For other slots, code below will call ExecInitExtraTupleSlot() to
405+ * create a slot for the worker's results. Note that during any single
406+ * scan, we might have fewer than num_workers available workers, in which
407+ * case the extra array entries go unused.
391408 */
392- gm_state -> gm_slots =
393- palloc ((gm_state -> nreaders + 1 )* sizeof (TupleTableSlot * ));
394- gm_state -> gm_slots [gm_state -> nreaders ]= NULL ;
395-
396- /* Initialize the tuple slot and tuple array for each worker */
397- gm_state -> gm_tuple_buffers =
398- (GMReaderTupleBuffer * )palloc0 (sizeof (GMReaderTupleBuffer )*
399- gm_state -> nreaders );
400- for (i = 0 ;i < gm_state -> nreaders ;i ++ )
409+ gm_state -> gm_slots = (TupleTableSlot * * )
410+ palloc0 ((nreaders + 1 )* sizeof (TupleTableSlot * ));
411+
412+ /* Allocate the tuple slot and tuple array for each worker */
413+ gm_state -> gm_tuple_buffers = (GMReaderTupleBuffer * )
414+ palloc0 (nreaders * sizeof (GMReaderTupleBuffer ));
415+
416+ for (i = 0 ;i < nreaders ;i ++ )
401417{
402418/* Allocate the tuple array with length MAX_TUPLE_STORE */
403419gm_state -> gm_tuple_buffers [i ].tuple =
404420(HeapTuple * )palloc0 (sizeof (HeapTuple )* MAX_TUPLE_STORE );
405421
406- /* Initialize slot for worker */
407- gm_state -> gm_slots [i ]= ExecInitExtraTupleSlot (gm_state -> ps .state );
408- ExecSetSlotDescriptor (gm_state -> gm_slots [i ],
422+ /* Initialize tuple slot for worker */
423+ gm_state -> gm_slots [i + 1 ]= ExecInitExtraTupleSlot (gm_state -> ps .state );
424+ ExecSetSlotDescriptor (gm_state -> gm_slots [i + 1 ],
409425gm_state -> tupDesc );
410426}
411427
412428/* Allocate the resources for the merge */
413- gm_state -> gm_heap = binaryheap_allocate (gm_state -> nreaders + 1 ,
429+ gm_state -> gm_heap = binaryheap_allocate (nreaders + 1 ,
414430heap_compare_slots ,
415431gm_state );
432+ }
433+
434+ /*
435+ * Initialize the Gather Merge.
436+ *
437+ * Reset data structures to ensure they're empty. Then pull at least one
438+ * tuple from leader + each worker (or set its "done" indicator), and set up
439+ * the heap.
440+ */
441+ static void
442+ gather_merge_init (GatherMergeState * gm_state )
443+ {
444+ int nreaders = gm_state -> nreaders ;
445+ bool nowait = true;
446+ int i ;
447+
448+ /* Assert that gather_merge_setup made enough space */
449+ Assert (nreaders <= castNode (GatherMerge ,gm_state -> ps .plan )-> num_workers );
450+
451+ /* Reset leader's tuple slot to empty */
452+ gm_state -> gm_slots [0 ]= NULL ;
453+
454+ /* Reset the tuple slot and tuple array for each worker */
455+ for (i = 0 ;i < nreaders ;i ++ )
456+ {
457+ /* Reset tuple array to empty */
458+ gm_state -> gm_tuple_buffers [i ].nTuples = 0 ;
459+ gm_state -> gm_tuple_buffers [i ].readCounter = 0 ;
460+ /* Reset done flag to not-done */
461+ gm_state -> gm_tuple_buffers [i ].done = false;
462+ /* Ensure output slot is empty */
463+ ExecClearTuple (gm_state -> gm_slots [i + 1 ]);
464+ }
465+
466+ /* Reset binary heap to empty */
467+ binaryheap_reset (gm_state -> gm_heap );
416468
417469/*
418470 * First, try to read a tuple from each worker (including leader) in
@@ -422,14 +474,13 @@ gather_merge_init(GatherMergeState *gm_state)
422474 * least one tuple) to the heap.
423475 */
424476reread :
425- for (i = 0 ;i < nreaders + 1 ;i ++ )
477+ for (i = 0 ;i <= nreaders ;i ++ )
426478{
427479CHECK_FOR_INTERRUPTS ();
428480
429- /* ignore this source if already known done */
430- if ((i < nreaders ) ?
431- !gm_state -> gm_tuple_buffers [i ].done :
432- gm_state -> need_to_scan_locally )
481+ /* skip this source if already known done */
482+ if ((i == 0 ) ?gm_state -> need_to_scan_locally :
483+ !gm_state -> gm_tuple_buffers [i - 1 ].done )
433484{
434485if (TupIsNull (gm_state -> gm_slots [i ]))
435486{
@@ -450,9 +501,9 @@ gather_merge_init(GatherMergeState *gm_state)
450501}
451502
452503/* need not recheck leader, since nowait doesn't matter for it */
453- for (i = 0 ;i < nreaders ;i ++ )
504+ for (i = 1 ;i <= nreaders ;i ++ )
454505{
455- if (!gm_state -> gm_tuple_buffers [i ].done &&
506+ if (!gm_state -> gm_tuple_buffers [i - 1 ].done &&
456507TupIsNull (gm_state -> gm_slots [i ]))
457508{
458509nowait = false;
@@ -467,23 +518,23 @@ gather_merge_init(GatherMergeState *gm_state)
467518}
468519
469520/*
470- * Clear out the tuple table slots for each gather merge input.
521+ * Clear out the tuple table slot, and any unused pending tuples,
522+ * for each gather merge input.
471523 */
472524static void
473- gather_merge_clear_slots (GatherMergeState * gm_state )
525+ gather_merge_clear_tuples (GatherMergeState * gm_state )
474526{
475527int i ;
476528
477529for (i = 0 ;i < gm_state -> nreaders ;i ++ )
478530{
479- pfree (gm_state -> gm_tuple_buffers [i ].tuple );
480- ExecClearTuple (gm_state -> gm_slots [i ]);
481- }
531+ GMReaderTupleBuffer * tuple_buffer = & gm_state -> gm_tuple_buffers [i ];
482532
483- /* Free tuple array as we don't need it any more */
484- pfree (gm_state -> gm_tuple_buffers );
485- /* Free the binaryheap, which was created for sort */
486- binaryheap_free (gm_state -> gm_heap );
533+ while (tuple_buffer -> readCounter < tuple_buffer -> nTuples )
534+ heap_freetuple (tuple_buffer -> tuple [tuple_buffer -> readCounter ++ ]);
535+
536+ ExecClearTuple (gm_state -> gm_slots [i + 1 ]);
537+ }
487538}
488539
489540/*
@@ -526,7 +577,7 @@ gather_merge_getnext(GatherMergeState *gm_state)
526577if (binaryheap_empty (gm_state -> gm_heap ))
527578{
528579/* All the queues are exhausted, and so is the heap */
529- gather_merge_clear_slots (gm_state );
580+ gather_merge_clear_tuples (gm_state );
530581return NULL ;
531582}
532583else
@@ -548,10 +599,10 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
548599int i ;
549600
550601/* Don't do anything if this is the leader. */
551- if (reader == gm_state -> nreaders )
602+ if (reader == 0 )
552603return ;
553604
554- tuple_buffer = & gm_state -> gm_tuple_buffers [reader ];
605+ tuple_buffer = & gm_state -> gm_tuple_buffers [reader - 1 ];
555606
556607/* If there's nothing in the array, reset the counters to zero. */
557608if (tuple_buffer -> nTuples == tuple_buffer -> readCounter )
@@ -590,7 +641,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
590641 * If we're being asked to generate a tuple from the leader, then we just
591642 * call ExecProcNode as normal to produce one.
592643 */
593- if (gm_state -> nreaders == reader )
644+ if (reader == 0 )
594645{
595646if (gm_state -> need_to_scan_locally )
596647{
@@ -601,7 +652,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
601652
602653if (!TupIsNull (outerTupleSlot ))
603654{
604- gm_state -> gm_slots [reader ]= outerTupleSlot ;
655+ gm_state -> gm_slots [0 ]= outerTupleSlot ;
605656return true;
606657}
607658/* need_to_scan_locally serves as "done" flag for leader */
@@ -611,7 +662,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
611662}
612663
613664/* Otherwise, check the state of the relevant tuple buffer. */
614- tuple_buffer = & gm_state -> gm_tuple_buffers [reader ];
665+ tuple_buffer = & gm_state -> gm_tuple_buffers [reader - 1 ];
615666
616667if (tuple_buffer -> nTuples > tuple_buffer -> readCounter )
617668{
@@ -621,8 +672,8 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
621672else if (tuple_buffer -> done )
622673{
623674/* Reader is known to be exhausted. */
624- DestroyTupleQueueReader (gm_state -> reader [reader ]);
625- gm_state -> reader [reader ]= NULL ;
675+ DestroyTupleQueueReader (gm_state -> reader [reader - 1 ]);
676+ gm_state -> reader [reader - 1 ]= NULL ;
626677return false;
627678}
628679else
@@ -649,14 +700,14 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
649700ExecStoreTuple (tup ,/* tuple to store */
650701gm_state -> gm_slots [reader ],/* slot in which to store the
651702 * tuple */
652- InvalidBuffer ,/* buffer associated with this tuple */
653- true);/* pfree this pointer if not from heap */
703+ InvalidBuffer ,/* no buffer associated with tuple */
704+ true);/* pfree tuple when done with it */
654705
655706return true;
656707}
657708
658709/*
659- * Attempt to read a tuple from given reader .
710+ * Attempt to read a tuple from given worker .
660711 */
661712static HeapTuple
662713gm_readnext_tuple (GatherMergeState * gm_state ,int nreader ,bool nowait ,
@@ -671,7 +722,7 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
671722CHECK_FOR_INTERRUPTS ();
672723
673724/* Attempt to read a tuple. */
674- reader = gm_state -> reader [nreader ];
725+ reader = gm_state -> reader [nreader - 1 ];
675726
676727/* Run TupleQueueReaders in per-tuple context */
677728tupleContext = gm_state -> ps .ps_ExprContext -> ecxt_per_tuple_memory ;