Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita89c46f

Browse files
committed
Allow parallel copy/link in pg_upgrade
This patch implements parallel copying/linking of files by tablespaceusing the --jobs option in pg_upgrade.
1 parentc00dc33 commita89c46f

File tree

8 files changed

+256
-73
lines changed

8 files changed

+256
-73
lines changed

‎contrib/pg_upgrade/check.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ create_script_for_old_cluster_deletion(char **deletion_script_file_name)
606606
fprintf(script,RMDIR_CMD" %s\n",fix_path_separator(old_cluster.pgdata));
607607

608608
/* delete old cluster's alternate tablespaces */
609-
for (tblnum=0;tblnum<os_info.num_tablespaces;tblnum++)
609+
for (tblnum=0;tblnum<os_info.num_old_tablespaces;tblnum++)
610610
{
611611
/*
612612
* Do the old cluster's per-database directories share a directory
@@ -621,14 +621,14 @@ create_script_for_old_cluster_deletion(char **deletion_script_file_name)
621621
/* remove PG_VERSION? */
622622
if (GET_MAJOR_VERSION(old_cluster.major_version) <=804)
623623
fprintf(script,RM_CMD" %s%s%cPG_VERSION\n",
624-
fix_path_separator(os_info.tablespaces[tblnum]),
624+
fix_path_separator(os_info.old_tablespaces[tblnum]),
625625
fix_path_separator(old_cluster.tablespace_suffix),
626626
PATH_SEPARATOR);
627627

628628
for (dbnum=0;dbnum<old_cluster.dbarr.ndbs;dbnum++)
629629
{
630630
fprintf(script,RMDIR_CMD" %s%s%c%d\n",
631-
fix_path_separator(os_info.tablespaces[tblnum]),
631+
fix_path_separator(os_info.old_tablespaces[tblnum]),
632632
fix_path_separator(old_cluster.tablespace_suffix),
633633
PATH_SEPARATOR,old_cluster.dbarr.dbs[dbnum].db_oid);
634634
}
@@ -640,7 +640,7 @@ create_script_for_old_cluster_deletion(char **deletion_script_file_name)
640640
* or a version-specific subdirectory.
641641
*/
642642
fprintf(script,RMDIR_CMD" %s%s\n",
643-
fix_path_separator(os_info.tablespaces[tblnum]),
643+
fix_path_separator(os_info.old_tablespaces[tblnum]),
644644
fix_path_separator(old_cluster.tablespace_suffix));
645645
}
646646

‎contrib/pg_upgrade/info.c

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,20 +106,25 @@ create_rel_filename_map(const char *old_data, const char *new_data,
106106
* relation belongs to the default tablespace, hence relfiles should
107107
* exist in the data directories.
108108
*/
109-
snprintf(map->old_dir,sizeof(map->old_dir),"%s/base/%u",old_data,
110-
old_db->db_oid);
111-
snprintf(map->new_dir,sizeof(map->new_dir),"%s/base/%u",new_data,
112-
new_db->db_oid);
109+
strlcpy(map->old_tablespace,old_data,sizeof(map->old_tablespace));
110+
strlcpy(map->new_tablespace,new_data,sizeof(map->new_tablespace));
111+
strlcpy(map->old_tablespace_suffix,"/base",sizeof(map->old_tablespace_suffix));
112+
strlcpy(map->new_tablespace_suffix,"/base",sizeof(map->new_tablespace_suffix));
113113
}
114114
else
115115
{
116116
/* relation belongs to a tablespace, so use the tablespace location */
117-
snprintf(map->old_dir,sizeof(map->old_dir),"%s%s/%u",old_rel->tablespace,
118-
old_cluster.tablespace_suffix,old_db->db_oid);
119-
snprintf(map->new_dir,sizeof(map->new_dir),"%s%s/%u",new_rel->tablespace,
120-
new_cluster.tablespace_suffix,new_db->db_oid);
117+
strlcpy(map->old_tablespace,old_rel->tablespace,sizeof(map->old_tablespace));
118+
strlcpy(map->new_tablespace,new_rel->tablespace,sizeof(map->new_tablespace));
119+
strlcpy(map->old_tablespace_suffix,old_cluster.tablespace_suffix,
120+
sizeof(map->old_tablespace_suffix));
121+
strlcpy(map->new_tablespace_suffix,new_cluster.tablespace_suffix,
122+
sizeof(map->new_tablespace_suffix));
121123
}
122124

