From bf27b972eaf29c0a40b949eac40150a1d9ee00b0 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Wed, 23 Mar 2022 11:00:33 -0400 Subject: [PATCH] Allow parallel zstd compression when taking a base backup. libzstd allows transparent parallel compression just by setting an option when creating the compression context, so permit that for both client and server-side backup compression. To use this, use something like pg_basebackup --compress WHERE-zstd:workers=N where WHERE is "client" or "server" and N is an integer. When compression is performed on the server side, this will spawn threads inside the PostgreSQL backend. While there is almost no PostgreSQL server code which is thread-safe, the threads here are used internally by libzstd and touch only data structures controlled by libzstd. Patch by me, based in part on earlier work by Dipesh Pandit and Jeevan Ladhe. --- doc/src/sgml/protocol.sgml | 12 +++++-- doc/src/sgml/ref/pg_basebackup.sgml | 4 +-- src/backend/replication/basebackup_zstd.c | 19 +++++++++++ src/bin/pg_basebackup/bbstreamer_zstd.c | 16 +++++++++ src/bin/pg_basebackup/t/010_pg_basebackup.pl | 5 +++ src/bin/pg_verifybackup/t/009_extract.pl | 29 ++++++++++++++-- src/bin/pg_verifybackup/t/010_client_untar.pl | 33 +++++++++++++++++-- src/common/backup_compression.c | 16 +++++++++ src/include/common/backup_compression.h | 2 ++ src/test/perl/PostgreSQL/Test/Cluster.pm | 3 +- 10 files changed, 127 insertions(+), 12 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 719b947ef4..cc03a4587b 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2739,17 +2739,23 @@ The commands accepted in replication mode are: option. If the value is an integer, it specifies the compression level. Otherwise, it should be a comma-separated list of items, each of the form keyword or - keyword=value. Currently, the only supported - keyword is level, which sets the compression - level. + keyword=value. Currently, the supported keywords + are level and workers. + The level keyword sets the compression level. For gzip the compression level should be an integer between 1 and 9, for lz4 an integer between 1 and 12, and for zstd an integer between 1 and 22. + + + The workers keyword sets the number of threads + that should be used for parallel compression. Parallel compression + is supported only for zstd. + diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml index d9233beb8e..82f5f60625 100644 --- a/doc/src/sgml/ref/pg_basebackup.sgml +++ b/doc/src/sgml/ref/pg_basebackup.sgml @@ -424,8 +424,8 @@ PostgreSQL documentation integer, it specifies the compression level. Otherwise, it should be a comma-separated list of items, each of the form keyword or keyword=value. - Currently, the only supported keyword is level, - which sets the compression level. + Currently, the supported keywords are level + and workers. If no compression level is specified, the default compression level diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c index bb5b668c2a..4835aa70fc 100644 --- a/src/backend/replication/basebackup_zstd.c +++ b/src/backend/replication/basebackup_zstd.c @@ -28,6 +28,9 @@ typedef struct bbsink_zstd /* Compression level */ int compresslevel; + /* Number of parallel workers. */ + int workers; + ZSTD_CCtx *cctx; ZSTD_outBuffer zstd_outBuf; } bbsink_zstd; @@ -83,6 +86,7 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress) *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops; sink->base.bbs_next = next; sink->compresslevel = compresslevel; + sink->workers = compress->workers; return &sink->base; #endif @@ -98,6 +102,7 @@ bbsink_zstd_begin_backup(bbsink *sink) { bbsink_zstd *mysink = (bbsink_zstd *) sink; size_t output_buffer_bound; + size_t ret; mysink->cctx = ZSTD_createCCtx(); if (!mysink->cctx) @@ -106,6 +111,20 @@ bbsink_zstd_begin_backup(bbsink *sink) ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel, mysink->compresslevel); + /* + * We check for failure here because (1) older versions of the library + * do not support ZSTD_c_nbWorkers and (2) the library might want to + * reject an unreasonable values (though in practice it does not seem to do + * so). + */ + ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers, + mysink->workers); + if (ZSTD_isError(ret)) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not set compression worker count to %d: %s", + mysink->workers, ZSTD_getErrorName(ret))); + /* * We need our own buffer, because we're going to pass different data to * the next sink than what gets passed to us. diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c index caa5edcaf1..e17dfb6bd5 100644 --- a/src/bin/pg_basebackup/bbstreamer_zstd.c +++ b/src/bin/pg_basebackup/bbstreamer_zstd.c @@ -67,6 +67,7 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress) { #ifdef USE_ZSTD bbstreamer_zstd_frame *streamer; + size_t ret; Assert(next != NULL); @@ -87,6 +88,21 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress) ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel, compress->level); + /* + * We check for failure here because (1) older versions of the library + * do not support ZSTD_c_nbWorkers and (2) the library might want to + * reject unreasonable values (though in practice it does not seem to do + * so). + */ + ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers, + compress->workers); + if (ZSTD_isError(ret)) + { + pg_log_error("could not set compression worker count to %d: %s", + compress->workers, ZSTD_getErrorName(ret)); + exit(1); + } + /* Initialize the ZSTD output buffer. */ streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data; streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen; diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl index 2869a239e7..f074fe19b7 100644 --- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl +++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl @@ -133,6 +133,11 @@ my @compression_failure_tests = ( 'invalid compression specification: found empty string where a compression option was expected', 'failure on extra, empty compression option' ], + [ + 'gzip:workers=3', + 'invalid compression specification: compression algorithm "gzip" does not accept a worker count', + 'failure on worker count for gzip' + ], ); for my $cft (@compression_failure_tests) { diff --git a/src/bin/pg_verifybackup/t/009_extract.pl b/src/bin/pg_verifybackup/t/009_extract.pl index 9f9cc7540b..e17e7cad51 100644 --- a/src/bin/pg_verifybackup/t/009_extract.pl +++ b/src/bin/pg_verifybackup/t/009_extract.pl @@ -36,6 +36,12 @@ my @test_configuration = ( 'compression_method' => 'zstd', 'backup_flags' => ['--compress', 'server-zstd:5'], 'enabled' => check_pg_config("#define USE_ZSTD 1") + }, + { + 'compression_method' => 'parallel zstd', + 'backup_flags' => ['--compress', 'server-zstd:workers=3'], + 'enabled' => check_pg_config("#define USE_ZSTD 1"), + 'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/ } ); @@ -57,8 +63,27 @@ for my $tc (@test_configuration) my @verify = ('pg_verifybackup', '-e', $backup_path); # A backup with a valid compression method should work. - $primary->command_ok(\@backup, - "backup done, compression method \"$method\""); + my $backup_stdout = ''; + my $backup_stderr = ''; + my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout, + '2>', \$backup_stderr); + if ($backup_stdout ne '') + { + print "# standard output was:\n$backup_stdout"; + } + if ($backup_stderr ne '') + { + print "# standard error was:\n$backup_stderr"; + } + if (! $backup_result && $tc->{'possibly_unsupported'} && + $backup_stderr =~ /$tc->{'possibly_unsupported'}/) + { + skip "compression with $method not supported by this build", 2; + } + else + { + ok($backup_result, "backup done, compression $method"); + } # Make sure that it verifies OK. $primary->command_ok(\@verify, diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl index 487e30e826..5f6a4b9963 100644 --- a/src/bin/pg_verifybackup/t/010_client_untar.pl +++ b/src/bin/pg_verifybackup/t/010_client_untar.pl @@ -50,6 +50,15 @@ my @test_configuration = ( 'decompress_program' => $ENV{'ZSTD'}, 'decompress_flags' => [ '-d' ], 'enabled' => check_pg_config("#define USE_ZSTD 1") + }, + { + 'compression_method' => 'parallel zstd', + 'backup_flags' => ['--compress', 'client-zstd:workers=3'], + 'backup_archive' => 'base.tar.zst', + 'decompress_program' => $ENV{'ZSTD'}, + 'decompress_flags' => [ '-d' ], + 'enabled' => check_pg_config("#define USE_ZSTD 1"), + 'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/ } ); @@ -70,9 +79,27 @@ for my $tc (@test_configuration) 'pg_basebackup', '-D', $backup_path, '-Xfetch', '--no-sync', '-cfast', '-Ft'); push @backup, @{$tc->{'backup_flags'}}; - $primary->command_ok(\@backup, - "client side backup, compression $method"); - + my $backup_stdout = ''; + my $backup_stderr = ''; + my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout, + '2>', \$backup_stderr); + if ($backup_stdout ne '') + { + print "# standard output was:\n$backup_stdout"; + } + if ($backup_stderr ne '') + { + print "# standard error was:\n$backup_stderr"; + } + if (! $backup_result && $tc->{'possibly_unsupported'} && + $backup_stderr =~ /$tc->{'possibly_unsupported'}/) + { + skip "compression with $method not supported by this build", 3; + } + else + { + ok($backup_result, "client side backup, compression $method"); + } # Verify that the we got the files we expected. my $backup_files = join(',', diff --git a/src/common/backup_compression.c b/src/common/backup_compression.c index 0650f975c4..969e08cca2 100644 --- a/src/common/backup_compression.c +++ b/src/common/backup_compression.c @@ -177,6 +177,11 @@ parse_bc_specification(bc_algorithm algorithm, char *specification, result->level = expect_integer_value(keyword, value, result); result->options |= BACKUP_COMPRESSION_OPTION_LEVEL; } + else if (strcmp(keyword, "workers") == 0) + { + result->workers = expect_integer_value(keyword, value, result); + result->options |= BACKUP_COMPRESSION_OPTION_WORKERS; + } else result->parse_error = psprintf(_("unknown compression option \"%s\""), keyword); @@ -266,5 +271,16 @@ validate_bc_specification(bc_specification *spec) min_level, max_level); } + /* + * Of the compression algorithms that we currently support, only zstd + * allows parallel workers. + */ + if ((spec->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0 && + (spec->algorithm != BACKUP_COMPRESSION_ZSTD)) + { + return psprintf(_("compression algorithm \"%s\" does not accept a worker count"), + get_bc_algorithm_name(spec->algorithm)); + } + return NULL; } diff --git a/src/include/common/backup_compression.h b/src/include/common/backup_compression.h index 0565cbc657..6a0ecaa99c 100644 --- a/src/include/common/backup_compression.h +++ b/src/include/common/backup_compression.h @@ -23,12 +23,14 @@ typedef enum bc_algorithm } bc_algorithm; #define BACKUP_COMPRESSION_OPTION_LEVEL (1 << 0) +#define BACKUP_COMPRESSION_OPTION_WORKERS (1 << 1) typedef struct bc_specification { bc_algorithm algorithm; unsigned options; /* OR of BACKUP_COMPRESSION_OPTION constants */ int level; + int workers; char *parse_error; /* NULL if parsing was OK, else message */ } bc_specification; diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm index e7b9161137..8d838f7d6d 100644 --- a/src/test/perl/PostgreSQL/Test/Cluster.pm +++ b/src/test/perl/PostgreSQL/Test/Cluster.pm @@ -2503,8 +2503,7 @@ sub run_log local %ENV = $self->_get_env(); - PostgreSQL::Test::Utils::run_log(@_); - return; + return PostgreSQL::Test::Utils::run_log(@_); } =pod -- 2.24.3 (Apple Git-128)