1515
1616static int push_file_internal_uncompressed (const char * wal_file_name ,const char * pg_xlog_dir ,
1717const char * archive_dir ,bool overwrite ,bool no_sync ,
18- uint32 archive_timeout );
18+ uint32 archive_timeout , xlogFileType type );
1919#ifdef HAVE_LIBZ
2020static int push_file_internal_gz (const char * wal_file_name ,const char * pg_xlog_dir ,
21- const char * archive_dir ,bool overwrite ,bool no_sync ,
22- int compress_level ,uint32 archive_timeout );
21+ const char * archive_dir ,bool overwrite ,bool no_sync ,
22+ int compress_level ,uint32 archive_timeout , xlogFileType type );
2323#endif
2424static void * push_files (void * arg );
2525static void * get_files (void * arg );
26+ static bool
27+ get_wal_file_wrapper (const char * filename ,const char * archive_root_dir ,
28+ const char * to_fullpath ,bool prefetch_mode );
2629static bool get_wal_file (const char * filename ,const char * from_path ,const char * to_path ,
2730bool prefetch_mode );
2831static int get_wal_file_internal (const char * from_path ,const char * to_path ,FILE * out ,
@@ -89,8 +92,9 @@ typedef struct
8992
9093typedef struct WALSegno
9194{
92- char name [MAXFNAMELEN ];
93- volatile pg_atomic_flag lock ;
95+ char name [MAXFNAMELEN ];
96+ volatile pg_atomic_flag lock ;
97+ xlogFileType type ;
9498}WALSegno ;
9599
96100static int push_file (WALSegno * xlogfile ,const char * archive_status_dir ,
@@ -102,6 +106,28 @@ static int push_file(WALSegno *xlogfile, const char *archive_status_dir,
102106static parray * setup_push_filelist (const char * archive_status_dir ,
103107const char * first_file ,int batch_size );
104108
109+ static xlogFileType
110+ get_xlogFileType (const char * filename )
111+ {
112+
113+ if IsXLogFileName (filename )
114+ return SEGMENT ;
115+
116+ else if IsPartialXLogFileName (filename )
117+ return PARTIAL_SEGMENT ;
118+
119+ else if IsBackupHistoryFileName (filename )
120+ return BACKUP_HISTORY_FILE ;
121+
122+ else if IsTLHistoryFileName (filename )
123+ return HISTORY_FILE ;
124+
125+ else if IsBackupHistoryFileName (filename )
126+ return BACKUP_HISTORY_FILE ;
127+
128+ return UNKNOWN ;
129+ }
130+
105131/*
106132 * At this point, we already done one roundtrip to archive server
107133 * to get instance config.
@@ -185,6 +211,8 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa
185211parray_num (batch_files ),batch_size ,
186212is_compress ?"zlib" :"none" );
187213
214+ /* TODO: create subdirectories here, not in internal functions */
215+
188216num_threads = n_threads ;
189217
190218/* Single-thread push
@@ -366,12 +394,12 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir,
366394if (!is_compress )
367395rc = push_file_internal_uncompressed (xlogfile -> name ,pg_xlog_dir ,
368396archive_dir ,overwrite ,no_sync ,
369- archive_timeout );
397+ archive_timeout , xlogfile -> type );
370398#ifdef HAVE_LIBZ
371399else
372400rc = push_file_internal_gz (xlogfile -> name ,pg_xlog_dir ,archive_dir ,
373401overwrite ,no_sync ,compress_level ,
374- archive_timeout );
402+ archive_timeout , xlogfile -> type );
375403#endif
376404
377405/* take '--no-ready-rename' flag into account */
@@ -408,13 +436,14 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir,
408436int
409437push_file_internal_uncompressed (const char * wal_file_name ,const char * pg_xlog_dir ,
410438const char * archive_dir ,bool overwrite ,bool no_sync ,
411- uint32 archive_timeout )
439+ uint32 archive_timeout , xlogFileType type )
412440{
413441FILE * in = NULL ;
414442int out = -1 ;
415443char * buf = pgut_malloc (OUT_BUF_SIZE );/* 1MB buffer */
416444char from_fullpath [MAXPGPATH ];
417445char to_fullpath [MAXPGPATH ];
446+ char archive_subdir [MAXPGPATH ];
418447/* partial handling */
419448struct stat st ;
420449char to_fullpath_part [MAXPGPATH ];
@@ -427,8 +456,16 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
427456/* from path */
428457join_path_components (from_fullpath ,pg_xlog_dir ,wal_file_name );
429458canonicalize_path (from_fullpath );
459+
460+ /* calculate subdir in WAL archive */
461+ get_archive_subdir (archive_subdir ,archive_dir ,wal_file_name ,type );
462+
463+ /* create subdirectory */
464+ if (fio_mkdir (archive_subdir ,DIR_PERMISSION ,FIO_BACKUP_HOST )!= 0 )
465+ elog (ERROR ,"Cannot create subdirectory in WAL archive: '%s'" ,archive_subdir );
466+
430467/* to path */
431- join_path_components (to_fullpath ,archive_dir ,wal_file_name );
468+ join_path_components (to_fullpath ,archive_subdir ,wal_file_name );
432469canonicalize_path (to_fullpath );
433470
434471/* Open source file for read */
@@ -647,14 +684,15 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
647684int
648685push_file_internal_gz (const char * wal_file_name ,const char * pg_xlog_dir ,
649686const char * archive_dir ,bool overwrite ,bool no_sync ,
650- int compress_level ,uint32 archive_timeout )
687+ int compress_level ,uint32 archive_timeout , xlogFileType type )
651688{
652689FILE * in = NULL ;
653690gzFile out = NULL ;
654691char * buf = pgut_malloc (OUT_BUF_SIZE );
655692char from_fullpath [MAXPGPATH ];
656693char to_fullpath [MAXPGPATH ];
657694char to_fullpath_gz [MAXPGPATH ];
695+ char archive_subdir [MAXPGPATH ];
658696
659697/* partial handling */
660698struct stat st ;
@@ -669,8 +707,16 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
669707/* from path */
670708join_path_components (from_fullpath ,pg_xlog_dir ,wal_file_name );
671709canonicalize_path (from_fullpath );
710+
711+ /* calculate subdir in WAL archive */
712+ get_archive_subdir (archive_subdir ,archive_dir ,wal_file_name ,type );
713+
714+ /* create subdirectory */
715+ if (fio_mkdir (archive_subdir ,DIR_PERMISSION ,FIO_BACKUP_HOST )!= 0 )
716+ elog (ERROR ,"Cannot create subdirectory in WAL archive: '%s'" ,archive_subdir );
717+
672718/* to path */
673- join_path_components (to_fullpath ,archive_dir ,wal_file_name );
719+ join_path_components (to_fullpath ,archive_subdir ,wal_file_name );
674720canonicalize_path (to_fullpath );
675721
676722/* destination file with .gz suffix */
@@ -940,15 +986,17 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file,
940986{
941987int i ;
942988WALSegno * xlogfile = NULL ;
943- parray * status_files = NULL ;
944- parray * batch_files = parray_new ();
989+ parray * status_files = NULL ;
990+ parray * batch_files = parray_new ();
945991
946992/* guarantee that first filename is in batch list */
947993xlogfile = palloc (sizeof (WALSegno ));
948994pg_atomic_init_flag (& xlogfile -> lock );
949995snprintf (xlogfile -> name ,MAXFNAMELEN ,"%s" ,first_file );
950996parray_append (batch_files ,xlogfile );
951997
998+ xlogfile -> type = get_xlogFileType (xlogfile -> name );
999+
9521000if (batch_size < 2 )
9531001return batch_files ;
9541002
@@ -980,6 +1028,8 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file,
9801028pg_atomic_init_flag (& xlogfile -> lock );
9811029
9821030snprintf (xlogfile -> name ,MAXFNAMELEN ,"%s" ,filename );
1031+
1032+ xlogfile -> type = get_xlogFileType (xlogfile -> name );
9831033parray_append (batch_files ,xlogfile );
9841034
9851035if (parray_num (batch_files ) >=batch_size )
@@ -1048,7 +1098,7 @@ do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const cha
10481098
10491099/* full filepath to WAL file in archive directory.
10501100 * $BACKUP_PATH/wal/instance_name/000000010000000000000001 */
1051- join_path_components (backup_wal_file_path ,instanceState -> instance_wal_subdir_path ,wal_file_name );
1101+ // join_path_components(backup_wal_file_path, instanceState->instance_wal_subdir_path, wal_file_name);
10521102
10531103INSTR_TIME_SET_CURRENT (start_time );
10541104if (num_threads > batch_size )
@@ -1177,7 +1227,7 @@ do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const cha
11771227
11781228while (fail_count < 3 )
11791229{
1180- if (get_wal_file (wal_file_name ,backup_wal_file_path ,absolute_wal_file_path , false))
1230+ if (get_wal_file_wrapper (wal_file_name ,instanceState -> instance_wal_subdir_path ,absolute_wal_file_path , false))
11811231{
11821232fail_count = 0 ;
11831233elog (INFO ,"pg_probackup archive-get copied WAL file %s" ,wal_file_name );
@@ -1260,7 +1310,7 @@ uint32 run_wal_prefetch(const char *prefetch_dir, const char *archive_dir,
12601310/* It is ok, maybe requested batch is greater than the number of available
12611311 * files in the archive
12621312 */
1263- if (!get_wal_file (xlogfile -> name ,from_fullpath ,to_fullpath , true))
1313+ if (!get_wal_file_wrapper (xlogfile -> name ,archive_dir ,to_fullpath , true))
12641314{
12651315elog (LOG ,"Thread [%d]: Failed to prefetch WAL segment %s" ,0 ,xlogfile -> name );
12661316break ;
@@ -1334,7 +1384,7 @@ get_files(void *arg)
13341384join_path_components (from_fullpath ,args -> archive_dir ,xlogfile -> name );
13351385join_path_components (to_fullpath ,args -> prefetch_dir ,xlogfile -> name );
13361386
1337- if (!get_wal_file (xlogfile -> name ,from_fullpath ,to_fullpath , true))
1387+ if (!get_wal_file_wrapper (xlogfile -> name ,args -> archive_dir ,to_fullpath , true))
13381388{
13391389/* It is ok, maybe requested batch is greater than the number of available
13401390 * files in the archive
@@ -1353,6 +1403,38 @@ get_files(void *arg)
13531403return NULL ;
13541404}
13551405
1406+ /*
1407+ * First we try to copy from WAL archive subdirectory:
1408+ * Failing that, try WAL archive root directory
1409+ */
1410+ bool
1411+ get_wal_file_wrapper (const char * filename ,const char * archive_root_dir ,
1412+ const char * to_fullpath ,bool prefetch_mode )
1413+ {
1414+ bool success = false;
1415+ char archive_subdir [MAXPGPATH ];
1416+ char from_fullpath [MAXPGPATH ];
1417+ xlogFileType type = get_xlogFileType (filename );
1418+
1419+ if (type == SEGMENT || type == PARTIAL_SEGMENT || type == BACKUP_HISTORY_FILE )
1420+ {
1421+ /* first try subdir ... */
1422+ get_archive_subdir (archive_subdir ,archive_root_dir ,filename ,type );
1423+ join_path_components (from_fullpath ,archive_subdir ,filename );
1424+
1425+ success = get_wal_file (filename ,from_fullpath ,to_fullpath ,prefetch_mode );
1426+ }
1427+
1428+ if (!success )
1429+ {
1430+ /* ... fallback to archive dir for backward compatibility purposes */
1431+ join_path_components (from_fullpath ,archive_root_dir ,filename );
1432+ success = get_wal_file (filename ,from_fullpath ,to_fullpath ,prefetch_mode );
1433+ }
1434+
1435+ return success ;
1436+ }
1437+
13561438/*
13571439 * Copy WAL segment from archive catalog to pgdata with possible decompression.
13581440 * When running in prefetch mode, we should not error out.
@@ -1755,3 +1837,30 @@ uint32 maintain_prefetch(const char *prefetch_dir, XLogSegNo first_segno, uint32
17551837
17561838return n_files ;
17571839}
1840+
1841+ /* Calculate subdir path in WAL archive directory. Example:
1842+ * 000000010000000200000013 -> 00000002
1843+ */
1844+ void
1845+ get_archive_subdir (char * archive_subdir ,const char * archive_dir ,const char * wal_file_name ,xlogFileType type )
1846+ {
1847+ if (type == SEGMENT || type == PARTIAL_SEGMENT || type == BACKUP_HISTORY_FILE )
1848+ {
1849+ int rc = 0 ;
1850+ char tli [MAXFNAMELEN ];
1851+ char log [MAXFNAMELEN ];
1852+ char suffix [MAXFNAMELEN ];
1853+
1854+ rc = sscanf (wal_file_name ,"%08s%08s%s" ,
1855+ (char * )& tli , (char * )& log , (char * )& suffix );
1856+
1857+ if (rc == 3 )
1858+ {
1859+ join_path_components (archive_subdir ,archive_dir ,log );
1860+ return ;
1861+ }
1862+ }
1863+
1864+ /* for all other files just use root directory of WAL archive */
1865+ strcpy (archive_subdir ,archive_dir );
1866+ }