125+
map->old_db_oid=old_db->db_oid;
126+
map->new_db_oid=new_db->db_oid;
127+
123128
/*
124129
* old_relfilenode might differ from pg_class.oid (and hence
125130
* new_relfilenode) because of CLUSTER, REINDEX, or VACUUM FULL.

‎contrib/pg_upgrade/parallel.c

Lines changed: 134 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,24 @@ typedef struct {
3434
charlog_file[MAXPGPATH];
3535
charopt_log_file[MAXPGPATH];
3636
charcmd[MAX_STRING];
37-
}thread_arg;
37+
}exec_thread_arg;
3838

39-
thread_arg**thread_args;
39+
typedefstruct {
40+
DbInfoArr*old_db_arr;
41+
DbInfoArr*new_db_arr;
42+
charold_pgdata[MAXPGPATH];
43+
charnew_pgdata[MAXPGPATH];
44+
charold_tablespace[MAXPGPATH];
45+
}transfer_thread_arg;
46+
47+
exec_thread_arg**exec_thread_args;
48+
transfer_thread_arg**transfer_thread_args;
49+
50+
/* track current thread_args struct so reap_child() can be used for all cases */
51+
void**cur_thread_args;
4052

41-
DWORDwin32_exec_prog(thread_arg*args);
53+
DWORDwin32_exec_prog(exec_thread_arg*args);
54+
DWORDwin32_transfer_all_new_dbs(transfer_thread_arg*args);
4255

4356
#endif
4457

@@ -58,7 +71,7 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file,
5871
pid_tchild;
5972
#else
6073
HANDLEchild;
61-
thread_arg*new_arg;
74+
exec_thread_arg*new_arg;
6275
#endif
6376

6477
va_start(args,fmt);
@@ -71,7 +84,9 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file,
7184
else
7285
{
7386
/* parallel */
74-
87+
#ifdefWIN32
88+
cur_thread_args= (void**)exec_thread_args;
89+
#endif
7590
/* harvest any dead children */
7691
while (reap_child(false)== true)
7792
;
@@ -100,19 +115,19 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file,
100115
inti;
101116

102117
thread_handles=pg_malloc(user_opts.jobs*sizeof(HANDLE));
103-
thread_args=pg_malloc(user_opts.jobs*sizeof(thread_arg*));
118+
exec_thread_args=pg_malloc(user_opts.jobs*sizeof(exec_thread_arg*));
104119

105120
/*
106121
*For safety and performance, we keep the args allocated during
107122
*the entire life of the process, and we don't free the args
108123
*in a thread different from the one that allocated it.
109124
*/
110125
for (i=0;i<user_opts.jobs;i++)
111-
thread_args[i]=pg_malloc(sizeof(thread_arg));
126+
exec_thread_args[i]=pg_malloc(sizeof(exec_thread_arg));
112127
}
113128

114129
/* use first empty array element */
115-
new_arg=thread_args[parallel_jobs-1];
130+
new_arg=exec_thread_args[parallel_jobs-1];
116131

117132
/* Can only pass one pointer into the function, so use a struct */
118133
strcpy(new_arg->log_file,log_file);
@@ -134,7 +149,7 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file,
134149

