Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit51c0d18

Browse files
committed
Allow parallel zstd compression when taking a base backup.
libzstd allows transparent parallel compression just by settingan option when creating the compression context, so permit thatfor both client and server-side backup compression. To use this,use something like pg_basebackup --compress WHERE-zstd:workers=Nwhere WHERE is "client" or "server" and N is an integer.When compression is performed on the server side, this will spawnthreads inside the PostgreSQL backend. While there is almost noPostgreSQL server code which is thread-safe, the threads here are usedinternally by libzstd and touch only data structures controlled bylibzstd.Patch by me, based in part on earlier work by Dipesh Panditand Jeevan Ladhe. Reviewed by Justin Pryzby.Discussion:http://postgr.es/m/CA+Tgmobj6u-nWF-j=FemygUhobhryLxf9h-wJN7W-2rSsseHNA@mail.gmail.com
1 parentc6863b8 commit51c0d18

File tree

9 files changed

+147
-39
lines changed

9 files changed

+147
-39
lines changed

‎doc/src/sgml/protocol.sgml

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2739,17 +2739,23 @@ The commands accepted in replication mode are:
27392739
option. If the value is an integer, it specifies the compression
27402740
level. Otherwise, it should be a comma-separated list of items,
27412741
each of the form <literal>keyword</literal> or
2742-
<literal>keyword=value</literal>. Currently, the only supported
2743-
keyword is <literal>level</literal>, which sets the compression
2744-
level.
2742+
<literal>keyword=value</literal>. Currently, the supported keywords
2743+
are <literal>level</literal> and <literal>workers</literal>.
27452744
</para>
27462745

27472746
<para>
2747+
The <literal>level</literal> keyword sets the compression level.
27482748
For <literal>gzip</literal> the compression level should be an
27492749
integer between 1 and 9, for <literal>lz4</literal> an integer
27502750
between 1 and 12, and for <literal>zstd</literal> an integer
27512751
between 1 and 22.
27522752
</para>
2753+
2754+
<para>
2755+
The <literal>workers</literal> keyword sets the number of threads
2756+
that should be used for parallel compression. Parallel compression
2757+
is supported only for <literal>zstd</literal>.
2758+
</para>
27532759
</listitem>
27542760
</varlistentry>
27552761

‎doc/src/sgml/ref/pg_basebackup.sgml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -424,8 +424,8 @@ PostgreSQL documentation
424424
integer, it specifies the compression level. Otherwise, it should be
425425
a comma-separated list of items, each of the form
426426
<literal>keyword</literal> or <literal>keyword=value</literal>.
427-
Currently, theonlysupportedkeyword is <literal>level</literal>,
428-
which sets the compression level.
427+
Currently, the supportedkeywords are <literal>level</literal>
428+
and <literal>workers</literal>.
429429
</para>
430430
<para>
431431
If no compression level is specified, the default compression level

‎src/backend/replication/basebackup_zstd.c

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ typedef struct bbsink_zstd
2525
/* Common information for all types of sink. */
2626
bbsinkbase;
2727

28-
/* Compressionlevel */
29-
intcompresslevel;
28+
/* Compressionoptions */
29+
bc_specification*compress;
3030

3131
ZSTD_CCtx*cctx;
3232
ZSTD_outBufferzstd_outBuf;
@@ -67,22 +67,13 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
6767
returnNULL;/* keep compiler quiet */
6868
#else
6969
bbsink_zstd*sink;
70-
intcompresslevel;
7170

7271
Assert(next!=NULL);
7372

74-
if ((compress->options&BACKUP_COMPRESSION_OPTION_LEVEL)==0)
75-
compresslevel=0;
76-
else
77-
{
78-
compresslevel=compress->level;
79-
Assert(compresslevel >=1&&compresslevel <=22);
80-
}
81-
8273
sink=palloc0(sizeof(bbsink_zstd));
8374
*((constbbsink_ops**)&sink->base.bbs_ops)=&bbsink_zstd_ops;
8475
sink->base.bbs_next=next;
85-
sink->compresslevel=compresslevel;
76+
sink->compress=compress;
8677

8778
return&sink->base;
8879
#endif
@@ -99,16 +90,36 @@ bbsink_zstd_begin_backup(bbsink *sink)
9990
bbsink_zstd*mysink= (bbsink_zstd*)sink;
10091
size_toutput_buffer_bound;
10192
size_tret;
93+
bc_specification*compress=mysink->compress;
10294

10395
mysink->cctx=ZSTD_createCCtx();
10496
if (!mysink->cctx)
10597
elog(ERROR,"could not create zstd compression context");
10698

107-
ret=ZSTD_CCtx_setParameter(mysink->cctx,ZSTD_c_compressionLevel,
108-
mysink->compresslevel);
109-
if (ZSTD_isError(ret))
110-
elog(ERROR,"could not set zstd compression level to %d: %s",
111-
mysink->compresslevel,ZSTD_getErrorName(ret));
99+
if ((compress->options&BACKUP_COMPRESSION_OPTION_LEVEL)!=0)
100+
{
101+
ret=ZSTD_CCtx_setParameter(mysink->cctx,ZSTD_c_compressionLevel,
102+
compress->level);
103+
if (ZSTD_isError(ret))
104+
elog(ERROR,"could not set zstd compression level to %d: %s",
105+
compress->level,ZSTD_getErrorName(ret));
106+
}
107+
108+
if ((compress->options&BACKUP_COMPRESSION_OPTION_WORKERS)!=0)
109+
{
110+
/*
111+
* On older versions of libzstd, this option does not exist, and trying
112+
* to set it will fail. Similarly for newer versions if they are
113+
* compiled without threading support.
114+
*/
115+
ret=ZSTD_CCtx_setParameter(mysink->cctx,ZSTD_c_nbWorkers,
116+
compress->workers);
117+
if (ZSTD_isError(ret))
118+
ereport(ERROR,
119+
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
120+
errmsg("could not set compression worker count to %d: %s",
121+
compress->workers,ZSTD_getErrorName(ret)));
122+
}
112123

113124
/*
114125
* We need our own buffer, because we're going to pass different data to

‎src/bin/pg_basebackup/bbstreamer_zstd.c

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
6767
{
6868
#ifdefUSE_ZSTD
6969
bbstreamer_zstd_frame*streamer;
70-
intcompresslevel;
7170
size_tret;
7271

7372
Assert(next!=NULL);
@@ -88,18 +87,35 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
8887
exit(1);
8988
}
9089

91-
/* Initialize stream compression preferences */
92-
if ((compress->options&BACKUP_COMPRESSION_OPTION_LEVEL)==0)
93-
compresslevel=0;
94-
else
95-
compresslevel=compress->level;
96-
ret=ZSTD_CCtx_setParameter(streamer->cctx,ZSTD_c_compressionLevel,
97-
compresslevel);
98-
if (ZSTD_isError(ret))
90+
/* Set compression level, if specified */
91+
if ((compress->options&BACKUP_COMPRESSION_OPTION_LEVEL)!=0)
9992
{
100-
pg_log_error("could not set zstd compression level to %d: %s",
101-
compresslevel,ZSTD_getErrorName(ret));
102-
exit(1);
93+
ret=ZSTD_CCtx_setParameter(streamer->cctx,ZSTD_c_compressionLevel,
94+
compress->level);
95+
if (ZSTD_isError(ret))
96+
{
97+
pg_log_error("could not set zstd compression level to %d: %s",
98+
compress->level,ZSTD_getErrorName(ret));
99+
exit(1);
100+
}
101+
}
102+
103+
/* Set # of workers, if specified */
104+
if ((compress->options&BACKUP_COMPRESSION_OPTION_WORKERS)!=0)
105+
{
106+
/*
107+
* On older versions of libzstd, this option does not exist, and
108+
* trying to set it will fail. Similarly for newer versions if they
109+
* are compiled without threading support.
110+
*/
111+
ret=ZSTD_CCtx_setParameter(streamer->cctx,ZSTD_c_nbWorkers,
112+
compress->workers);
113+
if (ZSTD_isError(ret))
114+
{
115+
pg_log_error("could not set compression worker count to %d: %s",
116+
compress->workers,ZSTD_getErrorName(ret));
117+
exit(1);
118+
}
103119
}
104120

105121
/* Initialize the ZSTD output buffer. */

‎src/bin/pg_basebackup/t/010_pg_basebackup.pl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,11 @@
130130
'invalid compression specification: found empty string where a compression option was expected',
131131
'failure on extra, empty compression option'
132132
],
133+
[
134+
'gzip:workers=3',
135+
'invalid compression specification: compression algorithm "gzip" does not accept a worker count',
136+
'failure on worker count for gzip'
137+
],
133138
);
134139
formy$cft (@compression_failure_tests)
135140
{

‎src/bin/pg_verifybackup/t/009_extract.pl

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@
3434
'compression_method'=>'zstd',
3535
'backup_flags'=> ['--compress','server-zstd:5'],
3636
'enabled'=> check_pg_config("#define USE_ZSTD 1")
37+
},
38+
{
39+
'compression_method'=>'parallel zstd',
40+
'backup_flags'=> ['--compress','server-zstd:workers=3'],
41+
'enabled'=> check_pg_config("#define USE_ZSTD 1"),
42+
'possibly_unsupported'=>qr/could not set compression worker count to 3: Unsupported parameter/
3743
}
3844
);
3945

@@ -55,8 +61,27 @@
5561
my@verify = ('pg_verifybackup','-e',$backup_path);
5662

