From 0c8e498070fa30f3f05657cd769df3053a39e1d8 Mon Sep 17 00:00:00 2001 From: Georgios Kokolatos Date: Tue, 29 Nov 2022 11:22:48 +0000 Subject: [PATCH v11 3/4] Introduce Compressor API in pg_dump The purpose of this API is to allow for easier addition of new compression methods. CompressFileHandle is substituting the cfp* family of functions under a struct of function pointers for opening, writing, etc. The implementor of a new compression method is now able to "simply" just add those definitions. Custom compressed archives now need to store the compression algorithm in their header. This requires a bump in the version number. The level of compression is no longer stored in the dump as it is irrelevant. --- src/bin/pg_dump/Makefile | 1 + src/bin/pg_dump/compress_gzip.c | 390 ++++++++++++ src/bin/pg_dump/compress_gzip.h | 9 + src/bin/pg_dump/compress_io.c | 829 ++++++-------------------- src/bin/pg_dump/compress_io.h | 69 ++- src/bin/pg_dump/meson.build | 1 + src/bin/pg_dump/pg_backup_archiver.c | 93 +-- src/bin/pg_dump/pg_backup_archiver.h | 4 +- src/bin/pg_dump/pg_backup_custom.c | 23 +- src/bin/pg_dump/pg_backup_directory.c | 85 +-- 10 files changed, 766 insertions(+), 738 deletions(-) create mode 100644 src/bin/pg_dump/compress_gzip.c create mode 100644 src/bin/pg_dump/compress_gzip.h diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile index 9dc5a784dd..29eab02d37 100644 --- a/src/bin/pg_dump/Makefile +++ b/src/bin/pg_dump/Makefile @@ -24,6 +24,7 @@ LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport) OBJS = \ $(WIN32RES) \ + compress_gzip.o \ compress_io.o \ dumputils.o \ parallel.o \ diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c new file mode 100644 index 0000000000..bc6d1abc77 --- /dev/null +++ b/src/bin/pg_dump/compress_gzip.c @@ -0,0 +1,390 @@ +#include "postgres_fe.h" +#include "pg_backup_utils.h" + +#include "compress_gzip.h" + +#ifdef HAVE_LIBZ +#include "zlib.h" +/*---------------------- + * Compressor API + *---------------------- + */ +typedef struct GzipCompressorState +{ + int compressionLevel; + z_streamp zp; + + void *outbuf; + size_t outsize; +} GzipCompressorState; + +/* Private routines that support gzip compressed data I/O */ +static void +DeflateCompressorGzip(ArchiveHandle *AH, CompressorState *cs, bool flush) +{ + GzipCompressorState *gzipcs = (GzipCompressorState *) cs->private; + z_streamp zp = gzipcs->zp; + void *out = gzipcs->outbuf; + int res = Z_OK; + + while (gzipcs->zp->avail_in != 0 || flush) + { + res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH); + if (res == Z_STREAM_ERROR) + pg_fatal("could not compress data: %s", zp->msg); + if ((flush && (zp->avail_out < gzipcs->outsize)) + || (zp->avail_out == 0) + || (zp->avail_in != 0) + ) + { + /* + * Extra paranoia: avoid zero-length chunks, since a zero length + * chunk is the EOF marker in the custom format. This should never + * happen but... + */ + if (zp->avail_out < gzipcs->outsize) + { + /* + * Any write function should do its own error checking but to + * make sure we do a check here as well... + */ + size_t len = gzipcs->outsize - zp->avail_out; + + cs->writeF(AH, (char *) out, len); + } + zp->next_out = out; + zp->avail_out = gzipcs->outsize; + } + + if (res == Z_STREAM_END) + break; + } +} + +static void +EndCompressorGzip(ArchiveHandle *AH, CompressorState *cs) +{ + GzipCompressorState *gzipcs = (GzipCompressorState *) cs->private; + z_streamp zp; + + if (gzipcs->zp) + { + zp = gzipcs->zp; + zp->next_in = NULL; + zp->avail_in = 0; + + /* Flush any remaining data from zlib buffer */ + DeflateCompressorGzip(AH, cs, true); + + if (deflateEnd(zp) != Z_OK) + pg_fatal("could not close compression stream: %s", zp->msg); + + pg_free(gzipcs->outbuf); + pg_free(gzipcs->zp); + } + + pg_free(gzipcs); + cs->private = NULL; +} + +static void +WriteDataToArchiveGzip(ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen) +{ + GzipCompressorState *gzipcs = (GzipCompressorState *) cs->private; + z_streamp zp; + + if (!gzipcs->zp) + { + zp = gzipcs->zp = (z_streamp) pg_malloc(sizeof(z_stream)); + zp->zalloc = Z_NULL; + zp->zfree = Z_NULL; + zp->opaque = Z_NULL; + + /* + * outsize is the buffer size we tell zlib it can output to. We + * actually allocate one extra byte because some routines want to + * append a trailing zero byte to the zlib output. + */ + gzipcs->outbuf = pg_malloc(ZLIB_OUT_SIZE + 1); + gzipcs->outsize = ZLIB_OUT_SIZE; + + if (deflateInit(zp, gzipcs->compressionLevel) != Z_OK) + pg_fatal("could not initialize compression library: %s", zp->msg); + + /* Just be paranoid - maybe End is called after Start, with no Write */ + zp->next_out = gzipcs->outbuf; + zp->avail_out = gzipcs->outsize; + } + + gzipcs->zp->next_in = (void *) unconstify(void *, data); + gzipcs->zp->avail_in = dLen; + DeflateCompressorGzip(AH, cs, false); +} + +static void +ReadDataFromArchiveGzip(ArchiveHandle *AH, CompressorState *cs) +{ + z_streamp zp; + char *out; + int res = Z_OK; + size_t cnt; + char *buf; + size_t buflen; + + zp = (z_streamp) pg_malloc(sizeof(z_stream)); + zp->zalloc = Z_NULL; + zp->zfree = Z_NULL; + zp->opaque = Z_NULL; + + buf = pg_malloc(ZLIB_IN_SIZE); + buflen = ZLIB_IN_SIZE; + + out = pg_malloc(ZLIB_OUT_SIZE + 1); + + if (inflateInit(zp) != Z_OK) + pg_fatal("could not initialize compression library: %s", + zp->msg); + + /* no minimal chunk size for zlib */ + while ((cnt = cs->readF(AH, &buf, &buflen))) + { + zp->next_in = (void *) buf; + zp->avail_in = cnt; + + while (zp->avail_in > 0) + { + zp->next_out = (void *) out; + zp->avail_out = ZLIB_OUT_SIZE; + + res = inflate(zp, 0); + if (res != Z_OK && res != Z_STREAM_END) + pg_fatal("could not uncompress data: %s", zp->msg); + + out[ZLIB_OUT_SIZE - zp->avail_out] = '\0'; + ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH); + } + } + + zp->next_in = NULL; + zp->avail_in = 0; + while (res != Z_STREAM_END) + { + zp->next_out = (void *) out; + zp->avail_out = ZLIB_OUT_SIZE; + res = inflate(zp, 0); + if (res != Z_OK && res != Z_STREAM_END) + pg_fatal("could not uncompress data: %s", zp->msg); + + out[ZLIB_OUT_SIZE - zp->avail_out] = '\0'; + ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH); + } + + if (inflateEnd(zp) != Z_OK) + pg_fatal("could not close compression library: %s", zp->msg); + + free(buf); + free(out); + free(zp); +} + +/* Public routines that support gzip compressed data I/O */ +void +InitCompressorGzip(CompressorState *cs, int compressionLevel) +{ + GzipCompressorState *gzipcs; + + cs->readData = ReadDataFromArchiveGzip; + cs->writeData = WriteDataToArchiveGzip; + cs->end = EndCompressorGzip; + + gzipcs = (GzipCompressorState *) pg_malloc0(sizeof(GzipCompressorState)); + gzipcs->compressionLevel = compressionLevel; + + cs->private = gzipcs; +} + + +/*---------------------- + * Compress File API + *---------------------- + */ + +typedef struct GzipData +{ + gzFile fp; + int compressionLevel; +} GzipData; + +static size_t +Gzip_read(void *ptr, size_t size, CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + size_t ret; + + ret = gzread(gd->fp, ptr, size); + if (ret != size && !gzeof(gd->fp)) + { + int errnum; + const char *errmsg = gzerror(gd->fp, &errnum); + + pg_fatal("could not read from input file: %s", + errnum == Z_ERRNO ? strerror(errno) : errmsg); + } + + return ret; +} + +static size_t +Gzip_write(const void *ptr, size_t size, CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + + return gzwrite(gd->fp, ptr, size); +} + +static int +Gzip_getc(CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + int ret; + + errno = 0; + ret = gzgetc(gd->fp); + if (ret == EOF) + { + if (!gzeof(gd->fp)) + pg_fatal("could not read from input file: %s", strerror(errno)); + else + pg_fatal("could not read from input file: end of file"); + } + + return ret; +} + +static char * +Gzip_gets(char *ptr, int size, CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + + return gzgets(gd->fp, ptr, size); +} + +static int +Gzip_close(CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + int save_errno; + int ret; + + CFH->private = NULL; + + ret = gzclose(gd->fp); + + save_errno = errno; + free(gd); + errno = save_errno; + + return ret; +} + +static int +Gzip_eof(CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + + return gzeof(gd->fp); +} + +static const char * +Gzip_get_error(CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + const char *errmsg; + int errnum; + + errmsg = gzerror(gd->fp, &errnum); + if (errnum == Z_ERRNO) + errmsg = strerror(errno); + + return errmsg; +} + +static int +Gzip_open(const char *path, int fd, const char *mode, CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + char mode_compression[32]; + + if (gd->compressionLevel != Z_DEFAULT_COMPRESSION) + { + /* + * user has specified a compression level, so tell zlib to use it + */ + snprintf(mode_compression, sizeof(mode_compression), "%s%d", + mode, gd->compressionLevel); + } + else + strcpy(mode_compression, mode); + + if (fd >= 0) + gd->fp = gzdopen(dup(fd), mode_compression); + else + gd->fp = gzopen(path, mode_compression); + + if (gd->fp == NULL) + return 1; + + return 0; +} + +static int +Gzip_open_write(const char *path, const char *mode, CompressFileHandle * CFH) +{ + char *fname; + int ret; + int save_errno; + + fname = psprintf("%s.gz", path); + ret = CFH->open(fname, -1, mode, CFH); + + save_errno = errno; + pg_free(fname); + errno = save_errno; + + return ret; +} + +void +InitCompressGzip(CompressFileHandle * CFH, int compressionLevel) +{ + GzipData *gd; + + CFH->open = Gzip_open; + CFH->open_write = Gzip_open_write; + CFH->read = Gzip_read; + CFH->write = Gzip_write; + CFH->gets = Gzip_gets; + CFH->getc = Gzip_getc; + CFH->close = Gzip_close; + CFH->eof = Gzip_eof; + CFH->get_error = Gzip_get_error; + + gd = pg_malloc0(sizeof(GzipData)); + gd->compressionLevel = compressionLevel; + + CFH->private = gd; +} +#else /* HAVE_LIBZ */ +void +InitCompressorGzip(CompressorState *cs, int compressionLevel) +{ + pg_fatal("not built with zlib support"); +} + +void +InitCompressGzip(CompressFileHandle * CFH, int compressionLevel) +{ + pg_fatal("not built with zlib support"); +} +#endif /* HAVE_LIBZ */ diff --git a/src/bin/pg_dump/compress_gzip.h b/src/bin/pg_dump/compress_gzip.h new file mode 100644 index 0000000000..ab0362c1f3 --- /dev/null +++ b/src/bin/pg_dump/compress_gzip.h @@ -0,0 +1,9 @@ +#ifndef _COMPRESS_GZIP_H_ +#define _COMPRESS_GZIP_H_ + +#include "compress_io.h" + +extern void InitCompressorGzip(CompressorState *cs, int compressionLevel); +extern void InitCompressGzip(CompressFileHandle * CFH, int compressionLevel); + +#endif /* _COMPRESS_GZIP_H_ */ diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 83b478bc63..56a07e309b 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -51,9 +51,12 @@ * *------------------------------------------------------------------------- */ +#include +#include #include "postgres_fe.h" #include "compress_io.h" +#include "compress_gzip.h" #include "pg_backup_utils.h" #ifdef HAVE_LIBZ @@ -65,83 +68,66 @@ *---------------------- */ -/* typedef appears in compress_io.h */ -struct CompressorState +/* Private routines that support uncompressed data I/O */ +static void +ReadDataFromArchiveNone(ArchiveHandle *AH, CompressorState *cs) { - pg_compress_specification compress_spec; - WriteFunc writeF; + size_t cnt; + char *buf; + size_t buflen; -#ifdef HAVE_LIBZ - z_streamp zp; - char *zlibOut; - size_t zlibOutSize; -#endif -}; + buf = pg_malloc(ZLIB_OUT_SIZE); + buflen = ZLIB_OUT_SIZE; -/* Routines that support zlib compressed data I/O */ -#ifdef HAVE_LIBZ -static void InitCompressorZlib(CompressorState *cs, int level); -static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, - bool flush); -static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF); -static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs, - const char *data, size_t dLen); -static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs); -#endif + while ((cnt = cs->readF(AH, &buf, &buflen))) + { + ahwrite(buf, 1, cnt, AH); + } + + free(buf); +} -/* Routines that support uncompressed data I/O */ -static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF); -static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, - const char *data, size_t dLen); +static void +WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen) +{ + cs->writeF(AH, data, dLen); +} + +static void +EndCompressorNone(ArchiveHandle *AH, CompressorState *cs) +{ + /* no op */ +} + +static void +InitCompressorNone(CompressorState *cs) +{ + cs->readData = ReadDataFromArchiveNone; + cs->writeData = WriteDataToArchiveNone; + cs->end = EndCompressorNone; +} /* Public interface routines */ /* Allocate a new compressor */ CompressorState * AllocateCompressor(const pg_compress_specification compress_spec, - WriteFunc writeF) + ReadFunc readF, WriteFunc writeF) { CompressorState *cs; -#ifndef HAVE_LIBZ - if (compress_spec.algorithm == PG_COMPRESSION_GZIP) - pg_fatal("not built with zlib support"); -#endif - cs = (CompressorState *) pg_malloc0(sizeof(CompressorState)); + cs->readF = readF; cs->writeF = writeF; - cs->compress_spec = compress_spec; - - /* - * Perform compression algorithm specific initialization. - */ -#ifdef HAVE_LIBZ - if (cs->compress_spec.algorithm == PG_COMPRESSION_GZIP) - InitCompressorZlib(cs, cs->compress_spec.level); -#endif - return cs; -} - -/* - * Read all compressed data from the input stream (via readF) and print it - * out with ahwrite(). - */ -void -ReadDataFromArchive(ArchiveHandle *AH, pg_compress_specification compress_spec, - ReadFunc readF) -{ switch (compress_spec.algorithm) { case PG_COMPRESSION_NONE: - ReadDataFromArchiveNone(AH, readF); + InitCompressorNone(cs); break; case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ReadDataFromArchiveZlib(AH, readF); -#else - pg_fatal("not built with zlib support"); -#endif + InitCompressorGzip(cs, compress_spec.level); break; case PG_COMPRESSION_LZ4: /* fallthrough */ @@ -149,33 +135,8 @@ ReadDataFromArchive(ArchiveHandle *AH, pg_compress_specification compress_spec, pg_fatal("invalid compression method"); break; } -} -/* - * Compress and write data to the output stream (via writeF). - */ -void -WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, - const void *data, size_t dLen) -{ - switch (cs->compress_spec.algorithm) - { - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - WriteDataToArchiveZlib(AH, cs, data, dLen); -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_NONE: - WriteDataToArchiveNone(AH, cs, data, dLen); - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; - } + return cs; } /* @@ -184,244 +145,28 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, void EndCompressor(ArchiveHandle *AH, CompressorState *cs) { - switch (cs->compress_spec.algorithm) - { - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - EndCompressorZlib(AH, cs); -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_NONE: - free(cs); - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; - } -} - -/* Private routines, specific to each compression method. */ - -#ifdef HAVE_LIBZ -/* - * Functions for zlib compressed output. - */ - -static void -InitCompressorZlib(CompressorState *cs, int level) -{ - z_streamp zp; - - zp = cs->zp = (z_streamp) pg_malloc(sizeof(z_stream)); - zp->zalloc = Z_NULL; - zp->zfree = Z_NULL; - zp->opaque = Z_NULL; - - /* - * zlibOutSize is the buffer size we tell zlib it can output to. We - * actually allocate one extra byte because some routines want to append a - * trailing zero byte to the zlib output. - */ - cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); - cs->zlibOutSize = ZLIB_OUT_SIZE; - - if (deflateInit(zp, level) != Z_OK) - pg_fatal("could not initialize compression library: %s", - zp->msg); - - /* Just be paranoid - maybe End is called after Start, with no Write */ - zp->next_out = (void *) cs->zlibOut; - zp->avail_out = cs->zlibOutSize; + cs->end(AH, cs); + pg_free(cs); } -static void -EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs) -{ - z_streamp zp = cs->zp; - - zp->next_in = NULL; - zp->avail_in = 0; - - /* Flush any remaining data from zlib buffer */ - DeflateCompressorZlib(AH, cs, true); - - if (deflateEnd(zp) != Z_OK) - pg_fatal("could not close compression stream: %s", zp->msg); - - free(cs->zlibOut); - free(cs->zp); -} - -static void -DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush) -{ - z_streamp zp = cs->zp; - char *out = cs->zlibOut; - int res = Z_OK; - - while (cs->zp->avail_in != 0 || flush) - { - res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH); - if (res == Z_STREAM_ERROR) - pg_fatal("could not compress data: %s", zp->msg); - if ((flush && (zp->avail_out < cs->zlibOutSize)) - || (zp->avail_out == 0) - || (zp->avail_in != 0) - ) - { - /* - * Extra paranoia: avoid zero-length chunks, since a zero length - * chunk is the EOF marker in the custom format. This should never - * happen but... - */ - if (zp->avail_out < cs->zlibOutSize) - { - /* - * Any write function should do its own error checking but to - * make sure we do a check here as well... - */ - size_t len = cs->zlibOutSize - zp->avail_out; - - cs->writeF(AH, out, len); - } - zp->next_out = (void *) out; - zp->avail_out = cs->zlibOutSize; - } - - if (res == Z_STREAM_END) - break; - } -} - -static void -WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs, - const char *data, size_t dLen) -{ - cs->zp->next_in = (void *) unconstify(char *, data); - cs->zp->avail_in = dLen; - DeflateCompressorZlib(AH, cs, false); -} - -static void -ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF) -{ - z_streamp zp; - char *out; - int res = Z_OK; - size_t cnt; - char *buf; - size_t buflen; - - zp = (z_streamp) pg_malloc(sizeof(z_stream)); - zp->zalloc = Z_NULL; - zp->zfree = Z_NULL; - zp->opaque = Z_NULL; - - buf = pg_malloc(ZLIB_IN_SIZE); - buflen = ZLIB_IN_SIZE; - - out = pg_malloc(ZLIB_OUT_SIZE + 1); - - if (inflateInit(zp) != Z_OK) - pg_fatal("could not initialize compression library: %s", - zp->msg); - - /* no minimal chunk size for zlib */ - while ((cnt = readF(AH, &buf, &buflen))) - { - zp->next_in = (void *) buf; - zp->avail_in = cnt; - - while (zp->avail_in > 0) - { - zp->next_out = (void *) out; - zp->avail_out = ZLIB_OUT_SIZE; - - res = inflate(zp, 0); - if (res != Z_OK && res != Z_STREAM_END) - pg_fatal("could not uncompress data: %s", zp->msg); - - out[ZLIB_OUT_SIZE - zp->avail_out] = '\0'; - ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH); - } - } - - zp->next_in = NULL; - zp->avail_in = 0; - while (res != Z_STREAM_END) - { - zp->next_out = (void *) out; - zp->avail_out = ZLIB_OUT_SIZE; - res = inflate(zp, 0); - if (res != Z_OK && res != Z_STREAM_END) - pg_fatal("could not uncompress data: %s", zp->msg); - - out[ZLIB_OUT_SIZE - zp->avail_out] = '\0'; - ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH); - } - - if (inflateEnd(zp) != Z_OK) - pg_fatal("could not close compression library: %s", zp->msg); - - free(buf); - free(out); - free(zp); -} -#endif /* HAVE_LIBZ */ - - -/* - * Functions for uncompressed output. - */ - -static void -ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF) -{ - size_t cnt; - char *buf; - size_t buflen; - - buf = pg_malloc(ZLIB_OUT_SIZE); - buflen = ZLIB_OUT_SIZE; - - while ((cnt = readF(AH, &buf, &buflen))) - { - ahwrite(buf, 1, cnt, AH); - } - - free(buf); -} - -static void -WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, - const char *data, size_t dLen) -{ - cs->writeF(AH, data, dLen); -} - - /*---------------------- * Compressed stream API *---------------------- */ -/* - * cfp represents an open stream, wrapping the underlying FILE or gzFile - * pointer. This is opaque to the callers. - */ -struct cfp +static int +hasSuffix(const char *filename, const char *suffix) { - pg_compress_specification compress_spec; - void *fp; -}; + int filenamelen = strlen(filename); + int suffixlen = strlen(suffix); -#ifdef HAVE_LIBZ -static int hasSuffix(const char *filename, const char *suffix); -#endif + if (filenamelen < suffixlen) + return 0; + + return memcmp(&filename[filenamelen - suffixlen], + suffix, + suffixlen) == 0; +} /* free() without changing errno; useful in several places below */ static void @@ -434,328 +179,142 @@ free_keep_errno(void *p) } /* - * Open a file for reading. 'path' is the file to open, and 'mode' should - * be either "r" or "rb". - * - * If the file at 'path' does not exist, we append the ".gz" suffix (if 'path' - * doesn't already have it) and try again. So if you pass "foo" as 'path', - * this will open either "foo" or "foo.gz". - * - * On failure, return NULL with an error code in errno. + * Compression None implementation */ -cfp * -cfopen_read(const char *path, const char *mode) +static size_t +_read(void *ptr, size_t size, CompressFileHandle * CFH) { - cfp *fp; - pg_compress_specification compress_spec = {0}; + FILE *fp = (FILE *) CFH->private; + size_t ret; - compress_spec.algorithm = PG_COMPRESSION_GZIP; -#ifdef HAVE_LIBZ - if (hasSuffix(path, ".gz")) - fp = cfopen(path, mode, compress_spec); - else -#endif - { - compress_spec.algorithm = PG_COMPRESSION_NONE; - fp = cfopen(path, mode, compress_spec); -#ifdef HAVE_LIBZ - if (fp == NULL) - { - char *fname; + if (size == 0) + return 0; - compress_spec.algorithm = PG_COMPRESSION_GZIP; - fname = psprintf("%s.gz", path); - fp = cfopen(fname, mode, compress_spec); - free_keep_errno(fname); - } -#endif - } - return fp; -} + ret = fread(ptr, 1, size, fp); + if (ret != size && !feof(fp)) + pg_fatal("could not read from input file: %s", + strerror(errno)); -/* - * Open a file for writing. 'path' indicates the path name, and 'mode' must - * be a filemode as accepted by fopen() and gzopen() that indicates writing - * ("w", "wb", "a", or "ab"). - * - * If 'compress_spec.algorithm' is GZIP, a gzip compressed stream is opened, - * and 'compress_spec.level' used. The ".gz" suffix is automatically added to - * 'path' in that case. - * - * On failure, return NULL with an error code in errno. - */ -cfp * -cfopen_write(const char *path, const char *mode, - const pg_compress_specification compress_spec) -{ - cfp *fp; - - if (compress_spec.algorithm == PG_COMPRESSION_NONE) - fp = cfopen(path, mode, compress_spec); - else - { -#ifdef HAVE_LIBZ - char *fname; - - fname = psprintf("%s.gz", path); - fp = cfopen(fname, mode, compress_spec); - free_keep_errno(fname); -#else - pg_fatal("not built with zlib support"); - fp = NULL; /* keep compiler quiet */ -#endif - } - return fp; + return ret; } -/* - * This is the workhorse for cfopen() or cfdopen(). It opens file 'path' or - * associates a stream 'fd', if 'fd' is a valid descriptor, in 'mode'. The - * descriptor is not dup'ed and it is the caller's responsibility to do so. - * The caller must verify that the 'compress_algorithm' is supported by the - * current build. - * - * On failure, return NULL with an error code in errno. - */ -static cfp * -cfopen_internal(const char *path, int fd, const char *mode, - pg_compress_specification compress_spec) +static size_t +_write(const void *ptr, size_t size, CompressFileHandle * CFH) { - cfp *fp = pg_malloc(sizeof(cfp)); - - fp->compress_spec = compress_spec; - - switch (compress_spec.algorithm) - { - case PG_COMPRESSION_NONE: - if (fd >= 0) - fp->fp = fdopen(fd, mode); - else - fp->fp = fopen(path, mode); - if (fp->fp == NULL) - { - free_keep_errno(fp); - fp = NULL; - } - - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - if (compress_spec.level != Z_DEFAULT_COMPRESSION) - { - /* - * user has specified a compression level, so tell zlib to use - * it - */ - char mode_compression[32]; - - snprintf(mode_compression, sizeof(mode_compression), "%s%d", - mode, compress_spec.level); - if (fd >= 0) - fp->fp = gzdopen(fd, mode_compression); - else - fp->fp = gzopen(path, mode_compression); - } - else - { - /* don't specify a level, just use the zlib default */ - if (fd >= 0) - fp->fp = gzdopen(fd, mode); - else - fp->fp = gzopen(path, mode); - } - - if (fp->fp == NULL) - { - free_keep_errno(fp); - fp = NULL; - } -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; - } - - return fp; + return fwrite(ptr, 1, size, (FILE *) CFH->private); } -cfp * -cfopen(const char *path, const char *mode, - const pg_compress_specification compress_spec) +static const char * +_get_error(CompressFileHandle * CFH) { - return cfopen_internal(path, -1, mode, compress_spec); + return strerror(errno); } -cfp * -cfdopen(int fd, const char *mode, - const pg_compress_specification compress_spec) +static char * +_gets(char *ptr, int size, CompressFileHandle * CFH) { - return cfopen_internal(NULL, fd, mode, compress_spec); + return fgets(ptr, size, (FILE *) CFH->private); } -int -cfread(void *ptr, int size, cfp *fp) +static int +_getc(CompressFileHandle * CFH) { - int ret = 0; - - if (size == 0) - return 0; + FILE *fp = (FILE *) CFH->private; + int ret; - switch (fp->compress_spec.algorithm) + ret = fgetc(fp); + if (ret == EOF) { - case PG_COMPRESSION_NONE: - ret = fread(ptr, 1, size, fp->fp); - if (ret != size && !feof(fp->fp)) - READ_ERROR_EXIT(fp->fp); - - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzread(fp->fp, ptr, size); - if (ret != size && !gzeof(fp->fp)) - { - int errnum; - const char *errmsg = gzerror(fp->fp, &errnum); - - pg_fatal("could not read from input file: %s", - errnum == Z_ERRNO ? strerror(errno) : errmsg); - } -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; + if (!feof(fp)) + pg_fatal("could not read from input file: %s", strerror(errno)); + else + pg_fatal("could not read from input file: end of file"); } return ret; } -int -cfwrite(const void *ptr, int size, cfp *fp) +static int +_close(CompressFileHandle * CFH) { + FILE *fp = (FILE *) CFH->private; int ret = 0; - switch (fp->compress_spec.algorithm) - { - case PG_COMPRESSION_NONE: - ret = fwrite(ptr, 1, size, fp->fp); - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzwrite(fp->fp, ptr, size); -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; - } + CFH->private = NULL; + + if (fp) + ret = fclose(fp); return ret; } -int -cfgetc(cfp *fp) +static int +_eof(CompressFileHandle * CFH) { - int ret = 0; + return feof((FILE *) CFH->private); +} - switch (fp->compress_spec.algorithm) - { - case PG_COMPRESSION_NONE: - ret = fgetc(fp->fp); - if (ret == EOF) - READ_ERROR_EXIT(fp->fp); +static int +_open(const char *path, int fd, const char *mode, CompressFileHandle * CFH) +{ + Assert(CFH->private == NULL); - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzgetc((gzFile) fp->fp); - if (ret == EOF) - { - if (!gzeof(fp->fp)) - pg_fatal("could not read from input file: %s", strerror(errno)); - else - pg_fatal("could not read from input file: end of file"); - } -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; - } + if (fd >= 0) + CFH->private = fdopen(dup(fd), mode); + else + CFH->private = fopen(path, mode); - return ret; + if (CFH->private == NULL) + return 1; + + return 0; } -char * -cfgets(cfp *fp, char *buf, int len) +static int +_open_write(const char *path, const char *mode, CompressFileHandle * CFH) { - char *ret = NULL; + Assert(CFH->private == NULL); - switch (fp->compress_spec.algorithm) - { - case PG_COMPRESSION_NONE: - ret = fgets(buf, len, fp->fp); + CFH->private = fopen(path, mode); + if (CFH->private == NULL) + return 1; - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzgets(fp->fp, buf, len); -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; - } + return 0; +} - return ret; +static void +InitCompressNone(CompressFileHandle * CFH) +{ + CFH->open = _open; + CFH->open_write = _open_write; + CFH->read = _read; + CFH->write = _write; + CFH->gets = _gets; + CFH->getc = _getc; + CFH->close = _close; + CFH->eof = _eof; + CFH->get_error = _get_error; + + CFH->private = NULL; } -int -cfclose(cfp *fp) +/* + * Public interface + */ +CompressFileHandle * +InitCompressFileHandle(const pg_compress_specification compress_spec) { - int ret = 0; + CompressFileHandle *CFH; - if (fp == NULL) - { - errno = EBADF; - return EOF; - } + CFH = pg_malloc0(sizeof(CompressFileHandle)); - switch (fp->compress_spec.algorithm) + switch (compress_spec.algorithm) { case PG_COMPRESSION_NONE: - ret = fclose(fp->fp); - fp->fp = NULL; - + InitCompressNone(CFH); break; case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzclose(fp->fp); - fp->fp = NULL; -#else - pg_fatal("not built with zlib support"); -#endif + InitCompressGzip(CFH, compress_spec.level); break; case PG_COMPRESSION_LZ4: /* fallthrough */ @@ -764,71 +323,77 @@ cfclose(cfp *fp) break; } - free_keep_errno(fp); - - return ret; + return CFH; } -int -cfeof(cfp *fp) +/* + * Open a file for reading. 'path' is the file to open, and 'mode' should + * be either "r" or "rb". + * + * If the file at 'path' does not exist, we append the ".gz" suffix (if + * 'path' doesn't already have it) and try again. So if you pass "foo" as + * 'path', this will open either "foo" or "foo.gz", trying in that order. + * + * On failure, return NULL with an error code in errno. + * + */ +CompressFileHandle * +InitDiscoverCompressFileHandle(const char *path, const char *mode) { - int ret = 0; + CompressFileHandle *CFH = NULL; + struct stat st; + char *fname; + pg_compress_specification compress_spec = {0}; - switch (fp->compress_spec.algorithm) - { - case PG_COMPRESSION_NONE: - ret = feof(fp->fp); + compress_spec.algorithm = PG_COMPRESSION_NONE; - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzeof(fp->fp); -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; - } + Assert(strcmp(mode, "r") == 0 || strcmp(mode, "rb") == 0); - return ret; -} + fname = strdup(path); -const char * -get_cfp_error(cfp *fp) -{ - if (fp->compress_spec.algorithm == PG_COMPRESSION_GZIP) + if (hasSuffix(fname, ".gz")) + compress_spec.algorithm = PG_COMPRESSION_GZIP; + else { + bool exists; + + exists = (stat(path, &st) == 0); + /* avoid unused warning if it is not build with compression */ + if (exists) + compress_spec.algorithm = PG_COMPRESSION_NONE; #ifdef HAVE_LIBZ - int errnum; - const char *errmsg = gzerror(fp->fp, &errnum); + if (!exists) + { + free_keep_errno(fname); + fname = psprintf("%s.gz", path); + exists = (stat(fname, &st) == 0); - if (errnum != Z_ERRNO) - return errmsg; -#else - pg_fatal("not built with zlib support"); + if (exists) + compress_spec.algorithm = PG_COMPRESSION_GZIP; + } #endif } - return strerror(errno); + CFH = InitCompressFileHandle(compress_spec); + if (CFH->open(fname, -1, mode, CFH)) + { + free_keep_errno(CFH); + CFH = NULL; + } + free_keep_errno(fname); + + return CFH; } -#ifdef HAVE_LIBZ -static int -hasSuffix(const char *filename, const char *suffix) +int +DestroyCompressFileHandle(CompressFileHandle * CFH) { - int filenamelen = strlen(filename); - int suffixlen = strlen(suffix); + int ret = 0; - if (filenamelen < suffixlen) - return 0; + if (CFH->private) + ret = CFH->close(CFH); - return memcmp(&filename[filenamelen - suffixlen], - suffix, - suffixlen) == 0; -} + free_keep_errno(CFH); -#endif + return ret; +} diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h index d6335fff02..a986f5e6ee 100644 --- a/src/bin/pg_dump/compress_io.h +++ b/src/bin/pg_dump/compress_io.h @@ -37,34 +37,61 @@ typedef void (*WriteFunc) (ArchiveHandle *AH, const char *buf, size_t len); */ typedef size_t (*ReadFunc) (ArchiveHandle *AH, char **buf, size_t *buflen); -/* struct definition appears in compress_io.c */ typedef struct CompressorState CompressorState; +struct CompressorState +{ + /* + * Read all compressed data from the input stream (via readF) and print it + * out with ahwrite(). + */ + void (*readData) (ArchiveHandle *AH, CompressorState *cs); + + /* + * Compress and write data to the output stream (via writeF). + */ + void (*writeData) (ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen); + void (*end) (ArchiveHandle *AH, CompressorState *cs); + + ReadFunc readF; + WriteFunc writeF; + + void *private; +}; extern CompressorState *AllocateCompressor(const pg_compress_specification compress_spec, + ReadFunc readF, WriteFunc writeF); -extern void ReadDataFromArchive(ArchiveHandle *AH, - const pg_compress_specification compress_spec, - ReadFunc readF); -extern void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, - const void *data, size_t dLen); extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs); +/* + * Compress File Handle + */ +typedef struct CompressFileHandle CompressFileHandle; + +struct CompressFileHandle +{ + int (*open) (const char *path, int fd, const char *mode, + CompressFileHandle * CFH); + int (*open_write) (const char *path, const char *mode, + CompressFileHandle * cxt); + size_t (*read) (void *ptr, size_t size, CompressFileHandle * CFH); + size_t (*write) (const void *ptr, size_t size, + struct CompressFileHandle *CFH); + char *(*gets) (char *s, int size, CompressFileHandle * CFH); + int (*getc) (CompressFileHandle * CFH); + int (*eof) (CompressFileHandle * CFH); + int (*close) (CompressFileHandle * CFH); + const char *(*get_error) (CompressFileHandle * CFH); + + void *private; +}; + -typedef struct cfp cfp; +extern CompressFileHandle * InitCompressFileHandle(const pg_compress_specification compress_spec); -extern cfp *cfopen(const char *path, const char *mode, - const pg_compress_specification compress_spec); -extern cfp *cfdopen(int fd, const char *mode, - pg_compress_specification compress_spec); -extern cfp *cfopen_read(const char *path, const char *mode); -extern cfp *cfopen_write(const char *path, const char *mode, - const pg_compress_specification compress_spec); -extern int cfread(void *ptr, int size, cfp *fp); -extern int cfwrite(const void *ptr, int size, cfp *fp); -extern int cfgetc(cfp *fp); -extern char *cfgets(cfp *fp, char *buf, int len); -extern int cfclose(cfp *fp); -extern int cfeof(cfp *fp); -extern const char *get_cfp_error(cfp *fp); +extern CompressFileHandle * InitDiscoverCompressFileHandle(const char *path, + const char *mode); +extern int DestroyCompressFileHandle(CompressFileHandle * CFH); #endif diff --git a/src/bin/pg_dump/meson.build b/src/bin/pg_dump/meson.build index d96e566846..0c73a4707e 100644 --- a/src/bin/pg_dump/meson.build +++ b/src/bin/pg_dump/meson.build @@ -1,5 +1,6 @@ pg_dump_common_sources = files( 'compress_io.c', + 'compress_gzip.c', 'dumputils.c', 'parallel.c', 'pg_backup_archiver.c', diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 92a160b67e..248646143d 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -95,8 +95,8 @@ static void dump_lo_buf(ArchiveHandle *AH); static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); static void SetOutput(ArchiveHandle *AH, const char *filename, const pg_compress_specification compress_spec); -static cfp *SaveOutput(ArchiveHandle *AH); -static void RestoreOutput(ArchiveHandle *AH, cfp *savedOutput); +static CompressFileHandle * SaveOutput(ArchiveHandle *AH); +static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle * savedOutput); static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel); static void restore_toc_entries_prefork(ArchiveHandle *AH, @@ -272,7 +272,7 @@ CloseArchive(Archive *AHX) /* Close the output */ errno = 0; - res = cfclose(AH->OF); + res = DestroyCompressFileHandle(AH->OF); if (res != 0) pg_fatal("could not close output file: %m"); @@ -355,7 +355,7 @@ RestoreArchive(Archive *AHX) bool parallel_mode; bool supports_compression; TocEntry *te; - cfp *sav; + CompressFileHandle *sav; AH->stage = STAGE_INITIALIZING; @@ -1127,7 +1127,7 @@ PrintTOCSummary(Archive *AHX) TocEntry *te; pg_compress_specification out_compress_spec = {0}; teSection curSection; - cfp *sav; + CompressFileHandle *sav; const char *fmtName; char stamp_str[64]; @@ -1503,6 +1503,7 @@ static void SetOutput(ArchiveHandle *AH, const char *filename, const pg_compress_specification compress_spec) { + CompressFileHandle *CFH; const char *mode; int fn = -1; @@ -1525,33 +1526,32 @@ SetOutput(ArchiveHandle *AH, const char *filename, else mode = PG_BINARY_W; - if (fn >= 0) - AH->OF = cfdopen(dup(fn), mode, compress_spec); - else - AH->OF = cfopen(filename, mode, compress_spec); + CFH = InitCompressFileHandle(compress_spec); - if (!AH->OF) + if (CFH->open(filename, fn, mode, CFH)) { if (filename) pg_fatal("could not open output file \"%s\": %m", filename); else pg_fatal("could not open output file: %m"); } + + AH->OF = CFH; } -static cfp * +static CompressFileHandle * SaveOutput(ArchiveHandle *AH) { - return (cfp *) AH->OF; + return (CompressFileHandle *) AH->OF; } static void -RestoreOutput(ArchiveHandle *AH, cfp *savedOutput) +RestoreOutput(ArchiveHandle *AH, CompressFileHandle * savedOutput) { int res; errno = 0; - res = cfclose(AH->OF); + res = DestroyCompressFileHandle(AH->OF); if (res != 0) pg_fatal("could not close output file: %m"); @@ -1690,7 +1690,11 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH) else if (RestoringToDB(AH)) bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb); else - bytes_written = cfwrite(ptr, size * nmemb, AH->OF); + { + CompressFileHandle *CFH = (CompressFileHandle *) AH->OF; + + bytes_written = CFH->write(ptr, size * nmemb, CFH); + } if (bytes_written != size * nmemb) WRITE_ERROR_EXIT; @@ -2032,6 +2036,18 @@ ReadStr(ArchiveHandle *AH) return buf; } +static bool +_fileExistsInDirectory(const char *dir, const char *filename) +{ + struct stat st; + char buf[MAXPGPATH]; + + if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH) + pg_fatal("directory name too long: \"%s\"", dir); + + return (stat(buf, &st) == 0 && S_ISREG(st.st_mode)); +} + static int _discoverArchiveFormat(ArchiveHandle *AH) { @@ -2062,26 +2078,12 @@ _discoverArchiveFormat(ArchiveHandle *AH) */ if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode)) { - char buf[MAXPGPATH]; - - if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH) - pg_fatal("directory name too long: \"%s\"", - AH->fSpec); - if (stat(buf, &st) == 0 && S_ISREG(st.st_mode)) - { - AH->format = archDirectory; + AH->format = archDirectory; + if (_fileExistsInDirectory(AH->fSpec, "toc.dat")) return AH->format; - } - #ifdef HAVE_LIBZ - if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH) - pg_fatal("directory name too long: \"%s\"", - AH->fSpec); - if (stat(buf, &st) == 0 && S_ISREG(st.st_mode)) - { - AH->format = archDirectory; + if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz")) return AH->format; - } #endif pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)", AH->fSpec); @@ -2179,6 +2181,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, SetupWorkerPtrType setupWorkerPtr) { ArchiveHandle *AH; + CompressFileHandle *CFH; pg_compress_specification out_compress_spec = {0}; pg_log_debug("allocating AH for %s, format %d", @@ -2234,7 +2237,10 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, /* Open stdout with no compression for AH output handle */ out_compress_spec.algorithm = PG_COMPRESSION_NONE; - AH->OF = cfdopen(dup(fileno(stdout)), PG_BINARY_A, out_compress_spec); + CFH = InitCompressFileHandle(out_compress_spec); + if (CFH->open(NULL, fileno(stdout), PG_BINARY_A, CFH)) + pg_fatal("could not open stdout for appending: %m"); + AH->OF = CFH; /* * On Windows, we need to use binary mode to read/write non-text files, @@ -3647,7 +3653,7 @@ WriteHead(ArchiveHandle *AH) AH->WriteBytePtr(AH, AH->intSize); AH->WriteBytePtr(AH, AH->offSize); AH->WriteBytePtr(AH, AH->format); - WriteInt(AH, AH->compress_spec.level); + AH->WriteBytePtr(AH, AH->compress_spec.algorithm); crtm = *localtime(&AH->createDate); WriteInt(AH, crtm.tm_sec); WriteInt(AH, crtm.tm_min); @@ -3719,7 +3725,9 @@ ReadHead(ArchiveHandle *AH) AH->format, fmt); AH->compress_spec.algorithm = PG_COMPRESSION_NONE; - if (AH->version >= K_VERS_1_2) + if (AH->version >= K_VERS_1_15) + AH->compress_spec.algorithm = AH->ReadBytePtr(AH); + else if (AH->version >= K_VERS_1_2) { if (AH->version < K_VERS_1_4) AH->compress_spec.level = AH->ReadBytePtr(AH); @@ -3732,11 +3740,20 @@ ReadHead(ArchiveHandle *AH) else AH->compress_spec.algorithm = PG_COMPRESSION_GZIP; + if (AH->compress_spec.algorithm != PG_COMPRESSION_NONE) + { + bool unsupported = false; + #ifndef HAVE_LIBZ - if (AH->compress_spec.algorithm == PG_COMPRESSION_GZIP) - pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available"); + if (AH->compress_spec.algorithm == PG_COMPRESSION_GZIP) + unsupported = true; #endif - + if (unsupported) + { + pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available"); + AH->compress_spec.algorithm = PG_COMPRESSION_NONE; + } + } if (AH->version >= K_VERS_1_4) { diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index d2930949ab..bb7fad2af1 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -65,10 +65,12 @@ #define K_VERS_1_13 MAKE_ARCHIVE_VERSION(1, 13, 0) /* change search_path * behavior */ #define K_VERS_1_14 MAKE_ARCHIVE_VERSION(1, 14, 0) /* add tableam */ +#define K_VERS_1_15 MAKE_ARCHIVE_VERSION(1, 15, 0) /* add compression_algorithm + * in header */ /* Current archive version number (the format we can output) */ #define K_VERS_MAJOR 1 -#define K_VERS_MINOR 14 +#define K_VERS_MINOR 15 #define K_VERS_REV 0 #define K_VERS_SELF MAKE_ARCHIVE_VERSION(K_VERS_MAJOR, K_VERS_MINOR, K_VERS_REV) diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index 6a2112c45f..49ec0e3816 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -298,7 +298,9 @@ _StartData(ArchiveHandle *AH, TocEntry *te) _WriteByte(AH, BLK_DATA); /* Block type */ WriteInt(AH, te->dumpId); /* For sanity check */ - ctx->cs = AllocateCompressor(AH->compress_spec, _CustomWriteFunc); + ctx->cs = AllocateCompressor(AH->compress_spec, + NULL, + _CustomWriteFunc); } /* @@ -317,15 +319,15 @@ _WriteData(ArchiveHandle *AH, const void *data, size_t dLen) CompressorState *cs = ctx->cs; if (dLen > 0) - /* WriteDataToArchive() internally throws write errors */ - WriteDataToArchive(AH, cs, data, dLen); + /* writeData() internally throws write errors */ + cs->writeData(AH, cs, data, dLen); } /* * Called by the archiver when a dumper's 'DataDumper' routine has * finished. * - * Optional. + * Mandatory. */ static void _EndData(ArchiveHandle *AH, TocEntry *te) @@ -333,6 +335,8 @@ _EndData(ArchiveHandle *AH, TocEntry *te) lclContext *ctx = (lclContext *) AH->formatData; EndCompressor(AH, ctx->cs); + ctx->cs = NULL; + /* Send the end marker */ WriteInt(AH, 0); } @@ -377,7 +381,9 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) WriteInt(AH, oid); - ctx->cs = AllocateCompressor(AH->compress_spec, _CustomWriteFunc); + ctx->cs = AllocateCompressor(AH->compress_spec, + NULL, + _CustomWriteFunc); } /* @@ -566,7 +572,12 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te) static void _PrintData(ArchiveHandle *AH) { - ReadDataFromArchive(AH, AH->compress_spec, _CustomReadFunc); + CompressorState *cs; + + cs = AllocateCompressor(AH->compress_spec, + _CustomReadFunc, NULL); + cs->readData(AH, cs); + EndCompressor(AH, cs); } static void diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index 7d2cddbb2c..e1ce2f393b 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -50,9 +50,9 @@ typedef struct */ char *directory; - cfp *dataFH; /* currently open data file */ + CompressFileHandle *dataFH; /* currently open data file */ - cfp *blobsTocFH; /* file handle for blobs.toc */ + CompressFileHandle *blobsTocFH; /* file handle for blobs.toc */ ParallelState *pstate; /* for parallel backup / restore */ } lclContext; @@ -198,11 +198,11 @@ InitArchiveFmt_Directory(ArchiveHandle *AH) else { /* Read Mode */ char fname[MAXPGPATH]; - cfp *tocFH; + CompressFileHandle *tocFH; setFilePath(AH, fname, "toc.dat"); - tocFH = cfopen_read(fname, PG_BINARY_R); + tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R); if (tocFH == NULL) pg_fatal("could not open input file \"%s\": %m", fname); @@ -218,7 +218,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH) ReadToc(AH); /* Nothing else in the file, so close it again... */ - if (cfclose(tocFH) != 0) + if (DestroyCompressFileHandle(tocFH) != 0) pg_fatal("could not close TOC file: %m"); ctx->dataFH = NULL; } @@ -327,9 +327,9 @@ _StartData(ArchiveHandle *AH, TocEntry *te) setFilePath(AH, fname, tctx->filename); - ctx->dataFH = cfopen_write(fname, PG_BINARY_W, - AH->compress_spec); - if (ctx->dataFH == NULL) + ctx->dataFH = InitCompressFileHandle(AH->compress_spec); + + if (ctx->dataFH->open_write(fname, PG_BINARY_W, ctx->dataFH)) pg_fatal("could not open output file \"%s\": %m", fname); } @@ -346,15 +346,16 @@ static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; errno = 0; - if (dLen > 0 && cfwrite(data, dLen, ctx->dataFH) != dLen) + if (dLen > 0 && CFH->write(data, dLen, CFH) != dLen) { /* if write didn't set errno, assume problem is no disk space */ if (errno == 0) errno = ENOSPC; pg_fatal("could not write to output file: %s", - get_cfp_error(ctx->dataFH)); + CFH->get_error(CFH)); } } @@ -370,7 +371,7 @@ _EndData(ArchiveHandle *AH, TocEntry *te) lclContext *ctx = (lclContext *) AH->formatData; /* Close the file */ - if (cfclose(ctx->dataFH) != 0) + if (DestroyCompressFileHandle(ctx->dataFH) != 0) pg_fatal("could not close data file: %m"); ctx->dataFH = NULL; @@ -385,26 +386,25 @@ _PrintFileData(ArchiveHandle *AH, char *filename) size_t cnt; char *buf; size_t buflen; - cfp *cfp; + CompressFileHandle *CFH; if (!filename) return; - cfp = cfopen_read(filename, PG_BINARY_R); - - if (!cfp) + CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R); + if (!CFH) pg_fatal("could not open input file \"%s\": %m", filename); buf = pg_malloc(ZLIB_OUT_SIZE); buflen = ZLIB_OUT_SIZE; - while ((cnt = cfread(buf, buflen, cfp))) + while ((cnt = CFH->read(buf, buflen, CFH))) { ahwrite(buf, 1, cnt, AH); } free(buf); - if (cfclose(cfp) != 0) + if (DestroyCompressFileHandle(CFH) != 0) pg_fatal("could not close data file \"%s\": %m", filename); } @@ -435,6 +435,7 @@ _LoadBlobs(ArchiveHandle *AH) { Oid oid; lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH; char tocfname[MAXPGPATH]; char line[MAXPGPATH]; @@ -442,14 +443,14 @@ _LoadBlobs(ArchiveHandle *AH) setFilePath(AH, tocfname, "blobs.toc"); - ctx->blobsTocFH = cfopen_read(tocfname, PG_BINARY_R); + CFH = ctx->blobsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R); if (ctx->blobsTocFH == NULL) pg_fatal("could not open large object TOC file \"%s\" for input: %m", tocfname); /* Read the blobs TOC file line-by-line, and process each blob */ - while ((cfgets(ctx->blobsTocFH, line, MAXPGPATH)) != NULL) + while ((CFH->gets(line, MAXPGPATH, CFH)) != NULL) { char blobfname[MAXPGPATH + 1]; char path[MAXPGPATH]; @@ -464,11 +465,11 @@ _LoadBlobs(ArchiveHandle *AH) _PrintFileData(AH, path); EndRestoreBlob(AH, oid); } - if (!cfeof(ctx->blobsTocFH)) + if (!CFH->eof(CFH)) pg_fatal("error reading large object TOC file \"%s\"", tocfname); - if (cfclose(ctx->blobsTocFH) != 0) + if (DestroyCompressFileHandle(ctx->blobsTocFH) != 0) pg_fatal("could not close large object TOC file \"%s\": %m", tocfname); @@ -488,15 +489,16 @@ _WriteByte(ArchiveHandle *AH, const int i) { unsigned char c = (unsigned char) i; lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; errno = 0; - if (cfwrite(&c, 1, ctx->dataFH) != 1) + if (CFH->write(&c, 1, CFH) != 1) { /* if write didn't set errno, assume problem is no disk space */ if (errno == 0) errno = ENOSPC; pg_fatal("could not write to output file: %s", - get_cfp_error(ctx->dataFH)); + CFH->get_error(CFH)); } return 1; @@ -512,8 +514,9 @@ static int _ReadByte(ArchiveHandle *AH) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; - return cfgetc(ctx->dataFH); + return CFH->getc(CFH); } /* @@ -524,15 +527,16 @@ static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; errno = 0; - if (cfwrite(buf, len, ctx->dataFH) != len) + if (CFH->write(buf, len, CFH) != len) { /* if write didn't set errno, assume problem is no disk space */ if (errno == 0) errno = ENOSPC; pg_fatal("could not write to output file: %s", - get_cfp_error(ctx->dataFH)); + CFH->get_error(CFH)); } } @@ -545,12 +549,13 @@ static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; /* - * If there was an I/O error, we already exited in cfread(), so here we + * If there was an I/O error, we already exited in readF(), so here we * exit on short reads. */ - if (cfread(buf, len, ctx->dataFH) != len) + if (CFH->read(buf, len, CFH) != len) pg_fatal("could not read from input file: end of file"); } @@ -573,7 +578,7 @@ _CloseArchive(ArchiveHandle *AH) if (AH->mode == archModeWrite) { - cfp *tocFH; + CompressFileHandle *tocFH; pg_compress_specification compress_spec = {0}; char fname[MAXPGPATH]; @@ -584,8 +589,8 @@ _CloseArchive(ArchiveHandle *AH) /* The TOC is always created uncompressed */ compress_spec.algorithm = PG_COMPRESSION_NONE; - tocFH = cfopen_write(fname, PG_BINARY_W, compress_spec); - if (tocFH == NULL) + tocFH = InitCompressFileHandle(compress_spec); + if (tocFH->open_write(fname, PG_BINARY_W, tocFH)) pg_fatal("could not open output file \"%s\": %m", fname); ctx->dataFH = tocFH; @@ -598,7 +603,7 @@ _CloseArchive(ArchiveHandle *AH) WriteHead(AH); AH->format = archDirectory; WriteToc(AH); - if (cfclose(tocFH) != 0) + if (DestroyCompressFileHandle(tocFH) != 0) pg_fatal("could not close TOC file: %m"); WriteDataChunks(AH, ctx->pstate); @@ -649,8 +654,8 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te) /* The blob TOC file is never compressed */ compress_spec.algorithm = PG_COMPRESSION_NONE; - ctx->blobsTocFH = cfopen_write(fname, "ab", compress_spec); - if (ctx->blobsTocFH == NULL) + ctx->blobsTocFH = InitCompressFileHandle(compress_spec); + if (ctx->blobsTocFH->open_write(fname, "ab", ctx->blobsTocFH)) pg_fatal("could not open output file \"%s\": %m", fname); } @@ -667,9 +672,8 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid); - ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compress_spec); - - if (ctx->dataFH == NULL) + ctx->dataFH = InitCompressFileHandle(AH->compress_spec); + if (ctx->dataFH->open_write(fname, PG_BINARY_W, ctx->dataFH)) pg_fatal("could not open output file \"%s\": %m", fname); } @@ -682,17 +686,18 @@ static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->blobsTocFH; char buf[50]; int len; /* Close the BLOB data file itself */ - if (cfclose(ctx->dataFH) != 0) + if (DestroyCompressFileHandle(ctx->dataFH) != 0) pg_fatal("could not close blob data file: %m"); ctx->dataFH = NULL; /* register the blob in blobs.toc */ len = snprintf(buf, sizeof(buf), "%u blob_%u.dat\n", oid, oid); - if (cfwrite(buf, len, ctx->blobsTocFH) != len) + if (CFH->write(buf, len, CFH) != len) pg_fatal("could not write to blobs TOC file"); } @@ -706,7 +711,7 @@ _EndBlobs(ArchiveHandle *AH, TocEntry *te) { lclContext *ctx = (lclContext *) AH->formatData; - if (cfclose(ctx->blobsTocFH) != 0) + if (DestroyCompressFileHandle(ctx->blobsTocFH) != 0) pg_fatal("could not close blobs TOC file: %m"); ctx->blobsTocFH = NULL; } -- 2.34.1