From 76a910744597ab95cabbbfc68872832f18289aa1 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Mon, 7 Mar 2022 16:20:32 -0500 Subject: [PATCH v14 2/2] My changes. --- doc/src/sgml/ref/pg_basebackup.sgml | 7 ++--- src/backend/replication/basebackup_zstd.c | 33 +++++++++++++---------- src/bin/pg_basebackup/bbstreamer_zstd.c | 23 +++++++++------- 3 files changed, 36 insertions(+), 27 deletions(-) diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml index 4cf28a2a61..4a630b59b7 100644 --- a/doc/src/sgml/ref/pg_basebackup.sgml +++ b/doc/src/sgml/ref/pg_basebackup.sgml @@ -431,7 +431,7 @@ PostgreSQL documentation When the tar format is used with gzip, lz4, or zstd, the suffix .gz, .lz4, or - .zst respectively will be automatically added to + .zst, respectively, will be automatically added to all tar filenames. When the plain format is used, client-side compression may not be specified, but it is still possible to request server-side compression. If this is done, the server will compress the @@ -441,8 +441,9 @@ PostgreSQL documentation When this option is used in combination with -Xstream, pg_wal.tar will be compressed using gzip if client-side gzip - compression is selected, but will not be compressed if server-side - compression, LZ4, or ZSTD compression is selected. + compression is selected, but will not be compressed if any other + compression algorithm is selected, or if server-side compression + is selected. diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c index 24993a5bb6..e3f9b1d4dc 100644 --- a/src/backend/replication/basebackup_zstd.c +++ b/src/backend/replication/basebackup_zstd.c @@ -172,18 +172,19 @@ bbsink_zstd_archive_contents(bbsink *sink, size_t len) while (inBuf.pos < inBuf.size) { size_t yet_to_flush; - size_t required_outBuf_bound = ZSTD_compressBound(inBuf.size - inBuf.pos); + size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos); /* * If the out buffer is not left with enough space, send the output * buffer to the next sink, and reset it. */ - if ((mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos) <= - required_outBuf_bound) + if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed) { - bbsink_archive_contents(mysink->base.bbs_next, mysink->zstd_outBuf.pos); + bbsink_archive_contents(mysink->base.bbs_next, + mysink->zstd_outBuf.pos); mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer; - mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length; + mysink->zstd_outBuf.size = + mysink->base.bbs_next->bbs_buffer_length; mysink->zstd_outBuf.pos = 0; } @@ -191,7 +192,9 @@ bbsink_zstd_archive_contents(bbsink *sink, size_t len) &inBuf, ZSTD_e_continue); if (ZSTD_isError(yet_to_flush)) - elog(ERROR, "could not compress data: %s", ZSTD_getErrorName(yet_to_flush)); + elog(ERROR, + "could not compress data: %s", + ZSTD_getErrorName(yet_to_flush)); } } @@ -211,18 +214,19 @@ bbsink_zstd_end_archive(bbsink *sink) do { ZSTD_inBuffer in = {NULL, 0, 0}; - size_t required_outBuf_bound = ZSTD_compressBound(0); + size_t max_needed = ZSTD_compressBound(0); /* * If the out buffer is not left with enough space, send the output * buffer to the next sink, and reset it. */ - if ((mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos) <= - required_outBuf_bound) + if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed) { - bbsink_archive_contents(mysink->base.bbs_next, mysink->zstd_outBuf.pos); + bbsink_archive_contents(mysink->base.bbs_next, + mysink->zstd_outBuf.pos); mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer; - mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length; + mysink->zstd_outBuf.size = + mysink->base.bbs_next->bbs_buffer_length; mysink->zstd_outBuf.pos = 0; } @@ -238,7 +242,8 @@ bbsink_zstd_end_archive(bbsink *sink) /* Make sure to pass any remaining bytes to the next sink. */ if (mysink->zstd_outBuf.pos > 0) - bbsink_archive_contents(mysink->base.bbs_next, mysink->zstd_outBuf.pos); + bbsink_archive_contents(mysink->base.bbs_next, + mysink->zstd_outBuf.pos); /* Pass on the information that this archive has ended. */ bbsink_forward_end_archive(sink); @@ -275,8 +280,8 @@ bbsink_zstd_manifest_contents(bbsink *sink, size_t len) } /* - * In case the backup fails, make sure we free the compression context by - * calling ZSTD_freeCCtx if needed to avoid memory leak. + * In case the backup fails, make sure we free any compression context that + * got allocated, so that we don't leak memory. */ static void bbsink_zstd_cleanup(bbsink *sink) diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c index 83b59d63ba..cc68367dd5 100644 --- a/src/bin/pg_basebackup/bbstreamer_zstd.c +++ b/src/bin/pg_basebackup/bbstreamer_zstd.c @@ -121,14 +121,14 @@ bbstreamer_zstd_compressor_content(bbstreamer *streamer, while (inBuf.pos < inBuf.size) { size_t yet_to_flush; - size_t required_outBuf_bound = ZSTD_compressBound(inBuf.size - inBuf.pos); + size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos); /* * If the output buffer is not left with enough space, send the * compressed bytes to the next streamer, and empty the buffer. */ - if ((mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos) <= - required_outBuf_bound) + if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < + max_needed) { bbstreamer_content(mystreamer->base.bbs_next, member, mystreamer->zstd_outBuf.dst, @@ -141,11 +141,13 @@ bbstreamer_zstd_compressor_content(bbstreamer *streamer, mystreamer->zstd_outBuf.pos = 0; } - yet_to_flush = ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf, - &inBuf, ZSTD_e_continue); + yet_to_flush = + ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf, + &inBuf, ZSTD_e_continue); if (ZSTD_isError(yet_to_flush)) - pg_log_error("could not compress data: %s", ZSTD_getErrorName(yet_to_flush)); + pg_log_error("could not compress data: %s", + ZSTD_getErrorName(yet_to_flush)); } } @@ -161,14 +163,14 @@ bbstreamer_zstd_compressor_finalize(bbstreamer *streamer) do { ZSTD_inBuffer in = {NULL, 0, 0}; - size_t required_outBuf_bound = ZSTD_compressBound(0); + size_t max_needed = ZSTD_compressBound(0); /* * If the output buffer is not left with enough space, send the * compressed bytes to the next streamer, and empty the buffer. */ - if ((mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos) <= - required_outBuf_bound) + if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < + max_needed) { bbstreamer_content(mystreamer->base.bbs_next, NULL, mystreamer->zstd_outBuf.dst, @@ -186,7 +188,8 @@ bbstreamer_zstd_compressor_finalize(bbstreamer *streamer) &in, ZSTD_e_end); if (ZSTD_isError(yet_to_flush)) - pg_log_error("could not compress data: %s", ZSTD_getErrorName(yet_to_flush)); + pg_log_error("could not compress data: %s", + ZSTD_getErrorName(yet_to_flush)); } while (yet_to_flush > 0); -- 2.24.3 (Apple Git-128)