From 5b06f5b1039b51f0847e7c310c04a61308b3c7b9 Mon Sep 17 00:00:00 2001 From: Jeevan Ladhe Date: Tue, 18 Jan 2022 19:48:33 +0530 Subject: [PATCH 2/2] Add a ZSTD compression method for server side compression. This patch introduces --server-compression=zstd option. Add config option --with-zstd. Add documentation for ZSTD option Add pg_basebackup help for ZSTD option Example: pg_basebackup -t server:/tmp/data_zstd -Xnone --server-compression=zstd --- configure | 240 ++++++++++++++++++- configure.ac | 32 +++ doc/src/sgml/ref/pg_basebackup.sgml | 11 + src/backend/replication/Makefile | 1 + src/backend/replication/basebackup.c | 7 +- src/backend/replication/basebackup_zstd.c | 267 ++++++++++++++++++++++ src/bin/pg_basebackup/pg_basebackup.c | 2 +- src/include/pg_config.h.in | 6 + src/include/replication/basebackup_sink.h | 1 + 9 files changed, 559 insertions(+), 8 deletions(-) create mode 100644 src/backend/replication/basebackup_zstd.c diff --git a/configure b/configure index 9c856cb1d5..a532e85e66 100755 --- a/configure +++ b/configure @@ -699,6 +699,9 @@ with_gnu_ld LD LDFLAGS_SL LDFLAGS_EX +ZSTD_LIBS +ZSTD_CFLAGS +with_zstd LZ4_LIBS LZ4_CFLAGS with_lz4 @@ -800,6 +803,7 @@ infodir docdir oldincludedir includedir +runstatedir localstatedir sharedstatedir sysconfdir @@ -868,6 +872,7 @@ with_libxslt with_system_tzdata with_zlib with_lz4 +with_zstd with_gnu_ld with_ssl with_openssl @@ -897,6 +902,8 @@ XML2_CFLAGS XML2_LIBS LZ4_CFLAGS LZ4_LIBS +ZSTD_CFLAGS +ZSTD_LIBS LDFLAGS_EX LDFLAGS_SL PERL @@ -941,6 +948,7 @@ datadir='${datarootdir}' sysconfdir='${prefix}/etc' sharedstatedir='${prefix}/com' localstatedir='${prefix}/var' +runstatedir='${localstatedir}/run' includedir='${prefix}/include' oldincludedir='/usr/include' docdir='${datarootdir}/doc/${PACKAGE_TARNAME}' @@ -1193,6 +1201,15 @@ do | -silent | --silent | --silen | --sile | --sil) silent=yes ;; + -runstatedir | --runstatedir | --runstatedi | --runstated \ + | --runstate | --runstat | --runsta | --runst | --runs \ + | --run | --ru | --r) + ac_prev=runstatedir ;; + -runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \ + | --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \ + | --run=* | --ru=* | --r=*) + runstatedir=$ac_optarg ;; + -sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb) ac_prev=sbindir ;; -sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \ @@ -1330,7 +1347,7 @@ fi for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \ datadir sysconfdir sharedstatedir localstatedir includedir \ oldincludedir docdir infodir htmldir dvidir pdfdir psdir \ - libdir localedir mandir + libdir localedir mandir runstatedir do eval ac_val=\$$ac_var # Remove trailing slashes. @@ -1483,6 +1500,7 @@ Fine tuning of the installation directories: --sysconfdir=DIR read-only single-machine data [PREFIX/etc] --sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com] --localstatedir=DIR modifiable single-machine data [PREFIX/var] + --runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run] --libdir=DIR object code libraries [EPREFIX/lib] --includedir=DIR C header files [PREFIX/include] --oldincludedir=DIR C header files for non-gcc [/usr/include] @@ -1576,6 +1594,7 @@ Optional Packages: use system time zone data in DIR --without-zlib do not use Zlib --with-lz4 build with LZ4 support + --with-zstd build with ZSTD support --with-gnu-ld assume the C compiler uses GNU ld [default=no] --with-ssl=LIB use LIB for SSL/TLS support (openssl) --with-openssl obsolete spelling of --with-ssl=openssl @@ -1605,6 +1624,8 @@ Some influential environment variables: XML2_LIBS linker flags for XML2, overriding pkg-config LZ4_CFLAGS C compiler flags for LZ4, overriding pkg-config LZ4_LIBS linker flags for LZ4, overriding pkg-config + ZSTD_CFLAGS C compiler flags for ZSTD, overriding pkg-config + ZSTD_LIBS linker flags for ZSTD, overriding pkg-config LDFLAGS_EX extra linker flags for linking executables only LDFLAGS_SL extra linker flags for linking shared libraries only PERL Perl program @@ -9033,6 +9054,146 @@ fi done fi +# +# ZSTD +# +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking whether to build with ZSTD support" >&5 +$as_echo_n "checking whether to build with ZSTD support... " >&6; } + + + +# Check whether --with-zstd was given. +if test "${with_zstd+set}" = set; then : + withval=$with_zstd; + case $withval in + yes) + +$as_echo "#define USE_ZSTD 1" >>confdefs.h + + ;; + no) + : + ;; + *) + as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5 + ;; + esac + +else + with_zstd=no + +fi + + +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $with_zstd" >&5 +$as_echo "$with_zstd" >&6; } + + +if test "$with_zstd" = yes; then + +pkg_failed=no +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for libzstd" >&5 +$as_echo_n "checking for libzstd... " >&6; } + +if test -n "$ZSTD_CFLAGS"; then + pkg_cv_ZSTD_CFLAGS="$ZSTD_CFLAGS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libzstd\""; } >&5 + ($PKG_CONFIG --exists --print-errors "libzstd") 2>&5 + ac_status=$? + $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_ZSTD_CFLAGS=`$PKG_CONFIG --cflags "libzstd" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi +if test -n "$ZSTD_LIBS"; then + pkg_cv_ZSTD_LIBS="$ZSTD_LIBS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libzstd\""; } >&5 + ($PKG_CONFIG --exists --print-errors "libzstd") 2>&5 + ac_status=$? + $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_ZSTD_LIBS=`$PKG_CONFIG --libs "libzstd" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi + + + +if test $pkg_failed = yes; then + { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5 +$as_echo "no" >&6; } + +if $PKG_CONFIG --atleast-pkgconfig-version 0.20; then + _pkg_short_errors_supported=yes +else + _pkg_short_errors_supported=no +fi + if test $_pkg_short_errors_supported = yes; then + ZSTD_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "libzstd" 2>&1` + else + ZSTD_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "libzstd" 2>&1` + fi + # Put the nasty error message in config.log where it belongs + echo "$ZSTD_PKG_ERRORS" >&5 + + as_fn_error $? "Package requirements (libzstd) were not met: + +$ZSTD_PKG_ERRORS + +Consider adjusting the PKG_CONFIG_PATH environment variable if you +installed software in a non-standard prefix. + +Alternatively, you may set the environment variables ZSTD_CFLAGS +and ZSTD_LIBS to avoid the need to call pkg-config. +See the pkg-config man page for more details." "$LINENO" 5 +elif test $pkg_failed = untried; then + { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5 +$as_echo "no" >&6; } + { { $as_echo "$as_me:${as_lineno-$LINENO}: error: in \`$ac_pwd':" >&5 +$as_echo "$as_me: error: in \`$ac_pwd':" >&2;} +as_fn_error $? "The pkg-config script could not be found or is too old. Make sure it +is in your PATH or set the PKG_CONFIG environment variable to the full +path to pkg-config. + +Alternatively, you may set the environment variables ZSTD_CFLAGS +and ZSTD_LIBS to avoid the need to call pkg-config. +See the pkg-config man page for more details. + +To get pkg-config, see . +See \`config.log' for more details" "$LINENO" 5; } +else + ZSTD_CFLAGS=$pkg_cv_ZSTD_CFLAGS + ZSTD_LIBS=$pkg_cv_ZSTD_LIBS + { $as_echo "$as_me:${as_lineno-$LINENO}: result: yes" >&5 +$as_echo "yes" >&6; } + +fi + # We only care about -I, -D, and -L switches; + # note that -lzstd will be added by AC_CHECK_LIB below. + for pgac_option in $ZSTD_CFLAGS; do + case $pgac_option in + -I*|-D*) CPPFLAGS="$CPPFLAGS $pgac_option";; + esac + done + for pgac_option in $ZSTD_LIBS; do + case $pgac_option in + -L*) LDFLAGS="$LDFLAGS $pgac_option";; + esac + done +fi # # Assignments # @@ -13136,6 +13297,56 @@ fi fi +if test "$with_zstd" = yes ; then + { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5 +$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; } +if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then : + $as_echo_n "(cached) " >&6 +else + ac_check_lib_save_LIBS=$LIBS +LIBS="-lzstd $LIBS" +cat confdefs.h - <<_ACEOF >conftest.$ac_ext +/* end confdefs.h. */ + +/* Override any GCC internal prototype to avoid an error. + Use char because int might match the return type of a GCC + builtin and then its argument prototype would still apply. */ +#ifdef __cplusplus +extern "C" +#endif +char ZSTD_compress (); +int +main () +{ +return ZSTD_compress (); + ; + return 0; +} +_ACEOF +if ac_fn_c_try_link "$LINENO"; then : + ac_cv_lib_zstd_ZSTD_compress=yes +else + ac_cv_lib_zstd_ZSTD_compress=no +fi +rm -f core conftest.err conftest.$ac_objext \ + conftest$ac_exeext conftest.$ac_ext +LIBS=$ac_check_lib_save_LIBS +fi +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5 +$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; } +if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_LIBZSTD 1 +_ACEOF + + LIBS="-lzstd $LIBS" + +else + as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5 +fi + +fi + # Note: We can test for libldap_r only after we know PTHREAD_LIBS; # also, on AIX, we may need to have openssl in LIBS for this step. if test "$with_ldap" = yes ; then @@ -13856,6 +14067,23 @@ done fi +if test "$with_zstd" = yes; then + for ac_header in zstd.h +do : + ac_fn_c_check_header_mongrel "$LINENO" "zstd.h" "ac_cv_header_zstd_h" "$ac_includes_default" +if test "x$ac_cv_header_zstd_h" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_ZSTD_H 1 +_ACEOF + +else + as_fn_error $? "zstd.h header file is required for ZSTD" "$LINENO" 5 +fi + +done + +fi + if test "$with_gssapi" = yes ; then for ac_header in gssapi/gssapi.h do : @@ -15259,7 +15487,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -15305,7 +15533,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -15329,7 +15557,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -15374,7 +15602,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -15398,7 +15626,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; diff --git a/configure.ac b/configure.ac index 95287705f6..85e15ff9f8 100644 --- a/configure.ac +++ b/configure.ac @@ -1056,6 +1056,30 @@ if test "$with_lz4" = yes; then done fi +# +# ZSTD +# +AC_MSG_CHECKING([whether to build with ZSTD support]) +PGAC_ARG_BOOL(with, zstd, no, [build with ZSTD support], + [AC_DEFINE([USE_ZSTD], 1, [Define to 1 to build with ZSTD support. (--with-zstd)])]) +AC_MSG_RESULT([$with_zstd]) +AC_SUBST(with_zstd) + +if test "$with_zstd" = yes; then + PKG_CHECK_MODULES(ZSTD, libzstd) + # We only care about -I, -D, and -L switches; + # note that -lzstd will be added by AC_CHECK_LIB below. + for pgac_option in $ZSTD_CFLAGS; do + case $pgac_option in + -I*|-D*) CPPFLAGS="$CPPFLAGS $pgac_option";; + esac + done + for pgac_option in $ZSTD_LIBS; do + case $pgac_option in + -L*) LDFLAGS="$LDFLAGS $pgac_option";; + esac + done +fi # # Assignments # @@ -1325,6 +1349,10 @@ if test "$with_lz4" = yes ; then AC_CHECK_LIB(lz4, LZ4_compress_default, [], [AC_MSG_ERROR([library 'lz4' is required for LZ4 support])]) fi +if test "$with_zstd" = yes ; then + AC_CHECK_LIB(zstd, ZSTD_compress, [], [AC_MSG_ERROR([library 'zstd' is required for ZSTD support])]) +fi + # Note: We can test for libldap_r only after we know PTHREAD_LIBS; # also, on AIX, we may need to have openssl in LIBS for this step. if test "$with_ldap" = yes ; then @@ -1488,6 +1516,10 @@ if test "$with_lz4" = yes; then AC_CHECK_HEADERS(lz4.h, [], [AC_MSG_ERROR([lz4.h header file is required for LZ4])]) fi +if test "$with_zstd" = yes; then + AC_CHECK_HEADERS(zstd.h, [], [AC_MSG_ERROR([zstd.h header file is required for ZSTD])]) +fi + if test "$with_gssapi" = yes ; then AC_CHECK_HEADERS(gssapi/gssapi.h, [], [AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])]) diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml index 44395a749b..5cadadf16c 100644 --- a/doc/src/sgml/ref/pg_basebackup.sgml +++ b/doc/src/sgml/ref/pg_basebackup.sgml @@ -276,6 +276,17 @@ PostgreSQL documentation + + + zstd + + + Compression is performed using zstd and the + suffix .zst will automatically be added to + compressed files. + + + diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 74043ff331..2e6de7007f 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -20,6 +20,7 @@ OBJS = \ basebackup_copy.o \ basebackup_gzip.o \ basebackup_lz4.o \ + basebackup_zstd.o \ basebackup_progress.o \ basebackup_server.o \ basebackup_sink.o \ diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 9dea1c9bcc..12992b0a4d 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -65,7 +65,8 @@ typedef enum { BACKUP_COMPRESSION_NONE, BACKUP_COMPRESSION_GZIP, - BACKUP_COMPRESSION_LZ4 + BACKUP_COMPRESSION_LZ4, + BACKUP_COMPRESSION_ZSTD } basebackup_compression_type; typedef struct @@ -912,6 +913,8 @@ parse_basebackup_options(List *options, basebackup_options *opt) } else if (strcmp(optval, "lz4") == 0) opt->compression = BACKUP_COMPRESSION_LZ4; + else if (strcmp(optval, "zstd") == 0) + opt->compression = BACKUP_COMPRESSION_ZSTD; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1018,6 +1021,8 @@ SendBaseBackup(BaseBackupCmd *cmd) sink = bbsink_gzip_new(sink, opt.compression_level); else if (opt.compression == BACKUP_COMPRESSION_LZ4) sink = bbsink_lz4_new(sink); + else if (opt.compression == BACKUP_COMPRESSION_ZSTD) + sink = bbsink_zstd_new(sink); /* Set up progress reporting. */ sink = bbsink_progress_new(sink, opt.progress); diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c new file mode 100644 index 0000000000..a4bba94e7e --- /dev/null +++ b/src/backend/replication/basebackup_zstd.c @@ -0,0 +1,267 @@ +/*------------------------------------------------------------------------- + * + * basebackup_zstd.c + * Basebackup sink implementing zstd compression. + * + * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/basebackup_zstd.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#ifdef HAVE_LIBZSTD +#include +#endif + +#include "replication/basebackup_sink.h" + +#ifdef HAVE_LIBZSTD + +typedef struct bbsink_zstd +{ + /* Common information for all types of sink. */ + bbsink base; + + ZSTD_CCtx *cctx; + ZSTD_outBuffer zstd_outBuf; +} bbsink_zstd; + +static void bbsink_zstd_begin_backup(bbsink *sink); +static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name); +static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in); +static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len); +static void bbsink_zstd_end_archive(bbsink *sink); +static void bbsink_zstd_cleanup(bbsink *sink); +static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli); + +const bbsink_ops bbsink_zstd_ops = { + .begin_backup = bbsink_zstd_begin_backup, + .begin_archive = bbsink_zstd_begin_archive, + .archive_contents = bbsink_zstd_archive_contents, + .end_archive = bbsink_zstd_end_archive, + .begin_manifest = bbsink_forward_begin_manifest, + .manifest_contents = bbsink_zstd_manifest_contents, + .end_manifest = bbsink_forward_end_manifest, + .end_backup = bbsink_zstd_end_backup, + .cleanup = bbsink_zstd_cleanup +}; +#endif + +/* Create a new basebackup sink that performs zstd compression. */ +bbsink * +bbsink_zstd_new(bbsink *next) +{ +#ifndef HAVE_LIBZSTD + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("zstd compression is not supported by this build"))); +#else + bbsink_zstd *sink; + + Assert(next != NULL); + + sink = palloc0(sizeof(bbsink_zstd)); + *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops; + sink->base.bbs_next = next; + + return &sink->base; +#endif +} + +#ifdef HAVE_LIBZSTD + +/* + * Begin backup. + */ +static void +bbsink_zstd_begin_backup(bbsink *sink) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + size_t output_buffer_bound; + + mysink->cctx = ZSTD_createCCtx(); + if (!mysink->cctx) + elog(ERROR, "could not create zstd compression context"); + + /* + * We need our own buffer, because we're going to pass different data to + * the next sink than what gets passed to us. + */ + mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length); + + /* + * Make sure that the next sink's bbs_buffer is big enough to accommodate + * the compressed input buffer. + */ + output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length); + + /* + * The buffer length is expected to be a multiple of BLCKSZ, so round up. + */ + output_buffer_bound = output_buffer_bound + BLCKSZ - + (output_buffer_bound % BLCKSZ); + + bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound); +} + +/* + * Prepare to compress the next archive. + */ +static void +bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + char *zstd_archive_name; + + /* + * At the start of each archive we reset the state to start a new + * compression operation. The parameters are sticky and they would stick + * around as we are resetting with option ZSTD_reset_session_only. + */ + ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only); + + mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer; + mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length; + mysink->zstd_outBuf.pos = 0; + + /* Add ".zst" to the archive name. */ + zstd_archive_name = psprintf("%s.zst", archive_name); + Assert(sink->bbs_next != NULL); + bbsink_begin_archive(sink->bbs_next, zstd_archive_name); + pfree(zstd_archive_name); +} + +/* + * Compress the input data to the output buffer until we run out of input + * data. Each time the output buffer falls below the compression bound for + * the input buffer, invoke the archive_contents() method for then next sink. + * + * Note that since we're compressing the input, it may very commonly happen + * that we consume all the input data without filling the output buffer. In + * that case, the compressed representation of the current input data won't + * actually be sent to the next bbsink until a later call to this function, + * or perhaps even not until bbsink_zstd_end_archive() is invoked. + */ +static void +bbsink_zstd_archive_contents(bbsink *sink, size_t len) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + + ZSTD_inBuffer inBuf = { mysink->base.bbs_buffer, len, 0 }; + + while (inBuf.pos < inBuf.size) + { + size_t yet_to_flush; + size_t required_outBuf_bound = ZSTD_compressBound(inBuf.size - inBuf.pos); + + /* + * If the out buffer is not left with enough space, send the output + * buffer to the next sink, and reset it. + */ + if ((mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos) <= + required_outBuf_bound) + { + bbsink_archive_contents(mysink->base.bbs_next, mysink->zstd_outBuf.pos); + mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer; + mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length; + mysink->zstd_outBuf.pos = 0; + } + + yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf, + &inBuf, ZSTD_e_continue); + + if (ZSTD_isError(yet_to_flush)) + elog(ERROR, "could not compress data: %s", ZSTD_getErrorName(yet_to_flush)); + } +} + +/* + * There might be some data inside zstd's internal buffers; we need to get that + * flushed out, also end the zstd frame and then get that forwarded to the + * successor sink as archive content. + * + * Then we can end processing for this archive. + */ +static void +bbsink_zstd_end_archive(bbsink *sink) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + size_t yet_to_flush; + + do + { + ZSTD_inBuffer in = {}; + + bbsink_archive_contents(mysink->base.bbs_next, mysink->zstd_outBuf.pos); + mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer; + mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length; + mysink->zstd_outBuf.pos = 0; + + yet_to_flush = ZSTD_compressStream2(mysink->cctx, + &mysink->zstd_outBuf, + &in, ZSTD_e_end); + + if (ZSTD_isError(yet_to_flush)) + elog(ERROR, "could not compress data: %s", + ZSTD_getErrorName(yet_to_flush)); + } while(yet_to_flush > 0); + + /* Make sure to pass the any remaining bytes to the next sink. */ + bbsink_archive_contents(mysink->base.bbs_next, mysink->zstd_outBuf.pos); + + /* Pass on the information that this archive has ended. */ + bbsink_forward_end_archive(sink); +} + +/* + * Free the resources and context. + */ +static void +bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + + /* Release the context. */ + if (mysink->cctx) + { + ZSTD_freeCCtx(mysink->cctx); + mysink->cctx = NULL; + } + + bbsink_forward_end_backup(sink, endptr, endtli); +} + +/* + * Manifest contents are not compressed, but we do need to copy them into + * the successor sink's buffer, because we have our own. + */ +static void +bbsink_zstd_manifest_contents(bbsink *sink, size_t len) +{ + memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len); + bbsink_manifest_contents(sink->bbs_next, len); +} + +/* + * In case the backup fails, make sure we free the compression context by + * calling ZSTD_freeCCtx if needed to avoid memory leak. + */ +static void +bbsink_zstd_cleanup(bbsink *sink) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + + /* Release the context if not already released. */ + if (mysink->cctx) + { + ZSTD_freeCCtx(mysink->cctx); + mysink->cctx = NULL; + } +} + +#endif diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index d8da1cb2e9..b0c4a0f5b2 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -374,7 +374,7 @@ usage(void) " (in kB/s, or use suffix \"k\" or \"M\")\n")); printf(_(" -R, --write-recovery-conf\n" " write configuration for replication\n")); - printf(_(" --server-compression=none|gzip|gzip[1-9]|lz4\n" + printf(_(" --server-compression=none|gzip|gzip[1-9]|lz4|zstd\n" " compress backup on server\n")); printf(_(" -T, --tablespace-mapping=OLDDIR=NEWDIR\n" " relocate tablespace in OLDDIR to NEWDIR\n")); diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index 9d9bd6b9ef..61b2220eeb 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -325,6 +325,9 @@ /* Define to 1 if you have the `lz4' library (-llz4). */ #undef HAVE_LIBLZ4 +/* Define to 1 if you have the `zstd' library (-lzstd). */ +#undef HAVE_LIBZSTD + /* Define to 1 if you have the `m' library (-lm). */ #undef HAVE_LIBM @@ -367,6 +370,9 @@ /* Define to 1 if you have the header file. */ #undef HAVE_LZ4_H +/* Define to 1 if you have the header file. */ +#undef HAVE_ZSTD_H + /* Define to 1 if you have the header file. */ #undef HAVE_MBARRIER_H diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h index 964752ef5d..8c18917a76 100644 --- a/src/include/replication/basebackup_sink.h +++ b/src/include/replication/basebackup_sink.h @@ -286,6 +286,7 @@ extern bbsink *bbsink_copystream_new(bool send_to_client); extern bbsink *bbsink_copytblspc_new(void); extern bbsink *bbsink_gzip_new(bbsink *next, int compresslevel); extern bbsink *bbsink_lz4_new(bbsink *next); +extern bbsink *bbsink_zstd_new(bbsink *next); extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size); extern bbsink *bbsink_server_new(bbsink *next, char *pathname); extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate); -- 2.25.1