Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3873318

Browse files
committed
Decompress restored file by remote agent
1 parentda8bf6d commit3873318

File tree

4 files changed

+79
-49
lines changed

4 files changed

+79
-49
lines changed

‎src/data.c

Lines changed: 29 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ do_compress(void* dst, size_t dst_size, void const* src, size_t src_size,
8989
* Decompresses source into dest using algorithm. Returns the number of bytes
9090
* decompressed in the destination buffer, or -1 if decompression fails.
9191
*/
92-
staticint32
92+
int32
9393
do_decompress(void*dst,size_tdst_size,voidconst*src,size_tsrc_size,
9494
CompressAlgalg,constchar**errormsg)
9595
{
@@ -719,6 +719,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
719719
BlockNumberblknum=0,
720720
truncate_from=0;
721721
boolneed_truncate= false;
722+
size_trc;
722723

723724
/* BYTES_INVALID allowed only in case of restoring file from DELTA backup */
724725
if (file->write_size!=BYTES_INVALID)
@@ -750,9 +751,9 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
750751
{
751752
off_twrite_pos;
752753
size_tread_len;
753-
DataPagecompressed_page;/* used as read buffer */
754754
DataPagepage;
755-
int32uncompressed_size=0;
755+
int32compressed_size;
756+
constchar*errormsg=NULL;
756757

757758
/* File didn`t changed. Nothig to copy */
758759
if (file->write_size==BYTES_INVALID)
@@ -789,7 +790,9 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
789790
blknum,file->path,strerror(errno_tmp));
790791
}
791792

792-
if (header.block==0&&header.compressed_size==0)
793+
compressed_size=header.compressed_size;
794+
795+
if (header.block==0&&compressed_size)
793796
{
794797
elog(VERBOSE,"Skip empty block of \"%s\"",file->path);
795798
continue;
@@ -801,7 +804,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
801804

802805
blknum=header.block;
803806

804-
if (header.compressed_size==PageIsTruncated)
807+
if (compressed_size==PageIsTruncated)
805808
{
806809
/*
807810
* Backup contains information that this block was truncated.
@@ -812,39 +815,14 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
812815
break;
813816
}
814817

815-
Assert(header.compressed_size <=BLCKSZ);
818+
Assert(compressed_size <=BLCKSZ);
816819

817820
/* read a page from file */
818-
read_len=fread(compressed_page.data,1,
819-
MAXALIGN(header.compressed_size),in);
820-
if (read_len!=MAXALIGN(header.compressed_size))
821+
read_len=fread(page.data,1,
822+
MAXALIGN(compressed_size),in);
823+
if (read_len!=MAXALIGN(compressed_size))
821824
elog(ERROR,"Cannot read block %u of \"%s\" read %zu of %d",
822-
blknum,file->path,read_len,header.compressed_size);
823-
824-
/*
825-
* if page size is smaller than BLCKSZ, decompress the page.
826-
* BUGFIX for versions < 2.0.23: if page size is equal to BLCKSZ.
827-
* we have to check, whether it is compressed or not using
828-
* page_may_be_compressed() function.
829-
*/
830-
if (header.compressed_size!=BLCKSZ
831-
||page_may_be_compressed(compressed_page.data,file->compress_alg,
832-
backup_version))
833-
{
834-
constchar*errormsg=NULL;
835-
836-
uncompressed_size=do_decompress(page.data,BLCKSZ,
837-
compressed_page.data,
838-
header.compressed_size,
839-
file->compress_alg,&errormsg);
840-
if (uncompressed_size<0&&errormsg!=NULL)
841-
elog(WARNING,"An error occured during decompressing block %u of file \"%s\": %s",
842-
blknum,file->path,errormsg);
843-
844-
if (uncompressed_size!=BLCKSZ)
845-
elog(ERROR,"Page of file \"%s\" uncompressed to %d bytes. != BLCKSZ",
846-
file->path,uncompressed_size);
847-
}
825+
blknum,file->path,read_len,compressed_size);
848826

849827
write_pos= (write_header) ?blknum* (BLCKSZ+sizeof(header)) :
850828
blknum*BLCKSZ;
@@ -865,21 +843,24 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
865843
blknum,file->path,strerror(errno));
866844
}
867845

868-
/* if we uncompressed the page - write page.data,
869-
* if page wasn't compressed -
870-
* write what we've read - compressed_page.data
846+
/*
847+
* if page size is smaller than BLCKSZ, decompress the page.
848+
* BUGFIX for versions < 2.0.23: if page size is equal to BLCKSZ.
849+
* we have to check, whether it is compressed or not using
850+
* page_may_be_compressed() function.
871851
*/
872-
if (uncompressed_size==BLCKSZ)
873-
{
874-
if (fio_fwrite(out,page.data,BLCKSZ)!=BLCKSZ)
875-
elog(ERROR,"Cannot write block %u of \"%s\": %s",
876-
blknum,file->path,strerror(errno));
877-
}
878-
else
852+
rc= (compressed_size!=BLCKSZ||page_may_be_compressed(page.data,file->compress_alg,backup_version))
853+
?fio_fwrite_compressed(out,page.data,compressed_size,file->compress_alg,&errormsg)
854+
:fio_fwrite(out,page.data,compressed_size);
855+
856+
if (rc!=compressed_size)
879857
{
880-
if (fio_fwrite(out,compressed_page.data,BLCKSZ)!=BLCKSZ)
881-
elog(ERROR,"Cannot write block %u of \"%s\": %s",
882-
blknum,file->path,strerror(errno));
858+
if (errormsg!=NULL)
859+
elog(WARNING,"An error occured during decompressing block %u of file \"%s\": %s",
860+
blknum,file->path,errormsg);
861+
862+
elog(ERROR,"Cannot write block %u of \"%s\": %s",
863+
blknum,file->path,strerror(errno));
883864
}
884865
}
885866

