Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 2425f8f

Browse files
committed
Fix parallel pg_dump/pg_restore for failure to create worker processes.
If we failed to fork a worker process, or create a communication pipe for one, WaitForTerminatingWorkers would suffer an assertion failure if assert-enabled, otherwise crash or go into an infinite loop. This was a consequence of not accounting for the startup condition where we've not yet forked all the workers.

The original bug was that ParallelBackupStart would set workerStatus to WRKR_IDLE before it had successfully forked a worker. I made things worse in commit b7b8cc0 by not understanding the undocumented fact that the WRKR_TERMINATED state was also meant to represent the case where a worker hadn't been started yet: I changed enum T_WorkerStatus so that *all* the worker slots were initially in WRKR_IDLE state. But this wasn't any more broken in practice, since even one slot in the wrong state would keep WaitForTerminatingWorkers from terminating.

In v10 and later, introduce an explicit T_WorkerStatus value for worker-not-started, in hopes of preventing future oversights of the same ilk. Before that, just document that WRKR_TERMINATED is supposed to cover that case (partly because it wasn't actively broken, and partly because the enum is exposed outside parallel.c in those branches, so there's microscopically more risk involved in changing it). In all branches, introduce a WORKER_IS_RUNNING status test macro to hide which T_WorkerStatus values mean that, and be more careful not to access ParallelSlot fields till we're sure they're valid.

Per report from Vignesh C, though this is my patch not his. Back-patch to all supported branches.

Discussion: https://postgr.es/m/CALDaNm1Luv-E3sarR+-unz-BjchquHHyfP+YC+2FS2pt_J+wxg@mail.gmail.com
1 parent a9cff89, commit 2425f8f

File tree

1 file changed

+17
-12
lines changed

1 file changed

+17
-12
lines changed

‎src/bin/pg_dump/parallel.c

Lines changed: 17 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,7 @@
4242
*
4343
* In the master process, the workerStatus field for each worker has one of
4444
* the following values:
45+
*WRKR_NOT_STARTED: we've not yet forked this worker
4546
*WRKR_IDLE: it's waiting for a command
4647
*WRKR_WORKING: it's working on a command
4748
*WRKR_TERMINATED: process ended
@@ -75,11 +76,15 @@
7576
/* Worker process statuses */
7677
typedefenum
7778
{
79+
WRKR_NOT_STARTED=0,
7880
WRKR_IDLE,
7981
WRKR_WORKING,
8082
WRKR_TERMINATED
8183
}T_WorkerStatus;
8284

85+
#defineWORKER_IS_RUNNING(workerStatus) \
86+
((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
87+
8388
/*
8489
* Private per-parallel-worker state (typedef for this is in parallel.h).
8590
*
@@ -412,7 +417,9 @@ ShutdownWorkersHard(ParallelState *pstate)
412417

413418
/*
414419
* Close our write end of the sockets so that any workers waiting for
415-
* commands know they can exit.
420+
* commands know they can exit. (Note: some of the pipeWrite fields might
421+
* still be zero, if we failed to initialize all the workers. Hence, just
422+
* ignore errors here.)
416423
*/
417424
for (i=0;i<pstate->numWorkers;i++)
418425
closesocket(pstate->parallelSlot[i].pipeWrite);
@@ -486,7 +493,7 @@ WaitForTerminatingWorkers(ParallelState *pstate)
486493

487494
for (j=0;j<pstate->numWorkers;j++)
488495
{
489-
if (pstate->parallelSlot[j].workerStatus!=WRKR_TERMINATED)
496+
if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
490497
{
491498
lpHandles[nrun]= (HANDLE)pstate->parallelSlot[j].hThread;
492499
nrun++;
@@ -922,6 +929,7 @@ ParallelBackupStart(ArchiveHandle *AH)
922929
if (AH->public.numWorkers==1)
923930
returnpstate;
924931

932+
/* Create status arrays, being sure to initialize all fields to 0 */
925933
pstate->te= (TocEntry**)
926934
pg_malloc0(pstate->numWorkers*sizeof(TocEntry*));
927935
pstate->parallelSlot= (ParallelSlot*)
@@ -969,13 +977,6 @@ ParallelBackupStart(ArchiveHandle *AH)
969977
if (pgpipe(pipeMW)<0||pgpipe(pipeWM)<0)
970978
fatal("could not create communication channels: %m");
971979

972-
pstate->te[i]=NULL;/* just for safety */
973-
974-
slot->workerStatus=WRKR_IDLE;
975-
slot->AH=NULL;
976-
slot->callback=NULL;
977-
slot->callback_data=NULL;
978-
979980
/* master's ends of the pipes */
980981
slot->pipeRead=pipeWM[PIPE_READ];
981982
slot->pipeWrite=pipeMW[PIPE_WRITE];
@@ -993,6 +994,7 @@ ParallelBackupStart(ArchiveHandle *AH)
993994
handle=_beginthreadex(NULL,0, (void*)&init_spawned_worker_win32,
994995
wi,0,&(slot->threadId));
995996
slot->hThread=handle;
997+
slot->workerStatus=WRKR_IDLE;
996998
#else/* !WIN32 */
997999
pid=fork();
9981000
if (pid==0)
@@ -1035,6 +1037,7 @@ ParallelBackupStart(ArchiveHandle *AH)
10351037

10361038
/* In Master after successful fork */
10371039
slot->pid=pid;
1040+
slot->workerStatus=WRKR_IDLE;
10381041

10391042
/* close read end of Master -> Worker */
10401043
closesocket(pipeMW[PIPE_READ]);
@@ -1262,7 +1265,7 @@ GetIdleWorker(ParallelState *pstate)
12621265
}
12631266

12641267
/*
1265-
* Return true iff every worker is in the WRKR_TERMINATED state.
1268+
* Return true iff no worker is running.
12661269
*/
12671270
staticbool
12681271
HasEveryWorkerTerminated(ParallelState*pstate)
@@ -1271,7 +1274,7 @@ HasEveryWorkerTerminated(ParallelState *pstate)
12711274

12721275
for (i=0;i<pstate->numWorkers;i++)
12731276
{
1274-
if (pstate->parallelSlot[i].workerStatus!=WRKR_TERMINATED)
1277+
if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
12751278
return false;
12761279
}
12771280
return true;
@@ -1603,7 +1606,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
16031606
FD_ZERO(&workerset);
16041607
for (i=0;i<pstate->numWorkers;i++)
16051608
{
1606-
if (pstate->parallelSlot[i].workerStatus==WRKR_TERMINATED)
1609+
if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
16071610
continue;
16081611
FD_SET(pstate->parallelSlot[i].pipeRead,&workerset);
16091612
if (pstate->parallelSlot[i].pipeRead>maxFd)
@@ -1628,6 +1631,8 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
16281631
{
16291632
char*msg;
16301633

1634+
if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1635+
continue;
16311636
if (!FD_ISSET(pstate->parallelSlot[i].pipeRead,&workerset))
16321637
continue;
16331638

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp