Re: [PING] [PATCH v2] parallel pg_restore: avoid disk seeks when jumping short distance forward - Mailing list pgsql-hackers
From: Tom Lane <tgl@sss.pgh.pa.us>
Subject: Re: [PING] [PATCH v2] parallel pg_restore: avoid disk seeks when jumping short distance forward
Msg-id: 3686.1760232320@sss.pgh.pa.us
In response to: Re: [PING] [PATCH v2] parallel pg_restore: avoid disk seeks when jumping short distance forward (Tom Lane <tgl@sss.pgh.pa.us>)
Responses: Re: [PING] [PATCH v2] parallel pg_restore: avoid disk seeks when jumping short distance forward
List: pgsql-hackers
While playing around with the test cases for pg_dump compression, I was
startled to discover that the performance of compress_lz4's "stream API"
code is absolutely abysmal.  Here is a simple test case to demonstrate,
using the regression database as test data:

$ pg_dump -Fd --compress=lz4 -f rlz4.dir regression
$ time pg_restore -f /dev/null rlz4.dir

real    0m0.023s
user    0m0.017s
sys     0m0.006s

So far so good, but now let's compress the toc.dat file:

$ lz4 -f -m --rm rlz4.dir/toc.dat
$ time pg_restore -f /dev/null rlz4.dir

real    0m1.335s
user    0m1.326s
sys     0m0.008s

Considering that lz4 prides itself on fast decompression speed, that is
not a sane result.  Decompressing the file only requires a couple ms on
my machine:

$ time lz4cat rlz4.dir/toc.dat.lz4 >/dev/null

real    0m0.002s
user    0m0.000s
sys     0m0.002s

So on this example, pg_restore is something more than 600x slower to
read the TOC data than it ought to be.

On investigation, the blame mostly affixes to LZ4Stream_read_overflow's
habit of memmove'ing all the remaining buffered data after each read
operation.  Since reading a TOC file tends to involve a lot of small
(even one-byte) decompression calls, that amounts to an O(N^2) cost.

This could have been fixed with a minimal patch, but to my eyes
LZ4Stream_read_internal and LZ4Stream_read_overflow are badly-written
spaghetti code; in particular the eol_flag logic is inefficient and
duplicative.  I chose to throw the code away and rewrite from scratch.
This version is about sixty lines shorter as well as not having the
performance issue.

Fortunately, AFAICT the only way to get to this problem is to manually
LZ4-compress the toc.dat and/or blobs.toc files within a directory-style
archive.  Few people do that, which likely explains the lack of field
complaints.
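To make the cost pattern concrete, here is a standalone sketch (all names invented, not the pg_dump code) contrasting a buffer that memmove's its remainder on every read, as the old overflow-buffer code did, with one that merely advances a read cursor:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * O(N^2)-prone scheme: valid data always starts at buf[0], so every
 * read must slide all remaining bytes to the front.  N one-byte reads
 * from an N-byte buffer move O(N^2) bytes in total.
 */
typedef struct
{
    char    buf[256];
    size_t  len;                /* valid bytes, starting at buf[0] */
} SlideBuf;

static size_t
slide_read(SlideBuf *sb, char *dst, size_t n)
{
    size_t  take = (n < sb->len) ? n : sb->len;

    memcpy(dst, sb->buf, take);
    sb->len -= take;
    /* every small read pays to move everything that remains */
    memmove(sb->buf, sb->buf + take, sb->len);
    return take;
}

/*
 * O(N) scheme: keep a cursor into the buffer instead of moving data;
 * each byte is copied out exactly once.
 */
typedef struct
{
    char    buf[256];
    size_t  len;                /* valid bytes */
    size_t  next;               /* next position to return */
} CursorBuf;

static size_t
cursor_read(CursorBuf *cb, char *dst, size_t n)
{
    size_t  avail = cb->len - cb->next;
    size_t  take = (n < avail) ? n : avail;

    memcpy(dst, cb->buf + cb->next, take);
    cb->next += take;           /* no data motion at all */
    return take;
}
```

Both return the same bytes; only the cost per call differs, which is why the rewrite below replaces the overflow buffer with `bufnext`/`outbufnext` cursors.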
On top of that, a similar case with gzip doesn't work at all, though
it's supposed to:

$ pg_dump -Fd --compress=gzip -f rgzip.dir regression
$ gzip rgzip.dir/toc.dat
$ pg_restore -f /dev/null rgzip.dir
pg_restore: error: could not read from input file:

Tracking this down, it seems that Gzip_read doesn't cope with a request
to read zero bytes.  I wonder how long that's been broken.

As far as I can see, 002_pg_dump.pl doesn't exercise the case of
manually-compressed toc.dat files.  I wonder why not.

0001 and 0002 attached are the same as before, then 0003 adds a fix for
the LZ4 performance problem, and 0004 fixes the Gzip_read problem.
While at it, I got rid of a few other minor inefficiencies such as
unnecessary buffer-zeroing.

			regards, tom lane

From 1d5ff3431923b9f75415b80721966ad42c4036f3 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Fri, 10 Oct 2025 20:57:15 -0400
Subject: [PATCH v3 1/4] Fix poor buffering logic in pg_dump's lz4 and zstd
 compression code.

Both of these modules dumped each bit of output that they got from the
underlying compression library as a separate "data block" in the emitted
archive file.  In the case of zstd this'd frequently result in block
sizes well under 100 bytes; lz4 is a little better but still produces
blocks around 300 bytes, at least in the test case I tried.  This bloats
the archive file a little bit compared to larger block sizes, but the
real problem is that when pg_restore has to skip each data block rather
than seeking directly to some target data, tiny block sizes are
enormously inefficient.

Fix both modules so that they fill their allocated buffer reasonably
well before dumping a data block.  In the case of lz4, also delete some
redundant logic that caused the lz4 frame header to be emitted as a
separate data block.  (That saves little, but I see no reason to expend
extra code to get worse results.)

I fixed the "stream API" code too.  In those cases, feeding small
amounts of data to fwrite() probably doesn't have any meaningful
performance consequences.  But it seems like a bad idea to leave the two
sets of code doing the same thing in two different ways.

In passing, remove unnecessary "extra paranoia" check in
_ZstdWriteCommon.  _CustomWriteFunc (the only possible referent of
cs->writeF) already protects itself against zero-length writes, and it's
really a modularity violation for _ZstdWriteCommon to know that the
custom format disallows empty data blocks.

Reported-by: Dimitrios Apostolou <jimis@gmx.net>
Author: Tom Lane <tgl@sss.pgh.pa.us>
Discussion: https://postgr.es/m/3515357.1760128017@sss.pgh.pa.us
---
 src/bin/pg_dump/compress_lz4.c  | 167 +++++++++++++++++++-------------
 src/bin/pg_dump/compress_zstd.c |  37 +++----
 2 files changed, 117 insertions(+), 87 deletions(-)

diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index e2f7c468293..47ee2e4bbac 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -60,13 +60,11 @@ typedef struct LZ4State
 	bool		compressing;
 
 	/*
-	 * Used by the Compressor API to mark if the compression headers have been
-	 * written after initialization.
+	 * I/O buffer area.
 	 */
-	bool		needs_header_flush;
-
-	size_t		buflen;
-	char	   *buffer;
+	char	   *buffer;			/* buffer for compressed data */
+	size_t		buflen;			/* allocated size of buffer */
+	size_t		bufdata;		/* amount of valid data currently in buffer */
 
 	/*
 	 * Used by the Stream API to store already uncompressed data that the
@@ -76,12 +74,6 @@ typedef struct LZ4State
 	size_t		overflowlen;
 	char	   *overflowbuf;
 
-	/*
-	 * Used by both APIs to keep track of the compressed data length stored in
-	 * the buffer.
-	 */
-	size_t		compressedlen;
-
 	/*
 	 * Used by both APIs to keep track of error codes.
 	 */
@@ -103,8 +95,17 @@ LZ4State_compression_init(LZ4State *state)
 {
 	size_t		status;
 
+	/*
+	 * Compute size needed for buffer, assuming we will present at most
+	 * DEFAULT_IO_BUFFER_SIZE input bytes at a time.
+	 */
 	state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
 
+	/*
+	 * Then double it, to ensure we're not forced to flush every time.
+	 */
+	state->buflen *= 2;
+
 	/*
 	 * LZ4F_compressBegin requires a buffer that is greater or equal to
 	 * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
@@ -120,6 +121,10 @@ LZ4State_compression_init(LZ4State *state)
 	}
 
 	state->buffer = pg_malloc(state->buflen);
+
+	/*
+	 * Insert LZ4 header into buffer.
+	 */
 	status = LZ4F_compressBegin(state->ctx,
 								state->buffer, state->buflen,
 								&state->prefs);
@@ -129,7 +134,7 @@ LZ4State_compression_init(LZ4State *state)
 		return false;
 	}
 
-	state->compressedlen = status;
+	state->bufdata = status;
 
 	return true;
 }
@@ -201,36 +206,37 @@ WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
 {
 	LZ4State   *state = (LZ4State *) cs->private_data;
 	size_t		remaining = dLen;
-	size_t		status;
-	size_t		chunk;
-
-	/* Write the header if not yet written. */
-	if (state->needs_header_flush)
-	{
-		cs->writeF(AH, state->buffer, state->compressedlen);
-		state->needs_header_flush = false;
-	}
 
 	while (remaining > 0)
 	{
+		size_t		chunk;
+		size_t		required;
+		size_t		status;
 
-		if (remaining > DEFAULT_IO_BUFFER_SIZE)
-			chunk = DEFAULT_IO_BUFFER_SIZE;
-		else
-			chunk = remaining;
+		/* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
+		chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE);
+
+		/* If not enough space, must flush buffer */
+		required = LZ4F_compressBound(chunk, &state->prefs);
+		if (required > state->buflen - state->bufdata)
+		{
+			cs->writeF(AH, state->buffer, state->bufdata);
+			state->bufdata = 0;
+		}
 
-		remaining -= chunk;
 		status = LZ4F_compressUpdate(state->ctx,
-									 state->buffer, state->buflen,
+									 state->buffer + state->bufdata,
+									 state->buflen - state->bufdata,
 									 data, chunk, NULL);
 
 		if (LZ4F_isError(status))
 			pg_fatal("could not compress data: %s",
 					 LZ4F_getErrorName(status));
 
-		cs->writeF(AH, state->buffer, status);
+		state->bufdata += status;
 
-		data = ((char *) data) + chunk;
+		data = ((const char *) data) + chunk;
+		remaining -= chunk;
 	}
 }
 
@@ -238,29 +244,32 @@ static void
 EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
 {
 	LZ4State   *state = (LZ4State *) cs->private_data;
+	size_t		required;
 	size_t		status;
 
 	/* Nothing needs to be done */
 	if (!state)
 		return;
 
-	/*
-	 * Write the header if not yet written. The caller is not required to call
-	 * writeData if the relation does not contain any data. Thus it is
-	 * possible to reach here without having flushed the header. Do it before
-	 * ending the compression.
-	 */
-	if (state->needs_header_flush)
-		cs->writeF(AH, state->buffer, state->compressedlen);
+	/* We might need to flush the buffer to make room for LZ4F_compressEnd */
+	required = LZ4F_compressBound(0, &state->prefs);
+	if (required > state->buflen - state->bufdata)
+	{
+		cs->writeF(AH, state->buffer, state->bufdata);
+		state->bufdata = 0;
+	}
 
 	status = LZ4F_compressEnd(state->ctx,
-							  state->buffer, state->buflen,
+							  state->buffer + state->bufdata,
+							  state->buflen - state->bufdata,
 							  NULL);
 	if (LZ4F_isError(status))
 		pg_fatal("could not end compression: %s",
 				 LZ4F_getErrorName(status));
+	state->bufdata += status;
 
-	cs->writeF(AH, state->buffer, status);
+	/* Write the final bufferload */
+	cs->writeF(AH, state->buffer, state->bufdata);
 
 	status = LZ4F_freeCompressionContext(state->ctx);
 	if (LZ4F_isError(status))
@@ -302,8 +311,6 @@ InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compressi
 		pg_fatal("could not initialize LZ4 compression: %s",
 				 LZ4F_getErrorName(state->errcode));
 
-	/* Remember that the header has not been written. */
-	state->needs_header_flush = true;
 
 	cs->private_data = state;
 }
@@ -360,19 +367,10 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing)
 
 	state->compressing = compressing;
 
-	/* When compressing, write LZ4 header to the output stream. */
 	if (state->compressing)
 	{
-
 		if (!LZ4State_compression_init(state))
 			return false;
-
-		errno = 0;
-		if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen)
-		{
-			errno = (errno) ? errno : ENOSPC;
-			return false;
-		}
 	}
 	else
 	{
@@ -573,8 +571,7 @@ static void
 LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 {
 	LZ4State   *state = (LZ4State *) CFH->private_data;
-	size_t		status;
-	int			remaining = size;
+	size_t		remaining = size;
 
 	/* Lazy init */
 	if (!LZ4Stream_init(state, size, true))
@@ -583,23 +580,36 @@ LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 
 	while (remaining > 0)
 	{
-		int			chunk = Min(remaining, DEFAULT_IO_BUFFER_SIZE);
+		size_t		chunk;
+		size_t		required;
+		size_t		status;
 
-		remaining -= chunk;
+		/* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
+		chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE);
+
+		/* If not enough space, must flush buffer */
+		required = LZ4F_compressBound(chunk, &state->prefs);
+		if (required > state->buflen - state->bufdata)
+		{
+			errno = 0;
+			if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
+			{
+				errno = (errno) ? errno : ENOSPC;
+				pg_fatal("error during writing: %m");
+			}
+			state->bufdata = 0;
+		}
 
-		status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen,
+		status = LZ4F_compressUpdate(state->ctx,
+									 state->buffer + state->bufdata,
+									 state->buflen - state->bufdata,
 									 ptr, chunk, NULL);
 		if (LZ4F_isError(status))
 			pg_fatal("error during writing: %s", LZ4F_getErrorName(status));
-
-		errno = 0;
-		if (fwrite(state->buffer, 1, status, state->fp) != status)
-		{
-			errno = (errno) ? errno : ENOSPC;
-			pg_fatal("error during writing: %m");
-		}
+		state->bufdata += status;
 
 		ptr = ((const char *) ptr) + chunk;
+		remaining -= chunk;
 	}
 }
@@ -675,6 +685,7 @@ LZ4Stream_close(CompressFileHandle *CFH)
 {
 	FILE	   *fp;
 	LZ4State   *state = (LZ4State *) CFH->private_data;
+	size_t		required;
 	size_t		status;
 	int			ret;
 
@@ -683,20 +694,36 @@ LZ4Stream_close(CompressFileHandle *CFH)
 	{
 		if (state->compressing)
 		{
-			status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL);
+			/* We might need to flush the buffer to make room */
+			required = LZ4F_compressBound(0, &state->prefs);
+			if (required > state->buflen - state->bufdata)
+			{
+				errno = 0;
+				if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
+				{
+					errno = (errno) ? errno : ENOSPC;
+					pg_log_error("could not write to output file: %m");
+				}
+				state->bufdata = 0;
+			}
+
+			status = LZ4F_compressEnd(state->ctx,
+									  state->buffer + state->bufdata,
+									  state->buflen - state->bufdata,
+									  NULL);
 			if (LZ4F_isError(status))
 			{
 				pg_log_error("could not end compression: %s",
 							 LZ4F_getErrorName(status));
 			}
 			else
+				state->bufdata += status;
+
+			errno = 0;
+			if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
 			{
-				errno = 0;
-				if (fwrite(state->buffer, 1, status, state->fp) != status)
-				{
-					errno = (errno) ? errno : ENOSPC;
-					pg_log_error("could not write to output file: %m");
-				}
+				errno = (errno) ? errno : ENOSPC;
+				pg_log_error("could not write to output file: %m");
 			}
 
 			status = LZ4F_freeCompressionContext(state->ctx);
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index e24d45e1bbe..5fe2279faae 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -98,24 +98,22 @@ _ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
 	ZSTD_outBuffer *output = &zstdcs->output;
 
 	/* Loop while there's any input or until flushed */
-	while (input->pos != input->size || flush)
+	while (input->pos < input->size || flush)
 	{
 		size_t		res;
 
-		output->pos = 0;
 		res = ZSTD_compressStream2(zstdcs->cstream, output, input,
 								   flush ? ZSTD_e_end : ZSTD_e_continue);
 
 		if (ZSTD_isError(res))
 			pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
 
-		/*
-		 * Extra paranoia: avoid zero-length chunks, since a zero length chunk
-		 * is the EOF marker in the custom format. This should never happen
-		 * but...
-		 */
-		if (output->pos > 0)
+		/* Dump output buffer if full, or if we're told to flush */
+		if (output->pos >= output->size || flush)
+		{
 			cs->writeF(AH, output->dst, output->pos);
+			output->pos = 0;
+		}
 
 		if (res == 0)
 			break;				/* End of frame or all input consumed */
@@ -367,26 +365,31 @@ Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 	if (zstdcs->cstream == NULL)
 	{
 		zstdcs->output.size = ZSTD_CStreamOutSize();
-		zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
+		zstdcs->output.dst = pg_malloc(zstdcs->output.size);
+		zstdcs->output.pos = 0;
 		zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
 		if (zstdcs->cstream == NULL)
 			pg_fatal("could not initialize compression library");
 	}
 
 	/* Consume all input, to be flushed later */
-	while (input->pos != input->size)
+	while (input->pos < input->size)
 	{
-		output->pos = 0;
 		res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
 		if (ZSTD_isError(res))
 			pg_fatal("could not write to file: %s", ZSTD_getErrorName(res));
 
-		errno = 0;
-		cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
-		if (cnt != output->pos)
+		/* Dump output buffer if full */
+		if (output->pos >= output->size)
 		{
-			errno = (errno) ? errno : ENOSPC;
-			pg_fatal("could not write to file: %m");
+			errno = 0;
+			cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
+			if (cnt != output->pos)
+			{
+				errno = (errno) ? errno : ENOSPC;
+				pg_fatal("could not write to file: %m");
+			}
+			output->pos = 0;
 		}
 	}
 }
@@ -448,7 +451,6 @@ Zstd_close(CompressFileHandle *CFH)
 	/* Loop until the compression buffers are fully consumed */
 	for (;;)
 	{
-		output->pos = 0;
 		res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
 		if (ZSTD_isError(res))
 		{
@@ -466,6 +468,7 @@ Zstd_close(CompressFileHandle *CFH)
 			success = false;
 			break;
 		}
+		output->pos = 0;
 
 		if (res == 0)
 			break;				/* End of frame */
-- 
2.43.7

From 253f3c9f4bf2916930f4d48730e0dc98f757d66e Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Fri, 10 Oct 2025 22:08:13 -0400
Subject: [PATCH v3 2/4] Try to align the block sizes of pg_dump's various
 compression modes.

(This is more of a straw man for discussion than a finished patch.)

After the previous patch, compress_zstd.c tends to produce data block
sizes around 128K, and we don't really have any control over that unless
we want to overrule ZSTD_CStreamOutSize().  Which seems like a bad idea.
But let's try to align the other compression modes to produce block
sizes roughly comparable to that, so that pg_restore's skip-data
performance isn't enormously different for different modes.

gzip compression can be brought in line simply by setting
DEFAULT_IO_BUFFER_SIZE = 128K, which this patch does.  That increases
some unrelated buffer sizes, but none of them seem problematic for
modern platforms.

lz4's idea of appropriate block size is highly nonlinear: if we just
increase DEFAULT_IO_BUFFER_SIZE then the output blocks end up around
200K.
I found that adjusting the slop factor in LZ4State_compression_init was
a not-too-ugly way of bringing that number into line.

With compress = none you get data blocks the same sizes as the table
rows.  We could avoid that by introducing an additional layer of
buffering, but it's not clear to me that that's a net win, so this patch
doesn't do so.

Comments in compress_io.h and 002_pg_dump.pl suggest that if we increase
DEFAULT_IO_BUFFER_SIZE then we need to increase the amount of data fed
through the tests in order to improve coverage.  I've not done that here
either.  In my view, the decompression side of compress_lz4.c needs to
be rewritten to be simpler, rather than tested more.

Author: Tom Lane <tgl@sss.pgh.pa.us>
Discussion: https://postgr.es/m/3515357.1760128017@sss.pgh.pa.us
---
 src/bin/pg_dump/compress_io.h  | 2 +-
 src/bin/pg_dump/compress_lz4.c | 9 +++++++--
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index 25a7bf0904d..53cf8c9b03b 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -24,7 +24,7 @@
  * still exercise all the branches. This applies especially if the value is
  * increased, in which case the overflow buffer may not be needed.
  */
-#define DEFAULT_IO_BUFFER_SIZE	4096
+#define DEFAULT_IO_BUFFER_SIZE	(128 * 1024)
 
 extern char *supports_compression(const pg_compress_specification compression_spec);
 
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index 47ee2e4bbac..c9ea895c137 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -102,9 +102,14 @@ LZ4State_compression_init(LZ4State *state)
 	state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
 
 	/*
-	 * Then double it, to ensure we're not forced to flush every time.
+	 * Add some slop to ensure we're not forced to flush every time.
+	 *
+	 * The present slop factor of 50% is chosen so that the typical output
+	 * block size is about 128K when DEFAULT_IO_BUFFER_SIZE = 128K.  We might
+	 * need a different slop factor to maintain that equivalence if
+	 * DEFAULT_IO_BUFFER_SIZE is changed dramatically.
 	 */
-	state->buflen *= 2;
+	state->buflen += state->buflen / 2;
 
 	/*
 	 * LZ4F_compressBegin requires a buffer that is greater or equal to
-- 
2.43.7

From 0eaeb4009f1b6956d36a89b1139d49ae1f6db2dc Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Sat, 11 Oct 2025 20:26:24 -0400
Subject: [PATCH v3 3/4] Fix serious performance problems in
 LZ4Stream_read_internal.

I was distressed to find that reading an LZ4-compressed toc.dat file
was hundreds of times slower than it ought to be.  On investigation,
the blame mostly affixes to LZ4Stream_read_overflow's habit of
memmove'ing all the remaining buffered data after each read operation.
Since reading a TOC file tends to involve a lot of small (even
one-byte) decompression calls, that amounts to an O(N^2) cost.

This could have been fixed with a minimal patch, but to my eyes
LZ4Stream_read_internal and LZ4Stream_read_overflow are badly-written
spaghetti code; in particular the eol_flag logic is inefficient and
duplicative.  I chose to throw the code away and rewrite from scratch.
This version is about sixty lines shorter as well as not having the
performance issue.

Fortunately, AFAICT the only way to get to this problem is to manually
LZ4-compress the toc.dat and/or blobs.toc files within a directory-style
archive.  Few people do that, which likely explains the lack of field
complaints.
Author: Tom Lane <tgl@sss.pgh.pa.us>
Discussion: https://postgr.es/m/3515357.1760128017@sss.pgh.pa.us
---
 src/bin/pg_dump/compress_lz4.c | 242 ++++++++++++---------------------
 1 file changed, 89 insertions(+), 153 deletions(-)

diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index c9ea895c137..450afd4e2be 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -65,14 +65,12 @@ typedef struct LZ4State
 	char	   *buffer;			/* buffer for compressed data */
 	size_t		buflen;			/* allocated size of buffer */
 	size_t		bufdata;		/* amount of valid data currently in buffer */
-
-	/*
-	 * Used by the Stream API to store already uncompressed data that the
-	 * caller has not consumed.
-	 */
-	size_t		overflowalloclen;
-	size_t		overflowlen;
-	char	   *overflowbuf;
+	/* These fields are used only while decompressing: */
+	size_t		bufnext;		/* next buffer position to decompress */
+	char	   *outbuf;			/* buffer for decompressed data */
+	size_t		outbuflen;		/* allocated size of outbuf */
+	size_t		outbufdata;		/* amount of valid data currently in outbuf */
+	size_t		outbufnext;		/* next outbuf position to return */
 
 	/*
 	 * Used by both APIs to keep track of error codes.
 	 */
@@ -168,8 +166,8 @@ ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
 		pg_fatal("could not create LZ4 decompression context: %s",
 				 LZ4F_getErrorName(status));
 
-	outbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
-	readbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
+	outbuf = pg_malloc(DEFAULT_IO_BUFFER_SIZE);
+	readbuf = pg_malloc(DEFAULT_IO_BUFFER_SIZE);
 	readbuflen = DEFAULT_IO_BUFFER_SIZE;
 	while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0)
 	{
@@ -184,7 +182,6 @@ ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
 			size_t		out_size = DEFAULT_IO_BUFFER_SIZE;
 			size_t		read_size = readend - readp;
 
-			memset(outbuf, 0, DEFAULT_IO_BUFFER_SIZE);
 			status = LZ4F_decompress(ctx, outbuf, &out_size,
 									 readp, &read_size, &dec_opt);
 			if (LZ4F_isError(status))
@@ -327,15 +324,16 @@ InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compressi
 
 /*
  * LZ4 equivalent to feof() or gzeof().  Return true iff there is no
- * decompressed output in the overflow buffer and the end of the backing file
- * is reached.
+ * more buffered data and the end of the input file has been reached.
  */
 static bool
 LZ4Stream_eof(CompressFileHandle *CFH)
 {
 	LZ4State   *state = (LZ4State *) CFH->private_data;
 
-	return state->overflowlen == 0 && feof(state->fp);
+	return state->outbufnext >= state->outbufdata &&
+		state->bufnext >= state->bufdata &&
+		feof(state->fp);
 }
 
 static const char *
@@ -357,13 +355,15 @@ LZ4Stream_get_error(CompressFileHandle *CFH)
  *
  * Creates the necessary contexts for either compression or decompression. When
  * compressing data (indicated by compressing=true), it additionally writes the
- * LZ4 header in the output stream.
+ * LZ4 header in the output buffer.
+ *
+ * It's expected that a not-yet-initialized LZ4State will be zero-filled.
 *
 * Returns true on success. In case of a failure returns false, and stores the
 * error code in state->errcode.
 */
 static bool
-LZ4Stream_init(LZ4State *state, int size, bool compressing)
+LZ4Stream_init(LZ4State *state, bool compressing)
 {
 	size_t		status;
 
@@ -386,66 +386,22 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing)
 			return false;
 		}
 
-		state->buflen = Max(size, DEFAULT_IO_BUFFER_SIZE);
+		state->buflen = DEFAULT_IO_BUFFER_SIZE;
 		state->buffer = pg_malloc(state->buflen);
-
-		state->overflowalloclen = state->buflen;
-		state->overflowbuf = pg_malloc(state->overflowalloclen);
-		state->overflowlen = 0;
+		state->outbuflen = DEFAULT_IO_BUFFER_SIZE;
+		state->outbuf = pg_malloc(state->outbuflen);
 	}
 
 	state->inited = true;
 	return true;
 }
 
-/*
- * Read already decompressed content from the overflow buffer into 'ptr' up to
- * 'size' bytes, if available. If the eol_flag is set, then stop at the first
- * occurrence of the newline char prior to 'size' bytes.
- *
- * Any unread content in the overflow buffer is moved to the beginning.
- *
- * Returns the number of bytes read from the overflow buffer (and copied into
- * the 'ptr' buffer), or 0 if the overflow buffer is empty.
- */
-static int
-LZ4Stream_read_overflow(LZ4State *state, void *ptr, int size, bool eol_flag)
-{
-	char	   *p;
-	int			readlen = 0;
-
-	if (state->overflowlen == 0)
-		return 0;
-
-	if (state->overflowlen >= size)
-		readlen = size;
-	else
-		readlen = state->overflowlen;
-
-	if (eol_flag && (p = memchr(state->overflowbuf, '\n', readlen)))
-		/* Include the line terminating char */
-		readlen = p - state->overflowbuf + 1;
-
-	memcpy(ptr, state->overflowbuf, readlen);
-	state->overflowlen -= readlen;
-
-	if (state->overflowlen > 0)
-		memmove(state->overflowbuf, state->overflowbuf + readlen,
-				state->overflowlen);
-
-	return readlen;
-}
-
 /*
  * The workhorse for reading decompressed content out of an LZ4 compressed
  * stream.
  *
  * It will read up to 'ptrsize' decompressed content, or up to the new line
- * char if found first when the eol_flag is set.  It is possible that the
- * decompressed output generated by reading any compressed input via the
- * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
- * at an overflow buffer within LZ4State. Of course, when the function is
- * called, it will first try to consume any decompressed content already
- * present in the overflow buffer, before decompressing new content.
+ * char if one is found first when the eol_flag is set.
  *
  * Returns the number of bytes of decompressed data copied into the ptr
  * buffer, or -1 in case of error.
@@ -454,62 +410,85 @@ static int
 LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
 {
 	int			dsize = 0;
-	int			rsize;
-	int			size = ptrsize;
-	bool		eol_found = false;
-
-	void	   *readbuf;
+	int			remaining = ptrsize;
 
 	/* Lazy init */
-	if (!LZ4Stream_init(state, size, false /* decompressing */ ))
+	if (!LZ4Stream_init(state, false /* decompressing */ ))
 	{
 		pg_log_error("unable to initialize LZ4 library: %s",
 					 LZ4F_getErrorName(state->errcode));
 		return -1;
 	}
 
-	/* No work needs to be done for a zero-sized output buffer */
-	if (size <= 0)
-		return 0;
-
-	/* Verify that there is enough space in the outbuf */
-	if (size > state->buflen)
+	/* Loop until postcondition is satisfied */
+	while (remaining > 0)
 	{
-		state->buflen = size;
-		state->buffer = pg_realloc(state->buffer, size);
-	}
-
-	/* use already decompressed content if available */
-	dsize = LZ4Stream_read_overflow(state, ptr, size, eol_flag);
-	if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
-		return dsize;
-
-	readbuf = pg_malloc(size);
+		/*
+		 * If we already have some decompressed data, return that.
+		 */
+		if (state->outbufnext < state->outbufdata)
+		{
+			char	   *outptr = state->outbuf + state->outbufnext;
+			size_t		readlen = state->outbufdata - state->outbufnext;
+			bool		eol_found = false;
+
+			if (readlen > remaining)
+				readlen = remaining;
+			/* If eol_flag is set, don't read beyond a newline */
+			if (eol_flag)
+			{
+				char	   *eolptr = memchr(outptr, '\n', readlen);
 
-	do
-	{
-		char	   *rp;
-		char	   *rend;
+				if (eolptr)
+				{
+					readlen = eolptr - outptr + 1;
+					eol_found = true;
+				}
+			}
+			memcpy(ptr, outptr, readlen);
+			ptr = ((char *) ptr) + readlen;
+			state->outbufnext += readlen;
+			dsize += readlen;
+			remaining -= readlen;
+			if (eol_found || remaining == 0)
+				break;
+			/* We must have emptied outbuf */
+			Assert(state->outbufnext >= state->outbufdata);
+		}
 
-		rsize = fread(readbuf, 1, size, state->fp);
-		if (rsize < size && !feof(state->fp))
+		/*
+		 * If we don't have any pending compressed data, load more into
+		 * state->buffer.
+		 */
+		if (state->bufnext >= state->bufdata)
 		{
-			pg_log_error("could not read from input file: %m");
-			return -1;
-		}
+			size_t		rsize;
 
-		rp = (char *) readbuf;
-		rend = (char *) readbuf + rsize;
+			rsize = fread(state->buffer, 1, state->buflen, state->fp);
+			if (rsize < state->buflen && !feof(state->fp))
+			{
+				pg_log_error("could not read from input file: %m");
+				return -1;
+			}
+			if (rsize == 0)
+				break;			/* must be EOF */
+			state->bufdata = rsize;
+			state->bufnext = 0;
+		}
 
-		while (rp < rend)
+		/*
+		 * Decompress some data into state->outbuf.
+		 */
 		{
 			size_t		status;
-			size_t		outlen = state->buflen;
-			size_t		read_remain = rend - rp;
-
-			memset(state->buffer, 0, outlen);
-			status = LZ4F_decompress(state->dtx, state->buffer, &outlen,
-									 rp, &read_remain, NULL);
+			size_t		outlen = state->outbuflen;
+			size_t		inlen = state->bufdata - state->bufnext;
+
+			status = LZ4F_decompress(state->dtx,
+									 state->outbuf, &outlen,
+									 state->buffer + state->bufnext,
+									 &inlen,
+									 NULL);
 			if (LZ4F_isError(status))
 			{
 				state->errcode = status;
@@ -517,54 +496,11 @@ LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
 							 LZ4F_getErrorName(state->errcode));
 				return -1;
 			}
-
-			rp += read_remain;
-
-			/*
-			 * fill in what space is available in ptr if the eol flag is set,
-			 * either skip if one already found or fill up to EOL if present
-			 * in the outbuf
-			 */
-			if (outlen > 0 && dsize < size && eol_found == false)
-			{
-				char	   *p;
-				size_t		lib = (!eol_flag) ? size - dsize : size - 1 - dsize;
-				size_t		len = outlen < lib ? outlen : lib;
-
-				if (eol_flag &&
-					(p = memchr(state->buffer, '\n', outlen)) &&
-					(size_t) (p - state->buffer + 1) <= len)
-				{
-					len = p - state->buffer + 1;
-					eol_found = true;
-				}
-
-				memcpy((char *) ptr + dsize, state->buffer, len);
-				dsize += len;
-
-				/* move what did not fit, if any, at the beginning of the buf */
-				if (len < outlen)
-					memmove(state->buffer, state->buffer + len, outlen - len);
-				outlen -= len;
-			}
-
-			/* if there is available output, save it */
-			if (outlen > 0)
-			{
-				while (state->overflowlen + outlen > state->overflowalloclen)
-				{
-					state->overflowalloclen *= 2;
-					state->overflowbuf = pg_realloc(state->overflowbuf,
-													state->overflowalloclen);
-				}
-
-				memcpy(state->overflowbuf + state->overflowlen, state->buffer, outlen);
-				state->overflowlen += outlen;
-			}
+			state->bufnext += inlen;
+			state->outbufdata = outlen;
+			state->outbufnext = 0;
 		}
-	} while (rsize == size && dsize < size && eol_found == false);
-
-	pg_free(readbuf);
+	}
 
 	return dsize;
 }
@@ -579,7 +515,7 @@ LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 	size_t		remaining = size;
 
 	/* Lazy init */
-	if (!LZ4Stream_init(state, size, true))
+	if (!LZ4Stream_init(state, true))
 		pg_fatal("unable to initialize LZ4 library: %s",
 				 LZ4F_getErrorName(state->errcode));
 
@@ -742,7 +678,7 @@ LZ4Stream_close(CompressFileHandle *CFH)
 			if (LZ4F_isError(status))
 				pg_log_error("could not end decompression: %s",
 							 LZ4F_getErrorName(status));
-			pg_free(state->overflowbuf);
+			pg_free(state->outbuf);
 		}
 
 		pg_free(state->buffer);
-- 
2.43.7

From 6e65ac1ff26071024cfcf24d15819aa6b72af137 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Sat, 11 Oct 2025 21:16:12 -0400
Subject: [PATCH v3 4/4] Fix issues with reading zero bytes in Gzip_read and
 Zstd_read.

pg_dump expects a read request of zero bytes to be a no-op (see for
example ReadStr()).  Gzip_read got this wrong and falsely supposed that
the resulting gzret == 0 indicated an error.  Zstd_read got the right
result, but only after doing a lot more work than necessary, because it
checked at the bottom of the loop not the top.

The Gzip_read fix perhaps should be back-patched, because it breaks the
nominally-supported case of manually gzip'ing the toc.dat file within a
directory-style dump.
Author: Tom Lane <tgl@sss.pgh.pa.us>
Discussion: https://postgr.es/m/3515357.1760128017@sss.pgh.pa.us
---
 src/bin/pg_dump/compress_gzip.c | 4 ++++
 src/bin/pg_dump/compress_zstd.c | 5 +----
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c
index 4a067e1402c..ad3b6486027 100644
--- a/src/bin/pg_dump/compress_gzip.c
+++ b/src/bin/pg_dump/compress_gzip.c
@@ -257,6 +257,10 @@ Gzip_read(void *ptr, size_t size, CompressFileHandle *CFH)
 	gzFile		gzfp = (gzFile) CFH->private_data;
 	int			gzret;
 
+	/* Reading zero bytes must be a no-op */
+	if (size == 0)
+		return 0;
+
 	gzret = gzread(gzfp, ptr, size);
 
 	/*
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index 5fe2279faae..36c1fd264ee 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -287,7 +287,7 @@ Zstd_read_internal(void *ptr, size_t size, CompressFileHandle *CFH, bool exit_on
 	output->dst = ptr;
 	output->pos = 0;
 
-	for (;;)
+	while (output->pos < output->size)
 	{
 		Assert(input->pos <= input->size);
 		Assert(input->size <= input_allocated_size);
@@ -341,9 +341,6 @@ Zstd_read_internal(void *ptr, size_t size, CompressFileHandle *CFH, bool exit_on
 
 			if (res == 0)
 				break;			/* End of frame */
 		}
-
-		if (output->pos == output->size)
-			break;				/* We read all the data that fits */
 	}
 
 	return output->pos;
-- 
2.43.7
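The zero-length-read hazard that 0004 fixes can be shown in isolation: zlib's gzread() returns 0 both for a zero-byte request and at end of file, so a caller that treats 0 as failure must special-case size == 0 up front. A standalone sketch (all names invented; only the size == 0 early exit mirrors the actual fix):

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Toy stand-in for a compressed-stream reader with gzread()-like
 * semantics: returns the number of bytes read, and 0 at EOF -- which
 * is indistinguishable from the result of a zero-byte request.
 */
typedef struct
{
    const char *data;
    size_t      len;
    size_t      pos;
} ToyStream;

static size_t
toy_read(ToyStream *ts, void *buf, size_t size)
{
    size_t  avail = ts->len - ts->pos;
    size_t  n = (size < avail) ? size : avail;

    memcpy(buf, ts->data + ts->pos, n);
    ts->pos += n;
    return n;
}

/*
 * Wrapper illustrating the fix: treat a zero-byte request as a no-op
 * success *before* consulting the underlying reader, so that a zero
 * result is only ever reported as EOF/error for a nonzero request.
 * Returns 1 on success (bytes read in *nread), 0 on EOF.
 */
static int
safe_read(ToyStream *ts, void *buf, size_t size, size_t *nread)
{
    size_t  n;

    /* Reading zero bytes must be a no-op */
    if (size == 0)
    {
        *nread = 0;
        return 1;
    }

    n = toy_read(ts, buf, size);
    if (n == 0)
        return 0;               /* genuine EOF */
    *nread = n;
    return 1;
}
```

Without the early exit, a zero-byte request at any stream position would be misreported as EOF, which is exactly how the unpatched Gzip_read turned ReadStr()'s empty-string reads into "could not read from input file".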