Expand Up @@ -15,14 +15,17 @@ static int push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_dir, const char *archive_dir, bool overwrite, bool no_sync, uint32 archive_timeout); uint32 archive_timeout, xlogFileType type ); #ifdef HAVE_LIBZ static int push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir, const char *archive_dir, bool overwrite, bool no_sync, int compress_level, uint32 archive_timeout); const char *archive_dir, bool overwrite, bool no_sync, int compress_level, uint32 archive_timeout, xlogFileType type ); #endif static void *push_files(void *arg); static void *get_files(void *arg); static bool get_wal_file_wrapper(const char *filename, const char *archive_root_dir, const char *to_fullpath, bool prefetch_mode); static bool get_wal_file(const char *filename, const char *from_path, const char *to_path, bool prefetch_mode); static int get_wal_file_internal(const char *from_path, const char *to_path, FILE *out, Expand Down Expand Up @@ -89,8 +92,9 @@ typedef struct typedef struct WALSegno { char name[MAXFNAMELEN]; volatile pg_atomic_flag lock; char name[MAXFNAMELEN]; volatile pg_atomic_flag lock; xlogFileType type; } WALSegno; static int push_file(WALSegno *xlogfile, const char *archive_status_dir, Expand All @@ -101,6 +105,29 @@ static int push_file(WALSegno *xlogfile, const char *archive_status_dir, static parray *setup_push_filelist(const char *archive_status_dir, const char *first_file, int batch_size); static parray *setup_archive_subdirs(parray *batch_files, const char *archive_dir); static xlogFileType get_xlogFileType(const char *filename) { if IsXLogFileName(filename) return SEGMENT; else if IsPartialXLogFileName(filename) return PARTIAL_SEGMENT; else if IsBackupHistoryFileName(filename) return BACKUP_HISTORY_FILE; else if IsTLHistoryFileName(filename) return HISTORY_FILE; else if IsBackupHistoryFileName(filename) return BACKUP_HISTORY_FILE; return UNKNOWN; } /* * At this point, we already done one roundtrip to archive server Expand Down Expand Up @@ -137,6 +164,7 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *pg /* files to push in multi-thread mode */ parray *batch_files = NULL; parray *archive_subdirs = NULL; int n_threads; if (!no_ready_rename || batch_size > 1) Expand All @@ -160,6 +188,20 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *pg parray_num(batch_files), batch_size, is_compress ? "zlib" : "none"); /* Extract subdirectories */ archive_subdirs = setup_archive_subdirs(batch_files, instanceState->instance_wal_subdir_path); if (archive_subdirs) { for (i = 0; i < parray_num(archive_subdirs); i++) { char *subdir = (char *) parray_get(archive_subdirs, i); if (fio_mkdir(subdir, DIR_PERMISSION, FIO_BACKUP_HOST) != 0) elog(ERROR, "Cannot create subdirectory in WAL archive: '%s'", subdir); pg_free(subdir); } parray_free(archive_subdirs); } num_threads = n_threads; /* Single-thread push Expand Down Expand Up @@ -339,12 +381,12 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir, if (!is_compress) rc = push_file_internal_uncompressed(xlogfile->name, pg_xlog_dir, archive_dir, overwrite, no_sync, archive_timeout); archive_timeout, xlogfile->type ); #ifdef HAVE_LIBZ else rc = push_file_internal_gz(xlogfile->name, pg_xlog_dir, archive_dir, overwrite, no_sync, compress_level, archive_timeout); archive_timeout, xlogfile->type ); #endif /* take '--no-ready-rename' flag into account */ Expand Down Expand Up @@ -383,13 +425,14 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir, int push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_dir, const char *archive_dir, bool overwrite, bool no_sync, uint32 archive_timeout) uint32 archive_timeout, xlogFileType type ) { FILE *in = NULL; intout = -1; char *buf = pgut_malloc(OUT_BUF_SIZE); /* 1MB buffer */ charfrom_fullpath[MAXPGPATH]; charto_fullpath[MAXPGPATH]; char archive_subdir[MAXPGPATH]; /* partial handling */ struct statst; charto_fullpath_part[MAXPGPATH]; Expand All @@ -402,8 +445,12 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d /* from path */ join_path_components(from_fullpath, pg_xlog_dir, wal_file_name); canonicalize_path(from_fullpath); /* calculate subdir in WAL archive */ get_archive_subdir(archive_subdir, archive_dir, wal_file_name, type); /* to path */ join_path_components(to_fullpath,archive_dir , wal_file_name); join_path_components(to_fullpath,archive_subdir , wal_file_name); canonicalize_path(to_fullpath); /* Open source file for read */ Expand Down Expand Up @@ -622,14 +669,15 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d int push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir, const char *archive_dir, bool overwrite, bool no_sync, int compress_level, uint32 archive_timeout) int compress_level, uint32 archive_timeout, xlogFileType type ) { FILE *in = NULL; gzFileout = NULL; char *buf = pgut_malloc(OUT_BUF_SIZE); charfrom_fullpath[MAXPGPATH]; charto_fullpath[MAXPGPATH]; charto_fullpath_gz[MAXPGPATH]; char archive_subdir[MAXPGPATH]; /* partial handling */ struct statst; Expand All @@ -644,8 +692,12 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir, /* from path */ join_path_components(from_fullpath, pg_xlog_dir, wal_file_name); canonicalize_path(from_fullpath); /* calculate subdir in WAL archive */ get_archive_subdir(archive_subdir, archive_dir, wal_file_name, type); /* to path */ join_path_components(to_fullpath,archive_dir , wal_file_name); join_path_components(to_fullpath,archive_subdir , wal_file_name); canonicalize_path(to_fullpath); /* destination file with .gz suffix */ Expand Down Expand Up @@ -915,15 +967,17 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file, { int i; WALSegno *xlogfile = NULL; parray *status_files = NULL; parray *batch_files = parray_new(); parray *status_files = NULL; parray *batch_files = parray_new(); /* guarantee that first filename is in batch list */ xlogfile = palloc(sizeof(WALSegno)); pg_atomic_init_flag(&xlogfile->lock); snprintf(xlogfile->name, MAXFNAMELEN, "%s", first_file); parray_append(batch_files, xlogfile); xlogfile->type = get_xlogFileType(xlogfile->name); if (batch_size < 2) return batch_files; Expand Down Expand Up @@ -955,6 +1009,8 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file, pg_atomic_init_flag(&xlogfile->lock); snprintf(xlogfile->name, MAXFNAMELEN, "%s", filename); xlogfile->type = get_xlogFileType(xlogfile->name); parray_append(batch_files, xlogfile); if (parray_num(batch_files) >= batch_size) Expand Down Expand Up @@ -1023,7 +1079,7 @@ do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const cha /* full filepath to WAL file in archive directory. * $BACKUP_PATH/wal/instance_name/000000010000000000000001 */ join_path_components(backup_wal_file_path, instanceState->instance_wal_subdir_path, wal_file_name); // join_path_components(backup_wal_file_path, instanceState->instance_wal_subdir_path, wal_file_name);INSTR_TIME_SET_CURRENT(start_time); if (num_threads > batch_size) Expand Down Expand Up @@ -1152,7 +1208,7 @@ do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const cha while (fail_count < 3) { if (get_wal_file (wal_file_name,backup_wal_file_path , absolute_wal_file_path, false)) if (get_wal_file_wrapper (wal_file_name,instanceState->instance_wal_subdir_path , absolute_wal_file_path, false)) { fail_count = 0; elog(INFO, "pg_probackup archive-get copied WAL file %s", wal_file_name); Expand Down Expand Up @@ -1235,7 +1291,7 @@ uint32 run_wal_prefetch(const char *prefetch_dir, const char *archive_dir, /* It is ok, maybe requested batch is greater than the number of available * files in the archive */ if (!get_wal_file (xlogfile->name,from_fullpath , to_fullpath, true)) if (!get_wal_file_wrapper (xlogfile->name,archive_dir , to_fullpath, true)) { elog(LOG, "Thread [%d]: Failed to prefetch WAL segment %s", 0, xlogfile->name); break; Expand Down Expand Up @@ -1309,7 +1365,7 @@ get_files(void *arg) join_path_components(from_fullpath, args->archive_dir, xlogfile->name); join_path_components(to_fullpath, args->prefetch_dir, xlogfile->name); if (!get_wal_file (xlogfile->name,from_fullpath , to_fullpath, true)) if (!get_wal_file_wrapper (xlogfile->name,args->archive_dir , to_fullpath, true)) { /* It is ok, maybe requested batch is greater than the number of available * files in the archive Expand All @@ -1328,6 +1384,38 @@ get_files(void *arg) return NULL; } /* * First we try to copy from WAL archive subdirectory: * Failing that, try WAL archive root directory */ bool get_wal_file_wrapper(const char *filename, const char *archive_root_dir, const char *to_fullpath, bool prefetch_mode) { bool success = false; char archive_subdir[MAXPGPATH]; char from_fullpath[MAXPGPATH]; xlogFileType type = get_xlogFileType(filename); if (type == SEGMENT || type == PARTIAL_SEGMENT || type == BACKUP_HISTORY_FILE) { /* first try subdir ... */ get_archive_subdir(archive_subdir, archive_root_dir, filename, type); join_path_components(from_fullpath, archive_subdir, filename); success = get_wal_file(filename, from_fullpath, to_fullpath, prefetch_mode); } if (!success) { /* ... fallback to archive dir for backward compatibility purposes */ join_path_components(from_fullpath, archive_root_dir, filename); success = get_wal_file(filename, from_fullpath, to_fullpath, prefetch_mode); } return success; } /* * Copy WAL segment from archive catalog to pgdata with possible decompression. * When running in prefetch mode, we should not error out. Expand Down Expand Up @@ -1730,3 +1818,68 @@ uint32 maintain_prefetch(const char *prefetch_dir, XLogSegNo first_segno, uint32 return n_files; } /* Calculate subdir path in WAL archive directory. Example: * 000000010000000200000013 -> 00000002 */ void get_archive_subdir(char *archive_subdir, const char *archive_dir, const char *wal_file_name, xlogFileType type) { if (type == SEGMENT || type == PARTIAL_SEGMENT || type == BACKUP_HISTORY_FILE) { int rc = 0; char tli[MAXFNAMELEN]; char log[MAXFNAMELEN]; char suffix[MAXFNAMELEN]; rc = sscanf(wal_file_name, "%08s%08s%s", (char *) &tli, (char *) &log, (char *) &suffix); if (rc == 3) { join_path_components(archive_subdir, archive_dir, log); return; } } /* for all other files just use root directory of WAL archive */ strcpy(archive_subdir, archive_dir); } /* Extract array of WAL archive subdirs using push filelist */ parray* setup_archive_subdirs(parray *batch_files, const char *archive_dir) { int i; parray *subdirs = NULL; char *cur_subdir = NULL; /* * - Do we need to sort batch_files? * - No, we rely on sorting of status files */ for (i = 0; i < parray_num(batch_files); i++) { WALSegno *xlogfile = (WALSegno *) parray_get(batch_files, i); if (xlogfile->type == SEGMENT || xlogfile->type == PARTIAL_SEGMENT || xlogfile->type == BACKUP_HISTORY_FILE) { char subdir[MAXPGPATH]; if (!subdirs) subdirs = parray_new(); get_archive_subdir(subdir, archive_dir, xlogfile->name, xlogfile->type); /* do not append the same subdir twice */ if (cur_subdir && strcmp(cur_subdir, subdir) == 0) continue; cur_subdir = pgut_strdup(subdir); parray_append(subdirs, cur_subdir); } } return subdirs; }