5763
# A backup with a valid compression method should work.
58-
$primary->command_ok(\@backup,
59-
"backup done, compression method\"$method\"");
64+
my$backup_stdout ='';
65+
my$backup_stderr ='';
66+
my$backup_result =$primary->run_log(\@backup,'>', \$backup_stdout,
67+
'2>', \$backup_stderr);
68+
if ($backup_stdoutne'')
69+
{
70+
print"# standard output was:\n$backup_stdout";
71+
}
72+
if ($backup_stderrne'')
73+
{
74+
print"# standard error was:\n$backup_stderr";
75+
}
76+
if (!$backup_result &&$tc->{'possibly_unsupported'} &&
77+
$backup_stderr =~/$tc->{'possibly_unsupported'}/)
78+
{
79+
skip"compression with$method not supported by this build", 2;
80+
}
81+
else
82+
{
83+
ok($backup_result,"backup done, compression$method");
84+
}
6085

6186
# Make sure that it verifies OK.
6287
$primary->command_ok(\@verify,

‎src/bin/pg_verifybackup/t/010_client_untar.pl

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@
4949
'decompress_program'=>$ENV{'ZSTD'},
5050
'decompress_flags'=> ['-d' ],
5151
'enabled'=> check_pg_config("#define USE_ZSTD 1")
52+
},
53+
{
54+
'compression_method'=>'parallel zstd',
55+
'backup_flags'=> ['--compress','client-zstd:workers=3'],
56+
'backup_archive'=>'base.tar.zst',
57+
'decompress_program'=>$ENV{'ZSTD'},
58+
'decompress_flags'=> ['-d' ],
59+
'enabled'=> check_pg_config("#define USE_ZSTD 1"),
60+
'possibly_unsupported'=>qr/could not set compression worker count to 3: Unsupported parameter/
5261
}
5362
);
5463

@@ -69,9 +78,27 @@
6978
'pg_basebackup','-D',$backup_path,
7079
'-Xfetch','--no-sync','-cfast','-Ft');
7180
push@backup, @{$tc->{'backup_flags'}};
72-
$primary->command_ok(\@backup,
73-
"client side backup, compression$method");
74-
81+
my$backup_stdout ='';
82+
my$backup_stderr ='';
83+
my$backup_result =$primary->run_log(\@backup,'>', \$backup_stdout,
84+
'2>', \$backup_stderr);
85+
if ($backup_stdoutne'')
86+
{
87+
print"# standard output was:\n$backup_stdout";
88+
}
89+
if ($backup_stderrne'')
90+
{
91+
print"# standard error was:\n$backup_stderr";
92+
}
93+
if (!$backup_result &&$tc->{'possibly_unsupported'} &&
94+
$backup_stderr =~/$tc->{'possibly_unsupported'}/)
95+
{
96+
skip"compression with$method not supported by this build", 3;
97+
}
98+
else
99+
{
100+
ok($backup_result,"client side backup, compression$method");
101+
}
75102

76103
# Verify that the we got the files we expected.
77104
my$backup_files =join(',',

‎src/common/backup_compression.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ parse_bc_specification(bc_algorithm algorithm, char *specification,
177177
result->level=expect_integer_value(keyword,value,result);
178178
result->options |=BACKUP_COMPRESSION_OPTION_LEVEL;
179179
}
180+
elseif (strcmp(keyword,"workers")==0)
181+
{
182+
result->workers=expect_integer_value(keyword,value,result);
183+
result->options |=BACKUP_COMPRESSION_OPTION_WORKERS;
184+
}
180185
else
181186
result->parse_error=
182187
psprintf(_("unknown compression option \"%s\""),keyword);
@@ -266,5 +271,16 @@ validate_bc_specification(bc_specification *spec)
266271
min_level,max_level);
267272
}
268273

274+
/*
275+
* Of the compression algorithms that we currently support, only zstd
276+
* allows parallel workers.
277+
*/
278+
if ((spec->options&BACKUP_COMPRESSION_OPTION_WORKERS)!=0&&
279+
(spec->algorithm!=BACKUP_COMPRESSION_ZSTD))
280+
{
281+
returnpsprintf(_("compression algorithm \"%s\" does not accept a worker count"),
282+
get_bc_algorithm_name(spec->algorithm));
283+
}
284+
269285
returnNULL;
270286
}

‎src/include/common/backup_compression.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@ typedef enum bc_algorithm
2323
}bc_algorithm;
2424

2525
#defineBACKUP_COMPRESSION_OPTION_LEVEL(1 << 0)
26+
#defineBACKUP_COMPRESSION_OPTION_WORKERS(1 << 1)
2627

2728
typedefstructbc_specification
2829
{
2930
bc_algorithmalgorithm;
3031
unsignedoptions;/* OR of BACKUP_COMPRESSION_OPTION constants */
3132
intlevel;
33+
intworkers;
3234
char*parse_error;/* NULL if parsing was OK, else message */
3335
}bc_specification;
3436

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp