Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2843c01

Browse files
committed
Report an ERROR if a parallel worker fails to start properly.
Commit28724fd fixed things so thatif a background worker fails to start due to fork() failure or becauseit is terminated before startup succeeds, BGWH_STOPPED will bereported. However, that only helps if the code that uses thebackground worker machinery notices the change in status, and the codein parallel.c did not.To fix that, do two things. First, make sure that when a workerexits, it triggers the leader to read from error queues. That way, ifa worker which has attached to an error queue exits uncleanly, theleader is sure to throw some error, either the contents of theErrorResponse sent by the worker, or "lost connection to parallelworker" if it exited without sending one. To cover the case wherethe worker never starts up in the first place or exits beforeattaching to the error queue, the ParallelContext now keeps trackof which workers have sent at least one message via the errorqueue. A worker which sends no messages by the time the paralleloperation finishes will be checked to see whether it exited beforeattaching to the error queue; if so, a new error message, "parallelworker failed to initialize", will be reported. If not, we'llcontinue to wait until it either starts up and exits cleanly, startsup and exits uncleanly, or fails to start, and then take theappropriate action.Patch by me, reviewed by Amit Kapila.Discussion:http://postgr.es/m/CA+TgmoYnBgXgdTu6wk5YPdWhmgabYc9nY_pFLq=tB=FSLYkD8Q@mail.gmail.com
1 parentcda1ba3 commit2843c01

File tree

2 files changed

+110
-9
lines changed

2 files changed

+110
-9
lines changed

‎src/backend/access/transam/parallel.c

Lines changed: 109 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ static FixedParallelState *MyFixedParallelState;
109109
/* List of active parallel contexts. */
110110
staticdlist_headpcxt_list=DLIST_STATIC_INIT(pcxt_list);
111111

112+
/* Backend-local copy of data from FixedParallelState. */
113+
staticpid_tParallelMasterPid;
114+
112115
/*
113116
* List of internal parallel worker entry points. We need this for
114117
* reasons explained in LookupParallelWorkerFunction(), below.
@@ -129,6 +132,7 @@ static const struct
129132
staticvoidHandleParallelMessage(ParallelContext*pcxt,inti,StringInfomsg);
130133
staticvoidWaitForParallelWorkersToExit(ParallelContext*pcxt);
131134
staticparallel_worker_main_typeLookupParallelWorkerFunction(char*libraryname,char*funcname);
135+
staticvoidParallelWorkerShutdown(intcode,Datumarg);
132136

133137

134138
/*
@@ -427,6 +431,11 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
427431
WaitForParallelWorkersToFinish(pcxt);
428432
WaitForParallelWorkersToExit(pcxt);
429433
pcxt->nworkers_launched=0;
434+
if (pcxt->any_message_received)
435+
{
436+
pfree(pcxt->any_message_received);
437+
pcxt->any_message_received=NULL;
438+
}
430439
}
431440

432441
/* Reset a few bits of fixed parallel state to a clean state. */
@@ -523,6 +532,14 @@ LaunchParallelWorkers(ParallelContext *pcxt)
523532
}
524533
}
525534

535+
/*
536+
* Now that nworkers_launched has taken its final value, we can initialize
537+
* any_message_received.
538+
*/
539+
if (pcxt->nworkers_launched>0)
540+
pcxt->any_message_received=
541+
palloc0(sizeof(bool)*pcxt->nworkers_launched);
542+
526543
/* Restore previous memory context. */
527544
MemoryContextSwitchTo(oldcontext);
528545
}
@@ -544,6 +561,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
544561
for (;;)
545562
{
546563
boolanyone_alive= false;
564+
intnfinished=0;
547565
inti;
548566

549567
/*
@@ -555,15 +573,78 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
555573

556574
for (i=0;i<pcxt->nworkers_launched;++i)
557575
{
558-
if (pcxt->worker[i].error_mqh!=NULL)
576+
/*
577+
* If error_mqh is NULL, then the worker has already exited
578+
* cleanly. If we have received a message through error_mqh from
579+
* the worker, we know it started up cleanly, and therefore we're
580+
* certain to be notified when it exits.
581+
*/
582+
if (pcxt->worker[i].error_mqh==NULL)
583+
++nfinished;
584+
elseif (pcxt->any_message_received[i])
559585
{
560586
anyone_alive= true;
561587
break;
562588
}
563589
}
564590

565591
if (!anyone_alive)
566-
break;
592+
{
593+
/* If all workers are known to have finished, we're done. */
594+
if (nfinished >=pcxt->nworkers_launched)
595+
{
596+
Assert(nfinished==pcxt->nworkers_launched);
597+
break;
598+
}
599+
600+
/*
601+
* We didn't detect any living workers, but not all workers are
602+
* known to have exited cleanly. Either not all workers have
603+
* launched yet, or maybe some of them failed to start or
604+
* terminated abnormally.
605+
*/
606+
for (i=0;i<pcxt->nworkers_launched;++i)
607+
{
608+
pid_tpid;
609+
shm_mq*mq;
610+
611+
/*
612+
* If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
613+
* should just keep waiting. If it is BGWH_STOPPED, then
614+
* further investigation is needed.
615+
*/
616+
if (pcxt->worker[i].error_mqh==NULL||
617+
pcxt->worker[i].bgwhandle==NULL||
618+
GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
619+
&pid)!=BGWH_STOPPED)
620+
continue;
621+
622+
/*
623+
* Check whether the worker ended up stopped without ever
624+
* attaching to the error queue. If so, the postmaster was
625+
* unable to fork the worker or it exited without initializing
626+
* properly. We must throw an error, since the caller may
627+
* have been expecting the worker to do some work before
628+
* exiting.
629+
*/
630+
mq=shm_mq_get_queue(pcxt->worker[i].error_mqh);
631+
if (shm_mq_get_sender(mq)==NULL)
632+
ereport(ERROR,
633+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
634+
errmsg("parallel worker failed to initialize"),
635+
errhint("More details may be available in the server log.")));
636+
637+
/*
638+
* The worker is stopped, but is attached to the error queue.
639+
* Unless there's a bug somewhere, this will only happen when
640+
* the worker writes messages and terminates after the
641+
* CHECK_FOR_INTERRUPTS() near the top of this function and
642+
* before the call to GetBackgroundWorkerPid(). In that case,
643+
* or latch should have been set as well and the right things
644+
* will happen on the next pass through the loop.
645+
*/
646+
}
647+
}
567648

568649
WaitLatch(&MyProc->procLatch,WL_LATCH_SET,-1);
569650
ResetLatch(&MyProc->procLatch);
@@ -821,6 +902,9 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
821902
{
822903
charmsgtype;
823904

905+
if (pcxt->any_message_received!=NULL)
906+
pcxt->any_message_received[i]= true;
907+
824908
msgtype=pq_getmsgbyte(msg);
825909

826910
switch (msgtype)
@@ -1014,11 +1098,16 @@ ParallelWorkerMain(Datum main_arg)
10141098
Assert(fps!=NULL);
10151099
MyFixedParallelState=fps;
10161100

1101+
/* Arrange to signal the leader if we exit. */
1102+
ParallelMasterPid=fps->parallel_master_pid;
1103+
ParallelMasterBackendId=fps->parallel_master_backend_id;
1104+
on_shmem_exit(ParallelWorkerShutdown, (Datum)0);
1105+
10171106
/*
1018-
* Nowthatwehave a worker number, wecan find and attach to the error
1019-
*queue provided for us. That'sgood, because until we do that, any
1020-
*errors that happen here will not bereported back to the process that
1021-
*requested that this worker belaunched.
1107+
* Now we can find and attach to the error queue provided for us. That's
1108+
* good, because until we do that, any errors that happen here will not be
1109+
* reported back to the process that requested that this worker be
1110+
* launched.
10221111
*/
10231112
error_queue_space=shm_toc_lookup(toc,PARALLEL_KEY_ERROR_QUEUE);
10241113
mq= (shm_mq*) (error_queue_space+
@@ -1144,9 +1233,6 @@ ParallelWorkerMain(Datum main_arg)
11441233
SetTempNamespaceState(fps->temp_namespace_id,
11451234
fps->temp_toast_namespace_id);
11461235

1147-
/* Set ParallelMasterBackendId so we know how to address temp relations. */
1148-
ParallelMasterBackendId=fps->parallel_master_backend_id;
1149-
11501236
/*
11511237
* We've initialized all of our state now; nothing should change
11521238
* hereafter.
@@ -1192,6 +1278,20 @@ ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
11921278
SpinLockRelease(&fps->mutex);
11931279
}
11941280

1281+
/*
1282+
* Make sure the leader tries to read from our error queue one more time.
1283+
* This guards against the case where we exit uncleanly without sending an
1284+
* ErrorResponse to the leader, for example because some code calls proc_exit
1285+
* directly.
1286+
*/
1287+
staticvoid
1288+
ParallelWorkerShutdown(intcode,Datumarg)
1289+
{
1290+
SendProcSignal(ParallelMasterPid,
1291+
PROCSIG_PARALLEL_MESSAGE,
1292+
ParallelMasterBackendId);
1293+
}
1294+
11951295
/*
11961296
* Look up (and possibly load) a parallel worker entry point function.
11971297
*

‎src/include/access/parallel.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ typedef struct ParallelContext
4444
void*private_memory;
4545
shm_toc*toc;
4646
ParallelWorkerInfo*worker;
47+
bool*any_message_received;
4748
}ParallelContext;
4849

4950
externvolatileboolParallelMessagePending;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp