Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1c82051

Browse files
committed
Fix broken error handling in parallel pg_dump/pg_restore.
In the original design for parallel dump, worker processes reported errorsby sending them up to the master process, which would print the messages.This is unworkably fragile for a couple of reasons: it risks deadlock if aworker sends an error at an unexpected time, and if the master has alreadydied for some reason, the user will never get to see the error at all.Revert that idea and go back to just always printing messages to stderr.This approach means that if all the workers fail for similar reasons (eg,bad password or server shutdown), the user will see N copies of thatmessage, not only one as before. While that's slightly annoying, it'scertainly better than not seeing any message; not to mention that weshouldn't assume that only the first failure is interesting.An additional problem in the same area was that the master failed todisable SIGPIPE (at least until much too late), which meant that sending acommand to an already-dead worker would cause the master to crash silently.That was bad enough in itself but was made worse by the total reliance onthe master to print errors: even if the worker had reported an error, youwould probably not see it, depending on timing. Instead disable SIGPIPEright after we've forked the workers, before attempting to send themanything.Additionally, the master relies on seeing socket EOF to realize that aworker has exited prematurely --- but on Windows, there would be no EOFsince the socket is attached to the process that includes both the masterand worker threads, so it remains open. Make archive_close_connection()close the worker end of the sockets so that this acts more like the Unixcase. It's not perfect, because if a worker thread exits without goingthrough exit_nicely() the closures won't happen; but that's not reallysupposed to happen.This has been wrong all along, so back-patch to 9.3 where parallel dumpwas introduced.Report: <2458.1450894615@sss.pgh.pa.us>
1 parentff98ae9 commit1c82051

File tree

3 files changed

+98
-118
lines changed

3 files changed

+98
-118
lines changed

‎src/bin/pg_dump/parallel.c

Lines changed: 65 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,6 @@ static ShutdownInformation shutdown_info;
7777
staticconstchar*modulename=gettext_noop("parallel archiver");
7878

7979
staticParallelSlot*GetMyPSlot(ParallelState*pstate);
80-
staticvoid
81-
parallel_msg_master(ParallelSlot*slot,constchar*modulename,
82-
constchar*fmt,va_listap)
83-
__attribute__((format(PG_PRINTF_ATTRIBUTE,3,0)));
8480
staticvoidarchive_close_connection(intcode,void*arg);
8581
staticvoidShutdownWorkersHard(ParallelState*pstate);
8682
staticvoidWaitForTerminatingWorkers(ParallelState*pstate);
@@ -165,65 +161,6 @@ GetMyPSlot(ParallelState *pstate)
165161
returnNULL;
166162
}
167163

168-
/*
169-
* Fail and die, with a message to stderr. Parameters as for write_msg.
170-
*
171-
* This is defined in parallel.c, because in parallel mode, things are more
172-
* complicated. If the worker process does exit_horribly(), we forward its
173-
* last words to the master process. The master process then does
174-
* exit_horribly() with this error message itself and prints it normally.
175-
* After printing the message, exit_horribly() on the master will shut down
176-
* the remaining worker processes.
177-
*/
178-
void
179-
exit_horribly(constchar*modulename,constchar*fmt,...)
180-
{
181-
va_listap;
182-
ParallelState*pstate=shutdown_info.pstate;
183-
ParallelSlot*slot;
184-
185-
va_start(ap,fmt);
186-
187-
if (pstate==NULL)
188-
{
189-
/* Not in parallel mode, just write to stderr */
190-
vwrite_msg(modulename,fmt,ap);
191-
}
192-
else
193-
{
194-
slot=GetMyPSlot(pstate);
195-
196-
if (!slot)
197-
/* We're the parent, just write the message out */
198-
vwrite_msg(modulename,fmt,ap);
199-
else
200-
/* If we're a worker process, send the msg to the master process */
201-
parallel_msg_master(slot,modulename,fmt,ap);
202-
}
203-
204-
va_end(ap);
205-
206-
exit_nicely(1);
207-
}
208-
209-
/* Sends the error message from the worker to the master process */
210-
staticvoid
211-
parallel_msg_master(ParallelSlot*slot,constchar*modulename,
212-
constchar*fmt,va_listap)
213-
{
214-
charbuf[512];
215-
intpipefd[2];
216-
217-
pipefd[PIPE_READ]=slot->pipeRevRead;
218-
pipefd[PIPE_WRITE]=slot->pipeRevWrite;
219-
220-
strcpy(buf,"ERROR ");
221-
vsnprintf(buf+strlen("ERROR "),
222-
sizeof(buf)-strlen("ERROR "),fmt,ap);
223-
224-
sendMessageToMaster(pipefd,buf);
225-
}
226-
227164
/*
228165
* A thread-local version of getLocalPQExpBuffer().
229166
*
@@ -274,7 +211,7 @@ getThreadLocalPQExpBuffer(void)
274211

275212
/*
276213
* pg_dump and pg_restore register the Archive pointer for the exit handler
277-
* (called fromexit_horribly). This function mainly exists so that we can
214+
* (called fromexit_nicely). This function mainly exists so that we can
278215
* keep shutdown_info in file scope only.
279216
*/
280217
void
@@ -285,8 +222,8 @@ on_exit_close_archive(Archive *AHX)
285222
}
286223

