@@ -34,11 +34,24 @@ typedef struct {
3434char log_file [MAXPGPATH ];
3535char opt_log_file [MAXPGPATH ];
3636char cmd [MAX_STRING ];
37- }thread_arg ;
37+ }exec_thread_arg ;
3838
39- thread_arg * * thread_args ;
39+ typedef struct {
40+ DbInfoArr * old_db_arr ;
41+ DbInfoArr * new_db_arr ;
42+ char old_pgdata [MAXPGPATH ];
43+ char new_pgdata [MAXPGPATH ];
44+ char old_tablespace [MAXPGPATH ];
45+ }transfer_thread_arg ;
46+
47+ exec_thread_arg * * exec_thread_args ;
48+ transfer_thread_arg * * transfer_thread_args ;
49+
50+ /* track current thread_args struct so reap_child() can be used for all cases */
51+ void * * cur_thread_args ;
4052
41- DWORD win32_exec_prog (thread_arg * args );
53+ DWORD win32_exec_prog (exec_thread_arg * args );
54+ DWORD win32_transfer_all_new_dbs (transfer_thread_arg * args );
4255
4356#endif
4457
@@ -58,7 +71,7 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file,
5871pid_t child ;
5972#else
6073HANDLE child ;
61- thread_arg * new_arg ;
74+ exec_thread_arg * new_arg ;
6275#endif
6376
6477va_start (args ,fmt );
@@ -71,7 +84,9 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file,
7184else
7285{
7386/* parallel */
74-
87+ #ifdef WIN32
88+ cur_thread_args = (void * * )exec_thread_args ;
89+ #endif
7590/* harvest any dead children */
7691while (reap_child (false)== true)
7792;
@@ -100,19 +115,19 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file,
100115int i ;
101116
102117thread_handles = pg_malloc (user_opts .jobs * sizeof (HANDLE ));
103- thread_args = pg_malloc (user_opts .jobs * sizeof (thread_arg * ));
118+ exec_thread_args = pg_malloc (user_opts .jobs * sizeof (exec_thread_arg * ));
104119
105120/*
106121 *For safety and performance, we keep the args allocated during
107122 *the entire life of the process, and we don't free the args
108123 *in a thread different from the one that allocated it.
109124 */
110125for (i = 0 ;i < user_opts .jobs ;i ++ )
111- thread_args [i ]= pg_malloc (sizeof (thread_arg ));
126+ exec_thread_args [i ]= pg_malloc (sizeof (exec_thread_arg ));
112127}
113128
114129/* use first empty array element */
115- new_arg = thread_args [parallel_jobs - 1 ];
130+ new_arg = exec_thread_args [parallel_jobs - 1 ];
116131
117132/* Can only pass one pointer into the function, so use a struct */
118133strcpy (new_arg -> log_file ,log_file );
@@ -134,7 +149,7 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file,
134149
135150#ifdef WIN32
136151DWORD
137- win32_exec_prog (thread_arg * args )
152+ win32_exec_prog (exec_thread_arg * args )
138153{
139154int ret ;
140155
@@ -146,6 +161,112 @@ win32_exec_prog(thread_arg *args)
146161#endif
147162
148163
164+ /*
165+ *parallel_transfer_all_new_dbs
166+ *
167+ *This has the same API as transfer_all_new_dbs, except it does parallel execution
168+ *by transfering multiple tablespaces in parallel
169+ */
170+ void parallel_transfer_all_new_dbs (DbInfoArr * old_db_arr ,DbInfoArr * new_db_arr ,
171+ char * old_pgdata ,char * new_pgdata ,
172+ char * old_tablespace )
173+ {
174+ #ifndef WIN32
175+ pid_t child ;
176+ #else
177+ HANDLE child ;
178+ transfer_thread_arg * new_arg ;
179+ #endif
180+
181+ if (user_opts .jobs <=1 )
182+ /* throw_error must be true to allow jobs */
183+ transfer_all_new_dbs (old_db_arr ,new_db_arr ,old_pgdata ,new_pgdata ,NULL );
184+ else
185+ {
186+ /* parallel */
187+ #ifdef WIN32
188+ cur_thread_args = (void * * )transfer_thread_args ;
189+ #endif
190+ /* harvest any dead children */
191+ while (reap_child (false)== true)
192+ ;
193+
194+ /* must we wait for a dead child? */
195+ if (parallel_jobs >=user_opts .jobs )
196+ reap_child (true);
197+
198+ /* set this before we start the job */
199+ parallel_jobs ++ ;
200+
201+ /* Ensure stdio state is quiesced before forking */
202+ fflush (NULL );
203+
204+ #ifndef WIN32
205+ child = fork ();
206+ if (child == 0 )
207+ {
208+ transfer_all_new_dbs (old_db_arr ,new_db_arr ,old_pgdata ,new_pgdata ,
209+ old_tablespace );
210+ /* if we take another exit path, it will be non-zero */
211+ /* use _exit to skip atexit() functions */
212+ _exit (0 );
213+ }
214+ else if (child < 0 )
215+ /* fork failed */
216+ pg_log (PG_FATAL ,"could not create worker process: %s\n" ,strerror (errno ));
217+ #else
218+ if (thread_handles == NULL )
219+ {
220+ int i ;
221+
222+ thread_handles = pg_malloc (user_opts .jobs * sizeof (HANDLE ));
223+ transfer_thread_args = pg_malloc (user_opts .jobs * sizeof (transfer_thread_arg * ));
224+
225+ /*
226+ *For safety and performance, we keep the args allocated during
227+ *the entire life of the process, and we don't free the args
228+ *in a thread different from the one that allocated it.
229+ */
230+ for (i = 0 ;i < user_opts .jobs ;i ++ )
231+ transfer_thread_args [i ]= pg_malloc (sizeof (transfer_thread_arg ));
232+ }
233+
234+ /* use first empty array element */
235+ new_arg = transfer_thread_args [parallel_jobs - 1 ];
236+
237+ /* Can only pass one pointer into the function, so use a struct */
238+ new_arg -> old_db_arr = old_db_arr ;
239+ new_arg -> new_db_arr = new_db_arr ;
240+ strcpy (new_arg -> old_pgdata ,old_pgdata );
241+ strcpy (new_arg -> new_pgdata ,new_pgdata );
242+ strcpy (new_arg -> old_tablespace ,old_tablespace );
243+
244+ child = (HANDLE )_beginthreadex (NULL ,0 , (void * )win32_exec_prog ,
245+ new_arg ,0 ,NULL );
246+ if (child == 0 )
247+ pg_log (PG_FATAL ,"could not create worker thread: %s\n" ,strerror (errno ));
248+
249+ thread_handles [parallel_jobs - 1 ]= child ;
250+ #endif
251+ }
252+
253+ return ;
254+ }
255+
256+
257+ #ifdef WIN32
258+ DWORD
259+ win32_transfer_all_new_dbs (transfer_thread_arg * args )
260+ {
261+ transfer_all_new_dbs (args -> old_db_arr ,args -> new_db_arr ,args -> old_pgdata ,
262+ args -> new_pgdata ,args -> old_tablespace );
263+
264+ /* terminates thread */
265+ return 0 ;
266+ }
267+ #endif
268+
269+
149270/*
150271 *collect status from a completed worker child
151272 */
@@ -195,7 +316,7 @@ reap_child(bool wait_for_child)
195316/*Move last slot into dead child's position */
196317if (thread_num != parallel_jobs - 1 )
197318{
198- thread_arg * tmp_args ;
319+ void * tmp_args ;
199320
200321thread_handles [thread_num ]= thread_handles [parallel_jobs - 1 ];
201322
@@ -205,9 +326,9 @@ reap_child(bool wait_for_child)
205326 *reused by the next created thread. Instead, the new thread
206327 *will use the arg struct of the thread that just died.
207328 */
208- tmp_args = thread_args [thread_num ];
209- thread_args [thread_num ]= thread_args [parallel_jobs - 1 ];
210- thread_args [parallel_jobs - 1 ]= tmp_args ;
329+ tmp_args = cur_thread_args [thread_num ];
330+ cur_thread_args [thread_num ]= cur_thread_args [parallel_jobs - 1 ];
331+ cur_thread_args [parallel_jobs - 1 ]= tmp_args ;
211332}
212333#endif
213334