@@ -76,8 +76,6 @@ static ShutdownInformation shutdown_info;
76
76
static const char * modulename = gettext_noop ("parallel archiver" );
77
77
78
78
static ParallelSlot * GetMyPSlot (ParallelState * pstate );
79
- static void parallel_msg_master (ParallelSlot * slot ,const char * modulename ,
80
- const char * fmt ,va_list ap )pg_attribute_printf (3 ,0 );
81
79
static void archive_close_connection (int code ,void * arg );
82
80
static void ShutdownWorkersHard (ParallelState * pstate );
83
81
static void WaitForTerminatingWorkers (ParallelState * pstate );
@@ -161,65 +159,6 @@ GetMyPSlot(ParallelState *pstate)
161
159
return NULL ;
162
160
}
163
161
164
- /*
165
- * Fail and die, with a message to stderr. Parameters as for write_msg.
166
- *
167
- * This is defined in parallel.c, because in parallel mode, things are more
168
- * complicated. If the worker process does exit_horribly(), we forward its
169
- * last words to the master process. The master process then does
170
- * exit_horribly() with this error message itself and prints it normally.
171
- * After printing the message, exit_horribly() on the master will shut down
172
- * the remaining worker processes.
173
- */
174
- void
175
- exit_horribly (const char * modulename ,const char * fmt ,...)
176
- {
177
- va_list ap ;
178
- ParallelState * pstate = shutdown_info .pstate ;
179
- ParallelSlot * slot ;
180
-
181
- va_start (ap ,fmt );
182
-
183
- if (pstate == NULL )
184
- {
185
- /* Not in parallel mode, just write to stderr */
186
- vwrite_msg (modulename ,fmt ,ap );
187
- }
188
- else
189
- {
190
- slot = GetMyPSlot (pstate );
191
-
192
- if (!slot )
193
- /* We're the parent, just write the message out */
194
- vwrite_msg (modulename ,fmt ,ap );
195
- else
196
- /* If we're a worker process, send the msg to the master process */
197
- parallel_msg_master (slot ,modulename ,fmt ,ap );
198
- }
199
-
200
- va_end (ap );
201
-
202
- exit_nicely (1 );
203
- }
204
-
205
- /* Sends the error message from the worker to the master process */
206
- static void
207
- parallel_msg_master (ParallelSlot * slot ,const char * modulename ,
208
- const char * fmt ,va_list ap )
209
- {
210
- char buf [512 ];
211
- int pipefd [2 ];
212
-
213
- pipefd [PIPE_READ ]= slot -> pipeRevRead ;
214
- pipefd [PIPE_WRITE ]= slot -> pipeRevWrite ;
215
-
216
- strcpy (buf ,"ERROR " );
217
- vsnprintf (buf + strlen ("ERROR " ),
218
- sizeof (buf )- strlen ("ERROR " ),fmt ,ap );
219
-
220
- sendMessageToMaster (pipefd ,buf );
221
- }
222
-
223
162
/*
224
163
* A thread-local version of getLocalPQExpBuffer().
225
164
*
@@ -270,7 +209,7 @@ getThreadLocalPQExpBuffer(void)
270
209
271
210
/*
272
211
* pg_dump and pg_restore register the Archive pointer for the exit handler
273
- * (called fromexit_horribly ). This function mainly exists so that we can
212
+ * (called fromexit_nicely ). This function mainly exists so that we can
274
213
* keep shutdown_info in file scope only.
275
214
*/
276
215
void
@@ -281,8 +220,8 @@ on_exit_close_archive(Archive *AHX)
281
220
}
282
221
283
222
/*
284
- *This function can close archives in both the parallel and non-parallel
285
- *case .
223
+ *on_exit_nicely handler for shutting down database connections and
224
+ *worker processes cleanly .
286
225
*/
287
226
static void
288
227
archive_close_connection (int code ,void * arg )
@@ -291,42 +230,62 @@ archive_close_connection(int code, void *arg)
291
230
292
231
if (si -> pstate )
293
232
{
233
+ /* In parallel mode, must figure out who we are */
294
234
ParallelSlot * slot = GetMyPSlot (si -> pstate );
295
235
296
236
if (!slot )
297
237
{
298
238
/*
299
- * We're the master: We have already printed out the message
300
- * passed to exit_horribly() either from the master itself or from
301
- * a worker process. Now we need to close our own database
302
- * connection (only open during parallel dump but not restore) and
303
- * shut down the remaining workers.
239
+ * We're the master. Close our own database connection, if any,
240
+ * and then forcibly shut down workers.
304
241
*/
305
- DisconnectDatabase (si -> AHX );
242
+ if (si -> AHX )
243
+ DisconnectDatabase (si -> AHX );
244
+
306
245
#ifndef WIN32
307
246
308
247
/*
309
- * Setting aborting to true switches to best-effort-mode
310
- * (send/receive but ignore errors) in communicating with our
311
- * workers.
248
+ * Setting aborting to true shuts off error/warning messages that
249
+ * are no longer useful once we start killing workers.
312
250
*/
313
251
aborting = true;
314
252
#endif
315
253
ShutdownWorkersHard (si -> pstate );
316
254
}
317
- else if (slot -> args -> AH )
318
- DisconnectDatabase (& (slot -> args -> AH -> public ));
255
+ else
256
+ {
257
+ /*
258
+ * We're a worker. Shut down our own DB connection if any. On
259
+ * Windows, we also have to close our communication sockets, to
260
+ * emulate what will happen on Unix when the worker process exits.
261
+ * (Without this, if this is a premature exit, the master would
262
+ * fail to detect it because there would be no EOF condition on
263
+ * the other end of the pipe.)
264
+ */
265
+ if (slot -> args -> AH )
266
+ DisconnectDatabase (& (slot -> args -> AH -> public ));
267
+
268
+ #ifdef WIN32
269
+ closesocket (slot -> pipeRevRead );
270
+ closesocket (slot -> pipeRevWrite );
271
+ #endif
272
+ }
273
+ }
274
+ else
275
+ {
276
+ /* Non-parallel operation: just kill the master DB connection */
277
+ if (si -> AHX )
278
+ DisconnectDatabase (si -> AHX );
319
279
}
320
- else if (si -> AHX )
321
- DisconnectDatabase (si -> AHX );
322
280
}
323
281
324
282
/*
325
283
* If we have one worker that terminates for some reason, we'd like the other
326
284
* threads to terminate as well (and not finish with their 70 GB table dump
327
285
* first...). Now in UNIX we can just kill these processes, and let the signal
328
286
* handler set wantAbort to 1. In Windows we set a termEvent and this serves
329
- * as the signal for everyone to terminate.
287
+ * as the signal for everyone to terminate. We don't print any error message,
288
+ * that would just clutter the screen.
330
289
*/
331
290
void
332
291
checkAborting (ArchiveHandle * AH )
@@ -336,7 +295,7 @@ checkAborting(ArchiveHandle *AH)
336
295
#else
337
296
if (wantAbort )
338
297
#endif
339
- exit_horribly ( modulename , "worker is terminating\n" );
298
+ exit_nicely ( 1 );
340
299
}
341
300
342
301
/*
@@ -351,8 +310,6 @@ ShutdownWorkersHard(ParallelState *pstate)
351
310
#ifndef WIN32
352
311
int i ;
353
312
354
- signal (SIGPIPE ,SIG_IGN );
355
-
356
313
/*
357
314
* Close our write end of the sockets so that the workers know they can
358
315
* exit.
@@ -427,27 +384,21 @@ sigTermHandler(int signum)
427
384
#endif
428
385
429
386
/*
430
- * This function is called by both UNIX and Windows variants to set up a
431
- * worker process.
387
+ * This function is called by both UNIX and Windows variants to set up
388
+ * and run a worker process. Caller should exit the process (or thread)
389
+ * upon return.
432
390
*/
433
391
static void
434
392
SetupWorker (ArchiveHandle * AH ,int pipefd [2 ],int worker )
435
393
{
436
394
/*
437
395
* Call the setup worker function that's defined in the ArchiveHandle.
438
- *
439
- * We get the raw connection only for the reason that we can close it
440
- * properly when we shut down. This happens only that way when it is
441
- * brought down because of an error.
442
396
*/
443
397
(AH -> SetupWorkerPtr ) ((Archive * )AH );
444
398
445
399
Assert (AH -> connection != NULL );
446
400
447
401
WaitForCommands (AH ,pipefd );
448
-
449
- closesocket (pipefd [PIPE_READ ]);
450
- closesocket (pipefd [PIPE_WRITE ]);
451
402
}
452
403
453
404
#ifdef WIN32
@@ -533,14 +484,22 @@ ParallelBackupStart(ArchiveHandle *AH)
533
484
pstate -> parallelSlot [i ].args = (ParallelArgs * )pg_malloc (sizeof (ParallelArgs ));
534
485
pstate -> parallelSlot [i ].args -> AH = NULL ;
535
486
pstate -> parallelSlot [i ].args -> te = NULL ;
487
+
488
+ /* master's ends of the pipes */
489
+ pstate -> parallelSlot [i ].pipeRead = pipeWM [PIPE_READ ];
490
+ pstate -> parallelSlot [i ].pipeWrite = pipeMW [PIPE_WRITE ];
491
+ /* child's ends of the pipes */
492
+ pstate -> parallelSlot [i ].pipeRevRead = pipeMW [PIPE_READ ];
493
+ pstate -> parallelSlot [i ].pipeRevWrite = pipeWM [PIPE_WRITE ];
494
+
536
495
#ifdef WIN32
537
496
/* Allocate a new structure for every worker */
538
497
wi = (WorkerInfo * )pg_malloc (sizeof (WorkerInfo ));
539
498
540
499
wi -> worker = i ;
541
500
wi -> AH = AH ;
542
- wi -> pipeRead = pstate -> parallelSlot [ i ]. pipeRevRead = pipeMW [PIPE_READ ];
543
- wi -> pipeWrite = pstate -> parallelSlot [ i ]. pipeRevWrite = pipeWM [PIPE_WRITE ];
501
+ wi -> pipeRead = pipeMW [PIPE_READ ];
502
+ wi -> pipeWrite = pipeWM [PIPE_WRITE ];
544
503
545
504
handle = _beginthreadex (NULL ,0 , (void * )& init_spawned_worker_win32 ,
546
505
wi ,0 ,& (pstate -> parallelSlot [i ].threadId ));
@@ -556,15 +515,6 @@ ParallelBackupStart(ArchiveHandle *AH)
556
515
pipefd [0 ]= pipeMW [PIPE_READ ];
557
516
pipefd [1 ]= pipeWM [PIPE_WRITE ];
558
517
559
- /*
560
- * Store the fds for the reverse communication in pstate. Actually
561
- * we only use this in case of an error and don't use pstate
562
- * otherwise in the worker process. On Windows we write to the
563
- * global pstate, in Unix we write to our process-local copy but
564
- * that's also where we'd retrieve this information back from.
565
- */
566
- pstate -> parallelSlot [i ].pipeRevRead = pipefd [PIPE_READ ];
567
- pstate -> parallelSlot [i ].pipeRevWrite = pipefd [PIPE_WRITE ];
568
518
pstate -> parallelSlot [i ].pid = getpid ();
569
519
570
520
/*
@@ -583,7 +533,7 @@ ParallelBackupStart(ArchiveHandle *AH)
583
533
584
534
/*
585
535
* Close all inherited fds for communication of the master with
586
- *the other workers.
536
+ *previously-forked workers.
587
537
*/
588
538
for (j = 0 ;j < i ;j ++ )
589
539
{
@@ -611,11 +561,16 @@ ParallelBackupStart(ArchiveHandle *AH)
611
561
612
562
pstate -> parallelSlot [i ].pid = pid ;
613
563
#endif
614
-
615
- pstate -> parallelSlot [i ].pipeRead = pipeWM [PIPE_READ ];
616
- pstate -> parallelSlot [i ].pipeWrite = pipeMW [PIPE_WRITE ];
617
564
}
618
565
566
+ /*
567
+ * Having forked off the workers, disable SIGPIPE so that master isn't
568
+ * killed if it tries to send a command to a dead worker.
569
+ */
570
+ #ifndef WIN32
571
+ signal (SIGPIPE ,SIG_IGN );
572
+ #endif
573
+
619
574
return pstate ;
620
575
}
621
576
@@ -976,16 +931,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
976
931
}
977
932
else
978
933
exit_horribly (modulename ,
979
- "invalid message received from worker: %s\n" ,msg );
980
- }
981
- else if (messageStartsWith (msg ,"ERROR " ))
982
- {
983
- Assert (AH -> format == archDirectory || AH -> format == archCustom );
984
- pstate -> parallelSlot [worker ].workerStatus = WRKR_TERMINATED ;
985
- exit_horribly (modulename ,"%s" ,msg + strlen ("ERROR " ));
934
+ "invalid message received from worker: \"%s\"\n" ,
935
+ msg );
986
936
}
987
937
else
988
- exit_horribly (modulename ,"invalid message received from worker: %s\n" ,msg );
938
+ exit_horribly (modulename ,
939
+ "invalid message received from worker: \"%s\"\n" ,
940
+ msg );
989
941
990
942
/* both Unix and Win32 return pg_malloc()ed space, so we free it */
991
943
free (msg );