287224
/*
288-
*This function can close archives in both the paralleland non-parallel
289-
*case.
225+
*on_exit_nicely handler for shutting down database connectionsand
226+
*worker processes cleanly.
290227
*/
291228
staticvoid
292229
archive_close_connection(intcode,void*arg)
@@ -295,42 +232,62 @@ archive_close_connection(int code, void *arg)
295232

296233
if (si->pstate)
297234
{
235+
/* In parallel mode, must figure out who we are */
298236
ParallelSlot*slot=GetMyPSlot(si->pstate);
299237

300238
if (!slot)
301239
{
302240
/*
303-
* We're the master: We have already printed out the message
304-
* passed to exit_horribly() either from the master itself or from
305-
* a worker process. Now we need to close our own database
306-
* connection (only open during parallel dump but not restore) and
307-
* shut down the remaining workers.
241+
* We're the master. Close our own database connection, if any,
242+
* and then forcibly shut down workers.
308243
*/
309-
DisconnectDatabase(si->AHX);
244+
if (si->AHX)
245+
DisconnectDatabase(si->AHX);
246+
310247
#ifndefWIN32
311248

312249
/*
313-
* Setting aborting to true switches to best-effort-mode
314-
* (send/receive but ignore errors) in communicating with our
315-
* workers.
250+
* Setting aborting to true shuts off error/warning messages that
251+
* are no longer useful once we start killing workers.
316252
*/
317253
aborting= true;
318254
#endif
319255
ShutdownWorkersHard(si->pstate);
320256
}
321-
elseif (slot->args->AH)
322-
DisconnectDatabase(&(slot->args->AH->public));
257+
else
258+
{
259+
/*
260+
* We're a worker. Shut down our own DB connection if any. On
261+
* Windows, we also have to close our communication sockets, to
262+
* emulate what will happen on Unix when the worker process exits.
263+
* (Without this, if this is a premature exit, the master would
264+
* fail to detect it because there would be no EOF condition on
265+
* the other end of the pipe.)
266+
*/
267+
if (slot->args->AH)
268+
DisconnectDatabase(&(slot->args->AH->public));
269+
270+
#ifdefWIN32
271+
closesocket(slot->pipeRevRead);
272+
closesocket(slot->pipeRevWrite);
273+
#endif
274+
}
275+
}
276+
else
277+
{
278+
/* Non-parallel operation: just kill the master DB connection */
279+
if (si->AHX)
280+
DisconnectDatabase(si->AHX);
323281
}
324-
elseif (si->AHX)
325-
DisconnectDatabase(si->AHX);
326282
}
327283

328284
/*
329285
* If we have one worker that terminates for some reason, we'd like the other
330286
* threads to terminate as well (and not finish with their 70 GB table dump
331287
* first...). Now in UNIX we can just kill these processes, and let the signal
332288
* handler set wantAbort to 1. In Windows we set a termEvent and this serves
333-
* as the signal for everyone to terminate.
289+
* as the signal for everyone to terminate. We don't print any error message,
290+
* that would just clutter the screen.
334291
*/
335292
void
336293
checkAborting(ArchiveHandle*AH)
@@ -340,7 +297,7 @@ checkAborting(ArchiveHandle *AH)
340297
#else
341298
if (wantAbort)
342299
#endif
343-
exit_horribly(modulename,"worker is terminating\n");
300+
exit_nicely(1);
344301
}
345302

