Commit 01edb5c

Improve division of labor between execParallel.c and nodeGather[Merge].c.

Move the responsibility for creating/destroying TupleQueueReaders into
execParallel.c, to avoid duplicative coding in nodeGather.c and
nodeGatherMerge.c.  Also, instead of having DestroyTupleQueueReader do
shm_mq_detach, do it in the caller (which is now only ExecParallelFinish).
This means execParallel.c does both the attaching and detaching of the
tuple-queue-reader shm_mqs, which seems less weird than the previous
arrangement.

These changes also eliminate a vestigial memory leak (of the pei->tqueue
array).  It's now demonstrable that rescans of Gather or GatherMerge don't
leak memory.

Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us

1 parent f2fe1cb · commit 01edb5c
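
To make the new division of labor concrete, here is a minimal C sketch (not part of the commit; the wrapper function name and surrounding details are illustrative) of how a Gather-style node now drives execParallel.c, with reader creation and destruction both handled on the execParallel.c side:

#include "access/parallel.h"
#include "executor/execParallel.h"
#include "nodes/execnodes.h"

/* Illustrative only: rough call sequence for a Gather-style node. */
static void
gather_style_lifecycle_sketch(GatherState *node, EState *estate, int nworkers)
{
    /* Build shared state, including the tuple queues workers write into. */
    node->pei = ExecInitParallelPlan(outerPlanState(node), estate, nworkers);

    /* Launch workers; they may start filling the queues immediately. */
    LaunchParallelWorkers(node->pei->pcxt);

    /* Only now create the leader-side readers, one per launched worker. */
    if (node->pei->pcxt->nworkers_launched > 0)
        ExecParallelCreateReaders(node->pei,
                                  node->funnel_slot->tts_tupleDescriptor);

    /* ... pull tuples through node->pei->reader[] ... */

    /* Teardown: execParallel.c detaches the queues and destroys the readers. */
    ExecParallelFinish(node->pei);
}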

File tree

  src/backend/executor/execParallel.c
  src/backend/executor/nodeGather.c
  src/backend/executor/nodeGatherMerge.c
  src/backend/executor/tqueue.c
  src/include/executor/execParallel.h

5 files changed: +119 −89 lines changed

src/backend/executor/execParallel.c

Lines changed: 68 additions & 4 deletions
@@ -498,9 +498,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
 	pei->buffer_usage = bufusage_space;
 
-	/* Set up tuple queues. */
+	/* Set up the tuple queues that the workers will write into. */
 	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
+	/* We don't need the TupleQueueReaders yet, though. */
+	pei->reader = NULL;
+
 	/*
 	 * If instrumentation options were supplied, allocate space for the data.
 	 * It only gets partially initialized here; the rest happens during
@@ -567,6 +570,37 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	return pei;
 }
 
+/*
+ * Set up tuple queue readers to read the results of a parallel subplan.
+ * All the workers are expected to return tuples matching tupDesc.
+ *
+ * This is separate from ExecInitParallelPlan() because we can launch the
+ * worker processes and let them start doing something before we do this.
+ */
+void
+ExecParallelCreateReaders(ParallelExecutorInfo *pei,
+						  TupleDesc tupDesc)
+{
+	int			nworkers = pei->pcxt->nworkers_launched;
+	int			i;
+
+	Assert(pei->reader == NULL);
+
+	if (nworkers > 0)
+	{
+		pei->reader = (TupleQueueReader **)
+			palloc(nworkers * sizeof(TupleQueueReader *));
+
+		for (i = 0; i < nworkers; i++)
+		{
+			shm_mq_set_handle(pei->tqueue[i],
+							  pei->pcxt->worker[i].bgwhandle);
+			pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i],
+													tupDesc);
+		}
+	}
+}
+
 /*
  * Re-initialize the parallel executor shared memory state before launching
  * a fresh batch of workers.
@@ -580,6 +614,7 @@ ExecParallelReinitialize(PlanState *planstate,
 
 	ReinitializeParallelDSM(pei->pcxt);
 	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+	pei->reader = NULL;
 	pei->finished = false;
 
 	/* Traverse plan tree and let each child node reset associated state. */
@@ -691,16 +726,45 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 void
 ExecParallelFinish(ParallelExecutorInfo *pei)
 {
+	int			nworkers = pei->pcxt->nworkers_launched;
 	int			i;
 
+	/* Make this be a no-op if called twice in a row. */
 	if (pei->finished)
 		return;
 
-	/* First, wait for the workers to finish. */
+	/*
+	 * Detach from tuple queues ASAP, so that any still-active workers will
+	 * notice that no further results are wanted.
+	 */
+	if (pei->tqueue != NULL)
+	{
+		for (i = 0; i < nworkers; i++)
+			shm_mq_detach(pei->tqueue[i]);
+		pfree(pei->tqueue);
+		pei->tqueue = NULL;
+	}
+
+	/*
+	 * While we're waiting for the workers to finish, let's get rid of the
+	 * tuple queue readers.  (Any other local cleanup could be done here too.)
+	 */
+	if (pei->reader != NULL)
+	{
+		for (i = 0; i < nworkers; i++)
+			DestroyTupleQueueReader(pei->reader[i]);
+		pfree(pei->reader);
+		pei->reader = NULL;
+	}
+
+	/* Now wait for the workers to finish. */
 	WaitForParallelWorkersToFinish(pei->pcxt);
 
-	/* Next, accumulate buffer usage. */
-	for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
+	/*
+	 * Next, accumulate buffer usage.  (This must wait for the workers to
+	 * finish, or we might get incomplete data.)
+	 */
+	for (i = 0; i < nworkers; i++)
 		InstrAccumParallelQuery(&pei->buffer_usage[i]);
 
 	/* Finally, accumulate instrumentation, if any. */
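
Since ExecParallelReinitialize() now resets pei->reader to NULL and ExecParallelFinish() frees both arrays itself, a rescan simply rebuilds them on the next cycle, which is what makes the leak fix demonstrable. A hedged sketch of that cycle follows; the wrapper function is hypothetical, and in the committed code the steps are split between ExecReScanGather() and the next ExecGather() call:

/* Illustrative rescan cycle; not the committed nodeGather.c code. */
static void
gather_rescan_sketch(GatherState *node)
{
    /* Shut down the previous batch: detaches queues, destroys readers,
     * and pfrees pei->tqueue and pei->reader. */
    ExecParallelFinish(node->pei);

    /* Reset shared memory and build fresh tuple queues; pei->reader is
     * set back to NULL here. */
    ExecParallelReinitialize(outerPlanState(node), node->pei);

    /* The next ExecGather() call relaunches workers and calls
     * ExecParallelCreateReaders() again; nothing from the previous
     * cycle is left allocated. */
    node->initialized = false;
}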

src/backend/executor/nodeGather.c

Lines changed: 22 additions & 42 deletions
@@ -129,7 +129,6 @@ ExecGather(PlanState *pstate)
 {
 	GatherState *node = castNode(GatherState, pstate);
 	TupleTableSlot *fslot = node->funnel_slot;
-	int			i;
 	TupleTableSlot *slot;
 	ExprContext *econtext;
 
@@ -171,33 +170,30 @@ ExecGather(PlanState *pstate)
 		LaunchParallelWorkers(pcxt);
 		/* We save # workers launched for the benefit of EXPLAIN */
 		node->nworkers_launched = pcxt->nworkers_launched;
-		node->nreaders = 0;
-		node->nextreader = 0;
 
 		/* Set up tuple queue readers to read the results. */
 		if (pcxt->nworkers_launched > 0)
 		{
-			node->reader = palloc(pcxt->nworkers_launched *
-								  sizeof(TupleQueueReader *));
-
-			for (i = 0; i < pcxt->nworkers_launched; ++i)
-			{
-				shm_mq_set_handle(node->pei->tqueue[i],
-								  pcxt->worker[i].bgwhandle);
-				node->reader[node->nreaders++] =
-					CreateTupleQueueReader(node->pei->tqueue[i],
-										   fslot->tts_tupleDescriptor);
-			}
+			ExecParallelCreateReaders(node->pei,
+									  fslot->tts_tupleDescriptor);
+			/* Make a working array showing the active readers */
+			node->nreaders = pcxt->nworkers_launched;
+			node->reader = (TupleQueueReader **)
+				palloc(node->nreaders * sizeof(TupleQueueReader *));
+			memcpy(node->reader, node->pei->reader,
+				   node->nreaders * sizeof(TupleQueueReader *));
 		}
 		else
 		{
 			/* No workers?  Then never mind. */
-			ExecShutdownGatherWorkers(node);
+			node->nreaders = 0;
+			node->reader = NULL;
 		}
+		node->nextreader = 0;
 	}
 
 	/* Run plan locally if no workers or not single-copy. */
-	node->need_to_scan_locally = (node->reader == NULL)
+	node->need_to_scan_locally = (node->nreaders == 0)
 		|| !gather->single_copy;
 	node->initialized = true;
 }
@@ -256,11 +252,11 @@ gather_getnext(GatherState *gatherstate)
 	MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
 	HeapTuple	tup;
 
-	while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
+	while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
 	{
 		CHECK_FOR_INTERRUPTS();
 
-		if (gatherstate->reader != NULL)
+		if (gatherstate->nreaders > 0)
 		{
 			MemoryContext oldContext;
 
@@ -317,19 +313,15 @@ gather_readnext(GatherState *gatherstate)
 		tup = TupleQueueReaderNext(reader, true, &readerdone);
 
 		/*
-		 * If this reader is done, remove it, and collapse the array. If all
-		 * readers are done, clean up remaining worker state.
+		 * If this reader is done, remove it from our working array of active
+		 * readers.  If all readers are done, we're outta here.
 		 */
 		if (readerdone)
 		{
 			Assert(!tup);
-			DestroyTupleQueueReader(reader);
 			--gatherstate->nreaders;
 			if (gatherstate->nreaders == 0)
-			{
-				ExecShutdownGatherWorkers(gatherstate);
 				return NULL;
-			}
 			memmove(&gatherstate->reader[gatherstate->nextreader],
 					&gatherstate->reader[gatherstate->nextreader + 1],
 					sizeof(TupleQueueReader *)
@@ -376,37 +368,25 @@ gather_readnext(GatherState *gatherstate)
 /* ----------------------------------------------------------------
  *		ExecShutdownGatherWorkers
  *
- *		Destroy the parallel workers.  Collect all the stats after
- *		workers are stopped, else some work done by workers won't be
- *		accounted.
+ *		Stop all the parallel workers.
  * ----------------------------------------------------------------
  */
 static void
 ExecShutdownGatherWorkers(GatherState *node)
 {
-	/* Shut down tuple queue readers before shutting down workers. */
-	if (node->reader != NULL)
-	{
-		int			i;
-
-		for (i = 0; i < node->nreaders; ++i)
-			DestroyTupleQueueReader(node->reader[i]);
-
-		pfree(node->reader);
-		node->reader = NULL;
-	}
-
-	/* Now shut down the workers. */
 	if (node->pei != NULL)
 		ExecParallelFinish(node->pei);
+
+	/* Flush local copy of reader array */
+	if (node->reader)
+		pfree(node->reader);
+	node->reader = NULL;
 }
 
 /* ----------------------------------------------------------------
  *		ExecShutdownGather
  *
  *		Destroy the setup for parallel workers including parallel context.
- *		Collect all the stats after workers are stopped, else some work
- *		done by workers won't be accounted.
  * ----------------------------------------------------------------
  */
 void
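
A note on the design choice visible above: the Gather node copies pei->reader into its own working array precisely so that it can compact that copy as individual workers run out of tuples, while the array owned by execParallel.c stays intact for ExecParallelFinish() to destroy. The tail of the read loop behaves roughly like this (a sketch paraphrased from gather_readnext(); the closing lines are reconstructed, not quoted from the diff):

/* Sketch: drop an exhausted reader from the node's working copy only. */
if (readerdone)
{
    Assert(!tup);
    --gatherstate->nreaders;
    if (gatherstate->nreaders == 0)
        return NULL;            /* every queue is drained */
    /* Close the gap; pei->reader[] itself is untouched and is freed
     * later by ExecParallelFinish(). */
    memmove(&gatherstate->reader[gatherstate->nextreader],
            &gatherstate->reader[gatherstate->nextreader + 1],
            sizeof(TupleQueueReader *) *
            (gatherstate->nreaders - gatherstate->nextreader));
    if (gatherstate->nextreader >= gatherstate->nreaders)
        gatherstate->nextreader = 0;
    continue;
}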

src/backend/executor/nodeGatherMerge.c

Lines changed: 15 additions & 35 deletions
@@ -177,7 +177,6 @@ ExecGatherMerge(PlanState *pstate)
 	GatherMergeState *node = castNode(GatherMergeState, pstate);
 	TupleTableSlot *slot;
 	ExprContext *econtext;
-	int			i;
 
 	CHECK_FOR_INTERRUPTS();
 
@@ -212,27 +211,23 @@ ExecGatherMerge(PlanState *pstate)
 		LaunchParallelWorkers(pcxt);
 		/* We save # workers launched for the benefit of EXPLAIN */
 		node->nworkers_launched = pcxt->nworkers_launched;
-		node->nreaders = 0;
 
 		/* Set up tuple queue readers to read the results. */
 		if (pcxt->nworkers_launched > 0)
 		{
-			node->reader = palloc(pcxt->nworkers_launched *
-								  sizeof(TupleQueueReader *));
-
-			for (i = 0; i < pcxt->nworkers_launched; ++i)
-			{
-				shm_mq_set_handle(node->pei->tqueue[i],
-								  pcxt->worker[i].bgwhandle);
-				node->reader[node->nreaders++] =
-					CreateTupleQueueReader(node->pei->tqueue[i],
-										   node->tupDesc);
-			}
+			ExecParallelCreateReaders(node->pei, node->tupDesc);
+			/* Make a working array showing the active readers */
+			node->nreaders = pcxt->nworkers_launched;
+			node->reader = (TupleQueueReader **)
+				palloc(node->nreaders * sizeof(TupleQueueReader *));
+			memcpy(node->reader, node->pei->reader,
+				   node->nreaders * sizeof(TupleQueueReader *));
 		}
 		else
 		{
 			/* No workers?  Then never mind. */
-			ExecShutdownGatherMergeWorkers(node);
+			node->nreaders = 0;
+			node->reader = NULL;
 		}
 	}
 
@@ -282,8 +277,6 @@ ExecEndGatherMerge(GatherMergeState *node)
  *		ExecShutdownGatherMerge
  *
  *		Destroy the setup for parallel workers including parallel context.
- *		Collect all the stats after workers are stopped, else some work
- *		done by workers won't be accounted.
  * ----------------------------------------------------------------
  */
 void
@@ -302,30 +295,19 @@ ExecShutdownGatherMerge(GatherMergeState *node)
 /* ----------------------------------------------------------------
  *		ExecShutdownGatherMergeWorkers
  *
- *		Destroy the parallel workers.  Collect all the stats after
- *		workers are stopped, else some work done by workers won't be
- *		accounted.
+ *		Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
 static void
 ExecShutdownGatherMergeWorkers(GatherMergeState *node)
 {
-	/* Shut down tuple queue readers before shutting down workers. */
-	if (node->reader != NULL)
-	{
-		int			i;
-
-		for (i = 0; i < node->nreaders; ++i)
-			if (node->reader[i])
-				DestroyTupleQueueReader(node->reader[i]);
-
-		pfree(node->reader);
-		node->reader = NULL;
-	}
-
-	/* Now shut down the workers. */
 	if (node->pei != NULL)
 		ExecParallelFinish(node->pei);
+
+	/* Flush local copy of reader array */
+	if (node->reader)
+		pfree(node->reader);
+	node->reader = NULL;
 }
 
 /* ----------------------------------------------------------------
@@ -670,8 +652,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 	else if (tuple_buffer->done)
 	{
 		/* Reader is known to be exhausted. */
-		DestroyTupleQueueReader(gm_state->reader[reader - 1]);
-		gm_state->reader[reader - 1] = NULL;
 		return false;
 	}
 	else

src/backend/executor/tqueue.c

Lines changed: 3 additions & 1 deletion
@@ -651,11 +651,13 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
 
 /*
  * Destroy a tuple queue reader.
+ *
+ * Note: cleaning up the underlying shm_mq is the caller's responsibility.
+ * We won't access it here, as it may be detached already.
  */
 void
 DestroyTupleQueueReader(TupleQueueReader *reader)
 {
-	shm_mq_detach(reader->queue);
 	if (reader->typmodmap != NULL)
 		hash_destroy(reader->typmodmap);
 	/* Is it worth trying to free substructure of the remap tree? */
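
With shm_mq_detach() moved out of DestroyTupleQueueReader(), the ownership rule from the caller's side becomes: detach the queue first (the reader will never touch it again), then destroy the reader, which now only frees local state. A minimal sketch of that pairing, mirroring what ExecParallelFinish() now does:

/* Sketch of the caller-side teardown order under the new contract. */
for (i = 0; i < nworkers; i++)
    shm_mq_detach(pei->tqueue[i]);              /* signal: no more reads wanted */

for (i = 0; i < nworkers; i++)
    DestroyTupleQueueReader(pei->reader[i]);    /* purely local cleanup */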

src/include/executor/execParallel.h

Lines changed: 11 additions & 7 deletions
@@ -23,17 +23,21 @@ typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;
 
 typedef struct ParallelExecutorInfo
 {
-	PlanState  *planstate;
-	ParallelContext *pcxt;
-	BufferUsage *buffer_usage;
-	SharedExecutorInstrumentation *instrumentation;
-	shm_mq_handle **tqueue;
-	dsa_area   *area;
-	bool		finished;
+	PlanState  *planstate;		/* plan subtree we're running in parallel */
+	ParallelContext *pcxt;		/* parallel context we're using */
+	BufferUsage *buffer_usage;	/* points to bufusage area in DSM */
+	SharedExecutorInstrumentation *instrumentation; /* optional */
+	dsa_area   *area;			/* points to DSA area in DSM */
+	bool		finished;		/* set true by ExecParallelFinish */
+	/* These two arrays have pcxt->nworkers_launched entries: */
+	shm_mq_handle **tqueue;		/* tuple queues for worker output */
+	struct TupleQueueReader **reader;	/* tuple reader/writer support */
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
 				 EState *estate, int nworkers);
+extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei,
+						  TupleDesc tupDesc);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(PlanState *planstate,
