Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0109ab2

Browse files
committed
Make struct ParallelSlot private within pg_dump/parallel.c.
The only field of this struct that other files have any need to touchis the pointer to the TocEntry a worker is working on. (Well,pg_backup_archiver.c is actually looking at workerStatus too, but thatcan be finessed by specifying that the TocEntry pointer is NULL for anon-busy worker.)Hence, move out the TocEntry pointers to a separate array withinstruct ParallelState, and then we can make struct ParallelSlot private.I noted the possibility of this previously, but hadn't got round toactually doing it.Discussion: <1188.1464544443@sss.pgh.pa.us>
1 parentfb03d08 commit0109ab2

File tree

3 files changed

+65
-54
lines changed

3 files changed

+65
-54
lines changed

‎src/bin/pg_dump/parallel.c

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
*WRKR_IDLE: it's waiting for a command
4646
*WRKR_WORKING: it's working on a command
4747
*WRKR_TERMINATED: process ended
48+
* The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
49+
* state, and must be NULL in other states.
4850
*/
4951

5052
#include"postgres_fe.h"
@@ -71,6 +73,45 @@
7173

7274
#defineNO_SLOT (-1)/* Failure result for GetIdleWorker() */
7375

76+
/* Worker process statuses */
77+
typedefenum
78+
{
79+
WRKR_IDLE,
80+
WRKR_WORKING,
81+
WRKR_TERMINATED
82+
}T_WorkerStatus;
83+
84+
/*
85+
* Private per-parallel-worker state (typedef for this is in parallel.h).
86+
*
87+
* Much of this is valid only in the master process (or, on Windows, should
88+
* be touched only by the master thread). But the AH field should be touched
89+
* only by workers. The pipe descriptors are valid everywhere.
90+
*/
91+
structParallelSlot
92+
{
93+
T_WorkerStatusworkerStatus;/* see enum above */
94+
95+
/* These fields are valid if workerStatus == WRKR_WORKING: */
96+
ParallelCompletionPtrcallback;/* function to call on completion */
97+
void*callback_data;/* passthru data for it */
98+
99+
ArchiveHandle*AH;/* Archive data worker is using */
100+
101+
intpipeRead;/* master's end of the pipes */
102+
intpipeWrite;
103+
intpipeRevRead;/* child's end of the pipes */
104+
intpipeRevWrite;
105+
106+
/* Child process/thread identity info: */
107+
#ifdefWIN32
108+
uintptr_thThread;
109+
unsignedintthreadId;
110+
#else
111+
pid_tpid;
112+
#endif
113+
};
114+
74115
#ifdefWIN32
75116

76117
/*
@@ -475,9 +516,10 @@ WaitForTerminatingWorkers(ParallelState *pstate)
475516
}
476517
#endif/* WIN32 */
477518

478-
/* On all platforms, update workerStatus as well */
519+
/* On all platforms, update workerStatusand te[]as well */
479520
Assert(j<pstate->numWorkers);
480521
slot->workerStatus=WRKR_TERMINATED;
522+
pstate->te[j]=NULL;
481523
}
482524
}
483525

@@ -870,20 +912,22 @@ ParallelBackupStart(ArchiveHandle *AH)
870912
{
871913
ParallelState*pstate;
872914
inti;
873-
constsize_tslotSize=AH->public.numWorkers*sizeof(ParallelSlot);
874915

875916
Assert(AH->public.numWorkers>0);
876917

877918
pstate= (ParallelState*)pg_malloc(sizeof(ParallelState));
878919

879920
pstate->numWorkers=AH->public.numWorkers;
921+
pstate->te=NULL;
880922
pstate->parallelSlot=NULL;
881923

882924
if (AH->public.numWorkers==1)
883925
returnpstate;
884926

885-
pstate->parallelSlot= (ParallelSlot*)pg_malloc(slotSize);
886-
memset((void*)pstate->parallelSlot,0,slotSize);
927+
pstate->te= (TocEntry**)
928+
pg_malloc0(pstate->numWorkers*sizeof(TocEntry*));
929+
pstate->parallelSlot= (ParallelSlot*)
930+
pg_malloc0(pstate->numWorkers*sizeof(ParallelSlot));
887931

888932
#ifdefWIN32
889933
/* Make fmtId() and fmtQualifiedId() use thread-local storage */
@@ -929,9 +973,10 @@ ParallelBackupStart(ArchiveHandle *AH)
929973
"could not create communication channels: %s\n",
930974
strerror(errno));
931975

976+
pstate->te[i]=NULL;/* just for safety */
977+
932978
slot->workerStatus=WRKR_IDLE;
933979
slot->AH=NULL;
934-
slot->te=NULL;
935980
slot->callback=NULL;
936981
slot->callback_data=NULL;
937982

@@ -1062,6 +1107,7 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
10621107
set_cancel_pstate(NULL);
10631108