346303
/*
@@ -355,8 +312,6 @@ ShutdownWorkersHard(ParallelState *pstate)
355312
#ifndefWIN32
356313
inti;
357314

358-
signal(SIGPIPE,SIG_IGN);
359-
360315
/*
361316
* Close our write end of the sockets so that the workers know they can
362317
* exit.
@@ -431,28 +386,22 @@ sigTermHandler(int signum)
431386
#endif
432387

433388
/*
434-
* This function is called by both UNIX and Windows variants to set up a
435-
* worker process.
389+
* This function is called by both UNIX and Windows variants to set up
390+
* and run a worker process. Caller should exit the process (or thread)
391+
* upon return.
436392
*/
437393
staticvoid
438394
SetupWorker(ArchiveHandle*AH,intpipefd[2],intworker,
439395
RestoreOptions*ropt)
440396
{
441397
/*
442398
* Call the setup worker function that's defined in the ArchiveHandle.
443-
*
444-
* We get the raw connection only for the reason that we can close it
445-
* properly when we shut down. This happens only that way when it is
446-
* brought down because of an error.
447399
*/
448400
(AH->SetupWorkerPtr) ((Archive*)AH,ropt);
449401

450402
Assert(AH->connection!=NULL);
451403

452404
WaitForCommands(AH,pipefd);
453-
454-
closesocket(pipefd[PIPE_READ]);
455-
closesocket(pipefd[PIPE_WRITE]);
456405
}
457406

458407
#ifdefWIN32
@@ -539,15 +488,23 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
539488
pstate->parallelSlot[i].args= (ParallelArgs*)pg_malloc(sizeof(ParallelArgs));
540489
pstate->parallelSlot[i].args->AH=NULL;
541490
pstate->parallelSlot[i].args->te=NULL;
491+
492+
/* master's ends of the pipes */
493+
pstate->parallelSlot[i].pipeRead=pipeWM[PIPE_READ];
494+
pstate->parallelSlot[i].pipeWrite=pipeMW[PIPE_WRITE];
495+
/* child's ends of the pipes */
496+
pstate->parallelSlot[i].pipeRevRead=pipeMW[PIPE_READ];
497+
pstate->parallelSlot[i].pipeRevWrite=pipeWM[PIPE_WRITE];
498+
542499
#ifdefWIN32
543500
/* Allocate a new structure for every worker */
544501
wi= (WorkerInfo*)pg_malloc(sizeof(WorkerInfo));
545502

546503
wi->ropt=ropt;
547504
wi->worker=i;
548505
wi->AH=AH;
549-
wi->pipeRead=pstate->parallelSlot[i].pipeRevRead=pipeMW[PIPE_READ];
550-
wi->pipeWrite=pstate->parallelSlot[i].pipeRevWrite=pipeWM[PIPE_WRITE];
506+
wi->pipeRead=pipeMW[PIPE_READ];
507+
wi->pipeWrite=pipeWM[PIPE_WRITE];
551508

552509
handle=_beginthreadex(NULL,0, (void*)&init_spawned_worker_win32,
553510
wi,0,&(pstate->parallelSlot[i].threadId));
@@ -563,15 +520,6 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
563520
pipefd[0]=pipeMW[PIPE_READ];
564521
pipefd[1]=pipeWM[PIPE_WRITE];
565522

566-
/*
567-
* Store the fds for the reverse communication in pstate. Actually
568-
* we only use this in case of an error and don't use pstate
569-
* otherwise in the worker process. On Windows we write to the
570-
* global pstate, in Unix we write to our process-local copy but
571-
* that's also where we'd retrieve this information back from.
572-
*/
573-
pstate->parallelSlot[i].pipeRevRead=pipefd[PIPE_READ];
574-
pstate->parallelSlot[i].pipeRevWrite=pipefd[PIPE_WRITE];
575523
pstate->parallelSlot[i].pid=getpid();
576524

577525
/*
@@ -590,7 +538,7 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
590538

591539
/*
592540
* Close all inherited fds for communication of the master with
593-
*the other workers.
541+
*previously-forked workers.
594542
*/
595543
for (j=0;j<i;j++)
596544
{
@@ -618,11 +566,16 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
618566

619567
pstate->parallelSlot[i].pid=pid;
620568
#endif
621-
622-
pstate->parallelSlot[i].pipeRead=pipeWM[PIPE_READ];
623-
pstate->parallelSlot[i].pipeWrite=pipeMW[PIPE_WRITE];
624569
}
625570

571+
/*
572+
* Having forked off the workers, disable SIGPIPE so that master isn't
573+
* killed if it tries to send a command to a dead worker.
574+
*/
575+
#ifndefWIN32
576+
signal(SIGPIPE,SIG_IGN);
577+
#endif
578+
626579
returnpstate;
627580
}
628581

@@ -983,16 +936,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
983936
}
984937
else
985938
exit_horribly(modulename,
986-
"invalid message received from worker: %s\n",msg);
987-
}
988-
elseif (messageStartsWith(msg,"ERROR "))
989-
{
990-
Assert(AH->format==archDirectory||AH->format==archCustom);
991-
pstate->parallelSlot[worker].workerStatus=WRKR_TERMINATED;
992-
exit_horribly(modulename,"%s",msg+strlen("ERROR "));
939+
"invalid message received from worker: \"%s\"\n",
940+
msg);
993941
}
994942
else
995-
exit_horribly(modulename,"invalid message received from worker: %s\n",msg);
943+
exit_horribly(modulename,
944+
"invalid message received from worker: \"%s\"\n",
945+
msg);
996946

997947
/* both Unix and Win32 return pg_malloc()ed space, so we free it */
998948
free(msg);

‎src/bin/pg_dump/parallel.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ typedef struct ParallelSlot
4545
ParallelArgs*args;
4646
T_WorkerStatusworkerStatus;
4747
intstatus;
48-
intpipeRead;
48+
intpipeRead;/* master's end of the pipes */
4949
intpipeWrite;
50-
intpipeRevRead;
50+
intpipeRevRead;/* child's end of the pipes */
5151
intpipeRevWrite;
5252
#ifdefWIN32
5353
uintptr_thThread;

‎src/bin/pg_dump/pg_backup_utils.c

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,23 @@ vwrite_msg(const char *modulename, const char *fmt, va_list ap)
9393
vfprintf(stderr,_(fmt),ap);
9494
}
9595

96+
/*
97+
* Fail and die, with a message to stderr. Parameters as for write_msg.
98+
*
99+
* Note that on_exit_nicely callbacks will get run.
100+
*/
101+
void
102+
exit_horribly(constchar*modulename,constchar*fmt,...)
103+
{
104+
va_listap;
105+
106+
va_start(ap,fmt);
107+
vwrite_msg(modulename,fmt,ap);
108+
va_end(ap);
109+
110+
exit_nicely(1);
111+
}
112+
96113
/* Register a callback to be run when exit_nicely is invoked. */
97114
void
98115
on_exit_nicely(on_exit_nicely_callbackfunction,void*arg)
@@ -106,7 +123,20 @@ on_exit_nicely(on_exit_nicely_callback function, void *arg)
106123

107124
/*
108125
* Run accumulated on_exit_nicely callbacks in reverse order and then exit
109-
* quietly. This needs to be thread-safe.
126+
* without printing any message.
127+
*
128+
* If running in a parallel worker thread on Windows, we only exit the thread,
129+
* not the whole process.
130+
*
131+
* Note that in parallel operation on Windows, the callback(s) will be run
132+
* by each thread since the list state is necessarily shared by all threads;
133+
* each callback must contain logic to ensure it does only what's appropriate
134+
* for its thread. On Unix, callbacks are also run by each process, but only
135+
* for callbacks established before we fork off the child processes. (It'd
136+
* be cleaner to reset the list after fork(), and let each child establish
137+
* its own callbacks; but then the behavior would be completely inconsistent
138+
* between Windows and Unix. For now, just be sure to establish callbacks
139+
* before forking to avoid inconsistency.)
110140
*/
111141
void
112142
exit_nicely(intcode)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp