13
13
#include <sys/stat.h>
14
14
#include <sys/types.h>
15
15
#include <unistd.h>
16
+ #include <pthread.h>
16
17
17
18
#include "catalog/pg_control.h"
18
19
20
+ typedef struct
21
+ {
22
+ parray * files ;
23
+ pgBackup * backup ;
24
+ unsignedint start_file_idx ;
25
+ unsignedint end_file_idx ;
26
+ }restore_files_args ;
27
+
19
28
static void backup_online_files (bool re_recovery );
20
29
static void restore_database (pgBackup * backup );
21
30
static void create_recovery_conf (const char * target_time ,
@@ -35,6 +44,8 @@ static void print_backup_lsn(const pgBackup *backup);
35
44
static void search_next_wal (const char * path ,
36
45
XLogRecPtr * need_lsn ,
37
46
parray * timelines );
47
+ static void restore_files (void * arg );
48
+
38
49
39
50
int
40
51
do_restore (const char * target_time ,
@@ -230,6 +241,8 @@ restore_database(pgBackup *backup)
230
241
int ret ;
231
242
parray * files ;
232
243
int i ;
244
+ pthread_t restore_threads [num_threads ];
245
+ restore_files_args * restore_threads_args [num_threads ];
233
246
234
247
/* confirm block size compatibility */
235
248
if (backup -> block_size != BLCKSZ )
@@ -300,46 +313,36 @@ restore_database(pgBackup *backup)
300
313
pgFileFree (parray_remove (files ,i ));
301
314
}
302
315
316
+ if (num_threads < 1 )
317
+ num_threads = 1 ;
318
+
303
319
/* restore files into $PGDATA */
304
- for (i = 0 ;i < parray_num ( files ) ;i ++ )
320
+ for (i = 0 ;i < num_threads ;i ++ )
305
321
{
306
- char from_root [MAXPGPATH ];
307
- pgFile * file = (pgFile * )parray_get (files ,i );
308
-
309
- pgBackupGetPath (backup ,from_root ,lengthof (from_root ),DATABASE_DIR );
310
-
311
- /* check for interrupt */
312
- if (interrupted )
313
- elog (ERROR ,"interrupted during restore database" );
314
-
315
- /* print progress */
316
- if (!check )
317
- elog (LOG ,"(%d/%lu) %s " ,i + 1 , (unsigned long )parray_num (files ),
318
- file -> path + strlen (from_root )+ 1 );
319
-
320
- /* directories are created with mkdirs.sh */
321
- if (S_ISDIR (file -> mode ))
322
- {
323
- if (!check )
324
- elog (LOG ,"directory, skip" );
325
- continue ;
326
- }
322
+ restore_files_args * arg = pg_malloc (sizeof (restore_files_args ));
323
+ arg -> files = files ;
324
+ arg -> backup = backup ;
325
+ arg -> start_file_idx = i * (parray_num (files )/num_threads );
326
+ if (i == num_threads - 1 )
327
+ arg -> end_file_idx = parray_num (files );
328
+ else
329
+ arg -> end_file_idx = (i + 1 )* (parray_num (files )/num_threads );
327
330
328
- /* not backed up */
329
- if (file -> write_size == BYTES_INVALID )
330
- {
331
- if (!check )
332
- elog (LOG ,"not backed up, skip" );
333
- continue ;
334
- }
331
+ if (verbose )
332
+ elog (WARNING ,"Start thread for start_file_idx:%i end_file_idx:%i num:%li" ,
333
+ arg -> start_file_idx ,
334
+ arg -> end_file_idx ,
335
+ parray_num (files ));
335
336
336
- /* restore file */
337
- if (! check )
338
- restore_data_file ( from_root , pgdata , file );
337
+ restore_threads_args [ i ] = arg ;
338
+ pthread_create ( & restore_threads [ i ], NULL , ( void * ( * )( void * )) restore_files , arg );
339
+ }
339
340
340
- /* print size of restored file */
341
- if (!check )
342
- elog (LOG ,"restored %lu\n" , (unsigned long )file -> write_size );
341
+ /* Wait theads */
342
+ for (i = 0 ;i < num_threads ;i ++ )
343
+ {
344
+ pthread_join (restore_threads [i ],NULL );
345
+ pg_free (restore_threads_args [i ]);
343
346
}
344
347
345
348
/* Delete files which are not in file list. */
@@ -391,6 +394,56 @@ restore_database(pgBackup *backup)
391
394
}
392
395
393
396
397
+ static void
398
+ restore_files (void * arg )
399
+ {
400
+ int i ;
401
+
402
+ restore_files_args * arguments = (restore_files_args * )arg ;
403
+
404
+ /* restore files into $PGDATA */
405
+ for (i = arguments -> start_file_idx ;i < arguments -> end_file_idx ;i ++ )
406
+ {
407
+ char from_root [MAXPGPATH ];
408
+ pgFile * file = (pgFile * )parray_get (arguments -> files ,i );
409
+
410
+ pgBackupGetPath (arguments -> backup ,from_root ,lengthof (from_root ),DATABASE_DIR );
411
+
412
+ /* check for interrupt */
413
+ if (interrupted )
414
+ elog (ERROR ,"interrupted during restore database" );
415
+
416
+ /* print progress */
417
+ if (!check )
418
+ elog (LOG ,"(%d/%lu) %s " ,i + 1 , (unsigned long )parray_num (arguments -> files ),
419
+ file -> path + strlen (from_root )+ 1 );
420
+
421
+ /* directories are created with mkdirs.sh */
422
+ if (S_ISDIR (file -> mode ))
423
+ {
424
+ if (!check )
425
+ elog (LOG ,"directory, skip" );
426
+ continue ;
427
+ }
428
+
429
+ /* not backed up */
430
+ if (file -> write_size == BYTES_INVALID )
431
+ {
432
+ if (!check )
433
+ elog (LOG ,"not backed up, skip" );
434
+ continue ;
435
+ }
436
+
437
+ /* restore file */
438
+ if (!check )
439
+ restore_data_file (from_root ,pgdata ,file );
440
+
441
+ /* print size of restored file */
442
+ if (!check )
443
+ elog (LOG ,"restored %lu\n" , (unsigned long )file -> write_size );
444
+ }
445
+ }
446
+
394
447
static void
395
448
create_recovery_conf (const char * target_time ,
396
449
const char * target_xid ,