Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbbc7aec

Browse files
committed
[Issue#449] multithread delete for WAL archive
1 parent5dacdf5 commitbbc7aec

File tree

10 files changed

+332
-49
lines changed

10 files changed

+332
-49
lines changed

‎src/catalog.c‎

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1727,7 +1727,10 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance)
17271727
parray_walk(timelines,pfree);
17281728
parray_free(timelines);
17291729
}
1730-
/* add WAL archive subdirectories to filelist (used only in delete) */
1730+
/*
1731+
* Add WAL archive subdirectories to filelist (used only in delete)
1732+
* TODO: currently only directory with 8-character name is treated as WAL subdir, is it ok?
1733+
*/
17311734
elseif (S_ISDIR(file->mode)&&strspn(file->rel_path,"0123456789ABCDEF")==8)
17321735
{
17331736
if (instanceState->wal_archive_subdirs==NULL)
@@ -1760,6 +1763,9 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance)
17601763
parray_append(tlinfo->backups,backup);
17611764
}
17621765
}
1766+
1767+
/* setup locks */
1768+
xlogfilearray_clear_locks(tlinfo->xlog_filelist);
17631769
}
17641770

17651771
/* determine oldest backup and closest backup for every timeline */

‎src/delete.c‎

Lines changed: 112 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,23 @@ static bool backup_deleted = false; /* At least one backup was deleted */
2929
staticboolbackup_merged= false;/* At least one merge was enacted */
3030
staticboolwal_deleted= false;/* At least one WAL segments was deleted */
3131

32+
typedefstruct
33+
{
34+
parray*xlog_filelist;
35+
intthread_num;
36+
boolpurge_all;
37+
XLogSegNoOldestToKeepSegNo;
38+
constchar*archive_root_dir;
39+
40+
/*
41+
* Return value from the thread.
42+
* 0 means there is no error, 1 - there is an error.
43+
*/
44+
intret;
45+
}delete_files_arg;
46+
47+
staticvoid*delete_walfiles_in_tli_internal(void*arg);
48+
3249
void
3350
do_delete(InstanceState*instanceState,time_tbackup_id)
3451
{
@@ -782,7 +799,7 @@ delete_backup_files(pgBackup *backup)
782799
elog(INFO,"Progress: (%zd/%zd). Delete file \"%s\"",
783800
i+1,num_files,full_path);
784801

785-
pgFileDelete(file->mode,full_path);
802+
pgFileDelete(file->mode,full_path,ERROR);
786803
}
787804

788805
parray_walk(files,pgFileFree);
@@ -826,6 +843,10 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
826843
size_twal_size_actual=0;
827844
charwal_pretty_size[20];
828845
boolpurge_all= false;
846+
// multi-thread stuff
847+
pthread_t*threads;
848+
delete_files_arg*threads_args;
849+
booldelete_isok= true;
829850

830851

831852
/* Timeline is completely empty */
@@ -925,22 +946,105 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
925946
if (dry_run)
926947
return;
927948