‎src/pg_probackup.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,8 @@ int32 do_compress(void* dst, size_t dst_size, void const* src, size_t src_size,
700700
externPGconn*pgdata_basic_setup(ConnectionOptionsconn_opt,PGNodeInfo*nodeInfo);
701701
externvoidcheck_system_identifiers(PGconn*conn,char*pgdata);
702702
externvoidparse_filelist_filenames(parray*files,constchar*root);
703+
externint32do_decompress(void*dst,size_tdst_size,voidconst*src,size_tsrc_size,
704+
CompressAlgalg,constchar**errormsg);
703705

704706

705707
#endif/* PG_PROBACKUP_H */

‎src/utils/file.c

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,35 @@ size_t fio_fwrite(FILE* f, void const* buf, size_t size)
566566
:fwrite(buf,1,size,f);
567567
}
568568

569+
/* Write data to stdio file */
570+
size_tfio_fwrite_compressed(FILE*f,voidconst*buf,size_tsize,intcompress_alg,constchar**errmsg)
571+
{
572+
uint32decompressed_size;
573+
chardecompressed_page[BLCKSZ];
574+
575+
if (fio_is_remote_file(f))
576+
{
577+
fio_headerhdr;
578+
579+
hdr.cop=FIO_WRITE_COMPRESSED;
580+
hdr.handle=fio_fileno(f)& ~FIO_PIPE_MARKER;
581+
hdr.size=size;
582+
hdr.arg=compress_alg;
583+
584+
IO_CHECK(fio_write_all(fio_stdout,&hdr,sizeof(hdr)),sizeof(hdr));
585+
IO_CHECK(fio_write_all(fio_stdout,buf,size),size);
586+
587+
returnsize;
588+
}
589+
decompressed_size=do_decompress(decompressed_page,
590+
BLCKSZ,
591+
buf,
592+
size,
593+
(CompressAlg)compress_alg,errmsg);
594+
returndecompressed_size!=BLCKSZ
595+
?0 :fwrite(decompressed_page,1,decompressed_size,f);
596+
}
597+
569598
/* Write data to the file */
570599
ssize_tfio_write(intfd,voidconst*buf,size_tsize)
571600
{
@@ -1381,6 +1410,22 @@ void fio_communicate(int in, int out)
13811410
caseFIO_WRITE:/* Write to the current position in file */
13821411
IO_CHECK(fio_write_all(fd[hdr.handle],buf,hdr.size),hdr.size);
13831412
break;
1413+
caseFIO_WRITE_COMPRESSED:/* Write to the current position in file */
1414+
{
1415+
chardecompressed_page[BLCKSZ];
1416+
charconst*errmsg=NULL;
1417+
int32decompressed_size=do_decompress(decompressed_page,BLCKSZ,
1418+
buf,
1419+
hdr.size,
1420+
(CompressAlg)hdr.arg,&errmsg);
1421+
if (errmsg!=NULL||decompressed_size!=BLCKSZ)
1422+
{
1423+
fprintf(stderr,"Failed to decompress block: %s",errmsg ?errmsg:"unknown error");
1424+
exit(EXIT_FAILURE);
1425+
}
1426+
IO_CHECK(fio_write_all(fd[hdr.handle],decompressed_page,BLCKSZ),BLCKSZ);
1427+
}
1428+
break;
13841429
caseFIO_READ:/* Read from the current position in file */
13851430
if ((size_t)hdr.arg>buf_size) {
13861431
buf_size=hdr.arg;

‎src/utils/file.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ typedef enum
3333
FIO_READDIR,
3434
FIO_CLOSEDIR,
3535
FIO_SEND_PAGES,
36-
FIO_PAGE
36+
FIO_PAGE,
37+
FIO_WRITE_COMPRESSED,
3738
}fio_operations;
3839

3940
typedefenum
@@ -69,6 +70,7 @@ extern void fio_communicate(int in, int out);
6970

7071
externFILE*fio_fopen(charconst*name,charconst*mode,fio_locationlocation);
7172
externsize_tfio_fwrite(FILE*f,voidconst*buf,size_tsize);
73+
externsize_tfio_fwrite_compressed(FILE*f,voidconst*buf,size_tsize,intcompress_alg,constchar**errmsg);
7274
externssize_tfio_fread(FILE*f,void*buf,size_tsize);
7375
externintfio_pread(FILE*f,void*buf,off_toffs);
7476
externintfio_fprintf(FILE*f,charconst*arg, ...)pg_attribute_printf(2,3);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp