From 4e5590699a417a080b784429f26056274bf1e142 Mon Sep 17 00:00:00 2001 From: Georgios Kokolatos Date: Fri, 2 Dec 2022 16:03:05 +0000 Subject: [PATCH v16 3/4] Introduce Compressor API in pg_dump The purpose of this API is to allow for easier addition of new compression methods. CompressFileHandle is substituting the cfp* family of functions under a struct of function pointers for opening, writing, etc. The implementor of a new compression method is now able to "simply" just add those definitions. Custom compressed archives now need to store the compression algorithm in their header. This requires a bump in the version number. The level of compression is no longer stored in the dump as it is irrelevant. --- src/bin/pg_dump/Makefile | 1 + src/bin/pg_dump/compress_gzip.c | 390 ++++++++++++ src/bin/pg_dump/compress_gzip.h | 9 + src/bin/pg_dump/compress_io.c | 839 ++++++-------------------- src/bin/pg_dump/compress_io.h | 68 ++- src/bin/pg_dump/meson.build | 1 + src/bin/pg_dump/pg_backup_archiver.c | 102 ++-- src/bin/pg_dump/pg_backup_archiver.h | 4 +- src/bin/pg_dump/pg_backup_custom.c | 23 +- src/bin/pg_dump/pg_backup_directory.c | 94 +-- src/bin/pg_dump/t/002_pg_dump.pl | 6 +- 11 files changed, 777 insertions(+), 760 deletions(-) create mode 100644 src/bin/pg_dump/compress_gzip.c create mode 100644 src/bin/pg_dump/compress_gzip.h diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile index 9dc5a784dd..29eab02d37 100644 --- a/src/bin/pg_dump/Makefile +++ b/src/bin/pg_dump/Makefile @@ -24,6 +24,7 @@ LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport) OBJS = \ $(WIN32RES) \ + compress_gzip.o \ compress_io.o \ dumputils.o \ parallel.o \ diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c new file mode 100644 index 0000000000..bc6d1abc77 --- /dev/null +++ b/src/bin/pg_dump/compress_gzip.c @@ -0,0 +1,390 @@ +#include "postgres_fe.h" +#include "pg_backup_utils.h" + +#include "compress_gzip.h" + +#ifdef HAVE_LIBZ +#include "zlib.h" +/*---------------------- + * Compressor API + *---------------------- + */ +typedef struct GzipCompressorState +{ + int compressionLevel; + z_streamp zp; + + void *outbuf; + size_t outsize; +} GzipCompressorState; + +/* Private routines that support gzip compressed data I/O */ +static void +DeflateCompressorGzip(ArchiveHandle *AH, CompressorState *cs, bool flush) +{ + GzipCompressorState *gzipcs = (GzipCompressorState *) cs->private; + z_streamp zp = gzipcs->zp; + void *out = gzipcs->outbuf; + int res = Z_OK; + + while (gzipcs->zp->avail_in != 0 || flush) + { + res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH); + if (res == Z_STREAM_ERROR) + pg_fatal("could not compress data: %s", zp->msg); + if ((flush && (zp->avail_out < gzipcs->outsize)) + || (zp->avail_out == 0) + || (zp->avail_in != 0) + ) + { + /* + * Extra paranoia: avoid zero-length chunks, since a zero length + * chunk is the EOF marker in the custom format. This should never + * happen but... + */ + if (zp->avail_out < gzipcs->outsize) + { + /* + * Any write function should do its own error checking but to + * make sure we do a check here as well... + */ + size_t len = gzipcs->outsize - zp->avail_out; + + cs->writeF(AH, (char *) out, len); + } + zp->next_out = out; + zp->avail_out = gzipcs->outsize; + } + + if (res == Z_STREAM_END) + break; + } +} + +static void +EndCompressorGzip(ArchiveHandle *AH, CompressorState *cs) +{ + GzipCompressorState *gzipcs = (GzipCompressorState *) cs->private; + z_streamp zp; + + if (gzipcs->zp) + { + zp = gzipcs->zp; + zp->next_in = NULL; + zp->avail_in = 0; + + /* Flush any remaining data from zlib buffer */ + DeflateCompressorGzip(AH, cs, true); + + if (deflateEnd(zp) != Z_OK) + pg_fatal("could not close compression stream: %s", zp->msg); + + pg_free(gzipcs->outbuf); + pg_free(gzipcs->zp); + } + + pg_free(gzipcs); + cs->private = NULL; +} + +static void +WriteDataToArchiveGzip(ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen) +{ + GzipCompressorState *gzipcs = (GzipCompressorState *) cs->private; + z_streamp zp; + + if (!gzipcs->zp) + { + zp = gzipcs->zp = (z_streamp) pg_malloc(sizeof(z_stream)); + zp->zalloc = Z_NULL; + zp->zfree = Z_NULL; + zp->opaque = Z_NULL; + + /* + * outsize is the buffer size we tell zlib it can output to. We + * actually allocate one extra byte because some routines want to + * append a trailing zero byte to the zlib output. + */ + gzipcs->outbuf = pg_malloc(ZLIB_OUT_SIZE + 1); + gzipcs->outsize = ZLIB_OUT_SIZE; + + if (deflateInit(zp, gzipcs->compressionLevel) != Z_OK) + pg_fatal("could not initialize compression library: %s", zp->msg); + + /* Just be paranoid - maybe End is called after Start, with no Write */ + zp->next_out = gzipcs->outbuf; + zp->avail_out = gzipcs->outsize; + } + + gzipcs->zp->next_in = (void *) unconstify(void *, data); + gzipcs->zp->avail_in = dLen; + DeflateCompressorGzip(AH, cs, false); +} + +static void +ReadDataFromArchiveGzip(ArchiveHandle *AH, CompressorState *cs) +{ + z_streamp zp; + char *out; + int res = Z_OK; + size_t cnt; + char *buf; + size_t buflen; + + zp = (z_streamp) pg_malloc(sizeof(z_stream)); + zp->zalloc = Z_NULL; + zp->zfree = Z_NULL; + zp->opaque = Z_NULL; + + buf = pg_malloc(ZLIB_IN_SIZE); + buflen = ZLIB_IN_SIZE; + + out = pg_malloc(ZLIB_OUT_SIZE + 1); + + if (inflateInit(zp) != Z_OK) + pg_fatal("could not initialize compression library: %s", + zp->msg); + + /* no minimal chunk size for zlib */ + while ((cnt = cs->readF(AH, &buf, &buflen))) + { + zp->next_in = (void *) buf; + zp->avail_in = cnt; + + while (zp->avail_in > 0) + { + zp->next_out = (void *) out; + zp->avail_out = ZLIB_OUT_SIZE; + + res = inflate(zp, 0); + if (res != Z_OK && res != Z_STREAM_END) + pg_fatal("could not uncompress data: %s", zp->msg); + + out[ZLIB_OUT_SIZE - zp->avail_out] = '\0'; + ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH); + } + } + + zp->next_in = NULL; + zp->avail_in = 0; + while (res != Z_STREAM_END) + { + zp->next_out = (void *) out; + zp->avail_out = ZLIB_OUT_SIZE; + res = inflate(zp, 0); + if (res != Z_OK && res != Z_STREAM_END) + pg_fatal("could not uncompress data: %s", zp->msg); + + out[ZLIB_OUT_SIZE - zp->avail_out] = '\0'; + ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH); + } + + if (inflateEnd(zp) != Z_OK) + pg_fatal("could not close compression library: %s", zp->msg); + + free(buf); + free(out); + free(zp); +} + +/* Public routines that support gzip compressed data I/O */ +void +InitCompressorGzip(CompressorState *cs, int compressionLevel) +{ + GzipCompressorState *gzipcs; + + cs->readData = ReadDataFromArchiveGzip; + cs->writeData = WriteDataToArchiveGzip; + cs->end = EndCompressorGzip; + + gzipcs = (GzipCompressorState *) pg_malloc0(sizeof(GzipCompressorState)); + gzipcs->compressionLevel = compressionLevel; + + cs->private = gzipcs; +} + + +/*---------------------- + * Compress File API + *---------------------- + */ + +typedef struct GzipData +{ + gzFile fp; + int compressionLevel; +} GzipData; + +static size_t +Gzip_read(void *ptr, size_t size, CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + size_t ret; + + ret = gzread(gd->fp, ptr, size); + if (ret != size && !gzeof(gd->fp)) + { + int errnum; + const char *errmsg = gzerror(gd->fp, &errnum); + + pg_fatal("could not read from input file: %s", + errnum == Z_ERRNO ? strerror(errno) : errmsg); + } + + return ret; +} + +static size_t +Gzip_write(const void *ptr, size_t size, CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + + return gzwrite(gd->fp, ptr, size); +} + +static int +Gzip_getc(CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + int ret; + + errno = 0; + ret = gzgetc(gd->fp); + if (ret == EOF) + { + if (!gzeof(gd->fp)) + pg_fatal("could not read from input file: %s", strerror(errno)); + else + pg_fatal("could not read from input file: end of file"); + } + + return ret; +} + +static char * +Gzip_gets(char *ptr, int size, CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + + return gzgets(gd->fp, ptr, size); +} + +static int +Gzip_close(CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + int save_errno; + int ret; + + CFH->private = NULL; + + ret = gzclose(gd->fp); + + save_errno = errno; + free(gd); + errno = save_errno; + + return ret; +} + +static int +Gzip_eof(CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + + return gzeof(gd->fp); +} + +static const char * +Gzip_get_error(CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + const char *errmsg; + int errnum; + + errmsg = gzerror(gd->fp, &errnum); + if (errnum == Z_ERRNO) + errmsg = strerror(errno); + + return errmsg; +} + +static int +Gzip_open(const char *path, int fd, const char *mode, CompressFileHandle * CFH) +{ + GzipData *gd = (GzipData *) CFH->private; + char mode_compression[32]; + + if (gd->compressionLevel != Z_DEFAULT_COMPRESSION) + { + /* + * user has specified a compression level, so tell zlib to use it + */ + snprintf(mode_compression, sizeof(mode_compression), "%s%d", + mode, gd->compressionLevel); + } + else + strcpy(mode_compression, mode); + + if (fd >= 0) + gd->fp = gzdopen(dup(fd), mode_compression); + else + gd->fp = gzopen(path, mode_compression); + + if (gd->fp == NULL) + return 1; + + return 0; +} + +static int +Gzip_open_write(const char *path, const char *mode, CompressFileHandle * CFH) +{ + char *fname; + int ret; + int save_errno; + + fname = psprintf("%s.gz", path); + ret = CFH->open(fname, -1, mode, CFH); + + save_errno = errno; + pg_free(fname); + errno = save_errno; + + return ret; +} + +void +InitCompressGzip(CompressFileHandle * CFH, int compressionLevel) +{ + GzipData *gd; + + CFH->open = Gzip_open; + CFH->open_write = Gzip_open_write; + CFH->read = Gzip_read; + CFH->write = Gzip_write; + CFH->gets = Gzip_gets; + CFH->getc = Gzip_getc; + CFH->close = Gzip_close; + CFH->eof = Gzip_eof; + CFH->get_error = Gzip_get_error; + + gd = pg_malloc0(sizeof(GzipData)); + gd->compressionLevel = compressionLevel; + + CFH->private = gd; +} +#else /* HAVE_LIBZ */ +void +InitCompressorGzip(CompressorState *cs, int compressionLevel) +{ + pg_fatal("not built with zlib support"); +} + +void +InitCompressGzip(CompressFileHandle * CFH, int compressionLevel) +{ + pg_fatal("not built with zlib support"); +} +#endif /* HAVE_LIBZ */ diff --git a/src/bin/pg_dump/compress_gzip.h b/src/bin/pg_dump/compress_gzip.h new file mode 100644 index 0000000000..ab0362c1f3 --- /dev/null +++ b/src/bin/pg_dump/compress_gzip.h @@ -0,0 +1,9 @@ +#ifndef _COMPRESS_GZIP_H_ +#define _COMPRESS_GZIP_H_ + +#include "compress_io.h" + +extern void InitCompressorGzip(CompressorState *cs, int compressionLevel); +extern void InitCompressGzip(CompressFileHandle * CFH, int compressionLevel); + +#endif /* _COMPRESS_GZIP_H_ */ diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index cb59300cb5..e7a0e57d8b 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -51,9 +51,12 @@ * *------------------------------------------------------------------------- */ +#include +#include #include "postgres_fe.h" #include "compress_io.h" +#include "compress_gzip.h" #include "pg_backup_utils.h" #ifdef HAVE_LIBZ @@ -65,84 +68,67 @@ *---------------------- */ -/* typedef appears in compress_io.h */ -struct CompressorState +/* Private routines that support uncompressed data I/O */ +static void +ReadDataFromArchiveNone(ArchiveHandle *AH, CompressorState *cs) { - pg_compress_specification compression_spec; - WriteFunc writeF; + size_t cnt; + char *buf; + size_t buflen; -#ifdef HAVE_LIBZ - z_streamp zp; - char *zlibOut; - size_t zlibOutSize; -#endif -}; + buf = pg_malloc(ZLIB_OUT_SIZE); + buflen = ZLIB_OUT_SIZE; -/* Routines that support zlib compressed data I/O */ -#ifdef HAVE_LIBZ -static void InitCompressorZlib(CompressorState *cs, int level); -static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, - bool flush); -static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF); -static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs, - const char *data, size_t dLen); -static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs); -#endif + while ((cnt = cs->readF(AH, &buf, &buflen))) + { + ahwrite(buf, 1, cnt, AH); + } -/* Routines that support uncompressed data I/O */ -static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF); -static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, - const char *data, size_t dLen); + free(buf); +} + + +static void +WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen) +{ + cs->writeF(AH, data, dLen); +} + +static void +EndCompressorNone(ArchiveHandle *AH, CompressorState *cs) +{ + /* no op */ +} + +static void +InitCompressorNone(CompressorState *cs) +{ + cs->readData = ReadDataFromArchiveNone; + cs->writeData = WriteDataToArchiveNone; + cs->end = EndCompressorNone; +} /* Public interface routines */ /* Allocate a new compressor */ CompressorState * AllocateCompressor(const pg_compress_specification compression_spec, - WriteFunc writeF) + ReadFunc readF, WriteFunc writeF) { CompressorState *cs; -#ifndef HAVE_LIBZ - if (compression_spec.algorithm == PG_COMPRESSION_GZIP) - pg_fatal("not built with zlib support"); -#endif - cs = (CompressorState *) pg_malloc0(sizeof(CompressorState)); + cs->readF = readF; cs->writeF = writeF; - cs->compression_spec = compression_spec; - - /* - * Perform compression algorithm specific initialization. - */ -#ifdef HAVE_LIBZ - if (cs->compression_spec.algorithm == PG_COMPRESSION_GZIP) - InitCompressorZlib(cs, cs->compression_spec.level); -#endif - return cs; -} - -/* - * Read all compressed data from the input stream (via readF) and print it - * out with ahwrite(). - */ -void -ReadDataFromArchive(ArchiveHandle *AH, - const pg_compress_specification compression_spec, - ReadFunc readF) -{ switch (compression_spec.algorithm) { case PG_COMPRESSION_NONE: - ReadDataFromArchiveNone(AH, readF); + InitCompressorNone(cs); break; case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ReadDataFromArchiveZlib(AH, readF); -#else - pg_fatal("not built with zlib support"); -#endif + InitCompressorGzip(cs, compression_spec.level); break; case PG_COMPRESSION_LZ4: /* fallthrough */ @@ -150,33 +136,8 @@ ReadDataFromArchive(ArchiveHandle *AH, pg_fatal("invalid compression method"); break; } -} -/* - * Compress and write data to the output stream (via writeF). - */ -void -WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, - const void *data, size_t dLen) -{ - switch (cs->compression_spec.algorithm) - { - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - WriteDataToArchiveZlib(AH, cs, data, dLen); -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_NONE: - WriteDataToArchiveNone(AH, cs, data, dLen); - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; - } + return cs; } /* @@ -185,545 +146,177 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, void EndCompressor(ArchiveHandle *AH, CompressorState *cs) { - switch (cs->compression_spec.algorithm) - { - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - EndCompressorZlib(AH, cs); -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_NONE: - free(cs); - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; - } + cs->end(AH, cs); + pg_free(cs); } -/* Private routines, specific to each compression method. */ - -#ifdef HAVE_LIBZ -/* - * Functions for zlib compressed output. +/*---------------------- + * Compressed stream API + *---------------------- */ -static void -InitCompressorZlib(CompressorState *cs, int level) -{ - z_streamp zp; - - zp = cs->zp = (z_streamp) pg_malloc(sizeof(z_stream)); - zp->zalloc = Z_NULL; - zp->zfree = Z_NULL; - zp->opaque = Z_NULL; - - /* - * zlibOutSize is the buffer size we tell zlib it can output to. We - * actually allocate one extra byte because some routines want to append a - * trailing zero byte to the zlib output. - */ - cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); - cs->zlibOutSize = ZLIB_OUT_SIZE; - - if (deflateInit(zp, level) != Z_OK) - pg_fatal("could not initialize compression library: %s", - zp->msg); - - /* Just be paranoid - maybe End is called after Start, with no Write */ - zp->next_out = (void *) cs->zlibOut; - zp->avail_out = cs->zlibOutSize; -} - -static void -EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs) -{ - z_streamp zp = cs->zp; - - zp->next_in = NULL; - zp->avail_in = 0; - - /* Flush any remaining data from zlib buffer */ - DeflateCompressorZlib(AH, cs, true); - - if (deflateEnd(zp) != Z_OK) - pg_fatal("could not close compression stream: %s", zp->msg); - - free(cs->zlibOut); - free(cs->zp); -} - -static void -DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush) +static int +hasSuffix(const char *filename, const char *suffix) { - z_streamp zp = cs->zp; - char *out = cs->zlibOut; - int res = Z_OK; + int filenamelen = strlen(filename); + int suffixlen = strlen(suffix); - while (cs->zp->avail_in != 0 || flush) - { - res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH); - if (res == Z_STREAM_ERROR) - pg_fatal("could not compress data: %s", zp->msg); - if ((flush && (zp->avail_out < cs->zlibOutSize)) - || (zp->avail_out == 0) - || (zp->avail_in != 0) - ) - { - /* - * Extra paranoia: avoid zero-length chunks, since a zero length - * chunk is the EOF marker in the custom format. This should never - * happen but... - */ - if (zp->avail_out < cs->zlibOutSize) - { - /* - * Any write function should do its own error checking but to - * make sure we do a check here as well... - */ - size_t len = cs->zlibOutSize - zp->avail_out; - - cs->writeF(AH, out, len); - } - zp->next_out = (void *) out; - zp->avail_out = cs->zlibOutSize; - } + if (filenamelen < suffixlen) + return 0; - if (res == Z_STREAM_END) - break; - } + return memcmp(&filename[filenamelen - suffixlen], + suffix, + suffixlen) == 0; } -static void -WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs, - const char *data, size_t dLen) +/* free() without changing errno; useful in several places below */ + static void +free_keep_errno(void *p) { - cs->zp->next_in = (void *) unconstify(char *, data); - cs->zp->avail_in = dLen; - DeflateCompressorZlib(AH, cs, false); -} + int save_errno = errno; -static void -ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF) -{ - z_streamp zp; - char *out; - int res = Z_OK; - size_t cnt; - char *buf; - size_t buflen; - - zp = (z_streamp) pg_malloc(sizeof(z_stream)); - zp->zalloc = Z_NULL; - zp->zfree = Z_NULL; - zp->opaque = Z_NULL; - - buf = pg_malloc(ZLIB_IN_SIZE); - buflen = ZLIB_IN_SIZE; - - out = pg_malloc(ZLIB_OUT_SIZE + 1); - - if (inflateInit(zp) != Z_OK) - pg_fatal("could not initialize compression library: %s", - zp->msg); - - /* no minimal chunk size for zlib */ - while ((cnt = readF(AH, &buf, &buflen))) - { - zp->next_in = (void *) buf; - zp->avail_in = cnt; - - while (zp->avail_in > 0) - { - zp->next_out = (void *) out; - zp->avail_out = ZLIB_OUT_SIZE; - - res = inflate(zp, 0); - if (res != Z_OK && res != Z_STREAM_END) - pg_fatal("could not uncompress data: %s", zp->msg); - - out[ZLIB_OUT_SIZE - zp->avail_out] = '\0'; - ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH); - } - } - - zp->next_in = NULL; - zp->avail_in = 0; - while (res != Z_STREAM_END) - { - zp->next_out = (void *) out; - zp->avail_out = ZLIB_OUT_SIZE; - res = inflate(zp, 0); - if (res != Z_OK && res != Z_STREAM_END) - pg_fatal("could not uncompress data: %s", zp->msg); - - out[ZLIB_OUT_SIZE - zp->avail_out] = '\0'; - ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH); - } - - if (inflateEnd(zp) != Z_OK) - pg_fatal("could not close compression library: %s", zp->msg); - - free(buf); - free(out); - free(zp); + free(p); + errno = save_errno; } -#endif /* HAVE_LIBZ */ - /* - * Functions for uncompressed output. + * Compression None implementation */ - -static void -ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF) + static size_t +_read(void *ptr, size_t size, CompressFileHandle * CFH) { - size_t cnt; - char *buf; - size_t buflen; + FILE *fp = (FILE *) CFH->private; + size_t ret; - buf = pg_malloc(ZLIB_OUT_SIZE); - buflen = ZLIB_OUT_SIZE; + if (size == 0) + return 0; - while ((cnt = readF(AH, &buf, &buflen))) - { - ahwrite(buf, 1, cnt, AH); - } + ret = fread(ptr, 1, size, fp); + if (ret != size && !feof(fp)) + pg_fatal("could not read from input file: %s", + strerror(errno)); - free(buf); + return ret; } -static void -WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, - const char *data, size_t dLen) +static size_t +_write(const void *ptr, size_t size, CompressFileHandle * CFH) { - cs->writeF(AH, data, dLen); + return fwrite(ptr, 1, size, (FILE *) CFH->private); } - -/*---------------------- - * Compressed stream API - *---------------------- - */ - -/* - * cfp represents an open stream, wrapping the underlying FILE or gzFile - * pointer. This is opaque to the callers. - */ -struct cfp -{ - pg_compress_specification compression_spec; - void *fp; -}; - -#ifdef HAVE_LIBZ -static int hasSuffix(const char *filename, const char *suffix); -#endif - -/* free() without changing errno; useful in several places below */ -static void -free_keep_errno(void *p) +static const char * +_get_error(CompressFileHandle * CFH) { - int save_errno = errno; - - free(p); - errno = save_errno; + return strerror(errno); } -/* - * Open a file for reading. 'path' is the file to open, and 'mode' should - * be either "r" or "rb". - * - * If the file at 'path' does not exist, we append the ".gz" suffix (if 'path' - * doesn't already have it) and try again. So if you pass "foo" as 'path', - * this will open either "foo" or "foo.gz". - * - * On failure, return NULL with an error code in errno. - */ -cfp * -cfopen_read(const char *path, const char *mode) +static char * +_gets(char *ptr, int size, CompressFileHandle * CFH) { - cfp *fp; - - pg_compress_specification compression_spec = {0}; - -#ifdef HAVE_LIBZ - if (hasSuffix(path, ".gz")) - { - compression_spec.algorithm = PG_COMPRESSION_GZIP; - fp = cfopen(path, mode, compression_spec); - } - else -#endif - { - compression_spec.algorithm = PG_COMPRESSION_NONE; - fp = cfopen(path, mode, compression_spec); -#ifdef HAVE_LIBZ - if (fp == NULL) - { - char *fname; - - fname = psprintf("%s.gz", path); - compression_spec.algorithm = PG_COMPRESSION_GZIP; - fp = cfopen(fname, mode, compression_spec); - free_keep_errno(fname); - } -#endif - } - return fp; + return fgets(ptr, size, (FILE *) CFH->private); } -/* - * Open a file for writing. 'path' indicates the path name, and 'mode' must - * be a filemode as accepted by fopen() and gzopen() that indicates writing - * ("w", "wb", "a", or "ab"). - * - * If 'compression_spec.algorithm' is GZIP, a gzip compressed stream is opened, - * and 'compression_spec.level' used. The ".gz" suffix is automatically added to - * 'path' in that case. - * - * On failure, return NULL with an error code in errno. - */ -cfp * -cfopen_write(const char *path, const char *mode, - const pg_compress_specification compression_spec) +static int +_getc(CompressFileHandle * CFH) { - cfp *fp; + FILE *fp = (FILE *) CFH->private; + int ret; - if (compression_spec.algorithm == PG_COMPRESSION_NONE) - fp = cfopen(path, mode, compression_spec); - else + ret = fgetc(fp); + if (ret == EOF) { -#ifdef HAVE_LIBZ - char *fname; - - fname = psprintf("%s.gz", path); - fp = cfopen(fname, mode, compression_spec); - free_keep_errno(fname); -#else - pg_fatal("not built with zlib support"); - fp = NULL; /* keep compiler quiet */ -#endif + if (!feof(fp)) + pg_fatal("could not read from input file: %s", strerror(errno)); + else + pg_fatal("could not read from input file: end of file"); } - return fp; + + return ret; } -/* - * This is the workhorse for cfopen() or cfdopen(). It opens file 'path' or - * associates a stream 'fd', if 'fd' is a valid descriptor, in 'mode'. The - * descriptor is not dup'ed and it is the caller's responsibility to do so. - * The caller must verify that the 'compress_algorithm' is supported by the - * current build. - * - * On failure, return NULL with an error code in errno. - */ -static cfp * -cfopen_internal(const char *path, int fd, const char *mode, - pg_compress_specification compression_spec) +static int +_close(CompressFileHandle * CFH) { - cfp *fp = pg_malloc(sizeof(cfp)); + FILE *fp = (FILE *) CFH->private; + int ret = 0; - fp->compression_spec = compression_spec; + CFH->private = NULL; - switch (compression_spec.algorithm) - { - case PG_COMPRESSION_NONE: - if (fd >= 0) - fp->fp = fdopen(fd, mode); - else - fp->fp = fopen(path, mode); - if (fp->fp == NULL) - { - free_keep_errno(fp); - fp = NULL; - } + if (fp) + ret = fclose(fp); - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - if (compression_spec.level != Z_DEFAULT_COMPRESSION) - { - /* - * user has specified a compression level, so tell zlib to use - * it - */ - char mode_compression[32]; - - snprintf(mode_compression, sizeof(mode_compression), "%s%d", - mode, compression_spec.level); - if (fd >= 0) - fp->fp = gzdopen(fd, mode_compression); - else - fp->fp = gzopen(path, mode_compression); - } - else - { - /* don't specify a level, just use the zlib default */ - if (fd >= 0) - fp->fp = gzdopen(fd, mode); - else - fp->fp = gzopen(path, mode); - } - - if (fp->fp == NULL) - { - free_keep_errno(fp); - fp = NULL; - } -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; - } - - return fp; + return ret; } -cfp * -cfopen(const char *path, const char *mode, - const pg_compress_specification compression_spec) -{ - return cfopen_internal(path, -1, mode, compression_spec); -} -cfp * -cfdopen(int fd, const char *mode, - const pg_compress_specification compression_spec) +static int +_eof(CompressFileHandle * CFH) { - return cfopen_internal(NULL, fd, mode, compression_spec); + return feof((FILE *) CFH->private); } -int -cfread(void *ptr, int size, cfp *fp) +static int +_open(const char *path, int fd, const char *mode, CompressFileHandle * CFH) { - int ret = 0; + Assert(CFH->private == NULL); - if (size == 0) - return 0; + if (fd >= 0) + CFH->private = fdopen(dup(fd), mode); + else + CFH->private = fopen(path, mode); - switch (fp->compression_spec.algorithm) - { - case PG_COMPRESSION_NONE: - ret = fread(ptr, 1, size, fp->fp); - if (ret != size && !feof(fp->fp)) - READ_ERROR_EXIT(fp->fp); + if (CFH->private == NULL) + return 1; - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzread(fp->fp, ptr, size); - if (ret != size && !gzeof(fp->fp)) - { - int errnum; - const char *errmsg = gzerror(fp->fp, &errnum); - - pg_fatal("could not read from input file: %s", - errnum == Z_ERRNO ? strerror(errno) : errmsg); - } -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; - } - - return ret; + return 0; } -int -cfwrite(const void *ptr, int size, cfp *fp) +static int +_open_write(const char *path, const char *mode, CompressFileHandle * CFH) { - int ret = 0; + Assert(CFH->private == NULL); - switch (fp->compression_spec.algorithm) - { - case PG_COMPRESSION_NONE: - ret = fwrite(ptr, 1, size, fp->fp); - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzwrite(fp->fp, ptr, size); -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; - } + CFH->private = fopen(path, mode); + if (CFH->private == NULL) + return 1; - return ret; + return 0; } -int -cfgetc(cfp *fp) +static void +InitCompressNone(CompressFileHandle * CFH) { - int ret = 0; - - switch (fp->compression_spec.algorithm) - { - case PG_COMPRESSION_NONE: - ret = fgetc(fp->fp); - if (ret == EOF) - READ_ERROR_EXIT(fp->fp); - - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzgetc((gzFile) fp->fp); - if (ret == EOF) - { - if (!gzeof(fp->fp)) - pg_fatal("could not read from input file: %s", strerror(errno)); - else - pg_fatal("could not read from input file: end of file"); - } -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; - } - - return ret; + CFH->open = _open; + CFH->open_write = _open_write; + CFH->read = _read; + CFH->write = _write; + CFH->gets = _gets; + CFH->getc = _getc; + CFH->close = _close; + CFH->eof = _eof; + CFH->get_error = _get_error; + + CFH->private = NULL; } -char * -cfgets(cfp *fp, char *buf, int len) +/* + * Public interface + */ +CompressFileHandle * +InitCompressFileHandle(const pg_compress_specification compression_spec) { - char *ret = NULL; + CompressFileHandle *CFH; - switch (fp->compression_spec.algorithm) + CFH = pg_malloc0(sizeof(CompressFileHandle)); + + switch (compression_spec.algorithm) { case PG_COMPRESSION_NONE: - ret = fgets(buf, len, fp->fp); - + InitCompressNone(CFH); break; case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzgets(fp->fp, buf, len); -#else - pg_fatal("not built with zlib support"); -#endif + InitCompressGzip(CFH, compression_spec.level); break; case PG_COMPRESSION_LZ4: /* fallthrough */ @@ -732,107 +325,77 @@ cfgets(cfp *fp, char *buf, int len) break; } - return ret; + return CFH; } -int -cfclose(cfp *fp) +/* + * Open a file for reading. 'path' is the file to open, and 'mode' should + * be either "r" or "rb". + * + * If the file at 'path' does not exist, we append the ".gz" suffix (if + * 'path' doesn't already have it) and try again. So if you pass "foo" as + * 'path', this will open either "foo" or "foo.gz", trying in that order. + * + * On failure, return NULL with an error code in errno. + * + */ +CompressFileHandle * +InitDiscoverCompressFileHandle(const char *path, const char *mode) { - int ret = 0; - - if (fp == NULL) - { - errno = EBADF; - return EOF; - } - - switch (fp->compression_spec.algorithm) - { - case PG_COMPRESSION_NONE: - ret = fclose(fp->fp); - fp->fp = NULL; - - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzclose(fp->fp); - fp->fp = NULL; -#else - pg_fatal("not built with zlib support"); -#endif - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; - } + CompressFileHandle *CFH = NULL; + struct stat st; + char *fname; + pg_compress_specification compression_spec = {0}; - free_keep_errno(fp); + compression_spec.algorithm = PG_COMPRESSION_NONE; - return ret; -} + Assert(strcmp(mode, "r") == 0 || strcmp(mode, "rb") == 0); -int -cfeof(cfp *fp) -{ - int ret = 0; + fname = strdup(path); - switch (fp->compression_spec.algorithm) + if (hasSuffix(fname, ".gz")) + compression_spec.algorithm = PG_COMPRESSION_GZIP; + else { - case PG_COMPRESSION_NONE: - ret = feof(fp->fp); + bool exists; - break; - case PG_COMPRESSION_GZIP: + exists = (stat(path, &st) == 0); + /* avoid unused warning if it is not build with compression */ + if (exists) + compression_spec.algorithm = PG_COMPRESSION_NONE; #ifdef HAVE_LIBZ - ret = gzeof(fp->fp); -#else - pg_fatal("not built with zlib support"); + if (!exists) + { + free_keep_errno(fname); + fname = psprintf("%s.gz", path); + exists = (stat(fname, &st) == 0); + + if (exists) + compression_spec.algorithm = PG_COMPRESSION_GZIP; + } #endif - break; - case PG_COMPRESSION_LZ4: - /* fallthrough */ - case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); - break; } - return ret; -} - -const char * -get_cfp_error(cfp *fp) -{ - if (fp->compression_spec.algorithm == PG_COMPRESSION_GZIP) + CFH = InitCompressFileHandle(compression_spec); + if (CFH->open(fname, -1, mode, CFH)) { -#ifdef HAVE_LIBZ - int errnum; - const char *errmsg = gzerror(fp->fp, &errnum); - - if (errnum != Z_ERRNO) - return errmsg; -#else - pg_fatal("not built with zlib support"); -#endif + free_keep_errno(CFH); + CFH = NULL; } + free_keep_errno(fname); - return strerror(errno); + return CFH; } -#ifdef HAVE_LIBZ -static int -hasSuffix(const char *filename, const char *suffix) +int +DestroyCompressFileHandle(CompressFileHandle * CFH) { - int filenamelen = strlen(filename); - int suffixlen = strlen(suffix); + int ret = 0; - if (filenamelen < suffixlen) - return 0; + if (CFH->private) + ret = CFH->close(CFH); - return memcmp(&filename[filenamelen - suffixlen], - suffix, - suffixlen) == 0; -} + free_keep_errno(CFH); -#endif + return ret; +} diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h index 6fad6c2cd5..1118b7a638 100644 --- a/src/bin/pg_dump/compress_io.h +++ b/src/bin/pg_dump/compress_io.h @@ -37,34 +37,60 @@ typedef void (*WriteFunc) (ArchiveHandle *AH, const char *buf, size_t len); */ typedef size_t (*ReadFunc) (ArchiveHandle *AH, char **buf, size_t *buflen); -/* struct definition appears in compress_io.c */ typedef struct CompressorState CompressorState; +struct CompressorState +{ + /* + * Read all compressed data from the input stream (via readF) and print it + * out with ahwrite(). + */ + void (*readData) (ArchiveHandle *AH, CompressorState *cs); + + /* + * Compress and write data to the output stream (via writeF). + */ + void (*writeData) (ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen); + void (*end) (ArchiveHandle *AH, CompressorState *cs); + + ReadFunc readF; + WriteFunc writeF; + + void *private; +}; extern CompressorState *AllocateCompressor(const pg_compress_specification compression_spec, + ReadFunc readF, WriteFunc writeF); -extern void ReadDataFromArchive(ArchiveHandle *AH, - const pg_compress_specification compression_spec, - ReadFunc readF); -extern void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, - const void *data, size_t dLen); extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs); +/* + * Compress File Handle + */ +typedef struct CompressFileHandle CompressFileHandle; + +struct CompressFileHandle +{ + int (*open) (const char *path, int fd, const char *mode, + CompressFileHandle * CFH); + int (*open_write) (const char *path, const char *mode, + CompressFileHandle * cxt); + size_t (*read) (void *ptr, size_t size, CompressFileHandle * CFH); + size_t (*write) (const void *ptr, size_t size, + struct CompressFileHandle *CFH); + char *(*gets) (char *s, int size, CompressFileHandle * CFH); + int (*getc) (CompressFileHandle * CFH); + int (*eof) (CompressFileHandle * CFH); + int (*close) (CompressFileHandle * CFH); + const char *(*get_error) (CompressFileHandle * CFH); + + void *private; +}; -typedef struct cfp cfp; +extern CompressFileHandle * InitCompressFileHandle(const pg_compress_specification compress_spec); -extern cfp *cfopen(const char *path, const char *mode, - const pg_compress_specification compression_spec); -extern cfp *cfdopen(int fd, const char *mode, - pg_compress_specification compression_spec); -extern cfp *cfopen_read(const char *path, const char *mode); -extern cfp *cfopen_write(const char *path, const char *mode, - const pg_compress_specification compression_spec); -extern int cfread(void *ptr, int size, cfp *fp); -extern int cfwrite(const void *ptr, int size, cfp *fp); -extern int cfgetc(cfp *fp); -extern char *cfgets(cfp *fp, char *buf, int len); -extern int cfclose(cfp *fp); -extern int cfeof(cfp *fp); -extern const char *get_cfp_error(cfp *fp); +extern CompressFileHandle * InitDiscoverCompressFileHandle(const char *path, + const char *mode); +extern int DestroyCompressFileHandle(CompressFileHandle * CFH); #endif diff --git a/src/bin/pg_dump/meson.build b/src/bin/pg_dump/meson.build index d96e566846..0c73a4707e 100644 --- a/src/bin/pg_dump/meson.build +++ b/src/bin/pg_dump/meson.build @@ -1,5 +1,6 @@ pg_dump_common_sources = files( 'compress_io.c', + 'compress_gzip.c', 'dumputils.c', 'parallel.c', 'pg_backup_archiver.c', diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index fb94317ad9..dbd698027c 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -95,8 +95,8 @@ static void dump_lo_buf(ArchiveHandle *AH); static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); static void SetOutput(ArchiveHandle *AH, const char *filename, const pg_compress_specification compression_spec); -static cfp *SaveOutput(ArchiveHandle *AH); -static void RestoreOutput(ArchiveHandle *AH, cfp *savedOutput); +static CompressFileHandle * SaveOutput(ArchiveHandle *AH); +static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle * savedOutput); static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel); static void restore_toc_entries_prefork(ArchiveHandle *AH, @@ -272,7 +272,7 @@ CloseArchive(Archive *AHX) /* Close the output */ errno = 0; - res = cfclose(AH->OF); + res = DestroyCompressFileHandle(AH->OF); if (res != 0) pg_fatal("could not close output file: %m"); @@ -355,7 +355,7 @@ RestoreArchive(Archive *AHX) bool parallel_mode; bool supports_compression; TocEntry *te; - cfp *sav; + CompressFileHandle *sav; AH->stage = STAGE_INITIALIZING; @@ -1127,7 +1127,7 @@ PrintTOCSummary(Archive *AHX) TocEntry *te; pg_compress_specification out_compression_spec = {0}; teSection curSection; - cfp *sav; + CompressFileHandle *sav; const char *fmtName; char stamp_str[64]; @@ -1143,9 +1143,10 @@ PrintTOCSummary(Archive *AHX) strcpy(stamp_str, "[unknown]"); ahprintf(AH, ";\n; Archive created at %s\n", stamp_str); - ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %d\n", + ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n", sanitize_line(AH->archdbname, false), - AH->tocCount, AH->compression_spec.level); + AH->tocCount, + get_compress_algorithm_name(AH->compression_spec.algorithm)); switch (AH->format) { @@ -1502,6 +1503,7 @@ static void SetOutput(ArchiveHandle *AH, const char *filename, const pg_compress_specification compression_spec) { + CompressFileHandle *CFH; const char *mode; int fn = -1; @@ -1524,33 +1526,32 @@ SetOutput(ArchiveHandle *AH, const char *filename, else mode = PG_BINARY_W; - if (fn >= 0) - AH->OF = cfdopen(dup(fn), mode, compression_spec); - else - AH->OF = cfopen(filename, mode, compression_spec); + CFH = InitCompressFileHandle(compression_spec); - if (!AH->OF) + if (CFH->open(filename, fn, mode, CFH)) { if (filename) pg_fatal("could not open output file \"%s\": %m", filename); else pg_fatal("could not open output file: %m"); } + + AH->OF = CFH; } -static cfp * +static CompressFileHandle * SaveOutput(ArchiveHandle *AH) { - return (cfp *) AH->OF; + return (CompressFileHandle *) AH->OF; } static void -RestoreOutput(ArchiveHandle *AH, cfp *savedOutput) +RestoreOutput(ArchiveHandle *AH, CompressFileHandle * savedOutput) { int res; errno = 0; - res = cfclose(AH->OF); + res = DestroyCompressFileHandle(AH->OF); if (res != 0) pg_fatal("could not close output file: %m"); @@ -1689,7 +1690,11 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH) else if (RestoringToDB(AH)) bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb); else - bytes_written = cfwrite(ptr, size * nmemb, AH->OF); + { + CompressFileHandle *CFH = (CompressFileHandle *) AH->OF; + + bytes_written = CFH->write(ptr, size * nmemb, CFH); + } if (bytes_written != size * nmemb) WRITE_ERROR_EXIT; @@ -2031,6 +2036,18 @@ ReadStr(ArchiveHandle *AH) return buf; } +static bool +_fileExistsInDirectory(const char *dir, const char *filename) +{ + struct stat st; + char buf[MAXPGPATH]; + + if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH) + pg_fatal("directory name too long: \"%s\"", dir); + + return (stat(buf, &st) == 0 && S_ISREG(st.st_mode)); +} + static int _discoverArchiveFormat(ArchiveHandle *AH) { @@ -2061,26 +2078,12 @@ _discoverArchiveFormat(ArchiveHandle *AH) */ if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode)) { - char buf[MAXPGPATH]; - - if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH) - pg_fatal("directory name too long: \"%s\"", - AH->fSpec); - if (stat(buf, &st) == 0 && S_ISREG(st.st_mode)) - { - AH->format = archDirectory; + AH->format = archDirectory; + if (_fileExistsInDirectory(AH->fSpec, "toc.dat")) return AH->format; - } - #ifdef HAVE_LIBZ - if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH) - pg_fatal("directory name too long: \"%s\"", - AH->fSpec); - if (stat(buf, &st) == 0 && S_ISREG(st.st_mode)) - { - AH->format = archDirectory; + if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz")) return AH->format; - } #endif pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)", AH->fSpec); @@ -2178,6 +2181,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, SetupWorkerPtrType setupWorkerPtr) { ArchiveHandle *AH; + CompressFileHandle *CFH; pg_compress_specification out_compress_spec = {0}; pg_log_debug("allocating AH for %s, format %d", @@ -2233,7 +2237,10 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, /* Open stdout with no compression for AH output handle */ out_compress_spec.algorithm = PG_COMPRESSION_NONE; - AH->OF = cfdopen(dup(fileno(stdout)), PG_BINARY_A, out_compress_spec); + CFH = InitCompressFileHandle(out_compress_spec); + if (CFH->open(NULL, fileno(stdout), PG_BINARY_A, CFH)) + pg_fatal("could not open stdout for appending: %m"); + AH->OF = CFH; /* * On Windows, we need to use binary mode to read/write non-text files, @@ -3646,12 +3653,7 @@ WriteHead(ArchiveHandle *AH) AH->WriteBytePtr(AH, AH->intSize); AH->WriteBytePtr(AH, AH->offSize); AH->WriteBytePtr(AH, AH->format); - /* - * For now the compression type is implied by the level. This will need - * to change once support for more compression algorithms is added, - * requiring a format bump. - */ - WriteInt(AH, AH->compression_spec.level); + AH->WriteBytePtr(AH, AH->compression_spec.algorithm); crtm = *localtime(&AH->createDate); WriteInt(AH, crtm.tm_sec); WriteInt(AH, crtm.tm_min); @@ -3722,10 +3724,11 @@ ReadHead(ArchiveHandle *AH) pg_fatal("expected format (%d) differs from format found in file (%d)", AH->format, fmt); - /* Guess the compression method based on the level */ - AH->compression_spec.algorithm = PG_COMPRESSION_NONE; - if (AH->version >= K_VERS_1_2) + if (AH->version >= K_VERS_1_15) + AH->compression_spec.algorithm = AH->ReadBytePtr(AH); + else if (AH->version >= K_VERS_1_2) { + /* Guess the compression method based on the level */ if (AH->version < K_VERS_1_4) AH->compression_spec.level = AH->ReadBytePtr(AH); else @@ -3737,10 +3740,17 @@ ReadHead(ArchiveHandle *AH) else AH->compression_spec.algorithm = PG_COMPRESSION_GZIP; + if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE) + { + bool unsupported = false; + #ifndef HAVE_LIBZ - if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP) - pg_fatal("archive is compressed, but this installation does not support compression"); + if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP) + unsupported = true; #endif + if (unsupported) + pg_fatal("archive is compressed, but this installation does not support compression"); + } if (AH->version >= K_VERS_1_4) { diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 4725e49747..9e97e871f0 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -65,10 +65,12 @@ #define K_VERS_1_13 MAKE_ARCHIVE_VERSION(1, 13, 0) /* change search_path * behavior */ #define K_VERS_1_14 MAKE_ARCHIVE_VERSION(1, 14, 0) /* add tableam */ +#define K_VERS_1_15 MAKE_ARCHIVE_VERSION(1, 15, 0) /* add compression_algorithm + * in header */ /* Current archive version number (the format we can output) */ #define K_VERS_MAJOR 1 -#define K_VERS_MINOR 14 +#define K_VERS_MINOR 15 #define K_VERS_REV 0 #define K_VERS_SELF MAKE_ARCHIVE_VERSION(K_VERS_MAJOR, K_VERS_MINOR, K_VERS_REV) diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index d1e54644a9..512ab043af 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -298,7 +298,9 @@ _StartData(ArchiveHandle *AH, TocEntry *te) _WriteByte(AH, BLK_DATA); /* Block type */ WriteInt(AH, te->dumpId); /* For sanity check */ - ctx->cs = AllocateCompressor(AH->compression_spec, _CustomWriteFunc); + ctx->cs = AllocateCompressor(AH->compression_spec, + NULL, + _CustomWriteFunc); } /* @@ -317,15 +319,15 @@ _WriteData(ArchiveHandle *AH, const void *data, size_t dLen) CompressorState *cs = ctx->cs; if (dLen > 0) - /* WriteDataToArchive() internally throws write errors */ - WriteDataToArchive(AH, cs, data, dLen); + /* writeData() internally throws write errors */ + cs->writeData(AH, cs, data, dLen); } /* * Called by the archiver when a dumper's 'DataDumper' routine has * finished. * - * Optional. + * Mandatory. */ static void _EndData(ArchiveHandle *AH, TocEntry *te) @@ -333,6 +335,8 @@ _EndData(ArchiveHandle *AH, TocEntry *te) lclContext *ctx = (lclContext *) AH->formatData; EndCompressor(AH, ctx->cs); + ctx->cs = NULL; + /* Send the end marker */ WriteInt(AH, 0); } @@ -377,7 +381,9 @@ _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid) WriteInt(AH, oid); - ctx->cs = AllocateCompressor(AH->compression_spec, _CustomWriteFunc); + ctx->cs = AllocateCompressor(AH->compression_spec, + NULL, + _CustomWriteFunc); } /* @@ -566,7 +572,12 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te) static void _PrintData(ArchiveHandle *AH) { - ReadDataFromArchive(AH, AH->compression_spec, _CustomReadFunc); + CompressorState *cs; + + cs = AllocateCompressor(AH->compression_spec, + _CustomReadFunc, NULL); + cs->readData(AH, cs); + EndCompressor(AH, cs); } static void diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index f6aee775eb..4182718b0a 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -50,9 +50,8 @@ typedef struct */ char *directory; - cfp *dataFH; /* currently open data file */ - - cfp *LOsTocFH; /* file handle for blobs.toc */ + CompressFileHandle *dataFH; /* currently open data file */ + CompressFileHandle *LOsTocFH; /* file handle for blobs.toc */ ParallelState *pstate; /* for parallel backup / restore */ } lclContext; @@ -198,11 +197,11 @@ InitArchiveFmt_Directory(ArchiveHandle *AH) else { /* Read Mode */ char fname[MAXPGPATH]; - cfp *tocFH; + CompressFileHandle *tocFH; setFilePath(AH, fname, "toc.dat"); - tocFH = cfopen_read(fname, PG_BINARY_R); + tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R); if (tocFH == NULL) pg_fatal("could not open input file \"%s\": %m", fname); @@ -218,7 +217,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH) ReadToc(AH); /* Nothing else in the file, so close it again... */ - if (cfclose(tocFH) != 0) + if (DestroyCompressFileHandle(tocFH) != 0) pg_fatal("could not close TOC file: %m"); ctx->dataFH = NULL; } @@ -327,9 +326,9 @@ _StartData(ArchiveHandle *AH, TocEntry *te) setFilePath(AH, fname, tctx->filename); - ctx->dataFH = cfopen_write(fname, PG_BINARY_W, - AH->compression_spec); - if (ctx->dataFH == NULL) + ctx->dataFH = InitCompressFileHandle(AH->compression_spec); + + if (ctx->dataFH->open_write(fname, PG_BINARY_W, ctx->dataFH)) pg_fatal("could not open output file \"%s\": %m", fname); } @@ -346,15 +345,16 @@ static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; errno = 0; - if (dLen > 0 && cfwrite(data, dLen, ctx->dataFH) != dLen) + if (dLen > 0 && CFH->write(data, dLen, CFH) != dLen) { /* if write didn't set errno, assume problem is no disk space */ if (errno == 0) errno = ENOSPC; pg_fatal("could not write to output file: %s", - get_cfp_error(ctx->dataFH)); + CFH->get_error(CFH)); } } @@ -370,7 +370,7 @@ _EndData(ArchiveHandle *AH, TocEntry *te) lclContext *ctx = (lclContext *) AH->formatData; /* Close the file */ - if (cfclose(ctx->dataFH) != 0) + if (DestroyCompressFileHandle(ctx->dataFH) != 0) pg_fatal("could not close data file: %m"); ctx->dataFH = NULL; @@ -385,26 +385,25 @@ _PrintFileData(ArchiveHandle *AH, char *filename) size_t cnt; char *buf; size_t buflen; - cfp *cfp; + CompressFileHandle *CFH; if (!filename) return; - cfp = cfopen_read(filename, PG_BINARY_R); - - if (!cfp) + CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R); + if (!CFH) pg_fatal("could not open input file \"%s\": %m", filename); buf = pg_malloc(ZLIB_OUT_SIZE); buflen = ZLIB_OUT_SIZE; - while ((cnt = cfread(buf, buflen, cfp))) + while ((cnt = CFH->read(buf, buflen, CFH))) { ahwrite(buf, 1, cnt, AH); } free(buf); - if (cfclose(cfp) != 0) + if (DestroyCompressFileHandle(CFH) != 0) pg_fatal("could not close data file \"%s\": %m", filename); } @@ -435,6 +434,7 @@ _LoadLOs(ArchiveHandle *AH) { Oid oid; lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH; char tocfname[MAXPGPATH]; char line[MAXPGPATH]; @@ -442,14 +442,14 @@ _LoadLOs(ArchiveHandle *AH) setFilePath(AH, tocfname, "blobs.toc"); - ctx->LOsTocFH = cfopen_read(tocfname, PG_BINARY_R); + CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R); if (ctx->LOsTocFH == NULL) pg_fatal("could not open large object TOC file \"%s\" for input: %m", tocfname); /* Read the LOs TOC file line-by-line, and process each LO */ - while ((cfgets(ctx->LOsTocFH, line, MAXPGPATH)) != NULL) + while ((CFH->gets(line, MAXPGPATH, CFH)) != NULL) { char lofname[MAXPGPATH + 1]; char path[MAXPGPATH]; @@ -464,11 +464,11 @@ _LoadLOs(ArchiveHandle *AH) _PrintFileData(AH, path); EndRestoreLO(AH, oid); } - if (!cfeof(ctx->LOsTocFH)) + if (!CFH->eof(CFH)) pg_fatal("error reading large object TOC file \"%s\"", tocfname); - if (cfclose(ctx->LOsTocFH) != 0) + if (DestroyCompressFileHandle(ctx->LOsTocFH) != 0) pg_fatal("could not close large object TOC file \"%s\": %m", tocfname); @@ -488,15 +488,16 @@ _WriteByte(ArchiveHandle *AH, const int i) { unsigned char c = (unsigned char) i; lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; errno = 0; - if (cfwrite(&c, 1, ctx->dataFH) != 1) + if (CFH->write(&c, 1, CFH) != 1) { /* if write didn't set errno, assume problem is no disk space */ if (errno == 0) errno = ENOSPC; pg_fatal("could not write to output file: %s", - get_cfp_error(ctx->dataFH)); + CFH->get_error(CFH)); } return 1; @@ -512,8 +513,9 @@ static int _ReadByte(ArchiveHandle *AH) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; - return cfgetc(ctx->dataFH); + return CFH->getc(CFH); } /* @@ -524,15 +526,16 @@ static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; errno = 0; - if (cfwrite(buf, len, ctx->dataFH) != len) + if (CFH->write(buf, len, CFH) != len) { /* if write didn't set errno, assume problem is no disk space */ if (errno == 0) errno = ENOSPC; pg_fatal("could not write to output file: %s", - get_cfp_error(ctx->dataFH)); + CFH->get_error(CFH)); } } @@ -545,12 +548,13 @@ static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; /* - * If there was an I/O error, we already exited in cfread(), so here we + * If there was an I/O error, we already exited in readF(), so here we * exit on short reads. */ - if (cfread(buf, len, ctx->dataFH) != len) + if (CFH->read(buf, len, CFH) != len) pg_fatal("could not read from input file: end of file"); } @@ -573,7 +577,7 @@ _CloseArchive(ArchiveHandle *AH) if (AH->mode == archModeWrite) { - cfp *tocFH; + CompressFileHandle *tocFH; pg_compress_specification compression_spec = {0}; char fname[MAXPGPATH]; @@ -584,8 +588,8 @@ _CloseArchive(ArchiveHandle *AH) /* The TOC is always created uncompressed */ compression_spec.algorithm = PG_COMPRESSION_NONE; - tocFH = cfopen_write(fname, PG_BINARY_W, compression_spec); - if (tocFH == NULL) + tocFH = InitCompressFileHandle(compression_spec); + if (tocFH->open_write(fname, PG_BINARY_W, tocFH)) pg_fatal("could not open output file \"%s\": %m", fname); ctx->dataFH = tocFH; @@ -598,7 +602,7 @@ _CloseArchive(ArchiveHandle *AH) WriteHead(AH); AH->format = archDirectory; WriteToc(AH); - if (cfclose(tocFH) != 0) + if (DestroyCompressFileHandle(tocFH) != 0) pg_fatal("could not close TOC file: %m"); WriteDataChunks(AH, ctx->pstate); @@ -649,8 +653,8 @@ _StartLOs(ArchiveHandle *AH, TocEntry *te) /* The LO TOC file is never compressed */ compression_spec.algorithm = PG_COMPRESSION_NONE; - ctx->LOsTocFH = cfopen_write(fname, "ab", compression_spec); - if (ctx->LOsTocFH == NULL) + ctx->LOsTocFH = InitCompressFileHandle(compression_spec); + if (ctx->LOsTocFH->open_write(fname, "ab", ctx->LOsTocFH)) pg_fatal("could not open output file \"%s\": %m", fname); } @@ -667,9 +671,8 @@ _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid) snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid); - ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression_spec); - - if (ctx->dataFH == NULL) + ctx->dataFH = InitCompressFileHandle(AH->compression_spec); + if (ctx->dataFH->open_write(fname, PG_BINARY_W, ctx->dataFH)) pg_fatal("could not open output file \"%s\": %m", fname); } @@ -682,18 +685,19 @@ static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->LOsTocFH; char buf[50]; int len; - /* Close the LO data file itself */ - if (cfclose(ctx->dataFH) != 0) - pg_fatal("could not close LO data file: %m"); + /* Close the BLOB data file itself */ + if (DestroyCompressFileHandle(ctx->dataFH) != 0) + pg_fatal("could not close blob data file: %m"); ctx->dataFH = NULL; /* register the LO in blobs.toc */ len = snprintf(buf, sizeof(buf), "%u blob_%u.dat\n", oid, oid); - if (cfwrite(buf, len, ctx->LOsTocFH) != len) - pg_fatal("could not write to LOs TOC file"); + if (CFH->write(buf, len, CFH) != len) + pg_fatal("could not write to blobs TOC file"); } /* @@ -706,8 +710,8 @@ _EndLOs(ArchiveHandle *AH, TocEntry *te) { lclContext *ctx = (lclContext *) AH->formatData; - if (cfclose(ctx->LOsTocFH) != 0) - pg_fatal("could not close LOs TOC file: %m"); + if (DestroyCompressFileHandle(ctx->LOsTocFH) != 0) + pg_fatal("could not close blobs TOC file: %m"); ctx->LOsTocFH = NULL; } diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index 248540db8c..e8246a3d4c 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -91,7 +91,7 @@ my %pgdump_runs = ( 'pg_restore', '-l', "$tempdir/compression_gzip_custom.dump", ], - expected => qr/Compression: 1/, + expected => qr/Compression: gzip/, name => 'data content is gzip compressed' }, }, @@ -235,8 +235,8 @@ my %pgdump_runs = ( '-l', "$tempdir/defaults_custom_format.dump", ], expected => $supports_gzip ? - qr/Compression: -1/ : - qr/Compression: 0/, + qr/Compression: gzip/ : + qr/Compression: none/, name => 'data content is gzip compressed by default if available' }, }, -- 2.34.1