949+
/* init thread args with own file lists */
950+
threads= (pthread_t*)palloc(sizeof(pthread_t)*num_threads);
951+
threads_args= (delete_files_arg*)palloc(sizeof(delete_files_arg)*num_threads);
952+
953+
for (i=0;i<num_threads;i++)
954+
{
955+
delete_files_arg*arg=&(threads_args[i]);
956+
957+
arg->purge_all=purge_all;
958+
arg->OldestToKeepSegNo=OldestToKeepSegNo;
959+
arg->archive_root_dir=instanceState->instance_wal_subdir_path;
960+
arg->xlog_filelist=tlinfo->xlog_filelist;
961+
arg->thread_num=i+1;
962+
/* By default there are some error */
963+
arg->ret=1;
964+
}
965+
966+
/* Run threads */
967+
thread_interrupted= false;
968+
for (i=0;i<num_threads;i++)
969+
{
970+
delete_files_arg*arg=&(threads_args[i]);
971+
972+
elog(VERBOSE,"Start thread num: %i",i);
973+
pthread_create(&threads[i],NULL,delete_walfiles_in_tli_internal,arg);
974+
}
975+
976+
/* Wait threads */
977+
for (i=0;i<num_threads;i++)
978+
{
979+
pthread_join(threads[i],NULL);
980+
if (threads_args[i].ret==1)
981+
delete_isok= false;
982+
}
983+
984+
/* TODO: */
985+
//if delete_isok
986+
987+
/* cleanup */
928988
for (i=0;i<parray_num(tlinfo->xlog_filelist);i++)
929989
{
930990
xlogFile*wal_file= (xlogFile*)parray_get(tlinfo->xlog_filelist,i);
931991

932-
if (interrupted)
992+
if (wal_file->deleted)
993+
{
994+
pgXlogFileFree(wal_file);
995+
parray_remove(tlinfo->xlog_filelist,i);
996+
i--;
997+
}
998+
}
999+
pg_free(threads);
1000+
pg_free(threads_args);
1001+
1002+
/* Remove empty subdirectories */
1003+
if (!instanceState->wal_archive_subdirs)
1004+
return;
1005+
1006+
for (i=0;i<parray_num(instanceState->wal_archive_subdirs);i++)
1007+
{
1008+
charfullpath[MAXPGPATH];
1009+
pgFile*file= (pgFile*)parray_get(instanceState->wal_archive_subdirs,i);
1010+
1011+
join_path_components(fullpath,instanceState->instance_wal_subdir_path,file->name);
1012+
1013+
if (dir_is_empty(fullpath,FIO_LOCAL_HOST))
1014+
{
1015+
pgFileDelete(file->mode,fullpath,WARNING);/* WARNING (not ERROR) due to possible race condition */
1016+
pgFileFree(file);
1017+
parray_remove(instanceState->wal_archive_subdirs,i);
1018+
i--;
1019+
}
1020+
}
1021+
}
1022+
1023+
void*
1024+
delete_walfiles_in_tli_internal(void*arg)
1025+
{
1026+
inti;
1027+
delete_files_arg*args= (delete_files_arg*)arg;
1028+
1029+
for (i=0;i<parray_num(args->xlog_filelist);i++)
1030+
{
1031+
xlogFile*wal_file= (xlogFile*)parray_get(args->xlog_filelist,i);
1032+
1033+
if (interrupted||thread_interrupted)
9331034
elog(ERROR,"interrupted during WAL archive purge");
9341035

1036+
if (!pg_atomic_test_set_flag(&wal_file->lock))
1037+
continue;
1038+
9351039
/*
9361040
* Any segment equal or greater than EndSegNo must be kept
9371041
* unless it`s a 'purge all' scenario.
9381042
*/
939-
if (purge_all||wal_file->segno<OldestToKeepSegNo)
1043+
if (args->purge_all||wal_file->segno<args->OldestToKeepSegNo)
9401044
{
9411045
charwal_fullpath[MAXPGPATH];
9421046

943-
join_path_components(wal_fullpath,instanceState->instance_wal_subdir_path,wal_file->file.rel_path);
1047+
join_path_components(wal_fullpath,args->archive_root_dir,wal_file->file.rel_path);
9441048

9451049
/* save segment from purging */
9461050
if (instance_config.wal_depth >=0&&wal_file->keep)
@@ -954,8 +1058,8 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
9541058
{
9551059
/* Missing file is not considered as error condition */
9561060
if (errno!=ENOENT)
957-
elog(ERROR,"Could not remove file \"%s\": %s",
958-
wal_fullpath,strerror(errno));
1061+
elog(ERROR,"[Thread: %d]Could not remove file \"%s\": %s",
1062+
args->thread_num,wal_fullpath,strerror(errno));
9591063
}
9601064
else
9611065
{
@@ -970,33 +1074,11 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
9701074
}
9711075

9721076
wal_deleted= true;
973-
974-
/* cleanup */
975-
pgXlogFileFree(wal_file);
976-
parray_remove(tlinfo->xlog_filelist,i);
977-
i--;
1077+
wal_file->deleted= true;
9781078
}
9791079
}
9801080

981-
/* Remove empty subdirectories */
982-
if (!instanceState->wal_archive_subdirs)
983-
return;
984-
985-
for (i=0;i<parray_num(instanceState->wal_archive_subdirs);i++)
986-
{
987-
charfullpath[MAXPGPATH];
988-
pgFile*file= (pgFile*)parray_get(instanceState->wal_archive_subdirs,i);
989-
990-
join_path_components(fullpath,instanceState->instance_wal_subdir_path,file->name);
991-
992-
if (dir_is_empty(fullpath,FIO_LOCAL_HOST))
993-
{
994-
pgFileDelete(file->mode,fullpath);
995-
pgFileFree(file);
996-
parray_remove(instanceState->wal_archive_subdirs,i);
997-
i--;
998-
}
999-
}
1081+
returnNULL;
10001082
}
10011083

10021084

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp