From dc33f7ea2930dfc5a7bace52eb0086c759a7d86e Mon Sep 17 00:00:00 2001 From: Georgios Kokolatos Date: Tue, 2 Nov 2021 10:39:42 +0000 Subject: [PATCH v8 1/2] Refactor pg_receivewal in preparation for introducing lz4 compression The program pg_receivewal can use gzip compression to store the received WAL. The option `--compress` with a value [1, 9] was used to denote that gzip compression was required. When `--compress` with a value of `0` was used, then no compression would take place. This commit introduces a new option, `--compression-method`. Valid values are [none|zlib]. The option `--compress` requires for `--compression-method` with value other than `none`. Also `--compress=0` now returns an error. Under the hood, there are no surprising changes. A new enum WalCompressionMethod has been introduced and is used throughout the relevant codepaths to explicitly note which compression method to use. Last, the macros IsXLogFileName and friends, have been replaced by the function is_xlogfilename(). This will allow for easier expansion of the available compression methods that can be recognised. --- doc/src/sgml/ref/pg_receivewal.sgml | 24 ++- src/bin/pg_basebackup/pg_basebackup.c | 7 +- src/bin/pg_basebackup/pg_receivewal.c | 164 +++++++++++++------ src/bin/pg_basebackup/receivelog.c | 2 +- src/bin/pg_basebackup/t/020_pg_receivewal.pl | 16 +- src/bin/pg_basebackup/walmethods.c | 51 ++++-- src/bin/pg_basebackup/walmethods.h | 15 +- src/tools/pgindent/typedefs.list | 1 + 8 files changed, 200 insertions(+), 80 deletions(-) diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml index 9fde2fd2ef..f95cffcd5e 100644 --- a/doc/src/sgml/ref/pg_receivewal.sgml +++ b/doc/src/sgml/ref/pg_receivewal.sgml @@ -263,15 +263,31 @@ PostgreSQL documentation + + + + + Enables compression of write-ahead logs using the specified method. + Supported values zlib, + and none. + + + + - Enables gzip compression of write-ahead logs, and specifies the - compression level (0 through 9, 0 being no compression and 9 being best - compression). The suffix .gz will - automatically be added to all filenames. + Specifies the compression level (1 through + 9, 1 being worst compression + and 9 being best compression) for + zlib compressed WAL segments. + + + + This option requires to be + specified with zlib. diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 27ee6394cf..cdea3711b7 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -555,10 +555,13 @@ LogStreamerMain(logstreamer_param *param) stream.replication_slot = replication_slot; if (format == 'p') - stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, + stream.walmethod = CreateWalDirectoryMethod(param->xlog, + COMPRESSION_NONE, 0, stream.do_sync); else - stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, + stream.walmethod = CreateWalTarMethod(param->xlog, + COMPRESSION_NONE, /* ignored */ + compresslevel, stream.do_sync); if (!ReceiveXlogStream(param->bgconn, &stream)) diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index 04ba20b197..9641f4a2f4 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -32,6 +32,9 @@ /* Time to sleep between reconnection attempts */ #define RECONNECT_SLEEP_TIME 5 +/* This is just the redefinition of a libz constant */ +#define Z_DEFAULT_COMPRESSION (-1) + /* Global options */ static char *basedir = NULL; static int verbose = 0; @@ -45,6 +48,7 @@ static bool do_drop_slot = false; static bool do_sync = true; static bool synchronous = false; static char *replication_slot = NULL; +static WalCompressionMethod compression_method = COMPRESSION_NONE; static XLogRecPtr endpos = InvalidXLogRecPtr; @@ -63,16 +67,6 @@ disconnect_atexit(void) PQfinish(conn); } -/* Routines to evaluate segment file format */ -#define IsCompressXLogFileName(fname) \ - (strlen(fname) == XLOG_FNAME_LEN + strlen(".gz") && \ - strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN && \ - strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0) -#define IsPartialCompressXLogFileName(fname) \ - (strlen(fname) == XLOG_FNAME_LEN + strlen(".gz.partial") && \ - strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN && \ - strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0) - static void usage(void) { @@ -92,7 +86,9 @@ usage(void) printf(_(" --synchronous flush write-ahead log immediately after writing\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); - printf(_(" -Z, --compress=0-9 compress logs with given compression level\n")); + printf(_(" --compression-method=METHOD\n" + " method to compress logs\n")); + printf(_(" -Z, --compress=1-9 compress logs with given compression level\n")); printf(_(" -?, --help show this help, then exit\n")); printf(_("\nConnection options:\n")); printf(_(" -d, --dbname=CONNSTR connection string\n")); @@ -108,6 +104,61 @@ usage(void) printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL); } + +/* + * Check if the filename looks like an xlog file. Also note if it is partial + * and/or compressed file. + */ +static bool +is_xlogfilename(const char *filename, bool *ispartial, + WalCompressionMethod *wal_compression_method) +{ + size_t fname_len = strlen(filename); + size_t xlog_pattern_len = strspn(filename, "0123456789ABCDEF"); + + /* File does not look like a XLOG file */ + if (xlog_pattern_len != XLOG_FNAME_LEN) + return false; + + /* File looks like a complete uncompressed XLOG file */ + if (fname_len == XLOG_FNAME_LEN) + { + *ispartial = false; + *wal_compression_method = COMPRESSION_NONE; + return true; + } + + /* File looks like a complete zlib compressed XLOG file */ + if (fname_len == XLOG_FNAME_LEN + strlen(".gz") && + strcmp(filename + XLOG_FNAME_LEN, ".gz") == 0) + { + *ispartial = false; + *wal_compression_method = COMPRESSION_ZLIB; + return true; + } + + /* File looks like a partial uncompressed XLOG file */ + if (fname_len == XLOG_FNAME_LEN + strlen(".partial") && + strcmp(filename + XLOG_FNAME_LEN, ".partial") == 0) + { + *ispartial = true; + *wal_compression_method = COMPRESSION_NONE; + return true; + } + + /* File looks like a partial zlib compressed XLOG file */ + if (fname_len == XLOG_FNAME_LEN + strlen(".gz.partial") && + strcmp(filename + XLOG_FNAME_LEN, ".gz.partial") == 0) + { + *ispartial = true; + *wal_compression_method = COMPRESSION_ZLIB; + return true; + } + + /* File does not look like something we recognise */ + return false; +} + static bool stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) { @@ -213,33 +264,11 @@ FindStreamingStart(uint32 *tli) { uint32 tli; XLogSegNo segno; + WalCompressionMethod wal_compression_method; bool ispartial; - bool iscompress; - /* - * Check if the filename looks like an xlog file, or a .partial file. - */ - if (IsXLogFileName(dirent->d_name)) - { - ispartial = false; - iscompress = false; - } - else if (IsPartialXLogFileName(dirent->d_name)) - { - ispartial = true; - iscompress = false; - } - else if (IsCompressXLogFileName(dirent->d_name)) - { - ispartial = false; - iscompress = true; - } - else if (IsPartialCompressXLogFileName(dirent->d_name)) - { - ispartial = true; - iscompress = true; - } - else + if (!is_xlogfilename(dirent->d_name, + &ispartial, &wal_compression_method)) continue; /* @@ -250,14 +279,14 @@ FindStreamingStart(uint32 *tli) /* * Check that the segment has the right size, if it's supposed to be * completed. For non-compressed segments just check the on-disk size - * and see if it matches a completed segment. For compressed segments, - * look at the last 4 bytes of the compressed file, which is where the - * uncompressed size is located for gz files with a size lower than - * 4GB, and then compare it to the size of a completed segment. The 4 - * last bytes correspond to the ISIZE member according to - * http://www.zlib.org/rfc-gzip.html. + * and see if it matches a completed segment. For zlib compressed + * segments, look at the last 4 bytes of the compressed file, which is + * where the uncompressed size is located for gz files with a size + * lower than 4GB, and then compare it to the size of a completed + * segment. The 4 last bytes correspond to the ISIZE member according + * to http://www.zlib.org/rfc-gzip.html. */ - if (!ispartial && !iscompress) + if (!ispartial && wal_compression_method == COMPRESSION_NONE) { struct stat statbuf; char fullpath[MAXPGPATH * 2]; @@ -276,7 +305,7 @@ FindStreamingStart(uint32 *tli) continue; } } - else if (!ispartial && iscompress) + else if (!ispartial && wal_compression_method == COMPRESSION_ZLIB) { int fd; char buf[4]; @@ -457,7 +486,9 @@ StreamLog(void) stream.synchronous = synchronous; stream.do_sync = do_sync; stream.mark_done = false; - stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel, + stream.walmethod = CreateWalDirectoryMethod(basedir, + compression_method, + compresslevel, stream.do_sync); stream.partial_suffix = ".partial"; stream.replication_slot = replication_slot; @@ -510,6 +541,7 @@ main(int argc, char **argv) {"status-interval", required_argument, NULL, 's'}, {"slot", required_argument, NULL, 'S'}, {"verbose", no_argument, NULL, 'v'}, + {"compression-method", required_argument, NULL, 'I'}, {"compress", required_argument, NULL, 'Z'}, /* action */ {"create-slot", no_argument, NULL, 1}, @@ -595,8 +627,20 @@ main(int argc, char **argv) case 'v': verbose++; break; + case 'I': + if (pg_strcasecmp(optarg, "zlib") == 0) + compression_method = COMPRESSION_ZLIB; + else if (pg_strcasecmp(optarg, "none") == 0) + compression_method = COMPRESSION_NONE; + else + { + pg_log_error("invalid value \"%s\" for option %s", + optarg, "--compress-method"); + exit(1); + } + break; case 'Z': - if (!option_parse_int(optarg, "-Z/--compress", 0, 9, + if (!option_parse_int(optarg, "-Z/--compress", 1, 9, &compresslevel)) exit(1); break; @@ -676,13 +720,35 @@ main(int argc, char **argv) exit(1); } + + /* + * Compression related arguments + */ + if (compression_method != COMPRESSION_NONE) + { #ifndef HAVE_LIBZ - if (compresslevel != 0) + if (compression_method == COMPRESSION_ZLIB) + { + pg_log_error("this build does not support compression with %s", + "gzip"); + exit(1); + } +#endif + } + + if (compression_method != COMPRESSION_ZLIB && compresslevel != 0) { - pg_log_error("this build does not support compression"); + pg_log_error("can only use --compress with --compression-method=zlib"); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); exit(1); } -#endif + + if (compression_method == COMPRESSION_ZLIB && compresslevel == 0) + { + pg_log_info("no --compression specified, will be using the library default"); + compresslevel = Z_DEFAULT_COMPRESSION; + } /* * Check existence of destination folder. diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 72b8d9e315..2d4f660daa 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -109,7 +109,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) * When streaming to tar, no file with this name will exist before, so we * never have to verify a size. */ - if (stream->walmethod->compression() == 0 && + if (stream->walmethod->compression_method() == COMPRESSION_NONE && stream->walmethod->existsfile(fn)) { size = stream->walmethod->get_file_size(fn); diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl index ab05f9e91e..56c4a0d2af 100644 --- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl +++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl @@ -5,7 +5,7 @@ use strict; use warnings; use PostgreSQL::Test::Utils; use PostgreSQL::Test::Cluster; -use Test::More tests => 35; +use Test::More tests => 37; program_help_ok('pg_receivewal'); program_version_ok('pg_receivewal'); @@ -33,6 +33,13 @@ $primary->command_fails( $primary->command_fails( [ 'pg_receivewal', '-D', $stream_dir, '--synchronous', '--no-sync' ], 'failure if --synchronous specified with --no-sync'); +$primary->command_fails_like( + [ + 'pg_receivewal', '-D', $stream_dir, '--compression-method', 'none', + '--compress', '1' + ], + qr/\Qpg_receivewal: error: can only use --compress with --compression-method=zlib/, + 'failure if --compression-method=none specified with --compress'); # Slot creation and drop my $slot_name = 'test'; @@ -90,8 +97,11 @@ SKIP: # a valid value. $primary->command_ok( [ - 'pg_receivewal', '-D', $stream_dir, '--verbose', - '--endpos', $nextlsn, '--compress', '1 ', + 'pg_receivewal', '-D', + $stream_dir, '--verbose', + '--endpos', $nextlsn, + '--compression-method', 'zlib', + '--compress', '1 ', '--no-loop' ], "streaming some WAL using ZLIB compression"); diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c index 8695647db4..068b276251 100644 --- a/src/bin/pg_basebackup/walmethods.c +++ b/src/bin/pg_basebackup/walmethods.c @@ -41,6 +41,7 @@ typedef struct DirectoryMethodData { char *basedir; + WalCompressionMethod compression_method; int compression; bool sync; } DirectoryMethodData; @@ -74,7 +75,8 @@ dir_get_file_name(const char *pathname, const char *temp_suffix) char *filename = pg_malloc0(MAXPGPATH * sizeof(char)); snprintf(filename, MAXPGPATH, "%s%s%s", - pathname, dir_data->compression > 0 ? ".gz" : "", + pathname, + dir_data->compression_method == COMPRESSION_ZLIB ? ".gz" : "", temp_suffix ? temp_suffix : ""); return filename; @@ -107,7 +109,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ return NULL; #ifdef HAVE_LIBZ - if (dir_data->compression > 0) + if (dir_data->compression_method == COMPRESSION_ZLIB) { gzfp = gzdopen(fd, "wb"); if (gzfp == NULL) @@ -126,7 +128,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ #endif /* Do pre-padding on non-compressed files */ - if (pad_to_size && dir_data->compression == 0) + if (pad_to_size && dir_data->compression_method == COMPRESSION_NONE) { PGAlignedXLogBlock zerobuf; int bytes; @@ -171,7 +173,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ fsync_parent_path(tmppath) != 0) { #ifdef HAVE_LIBZ - if (dir_data->compression > 0) + if (dir_data->compression_method == COMPRESSION_ZLIB) gzclose(gzfp); else #endif @@ -182,7 +184,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ f = pg_malloc0(sizeof(DirectoryMethodFile)); #ifdef HAVE_LIBZ - if (dir_data->compression > 0) + if (dir_data->compression_method == COMPRESSION_ZLIB) f->gzfp = gzfp; #endif f->fd = fd; @@ -204,7 +206,7 @@ dir_write(Walfile f, const void *buf, size_t count) Assert(f != NULL); #ifdef HAVE_LIBZ - if (dir_data->compression > 0) + if (dir_data->compression_method == COMPRESSION_ZLIB) r = (ssize_t) gzwrite(df->gzfp, buf, count); else #endif @@ -234,7 +236,7 @@ dir_close(Walfile f, WalCloseMethod method) Assert(f != NULL); #ifdef HAVE_LIBZ - if (dir_data->compression > 0) + if (dir_data->compression_method == COMPRESSION_ZLIB) r = gzclose(df->gzfp); else #endif @@ -309,7 +311,7 @@ dir_sync(Walfile f) return 0; #ifdef HAVE_LIBZ - if (dir_data->compression > 0) + if (dir_data->compression_method == COMPRESSION_ZLIB) { if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK) return -1; @@ -334,10 +336,10 @@ dir_get_file_size(const char *pathname) return statbuf.st_size; } -static int -dir_compression(void) +static WalCompressionMethod +dir_compression_method(void) { - return dir_data->compression; + return dir_data->compression_method; } static bool @@ -373,7 +375,9 @@ dir_finish(void) WalWriteMethod * -CreateWalDirectoryMethod(const char *basedir, int compression, bool sync) +CreateWalDirectoryMethod(const char *basedir, + WalCompressionMethod compression_method, + int compression, bool sync) { WalWriteMethod *method; @@ -383,7 +387,7 @@ CreateWalDirectoryMethod(const char *basedir, int compression, bool sync) method->get_current_pos = dir_get_current_pos; method->get_file_size = dir_get_file_size; method->get_file_name = dir_get_file_name; - method->compression = dir_compression; + method->compression_method = dir_compression_method; method->close = dir_close; method->sync = dir_sync; method->existsfile = dir_existsfile; @@ -391,6 +395,7 @@ CreateWalDirectoryMethod(const char *basedir, int compression, bool sync) method->getlasterror = dir_getlasterror; dir_data = pg_malloc0(sizeof(DirectoryMethodData)); + dir_data->compression_method = compression_method; dir_data->compression = compression; dir_data->basedir = pg_strdup(basedir); dir_data->sync = sync; @@ -424,6 +429,7 @@ typedef struct TarMethodData { char *tarfilename; int fd; + WalCompressionMethod compression_method; int compression; bool sync; TarMethodFile *currentfile; @@ -731,10 +737,10 @@ tar_get_file_size(const char *pathname) return -1; } -static int -tar_compression(void) +static WalCompressionMethod +tar_compression_method(void) { - return tar_data->compression; + return tar_data->compression_method; } static off_t @@ -1031,8 +1037,16 @@ tar_finish(void) return true; } +/* + * The argument compression_method is currently ignored. It is in place for + * symmetry with CreateWalDirectoryMethod which uses it for distinguishing + * between the different compression methods. CreateWalTarMethod and its family + * of functions handle only zlib compression. + */ WalWriteMethod * -CreateWalTarMethod(const char *tarbase, int compression, bool sync) +CreateWalTarMethod(const char *tarbase, + WalCompressionMethod compression_method, + int compression, bool sync) { WalWriteMethod *method; const char *suffix = (compression != 0) ? ".tar.gz" : ".tar"; @@ -1043,7 +1057,7 @@ CreateWalTarMethod(const char *tarbase, int compression, bool sync) method->get_current_pos = tar_get_current_pos; method->get_file_size = tar_get_file_size; method->get_file_name = tar_get_file_name; - method->compression = tar_compression; + method->compression_method = tar_compression_method; method->close = tar_close; method->sync = tar_sync; method->existsfile = tar_existsfile; @@ -1054,6 +1068,7 @@ CreateWalTarMethod(const char *tarbase, int compression, bool sync) tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1); sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix); tar_data->fd = -1; + tar_data->compression_method = compression_method; tar_data->compression = compression; tar_data->sync = sync; #ifdef HAVE_LIBZ diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h index 4abdfd8333..4fc7b3d2a3 100644 --- a/src/bin/pg_basebackup/walmethods.h +++ b/src/bin/pg_basebackup/walmethods.h @@ -19,6 +19,12 @@ typedef enum CLOSE_NO_RENAME } WalCloseMethod; +typedef enum +{ + COMPRESSION_ZLIB, + COMPRESSION_NONE +} WalCompressionMethod; + /* * A WalWriteMethod structure represents the different methods used * to write the streaming WAL as it's received. @@ -58,8 +64,8 @@ struct WalWriteMethod */ char *(*get_file_name) (const char *pathname, const char *temp_suffix); - /* Return the level of compression */ - int (*compression) (void); + /* Returns the compression method */ + WalCompressionMethod (*compression_method) (void); /* * Write count number of bytes to the file, and return the number of bytes @@ -95,8 +101,11 @@ struct WalWriteMethod * not all those required for pg_receivewal) */ WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, + WalCompressionMethod compression_method, int compression, bool sync); -WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync); +WalWriteMethod *CreateWalTarMethod(const char *tarbase, + WalCompressionMethod compression_method, + int compression, bool sync); /* Cleanup routines for previously-created methods */ void FreeWalDirectoryMethod(void); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 7bbbb34e2f..da6ac8ed83 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2858,6 +2858,7 @@ WaitEventTimeout WaitPMResult WalCloseMethod WalCompression +WalCompressionMethod WalLevel WalRcvData WalRcvExecResult -- 2.25.1