Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6f1b9e4

Browse files
committed
Add pg_upgrade --jobs parameter
Add pg_upgrade --jobs, which allows parallel dump/restore of databases,which improves performance.
1 parent3f88b08 commit6f1b9e4

File tree

7 files changed

+277
-19
lines changed

7 files changed

+277
-19
lines changed

‎contrib/pg_upgrade/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ PGAPPICON = win32
55

66
PROGRAM = pg_upgrade
77
OBJS = check.o controldata.o dump.o exec.o file.o function.o info.o\
8-
option.o page.o pg_upgrade.o relfilenode.o server.o\
8+
option.o page.oparallel.opg_upgrade.o relfilenode.o server.o\
99
tablespace.o util.o version.o version_old_8_3.o$(WIN32RES)
1010

1111
PG_CPPFLAGS = -DFRONTEND -DDLSUFFIX=\"$(DLSUFFIX)\" -I$(srcdir) -I$(libpq_srcdir)

‎contrib/pg_upgrade/dump.c

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,23 @@ generate_old_dump(void)
3333
/* create per-db dump files */
3434
for (dbnum=0;dbnum<old_cluster.dbarr.ndbs;dbnum++)
3535
{
36-
charfile_name[MAXPGPATH];
36+
charsql_file_name[MAXPGPATH],log_file_name[MAXPGPATH];
3737
DbInfo*old_db=&old_cluster.dbarr.dbs[dbnum];
3838

3939
pg_log(PG_STATUS,"%s",old_db->db_name);
40-
snprintf(file_name,sizeof(file_name),DB_DUMP_FILE_MASK,old_db->db_oid);
40+
snprintf(sql_file_name,sizeof(sql_file_name),DB_DUMP_FILE_MASK,old_db->db_oid);
41+
snprintf(log_file_name,sizeof(log_file_name),DB_DUMP_LOG_FILE_MASK,old_db->db_oid);
4142

42-
exec_prog(RESTORE_LOG_FILE,NULL, true,
43+
parallel_exec_prog(log_file_name,NULL,
4344
"\"%s/pg_dump\" %s --schema-only --binary-upgrade --format=custom %s --file=\"%s\" \"%s\"",
4445
new_cluster.bindir,cluster_conn_opts(&old_cluster),
45-
log_opts.verbose ?"--verbose" :"",file_name,old_db->db_name);
46+
log_opts.verbose ?"--verbose" :"",sql_file_name,old_db->db_name);
4647
}
4748

49+
/* reap all children */
50+
while (reap_child(true)== true)
51+
;
52+
4853
end_progress_output();
4954
check_ok();
5055
}

‎contrib/pg_upgrade/option.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ parseCommandLine(int argc, char *argv[])
5252
{"check",no_argument,NULL,'c'},
5353
{"link",no_argument,NULL,'k'},
5454
{"retain",no_argument,NULL,'r'},
55+
{"jobs",required_argument,NULL,'j'},
5556
{"verbose",no_argument,NULL,'v'},
5657
{NULL,0,NULL,0}
5758
};
@@ -101,7 +102,7 @@ parseCommandLine(int argc, char *argv[])
101102
if ((log_opts.internal=fopen_priv(INTERNAL_LOG_FILE,"a"))==NULL)
102103
pg_log(PG_FATAL,"cannot write to log file %s\n",INTERNAL_LOG_FILE);
103104

104-
while ((option=getopt_long(argc,argv,"d:D:b:B:cko:O:p:P:ru:v",
105+
while ((option=getopt_long(argc,argv,"d:D:b:B:cj:ko:O:p:P:ru:v",
105106
long_options,&optindex))!=-1)
106107
{
107108
switch (option)
@@ -128,6 +129,10 @@ parseCommandLine(int argc, char *argv[])
128129
new_cluster.pgconfig=pg_strdup(optarg);
129130
break;
130131

132+
case'j':
133+
user_opts.jobs=atoi(optarg);
134+
break;
135+
131136
case'k':
132137
user_opts.transfer_mode=TRANSFER_MODE_LINK;
133138
break;
@@ -229,6 +234,7 @@ Options:\n\
229234
-c, --check check clusters only, don't change any data\n\
230235
-d, --old-datadir=OLDDATADIR old cluster data directory\n\
231236
-D, --new-datadir=NEWDATADIR new cluster data directory\n\
237+
-j, --jobs number of simultaneous processes or threads to use\n\
232238
-k, --link link instead of copying files to new cluster\n\
233239
-o, --old-options=OPTIONS old cluster options to pass to the server\n\
234240
-O, --new-options=OPTIONS new cluster options to pass to the server\n\

‎contrib/pg_upgrade/parallel.c

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
*parallel.c
3+
*
4+
*multi-process support
5+
*
6+
*Copyright (c) 2010-2012, PostgreSQL Global Development Group
7+
*contrib/pg_upgrade/parallel.c
8+
*/
9+
10+
#include"postgres.h"
11+
12+
#include"pg_upgrade.h"
13+
14+
#include<stdlib.h>
15+
#include<string.h>
16+
#include<sys/types.h>
17+
#include<sys/wait.h>
18+
19+
#ifdefWIN32
20+
#include<io.h>
21+
#endif
22+
23+
staticintparallel_jobs;
24+
25+
#ifdefWIN32
26+
/*
27+
*Array holding all active threads. There can't be any gaps/zeros so
28+
*it can be passed to WaitForMultipleObjects(). We use two arrays
29+
*so the thread_handles array can be passed to WaitForMultipleObjects().
30+
*/
31+
HANDLE*thread_handles;
32+
33+
typedefstruct {
34+
charlog_file[MAXPGPATH];
35+
charopt_log_file[MAXPGPATH];
36+
charcmd[MAX_STRING];
37+
}thread_arg;
38+
39+
thread_arg**thread_args;
40+
41+
DWORDwin32_exec_prog(thread_arg*args);
42+
43+
#endif
44+
45+
/*
46+
*parallel_exec_prog
47+
*
48+
*This has the same API as exec_prog, except it does parallel execution,
49+
*and therefore must throw errors and doesn't return an error status.
50+
*/
51+
void
52+
parallel_exec_prog(constchar*log_file,constchar*opt_log_file,
53+
constchar*fmt,...)
54+
{
55+
va_listargs;
56+
charcmd[MAX_STRING];
57+
#ifndefWIN32
58+
pid_tchild;
59+
#else
60+
HANDLEchild;
61+
thread_arg*new_arg;
62+
#endif
63+
64+
va_start(args,fmt);
65+
vsnprintf(cmd,sizeof(cmd),fmt,args);
66+
va_end(args);
67+
68+
if (user_opts.jobs <=1)
69+
/* throw_error must be true to allow jobs */
70+
exec_prog(log_file,opt_log_file, true,"%s",cmd);
71+
else
72+
{
73+
/* parallel */
74+
75+
/* harvest any dead children */
76+
while (reap_child(false)== true)
77+
;
78+
79+
/* must we wait for a dead child? */
80+
if (parallel_jobs >=user_opts.jobs)
81+
reap_child(true);
82+
83+
/* set this before we start the job */
84+
parallel_jobs++;
85+
86+
/* Ensure stdio state is quiesced before forking */
87+
fflush(NULL);
88+
89+
#ifndefWIN32
90+
child=fork();
91+
if (child==0)
92+
/* use _exit to skip atexit() functions */
93+
_exit(!exec_prog(log_file,opt_log_file, true,"%s",cmd));
94+
elseif (child<0)
95+
/* fork failed */
96+
pg_log(PG_FATAL,"could not create worker process: %s\n",strerror(errno));
97+
#else
98+
if (thread_handles==NULL)
99+
{
100+
inti;
101+
102+
thread_handles=pg_malloc(user_opts.jobs*sizeof(HANDLE));
103+
thread_args=pg_malloc(user_opts.jobs*sizeof(thread_arg*));
104+
105+
/*
106+
*For safety and performance, we keep the args allocated during
107+
*the entire life of the process, and we don't free the args
108+
*in a thread different from the one that allocated it.
109+
*/
110+
for (i=0;i<user_opts.jobs;i++)
111+
thread_args[i]=pg_malloc(sizeof(thread_arg));
112+
}
113+
114+
/* use first empty array element */
115+
new_arg=thread_args[parallel_jobs-1];
116+
117+
/* Can only pass one pointer into the function, so use a struct */
118+
strcpy(new_arg->log_file,log_file);
119+
strcpy(new_arg->opt_log_file,opt_log_file);
120+
strcpy(new_arg->cmd,cmd);
121+
122+
child= (HANDLE)_beginthreadex(NULL,0, (void*)win32_exec_prog,
123+
new_arg,0,NULL);
124+
if (child==0)
125+
pg_log(PG_FATAL,"could not create worker thread: %s\n",strerror(errno));
126+
127+
thread_handles[parallel_jobs-1]=child;
128+
#endif
129+
}
130+
131+
return;
132+
}
133+
134+
135+
#ifdefWIN32
136+
DWORD
137+
win32_exec_prog(thread_arg*args)
138+
{
139+
intret;
140+
141+
ret= !exec_prog(args->log_file,args->opt_log_file, true,"%s",args->cmd);
142+
143+
/* terminates thread */
144+
returnret;
145+
}
146+
#endif
147+
148+
149+
/*
150+
*collect status from a completed worker child
151+
*/
152+
bool
153+
reap_child(boolwait_for_child)
154+
{
155+
#ifndefWIN32
156+
intwork_status;
157+
intret;
158+
#else
159+
intthread_num;
160+
DWORDres;
161+
#endif
162+
163+
if (user_opts.jobs <=1||parallel_jobs==0)
164+
return false;
165+
166+
#ifndefWIN32
167+
ret=waitpid(-1,&work_status,wait_for_child ?0 :WNOHANG);
168+
169+
/* no children or, for WNOHANG, no dead children */
170+
if (ret <=0|| !WIFEXITED(work_status))
171+
return false;
172+
173+
if (WEXITSTATUS(work_status)!=0)
174+
pg_log(PG_FATAL,"child worker exited abnormally: %s\n",strerror(errno));
175+
176+
#else
177+
/* wait for one to finish */
178+
thread_num=WaitForMultipleObjects(parallel_jobs,thread_handles,
179+
false,wait_for_child ?INFINITE :0);
180+
181+
if (thread_num==WAIT_TIMEOUT||thread_num==WAIT_FAILED)
182+
return false;
183+
184+
/* compute thread index in active_threads */
185+
thread_num-=WAIT_OBJECT_0;
186+
187+
/* get the result */
188+
GetExitCodeThread(thread_handles[thread_num],&res);
189+
if (res!=0)
190+
pg_log(PG_FATAL,"child worker exited abnormally: %s\n",strerror(errno));
191+
192+
/* dispose of handle to stop leaks */
193+
CloseHandle(thread_handles[thread_num]);
194+
195+
/*Move last slot into dead child's position */
196+
if (thread_num!=parallel_jobs-1)
197+
{
198+
thread_arg*tmp_args;
199+
200+
thread_handles[thread_num]=thread_handles[parallel_jobs-1];
201+
202+
/*
203+
*We must swap the arg struct pointers because the thread we
204+
*just moved is active, and we must make sure it is not
205+
*reused by the next created thread. Instead, the new thread
206+
*will use the arg struct of the thread that just died.
207+
*/
208+
tmp_args=thread_args[thread_num];
209+
thread_args[thread_num]=thread_args[parallel_jobs-1];
210+
thread_args[parallel_jobs-1]=tmp_args;
211+
}
212+
#endif
213+
214+
/* do this after job has been removed */
215+
parallel_jobs--;
216+
217+
return true;
218+
}

‎contrib/pg_upgrade/pg_upgrade.c

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ char *output_files[] = {
6161
/* unique file for pg_ctl start */
6262
SERVER_START_LOG_FILE,
6363
#endif
64-
RESTORE_LOG_FILE,
6564
UTILITY_LOG_FILE,
6665
INTERNAL_LOG_FILE,
6766
NULL
@@ -270,7 +269,7 @@ prepare_new_databases(void)
270269
* support functions in template1 but pg_dumpall creates database using
271270
* the template0 template.
272271
*/
273-
exec_prog(RESTORE_LOG_FILE,NULL, true,
272+
exec_prog(UTILITY_LOG_FILE,NULL, true,
274273
"\"%s/psql\" "EXEC_PSQL_ARGS" %s -f \"%s\"",
275274
new_cluster.bindir,cluster_conn_opts(&new_cluster),
276275
GLOBALS_DUMP_FILE);
@@ -307,22 +306,28 @@ create_new_objects(void)
307306

308307
for (dbnum=0;dbnum<old_cluster.dbarr.ndbs;dbnum++)
309308
{
310-
charfile_name[MAXPGPATH];
309+
charsql_file_name[MAXPGPATH],log_file_name[MAXPGPATH];
311310
DbInfo*old_db=&old_cluster.dbarr.dbs[dbnum];
312311

313312
pg_log(PG_STATUS,"%s",old_db->db_name);
314-
snprintf(file_name,sizeof(file_name),DB_DUMP_FILE_MASK,old_db->db_oid);
313+
snprintf(sql_file_name,sizeof(sql_file_name),DB_DUMP_FILE_MASK,old_db->db_oid);
314+
snprintf(log_file_name,sizeof(log_file_name),DB_DUMP_LOG_FILE_MASK,old_db->db_oid);
315315

316316
/*
317317
*Using pg_restore --single-transaction is faster than other
318318
*methods, like --jobs. pg_dump only produces its output at the
319319
*end, so there is little parallelism using the pipe.
320320
*/
321-
exec_prog(RESTORE_LOG_FILE,NULL, true,
321+
parallel_exec_prog(log_file_name,NULL,
322322
"\"%s/pg_restore\" %s --exit-on-error --single-transaction --verbose --dbname \"%s\" \"%s\"",
323323
new_cluster.bindir,cluster_conn_opts(&new_cluster),
324-
old_db->db_name,file_name);
324+
old_db->db_name,sql_file_name);
325325
}
326+
327+
/* reap all children */
328+
while (reap_child(true)== true)
329+
;
330+
326331
end_progress_output();
327332
check_ok();
328333

@@ -494,11 +499,14 @@ cleanup(void)
494499
if (old_cluster.dbarr.dbs)
495500
for (dbnum=0;dbnum<old_cluster.dbarr.ndbs;dbnum++)
496501
{
497-
charfile_name[MAXPGPATH];
502+
charsql_file_name[MAXPGPATH],log_file_name[MAXPGPATH];
498503
DbInfo*old_db=&old_cluster.dbarr.dbs[dbnum];
499504

500-
snprintf(file_name,sizeof(file_name),DB_DUMP_FILE_MASK,old_db->db_oid);
501-
unlink(file_name);
505+
snprintf(sql_file_name,sizeof(sql_file_name),DB_DUMP_FILE_MASK,old_db->db_oid);
506+
unlink(sql_file_name);
507+
508+
snprintf(log_file_name,sizeof(log_file_name),DB_DUMP_FILE_MASK,old_db->db_oid);
509+
unlink(log_file_name);
502510
}
503511
}
504512
}

‎contrib/pg_upgrade/pg_upgrade.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
#defineGLOBALS_DUMP_FILE"pg_upgrade_dump_globals.sql"
3333
#defineDB_DUMP_FILE_MASK"pg_upgrade_dump_%u.custom"
3434

35+
#defineDB_DUMP_LOG_FILE_MASK"pg_upgrade_dump_%u.log"
3536
#defineSERVER_LOG_FILE"pg_upgrade_server.log"
36-
#defineRESTORE_LOG_FILE"pg_upgrade_restore.log"
3737
#defineUTILITY_LOG_FILE"pg_upgrade_utility.log"
3838
#defineINTERNAL_LOG_FILE"pg_upgrade_internal.log"
3939

@@ -264,6 +264,7 @@ typedef struct
264264
boolcheck;/* TRUE -> ask user for permission to make
265265
* changes */
266266
transferModetransfer_mode;/* copy files or link them? */
267+
intjobs;
267268
}UserOpts;
268269

269270

@@ -461,3 +462,11 @@ voidold_8_3_invalidate_hash_gin_indexes(ClusterInfo *cluster, bool check_mode)
461462
voidold_8_3_invalidate_bpchar_pattern_ops_indexes(ClusterInfo*cluster,
462463
boolcheck_mode);
463464
char*old_8_3_create_sequence_script(ClusterInfo*cluster);
465+
466+
/* parallel.c */
467+
voidparallel_exec_prog(constchar*log_file,constchar*opt_log_file,
468+
constchar*fmt,...)
469+
__attribute__((format(PG_PRINTF_ATTRIBUTE,3,4)));
470+
471+
boolreap_child(boolwait_for_child);
472+

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp