@@ -29,6 +29,23 @@ static bool backup_deleted = false; /* At least one backup was deleted */
2929static bool backup_merged = false;/* At least one merge was enacted */
3030static bool wal_deleted = false;/* At least one WAL segments was deleted */
3131
32+ typedef struct
33+ {
34+ parray * xlog_filelist ;
35+ int thread_num ;
36+ bool purge_all ;
37+ XLogSegNo OldestToKeepSegNo ;
38+ const char * archive_root_dir ;
39+
40+ /*
41+ * Return value from the thread.
42+ * 0 means there is no error, 1 - there is an error.
43+ */
44+ int ret ;
45+ }delete_files_arg ;
46+
47+ static void * delete_walfiles_in_tli_internal (void * arg );
48+
3249void
3350do_delete (InstanceState * instanceState ,time_t backup_id )
3451{
@@ -782,7 +799,7 @@ delete_backup_files(pgBackup *backup)
782799elog (INFO ,"Progress: (%zd/%zd). Delete file \"%s\"" ,
783800i + 1 ,num_files ,full_path );
784801
785- pgFileDelete (file -> mode ,full_path );
802+ pgFileDelete (file -> mode ,full_path , ERROR );
786803}
787804
788805parray_walk (files ,pgFileFree );
@@ -826,6 +843,10 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
826843size_t wal_size_actual = 0 ;
827844char wal_pretty_size [20 ];
828845bool purge_all = false;
846+ // multi-thread stuff
847+ pthread_t * threads ;
848+ delete_files_arg * threads_args ;
849+ bool delete_isok = true;
829850
830851
831852/* Timeline is completely empty */
@@ -925,22 +946,105 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
925946if (dry_run )
926947return ;
927948
949+ /* init thread args with own file lists */
950+ threads = (pthread_t * )palloc (sizeof (pthread_t )* num_threads );
951+ threads_args = (delete_files_arg * )palloc (sizeof (delete_files_arg )* num_threads );
952+
953+ for (i = 0 ;i < num_threads ;i ++ )
954+ {
955+ delete_files_arg * arg = & (threads_args [i ]);
956+
957+ arg -> purge_all = purge_all ;
958+ arg -> OldestToKeepSegNo = OldestToKeepSegNo ;
959+ arg -> archive_root_dir = instanceState -> instance_wal_subdir_path ;
960+ arg -> xlog_filelist = tlinfo -> xlog_filelist ;
961+ arg -> thread_num = i + 1 ;
962+ /* By default there are some error */
963+ arg -> ret = 1 ;
964+ }
965+
966+ /* Run threads */
967+ thread_interrupted = false;
968+ for (i = 0 ;i < num_threads ;i ++ )
969+ {
970+ delete_files_arg * arg = & (threads_args [i ]);
971+
972+ elog (VERBOSE ,"Start thread num: %i" ,i );
973+ pthread_create (& threads [i ],NULL ,delete_walfiles_in_tli_internal ,arg );
974+ }
975+
976+ /* Wait threads */
977+ for (i = 0 ;i < num_threads ;i ++ )
978+ {
979+ pthread_join (threads [i ],NULL );
980+ if (threads_args [i ].ret == 1 )
981+ delete_isok = false;
982+ }
983+
984+ /* TODO: */
985+ //if delete_isok
986+
987+ /* cleanup */
928988for (i = 0 ;i < parray_num (tlinfo -> xlog_filelist );i ++ )
929989{
930990xlogFile * wal_file = (xlogFile * )parray_get (tlinfo -> xlog_filelist ,i );
931991
932- if (interrupted )
992+ if (wal_file -> deleted )
993+ {
994+ pgXlogFileFree (wal_file );
995+ parray_remove (tlinfo -> xlog_filelist ,i );
996+ i -- ;
997+ }
998+ }
999+ pg_free (threads );
1000+ pg_free (threads_args );
1001+
1002+ /* Remove empty subdirectories */
1003+ if (!instanceState -> wal_archive_subdirs )
1004+ return ;
1005+
1006+ for (i = 0 ;i < parray_num (instanceState -> wal_archive_subdirs );i ++ )
1007+ {
1008+ char fullpath [MAXPGPATH ];
1009+ pgFile * file = (pgFile * )parray_get (instanceState -> wal_archive_subdirs ,i );
1010+
1011+ join_path_components (fullpath ,instanceState -> instance_wal_subdir_path ,file -> name );
1012+
1013+ if (dir_is_empty (fullpath ,FIO_LOCAL_HOST ))
1014+ {
1015+ pgFileDelete (file -> mode ,fullpath ,WARNING );/* WARNING (not ERROR) due to possible race condition */
1016+ pgFileFree (file );
1017+ parray_remove (instanceState -> wal_archive_subdirs ,i );
1018+ i -- ;
1019+ }
1020+ }
1021+ }
1022+
1023+ void *
1024+ delete_walfiles_in_tli_internal (void * arg )
1025+ {
1026+ int i ;
1027+ delete_files_arg * args = (delete_files_arg * )arg ;
1028+
1029+ for (i = 0 ;i < parray_num (args -> xlog_filelist );i ++ )
1030+ {
1031+ xlogFile * wal_file = (xlogFile * )parray_get (args -> xlog_filelist ,i );
1032+
1033+ if (interrupted || thread_interrupted )
9331034elog (ERROR ,"interrupted during WAL archive purge" );
9341035
1036+ if (!pg_atomic_test_set_flag (& wal_file -> lock ))
1037+ continue ;
1038+
9351039/*
9361040 * Any segment equal or greater than EndSegNo must be kept
9371041 * unless it`s a 'purge all' scenario.
9381042 */
939- if (purge_all || wal_file -> segno < OldestToKeepSegNo )
1043+ if (args -> purge_all || wal_file -> segno < args -> OldestToKeepSegNo )
9401044{
9411045char wal_fullpath [MAXPGPATH ];
9421046
943- join_path_components (wal_fullpath ,instanceState -> instance_wal_subdir_path ,wal_file -> file .rel_path );
1047+ join_path_components (wal_fullpath ,args -> archive_root_dir ,wal_file -> file .rel_path );
9441048
9451049/* save segment from purging */
9461050if (instance_config .wal_depth >=0 && wal_file -> keep )
@@ -954,8 +1058,8 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
9541058{
9551059/* Missing file is not considered as error condition */
9561060if (errno != ENOENT )
957- elog (ERROR ,"Could not remove file \"%s\": %s" ,
958- wal_fullpath ,strerror (errno ));
1061+ elog (ERROR ,"[Thread: %d] Could not remove file \"%s\": %s" ,
1062+ args -> thread_num , wal_fullpath ,strerror (errno ));
9591063}
9601064else
9611065{
@@ -970,33 +1074,11 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
9701074}
9711075
9721076wal_deleted = true;
973-
974- /* cleanup */
975- pgXlogFileFree (wal_file );
976- parray_remove (tlinfo -> xlog_filelist ,i );
977- i -- ;
1077+ wal_file -> deleted = true;
9781078}
9791079}
9801080
981- /* Remove empty subdirectories */
982- if (!instanceState -> wal_archive_subdirs )
983- return ;
984-
985- for (i = 0 ;i < parray_num (instanceState -> wal_archive_subdirs );i ++ )
986- {
987- char fullpath [MAXPGPATH ];
988- pgFile * file = (pgFile * )parray_get (instanceState -> wal_archive_subdirs ,i );
989-
990- join_path_components (fullpath ,instanceState -> instance_wal_subdir_path ,file -> name );
991-
992- if (dir_is_empty (fullpath ,FIO_LOCAL_HOST ))
993- {
994- pgFileDelete (file -> mode ,fullpath );
995- pgFileFree (file );
996- parray_remove (instanceState -> wal_archive_subdirs ,i );
997- i -- ;
998- }
999- }
1081+ return NULL ;
10001082}
10011083
10021084