10641109
/* Release state (mere neatnik-ism, since we're about to terminate) */
1110+
free(pstate->te);
10651111
free(pstate->parallelSlot);
10661112
free(pstate);
10671113
}
@@ -1201,9 +1247,9 @@ DispatchJobForTocEntry(ArchiveHandle *AH,
12011247

12021248
/* Remember worker is busy, and which TocEntry it's working on */
12031249
pstate->parallelSlot[worker].workerStatus=WRKR_WORKING;
1204-
pstate->parallelSlot[worker].te=te;
12051250
pstate->parallelSlot[worker].callback=callback;
12061251
pstate->parallelSlot[worker].callback_data=callback_data;
1252+
pstate->te[worker]=te;
12071253
}
12081254

12091255
/*
@@ -1394,13 +1440,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
13941440
if (messageStartsWith(msg,"OK "))
13951441
{
13961442
ParallelSlot*slot=&pstate->parallelSlot[worker];
1397-
TocEntry*te=slot->te;
1443+
TocEntry*te=pstate->te[worker];
13981444
intstatus;
13991445

14001446
status=parseWorkerResponse(AH,te,msg);
14011447
slot->callback(AH,te,status,slot->callback_data);
14021448
slot->workerStatus=WRKR_IDLE;
1403-
slot->te=NULL;
1449+
pstate->te[worker]=NULL;
14041450
}
14051451
else
14061452
exit_horribly(modulename,

‎src/bin/pg_dump/parallel.h

Lines changed: 5 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -33,51 +33,16 @@ typedef enum
3333
WFW_ALL_IDLE
3434
}WFW_WaitOption;
3535

36-
/* Worker process statuses */
37-
typedefenum
38-
{
39-
WRKR_IDLE,
40-
WRKR_WORKING,
41-
WRKR_TERMINATED
42-
}T_WorkerStatus;
43-
44-
/*
45-
* Per-parallel-worker state of parallel.c.
46-
*
47-
* Much of this is valid only in the master process (or, on Windows, should
48-
* be touched only by the master thread). But the AH field should be touched
49-
* only by workers. The pipe descriptors are valid everywhere.
50-
*/
51-
typedefstructParallelSlot
52-
{
53-
T_WorkerStatusworkerStatus;/* see enum above */
54-
55-
/* These fields are valid if workerStatus == WRKR_WORKING: */
56-
TocEntry*te;/* item being worked on */
57-
ParallelCompletionPtrcallback;/* function to call on completion */
58-
void*callback_data;/* passthru data for it */
59-
60-
ArchiveHandle*AH;/* Archive data worker is using */
61-
62-
intpipeRead;/* master's end of the pipes */
63-
intpipeWrite;
64-
intpipeRevRead;/* child's end of the pipes */
65-
intpipeRevWrite;
66-
67-
/* Child process/thread identity info: */
68-
#ifdefWIN32
69-
uintptr_thThread;
70-
unsignedintthreadId;
71-
#else
72-
pid_tpid;
73-
#endif
74-
}ParallelSlot;
36+
/* ParallelSlot is an opaque struct known only within parallel.c */
37+
typedefstructParallelSlotParallelSlot;
7538

7639
/* Overall state for parallel.c */
7740
typedefstructParallelState
7841
{
7942
intnumWorkers;/* allowed number of workers */
80-
ParallelSlot*parallelSlot;/* array of numWorkers slots */
43+
/* these arrays have numWorkers entries, one per worker: */
44+
TocEntry**te;/* item being worked on, or NULL */
45+
ParallelSlot*parallelSlot;/* private info about each worker */
8146
}ParallelState;
8247

8348
#ifdefWIN32

‎src/bin/pg_dump/pg_backup_archiver.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4027,8 +4027,10 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
40274027

40284028
for (k=0;k<pstate->numWorkers;k++)
40294029
{
4030-
if (pstate->parallelSlot[k].workerStatus==WRKR_WORKING&&
4031-
pstate->parallelSlot[k].te->section==SECTION_DATA)
4030+
TocEntry*running_te=pstate->te[k];
4031+
4032+
if (running_te!=NULL&&
4033+
running_te->section==SECTION_DATA)
40324034
count++;
40334035
}
40344036
if (pstate->numWorkers==0||count*4<pstate->numWorkers)
@@ -4049,12 +4051,10 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
40494051
*/
40504052
for (i=0;i<pstate->numWorkers;i++)
40514053
{
4052-
TocEntry*running_te;
4054+
TocEntry*running_te=pstate->te[i];
40534055

4054-
if (pstate->parallelSlot[i].workerStatus!=WRKR_WORKING)
4056+
if (running_te==NULL)
40554057
continue;
4056-
running_te=pstate->parallelSlot[i].te;
4057-
40584058
if (has_lock_conflicts(te,running_te)||
40594059
has_lock_conflicts(running_te,te))
40604060
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp