Commit 3a1f861

Update parallel executor support to reuse the same DSM.
Commit b0b0d84 purported to make it possible to relaunch workers using the
same parallel context, but it had an unpleasant race condition: we might
reinitialize after the workers have sent their last control message but
before they have detached the DSM, leading to crashes.  Repair by
introducing a new ParallelContext operation, ReinitializeParallelDSM.

Adjust execParallel.c to use this new support, so that we can rescan a
Gather node by relaunching workers but without needing to recreate the DSM.

Amit Kapila, with some adjustments by me.  Extracted from latest parallel
sequential scan patch.
1 parent c6baec9 · commit 3a1f861

File tree

6 files changed: +166 -108 lines changed


‎src/backend/access/transam/README.parallel

Lines changed: 6 additions & 4 deletions

@@ -222,7 +222,9 @@ pattern looks like this:
 
     ExitParallelMode();
 
-If desired, after WaitForParallelWorkersToFinish() has been called, another
-call to LaunchParallelWorkers() can be made using the same parallel context.
-Calls to these two functions can be alternated any number of times before
-destroying the parallel context.
+If desired, after WaitForParallelWorkersToFinish() has been called, the
+context can be reset so that workers can be launched anew using the same
+parallel context.  To do this, first call ReinitializeParallelDSM() to
+reinitialize state managed by the parallel context machinery itself; then,
+perform any other necessary resetting of state; after that, you can again
+call LaunchParallelWorkers.

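Editor's note: for orientation, the relaunch pattern the revised README describes might look roughly like the sketch below. This is not part of the commit; the batching driver, its name, and its arguments are illustrative, the entrypoint type is assumed to be the parallel_worker_main_type used by CreateParallelContext(), and the estimate/allocate steps shown earlier in README.parallel are reduced to comments.

#include "postgres.h"
#include "access/parallel.h"
#include "access/xact.h"

/* Hypothetical driver that runs the same parallel operation several times. */
static void
run_in_batches(parallel_worker_main_type entrypoint, int nworkers, int nbatches)
{
    ParallelContext *pcxt;
    int         batch;

    EnterParallelMode();
    pcxt = CreateParallelContext(entrypoint, nworkers);
    /* shm_toc_estimate_chunk()/shm_toc_estimate_keys() calls go here */
    InitializeParallelDSM(pcxt);
    /* store application-specific state in the DSM segment here */

    for (batch = 0; batch < nbatches; batch++)
    {
        if (batch > 0)
        {
            /* Reset state owned by the parallel machinery itself ... */
            ReinitializeParallelDSM(pcxt);
            /* ... then reset any application-specific shared state. */
        }

        LaunchParallelWorkers(pcxt);
        /* do parallel work for this batch */
        WaitForParallelWorkersToFinish(pcxt);
        /* read any final results from dynamic shared memory */
    }

    DestroyParallelContext(pcxt);
    ExitParallelMode();
}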
‎src/backend/access/transam/parallel.c

Lines changed: 92 additions & 83 deletions

@@ -110,6 +110,7 @@ static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
 static void ParallelErrorContext(void *arg);
 static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
 static void ParallelWorkerMain(Datum main_arg);
+static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
 
 /*
  * Establish a new parallel context.  This should be done after entering
@@ -383,6 +384,46 @@ InitializeParallelDSM(ParallelContext *pcxt)
     MemoryContextSwitchTo(oldcontext);
 }
 
+/*
+ * Reinitialize the dynamic shared memory segment for a parallel context such
+ * that we could launch workers for it again.
+ */
+void
+ReinitializeParallelDSM(ParallelContext *pcxt)
+{
+    FixedParallelState *fps;
+    char       *error_queue_space;
+    int         i;
+
+    if (pcxt->nworkers_launched == 0)
+        return;
+
+    WaitForParallelWorkersToFinish(pcxt);
+    WaitForParallelWorkersToExit(pcxt);
+
+    /* Reset a few bits of fixed parallel state to a clean state. */
+    fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+    fps->workers_attached = 0;
+    fps->last_xlog_end = 0;
+
+    /* Recreate error queues. */
+    error_queue_space =
+        shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
+    for (i = 0; i < pcxt->nworkers; ++i)
+    {
+        char       *start;
+        shm_mq     *mq;
+
+        start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+        mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+        shm_mq_set_receiver(mq, MyProc);
+        pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+    }
+
+    /* Reset number of workers launched. */
+    pcxt->nworkers_launched = 0;
+}
+
 /*
  * Launch parallel workers.
  */
@@ -404,52 +445,6 @@ LaunchParallelWorkers(ParallelContext *pcxt)
     /* We might be running in a short-lived memory context. */
     oldcontext = MemoryContextSwitchTo(TopTransactionContext);
 
-    /*
-     * This function can be called for a parallel context for which it has
-     * already been called previously, but only if all of the old workers
-     * have already exited.  When this case arises, we need to do some extra
-     * reinitialization.
-     */
-    if (pcxt->nworkers_launched > 0)
-    {
-        FixedParallelState *fps;
-        char       *error_queue_space;
-
-        /* Clean out old worker handles. */
-        for (i = 0; i < pcxt->nworkers; ++i)
-        {
-            if (pcxt->worker[i].error_mqh != NULL)
-                elog(ERROR, "previously launched worker still alive");
-            if (pcxt->worker[i].bgwhandle != NULL)
-            {
-                pfree(pcxt->worker[i].bgwhandle);
-                pcxt->worker[i].bgwhandle = NULL;
-            }
-        }
-
-        /* Reset a few bits of fixed parallel state to a clean state. */
-        fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
-        fps->workers_attached = 0;
-        fps->last_xlog_end = 0;
-
-        /* Recreate error queues. */
-        error_queue_space =
-            shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
-        for (i = 0; i < pcxt->nworkers; ++i)
-        {
-            char       *start;
-            shm_mq     *mq;
-
-            start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
-            mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
-            shm_mq_set_receiver(mq, MyProc);
-            pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
-        }
-
-        /* Reset number of workers launched. */
-        pcxt->nworkers_launched = 0;
-    }
-
     /* Configure a worker. */
     snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
              MyProcPid);
@@ -501,7 +496,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 }
 
 /*
- * Wait for all workers to exit.
+ * Wait for all workers to finish computing.
  *
  * Even if the parallel operation seems to have completed successfully, it's
  * important to call this function afterwards.  We must not miss any errors
@@ -552,6 +547,46 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
     }
 }
 
+/*
+ * Wait for all workers to exit.
+ *
+ * This function ensures that workers have been completely shutdown.  The
+ * difference between WaitForParallelWorkersToFinish and this function is
+ * that former just ensures that last message sent by worker backend is
+ * received by master backend whereas this ensures the complete shutdown.
+ */
+static void
+WaitForParallelWorkersToExit(ParallelContext *pcxt)
+{
+    int         i;
+
+    /* Wait until the workers actually die. */
+    for (i = 0; i < pcxt->nworkers; ++i)
+    {
+        BgwHandleStatus status;
+
+        if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
+            continue;
+
+        status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
+
+        /*
+         * If the postmaster kicked the bucket, we have no chance of cleaning
+         * up safely -- we won't be able to tell when our workers are actually
+         * dead.  This doesn't necessitate a PANIC since they will all abort
+         * eventually, but we can't safely continue this session.
+         */
+        if (status == BGWH_POSTMASTER_DIED)
+            ereport(FATAL,
+                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
+                     errmsg("postmaster exited during a parallel transaction")));
+
+        /* Release memory. */
+        pfree(pcxt->worker[i].bgwhandle);
+        pcxt->worker[i].bgwhandle = NULL;
+    }
+}
+
 /*
  * Destroy a parallel context.
  *
@@ -578,10 +613,10 @@ DestroyParallelContext(ParallelContext *pcxt)
     {
         for (i = 0; i < pcxt->nworkers; ++i)
         {
-            if (pcxt->worker[i].bgwhandle != NULL)
-                TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
             if (pcxt->worker[i].error_mqh != NULL)
             {
+                TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+
                 pfree(pcxt->worker[i].error_mqh);
                 pcxt->worker[i].error_mqh = NULL;
             }
@@ -609,38 +644,14 @@ DestroyParallelContext(ParallelContext *pcxt)
         pcxt->private_memory = NULL;
     }
 
-    /* Wait until the workers actually die. */
-    for (i = 0; i < pcxt->nworkers; ++i)
-    {
-        BgwHandleStatus status;
-
-        if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
-            continue;
-
-        /*
-         * We can't finish transaction commit or abort until all of the
-         * workers are dead.  This means, in particular, that we can't respond
-         * to interrupts at this stage.
-         */
-        HOLD_INTERRUPTS();
-        status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
-        RESUME_INTERRUPTS();
-
-        /*
-         * If the postmaster kicked the bucket, we have no chance of cleaning
-         * up safely -- we won't be able to tell when our workers are actually
-         * dead.  This doesn't necessitate a PANIC since they will all abort
-         * eventually, but we can't safely continue this session.
-         */
-        if (status == BGWH_POSTMASTER_DIED)
-            ereport(FATAL,
-                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
-                     errmsg("postmaster exited during a parallel transaction")));
-
-        /* Release memory. */
-        pfree(pcxt->worker[i].bgwhandle);
-        pcxt->worker[i].bgwhandle = NULL;
-    }
+    /*
+     * We can't finish transaction commit or abort until all of the
+     * workers have exited.  This means, in particular, that we can't respond
+     * to interrupts at this stage.
+     */
+    HOLD_INTERRUPTS();
+    WaitForParallelWorkersToExit(pcxt);
+    RESUME_INTERRUPTS();
 
     /* Free the worker array itself. */
     if (pcxt->worker != NULL)
@@ -799,9 +810,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 
         case 'X':               /* Terminate, indicating clean exit */
             {
-                pfree(pcxt->worker[i].bgwhandle);
                 pfree(pcxt->worker[i].error_mqh);
-                pcxt->worker[i].bgwhandle = NULL;
                 pcxt->worker[i].error_mqh = NULL;
                 break;
             }

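Editor's note: to make the motivation concrete, here is an illustrative sketch (not code from the tree, written as if inside parallel.c where the static WaitForParallelWorkersToExit is visible) of the race described in the commit message and of the ordering that ReinitializeParallelDSM() now enforces. After WaitForParallelWorkersToFinish() the leader has received each worker's final control message, but a worker may still be attached to the DSM segment, so touching the shared queues at that point is unsafe.

/* Racy ordering, roughly what could happen before this commit: */
static void
racy_relaunch(ParallelContext *pcxt)
{
    WaitForParallelWorkersToFinish(pcxt);   /* final 'X' messages received */
    /* BUG: a worker may still be attached to pcxt->seg here, so         */
    /* recreating the error queues can clobber memory it is still using. */
}

/* Safe ordering, as in ReinitializeParallelDSM() above: */
static void
safe_relaunch(ParallelContext *pcxt)
{
    WaitForParallelWorkersToFinish(pcxt);   /* drain control messages      */
    WaitForParallelWorkersToExit(pcxt);     /* wait for full detach/exit   */
    /* now it is safe to reset shared state and call LaunchParallelWorkers */
}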
‎src/backend/executor/execParallel.c

Lines changed: 26 additions & 7 deletions

@@ -84,7 +84,8 @@ static bool ExecParallelEstimate(PlanState *node,
                          ExecParallelEstimateContext *e);
 static bool ExecParallelInitializeDSM(PlanState *node,
                          ExecParallelInitializeDSMContext *d);
-static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt);
+static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
+                             bool reinitialize);
 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
                           SharedExecutorInstrumentation *instrumentation);
 
@@ -217,7 +218,7 @@ ExecParallelInitializeDSM(PlanState *planstate,
  * to the main backend and start the workers.
  */
 static shm_mq_handle **
-ExecParallelSetupTupleQueues(ParallelContext *pcxt)
+ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 {
     shm_mq_handle **responseq;
     char       *tqueuespace;
@@ -231,9 +232,16 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
     responseq = (shm_mq_handle **)
         palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
 
-    /* Allocate space from the DSM for the queues themselves. */
-    tqueuespace = shm_toc_allocate(pcxt->toc,
-                                   PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+    /*
+     * If not reinitializing, allocate space from the DSM for the queues;
+     * otherwise, find the already allocated space.
+     */
+    if (!reinitialize)
+        tqueuespace =
+            shm_toc_allocate(pcxt->toc,
+                             PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+    else
+        tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
 
     /* Create the queues, and become the receiver for each. */
     for (i = 0; i < pcxt->nworkers; ++i)
@@ -248,12 +256,23 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
     }
 
     /* Add array of queues to shm_toc, so others can find it. */
-    shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
+    if (!reinitialize)
+        shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
 
     /* Return array of handles. */
     return responseq;
 }
 
+/*
+ * Re-initialize the response queues for backend workers to return tuples
+ * to the main backend and start the workers.
+ */
+shm_mq_handle **
+ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+{
+    return ExecParallelSetupTupleQueues(pcxt, true);
+}
+
 /*
  * Sets up the required infrastructure for backend workers to perform
  * execution and return results to the main backend.
@@ -363,7 +382,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
     pei->buffer_usage = bufusage_space;
 
     /* Set up tuple queues. */
-    pei->tqueue = ExecParallelSetupTupleQueues(pcxt);
+    pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
     /*
      * If instrumentation options were supplied, allocate space for the

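Editor's note: for context, a caller that wants to rescan a parallel plan (for example the Gather rescan path in the parallel sequential scan work this was extracted from) might combine the new pieces roughly as follows. This is an illustrative sketch, not part of the commit: the helper name is hypothetical, "pei" is the ParallelExecutorInfo set up by ExecInitParallelPlan(), and it assumes the usual extern declarations are in place in the headers this commit also touches.

#include "postgres.h"
#include "access/parallel.h"
#include "executor/execParallel.h"

/* Hypothetical rescan helper; "pei" comes from ExecInitParallelPlan(). */
static void
rescan_parallel_plan(ParallelExecutorInfo *pei)
{
    /* Wait for the old workers and reset state owned by the machinery. */
    ReinitializeParallelDSM(pei->pcxt);

    /* Re-attach to the tuple queue space already allocated in the DSM. */
    pei->tqueue = ExecParallelReinitializeTupleQueues(pei->pcxt);

    /* Workers can now be launched again for the new scan. */
    LaunchParallelWorkers(pei->pcxt);
}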
