From a494ec33c2b72176afd3f7decfe571c969133012 Mon Sep 17 00:00:00 2001 From: Jeevan Ladhe Date: Tue, 15 Feb 2022 18:45:52 +0530 Subject: [PATCH] Add a ZSTD compression method for server side compression. This patch introduces --compress=server-zstd[:LEVEL] Add tap test. Add config option --with-zstd. Add documentation for ZSTD option. Add pg_basebackup help for ZSTD option. Example: pg_basebackup -t server:/tmp/data_test -Xnone --compress=server-zstd:4 --- configure | 295 +++++++++++++++++++++- configure.ac | 33 +++ doc/src/sgml/protocol.sgml | 5 +- doc/src/sgml/ref/pg_basebackup.sgml | 38 +-- src/Makefile.global.in | 1 + src/backend/replication/Makefile | 1 + src/backend/replication/basebackup.c | 7 +- src/backend/replication/basebackup_zstd.c | 294 +++++++++++++++++++++ src/bin/pg_basebackup/pg_basebackup.c | 18 +- src/bin/pg_basebackup/pg_receivewal.c | 4 + src/bin/pg_basebackup/walmethods.h | 1 + src/bin/pg_verifybackup/Makefile | 1 + src/bin/pg_verifybackup/t/008_untar.pl | 9 + src/include/pg_config.h.in | 6 + src/include/replication/basebackup_sink.h | 1 + 15 files changed, 686 insertions(+), 28 deletions(-) create mode 100644 src/backend/replication/basebackup_zstd.c mode change 100644 => 100755 src/bin/pg_verifybackup/t/008_untar.pl diff --git a/configure b/configure index 9305555658..fc83c17c68 100755 --- a/configure +++ b/configure @@ -650,6 +650,7 @@ CFLAGS_ARMV8_CRC32C CFLAGS_SSE42 have_win32_dbghelp LIBOBJS +ZSTD LZ4 UUID_LIBS LDAP_LIBS_BE @@ -700,6 +701,9 @@ with_gnu_ld LD LDFLAGS_SL LDFLAGS_EX +ZSTD_LIBS +ZSTD_CFLAGS +with_zstd LZ4_LIBS LZ4_CFLAGS with_lz4 @@ -801,6 +805,7 @@ infodir docdir oldincludedir includedir +runstatedir localstatedir sharedstatedir sysconfdir @@ -869,6 +874,7 @@ with_libxslt with_system_tzdata with_zlib with_lz4 +with_zstd with_gnu_ld with_ssl with_openssl @@ -898,6 +904,8 @@ XML2_CFLAGS XML2_LIBS LZ4_CFLAGS LZ4_LIBS +ZSTD_CFLAGS +ZSTD_LIBS LDFLAGS_EX LDFLAGS_SL PERL @@ -942,6 +950,7 @@ datadir='${datarootdir}' sysconfdir='${prefix}/etc' sharedstatedir='${prefix}/com' localstatedir='${prefix}/var' +runstatedir='${localstatedir}/run' includedir='${prefix}/include' oldincludedir='/usr/include' docdir='${datarootdir}/doc/${PACKAGE_TARNAME}' @@ -1194,6 +1203,15 @@ do | -silent | --silent | --silen | --sile | --sil) silent=yes ;; + -runstatedir | --runstatedir | --runstatedi | --runstated \ + | --runstate | --runstat | --runsta | --runst | --runs \ + | --run | --ru | --r) + ac_prev=runstatedir ;; + -runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \ + | --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \ + | --run=* | --ru=* | --r=*) + runstatedir=$ac_optarg ;; + -sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb) ac_prev=sbindir ;; -sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \ @@ -1331,7 +1349,7 @@ fi for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \ datadir sysconfdir sharedstatedir localstatedir includedir \ oldincludedir docdir infodir htmldir dvidir pdfdir psdir \ - libdir localedir mandir + libdir localedir mandir runstatedir do eval ac_val=\$$ac_var # Remove trailing slashes. @@ -1484,6 +1502,7 @@ Fine tuning of the installation directories: --sysconfdir=DIR read-only single-machine data [PREFIX/etc] --sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com] --localstatedir=DIR modifiable single-machine data [PREFIX/var] + --runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run] --libdir=DIR object code libraries [EPREFIX/lib] --includedir=DIR C header files [PREFIX/include] --oldincludedir=DIR C header files for non-gcc [/usr/include] @@ -1577,6 +1596,7 @@ Optional Packages: use system time zone data in DIR --without-zlib do not use Zlib --with-lz4 build with LZ4 support + --with-zstd build with ZSTD support --with-gnu-ld assume the C compiler uses GNU ld [default=no] --with-ssl=LIB use LIB for SSL/TLS support (openssl) --with-openssl obsolete spelling of --with-ssl=openssl @@ -1606,6 +1626,8 @@ Some influential environment variables: XML2_LIBS linker flags for XML2, overriding pkg-config LZ4_CFLAGS C compiler flags for LZ4, overriding pkg-config LZ4_LIBS linker flags for LZ4, overriding pkg-config + ZSTD_CFLAGS C compiler flags for ZSTD, overriding pkg-config + ZSTD_LIBS linker flags for ZSTD, overriding pkg-config LDFLAGS_EX extra linker flags for linking executables only LDFLAGS_SL extra linker flags for linking shared libraries only PERL Perl program @@ -9034,6 +9056,146 @@ fi done fi +# +# ZSTD +# +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking whether to build with ZSTD support" >&5 +$as_echo_n "checking whether to build with ZSTD support... " >&6; } + + + +# Check whether --with-zstd was given. +if test "${with_zstd+set}" = set; then : + withval=$with_zstd; + case $withval in + yes) + +$as_echo "#define USE_ZSTD 1" >>confdefs.h + + ;; + no) + : + ;; + *) + as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5 + ;; + esac + +else + with_zstd=no + +fi + + +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $with_zstd" >&5 +$as_echo "$with_zstd" >&6; } + + +if test "$with_zstd" = yes; then + +pkg_failed=no +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for libzstd" >&5 +$as_echo_n "checking for libzstd... " >&6; } + +if test -n "$ZSTD_CFLAGS"; then + pkg_cv_ZSTD_CFLAGS="$ZSTD_CFLAGS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libzstd\""; } >&5 + ($PKG_CONFIG --exists --print-errors "libzstd") 2>&5 + ac_status=$? + $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_ZSTD_CFLAGS=`$PKG_CONFIG --cflags "libzstd" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi +if test -n "$ZSTD_LIBS"; then + pkg_cv_ZSTD_LIBS="$ZSTD_LIBS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libzstd\""; } >&5 + ($PKG_CONFIG --exists --print-errors "libzstd") 2>&5 + ac_status=$? + $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_ZSTD_LIBS=`$PKG_CONFIG --libs "libzstd" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi + + + +if test $pkg_failed = yes; then + { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5 +$as_echo "no" >&6; } + +if $PKG_CONFIG --atleast-pkgconfig-version 0.20; then + _pkg_short_errors_supported=yes +else + _pkg_short_errors_supported=no +fi + if test $_pkg_short_errors_supported = yes; then + ZSTD_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "libzstd" 2>&1` + else + ZSTD_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "libzstd" 2>&1` + fi + # Put the nasty error message in config.log where it belongs + echo "$ZSTD_PKG_ERRORS" >&5 + + as_fn_error $? "Package requirements (libzstd) were not met: + +$ZSTD_PKG_ERRORS + +Consider adjusting the PKG_CONFIG_PATH environment variable if you +installed software in a non-standard prefix. + +Alternatively, you may set the environment variables ZSTD_CFLAGS +and ZSTD_LIBS to avoid the need to call pkg-config. +See the pkg-config man page for more details." "$LINENO" 5 +elif test $pkg_failed = untried; then + { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5 +$as_echo "no" >&6; } + { { $as_echo "$as_me:${as_lineno-$LINENO}: error: in \`$ac_pwd':" >&5 +$as_echo "$as_me: error: in \`$ac_pwd':" >&2;} +as_fn_error $? "The pkg-config script could not be found or is too old. Make sure it +is in your PATH or set the PKG_CONFIG environment variable to the full +path to pkg-config. + +Alternatively, you may set the environment variables ZSTD_CFLAGS +and ZSTD_LIBS to avoid the need to call pkg-config. +See the pkg-config man page for more details. + +To get pkg-config, see . +See \`config.log' for more details" "$LINENO" 5; } +else + ZSTD_CFLAGS=$pkg_cv_ZSTD_CFLAGS + ZSTD_LIBS=$pkg_cv_ZSTD_LIBS + { $as_echo "$as_me:${as_lineno-$LINENO}: result: yes" >&5 +$as_echo "yes" >&6; } + +fi + # We only care about -I, -D, and -L switches; + # note that -lzstd will be added by AC_CHECK_LIB below. + for pgac_option in $ZSTD_CFLAGS; do + case $pgac_option in + -I*|-D*) CPPFLAGS="$CPPFLAGS $pgac_option";; + esac + done + for pgac_option in $ZSTD_LIBS; do + case $pgac_option in + -L*) LDFLAGS="$LDFLAGS $pgac_option";; + esac + done +fi # # Assignments # @@ -13130,6 +13292,56 @@ fi fi +if test "$with_zstd" = yes ; then + { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5 +$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; } +if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then : + $as_echo_n "(cached) " >&6 +else + ac_check_lib_save_LIBS=$LIBS +LIBS="-lzstd $LIBS" +cat confdefs.h - <<_ACEOF >conftest.$ac_ext +/* end confdefs.h. */ + +/* Override any GCC internal prototype to avoid an error. + Use char because int might match the return type of a GCC + builtin and then its argument prototype would still apply. */ +#ifdef __cplusplus +extern "C" +#endif +char ZSTD_compress (); +int +main () +{ +return ZSTD_compress (); + ; + return 0; +} +_ACEOF +if ac_fn_c_try_link "$LINENO"; then : + ac_cv_lib_zstd_ZSTD_compress=yes +else + ac_cv_lib_zstd_ZSTD_compress=no +fi +rm -f core conftest.err conftest.$ac_objext \ + conftest$ac_exeext conftest.$ac_ext +LIBS=$ac_check_lib_save_LIBS +fi +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5 +$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; } +if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_LIBZSTD 1 +_ACEOF + + LIBS="-lzstd $LIBS" + +else + as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5 +fi + +fi + # Note: We can test for libldap_r only after we know PTHREAD_LIBS; # also, on AIX, we may need to have openssl in LIBS for this step. if test "$with_ldap" = yes ; then @@ -13904,6 +14116,77 @@ done fi +if test -z "$ZSTD"; then + for ac_prog in zstd +do + # Extract the first word of "$ac_prog", so it can be a program name with args. +set dummy $ac_prog; ac_word=$2 +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for $ac_word" >&5 +$as_echo_n "checking for $ac_word... " >&6; } +if ${ac_cv_path_ZSTD+:} false; then : + $as_echo_n "(cached) " >&6 +else + case $ZSTD in + [\\/]* | ?:[\\/]*) + ac_cv_path_ZSTD="$ZSTD" # Let the user override the test with a path. + ;; + *) + as_save_IFS=$IFS; IFS=$PATH_SEPARATOR +for as_dir in $PATH +do + IFS=$as_save_IFS + test -z "$as_dir" && as_dir=. + for ac_exec_ext in '' $ac_executable_extensions; do + if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then + ac_cv_path_ZSTD="$as_dir/$ac_word$ac_exec_ext" + $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5 + break 2 + fi +done + done +IFS=$as_save_IFS + + ;; +esac +fi +ZSTD=$ac_cv_path_ZSTD +if test -n "$ZSTD"; then + { $as_echo "$as_me:${as_lineno-$LINENO}: result: $ZSTD" >&5 +$as_echo "$ZSTD" >&6; } +else + { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5 +$as_echo "no" >&6; } +fi + + + test -n "$ZSTD" && break +done + +else + # Report the value of ZSTD in configure's output in all cases. + { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD" >&5 +$as_echo_n "checking for ZSTD... " >&6; } + { $as_echo "$as_me:${as_lineno-$LINENO}: result: $ZSTD" >&5 +$as_echo "$ZSTD" >&6; } +fi + +if test "$with_zstd" = yes; then + for ac_header in zstd.h +do : + ac_fn_c_check_header_mongrel "$LINENO" "zstd.h" "ac_cv_header_zstd_h" "$ac_includes_default" +if test "x$ac_cv_header_zstd_h" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_ZSTD_H 1 +_ACEOF + +else + as_fn_error $? "zstd.h header file is required for ZSTD" "$LINENO" 5 +fi + +done + +fi + if test "$with_gssapi" = yes ; then for ac_header in gssapi/gssapi.h do : @@ -15307,7 +15590,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -15353,7 +15636,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -15377,7 +15660,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -15422,7 +15705,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -15446,7 +15729,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; diff --git a/configure.ac b/configure.ac index 16167329fc..729b23fbea 100644 --- a/configure.ac +++ b/configure.ac @@ -1056,6 +1056,30 @@ if test "$with_lz4" = yes; then done fi +# +# ZSTD +# +AC_MSG_CHECKING([whether to build with ZSTD support]) +PGAC_ARG_BOOL(with, zstd, no, [build with ZSTD support], + [AC_DEFINE([USE_ZSTD], 1, [Define to 1 to build with ZSTD support. (--with-zstd)])]) +AC_MSG_RESULT([$with_zstd]) +AC_SUBST(with_zstd) + +if test "$with_zstd" = yes; then + PKG_CHECK_MODULES(ZSTD, libzstd) + # We only care about -I, -D, and -L switches; + # note that -lzstd will be added by AC_CHECK_LIB below. + for pgac_option in $ZSTD_CFLAGS; do + case $pgac_option in + -I*|-D*) CPPFLAGS="$CPPFLAGS $pgac_option";; + esac + done + for pgac_option in $ZSTD_LIBS; do + case $pgac_option in + -L*) LDFLAGS="$LDFLAGS $pgac_option";; + esac + done +fi # # Assignments # @@ -1325,6 +1349,10 @@ if test "$with_lz4" = yes ; then AC_CHECK_LIB(lz4, LZ4_compress_default, [], [AC_MSG_ERROR([library 'lz4' is required for LZ4 support])]) fi +if test "$with_zstd" = yes ; then + AC_CHECK_LIB(zstd, ZSTD_compress, [], [AC_MSG_ERROR([library 'zstd' is required for ZSTD support])]) +fi + # Note: We can test for libldap_r only after we know PTHREAD_LIBS; # also, on AIX, we may need to have openssl in LIBS for this step. if test "$with_ldap" = yes ; then @@ -1490,6 +1518,11 @@ if test "$with_lz4" = yes; then AC_CHECK_HEADERS(lz4.h, [], [AC_MSG_ERROR([lz4.h header file is required for LZ4])]) fi +PGAC_PATH_PROGS(ZSTD, zstd) +if test "$with_zstd" = yes; then + AC_CHECK_HEADERS(zstd.h, [], [AC_MSG_ERROR([zstd.h header file is required for ZSTD])]) +fi + if test "$with_gssapi" = yes ; then AC_CHECK_HEADERS(gssapi/gssapi.h, [], [AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])]) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 1c5ab00879..c13d25051c 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2725,7 +2725,7 @@ The commands accepted in replication mode are: Instructs the server to compress the backup using the specified method. Currently, the supported methods are gzip - and lz4. + lz4, and zstd. @@ -2737,7 +2737,8 @@ The commands accepted in replication mode are: Specifies the compression level to be used. This should only be used in conjunction with the COMPRESSION option. For gzip the value should be an integer between 1 - and 9, and for lz4 it should be between 1 and 12. + and 9, for lz4 between 1 and 12, and for + zstd it should be between 1 and 22. diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml index 53aa40dcd1..4cf28a2a61 100644 --- a/doc/src/sgml/ref/pg_basebackup.sgml +++ b/doc/src/sgml/ref/pg_basebackup.sgml @@ -417,30 +417,32 @@ PostgreSQL documentation specify -Xfetch. - The compression method can be set to gzip or - lz4, or none for no - compression. A compression level can be optionally specified, by - appending the level number after a colon (:). If no - level is specified, the default compression level will be used. If - only a level is specified without mentioning an algorithm, - gzip compression will be used if the level is - greater than 0, and no compression will be used if the level is 0. - - - When the tar format is used with gzip or - lz4, the suffix .gz or - .lz4 will automatically be added to all tar - filenames. When the plain format is used, client-side compression may - not be specified, but it is still possible to request server-side - compression. If this is done, the server will compress the backup for - transmission, and the client will decompress and extract it. + The compression method can be set to gzip, + lz4, zstd, or + none for no compression. A compression level can + optionally be specified, by appending the level number after a colon + (:). If no level is specified, the default + compression level will be used. If only a level is specified without + mentioning an algorithm, gzip compression will be + used if the level is greater than 0, and no compression will be used if + the level is 0. + + + When the tar format is used with gzip, + lz4, or zstd, the suffix + .gz, .lz4, or + .zst respectively will be automatically added to + all tar filenames. When the plain format is used, client-side + compression may not be specified, but it is still possible to request + server-side compression. If this is done, the server will compress the + backup for transmission, and the client will decompress and extract it. When this option is used in combination with -Xstream, pg_wal.tar will be compressed using gzip if client-side gzip compression is selected, but will not be compressed if server-side - compresion or LZ4 compresion is selected. + compression, LZ4, or ZSTD compression is selected. diff --git a/src/Makefile.global.in b/src/Makefile.global.in index 9dcd54fcbd..c980444233 100644 --- a/src/Makefile.global.in +++ b/src/Makefile.global.in @@ -351,6 +351,7 @@ XGETTEXT = @XGETTEXT@ GZIP = gzip BZIP2 = bzip2 LZ4 = @LZ4@ +ZSTD = @ZSTD@ DOWNLOAD = wget -O $@ --no-use-server-timestamps #DOWNLOAD = curl -o $@ diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 74043ff331..2e6de7007f 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -20,6 +20,7 @@ OBJS = \ basebackup_copy.o \ basebackup_gzip.o \ basebackup_lz4.o \ + basebackup_zstd.o \ basebackup_progress.o \ basebackup_server.o \ basebackup_sink.o \ diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 0bf28b55d7..2378ce5c5e 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -64,7 +64,8 @@ typedef enum { BACKUP_COMPRESSION_NONE, BACKUP_COMPRESSION_GZIP, - BACKUP_COMPRESSION_LZ4 + BACKUP_COMPRESSION_LZ4, + BACKUP_COMPRESSION_ZSTD } basebackup_compression_type; typedef struct @@ -906,6 +907,8 @@ parse_basebackup_options(List *options, basebackup_options *opt) opt->compression = BACKUP_COMPRESSION_GZIP; else if (strcmp(optval, "lz4") == 0) opt->compression = BACKUP_COMPRESSION_LZ4; + else if (strcmp(optval, "zstd") == 0) + opt->compression = BACKUP_COMPRESSION_ZSTD; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1026,6 +1029,8 @@ SendBaseBackup(BaseBackupCmd *cmd) sink = bbsink_gzip_new(sink, opt.compression_level); else if (opt.compression == BACKUP_COMPRESSION_LZ4) sink = bbsink_lz4_new(sink, opt.compression_level); + else if (opt.compression == BACKUP_COMPRESSION_ZSTD) + sink = bbsink_zstd_new(sink, opt.compression_level); /* Set up progress reporting. */ sink = bbsink_progress_new(sink, opt.progress); diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c new file mode 100644 index 0000000000..d99b3698f6 --- /dev/null +++ b/src/backend/replication/basebackup_zstd.c @@ -0,0 +1,294 @@ +/*------------------------------------------------------------------------- + * + * basebackup_zstd.c + * Basebackup sink implementing zstd compression. + * + * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/basebackup_zstd.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#ifdef HAVE_LIBZSTD +#include +#endif + +#include "replication/basebackup_sink.h" + +#ifdef HAVE_LIBZSTD + +typedef struct bbsink_zstd +{ + /* Common information for all types of sink. */ + bbsink base; + + /* Compression level */ + int compresslevel; + + ZSTD_CCtx *cctx; + ZSTD_outBuffer zstd_outBuf; +} bbsink_zstd; + +static void bbsink_zstd_begin_backup(bbsink *sink); +static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name); +static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in); +static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len); +static void bbsink_zstd_end_archive(bbsink *sink); +static void bbsink_zstd_cleanup(bbsink *sink); +static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli); + +const bbsink_ops bbsink_zstd_ops = { + .begin_backup = bbsink_zstd_begin_backup, + .begin_archive = bbsink_zstd_begin_archive, + .archive_contents = bbsink_zstd_archive_contents, + .end_archive = bbsink_zstd_end_archive, + .begin_manifest = bbsink_forward_begin_manifest, + .manifest_contents = bbsink_zstd_manifest_contents, + .end_manifest = bbsink_forward_end_manifest, + .end_backup = bbsink_zstd_end_backup, + .cleanup = bbsink_zstd_cleanup +}; +#endif + +/* + * Create a new basebackup sink that performs zstd compression using the + * designated compression level. + */ +bbsink * +bbsink_zstd_new(bbsink *next, int compresslevel) +{ +#ifndef HAVE_LIBZSTD + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("zstd compression is not supported by this build"))); +#else + bbsink_zstd *sink; + + Assert(next != NULL); + Assert(compresslevel >= 0 && compresslevel <= 22); + + if (compresslevel < 0 || compresslevel > 22) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("zstd compression level %d is out of range", + compresslevel))); + + sink = palloc0(sizeof(bbsink_zstd)); + *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops; + sink->base.bbs_next = next; + sink->compresslevel = compresslevel; + + return &sink->base; +#endif +} + +#ifdef HAVE_LIBZSTD + +/* + * Begin backup. + */ +static void +bbsink_zstd_begin_backup(bbsink *sink) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + size_t output_buffer_bound; + + mysink->cctx = ZSTD_createCCtx(); + if (!mysink->cctx) + elog(ERROR, "could not create zstd compression context"); + + ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel, + mysink->compresslevel); + + /* + * We need our own buffer, because we're going to pass different data to + * the next sink than what gets passed to us. + */ + mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length); + + /* + * Make sure that the next sink's bbs_buffer is big enough to accommodate + * the compressed input buffer. + */ + output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length); + + /* + * The buffer length is expected to be a multiple of BLCKSZ, so round up. + */ + output_buffer_bound = output_buffer_bound + BLCKSZ - + (output_buffer_bound % BLCKSZ); + + bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound); +} + +/* + * Prepare to compress the next archive. + */ +static void +bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + char *zstd_archive_name; + + /* + * At the start of each archive we reset the state to start a new + * compression operation. The parameters are sticky and they would stick + * around as we are resetting with option ZSTD_reset_session_only. + */ + ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only); + + mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer; + mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length; + mysink->zstd_outBuf.pos = 0; + + /* Add ".zst" to the archive name. */ + zstd_archive_name = psprintf("%s.zst", archive_name); + Assert(sink->bbs_next != NULL); + bbsink_begin_archive(sink->bbs_next, zstd_archive_name); + pfree(zstd_archive_name); +} + +/* + * Compress the input data to the output buffer until we run out of input + * data. Each time the output buffer falls below the compression bound for + * the input buffer, invoke the archive_contents() method for then next sink. + * + * Note that since we're compressing the input, it may very commonly happen + * that we consume all the input data without filling the output buffer. In + * that case, the compressed representation of the current input data won't + * actually be sent to the next bbsink until a later call to this function, + * or perhaps even not until bbsink_zstd_end_archive() is invoked. + */ +static void +bbsink_zstd_archive_contents(bbsink *sink, size_t len) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0}; + + while (inBuf.pos < inBuf.size) + { + size_t yet_to_flush; + size_t required_outBuf_bound = ZSTD_compressBound(inBuf.size - inBuf.pos); + + /* + * If the out buffer is not left with enough space, send the output + * buffer to the next sink, and reset it. + */ + if ((mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos) <= + required_outBuf_bound) + { + bbsink_archive_contents(mysink->base.bbs_next, mysink->zstd_outBuf.pos); + mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer; + mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length; + mysink->zstd_outBuf.pos = 0; + } + + yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf, + &inBuf, ZSTD_e_continue); + + if (ZSTD_isError(yet_to_flush)) + elog(ERROR, "could not compress data: %s", ZSTD_getErrorName(yet_to_flush)); + } +} + +/* + * There might be some data inside zstd's internal buffers; we need to get that + * flushed out, also end the zstd frame and then get that forwarded to the + * successor sink as archive content. + * + * Then we can end processing for this archive. + */ +static void +bbsink_zstd_end_archive(bbsink *sink) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + size_t yet_to_flush; + + do + { + ZSTD_inBuffer in = {NULL, 0, 0}; + size_t required_outBuf_bound = ZSTD_compressBound(0); + + /* + * If the out buffer is not left with enough space, send the output + * buffer to the next sink, and reset it. + */ + if ((mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos) <= + required_outBuf_bound) + { + bbsink_archive_contents(mysink->base.bbs_next, mysink->zstd_outBuf.pos); + mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer; + mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length; + mysink->zstd_outBuf.pos = 0; + } + + yet_to_flush = ZSTD_compressStream2(mysink->cctx, + &mysink->zstd_outBuf, + &in, ZSTD_e_end); + + if (ZSTD_isError(yet_to_flush)) + elog(ERROR, "could not compress data: %s", + ZSTD_getErrorName(yet_to_flush)); + + } while (yet_to_flush > 0); + + /* Make sure to pass any remaining bytes to the next sink. */ + if (mysink->zstd_outBuf.pos > 0) + bbsink_archive_contents(mysink->base.bbs_next, mysink->zstd_outBuf.pos); + + /* Pass on the information that this archive has ended. */ + bbsink_forward_end_archive(sink); +} + +/* + * Free the resources and context. + */ +static void +bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + + /* Release the context. */ + if (mysink->cctx) + { + ZSTD_freeCCtx(mysink->cctx); + mysink->cctx = NULL; + } + + bbsink_forward_end_backup(sink, endptr, endtli); +} + +/* + * Manifest contents are not compressed, but we do need to copy them into + * the successor sink's buffer, because we have our own. + */ +static void +bbsink_zstd_manifest_contents(bbsink *sink, size_t len) +{ + memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len); + bbsink_manifest_contents(sink->bbs_next, len); +} + +/* + * In case the backup fails, make sure we free the compression context by + * calling ZSTD_freeCCtx if needed to avoid memory leak. + */ +static void +bbsink_zstd_cleanup(bbsink *sink) +{ + bbsink_zstd *mysink = (bbsink_zstd *) sink; + + /* Release the context if not already released. */ + if (mysink->cctx) + { + ZSTD_freeCCtx(mysink->cctx); + mysink->cctx = NULL; + } +} + +#endif diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 0003b59615..3adb3a3845 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -391,7 +391,7 @@ usage(void) printf(_(" -X, --wal-method=none|fetch|stream\n" " include required WAL files with specified method\n")); printf(_(" -z, --gzip compress tar output\n")); - printf(_(" -Z, --compress={[{client,server}-]gzip,lz4,none}[:LEVEL] or [LEVEL]\n" + printf(_(" -Z, --compress={[{client|server}-]{gzip|lz4|zstd}}[:LEVEL]|none}\n" " compress tar output with given compression method or level\n")); printf(_("\nGeneral options:\n")); printf(_(" -c, --checkpoint=fast|spread\n" @@ -1023,6 +1023,11 @@ parse_compress_options(char *src, WalCompressionMethod *methodres, *methodres = COMPRESSION_LZ4; *locationres = COMPRESS_LOCATION_SERVER; } + else if (pg_strcasecmp(firstpart, "server-zstd") == 0) + { + *methodres = COMPRESSION_ZSTD; + *locationres = COMPRESS_LOCATION_SERVER; + } else if (pg_strcasecmp(firstpart, "none") == 0) { *methodres = COMPRESSION_NONE; @@ -1970,6 +1975,9 @@ BaseBackup(void) case COMPRESSION_LZ4: compressmethodstr = "lz4"; break; + case COMPRESSION_ZSTD: + compressmethodstr = "zstd"; + break; default: Assert(false); break; @@ -2819,6 +2827,14 @@ main(int argc, char **argv) exit(1); } break; + case COMPRESSION_ZSTD: + if (compresslevel > 22) + { + pg_log_error("compression level %d of method %s higher than maximum of 22", + compresslevel, "zstd"); + exit(1); + } + break; } /* diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index ccb215c398..9b7656c692 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -904,6 +904,10 @@ main(int argc, char **argv) exit(1); #endif break; + case COMPRESSION_ZSTD: + pg_log_error("compression with %s is not yet supported", "ZSTD"); + exit(1); + } diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h index 2dfb353baa..ec54019cfc 100644 --- a/src/bin/pg_basebackup/walmethods.h +++ b/src/bin/pg_basebackup/walmethods.h @@ -24,6 +24,7 @@ typedef enum { COMPRESSION_GZIP, COMPRESSION_LZ4, + COMPRESSION_ZSTD, COMPRESSION_NONE } WalCompressionMethod; diff --git a/src/bin/pg_verifybackup/Makefile b/src/bin/pg_verifybackup/Makefile index 851233a6e0..596df15118 100644 --- a/src/bin/pg_verifybackup/Makefile +++ b/src/bin/pg_verifybackup/Makefile @@ -10,6 +10,7 @@ export TAR # name. export GZIP_PROGRAM=$(GZIP) export LZ4=$(LZ4) +export ZSTD=$(ZSTD) subdir = src/bin/pg_verifybackup top_builddir = ../../.. diff --git a/src/bin/pg_verifybackup/t/008_untar.pl b/src/bin/pg_verifybackup/t/008_untar.pl old mode 100644 new mode 100755 index 6927ca4c74..1ccc6cb9df --- a/src/bin/pg_verifybackup/t/008_untar.pl +++ b/src/bin/pg_verifybackup/t/008_untar.pl @@ -43,6 +43,14 @@ my @test_configuration = ( 'decompress_program' => $ENV{'LZ4'}, 'decompress_flags' => [ '-d', '-m'], 'enabled' => check_pg_config("#define HAVE_LIBLZ4 1") + }, + { + 'compression_method' => 'zstd', + 'backup_flags' => ['--compress', 'server-zstd'], + 'backup_archive' => 'base.tar.zst', + 'decompress_program' => $ENV{'ZSTD'}, + 'decompress_flags' => [ '-d' ], + 'enabled' => check_pg_config("#define HAVE_LIBZSTD 1") } ); @@ -108,6 +116,7 @@ for my $tc (@test_configuration) # Cleanup. unlink($backup_path . '/backup_manifest'); unlink($backup_path . '/base.tar'); + unlink($backup_path . '/' . $tc->{'backup_archive'}); rmtree($extract_path); } } diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index 28a1f0e9f0..26e373e9f7 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -325,6 +325,9 @@ /* Define to 1 if you have the `lz4' library (-llz4). */ #undef HAVE_LIBLZ4 +/* Define to 1 if you have the `zstd' library (-lzstd). */ +#undef HAVE_LIBZSTD + /* Define to 1 if you have the `m' library (-lm). */ #undef HAVE_LIBM @@ -367,6 +370,9 @@ /* Define to 1 if you have the header file. */ #undef HAVE_LZ4_H +/* Define to 1 if you have the header file. */ +#undef HAVE_ZSTD_H + /* Define to 1 if you have the header file. */ #undef HAVE_MBARRIER_H diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h index a3f8d37258..a7f16758a4 100644 --- a/src/include/replication/basebackup_sink.h +++ b/src/include/replication/basebackup_sink.h @@ -285,6 +285,7 @@ extern void bbsink_forward_cleanup(bbsink *sink); extern bbsink *bbsink_copystream_new(bool send_to_client); extern bbsink *bbsink_gzip_new(bbsink *next, int compresslevel); extern bbsink *bbsink_lz4_new(bbsink *next, int compresslevel); +extern bbsink *bbsink_zstd_new(bbsink *next, int compresslevel); extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size); extern bbsink *bbsink_server_new(bbsink *next, char *pathname); extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate); -- 2.25.1