From dbacb4b3d19c579eb0e3b8aa2dbbff04c7273584 Mon Sep 17 00:00:00 2001 From: Amul Sul Date: Mon, 25 Aug 2025 17:26:29 +0530 Subject: [PATCH v2 6/9] WIP-pg_waldump: Remove the restriction on the order of archived WAL files. With previous patch, pg_waldump would stop decoding if WAL files were not in the required sequence. With this patch, decoding will now continue. Any WAL file that is out of order will be written to a temporary location, from which it will be read later. Once a temporary file has been read, it will be removed. TODO: Timeline switching is not handled correctly, especially when a timeline change occurs on the next WAL file that was previously written to a temporary location. --- doc/src/sgml/ref/pg_waldump.sgml | 8 +- src/bin/pg_waldump/astreamer_waldump.c | 189 +++++++++++++++++++++---- src/bin/pg_waldump/pg_waldump.c | 77 +++++++++- src/bin/pg_waldump/pg_waldump.h | 26 +++- src/bin/pg_waldump/t/001_basic.pl | 3 +- 5 files changed, 269 insertions(+), 34 deletions(-) diff --git a/doc/src/sgml/ref/pg_waldump.sgml b/doc/src/sgml/ref/pg_waldump.sgml index d004bb0f67e..8a28b4f0f91 100644 --- a/doc/src/sgml/ref/pg_waldump.sgml +++ b/doc/src/sgml/ref/pg_waldump.sgml @@ -149,8 +149,12 @@ PostgreSQL documentation of PGDATA. - If a tar archive is provided, its WAL segment files must be in - sequential order; otherwise, an error will be reported. + If a tar archive is provided and its WAL segment files are not in + sequential order, those files will be written to a temporary directory + named pg_waldump_tmp_dir/. This directory will be + created inside the directory specified by the TMPDIR + environment variable if it is set; otherwise, it will be created within + the same directory as the tar archive. 
diff --git a/src/bin/pg_waldump/astreamer_waldump.c b/src/bin/pg_waldump/astreamer_waldump.c index 916d388ef0c..acfbace7502 100644 --- a/src/bin/pg_waldump/astreamer_waldump.c +++ b/src/bin/pg_waldump/astreamer_waldump.c @@ -18,8 +18,8 @@ #include "access/xlog_internal.h" #include "access/xlogdefs.h" +#include "common/file_perm.h" #include "common/logging.h" -#include "fe_utils/simple_list.h" #include "pg_waldump.h" /* @@ -37,6 +37,8 @@ typedef struct astreamer_waldump /* These fields change with archive member. */ bool skipThisSeg; + bool writeThisSeg; + FILE *segFp; XLogSegNo nextSegNo; /* Next expected segment to stream */ } astreamer_waldump; @@ -53,8 +55,15 @@ static bool member_is_relevant_wal(astreamer_member *member, XLogSegNo startSegNo, XLogSegNo endSegNo, XLogSegNo nextSegNo, + char **curFname, XLogSegNo *curSegNo, TimeLineID *curSegTimeline); +static FILE *member_prepare_tmp_write(XLogSegNo curSegNo, + const char *fname, + XLogDumpPrivate *privateInfo); +static XLogSegNo member_next_segno(XLogSegNo curSegNo, + TimeLineID timeline, + XLogDumpPrivate *privateInfo); static const astreamer_ops astreamer_waldump_ops = { .content = astreamer_waldump_content, @@ -189,17 +198,8 @@ astreamer_waldump_content_new(astreamer *next, XLogRecPtr startptr, if (XLogRecPtrIsInvalid(startptr)) streamer->startSegNo = 0; else - { XLByteToSeg(startptr, streamer->startSegNo, WalSegSz); - /* - * Initialize the record pointer to the beginning of the first - * segment; this pointer will track the WAL record reading status. 
- */ - XLogSegNoOffsetToRecPtr(streamer->startSegNo, 0, WalSegSz, - privateInfo->archive_streamer_read_ptr); - } - if (XLogRecPtrIsInvalid(endPtr)) streamer->endSegNo = UINT64_MAX; else @@ -228,19 +228,21 @@ astreamer_waldump_content(astreamer *streamer, astreamer_member *member, { case ASTREAMER_MEMBER_HEADER: { + char *fname; XLogSegNo segNo; TimeLineID timeline; pg_log_debug("pg_waldump: reading \"%s\"", member->pathname); mystreamer->skipThisSeg = false; + mystreamer->writeThisSeg = false; if (!member_is_relevant_wal(member, privateInfo->timeline, mystreamer->startSegNo, mystreamer->endSegNo, mystreamer->nextSegNo, - &segNo, &timeline)) + &fname, &segNo, &timeline)) { mystreamer->skipThisSeg = true; break; @@ -254,24 +256,38 @@ astreamer_waldump_content(astreamer *streamer, astreamer_member *member, if (mystreamer->nextSegNo == 0) break; - /* WAL segments must be archived in order */ + /* + * When WAL segments are not archived sequentially, it becomes + * necessary to write out (or preserve) segments that might be + * required at a later point. + */ if (mystreamer->nextSegNo != segNo) { - pg_log_error("WAL files are not archived in sequential order"); - pg_log_error_detail("Expecting segment number " UINT64_FORMAT " but found " UINT64_FORMAT ".", - mystreamer->nextSegNo, segNo); - exit(1); + mystreamer->writeThisSeg = true; + mystreamer->segFp = + member_prepare_tmp_write(segNo, fname, privateInfo); + break; } /* - * We track the reading of WAL segment records using a pointer - * that's continuously incremented by the length of the - * received data. This pointer is crucial for serving WAL page - * requests from the WAL decoding routine, so it must be - * accurate. + * We are now streaming segment contents. + * + * We need to track the reading of WAL segment records using a + * pointer that's typically incremented by the length of the + * data read.
However, we sometimes export the WAL file to + * temporary storage, allowing the decoding routine to read + * directly from there. This makes continuous pointer + * incrementing challenging, as file reads can occur from any + * offset, leading to potential errors. Therefore, we now + * reset the pointer when reading from a file for streaming. + * Also, if there's any existing data in the buffer, the next + * WAL record should logically follow it. */ #ifdef USE_ASSERT_CHECKING - if (mystreamer->nextSegNo != 0) + Assert(!mystreamer->skipThisSeg); + Assert(!mystreamer->writeThisSeg); + + if (privateInfo->archive_streamer_buf->len != 0) { XLogRecPtr recPtr; @@ -280,11 +296,19 @@ astreamer_waldump_content(astreamer *streamer, astreamer_member *member, } #endif + /* + * Initialized to the beginning of the current segment being + * streamed through the buffer. + */ + XLogSegNoOffsetToRecPtr(segNo, 0, WalSegSz, + privateInfo->archive_streamer_read_ptr); + /* Save the timeline */ privateInfo->timeline = timeline; /* Update the next expected segment number */ - mystreamer->nextSegNo += 1; + mystreamer->nextSegNo = + member_next_segno(segNo, timeline, privateInfo); } break; @@ -293,12 +317,44 @@ astreamer_waldump_content(astreamer *streamer, astreamer_member *member, if (mystreamer->skipThisSeg) break; + /* Or, write contents to file */ + if (mystreamer->writeThisSeg) + { + Assert(mystreamer->segFp != NULL); + + errno = 0; + if (len > 0 && fwrite(data, len, 1, mystreamer->segFp) != 1) + { + char *fname; + int pathlen = strlen(member->pathname); + + Assert(pathlen >= XLOG_FNAME_LEN); + + fname = member->pathname + (pathlen - XLOG_FNAME_LEN); + + /* + * If write didn't set errno, assume problem is no disk + * space + */ + if (errno == 0) + errno = ENOSPC; + pg_fatal("could not write to file \"%s/%s\": %m", + privateInfo->tmpdir, fname); + } + break; + } + /* Or, copy contents to buffer */ privateInfo->archive_streamer_read_ptr += len; astreamer_buffer_bytes(streamer, &data,
&len, len); break; case ASTREAMER_MEMBER_TRAILER: + if (mystreamer->segFp != NULL) + { + fclose(mystreamer->segFp); + mystreamer->segFp = NULL; + } break; case ASTREAMER_ARCHIVE_TRAILER: @@ -325,8 +381,14 @@ astreamer_waldump_finalize(astreamer *streamer) static void astreamer_waldump_free(astreamer *streamer) { + astreamer_waldump *mystreamer; + Assert(streamer->bbs_next == NULL); + mystreamer = (astreamer_waldump *) streamer; + if (mystreamer->segFp != NULL) + fclose(mystreamer->segFp); + pfree(streamer->bbs_buffer.data); pfree(streamer); } @@ -339,8 +401,8 @@ astreamer_waldump_free(astreamer *streamer) static bool member_is_relevant_wal(astreamer_member *member, TimeLineID startTimeLineID, XLogSegNo startSegNo, XLogSegNo endSegNo, - XLogSegNo nextSegNo, XLogSegNo *curSegNo, - TimeLineID *curSegTimeline) + XLogSegNo nextSegNo, char **curFname, + XLogSegNo *curSegNo, TimeLineID *curSegTimeline) { int pathlen; XLogSegNo segNo; @@ -371,8 +433,85 @@ member_is_relevant_wal(astreamer_member *member, TimeLineID startTimeLineID, if (startSegNo > segNo || endSegNo < segNo) return false; + *curFname = fname; *curSegNo = segNo; *curSegTimeline = timeline; return true; } + +/* + * Create an empty placeholder file and return its handle. The file is also + * added to an exported list for future management, e.g. access, deletion, and + * existence checks. 
+ */ +static FILE * +member_prepare_tmp_write(XLogSegNo curSegNo, const char *fname, + XLogDumpPrivate *privateInfo) +{ + FILE *file; + char *fpath = get_tmp_wal_file_path(privateInfo, fname); + + /* Create an empty placeholder */ + file = fopen(fpath, PG_BINARY_W); + if (file == NULL) + pg_fatal("could not create file \"%s\": %m", fpath); + +#ifndef WIN32 + if (chmod(fpath, pg_file_create_mode)) + pg_fatal("could not set permissions on file \"%s\": %m", + fpath); +#endif + + /* Record this segment's export */ + simple_string_list_append(&privateInfo->exportedSegList, fname); + pfree(fpath); + + return file; +} + +/* + * Get next WAL segment that needs to be retrieved from the archive. + * + * The function checks whether the segment following curSegNo is recorded in + * the list of segments already exported to temporary storage. If it is + * listed, the segment has already been retrieved from the archive, so the + * function increments the segment number and repeats the check. This + * continues until a segment that has not yet been exported is found, at + * which point the function returns its number. + */ +static XLogSegNo +member_next_segno(XLogSegNo curSegNo, TimeLineID timeline, + XLogDumpPrivate *privateInfo) +{ + XLogSegNo nextSegNo = curSegNo + 1; + bool exists; + + /* + * If we find a file that was previously written to the temporary space, + * it indicates that the corresponding WAL segment request has already + * been fulfilled. In that case, we increment the nextSegNo counter and + * check the following segment number as well. This repeats until we + * reach a segment that has not been exported, which is the one still + * needed from the archive. + */ + do + { + char fname[MAXFNAMELEN]; + + XLogFileName(fname, timeline, nextSegNo, WalSegSz); + + /* + * If the WAL segment has already been exported, increment the counter + * and check for the next segment.
+ */ + exists = false; + if (simple_string_list_member(&privateInfo->exportedSegList, fname)) + { + nextSegNo += 1; + exists = true; + } + } while (exists); + + return nextSegNo; +} diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 64f3a65b735..d456adce59c 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -393,13 +393,14 @@ setup_astreamer(XLogDumpPrivate *private, pg_compress_algorithm compression, } /* - * Initializes the archive reader for a tar file. + * Initializes the tar archive reader and a temporary directory for WAL files. */ static void init_tar_archive_reader(XLogDumpPrivate *private, char *waldir, pg_compress_algorithm compression) { int fd; + char *tmpdir; /* Now, the tar archive and store its file descriptor */ fd = open_file_in_directory(waldir, private->archive_name); @@ -411,6 +412,15 @@ init_tar_archive_reader(XLogDumpPrivate *private, char *waldir, /* Setup tar archive reading facility */ setup_astreamer(private, compression, private->startptr, private->endptr); + + /* Temporary space for writing WAL segments */ + if (getenv("TMPDIR")) + tmpdir = pstrdup(getenv("TMPDIR")); + else + tmpdir = waldir != NULL ? 
pstrdup(waldir) : pstrdup("."); + canonicalize_path(tmpdir); + + private->tmpdir = tmpdir; } /* @@ -419,6 +429,8 @@ init_tar_archive_reader(XLogDumpPrivate *private, char *waldir, static void free_tar_archive_reader(XLogDumpPrivate *private) { + SimpleStringListCell *cell; + /* * NB: Normally, astreamer_finalize() is called before astreamer_free() to * flush any remaining buffered data or to ensure the end of the tar @@ -432,6 +444,15 @@ free_tar_archive_reader(XLogDumpPrivate *private) if (close(private->archive_fd) != 0) pg_log_error("could not close file \"%s\": %m", private->archive_name); + + /* Clear out any existing temporary files */ + for (cell = private->exportedSegList.head; cell; cell = cell->next) + { + char *fpath = get_tmp_wal_file_path(private, cell->val); + + unlink(fpath); + pfree(fpath); + } } /* @@ -559,7 +580,7 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetPtr, char *readBuff) { XLogDumpPrivate *private = state->private_data; - int count = required_read_len(private, targetPtr, reqLen); + int count = required_read_len(private, targetPagePtr, reqLen); WALReadError errinfo; if (private->endptr_reached) @@ -618,12 +639,60 @@ TarWALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetPtr, char *readBuff) { XLogDumpPrivate *private = state->private_data; - int count = required_read_len(private, targetPtr, reqLen); + int count = required_read_len(private, targetPagePtr, reqLen); + XLogSegNo nextSegNo; if (private->endptr_reached) return -1; - /* Read the WAL page from the archive streamer */ + /* + * If the target page is in a different segment, first check for the WAL + * segment's physical existence in the temporary directory. + * + * XXX: Timeline change is not handled. 
+ */ + nextSegNo = state->seg.ws_segno; + if (!XLByteInSeg(targetPagePtr, nextSegNo, WalSegSz)) + { + char fname[MAXPGPATH]; + char *fpath; + + if (state->seg.ws_file >= 0) + { + close(state->seg.ws_file); + state->seg.ws_file = -1; + + /* Remove this file, as it is no longer needed. */ + XLogFileName(fname, state->seg.ws_tli, nextSegNo, WalSegSz); + fpath = get_tmp_wal_file_path(private, fname); + unlink(fpath); + pfree(fpath); + } + + XLByteToSeg(targetPagePtr, nextSegNo, WalSegSz); + state->seg.ws_tli = private->timeline; + state->seg.ws_segno = nextSegNo; + + /* + * If the next segment exists, open it and continue reading from there + */ + XLogFileName(fname, private->timeline, nextSegNo, WalSegSz); + if (simple_string_list_member(&private->exportedSegList, fname)) + { + fpath = get_tmp_wal_file_path(private, fname); + state->seg.ws_file = open(fpath, O_RDONLY | PG_BINARY, 0); + + if (state->seg.ws_file < 0) + pg_fatal("could not open file \"%s\": %m", fpath); + } + } + + /* Continue reading from the open WAL segment, if any */ + if (state->seg.ws_file >= 0) + return WALDumpReadPage(state, targetPagePtr, reqLen, targetPtr, + readBuff); + + /* Otherwise, read the WAL page from the archive streamer */ return astreamer_wal_read(readBuff, targetPagePtr, count, private); } diff --git a/src/bin/pg_waldump/pg_waldump.h b/src/bin/pg_waldump/pg_waldump.h index b5d440500de..614e679cb96 100644 --- a/src/bin/pg_waldump/pg_waldump.h +++ b/src/bin/pg_waldump/pg_waldump.h @@ -13,8 +13,11 @@ #include "access/xlogdefs.h" #include "fe_utils/astreamer.h" +#include "fe_utils/simple_list.h" #include "lib/stringinfo.h" +#define TEMP_FILE_EXT "waldump.tmp" + extern int WalSegSz; /* Contains the necessary information to drive WAL decoding */ @@ -31,11 +34,30 @@ typedef struct XLogDumpPrivate astreamer *archive_streamer; StringInfo archive_streamer_buf; /* Buffer for receiving WAL data */ - XLogRecPtr archive_streamer_read_ptr; /* Populate the buffer with records - until this record 
pointer */ + XLogRecPtr archive_streamer_read_ptr; /* Populate the buffer with + * records until this record + * pointer */ + char *tmpdir; /* Temporary directory to export file */ + SimpleStringList exportedSegList; /* Temporary exported WAL file list */ } XLogDumpPrivate; +/* + * Generate the temporary WAL file path (tmpdir/fname.TEMP_FILE_EXT). + * + * The caller is responsible for freeing the returned path with pfree(). + */ +static inline char * +get_tmp_wal_file_path(XLogDumpPrivate *privateInfo, const char *fname) +{ + char *fpath = (char *) palloc(MAXPGPATH); + + snprintf(fpath, MAXPGPATH, "%s/%s.%s", privateInfo->tmpdir, fname, + TEMP_FILE_EXT); + + return fpath; +} + extern astreamer *astreamer_waldump_content_new(astreamer *next, XLogRecPtr startptr, XLogRecPtr endptr, diff --git a/src/bin/pg_waldump/t/001_basic.pl b/src/bin/pg_waldump/t/001_basic.pl index 443126a9ce6..d5fa1f6d28d 100644 --- a/src/bin/pg_waldump/t/001_basic.pl +++ b/src/bin/pg_waldump/t/001_basic.pl @@ -7,6 +7,7 @@ use Cwd; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +use List::Util qw(shuffle); my $tar = $ENV{TAR}; @@ -272,7 +273,7 @@ sub generate_archive } closedir $dh; - @files = sort @files; + @files = shuffle @files; # move into the WAL directory before archiving files my $cwd = getcwd; -- 2.47.1