@@ -115,7 +115,7 @@ typedef struct XLogReaderData
115
115
gzFile gz_xlogfile ;
116
116
char gz_xlogpath [MAXPGPATH ];
117
117
#endif
118
- bool is_stream ;
118
+ bool honor_subdirs ;
119
119
}XLogReaderData ;
120
120
121
121
/* Function to process a WAL record */
@@ -174,7 +174,7 @@ static bool RunXLogThreads(const char *archivedir,
174
174
xlog_record_function process_record ,
175
175
XLogRecTarget * last_rec ,
176
176
bool inclusive_endpoint ,
177
- bool is_stream );
177
+ bool honor_subdirs );
178
178
//static XLogReaderState *InitXLogThreadRead(xlog_thread_arg *arg);
179
179
static bool SwitchThreadToNextWal (XLogReaderState * xlogreader ,
180
180
xlog_thread_arg * arg );
@@ -256,7 +256,7 @@ extractPageMap(const char *archivedir, uint32 wal_seg_size,
256
256
extract_isok = RunXLogThreads (archivedir ,0 ,InvalidTransactionId ,
257
257
InvalidXLogRecPtr ,end_tli ,wal_seg_size ,
258
258
startpoint ,endpoint , false,extractPageInfo ,
259
- NULL , true,false );
259
+ NULL , true,true );
260
260
else
261
261
{
262
262
/* We have to process WAL located on several different xlog intervals,
@@ -350,7 +350,7 @@ extractPageMap(const char *archivedir, uint32 wal_seg_size,
350
350
extract_isok = RunXLogThreads (archivedir ,0 ,InvalidTransactionId ,
351
351
InvalidXLogRecPtr ,tmp_interval -> tli ,wal_seg_size ,
352
352
tmp_interval -> begin_lsn ,tmp_interval -> end_lsn ,
353
- false,extractPageInfo ,NULL ,inclusive_endpoint ,false );
353
+ false,extractPageInfo ,NULL ,inclusive_endpoint ,true );
354
354
if (!extract_isok )
355
355
break ;
356
356
@@ -379,7 +379,7 @@ validate_backup_wal_from_start_to_stop(pgBackup *backup,
379
379
got_endpoint = RunXLogThreads (archivedir ,0 ,InvalidTransactionId ,
380
380
InvalidXLogRecPtr ,tli ,xlog_seg_size ,
381
381
backup -> start_lsn ,backup -> stop_lsn ,
382
- false,NULL ,NULL , true,backup -> stream );
382
+ false,NULL ,NULL , true,! backup -> stream );
383
383
384
384
if (!got_endpoint )
385
385
{
@@ -452,6 +452,7 @@ validate_wal(pgBackup *backup, const char *archivedir,
452
452
elog (WARNING ,"Backup %s WAL segments are corrupted" ,backup_id );
453
453
return ;
454
454
}
455
+
455
456
/*
456
457
* If recovery target is provided check that we can restore backup to a
457
458
* recovery target time or xid.
@@ -493,7 +494,7 @@ validate_wal(pgBackup *backup, const char *archivedir,
493
494
RunXLogThreads (archivedir ,target_time ,target_xid ,target_lsn ,
494
495
tli ,wal_seg_size ,backup -> stop_lsn ,
495
496
InvalidXLogRecPtr , true,validateXLogRecord ,& last_rec , true,
496
- backup -> stream );
497
+ true );
497
498
if (last_rec .rec_time > 0 )
498
499
time2iso (last_timestamp ,lengthof (last_timestamp ),
499
500
timestamptz_to_time_t (last_rec .rec_time ), false);
@@ -535,7 +536,7 @@ validate_wal(pgBackup *backup, const char *archivedir,
535
536
bool
536
537
read_recovery_info (const char * archivedir ,TimeLineID tli ,uint32 wal_seg_size ,
537
538
XLogRecPtr start_lsn ,XLogRecPtr stop_lsn ,
538
- time_t * recovery_time )
539
+ time_t * recovery_time , bool honor_subdirs )
539
540
{
540
541
XLogRecPtr startpoint = stop_lsn ;
541
542
XLogReaderState * xlogreader ;
@@ -552,6 +553,7 @@ read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size,
552
553
553
554
xlogreader = InitXLogPageRead (& reader_data ,archivedir ,tli ,wal_seg_size ,
554
555
false, true, true);
556
+ reader_data .honor_subdirs = honor_subdirs ;
555
557
556
558
/* Read records from stop_lsn down to start_lsn */
557
559
do
@@ -611,7 +613,7 @@ read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size,
611
613
*/
612
614
bool
613
615
wal_contains_lsn (const char * archivedir ,XLogRecPtr target_lsn ,
614
- TimeLineID target_tli ,uint32 wal_seg_size )
616
+ TimeLineID target_tli ,uint32 wal_seg_size , bool honor_subdirs )
615
617
{
616
618
XLogReaderState * xlogreader ;
617
619
XLogReaderData reader_data ;
@@ -629,6 +631,7 @@ wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
629
631
elog (ERROR ,"Out of memory" );
630
632
631
633
xlogreader -> system_identifier = instance_config .system_identifier ;
634
+ reader_data .honor_subdirs = honor_subdirs ;
632
635
633
636
#if PG_VERSION_NUM >=130000
634
637
if (XLogRecPtrIsInvalid (target_lsn ))
@@ -1015,38 +1018,114 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
1015
1018
/* Try to switch to the next WAL segment */
1016
1019
if (!reader_data -> xlogexists )
1017
1020
{
1018
- char xlogfname [MAXFNAMELEN ];
1019
- char partial_file [MAXPGPATH ];
1021
+ bool compressed = false;
1022
+ char xlogfname [MAXFNAMELEN ];
1023
+ //charpartial_file[MAXPGPATH];
1024
+ char fullpath [MAXPGPATH ];
1025
+ char fullpath_gz [MAXPGPATH ];
1026
+ char fullpath_partial_gz [MAXPGPATH ];
1020
1027
1021
1028
GetXLogFileName (xlogfname ,reader_data -> tli ,reader_data -> xlogsegno ,wal_seg_size );
1022
1029
1023
- if (reader_data -> is_stream )
1024
- join_path_components (reader_data -> xlogpath ,wal_archivedir ,xlogfname );
1025
1030
/* obtain WAL archive subdir for ARCHIVE backup */
1026
- else
1031
+ if ( reader_data -> honor_subdirs )
1027
1032
{
1028
1033
char archive_subdir [MAXPGPATH ];
1029
1034
get_archive_subdir (archive_subdir ,wal_archivedir ,xlogfname ,SEGMENT );
1030
- join_path_components (reader_data -> xlogpath ,archive_subdir ,xlogfname );
1035
+
1036
+ /* check existence of wal_dir/xlogid/segment.gz file ... */
1037
+ snprintf (fullpath_gz ,MAXPGPATH ,"%s/%s.gz" ,archive_subdir ,xlogfname );
1038
+
1039
+ //TODO: rewrite it to something less ugly
1040
+ #ifdef HAVE_LIBZ
1041
+ if (fileExists (fullpath_gz ,FIO_LOCAL_HOST ))
1042
+ {
1043
+ snprintf (reader_data -> xlogpath ,MAXPGPATH ,"%s/%s" ,archive_subdir ,xlogfname );
1044
+ snprintf (reader_data -> gz_xlogpath ,MAXPGPATH ,"%s" ,fullpath_gz );
1045
+ compressed = true;
1046
+ gotofile_found ;
1047
+ }
1048
+
1049
+ /* ... failing that check existence of wal_dir/xlogid/segment.partial.gz ... */
1050
+ snprintf (fullpath_partial_gz ,MAXPGPATH ,"%s/%s.partial.gz" ,archive_subdir ,xlogfname );
1051
+ if (fileExists (fullpath_partial_gz ,FIO_LOCAL_HOST ))
1052
+ {
1053
+ snprintf (reader_data -> xlogpath ,MAXPGPATH ,"%s/%s.partial" ,archive_subdir ,xlogfname );
1054
+ snprintf (reader_data -> gz_xlogpath ,MAXPGPATH ,"%s" ,fullpath_partial_gz );
1055
+ compressed = true;
1056
+ gotofile_found ;
1057
+ }
1058
+ #endif
1059
+ /* ... failing that check existence of wal_dir/xlogid/segment ... */
1060
+ snprintf (fullpath ,MAXPGPATH ,"%s/%s" ,archive_subdir ,xlogfname );
1061
+ if (fileExists (fullpath ,FIO_LOCAL_HOST ))
1062
+ {
1063
+ snprintf (reader_data -> xlogpath ,MAXPGPATH ,"%s" ,fullpath );
1064
+ gotofile_found ;
1065
+ }
1066
+
1067
+ gotoarchive_dir ;
1031
1068
}
1069
+ /* use directory as-is */
1070
+ else
1071
+ {
1072
+ archive_dir :
1073
+ #ifdef HAVE_LIBZ
1074
+ /* ... failing that check existence of wal_dir/segment.gz ... */
1075
+ snprintf (fullpath_gz ,MAXPGPATH ,"%s/%s.gz" ,wal_archivedir ,xlogfname );
1076
+ if (fileExists (fullpath_gz ,FIO_LOCAL_HOST ))
1077
+ {
1078
+ snprintf (reader_data -> gz_xlogpath ,MAXPGPATH ,"%s" ,fullpath_gz );
1079
+ snprintf (reader_data -> xlogpath ,MAXPGPATH ,"%s/%s" ,wal_archivedir ,xlogfname );
1080
+ compressed = true;
1032
1081
1033
- snprintf (reader_data -> gz_xlogpath ,MAXPGPATH ,"%s.gz" ,reader_data -> xlogpath );
1082
+ gotofile_found ;
1083
+ }
1084
+
1085
+ /* ... failing that check existence of wal_dir/segment.partial.gz ... */
1086
+ snprintf (fullpath_partial_gz ,MAXPGPATH ,"%s/%s.partial.gz" ,wal_archivedir ,xlogfname );
1087
+ if (fileExists (wal_archivedir ,FIO_LOCAL_HOST ))
1088
+ {
1089
+ snprintf (reader_data -> xlogpath ,MAXPGPATH ,"%s/%s.partial" ,wal_archivedir ,xlogfname );
1090
+ snprintf (reader_data -> gz_xlogpath ,MAXPGPATH ,"%s" ,fullpath_partial_gz );
1091
+ compressed = true;
1092
+ gotofile_found ;
1093
+ }
1094
+ #endif
1095
+ /* ... failing that check existence of wal_dir/segment ... */
1096
+ snprintf (fullpath ,MAXPGPATH ,"%s/%s" ,wal_archivedir ,xlogfname );
1097
+ if (fileExists (fullpath ,FIO_LOCAL_HOST ))
1098
+ {
1099
+ snprintf (reader_data -> xlogpath ,MAXPGPATH ,"%s" ,fullpath );
1100
+ gotofile_found ;
1101
+ }
1102
+ }
1103
+
1104
+ file_found :
1105
+ canonicalize_path (reader_data -> xlogpath );
1106
+
1107
+ #ifdef HAVE_LIBZ
1108
+ if (compressed )
1109
+ canonicalize_path (reader_data -> gz_xlogpath );
1110
+ #endif
1111
+
1112
+ //snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s.gz", reader_data->xlogpath);
1034
1113
1035
1114
/* We fall back to using .partial segment in case if we are running
1036
1115
* multi-timeline incremental backup right after standby promotion.
1037
1116
* TODO: it should be explicitly enabled.
1038
1117
*/
1039
- snprintf (partial_file ,MAXPGPATH ,"%s.partial" ,reader_data -> xlogpath );
1118
+ // snprintf(partial_file, MAXPGPATH, "%s.partial", reader_data->xlogpath);
1040
1119
1041
1120
/* If segment do not exists, but the same
1042
1121
* segment with '.partial' suffix does, use it instead */
1043
- if (!fileExists (reader_data -> xlogpath ,FIO_LOCAL_HOST )&&
1044
- fileExists (partial_file ,FIO_LOCAL_HOST ))
1045
- {
1046
- snprintf (reader_data -> xlogpath ,MAXPGPATH ,"%s" ,partial_file );
1047
- }
1122
+ // if (!fileExists(reader_data->xlogpath, FIO_LOCAL_HOST) &&
1123
+ // fileExists(partial_file, FIO_LOCAL_HOST))
1124
+ // {
1125
+ // snprintf(reader_data->xlogpath, MAXPGPATH, "%s", partial_file);
1126
+ // }
1048
1127
1049
- if (fileExists ( reader_data -> xlogpath , FIO_LOCAL_HOST ) )
1128
+ if (! compressed )
1050
1129
{
1051
1130
elog (LOG ,"Thread [%d]: Opening WAL segment \"%s\"" ,
1052
1131
reader_data -> thread_num ,reader_data -> xlogpath );
@@ -1065,7 +1144,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
1065
1144
}
1066
1145
#ifdef HAVE_LIBZ
1067
1146
/* Try to open compressed WAL segment */
1068
- else if ( fileExists ( reader_data -> gz_xlogpath , FIO_LOCAL_HOST ))
1147
+ else
1069
1148
{
1070
1149
elog (LOG ,"Thread [%d]: Opening compressed WAL segment \"%s\"" ,
1071
1150
reader_data -> thread_num ,reader_data -> gz_xlogpath );
@@ -1203,7 +1282,7 @@ RunXLogThreads(const char *archivedir, time_t target_time,
1203
1282
TransactionId target_xid ,XLogRecPtr target_lsn ,TimeLineID tli ,
1204
1283
uint32 segment_size ,XLogRecPtr startpoint ,XLogRecPtr endpoint ,
1205
1284
bool consistent_read ,xlog_record_function process_record ,
1206
- XLogRecTarget * last_rec ,bool inclusive_endpoint ,bool is_stream )
1285
+ XLogRecTarget * last_rec ,bool inclusive_endpoint ,bool honor_subdirs )
1207
1286
{
1208
1287
pthread_t * threads ;
1209
1288
xlog_thread_arg * thread_args ;
@@ -1267,7 +1346,7 @@ RunXLogThreads(const char *archivedir, time_t target_time,
1267
1346
consistent_read , false);
1268
1347
arg -> reader_data .xlogsegno = segno_next ;
1269
1348
arg -> reader_data .thread_num = i + 1 ;
1270
- arg -> reader_data .is_stream = is_stream ;
1349
+ arg -> reader_data .honor_subdirs = honor_subdirs ;
1271
1350
arg -> process_record = process_record ;
1272
1351
arg -> startpoint = startpoint ;
1273
1352
arg -> endpoint = endpoint ;
@@ -1495,7 +1574,7 @@ XLogThreadWorker(void *arg)
1495
1574
reader_data -> thread_num ,
1496
1575
(uint32 ) (errptr >>32 ), (uint32 ) (errptr ));
1497
1576
1498
- /*In we failed to read record located at endpoint position,
1577
+ /*If we failed to read record located at endpoint position,
1499
1578
* and endpoint is not inclusive, do not consider this as an error.
1500
1579
*/
1501
1580
if (!thread_arg -> inclusive_endpoint &&
@@ -1522,6 +1601,7 @@ XLogThreadWorker(void *arg)
1522
1601
1523
1602
if (thread_arg -> process_record )
1524
1603
thread_arg -> process_record (xlogreader ,reader_data ,& stop_reading );
1604
+
1525
1605
if (stop_reading )
1526
1606
{
1527
1607
thread_arg -> got_target = true;
@@ -1928,7 +2008,7 @@ bool validate_wal_segment(TimeLineID tli, XLogSegNo segno, const char *prefetch_
1928
2008
1929
2009
rc = RunXLogThreads (prefetch_dir ,0 ,InvalidTransactionId ,
1930
2010
InvalidXLogRecPtr ,tli ,wal_seg_size ,
1931
- startpoint ,endpoint , false,NULL ,NULL , true,true );
2011
+ startpoint ,endpoint , false,NULL ,NULL , true,false );
1932
2012
1933
2013
num_threads = tmp_num_threads ;
1934
2014