@@ -115,7 +115,7 @@ typedef struct XLogReaderData
115115gzFile gz_xlogfile ;
116116char gz_xlogpath [MAXPGPATH ];
117117#endif
118- bool is_stream ;
118+ bool honor_subdirs ;
119119}XLogReaderData ;
120120
121121/* Function to process a WAL record */
@@ -174,7 +174,7 @@ static bool RunXLogThreads(const char *archivedir,
174174xlog_record_function process_record ,
175175XLogRecTarget * last_rec ,
176176bool inclusive_endpoint ,
177- bool is_stream );
177+ bool honor_subdirs );
178178//static XLogReaderState *InitXLogThreadRead(xlog_thread_arg *arg);
179179static bool SwitchThreadToNextWal (XLogReaderState * xlogreader ,
180180xlog_thread_arg * arg );
@@ -256,7 +256,7 @@ extractPageMap(const char *archivedir, uint32 wal_seg_size,
256256extract_isok = RunXLogThreads (archivedir ,0 ,InvalidTransactionId ,
257257InvalidXLogRecPtr ,end_tli ,wal_seg_size ,
258258startpoint ,endpoint , false,extractPageInfo ,
259- NULL , true,false );
259+ NULL , true,true );
260260else
261261{
262262/* We have to process WAL located on several different xlog intervals,
@@ -350,7 +350,7 @@ extractPageMap(const char *archivedir, uint32 wal_seg_size,
350350extract_isok = RunXLogThreads (archivedir ,0 ,InvalidTransactionId ,
351351InvalidXLogRecPtr ,tmp_interval -> tli ,wal_seg_size ,
352352tmp_interval -> begin_lsn ,tmp_interval -> end_lsn ,
353- false,extractPageInfo ,NULL ,inclusive_endpoint ,false );
353+ false,extractPageInfo ,NULL ,inclusive_endpoint ,true );
354354if (!extract_isok )
355355break ;
356356
@@ -379,7 +379,7 @@ validate_backup_wal_from_start_to_stop(pgBackup *backup,
379379got_endpoint = RunXLogThreads (archivedir ,0 ,InvalidTransactionId ,
380380InvalidXLogRecPtr ,tli ,xlog_seg_size ,
381381backup -> start_lsn ,backup -> stop_lsn ,
382- false,NULL ,NULL , true,backup -> stream );
382+ false,NULL ,NULL , true,! backup -> stream );
383383
384384if (!got_endpoint )
385385{
@@ -452,6 +452,7 @@ validate_wal(pgBackup *backup, const char *archivedir,
452452elog (WARNING ,"Backup %s WAL segments are corrupted" ,backup_id );
453453return ;
454454}
455+
455456/*
456457 * If recovery target is provided check that we can restore backup to a
457458 * recovery target time or xid.
@@ -493,7 +494,7 @@ validate_wal(pgBackup *backup, const char *archivedir,
493494RunXLogThreads (archivedir ,target_time ,target_xid ,target_lsn ,
494495tli ,wal_seg_size ,backup -> stop_lsn ,
495496InvalidXLogRecPtr , true,validateXLogRecord ,& last_rec , true,
496- backup -> stream );
497+ true );
497498if (last_rec .rec_time > 0 )
498499time2iso (last_timestamp ,lengthof (last_timestamp ),
499500timestamptz_to_time_t (last_rec .rec_time ), false);
@@ -535,7 +536,7 @@ validate_wal(pgBackup *backup, const char *archivedir,
535536bool
536537read_recovery_info (const char * archivedir ,TimeLineID tli ,uint32 wal_seg_size ,
537538XLogRecPtr start_lsn ,XLogRecPtr stop_lsn ,
538- time_t * recovery_time )
539+ time_t * recovery_time , bool honor_subdirs )
539540{
540541XLogRecPtr startpoint = stop_lsn ;
541542XLogReaderState * xlogreader ;
@@ -552,6 +553,7 @@ read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size,
552553
553554xlogreader = InitXLogPageRead (& reader_data ,archivedir ,tli ,wal_seg_size ,
554555 false, true, true);
556+ reader_data .honor_subdirs = honor_subdirs ;
555557
556558/* Read records from stop_lsn down to start_lsn */
557559do
@@ -611,7 +613,7 @@ read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size,
611613 */
612614bool
613615wal_contains_lsn (const char * archivedir ,XLogRecPtr target_lsn ,
614- TimeLineID target_tli ,uint32 wal_seg_size )
616+ TimeLineID target_tli ,uint32 wal_seg_size , bool honor_subdirs )
615617{
616618XLogReaderState * xlogreader ;
617619XLogReaderData reader_data ;
@@ -629,6 +631,7 @@ wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
629631elog (ERROR ,"Out of memory" );
630632
631633xlogreader -> system_identifier = instance_config .system_identifier ;
634+ reader_data .honor_subdirs = honor_subdirs ;
632635
633636#if PG_VERSION_NUM >=130000
634637if (XLogRecPtrIsInvalid (target_lsn ))
@@ -1015,38 +1018,114 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
10151018/* Try to switch to the next WAL segment */
10161019if (!reader_data -> xlogexists )
10171020{
1018- char xlogfname [MAXFNAMELEN ];
1019- char partial_file [MAXPGPATH ];
1021+ bool compressed = false;
1022+ char xlogfname [MAXFNAMELEN ];
1023+ //charpartial_file[MAXPGPATH];
1024+ char fullpath [MAXPGPATH ];
1025+ char fullpath_gz [MAXPGPATH ];
1026+ char fullpath_partial_gz [MAXPGPATH ];
10201027
10211028GetXLogFileName (xlogfname ,reader_data -> tli ,reader_data -> xlogsegno ,wal_seg_size );
10221029
1023- if (reader_data -> is_stream )
1024- join_path_components (reader_data -> xlogpath ,wal_archivedir ,xlogfname );
10251030/* obtain WAL archive subdir for ARCHIVE backup */
1026- else
1031+ if ( reader_data -> honor_subdirs )
10271032{
10281033char archive_subdir [MAXPGPATH ];
10291034get_archive_subdir (archive_subdir ,wal_archivedir ,xlogfname ,SEGMENT );
1030- join_path_components (reader_data -> xlogpath ,archive_subdir ,xlogfname );
1035+
1036+ /* check existence of wal_dir/xlogid/segment.gz file ... */
1037+ snprintf (fullpath_gz ,MAXPGPATH ,"%s/%s.gz" ,archive_subdir ,xlogfname );
1038+
1039+ //TODO: rewrite it to something less ugly
1040+ #ifdef HAVE_LIBZ
1041+ if (fileExists (fullpath_gz ,FIO_LOCAL_HOST ))
1042+ {
1043+ snprintf (reader_data -> xlogpath ,MAXPGPATH ,"%s/%s" ,archive_subdir ,xlogfname );
1044+ snprintf (reader_data -> gz_xlogpath ,MAXPGPATH ,"%s" ,fullpath_gz );
1045+ compressed = true;
1046+ gotofile_found ;
1047+ }
1048+
1049+ /* ... failing that check existence of wal_dir/xlogid/segment.partial.gz ... */
1050+ snprintf (fullpath_partial_gz ,MAXPGPATH ,"%s/%s.partial.gz" ,archive_subdir ,xlogfname );
1051+ if (fileExists (fullpath_partial_gz ,FIO_LOCAL_HOST ))
1052+ {
1053+ snprintf (reader_data -> xlogpath ,MAXPGPATH ,"%s/%s.partial" ,archive_subdir ,xlogfname );
1054+ snprintf (reader_data -> gz_xlogpath ,MAXPGPATH ,"%s" ,fullpath_partial_gz );
1055+ compressed = true;
1056+ gotofile_found ;
1057+ }
1058+ #endif
1059+ /* ... failing that check existence of wal_dir/xlogid/segment ... */
1060+ snprintf (fullpath ,MAXPGPATH ,"%s/%s" ,archive_subdir ,xlogfname );
1061+ if (fileExists (fullpath ,FIO_LOCAL_HOST ))
1062+ {
1063+ snprintf (reader_data -> xlogpath ,MAXPGPATH ,"%s" ,fullpath );
1064+ gotofile_found ;
1065+ }
1066+
1067+ gotoarchive_dir ;
10311068}
1069+ /* use directory as-is */
1070+ else
1071+ {
1072+ archive_dir :
1073+ #ifdef HAVE_LIBZ
1074+ /* ... failing that check existence of wal_dir/segment.gz ... */
1075+ snprintf (fullpath_gz ,MAXPGPATH ,"%s/%s.gz" ,wal_archivedir ,xlogfname );
1076+ if (fileExists (fullpath_gz ,FIO_LOCAL_HOST ))
1077+ {
1078+ snprintf (reader_data -> gz_xlogpath ,MAXPGPATH ,"%s" ,fullpath_gz );
1079+ snprintf (reader_data -> xlogpath ,MAXPGPATH ,"%s/%s" ,wal_archivedir ,xlogfname );
1080+ compressed = true;
10321081
1033- snprintf (reader_data -> gz_xlogpath ,MAXPGPATH ,"%s.gz" ,reader_data -> xlogpath );
1082+ gotofile_found ;
1083+ }
1084+
1085+ /* ... failing that check existence of wal_dir/segment.partial.gz ... */
1086+ snprintf (fullpath_partial_gz ,MAXPGPATH ,"%s/%s.partial.gz" ,wal_archivedir ,xlogfname );
1087+ if (fileExists (wal_archivedir ,FIO_LOCAL_HOST ))
1088+ {
1089+ snprintf (reader_data -> xlogpath ,MAXPGPATH ,"%s/%s.partial" ,wal_archivedir ,xlogfname );
1090+ snprintf (reader_data -> gz_xlogpath ,MAXPGPATH ,"%s" ,fullpath_partial_gz );
1091+ compressed = true;
1092+ gotofile_found ;
1093+ }
1094+ #endif
1095+ /* ... failing that check existence of wal_dir/segment ... */
1096+ snprintf (fullpath ,MAXPGPATH ,"%s/%s" ,wal_archivedir ,xlogfname );
1097+ if (fileExists (fullpath ,FIO_LOCAL_HOST ))
1098+ {
1099+ snprintf (reader_data -> xlogpath ,MAXPGPATH ,"%s" ,fullpath );
1100+ gotofile_found ;
1101+ }
1102+ }
1103+
1104+ file_found :
1105+ canonicalize_path (reader_data -> xlogpath );
1106+
1107+ #ifdef HAVE_LIBZ
1108+ if (compressed )
1109+ canonicalize_path (reader_data -> gz_xlogpath );
1110+ #endif
1111+
1112+ //snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s.gz", reader_data->xlogpath);
10341113
10351114/* We fall back to using .partial segment in case if we are running
10361115 * multi-timeline incremental backup right after standby promotion.
10371116 * TODO: it should be explicitly enabled.
10381117 */
1039- snprintf (partial_file ,MAXPGPATH ,"%s.partial" ,reader_data -> xlogpath );
1118+ // snprintf(partial_file, MAXPGPATH, "%s.partial", reader_data->xlogpath);
10401119
10411120/* If segment do not exists, but the same
10421121 * segment with '.partial' suffix does, use it instead */
1043- if (!fileExists (reader_data -> xlogpath ,FIO_LOCAL_HOST )&&
1044- fileExists (partial_file ,FIO_LOCAL_HOST ))
1045- {
1046- snprintf (reader_data -> xlogpath ,MAXPGPATH ,"%s" ,partial_file );
1047- }
1122+ // if (!fileExists(reader_data->xlogpath, FIO_LOCAL_HOST) &&
1123+ // fileExists(partial_file, FIO_LOCAL_HOST))
1124+ // {
1125+ // snprintf(reader_data->xlogpath, MAXPGPATH, "%s", partial_file);
1126+ // }
10481127
1049- if (fileExists ( reader_data -> xlogpath , FIO_LOCAL_HOST ) )
1128+ if (! compressed )
10501129{
10511130elog (LOG ,"Thread [%d]: Opening WAL segment \"%s\"" ,
10521131reader_data -> thread_num ,reader_data -> xlogpath );
@@ -1065,7 +1144,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
10651144}
10661145#ifdef HAVE_LIBZ
10671146/* Try to open compressed WAL segment */
1068- else if ( fileExists ( reader_data -> gz_xlogpath , FIO_LOCAL_HOST ))
1147+ else
10691148{
10701149elog (LOG ,"Thread [%d]: Opening compressed WAL segment \"%s\"" ,
10711150reader_data -> thread_num ,reader_data -> gz_xlogpath );
@@ -1203,7 +1282,7 @@ RunXLogThreads(const char *archivedir, time_t target_time,
12031282TransactionId target_xid ,XLogRecPtr target_lsn ,TimeLineID tli ,
12041283uint32 segment_size ,XLogRecPtr startpoint ,XLogRecPtr endpoint ,
12051284bool consistent_read ,xlog_record_function process_record ,
1206- XLogRecTarget * last_rec ,bool inclusive_endpoint ,bool is_stream )
1285+ XLogRecTarget * last_rec ,bool inclusive_endpoint ,bool honor_subdirs )
12071286{
12081287pthread_t * threads ;
12091288xlog_thread_arg * thread_args ;
@@ -1267,7 +1346,7 @@ RunXLogThreads(const char *archivedir, time_t target_time,
12671346consistent_read , false);
12681347arg -> reader_data .xlogsegno = segno_next ;
12691348arg -> reader_data .thread_num = i + 1 ;
1270- arg -> reader_data .is_stream = is_stream ;
1349+ arg -> reader_data .honor_subdirs = honor_subdirs ;
12711350arg -> process_record = process_record ;
12721351arg -> startpoint = startpoint ;
12731352arg -> endpoint = endpoint ;
@@ -1495,7 +1574,7 @@ XLogThreadWorker(void *arg)
14951574reader_data -> thread_num ,
14961575 (uint32 ) (errptr >>32 ), (uint32 ) (errptr ));
14971576
1498- /*In we failed to read record located at endpoint position,
1577+ /*If we failed to read record located at endpoint position,
14991578 * and endpoint is not inclusive, do not consider this as an error.
15001579 */
15011580if (!thread_arg -> inclusive_endpoint &&
@@ -1522,6 +1601,7 @@ XLogThreadWorker(void *arg)
15221601
15231602if (thread_arg -> process_record )
15241603thread_arg -> process_record (xlogreader ,reader_data ,& stop_reading );
1604+
15251605if (stop_reading )
15261606{
15271607thread_arg -> got_target = true;
@@ -1928,7 +2008,7 @@ bool validate_wal_segment(TimeLineID tli, XLogSegNo segno, const char *prefetch_
19282008
19292009rc = RunXLogThreads (prefetch_dir ,0 ,InvalidTransactionId ,
19302010InvalidXLogRecPtr ,tli ,wal_seg_size ,
1931- startpoint ,endpoint , false,NULL ,NULL , true,true );
2011+ startpoint ,endpoint , false,NULL ,NULL , true,false );
19322012
19332013num_threads = tmp_num_threads ;
19342014