135150
#ifdefWIN32
136151
DWORD
137-
win32_exec_prog(thread_arg*args)
152+
win32_exec_prog(exec_thread_arg*args)
138153
{
139154
intret;
140155

@@ -146,6 +161,112 @@ win32_exec_prog(thread_arg *args)
146161
#endif
147162

148163

164+
/*
165+
*parallel_transfer_all_new_dbs
166+
*
167+
*This has the same API as transfer_all_new_dbs, except it does parallel execution
168+
*by transfering multiple tablespaces in parallel
169+
*/
170+
voidparallel_transfer_all_new_dbs(DbInfoArr*old_db_arr,DbInfoArr*new_db_arr,
171+
char*old_pgdata,char*new_pgdata,
172+
char*old_tablespace)
173+
{
174+
#ifndefWIN32
175+
pid_tchild;
176+
#else
177+
HANDLEchild;
178+
transfer_thread_arg*new_arg;
179+
#endif
180+
181+
if (user_opts.jobs <=1)
182+
/* throw_error must be true to allow jobs */
183+
transfer_all_new_dbs(old_db_arr,new_db_arr,old_pgdata,new_pgdata,NULL);
184+
else
185+
{
186+
/* parallel */
187+
#ifdefWIN32
188+
cur_thread_args= (void**)transfer_thread_args;
189+
#endif
190+
/* harvest any dead children */
191+
while (reap_child(false)== true)
192+
;
193+
194+
/* must we wait for a dead child? */
195+
if (parallel_jobs >=user_opts.jobs)
196+
reap_child(true);
197+
198+
/* set this before we start the job */
199+
parallel_jobs++;
200+
201+
/* Ensure stdio state is quiesced before forking */
202+
fflush(NULL);
203+
204+
#ifndefWIN32
205+
child=fork();
206+
if (child==0)
207+
{
208+
transfer_all_new_dbs(old_db_arr,new_db_arr,old_pgdata,new_pgdata,
209+
old_tablespace);
210+
/* if we take another exit path, it will be non-zero */
211+
/* use _exit to skip atexit() functions */
212+
_exit(0);
213+
}
214+
elseif (child<0)
215+
/* fork failed */
216+
pg_log(PG_FATAL,"could not create worker process: %s\n",strerror(errno));
217+
#else
218+
if (thread_handles==NULL)
219+
{
220+
inti;
221+
222+
thread_handles=pg_malloc(user_opts.jobs*sizeof(HANDLE));
223+
transfer_thread_args=pg_malloc(user_opts.jobs*sizeof(transfer_thread_arg*));
224+
225+
/*
226+
*For safety and performance, we keep the args allocated during
227+
*the entire life of the process, and we don't free the args
228+
*in a thread different from the one that allocated it.
229+
*/
230+
for (i=0;i<user_opts.jobs;i++)
231+
transfer_thread_args[i]=pg_malloc(sizeof(transfer_thread_arg));
232+
}
233+
234+
/* use first empty array element */
235+
new_arg=transfer_thread_args[parallel_jobs-1];
236+
237+
/* Can only pass one pointer into the function, so use a struct */
238+
new_arg->old_db_arr=old_db_arr;
239+
new_arg->new_db_arr=new_db_arr;
240+
strcpy(new_arg->old_pgdata,old_pgdata);
241+
strcpy(new_arg->new_pgdata,new_pgdata);
242+
strcpy(new_arg->old_tablespace,old_tablespace);
243+
244+
child= (HANDLE)_beginthreadex(NULL,0, (void*)win32_exec_prog,
245+
new_arg,0,NULL);
246+
if (child==0)
247+
pg_log(PG_FATAL,"could not create worker thread: %s\n",strerror(errno));
248+
249+
thread_handles[parallel_jobs-1]=child;
250+
#endif
251+
}
252+
253+
return;
254+
}
255+
256+
257+
#ifdefWIN32
258+
DWORD
259+
win32_transfer_all_new_dbs(transfer_thread_arg*args)
260+
{
261+
transfer_all_new_dbs(args->old_db_arr,args->new_db_arr,args->old_pgdata,
262+
args->new_pgdata,args->old_tablespace);
263+
264+
/* terminates thread */
265+
return0;
266+
}
267+
#endif
268+
269+
149270
/*
150271
*collect status from a completed worker child
151272
*/
@@ -195,7 +316,7 @@ reap_child(bool wait_for_child)
195316
/*Move last slot into dead child's position */
196317
if (thread_num!=parallel_jobs-1)
197318
{
198-
thread_arg*tmp_args;
319+
void*tmp_args;
199320

200321
thread_handles[thread_num]=thread_handles[parallel_jobs-1];
201322

@@ -205,9 +326,9 @@ reap_child(bool wait_for_child)
205326
*reused by the next created thread. Instead, the new thread
206327
*will use the arg struct of the thread that just died.
207328
*/
208-
tmp_args=thread_args[thread_num];
209-
thread_args[thread_num]=thread_args[parallel_jobs-1];
210-
thread_args[parallel_jobs-1]=tmp_args;
329+
tmp_args=cur_thread_args[thread_num];
330+
cur_thread_args[thread_num]=cur_thread_args[parallel_jobs-1];
331+
cur_thread_args[parallel_jobs-1]=tmp_args;
211332
}
212333
#endif
213334

‎contrib/pg_upgrade/pg_upgrade.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ main(int argc, char **argv)
133133
if (user_opts.transfer_mode==TRANSFER_MODE_LINK)
134134
disable_old_cluster();
135135

136-
transfer_all_new_dbs(&old_cluster.dbarr,&new_cluster.dbarr,
136+
transfer_all_new_tablespaces(&old_cluster.dbarr,&new_cluster.dbarr,
137137
old_cluster.pgdata,new_cluster.pgdata);
138138

139139
/*

‎contrib/pg_upgrade/pg_upgrade.h

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,12 @@ typedef struct
134134
*/
135135
typedefstruct
136136
{
137-
charold_dir[MAXPGPATH];
138-
charnew_dir[MAXPGPATH];
137+
charold_tablespace[MAXPGPATH];
138+
charnew_tablespace[MAXPGPATH];
139+
charold_tablespace_suffix[MAXPGPATH];
140+
charnew_tablespace_suffix[MAXPGPATH];
141+
Oidold_db_oid;
142+
Oidnew_db_oid;
139143

140144
/*
141145
* old/new relfilenodes might differ for pg_largeobject(_metadata) indexes
@@ -276,8 +280,8 @@ typedef struct
276280
constchar*progname;/* complete pathname for this program */
277281
char*exec_path;/* full path to my executable */
278282
char*user;/* username for clusters */
279-
char**tablespaces;/* tablespaces */
280-
intnum_tablespaces;
283+
char**old_tablespaces;/* tablespaces */
284+
intnum_old_tablespaces;
281285
char**libraries;/* loadable libraries */
282286
intnum_libraries;
283287
ClusterInfo*running_cluster;
@@ -398,9 +402,11 @@ voidget_sock_dir(ClusterInfo *cluster, bool live_check);
398402
/* relfilenode.c */
399403

400404
voidget_pg_database_relfilenode(ClusterInfo*cluster);
401-
voidtransfer_all_new_dbs(DbInfoArr*olddb_arr,
402-
DbInfoArr*newdb_arr,char*old_pgdata,char*new_pgdata);
403-
405+
voidtransfer_all_new_tablespaces(DbInfoArr*old_db_arr,
406+
DbInfoArr*new_db_arr,char*old_pgdata,char*new_pgdata);
407+
voidtransfer_all_new_dbs(DbInfoArr*old_db_arr,
408+
DbInfoArr*new_db_arr,char*old_pgdata,char*new_pgdata,
409+
char*old_tablespace);
404410

405411
/* tablespace.c */
406412

@@ -464,9 +470,11 @@ void old_8_3_invalidate_bpchar_pattern_ops_indexes(ClusterInfo *cluster,
464470
char*old_8_3_create_sequence_script(ClusterInfo*cluster);
465471

466472
/* parallel.c */
467-
voidparallel_exec_prog(constchar*log_file,constchar*opt_log_file,
473+
voidparallel_exec_prog(constchar*log_file,constchar*opt_log_file,
468474
constchar*fmt,...)
469475
__attribute__((format(PG_PRINTF_ATTRIBUTE,3,4)));
470-
471-
boolreap_child(boolwait_for_child);
476+
voidparallel_transfer_all_new_dbs(DbInfoArr*old_db_arr,DbInfoArr*new_db_arr,
477+
char*old_pgdata,char*new_pgdata,
478+
char*old_tablespace);
479+
boolreap_child(boolwait_for_child);
472480

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp