|
| 1 | +/*------------------------------------------------------------------------- |
| 2 | + * |
| 3 | + * basebackup_lz4.c |
| 4 | + * Basebackup sink implementing lz4 compression. |
| 5 | + * |
| 6 | + * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group |
| 7 | + * |
| 8 | + * IDENTIFICATION |
| 9 | + * src/backend/replication/basebackup_lz4.c |
| 10 | + * |
| 11 | + *------------------------------------------------------------------------- |
| 12 | + */ |
| 13 | +#include"postgres.h" |
| 14 | + |
| 15 | +#ifdefHAVE_LIBLZ4 |
| 16 | +#include<lz4frame.h> |
| 17 | +#endif |
| 18 | + |
| 19 | +#include"replication/basebackup_sink.h" |
| 20 | + |
| 21 | +#ifdefHAVE_LIBLZ4 |
| 22 | + |
| 23 | +typedefstructbbsink_lz4 |
| 24 | +{ |
| 25 | +/* Common information for all types of sink. */ |
| 26 | +bbsinkbase; |
| 27 | + |
| 28 | +/* Compression level. */ |
| 29 | +intcompresslevel; |
| 30 | + |
| 31 | +LZ4F_compressionContext_tctx; |
| 32 | +LZ4F_preferences_tprefs; |
| 33 | + |
| 34 | +/* Number of bytes staged in output buffer. */ |
| 35 | +size_tbytes_written; |
| 36 | +}bbsink_lz4; |
| 37 | + |
| 38 | +staticvoidbbsink_lz4_begin_backup(bbsink*sink); |
| 39 | +staticvoidbbsink_lz4_begin_archive(bbsink*sink,constchar*archive_name); |
| 40 | +staticvoidbbsink_lz4_archive_contents(bbsink*sink,size_tavail_in); |
| 41 | +staticvoidbbsink_lz4_manifest_contents(bbsink*sink,size_tlen); |
| 42 | +staticvoidbbsink_lz4_end_archive(bbsink*sink); |
| 43 | +staticvoidbbsink_lz4_cleanup(bbsink*sink); |
| 44 | + |
| 45 | +constbbsink_opsbbsink_lz4_ops= { |
| 46 | +.begin_backup=bbsink_lz4_begin_backup, |
| 47 | +.begin_archive=bbsink_lz4_begin_archive, |
| 48 | +.archive_contents=bbsink_lz4_archive_contents, |
| 49 | +.end_archive=bbsink_lz4_end_archive, |
| 50 | +.begin_manifest=bbsink_forward_begin_manifest, |
| 51 | +.manifest_contents=bbsink_lz4_manifest_contents, |
| 52 | +.end_manifest=bbsink_forward_end_manifest, |
| 53 | +.end_backup=bbsink_forward_end_backup, |
| 54 | +.cleanup=bbsink_lz4_cleanup |
| 55 | +}; |
| 56 | +#endif |
| 57 | + |
| 58 | +/* |
| 59 | + * Create a new basebackup sink that performs lz4 compression using the |
| 60 | + * designated compression level. |
| 61 | + */ |
| 62 | +bbsink* |
| 63 | +bbsink_lz4_new(bbsink*next,intcompresslevel) |
| 64 | +{ |
| 65 | +#ifndefHAVE_LIBLZ4 |
| 66 | +ereport(ERROR, |
| 67 | +(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| 68 | +errmsg("lz4 compression is not supported by this build"))); |
| 69 | +#else |
| 70 | +bbsink_lz4*sink; |
| 71 | + |
| 72 | +Assert(next!=NULL); |
| 73 | + |
| 74 | +if (compresslevel<0||compresslevel>12) |
| 75 | +ereport(ERROR, |
| 76 | +(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| 77 | +errmsg("lz4 compression level %d is out of range", |
| 78 | +compresslevel))); |
| 79 | + |
| 80 | +sink=palloc0(sizeof(bbsink_lz4)); |
| 81 | +*((constbbsink_ops**)&sink->base.bbs_ops)=&bbsink_lz4_ops; |
| 82 | +sink->base.bbs_next=next; |
| 83 | +sink->compresslevel=compresslevel; |
| 84 | + |
| 85 | +return&sink->base; |
| 86 | +#endif |
| 87 | +} |
| 88 | + |
| 89 | +#ifdefHAVE_LIBLZ4 |
| 90 | + |
| 91 | +/* |
| 92 | + * Begin backup. |
| 93 | + */ |
| 94 | +staticvoid |
| 95 | +bbsink_lz4_begin_backup(bbsink*sink) |
| 96 | +{ |
| 97 | +bbsink_lz4*mysink= (bbsink_lz4*)sink; |
| 98 | +size_toutput_buffer_bound; |
| 99 | +LZ4F_preferences_t*prefs=&mysink->prefs; |
| 100 | + |
| 101 | +/* Initialize compressor object. */ |
| 102 | +memset(prefs,0,sizeof(LZ4F_preferences_t)); |
| 103 | +prefs->frameInfo.blockSizeID=LZ4F_max256KB; |
| 104 | +prefs->compressionLevel=mysink->compresslevel; |
| 105 | + |
| 106 | +/* |
| 107 | + * We need our own buffer, because we're going to pass different data to |
| 108 | + * the next sink than what gets passed to us. |
| 109 | + */ |
| 110 | +mysink->base.bbs_buffer=palloc(mysink->base.bbs_buffer_length); |
| 111 | + |
| 112 | +/* |
| 113 | + * Since LZ4F_compressUpdate() requires the output buffer of size equal or |
| 114 | + * greater than that of LZ4F_compressBound(), make sure we have the next |
| 115 | + * sink's bbs_buffer of length that can accommodate the compressed input |
| 116 | + * buffer. |
| 117 | + */ |
| 118 | +output_buffer_bound=LZ4F_compressBound(mysink->base.bbs_buffer_length, |
| 119 | +&mysink->prefs); |
| 120 | + |
| 121 | +/* |
| 122 | + * The buffer length is expected to be a multiple of BLCKSZ, so round up. |
| 123 | + */ |
| 124 | +output_buffer_bound=output_buffer_bound+BLCKSZ- |
| 125 | +(output_buffer_bound %BLCKSZ); |
| 126 | + |
| 127 | +bbsink_begin_backup(sink->bbs_next,sink->bbs_state,output_buffer_bound); |
| 128 | +} |
| 129 | + |
| 130 | +/* |
| 131 | + * Prepare to compress the next archive. |
| 132 | + */ |
| 133 | +staticvoid |
| 134 | +bbsink_lz4_begin_archive(bbsink*sink,constchar*archive_name) |
| 135 | +{ |
| 136 | +bbsink_lz4*mysink= (bbsink_lz4*)sink; |
| 137 | +char*lz4_archive_name; |
| 138 | +LZ4F_errorCode_tctxError; |
| 139 | +size_theaderSize; |
| 140 | + |
| 141 | +ctxError=LZ4F_createCompressionContext(&mysink->ctx,LZ4F_VERSION); |
| 142 | +if (LZ4F_isError(ctxError)) |
| 143 | +elog(ERROR,"could not create lz4 compression context: %s", |
| 144 | +LZ4F_getErrorName(ctxError)); |
| 145 | + |
| 146 | +/* First of all write the frame header to destination buffer. */ |
| 147 | +headerSize=LZ4F_compressBegin(mysink->ctx, |
| 148 | +mysink->base.bbs_next->bbs_buffer, |
| 149 | +mysink->base.bbs_next->bbs_buffer_length, |
| 150 | +&mysink->prefs); |
| 151 | + |
| 152 | +if (LZ4F_isError(headerSize)) |
| 153 | +elog(ERROR,"could not write lz4 header: %s", |
| 154 | +LZ4F_getErrorName(headerSize)); |
| 155 | + |
| 156 | +/* |
| 157 | + * We need to write the compressed data after the header in the output |
| 158 | + * buffer. So, make sure to update the notion of bytes written to output |
| 159 | + * buffer. |
| 160 | + */ |
| 161 | +mysink->bytes_written+=headerSize; |
| 162 | + |
| 163 | +/* Add ".lz4" to the archive name. */ |
| 164 | +lz4_archive_name=psprintf("%s.lz4",archive_name); |
| 165 | +Assert(sink->bbs_next!=NULL); |
| 166 | +bbsink_begin_archive(sink->bbs_next,lz4_archive_name); |
| 167 | +pfree(lz4_archive_name); |
| 168 | +} |
| 169 | + |
| 170 | +/* |
| 171 | + * Compress the input data to the output buffer until we run out of input |
| 172 | + * data. Each time the output buffer falls below the compression bound for |
| 173 | + * the input buffer, invoke the archive_contents() method for then next sink. |
| 174 | + * |
| 175 | + * Note that since we're compressing the input, it may very commonly happen |
| 176 | + * that we consume all the input data without filling the output buffer. In |
| 177 | + * that case, the compressed representation of the current input data won't |
| 178 | + * actually be sent to the next bbsink until a later call to this function, |
| 179 | + * or perhaps even not until bbsink_lz4_end_archive() is invoked. |
| 180 | + */ |
| 181 | +staticvoid |
| 182 | +bbsink_lz4_archive_contents(bbsink*sink,size_tavail_in) |
| 183 | +{ |
| 184 | +bbsink_lz4*mysink= (bbsink_lz4*)sink; |
| 185 | +size_tcompressedSize; |
| 186 | +size_tavail_in_bound; |
| 187 | + |
| 188 | +avail_in_bound=LZ4F_compressBound(avail_in,&mysink->prefs); |
| 189 | + |
| 190 | +/* |
| 191 | + * If the number of available bytes has fallen below the value computed by |
| 192 | + * LZ4F_compressBound(), ask the next sink to process the data so that we |
| 193 | + * can empty the buffer. |
| 194 | + */ |
| 195 | +if ((mysink->base.bbs_next->bbs_buffer_length-mysink->bytes_written) <= |
| 196 | +avail_in_bound) |
| 197 | +{ |
| 198 | +bbsink_archive_contents(sink->bbs_next,mysink->bytes_written); |
| 199 | +mysink->bytes_written=0; |
| 200 | +} |
| 201 | + |
| 202 | +/* |
| 203 | + * Compress the input buffer and write it into the output buffer. |
| 204 | + */ |
| 205 | +compressedSize=LZ4F_compressUpdate(mysink->ctx, |
| 206 | +mysink->base.bbs_next->bbs_buffer+mysink->bytes_written, |
| 207 | +mysink->base.bbs_next->bbs_buffer_length-mysink->bytes_written, |
| 208 | + (uint8*)mysink->base.bbs_buffer, |
| 209 | +avail_in, |
| 210 | +NULL); |
| 211 | + |
| 212 | +if (LZ4F_isError(compressedSize)) |
| 213 | +elog(ERROR,"could not compress data: %s", |
| 214 | +LZ4F_getErrorName(compressedSize)); |
| 215 | + |
| 216 | +/* |
| 217 | + * Update our notion of how many bytes we've written into output buffer. |
| 218 | + */ |
| 219 | +mysink->bytes_written+=compressedSize; |
| 220 | +} |
| 221 | + |
| 222 | +/* |
| 223 | + * There might be some data inside lz4's internal buffers; we need to get |
| 224 | + * that flushed out and also finalize the lz4 frame and then get that forwarded |
| 225 | + * to the successor sink as archive content. |
| 226 | + * |
| 227 | + * Then we can end processing for this archive. |
| 228 | + */ |
| 229 | +staticvoid |
| 230 | +bbsink_lz4_end_archive(bbsink*sink) |
| 231 | +{ |
| 232 | +bbsink_lz4*mysink= (bbsink_lz4*)sink; |
| 233 | +size_tcompressedSize; |
| 234 | +size_tlz4_footer_bound; |
| 235 | + |
| 236 | +lz4_footer_bound=LZ4F_compressBound(0,&mysink->prefs); |
| 237 | + |
| 238 | +Assert(mysink->base.bbs_next->bbs_buffer_length >=lz4_footer_bound); |
| 239 | + |
| 240 | +if ((mysink->base.bbs_next->bbs_buffer_length-mysink->bytes_written) <= |
| 241 | +lz4_footer_bound) |
| 242 | +{ |
| 243 | +bbsink_archive_contents(sink->bbs_next,mysink->bytes_written); |
| 244 | +mysink->bytes_written=0; |
| 245 | +} |
| 246 | + |
| 247 | +compressedSize=LZ4F_compressEnd(mysink->ctx, |
| 248 | +mysink->base.bbs_next->bbs_buffer+mysink->bytes_written, |
| 249 | +mysink->base.bbs_next->bbs_buffer_length-mysink->bytes_written, |
| 250 | +NULL); |
| 251 | + |
| 252 | +if (LZ4F_isError(compressedSize)) |
| 253 | +elog(ERROR,"could not end lz4 compression: %s", |
| 254 | +LZ4F_getErrorName(compressedSize)); |
| 255 | + |
| 256 | +/* Update our notion of how many bytes we've written. */ |
| 257 | +mysink->bytes_written+=compressedSize; |
| 258 | + |
| 259 | +/* Send whatever accumulated output bytes we have. */ |
| 260 | +bbsink_archive_contents(sink->bbs_next,mysink->bytes_written); |
| 261 | +mysink->bytes_written=0; |
| 262 | + |
| 263 | +/* Release the resources. */ |
| 264 | +LZ4F_freeCompressionContext(mysink->ctx); |
| 265 | +mysink->ctx=NULL; |
| 266 | + |
| 267 | +/* Pass on the information that this archive has ended. */ |
| 268 | +bbsink_forward_end_archive(sink); |
| 269 | +} |
| 270 | + |
| 271 | +/* |
| 272 | + * Manifest contents are not compressed, but we do need to copy them into |
| 273 | + * the successor sink's buffer, because we have our own. |
| 274 | + */ |
| 275 | +staticvoid |
| 276 | +bbsink_lz4_manifest_contents(bbsink*sink,size_tlen) |
| 277 | +{ |
| 278 | +memcpy(sink->bbs_next->bbs_buffer,sink->bbs_buffer,len); |
| 279 | +bbsink_manifest_contents(sink->bbs_next,len); |
| 280 | +} |
| 281 | + |
| 282 | +/* |
| 283 | + * In case the backup fails, make sure we free the compression context by |
| 284 | + * calling LZ4F_freeCompressionContext() if needed to avoid memory leak. |
| 285 | + */ |
| 286 | +staticvoid |
| 287 | +bbsink_lz4_cleanup(bbsink*sink) |
| 288 | +{ |
| 289 | +bbsink_lz4*mysink= (bbsink_lz4*)sink; |
| 290 | + |
| 291 | +if (mysink->ctx) |
| 292 | +{ |
| 293 | +LZ4F_freeCompressionContext(mysink->ctx); |
| 294 | +mysink->ctx=NULL; |
| 295 | +} |
| 296 | +} |
| 297 | + |
| 298 | +#endif |