From 0de5fd971602ae00fd3bd62cf5da0d8f0a3cce5a Mon Sep 17 00:00:00 2001 From: Andrew Dunstan Date: Sun, 22 Mar 2026 05:32:45 -0400 Subject: [PATCH v5 2/5] Fix failure to finalize the decompression pipeline at archive EOF. archive_waldump.c called astreamer_finalize() nowhere. This meant that any data retained in decompression buffers at the moment we detect archive EOF would never reach astreamer_waldump_content(), resulting in surprising failures if we actually need the last few bytes of the archive file. To fix, make read_archive_file() do the finalize once it detects EOF. Change its API to return a boolean "yes there's more data" rather than the entirely-misleading raw count of bytes read. Also document the contract that cur_file can change (or become NULL) during a single read_archive_file() call, since the decompression pipeline may produce enough output to trigger multiple astreamer callbacks. Author: Tom Lane Discussion: https://postgr.es/m/2178517.1774064942@sss.pgh.pa.us --- src/bin/pg_waldump/archive_waldump.c | 50 +++++++++++++++++++++++----- src/bin/pg_waldump/pg_waldump.h | 1 + 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/src/bin/pg_waldump/archive_waldump.c b/src/bin/pg_waldump/archive_waldump.c index b078c2d6960..cd092a057ef 100644 --- a/src/bin/pg_waldump/archive_waldump.c +++ b/src/bin/pg_waldump/archive_waldump.c @@ -89,7 +89,7 @@ typedef struct astreamer_waldump static ArchivedWALFile *get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo); -static int read_archive_file(XLogDumpPrivate *privateInfo, Size count); +static bool read_archive_file(XLogDumpPrivate *privateInfo, Size count); static void setup_tmpwal_dir(const char *waldir); static void cleanup_tmpwal_dir_atexit(void); @@ -139,6 +139,7 @@ init_archive_reader(XLogDumpPrivate *privateInfo, pg_fatal("could not open file \"%s\"", privateInfo->archive_name); privateInfo->archive_fd = fd; + privateInfo->archive_fd_eof = false; streamer = astreamer_waldump_new(privateInfo); @@ -178,7 +179,7 @@ init_archive_reader(XLogDumpPrivate *privateInfo, */ while (entry == NULL || entry->buf->len < XLOG_BLCKSZ) { - if (read_archive_file(privateInfo, XLOG_BLCKSZ) == 0) + if (!read_archive_file(privateInfo, XLOG_BLCKSZ)) pg_fatal("could not find WAL in archive \"%s\"", privateInfo->archive_name); @@ -236,9 +237,10 @@ free_archive_reader(XLogDumpPrivate *privateInfo) /* * NB: Normally, astreamer_finalize() is called before astreamer_free() to * flush any remaining buffered data or to ensure the end of the tar - * archive is reached. However, when decoding WAL, once we hit the end - * LSN, any remaining buffered data or unread portion of the archive can - * be safely ignored. + * archive is reached. read_archive_file() may have done so. However, + * when decoding WAL we can stop once we hit the end LSN, so we may never + * have read all of the input file. In that case any remaining buffered + * data or unread portion of the archive can be safely ignored. */ astreamer_free(privateInfo->archive_streamer); @@ -384,7 +386,7 @@ read_archive_wal_page(XLogDumpPrivate *privateInfo, XLogRecPtr targetPagePtr, fname, privateInfo->archive_name, (long long int) (count - nbytes), (long long int) count); - if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0) + if (!read_archive_file(privateInfo, READ_CHUNK_SIZE)) pg_fatal("unexpected end of archive \"%s\" while reading \"%s\": read %lld of %lld bytes", privateInfo->archive_name, fname, (long long int) (count - nbytes), @@ -490,7 +492,7 @@ get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo) */ if (entry == NULL || entry->buf->len == 0) { - if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0) + if (!read_archive_file(privateInfo, READ_CHUNK_SIZE)) break; /* archive file ended */ } @@ -540,8 +542,22 @@ get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo) /* * Reads a chunk from the archive file and passes it through the streamer * pipeline for decompression (if needed) and tar member extraction. + * + * count is the maximum amount to try to read this time. Note that it's + * measured in raw file bytes, and may have little to do with how much + * comes out of decompression/extraction. + * + * Returns true if successful, false if there is no more data. + * + * Callers must be aware that a single call may trigger multiple callbacks + * in astreamer_waldump_content, so privateInfo->cur_file can change value + * (or become NULL) during a call. In particular, cur_file is set to NULL + * when the ASTREAMER_MEMBER_TRAILER callback fires at the end of a tar + * member; it is then set to a new entry when the next WAL member's + * ASTREAMER_MEMBER_HEADER callback fires, which may or may not happen + * within the same call. */ -static int +static bool read_archive_file(XLogDumpPrivate *privateInfo, Size count) { int rc; @@ -549,6 +565,11 @@ read_archive_file(XLogDumpPrivate *privateInfo, Size count) /* The read request must not exceed the allocated buffer size. */ Assert(privateInfo->archive_read_buf_size >= count); + /* Fail if we already reached EOF in a prior call. */ + if (privateInfo->archive_fd_eof) + return false; + + /* Try to read some more data. */ rc = read(privateInfo->archive_fd, privateInfo->archive_read_buf, count); if (rc < 0) pg_fatal("could not read file \"%s\": %m", @@ -562,8 +583,19 @@ read_archive_file(XLogDumpPrivate *privateInfo, Size count) astreamer_content(privateInfo->archive_streamer, NULL, privateInfo->archive_read_buf, rc, ASTREAMER_UNKNOWN); + else + { + /* + * We reached EOF, but there is probably still data queued in the + * astreamer pipeline's buffers. Flush it out to ensure that we + * process everything. + */ + astreamer_finalize(privateInfo->archive_streamer); + /* Set flag to ensure we don't finalize more than once. */ + privateInfo->archive_fd_eof = true; + } - return rc; + return true; } /* diff --git a/src/bin/pg_waldump/pg_waldump.h b/src/bin/pg_waldump/pg_waldump.h index 36893624f53..cde7c6ca3f2 100644 --- a/src/bin/pg_waldump/pg_waldump.h +++ b/src/bin/pg_waldump/pg_waldump.h @@ -35,6 +35,7 @@ typedef struct XLogDumpPrivate char *archive_dir; char *archive_name; /* Tar archive filename */ int archive_fd; /* File descriptor for the open tar file */ + bool archive_fd_eof; /* Have we reached EOF on archive_fd? */ astreamer *archive_streamer; char *archive_read_buf; /* Reusable read buffer for archive I/O */ -- 2.43.0