From fb7c1e49afaea669f2baa7f05ed2eaf5ae003d81 Mon Sep 17 00:00:00 2001
From: Jeevan Ladhe
Date: Wed, 16 Feb 2022 22:51:47 +0530
Subject: [PATCH 4/4] ZSTD: add client-side decompression support.

ZSTD decompression of a backup compressed on the server can be
performed on the client using pg_basebackup -Fp --compress server-zstd.

Example:
pg_basebackup -D /tmp/zstd_C_D -Fp -Xfetch --compress=server-zstd:7
---
 src/bin/pg_basebackup/bbstreamer.h       |   1 +
 src/bin/pg_basebackup/bbstreamer_zstd.c  | 154 +++++++++++++++++++++++
 src/bin/pg_basebackup/pg_basebackup.c    |   2 +
 src/bin/pg_verifybackup/t/009_extract.pl |   5 +
 4 files changed, 162 insertions(+)

diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h
index bfc624a863..02d4c05df6 100644
--- a/src/bin/pg_basebackup/bbstreamer.h
+++ b/src/bin/pg_basebackup/bbstreamer.h
@@ -211,6 +211,7 @@ extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next,
 extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_zstd_compressor_new(bbstreamer *next,
 												  int compresslevel);
+extern bbstreamer *bbstreamer_zstd_decompressor_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next);
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index 0b20267cf4..83b59d63ba 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -27,6 +27,7 @@ typedef struct bbstreamer_zstd_frame
 	bbstreamer	base;
 
 	ZSTD_CCtx  *cctx;
+	ZSTD_DCtx  *dctx;
 	ZSTD_outBuffer zstd_outBuf;
 } bbstreamer_zstd_frame;
 
@@ -42,6 +43,19 @@ const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
 	.finalize = bbstreamer_zstd_compressor_finalize,
 	.free = bbstreamer_zstd_compressor_free
 };
+
+static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
+												 bbstreamer_member *member,
+												 const char *data, int len,
+												 bbstreamer_archive_context context);
+static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer);
+static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
+	.content = bbstreamer_zstd_decompressor_content,
+	.finalize = bbstreamer_zstd_decompressor_finalize,
+	.free = bbstreamer_zstd_decompressor_free
+};
 #endif
 
 /*
@@ -200,3 +214,143 @@ bbstreamer_zstd_compressor_free(bbstreamer *streamer)
 	pfree(streamer);
 }
 #endif
+
+/*
+ * Create a new base backup streamer that performs decompression of zstd
+ * compressed blocks.
+ *
+ * Decompressed data is staged in this streamer's own buffer, enlarged here to
+ * hold at least ZSTD_DStreamOutSize() bytes, before it is passed on to 'next'.
+ */
+bbstreamer *
+bbstreamer_zstd_decompressor_new(bbstreamer *next)
+{
+#ifdef HAVE_LIBZSTD
+	bbstreamer_zstd_frame *streamer;
+
+	Assert(next != NULL);
+
+	streamer = palloc0(sizeof(bbstreamer_zstd_frame));
+	*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+		&bbstreamer_zstd_decompressor_ops;
+
+	streamer->base.bbs_next = next;
+	initStringInfo(&streamer->base.bbs_buffer);
+	enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
+
+	streamer->dctx = ZSTD_createDCtx();
+	if (!streamer->dctx)
+	{
+		pg_log_error("could not create zstd decompression context");
+		exit(1);
+	}
+
+	/* Initialize the ZSTD output buffer. */
+	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
+	streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
+	streamer->zstd_outBuf.pos = 0;
+
+	return &streamer->base;
+#else
+	pg_log_error("this build does not support zstd compression");
+	exit(1);
+#endif
+}
+
+#ifdef HAVE_LIBZSTD
+/*
+ * Decompress the input data to output buffer until we run out of input
+ * data. Each time the output buffer is full, pass on the decompressed data
+ * to the next streamer.
+ *
+ * Output that does not fill the buffer stays there until a later call or
+ * until bbstreamer_zstd_decompressor_finalize() forwards it.
+ */
+static void
+bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
+									 bbstreamer_member *member,
+									 const char *data, int len,
+									 bbstreamer_archive_context context)
+{
+	bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+	ZSTD_inBuffer inBuf = {data, len, 0};
+
+	while (inBuf.pos < inBuf.size)
+	{
+		size_t		ret;
+
+		/*
+		 * If output buffer is full then forward the content to next streamer
+		 * and update the output buffer.
+		 */
+		if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
+		{
+			bbstreamer_content(mystreamer->base.bbs_next, member,
+							   mystreamer->zstd_outBuf.dst,
+							   mystreamer->zstd_outBuf.pos,
+							   context);
+
+			/* Reset the ZSTD output buffer. */
+			mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
+			mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
+			mystreamer->zstd_outBuf.pos = 0;
+		}
+
+		ret = ZSTD_decompressStream(mystreamer->dctx,
+									&mystreamer->zstd_outBuf, &inBuf);
+
+		/*
+		 * A decompression error is not recoverable here: the input position
+		 * may not advance, so carrying on could loop forever or emit garbage.
+		 */
+		if (ZSTD_isError(ret))
+		{
+			pg_log_error("could not decompress data: %s",
+						 ZSTD_getErrorName(ret));
+			exit(1);
+		}
+	}
+}
+
+/*
+ * End-of-stream processing.
+ *
+ * Forward whatever decompressed data is still buffered, then finalize the
+ * next streamer.
+ */
+static void
+bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer)
+{
+	bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+
+	/*
+	 * End of the stream, if there is some pending data in output buffers then
+	 * we must forward it to next streamer.  Only the first zstd_outBuf.pos
+	 * bytes of the buffer hold decompressed data, so forward exactly those.
+	 */
+	if (mystreamer->zstd_outBuf.pos > 0)
+		bbstreamer_content(mystreamer->base.bbs_next, NULL,
+						   mystreamer->base.bbs_buffer.data,
+						   mystreamer->zstd_outBuf.pos,
+						   BBSTREAMER_UNKNOWN);
+
+	bbstreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ *
+ * Releases the next streamer, the zstd decompression context, the output
+ * buffer and finally the streamer itself.
+ */
+static void
+bbstreamer_zstd_decompressor_free(bbstreamer *streamer)
+{
+	bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+
+	bbstreamer_free(streamer->bbs_next);
+	ZSTD_freeDCtx(mystreamer->dctx);
+	pfree(streamer->bbs_buffer.data);
+	pfree(streamer);
+}
+#endif
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 18bd0df9a5..cef66d3e9e 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1333,6 +1333,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 			streamer = bbstreamer_gzip_decompressor_new(streamer);
 		else if (compressmethod == COMPRESSION_LZ4)
 			streamer = bbstreamer_lz4_decompressor_new(streamer);
+		else if (compressmethod == COMPRESSION_ZSTD)
+			streamer = bbstreamer_zstd_decompressor_new(streamer);
 	}
 
 	/* Return the results. */
diff --git a/src/bin/pg_verifybackup/t/009_extract.pl b/src/bin/pg_verifybackup/t/009_extract.pl
index c51cdf79f8..d30ba01742 100644
--- a/src/bin/pg_verifybackup/t/009_extract.pl
+++ b/src/bin/pg_verifybackup/t/009_extract.pl
@@ -31,6 +31,11 @@ my @test_configuration = (
 		'compression_method' => 'lz4',
 		'backup_flags' => ['--compress', 'server-lz4:5'],
 		'enabled' => check_pg_config("#define HAVE_LIBLZ4 1")
+	},
+	{
+		'compression_method' => 'zstd',
+		'backup_flags' => ['--compress', 'server-zstd:5'],
+		'enabled' => check_pg_config("#define HAVE_LIBZSTD 1")
 	}
 );
 
-- 
2.25.1