Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb7b8cc0

Browse files
committed
Redesign parallel dump/restore's wait-for-workers logic.
The ListenToWorkers/ReapWorkerStatus APIs were messy and hard to use.Instead, make DispatchJobForTocEntry register a callback function thatwill take care of state cleanup, doing whatever had been done by the callerof ReapWorkerStatus in the old design. (This callback is essentially justthe old mark_work_done function in the restore case, and a trivial test forworker failure in the dump case.) Then we can have ListenToWorkers callthe callback immediately on receipt of a status message, and return theworker to WRKR_IDLE state; so the WRKR_FINISHED state goes away.This allows us to design a unified wait-for-worker-messages loop:WaitForWorkers replaces EnsureIdleWorker and EnsureWorkersFinished as wellas the mess in restore_toc_entries_parallel. Also, we no longer need thefragile API spec that the caller of DispatchJobForTocEntry is responsiblefor ensuring there's an idle worker, since DispatchJobForTocEntry can justwait until there is one.In passing, I got rid of the ParallelArgs struct, which was a net negativein terms of notational verboseness, and didn't seem to be providing anynoticeable amount of abstraction either.Tom Lane, reviewed by Kevin GrittnerDiscussion: <1188.1464544443@sss.pgh.pa.us>
1 parentf31a931 commitb7b8cc0

File tree

6 files changed

+224
-232
lines changed

6 files changed

+224
-232
lines changed

‎src/bin/pg_dump/parallel.c

Lines changed: 107 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@
3535
* the required action (dump or restore) and returns a malloc'd status string.
3636
* The status string is passed back to the master where it is interpreted by
3737
* AH->MasterEndParallelItemPtr, another format-specific routine. That
38-
* function can updatestate or catalog information on the master's side,
38+
* function can updateformat-specific information on the master's side,
3939
* depending on the reply from the worker process. In the end it returns a
40-
* status code, which is 0 for successful execution.
40+
* status code, which we pass to the ParallelCompletionPtr callback function
41+
* that was passed to DispatchJobForTocEntry(). The callback function does
42+
* state updating for the master control logic in pg_backup_archiver.c.
4143
*
4244
* Remember that we have forked off the workers only after we have read in
4345
* the catalog. That's why our worker processes can also access the catalog
@@ -48,13 +50,8 @@
4850
* In the master process, the workerStatus field for each worker has one of
4951
* the following values:
5052
*WRKR_IDLE: it's waiting for a command
51-
*WRKR_WORKING: it's been sent a command
52-
*WRKR_FINISHED: it's returned a result
53+
*WRKR_WORKING: it's working on a command
5354
*WRKR_TERMINATED: process ended
54-
* The FINISHED state indicates that the worker is idle, but we've not yet
55-
* dealt with the status code it returned from the prior command.
56-
* ReapWorkerStatus() extracts the unhandled command status value and sets
57-
* the workerStatus back to WRKR_IDLE.
5855
*/
5956

6057
#include"postgres_fe.h"
@@ -79,6 +76,8 @@
7976
#definePIPE_READ0
8077
#definePIPE_WRITE1
8178

79+
#defineNO_SLOT (-1)/* Failure result for GetIdleWorker() */
80+
8281
#ifdefWIN32
8382

8483
/*
@@ -175,9 +174,12 @@ static void setup_cancel_handler(void);
175174
staticvoidset_cancel_pstate(ParallelState*pstate);
176175
staticvoidset_cancel_slot_archive(ParallelSlot*slot,ArchiveHandle*AH);
177176
staticvoidRunWorker(ArchiveHandle*AH,ParallelSlot*slot);
177+
staticintGetIdleWorker(ParallelState*pstate);
178178
staticboolHasEveryWorkerTerminated(ParallelState*pstate);
179179
staticvoidlockTableForWorker(ArchiveHandle*AH,TocEntry*te);
180180
staticvoidWaitForCommands(ArchiveHandle*AH,intpipefd[2]);
181+
staticboolListenToWorkers(ArchiveHandle*AH,ParallelState*pstate,
182+
booldo_wait);
181183
staticchar*getMessageFromMaster(intpipefd[2]);
182184
staticvoidsendMessageToMaster(intpipefd[2],constchar*str);
183185
staticintselect_loop(intmaxFd,fd_set*workerset);
@@ -349,8 +351,8 @@ archive_close_connection(int code, void *arg)
349351
* fail to detect it because there would be no EOF condition on
350352
* the other end of the pipe.)
351353
*/
352-
if (slot->args->AH)
353-
DisconnectDatabase(&(slot->args->AH->public));
354+
if (slot->AH)
355+
DisconnectDatabase(&(slot->AH->public));
354356

