Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5d60df8

Browse files
committed
Fix parallel pg_dump/pg_restore for failure to create worker processes.
If we failed to fork a worker process, or create a communication pipefor one, WaitForTerminatingWorkers would suffer an assertion failureif assert-enabled, otherwise crash or go into an infinite loop. Thiswas a consequence of not accounting for the startup condition wherewe've not yet forked all the workers.The original bug was that ParallelBackupStart would set workerStatus toWRKR_IDLE before it had successfully forked a worker. I made thingsworse in commitb7b8cc0 by not understanding the undocumented factthat the WRKR_TERMINATED state was also meant to represent the casewhere a worker hadn't been started yet: I changed enum T_WorkerStatusso that *all* the worker slots were initially in WRKR_IDLE state. Butthis wasn't any more broken in practice, since even one slot in thewrong state would keep WaitForTerminatingWorkers from terminating.In v10 and later, introduce an explicit T_WorkerStatus value forworker-not-started, in hopes of preventing future oversights of thesame ilk. Before that, just document that WRKR_TERMINATED is supposedto cover that case (partly because it wasn't actively broken, andpartly because the enum is exposed outside parallel.c in those branches,so there's microscopically more risk involved in changing it).In all branches, introduce a WORKER_IS_RUNNING status test macroto hide which T_WorkerStatus values mean that, and be more carefulnot to access ParallelSlot fields till we're sure they're valid.Per report from Vignesh C, though this is my patch not his.Back-patch to all supported branches.Discussion:https://postgr.es/m/CALDaNm1Luv-E3sarR+-unz-BjchquHHyfP+YC+2FS2pt_J+wxg@mail.gmail.com
1 parent56bc82a commit5d60df8

File tree

2 files changed

+20
-11
lines changed

2 files changed

+20
-11
lines changed

‎src/bin/pg_dump/parallel.c

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
*WRKR_IDLE: it's waiting for a command
5151
*WRKR_WORKING: it's been sent a command
5252
*WRKR_FINISHED: it's returned a result
53-
*WRKR_TERMINATED: process ended
53+
*WRKR_TERMINATED: process ended (or not started yet)
5454
* The FINISHED state indicates that the worker is idle, but we've not yet
5555
* dealt with the status code it returned from the prior command.
5656
* ReapWorkerStatus() extracts the unhandled command status value and sets
@@ -381,7 +381,9 @@ ShutdownWorkersHard(ParallelState *pstate)
381381

382382
/*
383383
* Close our write end of the sockets so that any workers waiting for
384-
* commands know they can exit.
384+
* commands know they can exit. (Note: some of the pipeWrite fields might
385+
* still be zero, if we failed to initialize all the workers. Hence, just
386+
* ignore errors here.)
385387
*/
386388
for (i=0;i<pstate->numWorkers;i++)
387389
closesocket(pstate->parallelSlot[i].pipeWrite);
@@ -455,7 +457,7 @@ WaitForTerminatingWorkers(ParallelState *pstate)
455457

456458
for (j=0;j<pstate->numWorkers;j++)
457459
{
458-
if (pstate->parallelSlot[j].workerStatus!=WRKR_TERMINATED)
460+
if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
459461
{
460462
lpHandles[nrun]= (HANDLE)pstate->parallelSlot[j].hThread;
461463
nrun++;
@@ -891,6 +893,7 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
891893
if (AH->public.numWorkers==1)
892894
returnpstate;
893895

896+
/* Create status array, being sure to initialize all fields to 0 */
894897
pstate->parallelSlot= (ParallelSlot*)pg_malloc(slotSize);
895898
memset((void*)pstate->parallelSlot,0,slotSize);
896899

@@ -932,17 +935,16 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
932935
intpipeMW[2],
933936
pipeWM[2];
934937

938+
slot->args= (ParallelArgs*)pg_malloc(sizeof(ParallelArgs));
939+
slot->args->AH=NULL;
940+
slot->args->te=NULL;
941+
935942
/* Create communication pipes for this worker */
936943
if (pgpipe(pipeMW)<0||pgpipe(pipeWM)<0)
937944
exit_horribly(modulename,
938945
"could not create communication channels: %s\n",
939946
strerror(errno));
940947

941-
slot->workerStatus=WRKR_IDLE;
942-
slot->args= (ParallelArgs*)pg_malloc(sizeof(ParallelArgs));
943-
slot->args->AH=NULL;
944-
slot->args->te=NULL;
945-
946948
/* master's ends of the pipes */
947949
slot->pipeRead=pipeWM[PIPE_READ];
948950
slot->pipeWrite=pipeMW[PIPE_WRITE];
@@ -961,6 +963,7 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
961963
handle=_beginthreadex(NULL,0, (void*)&init_spawned_worker_win32,
962964
wi,0,&(slot->threadId));
963965
slot->hThread=handle;
966+
slot->workerStatus=WRKR_IDLE;
964967
#else/* !WIN32 */
965968
pid=fork();
966969
if (pid==0)
@@ -1005,6 +1008,7 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
10051008

10061009
/* In Master after successful fork */
10071010
slot->pid=pid;
1011+
slot->workerStatus=WRKR_IDLE;
10081012

10091013
/* close read end of Master -> Worker */
10101014
closesocket(pipeMW[PIPE_READ]);
@@ -1121,7 +1125,7 @@ GetIdleWorker(ParallelState *pstate)
11211125
}
11221126

11231127
/*
1124-
* Return true iffevery worker isin the WRKR_TERMINATED state.
1128+
* Return true iffno worker isrunning.
11251129
*/
11261130
staticbool
11271131
HasEveryWorkerTerminated(ParallelState*pstate)
@@ -1130,7 +1134,7 @@ HasEveryWorkerTerminated(ParallelState *pstate)
11301134

11311135
for (i=0;i<pstate->numWorkers;i++)
11321136
{
1133-
if (pstate->parallelSlot[i].workerStatus!=WRKR_TERMINATED)
1137+
if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
11341138
return false;
11351139
}
11361140
return true;
@@ -1530,7 +1534,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
15301534
FD_ZERO(&workerset);
15311535
for (i=0;i<pstate->numWorkers;i++)
15321536
{
1533-
if (pstate->parallelSlot[i].workerStatus==WRKR_TERMINATED)
1537+
if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
15341538
continue;
15351539
FD_SET(pstate->parallelSlot[i].pipeRead,&workerset);
15361540
if (pstate->parallelSlot[i].pipeRead>maxFd)
@@ -1555,6 +1559,8 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
15551559
{
15561560
char*msg;
15571561

1562+
if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1563+
continue;
15581564
if (!FD_ISSET(pstate->parallelSlot[i].pipeRead,&workerset))
15591565
continue;
15601566

‎src/bin/pg_dump/parallel.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ typedef enum
3232
WRKR_FINISHED
3333
}T_WorkerStatus;
3434

35+
#defineWORKER_IS_RUNNING(workerStatus) \
36+
((workerStatus) != WRKR_TERMINATED)
37+
3538
/* Arguments needed for a worker process */
3639
typedefstructParallelArgs
3740
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp