Commit 51daa7b

Improve division of labor between execParallel.c and nodeGather[Merge].c.
Move the responsibility for creating/destroying TupleQueueReaders into execParallel.c, to avoid duplicative coding in nodeGather.c and nodeGatherMerge.c. Also, instead of having DestroyTupleQueueReader do shm_mq_detach, do it in the caller (which is now only ExecParallelFinish). This means execParallel.c does both the attaching and detaching of the tuple-queue-reader shm_mqs, which seems less weird than the previous arrangement.

These changes also eliminate a vestigial memory leak (of the pei->tqueue array). It's now demonstrable that rescans of Gather or GatherMerge don't leak memory.

Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us
1 parent c039ba0, commit 51daa7b

File tree

5 files changed (+119, −89 lines)
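Before the per-file hunks, here is a condensed, non-verbatim sketch of the setup pattern this patch gives Gather and GatherMerge; the code is paraphrased from the diffs below, and "tupDesc" stands for whichever tuple descriptor the node passes (fslot->tts_tupleDescriptor for Gather, node->tupDesc for GatherMerge):

    /* Workers are launched first; they can start running the subplan. */
    LaunchParallelWorkers(pcxt);
    node->nworkers_launched = pcxt->nworkers_launched;

    if (pcxt->nworkers_launched > 0)
    {
        /* execParallel.c now attaches the queues and builds pei->reader[] */
        ExecParallelCreateReaders(node->pei, tupDesc);

        /* the node keeps its own shrinkable copy of the active-reader array */
        node->nreaders = pcxt->nworkers_launched;
        node->reader = (TupleQueueReader **)
            palloc(node->nreaders * sizeof(TupleQueueReader *));
        memcpy(node->reader, node->pei->reader,
               node->nreaders * sizeof(TupleQueueReader *));
    }
    else
    {
        node->nreaders = 0;
        node->reader = NULL;
    }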

src/backend/executor/execParallel.c

Lines changed: 68 additions & 4 deletions

@@ -534,9 +534,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
     shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
     pei->buffer_usage = bufusage_space;
 
-    /* Set up tuple queues. */
+    /* Set up the tuple queues that the workers will write into. */
     pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
+    /* We don't need the TupleQueueReaders yet, though. */
+    pei->reader = NULL;
+
     /*
      * If instrumentation options were supplied, allocate space for the data.
      * It only gets partially initialized here; the rest happens during
@@ -603,6 +606,37 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
     return pei;
 }
 
+/*
+ * Set up tuple queue readers to read the results of a parallel subplan.
+ * All the workers are expected to return tuples matching tupDesc.
+ *
+ * This is separate from ExecInitParallelPlan() because we can launch the
+ * worker processes and let them start doing something before we do this.
+ */
+void
+ExecParallelCreateReaders(ParallelExecutorInfo *pei,
+                          TupleDesc tupDesc)
+{
+    int         nworkers = pei->pcxt->nworkers_launched;
+    int         i;
+
+    Assert(pei->reader == NULL);
+
+    if (nworkers > 0)
+    {
+        pei->reader = (TupleQueueReader **)
+            palloc(nworkers * sizeof(TupleQueueReader *));
+
+        for (i = 0; i < nworkers; i++)
+        {
+            shm_mq_set_handle(pei->tqueue[i],
+                              pei->pcxt->worker[i].bgwhandle);
+            pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i],
+                                                    tupDesc);
+        }
+    }
+}
+
 /*
  * Re-initialize the parallel executor shared memory state before launching
  * a fresh batch of workers.
@@ -616,6 +650,7 @@ ExecParallelReinitialize(PlanState *planstate,
 
     ReinitializeParallelDSM(pei->pcxt);
     pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+    pei->reader = NULL;
     pei->finished = false;
 
     /* Traverse plan tree and let each child node reset associated state. */
@@ -741,16 +776,45 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 void
 ExecParallelFinish(ParallelExecutorInfo *pei)
 {
+    int         nworkers = pei->pcxt->nworkers_launched;
     int         i;
 
+    /* Make this be a no-op if called twice in a row. */
     if (pei->finished)
         return;
 
-    /* First, wait for the workers to finish. */
+    /*
+     * Detach from tuple queues ASAP, so that any still-active workers will
+     * notice that no further results are wanted.
+     */
+    if (pei->tqueue != NULL)
+    {
+        for (i = 0; i < nworkers; i++)
+            shm_mq_detach(pei->tqueue[i]);
+        pfree(pei->tqueue);
+        pei->tqueue = NULL;
+    }
+
+    /*
+     * While we're waiting for the workers to finish, let's get rid of the
+     * tuple queue readers.  (Any other local cleanup could be done here too.)
+     */
+    if (pei->reader != NULL)
+    {
+        for (i = 0; i < nworkers; i++)
+            DestroyTupleQueueReader(pei->reader[i]);
+        pfree(pei->reader);
+        pei->reader = NULL;
+    }
+
+    /* Now wait for the workers to finish. */
     WaitForParallelWorkersToFinish(pei->pcxt);
 
-    /* Next, accumulate buffer usage. */
-    for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
+    /*
+     * Next, accumulate buffer usage.  (This must wait for the workers to
+     * finish, or we might get incomplete data.)
+     */
+    for (i = 0; i < nworkers; i++)
         InstrAccumParallelQuery(&pei->buffer_usage[i]);
 
     /* Finally, accumulate instrumentation, if any. */

src/backend/executor/nodeGather.c

Lines changed: 22 additions & 42 deletions

@@ -130,7 +130,6 @@ ExecGather(PlanState *pstate)
 {
     GatherState *node = castNode(GatherState, pstate);
     TupleTableSlot *fslot = node->funnel_slot;
-    int         i;
     TupleTableSlot *slot;
     ExprContext *econtext;
 
@@ -173,33 +172,30 @@ ExecGather(PlanState *pstate)
             LaunchParallelWorkers(pcxt);
             /* We save # workers launched for the benefit of EXPLAIN */
             node->nworkers_launched = pcxt->nworkers_launched;
-            node->nreaders = 0;
-            node->nextreader = 0;
 
             /* Set up tuple queue readers to read the results. */
             if (pcxt->nworkers_launched > 0)
             {
-                node->reader = palloc(pcxt->nworkers_launched *
-                                      sizeof(TupleQueueReader *));
-
-                for (i = 0; i < pcxt->nworkers_launched; ++i)
-                {
-                    shm_mq_set_handle(node->pei->tqueue[i],
-                                      pcxt->worker[i].bgwhandle);
-                    node->reader[node->nreaders++] =
-                        CreateTupleQueueReader(node->pei->tqueue[i],
-                                               fslot->tts_tupleDescriptor);
-                }
+                ExecParallelCreateReaders(node->pei,
+                                          fslot->tts_tupleDescriptor);
+                /* Make a working array showing the active readers */
+                node->nreaders = pcxt->nworkers_launched;
+                node->reader = (TupleQueueReader **)
+                    palloc(node->nreaders * sizeof(TupleQueueReader *));
+                memcpy(node->reader, node->pei->reader,
+                       node->nreaders * sizeof(TupleQueueReader *));
             }
             else
             {
                 /* No workers?  Then never mind. */
-                ExecShutdownGatherWorkers(node);
+                node->nreaders = 0;
+                node->reader = NULL;
             }
+            node->nextreader = 0;
         }
 
         /* Run plan locally if no workers or not single-copy. */
-        node->need_to_scan_locally = (node->reader == NULL)
+        node->need_to_scan_locally = (node->nreaders == 0)
             || !gather->single_copy;
         node->initialized = true;
     }
@@ -258,11 +254,11 @@ gather_getnext(GatherState *gatherstate)
     MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
     HeapTuple   tup;
 
-    while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
+    while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
     {
         CHECK_FOR_INTERRUPTS();
 
-        if (gatherstate->reader != NULL)
+        if (gatherstate->nreaders > 0)
         {
             MemoryContext oldContext;
 
@@ -319,19 +315,15 @@ gather_readnext(GatherState *gatherstate)
         tup = TupleQueueReaderNext(reader, true, &readerdone);
 
         /*
-         * If this reader is done, remove it, and collapse the array. If all
-         * readers are done, clean up remaining worker state.
+         * If this reader is done, remove it from our working array of active
+         * readers.  If all readers are done, we're outta here.
         */
         if (readerdone)
         {
             Assert(!tup);
-            DestroyTupleQueueReader(reader);
             --gatherstate->nreaders;
             if (gatherstate->nreaders == 0)
-            {
-                ExecShutdownGatherWorkers(gatherstate);
                 return NULL;
-            }
             memmove(&gatherstate->reader[gatherstate->nextreader],
                     &gatherstate->reader[gatherstate->nextreader + 1],
                     sizeof(TupleQueueReader *)
@@ -378,37 +370,25 @@ gather_readnext(GatherState *gatherstate)
 /* ----------------------------------------------------------------
  *      ExecShutdownGatherWorkers
  *
- *      Destroy the parallel workers.  Collect all the stats after
- *      workers are stopped, else some work done by workers won't be
- *      accounted.
+ *      Stop all the parallel workers.
  * ----------------------------------------------------------------
  */
 static void
 ExecShutdownGatherWorkers(GatherState *node)
 {
-    /* Shut down tuple queue readers before shutting down workers. */
-    if (node->reader != NULL)
-    {
-        int         i;
-
-        for (i = 0; i < node->nreaders; ++i)
-            DestroyTupleQueueReader(node->reader[i]);
-
-        pfree(node->reader);
-        node->reader = NULL;
-    }
-
-    /* Now shut down the workers. */
     if (node->pei != NULL)
         ExecParallelFinish(node->pei);
+
+    /* Flush local copy of reader array */
+    if (node->reader)
+        pfree(node->reader);
+    node->reader = NULL;
 }
 
 /* ----------------------------------------------------------------
  *      ExecShutdownGather
  *
  *      Destroy the setup for parallel workers including parallel context.
- *      Collect all the stats after workers are stopped, else some work
- *      done by workers won't be accounted.
  * ----------------------------------------------------------------
  */
 void

src/backend/executor/nodeGatherMerge.c

Lines changed: 15 additions & 35 deletions

@@ -178,7 +178,6 @@ ExecGatherMerge(PlanState *pstate)
     GatherMergeState *node = castNode(GatherMergeState, pstate);
     TupleTableSlot *slot;
     ExprContext *econtext;
-    int         i;
 
     CHECK_FOR_INTERRUPTS();
 
@@ -214,27 +213,23 @@ ExecGatherMerge(PlanState *pstate)
             LaunchParallelWorkers(pcxt);
             /* We save # workers launched for the benefit of EXPLAIN */
             node->nworkers_launched = pcxt->nworkers_launched;
-            node->nreaders = 0;
 
             /* Set up tuple queue readers to read the results. */
             if (pcxt->nworkers_launched > 0)
             {
-                node->reader = palloc(pcxt->nworkers_launched *
-                                      sizeof(TupleQueueReader *));
-
-                for (i = 0; i < pcxt->nworkers_launched; ++i)
-                {
-                    shm_mq_set_handle(node->pei->tqueue[i],
-                                      pcxt->worker[i].bgwhandle);
-                    node->reader[node->nreaders++] =
-                        CreateTupleQueueReader(node->pei->tqueue[i],
-                                               node->tupDesc);
-                }
+                ExecParallelCreateReaders(node->pei, node->tupDesc);
+                /* Make a working array showing the active readers */
+                node->nreaders = pcxt->nworkers_launched;
+                node->reader = (TupleQueueReader **)
+                    palloc(node->nreaders * sizeof(TupleQueueReader *));
+                memcpy(node->reader, node->pei->reader,
+                       node->nreaders * sizeof(TupleQueueReader *));
             }
             else
             {
                 /* No workers?  Then never mind. */
-                ExecShutdownGatherMergeWorkers(node);
+                node->nreaders = 0;
+                node->reader = NULL;
             }
         }
 
@@ -284,8 +279,6 @@ ExecEndGatherMerge(GatherMergeState *node)
  *      ExecShutdownGatherMerge
  *
  *      Destroy the setup for parallel workers including parallel context.
- *      Collect all the stats after workers are stopped, else some work
- *      done by workers won't be accounted.
  * ----------------------------------------------------------------
  */
 void
@@ -304,30 +297,19 @@ ExecShutdownGatherMerge(GatherMergeState *node)
 /* ----------------------------------------------------------------
  *      ExecShutdownGatherMergeWorkers
  *
- *      Destroy the parallel workers.  Collect all the stats after
- *      workers are stopped, else some work done by workers won't be
- *      accounted.
+ *      Stop all the parallel workers.
  * ----------------------------------------------------------------
  */
 static void
 ExecShutdownGatherMergeWorkers(GatherMergeState *node)
 {
-    /* Shut down tuple queue readers before shutting down workers. */
-    if (node->reader != NULL)
-    {
-        int         i;
-
-        for (i = 0; i < node->nreaders; ++i)
-            if (node->reader[i])
-                DestroyTupleQueueReader(node->reader[i]);
-
-        pfree(node->reader);
-        node->reader = NULL;
-    }
-
-    /* Now shut down the workers. */
     if (node->pei != NULL)
         ExecParallelFinish(node->pei);
+
+    /* Flush local copy of reader array */
+    if (node->reader)
+        pfree(node->reader);
+    node->reader = NULL;
 }
 
 /* ----------------------------------------------------------------
@@ -672,8 +654,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
         else if (tuple_buffer->done)
         {
             /* Reader is known to be exhausted. */
-            DestroyTupleQueueReader(gm_state->reader[reader - 1]);
-            gm_state->reader[reader - 1] = NULL;
             return false;
         }
         else

src/backend/executor/tqueue.c

Lines changed: 3 additions & 1 deletion

@@ -651,11 +651,13 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
 
 /*
  * Destroy a tuple queue reader.
+ *
+ * Note: cleaning up the underlying shm_mq is the caller's responsibility.
+ * We won't access it here, as it may be detached already.
  */
 void
 DestroyTupleQueueReader(TupleQueueReader *reader)
 {
-    shm_mq_detach(reader->queue);
     if (reader->typmodmap != NULL)
         hash_destroy(reader->typmodmap);
     /* Is it worth trying to free substructure of the remap tree? */
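To make the new contract explicit: DestroyTupleQueueReader no longer touches the queue, so its only remaining caller, ExecParallelFinish, detaches each shm_mq itself. A minimal illustration, using the same names as the execParallel.c hunks above (in the real code the detach and destroy happen in two separate loops):

    /* caller detaches the shared-memory queue ... */
    shm_mq_detach(pei->tqueue[i]);
    /* ... and later frees the reader, which no longer accesses the queue */
    DestroyTupleQueueReader(pei->reader[i]);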

src/include/executor/execParallel.h

Lines changed: 11 additions & 7 deletions

@@ -23,17 +23,21 @@ typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;
 
 typedef struct ParallelExecutorInfo
 {
-    PlanState  *planstate;
-    ParallelContext *pcxt;
-    BufferUsage *buffer_usage;
-    SharedExecutorInstrumentation *instrumentation;
-    shm_mq_handle **tqueue;
-    dsa_area   *area;
-    bool        finished;
+    PlanState  *planstate;      /* plan subtree we're running in parallel */
+    ParallelContext *pcxt;      /* parallel context we're using */
+    BufferUsage *buffer_usage;  /* points to bufusage area in DSM */
+    SharedExecutorInstrumentation *instrumentation; /* optional */
+    dsa_area   *area;           /* points to DSA area in DSM */
+    bool        finished;       /* set true by ExecParallelFinish */
+    /* These two arrays have pcxt->nworkers_launched entries: */
+    shm_mq_handle **tqueue;     /* tuple queues for worker output */
+    struct TupleQueueReader **reader;   /* tuple reader/writer support */
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
                      EState *estate, int nworkers, int64 tuples_needed);
+extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei,
+                          TupleDesc tupDesc);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(PlanState *planstate,
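Taken together, the header now spells out the lifecycle from a Gather-style caller's point of view. A rough, non-verbatim sketch of the call order (LaunchParallelWorkers comes from access/parallel.h, not this header; error paths and EXPLAIN details omitted):

    pei = ExecInitParallelPlan(outerPlanState(node), estate, nworkers,
                               tuples_needed);  /* build DSM and tuple queues */
    LaunchParallelWorkers(pei->pcxt);           /* workers may start at once */
    ExecParallelCreateReaders(pei, tupDesc);    /* attach TupleQueueReaders */
    /* ... pull tuples through the node's copy of pei->reader[] ... */
    ExecParallelFinish(pei);    /* detach queues, destroy readers, wait, stats */
    ExecParallelCleanup(pei);   /* finally release the parallel context */

For a rescan, ExecParallelReinitialize() resets the shared state and the same create/finish cycle repeats, which is what makes it demonstrable that Gather and GatherMerge rescans no longer leak memory.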