355357
#ifdefWIN32
356358
closesocket(slot->pipeRevRead);
@@ -407,7 +409,7 @@ ShutdownWorkersHard(ParallelState *pstate)
407409
EnterCriticalSection(&signal_info_lock);
408410
for (i=0;i<pstate->numWorkers;i++)
409411
{
410-
ArchiveHandle*AH=pstate->parallelSlot[i].args->AH;
412+
ArchiveHandle*AH=pstate->parallelSlot[i].AH;
411413
charerrbuf[1];
412414

413415
if (AH!=NULL&&AH->connCancel!=NULL)
@@ -634,7 +636,7 @@ consoleHandler(DWORD dwCtrlType)
634636
for (i=0;i<signal_info.pstate->numWorkers;i++)
635637
{
636638
ParallelSlot*slot=&(signal_info.pstate->parallelSlot[i]);
637-
ArchiveHandle*AH=slot->args->AH;
639+
ArchiveHandle*AH=slot->AH;
638640
HANDLEhThread= (HANDLE)slot->hThread;
639641

640642
/*
@@ -789,7 +791,7 @@ set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
789791
EnterCriticalSection(&signal_info_lock);
790792
#endif
791793

792-
slot->args->AH=AH;
794+
slot->AH=AH;
793795

794796
#ifdefWIN32
795797
LeaveCriticalSection(&signal_info_lock);
@@ -935,9 +937,10 @@ ParallelBackupStart(ArchiveHandle *AH)
935937
strerror(errno));
936938

937939
slot->workerStatus=WRKR_IDLE;
938-
slot->args= (ParallelArgs*)pg_malloc(sizeof(ParallelArgs));
939-
slot->args->AH=NULL;
940-
slot->args->te=NULL;
940+
slot->AH=NULL;
941+
slot->te=NULL;
942+
slot->callback=NULL;
943+
slot->callback_data=NULL;
941944

942945
/* master's ends of the pipes */
943946
slot->pipeRead=pipeWM[PIPE_READ];
@@ -1071,20 +1074,28 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
10711074
}
10721075

10731076
/*
1074-
* Dispatch a job to some free worker (caller must ensure there is one!)
1077+
* Dispatch a job to some free worker.
10751078
*
10761079
* te is the TocEntry to be processed, act is the action to be taken on it.
1080+
* callback is the function to call on completion of the job.
1081+
*
1082+
* If no worker is currently available, this will block, and previously
1083+
* registered callback functions may be called.
10771084
*/
10781085
void
1079-
DispatchJobForTocEntry(ArchiveHandle*AH,ParallelState*pstate,TocEntry*te,
1080-
T_Actionact)
1086+
DispatchJobForTocEntry(ArchiveHandle*AH,
1087+
ParallelState*pstate,
1088+
TocEntry*te,
1089+
T_Actionact,
1090+
ParallelCompletionPtrcallback,
1091+
void*callback_data)
10811092
{
10821093
intworker;
10831094
char*arg;
10841095

1085-
/*our caller makes sure that at least one worker is idle */
1086-
worker=GetIdleWorker(pstate);
1087-
Assert(worker!=NO_SLOT);
1096+
/*Get a worker, waiting if none are idle */
1097+
while ((worker=GetIdleWorker(pstate))==NO_SLOT)
1098+
WaitForWorkers(AH,pstate,WFW_ONE_IDLE);
10881099

10891100
/* Construct and send command string */
10901101
arg= (AH->MasterStartParallelItemPtr) (AH,te,act);
@@ -1095,14 +1106,16 @@ DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
10951106

10961107
/* Remember worker is busy, and which TocEntry it's working on */
10971108
pstate->parallelSlot[worker].workerStatus=WRKR_WORKING;
1098-
pstate->parallelSlot[worker].args->te=te;
1109+
pstate->parallelSlot[worker].te=te;
1110+
pstate->parallelSlot[worker].callback=callback;
1111+
pstate->parallelSlot[worker].callback_data=callback_data;
10991112
}
11001113

11011114
/*
11021115
* Find an idle worker and return its slot number.
11031116
* Return NO_SLOT if none are idle.
11041117
*/
1105-
int
1118+
staticint
11061119
GetIdleWorker(ParallelState*pstate)
11071120
{
11081121
inti;
@@ -1274,17 +1287,16 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2])
12741287
* immediately if there is none available.
12751288
*
12761289
* When we get a status message, we let MasterEndParallelItemPtr process it,
1277-
* then save the resulting status code and switch the worker's state to
1278-
* WRKR_FINISHED. Later, caller must call ReapWorkerStatus() to verify
1279-
* that the status was "OK" and push the worker back to IDLE state.
1290+
* then pass the resulting status code to the callback function that was
1291+
* specified to DispatchJobForTocEntry, then reset the worker status to IDLE.
12801292
*
1281-
*XXX Rube Goldberg would be proud of this API, but no oneelseshould be.
1293+
*Returns true if we collected a status message,elsefalse.
12821294
*
12831295
* XXX is it worth checking for more than one status message per call?
12841296
* It seems somewhat unlikely that multiple workers would finish at exactly
12851297
* the same time.
12861298
*/
1287-
void
1299+
staticbool
12881300
ListenToWorkers(ArchiveHandle*AH,ParallelState*pstate,booldo_wait)
12891301
{
12901302
intworker;
@@ -1298,34 +1310,39 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
12981310
/* If do_wait is true, we must have detected EOF on some socket */
12991311
if (do_wait)
13001312
exit_horribly(modulename,"a worker process died unexpectedly\n");
1301-
return;
1313+
return false;
13021314
}
13031315

13041316
/* Process it and update our idea of the worker's status */
13051317
if (messageStartsWith(msg,"OK "))
13061318
{
1307-
TocEntry*te=pstate->parallelSlot[worker].args->te;
1319+
ParallelSlot*slot=&pstate->parallelSlot[worker];
1320+
TocEntry*te=slot->te;
13081321
char*statusString;
1322+
intstatus;
13091323

13101324
if (messageStartsWith(msg,"OK RESTORE "))
13111325
{
13121326
statusString=msg+strlen("OK RESTORE ");
1313-
pstate->parallelSlot[worker].status=
1327+
status=
13141328
(AH->MasterEndParallelItemPtr)
13151329
(AH,te,statusString,ACT_RESTORE);
1330+
slot->callback(AH,te,status,slot->callback_data);
13161331
}
13171332
elseif (messageStartsWith(msg,"OK DUMP "))
13181333
{
13191334
statusString=msg+strlen("OK DUMP ");
1320-
pstate->parallelSlot[worker].status=
1335+
status=
13211336
(AH->MasterEndParallelItemPtr)
13221337
(AH,te,statusString,ACT_DUMP);
1338+
slot->callback(AH,te,status,slot->callback_data);
13231339
}
13241340
else
13251341
exit_horribly(modulename,
13261342
"invalid message received from worker: \"%s\"\n",
13271343
msg);
1328-
pstate->parallelSlot[worker].workerStatus=WRKR_FINISHED;
1344+
slot->workerStatus=WRKR_IDLE;
1345+
slot->te=NULL;
13291346
}
13301347
else
13311348
exit_horribly(modulename,
@@ -1334,110 +1351,79 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
13341351

13351352
/* Free the string returned from getMessageFromWorker */
13361353
free(msg);
1337-
}
1338-
1339-
/*
1340-
* Check to see if any worker is in WRKR_FINISHED state. If so,
1341-
* return its command status code into *status, reset it to IDLE state,
1342-
* and return its slot number. Otherwise return NO_SLOT.
1343-
*
1344-
* This function is executed in the master process.
1345-
*/
1346-
int
1347-
ReapWorkerStatus(ParallelState*pstate,int*status)
1348-
{
1349-
inti;
13501354

1351-
for (i=0;i<pstate->numWorkers;i++)
1352-
{
1353-
if (pstate->parallelSlot[i].workerStatus==WRKR_FINISHED)
1354-
{
1355-
*status=pstate->parallelSlot[i].status;
1356-
pstate->parallelSlot[i].status=0;
1357-
pstate->parallelSlot[i].workerStatus=WRKR_IDLE;
1358-
returni;
1359-
}
1360-
}
1361-
returnNO_SLOT;
1355+
return true;
13621356
}
13631357

13641358
/*
1365-
* Wait, if necessary, until we have at least one idle worker.
1366-
* Reap worker status as necessary to move FINISHED workers to IDLE state.
1359+
* Check for status results from workers, waiting if necessary.
13671360
*
1368-
* We assume that no extra processing is required when reaping a finished
1369-
* command, except for checking that the status was OK (zero).
1370-
* Caution: that assumption means that this function can only be used in
1371-
* parallel dump, not parallel restore, because the latter has a more
1372-
* complex set of rules about handling status.
1361+
* Available wait modes are:
1362+
* WFW_NO_WAIT: reap any available status, but don't block
1363+
* WFW_GOT_STATUS: wait for at least one more worker to finish
1364+
* WFW_ONE_IDLE: wait for at least one worker to be idle
1365+
* WFW_ALL_IDLE: wait for all workers to be idle
1366+
*
1367+
* Any received results are passed to MasterEndParallelItemPtr and then
1368+
* to the callback specified to DispatchJobForTocEntry.
13731369
*
13741370
* This function is executed in the master process.
13751371
*/
13761372
void
1377-
EnsureIdleWorker(ArchiveHandle*AH,ParallelState*pstate)
1373+
WaitForWorkers(ArchiveHandle*AH,ParallelState*pstate,WFW_WaitOptionmode)
13781374
{
1379-
intret_worker;
1380-
intwork_status;
1375+
booldo_wait= false;
13811376

1382-
for (;;)
1377+
/*
1378+
* In GOT_STATUS mode, always block waiting for a message, since we can't
1379+
* return till we get something. In other modes, we don't block the first
1380+
* time through the loop.
1381+
*/
1382+
if (mode==WFW_GOT_STATUS)
13831383
{
1384-
intnTerm=0;
1385-
1386-
while ((ret_worker=ReapWorkerStatus(pstate,&work_status))!=NO_SLOT)
1387-
{
1388-
if (work_status!=0)
1389-
exit_horribly(modulename,"error processing a parallel work item\n");
1390-
1391-
nTerm++;
1392-
}
1393-
1394-
/*
1395-
* We need to make sure that we have an idle worker before dispatching
1396-
* the next item. If nTerm > 0 we already have that (quick check).
1397-
*/
1398-
if (nTerm>0)
1399-
return;
1400-
1401-
/* explicit check for an idle worker */
1402-
if (GetIdleWorker(pstate)!=NO_SLOT)
1403-
return;
1384+
/* Assert that caller knows what it's doing */
1385+
Assert(!IsEveryWorkerIdle(pstate));
1386+
do_wait= true;
1387+
}
14041388

1389+
for (;;)
1390+
{
14051391
/*
1406-
* If we have no idle worker, read the result of one or more workers
1407-
* and loop the loop to call ReapWorkerStatus() on them
1392+
* Check for status messages, even if we don't need to block. We do
1393+
* not try very hard to reap all available messages, though, since
1394+
* there's unlikely to be more than one.
14081395
*/
1409-
ListenToWorkers(AH,pstate, true);
1410-
}
1411-
}
1412-
1413-
/*
1414-
* Wait for all workers to be idle.
1415-
* Reap worker status as necessary to move FINISHED workers to IDLE state.
1416-
*
1417-
* We assume that no extra processing is required when reaping a finished
1418-
* command, except for checking that the status was OK (zero).
1419-
* Caution: that assumption means that this function can only be used in
1420-
* parallel dump, not parallel restore, because the latter has a more
1421-
* complex set of rules about handling status.
1422-
*
1423-
* This function is executed in the master process.
1424-
*/
1425-
void
1426-
EnsureWorkersFinished(ArchiveHandle*AH,ParallelState*pstate)
1427-
{
1428-
intwork_status;
1396+
if (ListenToWorkers(AH,pstate,do_wait))
1397+
{
1398+
/*
1399+
* If we got a message, we are done by definition for GOT_STATUS
1400+
* mode, and we can also be certain that there's at least one idle
1401+
* worker. So we're done in all but ALL_IDLE mode.
1402+
*/
1403+
if (mode!=WFW_ALL_IDLE)
1404+
return;
1405+
}
14291406

1430-
if (!pstate||pstate->numWorkers==1)
1431-
return;
1407+
/* Check whether we must wait for new status messages */
1408+
switch (mode)
1409+
{
1410+
caseWFW_NO_WAIT:
1411+
return;/* never wait */
1412+
caseWFW_GOT_STATUS:
1413+
Assert(false);/* can't get here, because we waited */
1414+
break;
1415+
caseWFW_ONE_IDLE:
1416+
if (GetIdleWorker(pstate)!=NO_SLOT)
1417+
return;
1418+
break;
1419+
caseWFW_ALL_IDLE:
1420+
if (IsEveryWorkerIdle(pstate))
1421+
return;
1422+
break;
1423+
}
14321424

1433-
/* Waiting for the remaining worker processes to finish */
1434-
while (!IsEveryWorkerIdle(pstate))
1435-
{
1436-
if (ReapWorkerStatus(pstate,&work_status)==NO_SLOT)
1437-
ListenToWorkers(AH,pstate, true);
1438-
elseif (work_status!=0)
1439-
exit_horribly(modulename,
1440-
"error processing a parallel work item\n");
1425+
/* Loop back, and this time wait for something to happen */
1426+
do_wait= true;
14411427
}
14421428
}
14431429

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp