From ebbc80179d262f80eff31f5f7d6db218c5321354 Mon Sep 17 00:00:00 2001 From: Andrey Borodin Date: Wed, 8 Jan 2025 16:39:20 +0500 Subject: [PATCH v6] Add whole-record WAL compression alongside FPI compression Extends WAL compression to compress entire WAL records when their size exceeds wal_compression_threshold (default 512 bytes), complementing existing per-FPI compression which continues to work unchanged. When wal_compression is enabled and a record exceeds the threshold, the entire record body is compressed as a single unit. This achieves better ratios than per-FPI compression for workloads where multiple pages share similar content, such as B-tree index builds. Benchmark (CREATE INDEX on 10M rows of random float data, VACUUM + CHECKPOINT before each run so B-tree pages are logged as FPIs): Method No compression FPI-only (HEAD) Whole-record (Patched) ------ -------------- --------------- ---------------------- pglz 193 MB 193 MB 193 MB lz4 193 MB 160 MB 132 MB zstd 193 MB 125 MB 97 MB When whole-record compression is active, per-FPI compression is skipped during record assembly so that FPI data is not compressed twice. If whole-record compression fails (e.g. incompressible data) and the record contains FPIs, the code falls back to per-FPI compression automatically. Setting wal_compression_threshold >= wal_compression_buffer disables whole-record compression and restores pure FPI-only behaviour. Memory is controlled by the new wal_compression_buffer GUC. Its default equals MIN_WAL_COMPRESSION_BUFFER = XLR_MAX_BLOCK_ID * COMPRESS_BUFSIZE + HEADER_SCRATCH_SIZE, sized to hold the largest possible WAL record (32 full-page images plus headers) uncompressed. This is the same total memory previously occupied by per-block compressed_page arrays, so the default overhead is unchanged. Total memory per backend is approximately 2x wal_compression_buffer: one buffer for assembly, one for output. The XLR_COMPRESSED flag (0x04) in xl_info marks compressed records. 
Compressed records begin with XLogCompressionHeader carrying the compression method and original decompressed length. Author: Andrey Borodin Discussion: https://postgr.es/m/4DC38068-976E-4A84-8EE6-4EFACBBD927A@yandex-team.ru --- src/backend/access/transam/xlog.c | 18 +- src/backend/access/transam/xloginsert.c | 504 +++++++++++++++--- src/backend/access/transam/xlogreader.c | 233 +++++++- src/backend/utils/misc/guc_parameters.dat | 25 + src/backend/utils/misc/postgresql.conf.sample | 5 +- src/include/access/xlog.h | 2 + src/include/access/xlogreader.h | 4 + src/include/access/xlogrecord.h | 29 +- src/include/utils/guc_hooks.h | 2 + src/test/perl/PostgreSQL/Test/Cluster.pm | 15 +- src/test/recovery/Makefile | 10 + .../recovery/t/046_checkpoint_logical_slot.pl | 5 +- src/test/recovery/t/052_wal_compression.pl | 93 ++++ src/test/recovery/wal_compression.conf | 3 + 14 files changed, 846 insertions(+), 102 deletions(-) create mode 100644 src/test/recovery/t/052_wal_compression.pl create mode 100644 src/test/recovery/wal_compression.conf diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 81dc86847c0..0e5f96c5b57 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -138,6 +138,7 @@ int wal_retrieve_retry_interval = 5000; int max_slot_wal_keep_size_mb = -1; int wal_decode_buffer_size = 512 * 1024; bool track_wal_io_timing = false; +int wal_compression_threshold = 512; #ifdef WAL_DEBUG bool XLOG_DEBUG = false; @@ -717,6 +718,21 @@ static void WALInsertLockAcquireExclusive(void); static void WALInsertLockRelease(void); static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt); +#ifdef WAL_DEBUG +/* Read length of a record, accounting for possible compression */ +static uint32 +XLogGetRecordTotalLen(XLogRecord *record) +{ + if (record->xl_info & XLR_COMPRESSED) + { + XLogCompressionHeader *c = (XLogCompressionHeader *) record; + Assert(c->decompressed_length > 0); + return 
c->decompressed_length; + } + return record->xl_tot_len; +} +#endif + /* * Insert an XLOG record represented by an already-constructed chain of data * chunks. This is a low-level routine; to construct the WAL record header @@ -1034,7 +1050,7 @@ XLogInsertRecord(XLogRecData *rdata, /* We also need temporary space to decode the record. */ record = (XLogRecord *) recordBuf.data; decoded = (DecodedXLogRecord *) - palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len)); + palloc(DecodeXLogRecordRequiredSpace(XLogGetRecordTotalLen(record))); if (!debug_reader) debug_reader = XLogReaderAllocate(wal_segment_size, NULL, diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index 92c48e768c3..16da53ba565 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -39,6 +39,7 @@ #include "replication/origin.h" #include "storage/bufmgr.h" #include "storage/proc.h" +#include "utils/guc_hooks.h" #include "utils/memutils.h" #include "utils/pgstat_internal.h" @@ -63,6 +64,24 @@ /* Buffer size required to store a compressed version of backup block image */ #define COMPRESS_BUFSIZE Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ) +#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char)) +#define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char)) + +#define HEADER_SCRATCH_SIZE \ + (SizeOfXLogRecord + \ + MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ + SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \ + SizeOfXLogTransactionId) + +/* + * Minimum buffer size for wal_compression_buffer GUC. + * + * Must accommodate the largest WAL record we might want to compress as a + * whole: up to XLR_MAX_BLOCK_ID full-page images (each up to COMPRESS_BUFSIZE + * bytes when uncompressed) plus per-block and record headers. 
+ */ +#define MIN_WAL_COMPRESSION_BUFFER (XLR_MAX_BLOCK_ID * COMPRESS_BUFSIZE + HEADER_SCRATCH_SIZE) + /* * For each block reference registered with XLogRegisterBuffer, we fill in * a registered_buffer struct. @@ -84,8 +103,8 @@ typedef struct XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to * backup block data in XLogRecordAssemble() */ - /* buffer to store a compressed version of backup block image */ - char compressed_page[COMPRESS_BUFSIZE]; + /* pointer into compression_buf for compressed page image */ + char *compressed_page; } registered_buffer; static registered_buffer *registered_buffers; @@ -115,14 +134,62 @@ static uint8 curinsert_flags = 0; static XLogRecData hdr_rdt; static char *hdr_scratch = NULL; -#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char)) -#define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char)) +/* + * GUC: Maximum memory per backend for WAL compression. + * + * Controls the size of the shared FPI compression buffer as well as the + * threshold for whole-record compression: records larger than this value + * will not be compressed as a whole (per-FPI compression still applies). + * + * The default equals MIN_WAL_COMPRESSION_BUFFER, which is the total memory + * previously used by embedded per-block compressed_page arrays. + * + * Actual memory consumption per backend is approximately 2x this value + * because we need both an input buffer and an output buffer for whole-record + * compression. + */ +int wal_compression_buffer; -#define HEADER_SCRATCH_SIZE \ - (SizeOfXLogRecord + \ - MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ - SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \ - SizeOfXLogTransactionId) +static XLogRecData compressed_rdt_hdr; + +/* + * Compression buffers, allocated once and grown with repalloc if + * wal_compression_buffer increases. compression_buffers_size tracks the + * wal_compression_buffer value at which they were last allocated. 
+ * + * compression_buf is a single wal_compression_buffer-sized staging area + * shared between two mutually exclusive uses: + * + * - FPI compression (skip_fpi_compression = false): compressed block images + * are packed into compression_buf one after another, with + * compression_buf_offset tracking the fill level. rdt nodes point + * directly into this buffer. + * + * - Whole-record compression (skip_fpi_compression = true): XLogCompressRdt + * flattens the rdt chain into compression_buf before compressing into + * compressed_data. FPI compression is skipped in this path, so the two + * uses never overlap. + * + * compressed_data holds the compressor output for whole-record compression. + * Its size is the maximum compressed output for wal_compression_buffer bytes. + */ +static char *compression_buf = NULL; +static int compression_buf_offset; /* fill level for FPI packing */ +static char *compressed_data = NULL; +static uint32 compressed_data_size; /* allocated size of compressed_data */ +static int compression_buffers_size; /* wal_compression_buffer at last alloc */ + +/* + * In assert builds (where MEMORY_CONTEXT_CHECKING is active), verify every + * palloc sentinel in the process after operations that write to compression + * buffers. This catches overflows that corrupt adjacent palloc blocks. + * Compiles to nothing in non-assert builds. + */ +#ifdef MEMORY_CONTEXT_CHECKING +#define CheckCompressionMemory() MemoryContextCheck(TopMemoryContext) +#else +#define CheckCompressionMemory() ((void) 0) +#endif /* * An array of XLogRecData structs, to hold registered data. @@ -136,11 +203,13 @@ static bool begininsert_called = false; /* Memory context to hold the registered buffer and data references. 
*/ static MemoryContext xloginsert_cxt; +static void AllocCompressionBuffers(void); static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecPtr RedoRecPtr, bool doPageWrites, - XLogRecPtr *fpw_lsn, int *num_fpi, - uint64 *fpi_bytes, - bool *topxid_included); + XLogRecPtr *fpw_lsn, int *num_fpi, + uint64 *fpi_bytes, + bool *topxid_included, uint64 *rec_size, + bool skip_fpi_compression); static bool XLogCompressBackupBlock(const PageData *page, uint16 hole_offset, uint16 hole_length, void *dest, uint16 *dlen); @@ -155,6 +224,9 @@ XLogBeginInsert(void) Assert(mainrdata_last == (XLogRecData *) &mainrdata_head); Assert(mainrdata_len == 0); + /* Enforce that InitXLogInsert() was called before any WAL insertion */ + Assert(xloginsert_cxt != NULL); + /* cross-check on whether we should be here or not */ if (!XLogInsertAllowed()) elog(ERROR, "cannot make new WAL entries during recovery"); @@ -234,6 +306,7 @@ XLogResetInsertion(void) mainrdata_len = 0; mainrdata_last = (XLogRecData *) &mainrdata_head; curinsert_flags = 0; + compression_buf_offset = 0; begininsert_called = false; } @@ -463,6 +536,124 @@ XLogSetRecordFlags(uint8 flags) curinsert_flags |= flags; } + +/* Compress assembled record on top of compression buffers */ +static XLogRecData * +XLogCompressRdt(XLogRecData *rdt) +{ + XLogCompressionHeader *compressed_header; + XLogRecord *src_header; + uint32 flat_len = 0; + uint32 orig_len; + int32 compr_len = -1; + + Assert(wal_compression != WAL_COMPRESSION_NONE); + Assert(compression_buf != NULL); + + /* + * Flatten the rdt chain into compression_buf. FPI compression is always + * skipped when whole-record compression is attempted (skip_fpi_compression + * = true), so compression_buf is unused at this point and safe to + * overwrite. + * + * Caller must have checked rec_size <= wal_compression_buffer before + * calling us, and AllocCompressionBuffers() guarantees the buffer is at + * least wal_compression_buffer bytes. 
Verify the invariant here so that + * any future drift between rec_size and the actual rdt chain length is + * caught immediately rather than silently corrupting memory. + */ + for (const XLogRecData *r = rdt; r != NULL; r = r->next) + { + memcpy(compression_buf + flat_len, r->data, r->len); + flat_len += r->len; + } + Assert(flat_len <= (uint32) wal_compression_buffer); + + src_header = (XLogRecord *) compression_buf; + compressed_header = (XLogCompressionHeader *) compressed_data; + + compressed_header->record_header = *src_header; + compressed_header->decompressed_length = flat_len; + + orig_len = src_header->xl_tot_len - SizeOfXLogRecord; + + switch ((WalCompression) wal_compression) + { + case WAL_COMPRESSION_PGLZ: + compressed_header->method = XLR_COMPRESS_PGLZ; + compr_len = pglz_compress((char *) &src_header[1], orig_len, (char *) &compressed_header[1], PGLZ_strategy_default); + if (compr_len == -1) + return NULL; + break; + + case WAL_COMPRESSION_LZ4: +#ifdef USE_LZ4 + compressed_header->method = XLR_COMPRESS_LZ4; + compr_len = LZ4_compress_default((char *) &src_header[1], (char *) &compressed_header[1], + orig_len, compressed_data_size); + if (compr_len <= 0) + return NULL; +#else + elog(ERROR, "LZ4 is not supported by this build"); +#endif + break; + + case WAL_COMPRESSION_ZSTD: +#ifdef USE_ZSTD + compressed_header->method = XLR_COMPRESS_ZSTD; + compr_len = ZSTD_compress((char *) &compressed_header[1], compressed_data_size, + (char *) &src_header[1], orig_len, ZSTD_CLEVEL_DEFAULT); + if (ZSTD_isError(compr_len)) + return NULL; +#else + elog(ERROR, "zstd is not supported by this build"); +#endif + break; + + case WAL_COMPRESSION_NONE: + Assert(false); /* cannot happen */ + return NULL; + break; + /* no default case, so that compiler will warn */ + } + + Assert(compr_len > 0); + + /* Verify compression did not overflow any palloc'd buffer. 
*/ + CheckCompressionMemory(); + + compressed_header->record_header.xl_tot_len = SizeOfXLogCompressedRecord + compr_len; + + compressed_header->record_header.xl_info |= XLR_COMPRESSED; + + compressed_rdt_hdr.data = compressed_data; + compressed_rdt_hdr.len = compressed_header->record_header.xl_tot_len; + compressed_rdt_hdr.next = NULL; + + return &compressed_rdt_hdr; +} + +/* Checksum assembled record (which may be compressed). */ +static void +XLogChecksumRecord(XLogRecData *rdt) +{ + pg_crc32c rdata_crc; + XLogRecord *rechdr = (XLogRecord *) rdt->data; + /* + * Calculate CRC of the data + * + * Note that the record header isn't added into the CRC initially since we + * don't know the prev-link yet. Thus, the CRC will represent the CRC of + * the whole record in the order: rdata, then backup blocks, then record + * header. + */ + INIT_CRC32C(rdata_crc); + COMP_CRC32C(rdata_crc, ((char *)rdt->data) + SizeOfXLogRecord, rdt->len - SizeOfXLogRecord); + for (rdt = rdt->next; rdt != NULL; rdt = rdt->next) + COMP_CRC32C(rdata_crc, rdt->data, rdt->len); + rechdr->xl_crc = rdata_crc; +} + /* * Insert an XLOG record having the specified RMID and info bytes, with the * body of the record being the data and buffer references registered earlier @@ -478,6 +669,15 @@ XLogRecPtr XLogInsert(RmgrId rmid, uint8 info) { XLogRecPtr EndPos; + /* + * When whole-record compression can apply (threshold is within the buffer + * size limit), skip per-FPI compression during assembly to avoid + * compressing FPI data twice. When threshold >= buffer_size, whole-record + * compression can never trigger, so FPI compression runs normally. + */ + bool try_whole_record = (wal_compression != WAL_COMPRESSION_NONE && + wal_compression_threshold < + wal_compression_buffer); /* XLogBeginInsert() must have been called. 
*/ if (!begininsert_called) @@ -514,6 +714,15 @@ XLogInsert(RmgrId rmid, uint8 info) XLogRecData *rdt; int num_fpi = 0; uint64 fpi_bytes = 0; + uint64 rec_size; + + /* + * Reset the FPI compression offset at the start of each iteration. + * The do-while loop retries when XLogInsertRecord returns + * InvalidXLogRecPtr (e.g. because doPageWrites changed), so we must + * not accumulate FPI data across iterations. + */ + compression_buf_offset = 0; /* * Get values needed to decide whether to do full-page writes. Since @@ -524,7 +733,53 @@ XLogInsert(RmgrId rmid, uint8 info) rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites, &fpw_lsn, &num_fpi, &fpi_bytes, - &topxid_included); + &topxid_included, &rec_size, + try_whole_record); + + /* + * When try_whole_record is true, FPI compression was skipped during + * assembly. Attempt whole-record compression if the record is in the + * right size range. Neither XLogCompressRdt() nor the compressors it + * calls allocate memory, so this is safe inside a critical section. + * + * If whole-record compression was not used for any reason (record too + * small, too large for our buffer, or compressor rejected it), fall + * back to per-FPI compression by reassembling. This ensures FPIs are + * never silently left uncompressed because we optimistically skipped + * them during the first assembly pass. + */ + if (try_whole_record) + { + bool whole_record_compressed = false; + + if (rec_size > wal_compression_threshold && + rec_size <= (uint32) wal_compression_buffer) + { + XLogRecData *rdt_compressed = XLogCompressRdt(rdt); + + if (rdt_compressed != NULL) + { + rdt = rdt_compressed; + whole_record_compressed = true; + } + } + + if (!whole_record_compressed && num_fpi > 0) + { + /* + * Fall back to per-FPI compression. XLogRecordAssemble() + * overwrites the same static buffers each call, so no extra + * memory is required. 
+ */ + compression_buf_offset = 0; + rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites, + &fpw_lsn, &num_fpi, &fpi_bytes, + &topxid_included, &rec_size, + false); + } + } + + XLogChecksumRecord(rdt); EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi, fpi_bytes, topxid_included); @@ -547,6 +802,87 @@ XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value) return XLogInsert(rmid, info); } +/* + * Allocate (or grow) the two compression buffers. + * + * Must be called outside a critical section. InitXLogInsert() calls this at + * backend start when wal_compression is enabled. The GUC assign hooks for + * wal_compression and wal_compression_buffer call it when those GUCs change + * at runtime, which also always happens outside critical sections. + * + * If the buffers are already sized to wal_compression_buffer this is a no-op. + */ +static void +AllocCompressionBuffers(void) +{ + uint32 new_size = wal_compression_buffer; + uint32 compressed_buf_size; + + Assert(CritSectionCount == 0); + Assert(xloginsert_cxt != NULL); + + if (compression_buffers_size >= new_size) + return; /* already big enough */ + + compressed_buf_size = PGLZ_MAX_OUTPUT(new_size); +#ifdef USE_LZ4 + compressed_buf_size = Max(compressed_buf_size, + LZ4_COMPRESSBOUND(new_size)); +#endif +#ifdef USE_ZSTD + compressed_buf_size = Max(compressed_buf_size, + ZSTD_COMPRESSBOUND(new_size)); +#endif + compressed_buf_size += SizeOfXLogCompressedRecord; + + if (compression_buf == NULL) + { + compression_buf = MemoryContextAlloc(xloginsert_cxt, new_size); + compressed_data = MemoryContextAlloc(xloginsert_cxt, compressed_buf_size); + } + else + { + compression_buf = repalloc(compression_buf, new_size); + compressed_data = repalloc(compressed_data, compressed_buf_size); + } + + compressed_data_size = compressed_buf_size; + compression_buffers_size = new_size; + CheckCompressionMemory(); +} + +/* + * GUC assign hook for wal_compression. 
+ * + * Allocates compression buffers immediately when compression is first enabled + * so that XLogInsert() never needs to allocate inside a critical section. + * If xloginsert_cxt is not yet set up (very early startup), the allocation is + * deferred to InitXLogInsert(). + */ +void +assign_wal_compression(int newval, void *extra) +{ + if (newval != WAL_COMPRESSION_NONE && + xloginsert_cxt != NULL && + compression_buf == NULL) + AllocCompressionBuffers(); +} + +/* + * GUC assign hook for wal_compression_buffer. + * + * Grows the compression buffers immediately when the GUC is increased so that + * XLogInsert() never needs to repalloc inside a critical section. + */ +void +assign_wal_compression_buffer(int newval, void *extra) +{ + if (wal_compression != WAL_COMPRESSION_NONE && + xloginsert_cxt != NULL && + compression_buf != NULL) + AllocCompressionBuffers(); +} + /* * Assemble a WAL record from the registered data and buffers into an * XLogRecData chain, ready for insertion with XLogInsertRecord(). @@ -566,12 +902,11 @@ static XLogRecData * XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecPtr RedoRecPtr, bool doPageWrites, XLogRecPtr *fpw_lsn, int *num_fpi, uint64 *fpi_bytes, - bool *topxid_included) + bool *topxid_included, uint64 *rec_size, + bool skip_fpi_compression) { - XLogRecData *rdt; uint64 total_len = 0; int block_id; - pg_crc32c rdata_crc; registered_buffer *prev_regbuf = NULL; XLogRecData *rdt_datas_last; XLogRecord *rechdr; @@ -599,6 +934,18 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, if (wal_consistency_checking[rmid]) info |= XLR_CHECK_CONSISTENCY; + /* + * Compression buffers must already be allocated by this point. + * InitXLogInsert() allocates them at backend start when wal_compression + * is enabled; the assign hooks for wal_compression and + * wal_compression_buffer handle later changes outside critical sections. + * We must never allocate or repalloc here because XLogInsert() is + * routinely called inside critical sections. 
+ */ + Assert(wal_compression == WAL_COMPRESSION_NONE || + (compression_buf != NULL && + compression_buffers_size >= wal_compression_buffer)); + /* * Make an rdata chain containing all the data portions of all block * references. This includes the data for full-page images. Also append @@ -613,6 +960,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecordBlockHeader bkpb; XLogRecordBlockImageHeader bimg; XLogRecordBlockCompressHeader cbimg = {0}; + uint16 hole_length; bool samerel; bool is_compressed = false; bool include_image; @@ -701,18 +1049,37 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, cbimg.hole_length = 0; } - /* - * Try to compress a block image if wal_compression is enabled - */ - if (wal_compression != WAL_COMPRESSION_NONE) + /* + * Try to compress a block image if wal_compression is enabled, + * we have space in the shared FPI compression buffer, and the caller + * has not requested that FPI compression be skipped (because + * whole-record compression will be applied instead). + */ + if (!skip_fpi_compression && + wal_compression != WAL_COMPRESSION_NONE && + compression_buf != NULL && + compression_buf_offset + COMPRESS_BUFSIZE <= wal_compression_buffer) { + /* Assign pointer into shared buffer for this FPI */ + regbuf->compressed_page = compression_buf + compression_buf_offset; + is_compressed = XLogCompressBackupBlock(page, bimg.hole_offset, cbimg.hole_length, regbuf->compressed_page, &compressed_len); + + if (is_compressed) + { + compression_buf_offset += compressed_len; + /* Verify FPI compression did not overrun compression_buf. 
*/ + CheckCompressionMemory(); + } } + /* for uncompressed images, use hole_length from cbimg */ + hole_length = cbimg.hole_length; + /* * Fill in the remaining fields in the XLogRecordBlockHeader * struct @@ -778,9 +1145,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, } else { - bimg.length = BLCKSZ - cbimg.hole_length; + bimg.length = BLCKSZ - hole_length; - if (cbimg.hole_length == 0) + if (hole_length == 0) { rdt_datas_last->data = page; rdt_datas_last->len = BLCKSZ; @@ -795,9 +1162,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, rdt_datas_last = rdt_datas_last->next; rdt_datas_last->data = - page + (bimg.hole_offset + cbimg.hole_length); + page + (bimg.hole_offset + hole_length); rdt_datas_last->len = - BLCKSZ - (bimg.hole_offset + cbimg.hole_length); + BLCKSZ - (bimg.hole_offset + hole_length); } } @@ -914,19 +1281,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, hdr_rdt.len = (scratch - hdr_scratch); total_len += hdr_rdt.len; - /* - * Calculate CRC of the data - * - * Note that the record header isn't added into the CRC initially since we - * don't know the prev-link yet. Thus, the CRC will represent the CRC of - * the whole record in the order: rdata, then backup blocks, then record - * header. - */ - INIT_CRC32C(rdata_crc); - COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord); - for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next) - COMP_CRC32C(rdata_crc, rdt->data, rdt->len); - /* * Ensure that the XLogRecord is not too large. * @@ -950,7 +1304,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, rechdr->xl_info = info; rechdr->xl_rmid = rmid; rechdr->xl_prev = InvalidXLogRecPtr; - rechdr->xl_crc = rdata_crc; + rechdr->xl_crc = 0; + + *rec_size = rechdr->xl_tot_len; return &hdr_rdt; } @@ -1365,50 +1721,60 @@ log_newpage_range(Relation rel, ForkNumber forknum, /* * Allocate working buffers needed for WAL record construction. + * + * Must be called exactly once per process before any XLogBeginInsert() call. 
+ * Both regular backends (via InitPostgres) and auxiliary processes (via + * AuxiliaryProcessMainCommon) must call this. */ void InitXLogInsert(void) { #ifdef USE_ASSERT_CHECKING + { + size_t max_required; - /* - * Check that any records assembled can be decoded. This is capped based - * on what XLogReader would require at its maximum bound. The XLOG_BLCKSZ - * addend covers the larger allocate_recordbuf() demand. This code path - * is called once per backend, more than enough for this check. - */ - size_t max_required = - DecodeXLogRecordRequiredSpace(XLogRecordMaxSize + XLOG_BLCKSZ); + /* Detect extraneous calls */ + Assert(xloginsert_cxt == NULL); - Assert(AllocSizeIsValid(max_required)); + /* + * Check that any records assembled can be decoded. This is capped + * based on what XLogReader would require at its maximum bound. The + * XLOG_BLCKSZ addend covers the larger allocate_recordbuf() demand. + * This code path is called once per backend, more than enough for + * this check. + */ + max_required = DecodeXLogRecordRequiredSpace(XLogRecordMaxSize + XLOG_BLCKSZ); + Assert(AllocSizeIsValid(max_required)); + } #endif /* Initialize the working areas */ - if (xloginsert_cxt == NULL) - { - xloginsert_cxt = AllocSetContextCreate(TopMemoryContext, - "WAL record construction", - ALLOCSET_DEFAULT_SIZES); - } + xloginsert_cxt = AllocSetContextCreate(TopMemoryContext, + "WAL record construction", + ALLOCSET_DEFAULT_SIZES); - if (registered_buffers == NULL) - { - registered_buffers = (registered_buffer *) - MemoryContextAllocZero(xloginsert_cxt, - sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1)); - max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1; - } - if (rdatas == NULL) - { - rdatas = MemoryContextAlloc(xloginsert_cxt, - sizeof(XLogRecData) * XLR_NORMAL_RDATAS); - max_rdatas = XLR_NORMAL_RDATAS; - } + registered_buffers = (registered_buffer *) + MemoryContextAllocZero(xloginsert_cxt, + sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1)); + 
max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1; + + rdatas = MemoryContextAlloc(xloginsert_cxt, + sizeof(XLogRecData) * XLR_NORMAL_RDATAS); + max_rdatas = XLR_NORMAL_RDATAS; /* * Allocate a buffer to hold the header information for a WAL record. */ - if (hdr_scratch == NULL) - hdr_scratch = MemoryContextAllocZero(xloginsert_cxt, - HEADER_SCRATCH_SIZE); + hdr_scratch = MemoryContextAllocZero(xloginsert_cxt, HEADER_SCRATCH_SIZE); + + /* + * Pre-allocate compression buffers if compression is already enabled. + * This ensures the buffers exist before the first XLogInsert() call, + * which may happen inside a critical section where palloc is unsafe. + * If wal_compression is later turned on or wal_compression_buffer is + * increased, the GUC assign hooks handle allocation at that point + * (also always outside critical sections). + */ + if (wal_compression != WAL_COMPRESSION_NONE) + AllocCompressionBuffers(); } diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index c60aa9a51e9..080ab2aa0da 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -32,6 +32,7 @@ #include "catalog/pg_control.h" #include "common/pg_lzcompress.h" #include "replication/origin.h" +#include "utils/memutils.h" #ifndef FRONTEND #include "pgstat.h" @@ -54,6 +55,8 @@ static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, static void ResetDecoder(XLogReaderState *state); static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir); +static XLogRecord *XLogDecompressRecordIfNeeded(XLogReaderState *state, XLogRecord *record, + XLogRecPtr recptr); /* size of the buffer allocated for error message. 
*/ #define MAX_ERRORMSG_LEN 1000 @@ -170,6 +173,8 @@ XLogReaderFree(XLogReaderState *state) pfree(state->errormsg_buf); if (state->readRecordBuf) pfree(state->readRecordBuf); + if (state->decompression_buffer) + pfree(state->decompression_buffer); pfree(state->readBuf); pfree(state); } @@ -533,7 +538,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking) XLogRecPtr targetPagePtr; bool randAccess; uint32 len, - total_len; + total_len_decomp, + total_len_physical; uint32 targetRecOff; uint32 pageHeaderSize; bool assembled; @@ -644,8 +650,67 @@ restart: * whole header. */ record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); - total_len = record->xl_tot_len; + total_len_physical = record->xl_tot_len; + /* + * Determine total_len_decomp for pre-validation allocation. + * + * Only xl_tot_len (the very first field) is guaranteed to be on this page. + * Any other XLogRecord field, including xl_info, may be on the next page + * when the record starts near the end of a WAL page. Only read xl_info + * after confirming the full header fits here. + */ + if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) + { + /* Full header is on this page; safe to read xl_info. 
*/ + if (record->xl_info & XLR_COMPRESSED) + { + if (targetRecOff > XLOG_BLCKSZ - SizeOfXLogCompressedRecord) + { + total_len_decomp = -1; /* XLogCompressionHeader spans pages */ + } + else if (total_len_physical < SizeOfXLogCompressedRecord) + { + total_len_decomp = -1; /* Cannot be a valid compressed record */ + } + else + { + XLogCompressionHeader *c = (XLogCompressionHeader *) record; + uint32 dlen = c->decompressed_length; + char method = c->method; + bool valid_method; + + valid_method = (method == XLR_COMPRESS_PGLZ) +#ifdef USE_LZ4 + || (method == XLR_COMPRESS_LZ4) +#endif +#ifdef USE_ZSTD + || (method == XLR_COMPRESS_ZSTD) +#endif + ; + + /* Sanity-check before using for allocation */ + if (dlen <= SizeOfXLogRecord || + dlen > MaxAllocSize || + !valid_method) + total_len_decomp = -1; + else + total_len_decomp = dlen; + } + } + else + total_len_decomp = record->xl_tot_len; + } + else + { + /* + * Header spans pages; cannot read xl_info yet. Use xl_tot_len as + * a conservative pre-allocation size. If the record turns out to be + * compressed after assembly, total_len_decomp will be corrected from + * XLogCompressionHeader.decompressed_length below. + */ + total_len_decomp = record->xl_tot_len; + } /* * If the whole record header is on this page, validate it immediately. * Otherwise do just a basic sanity check on xl_tot_len, and validate the @@ -660,16 +725,18 @@ restart: randAccess)) goto err; gotheader = true; + if (record->xl_info & XLR_COMPRESSED) + gotheader = targetRecOff <= XLOG_BLCKSZ - SizeOfXLogCompressedRecord; } else { /* There may be no next page if it's too small. */ - if (total_len < SizeOfXLogRecord) + if (total_len_physical < SizeOfXLogRecord) { report_invalid_record(state, "invalid record length at %X/%08X: expected at least %u, got %u", LSN_FORMAT_ARGS(RecPtr), - (uint32) SizeOfXLogRecord, total_len); + (uint32) SizeOfXLogRecord, total_len_physical); goto err; } /* We'll validate the header once we have the next page. 
*/ @@ -681,9 +748,11 @@ restart: * calling palloc. If we can't, we'll try again below after we've * validated that total_len isn't garbage bytes from a recycled WAL page. */ - decoded = XLogReadRecordAlloc(state, - total_len, + if (total_len_decomp != -1) + decoded = XLogReadRecordAlloc(state, + total_len_decomp, false /* allow_oversized */ ); + if (decoded == NULL && nonblocking) { /* @@ -695,7 +764,7 @@ restart: } len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ; - if (total_len > len) + if (total_len_physical > len) { /* Need to reassemble record */ char *contdata; @@ -728,7 +797,9 @@ restart: * can handle the case where the previous record ended as being a * partial one. */ - readOff = ReadPageInternal(state, targetPagePtr, SizeOfXLogShortPHD); + readOff = ReadPageInternal(state, targetPagePtr, + Min(total_len_physical - gotlen + SizeOfXLogShortPHD, + XLOG_BLCKSZ)); if (readOff == XLREAD_WOULDBLOCK) return XLREAD_WOULDBLOCK; else if (readOff < 0) @@ -767,19 +838,19 @@ restart: * we expect there to be left. */ if (pageHeader->xlp_rem_len == 0 || - total_len != (pageHeader->xlp_rem_len + gotlen)) + total_len_physical != (pageHeader->xlp_rem_len + gotlen)) { report_invalid_record(state, "invalid contrecord length %u (expected %lld) at %X/%08X", pageHeader->xlp_rem_len, - ((long long) total_len) - gotlen, + ((long long) total_len_physical) - gotlen, LSN_FORMAT_ARGS(RecPtr)); goto err; } /* Wait for the next page to become available */ readOff = ReadPageInternal(state, targetPagePtr, - Min(total_len - gotlen + SizeOfXLogShortPHD, + Min(total_len_physical - gotlen + SizeOfXLogShortPHD, XLOG_BLCKSZ)); if (readOff == XLREAD_WOULDBLOCK) return XLREAD_WOULDBLOCK; @@ -824,7 +895,7 @@ restart: * also cross-checked total_len against xlp_rem_len on the second * page, and verified xlp_pageaddr on both. 
*/ - if (total_len > state->readRecordBufSize) + if (total_len_physical > state->readRecordBufSize) { char save_copy[XLOG_BLCKSZ * 2]; @@ -835,11 +906,11 @@ restart: Assert(gotlen <= lengthof(save_copy)); Assert(gotlen <= state->readRecordBufSize); memcpy(save_copy, state->readRecordBuf, gotlen); - allocate_recordbuf(state, total_len); + allocate_recordbuf(state, total_len_physical); memcpy(state->readRecordBuf, save_copy, gotlen); buffer = state->readRecordBuf + gotlen; } - } while (gotlen < total_len); + } while (gotlen < total_len_physical); Assert(gotheader); record = (XLogRecord *) state->readRecordBuf; @@ -854,8 +925,9 @@ restart: else { /* Wait for the record data to become available */ + Assert(targetRecOff + total_len_physical <= XLOG_BLCKSZ); readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + total_len, XLOG_BLCKSZ)); + targetRecOff + total_len_physical); if (readOff == XLREAD_WOULDBLOCK) return XLREAD_WOULDBLOCK; else if (readOff < 0) @@ -865,7 +937,7 @@ restart: if (!ValidXLogRecord(state, record, RecPtr)) goto err; - state->NextRecPtr = RecPtr + MAXALIGN(total_len); + state->NextRecPtr = RecPtr + MAXALIGN(total_len_physical); state->DecodeRecPtr = RecPtr; } @@ -888,8 +960,19 @@ restart: if (decoded == NULL) { Assert(!nonblocking); + + /* total_len_decomp may not yet reflect the actual decompressed size */ + if (record->xl_info & XLR_COMPRESSED) + { + XLogCompressionHeader *c = (XLogCompressionHeader *) record; + Assert(c->decompressed_length > 0); + Assert(c->decompressed_length < MaxAllocSize); + total_len_decomp = c->decompressed_length; + } + else + total_len_decomp = record->xl_tot_len; decoded = XLogReadRecordAlloc(state, - total_len, + total_len_decomp, true /* allow_oversized */ ); /* allocation should always happen under allow_oversized */ Assert(decoded != NULL); @@ -1666,6 +1749,110 @@ DecodeXLogRecordRequiredSpace(size_t xl_tot_len) return size; } +static XLogRecord * +XLogDecompressRecordIfNeeded(XLogReaderState *state, + 
XLogRecord *record, + XLogRecPtr recptr) +{ + if (record->xl_info & XLR_COMPRESSED) + { + XLogCompressionHeader *src = (XLogCompressionHeader *) record; + /* decompressed_length covers the XLogRecord header + body */ + uint32 body_len = src->decompressed_length - SizeOfXLogRecord; + uint32 srclen = src->record_header.xl_tot_len - SizeOfXLogCompressedRecord; + bool decomp_success = true; + char *dst; + XLogRecord *dst_h; + + /* + * Grow the decompression buffer if needed, rounding up to BLCKSZ to + * avoid frequent small reallocations. Since the buffer content is + * always fully overwritten, we simply pfree and reallocate. + */ + if (state->decompression_buffer_size < src->decompressed_length) + { + uint32 new_size = (uint32) TYPEALIGN(BLCKSZ, src->decompressed_length); + + if (state->decompression_buffer) + pfree(state->decompression_buffer); + state->decompression_buffer = + palloc_extended(new_size, MCXT_ALLOC_NO_OOM); + if (!state->decompression_buffer) + { + state->decompression_buffer_size = 0; + report_invalid_record(state, + "out of memory while decompressing record at %X/%08X", + LSN_FORMAT_ARGS(recptr)); + return NULL; + } + state->decompression_buffer_size = new_size; + } + + dst_h = (XLogRecord *) state->decompression_buffer; + *dst_h = src->record_header; + dst_h->xl_tot_len = src->decompressed_length; + dst = (char *) &dst_h[1]; + + if (src->method == XLR_COMPRESS_PGLZ) + { + /* + * pglz_decompress with check_complete=true verifies that the + * output length exactly equals rawsize, so pass body_len, not the + * total buffer size. 
+ */ + if (pglz_decompress((char *) &src[1], srclen, dst, + body_len, true) < 0) + decomp_success = false; + } + else if (src->method == XLR_COMPRESS_LZ4) + { +#ifdef USE_LZ4 + if (LZ4_decompress_safe((char *) &src[1], dst, + srclen, body_len) <= 0) + decomp_success = false; +#else + report_invalid_record(state, + "could not decompress record at %X/%08X compressed with %s not supported by build", + LSN_FORMAT_ARGS(recptr), "lz4"); + return NULL; +#endif + } + else if (src->method == XLR_COMPRESS_ZSTD) + { +#ifdef USE_ZSTD + size_t decomp_result = ZSTD_decompress(dst, body_len, + (char *) &src[1], srclen); + + if (ZSTD_isError(decomp_result)) + decomp_success = false; +#else + report_invalid_record(state, + "could not decompress record at %X/%08X compressed with %s not supported by build", + LSN_FORMAT_ARGS(recptr), "zstd"); + return NULL; +#endif + } + else + { + report_invalid_record(state, + "could not decompress record at %X/%08X compressed with unknown method", + LSN_FORMAT_ARGS(recptr)); + return NULL; + } + + if (!decomp_success) + { + report_invalid_record(state, + "could not decompress record at %X/%08X", + LSN_FORMAT_ARGS(recptr)); + return NULL; + } + + return (XLogRecord *) state->decompression_buffer; + } + return record; +} + /* * Decode a record. "decoded" must point to a MAXALIGNed memory area that has * space for at least DecodeXLogRecordRequiredSpace(record) bytes. 
On @@ -1704,6 +1891,14 @@ DecodeXLogRecord(XLogReaderState *state, RelFileLocator *rlocator = NULL; uint8 block_id; + record = XLogDecompressRecordIfNeeded(state, record, lsn); + + if (!record) + { + /* Decompression failed, error must be reported already */ + return false; + } + decoded->header = *record; decoded->lsn = lsn; decoded->next = NULL; @@ -1878,8 +2073,8 @@ DecodeXLogRecord(XLogReaderState *state, blk->bimg_len != BLCKSZ) { report_invalid_record(state, - "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %d at %X/%08X", - blk->data_len, + "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%08X", + (unsigned int) blk->bimg_len, LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; } diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 7c60b125564..4958e9016a0 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -3323,6 +3323,31 @@ variable => 'wal_compression', boot_val => 'WAL_COMPRESSION_NONE', options => 'wal_compression_options', + assign_hook => 'assign_wal_compression', +}, + +{ name => 'wal_compression_buffer', type => 'int', context => 'PGC_SUSET', group => 'WAL_SETTINGS', + short_desc => 'Buffer size for WAL compression.', + long_desc => 'Sets the buffer size for WAL compression, used for both FPI compression ' + . 'and whole-record compression. Must be large enough to hold compressed ' + . 'full page images for up to 32 blocks (XLR_MAX_BLOCK_ID). ' + . 'Note: actual memory consumption per backend is approximately 2x this value ' + . 
'plus ~5% overhead, as additional buffers are needed for whole-record compression.', + flags => 'GUC_UNIT_BYTE', + variable => 'wal_compression_buffer', + boot_val => '295972', + min => '295972', + max => '1073741824', + assign_hook => 'assign_wal_compression_buffer', +}, + +{ name => 'wal_compression_threshold', type => 'int', context => 'PGC_SUSET', group => 'WAL_SETTINGS', + short_desc => 'Minimum WAL record length to engage compression.', + flags => 'GUC_UNIT_BYTE', + variable => 'wal_compression_threshold', + boot_val => '512', + min => '32', + max => 'INT_MAX', }, { name => 'wal_consistency_checking', type => 'string', context => 'PGC_SUSET', group => 'DEVELOPER_OPTIONS', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index dc9e2255f8a..c1f02bc013a 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -245,8 +245,11 @@ #full_page_writes = on # recover from partial page writes #wal_log_hints = off # also do full page writes of non-critical updates # (change requires restart) -#wal_compression = off # enables compression of full-page writes; +#wal_compression = off # enables compression of full-page writes; # off, pglz, lz4, zstd, or on +#wal_compression_threshold = 512 # min 32, minimal record length to be compressed +#wal_compression_buffer = 295972 # buffer for FPI and whole-record compression; + # actual memory usage is ~2x this value #wal_init_zero = on # zero-fill new WAL files #wal_recycle = on # recycle WAL files #wal_buffers = -1 # min 32kB, -1 sets based on shared_buffers diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 0591a885dd1..a6e759847f2 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -57,6 +57,8 @@ extern PGDLLIMPORT int CommitDelay; extern PGDLLIMPORT int CommitSiblings; extern PGDLLIMPORT bool track_wal_io_timing; extern PGDLLIMPORT int wal_decode_buffer_size; +extern 
PGDLLIMPORT int wal_compression_threshold; +extern PGDLLIMPORT int wal_compression_buffer; extern PGDLLIMPORT int CheckPointSegments; diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 9b63b6aff75..207d58dcf54 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -251,6 +251,10 @@ struct XLogReaderState char *decode_buffer_head; /* data is read from the head */ char *decode_buffer_tail; /* new data is written at the tail */ + /* Buffer for decompressing whole-record compressed WAL records */ + char *decompression_buffer; + uint32 decompression_buffer_size; + /* * Queue of records that have been decoded. This is a linked list that * usually consists of consecutive records in decode_buffer, but may also diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h index e8999d3fe91..c3e05bb86bf 100644 --- a/src/include/access/xlogrecord.h +++ b/src/include/access/xlogrecord.h @@ -48,7 +48,7 @@ typedef struct XLogRecord /* 2 bytes of padding here, initialize to zero */ pg_crc32c xl_crc; /* CRC for this record */ - /* XLogRecordBlockHeaders and XLogRecordDataHeader follow, no padding */ + /* XLogRecordBlockHeaders, XLogRecordDataHeader or compression header follow, no padding */ } XLogRecord; @@ -90,6 +90,9 @@ typedef struct XLogRecord */ #define XLR_CHECK_CONSISTENCY 0x02 +/* This bit in xl_info means the record is compressed */ +#define XLR_COMPRESSED 0x04 + /* * Header info for block data appended to an XLOG record. * @@ -143,11 +146,6 @@ typedef struct XLogRecordBlockImageHeader uint16 length; /* number of page image bytes */ uint16 hole_offset; /* number of bytes before "hole" */ uint8 bimg_info; /* flag bits, see below */ - - /* - * If BKPIMAGE_HAS_HOLE and BKPIMAGE_COMPRESSED(), an - * XLogRecordBlockCompressHeader struct follows. 
- */ } XLogRecordBlockImageHeader; #define SizeOfXLogRecordBlockImageHeader \ @@ -157,13 +155,13 @@ typedef struct XLogRecordBlockImageHeader #define BKPIMAGE_HAS_HOLE 0x01 /* page image has "hole" */ #define BKPIMAGE_APPLY 0x02 /* page image should be restored * during replay */ -/* compression methods supported */ +/* Compression methods supported for FPI (stored in bimg_info) */ #define BKPIMAGE_COMPRESS_PGLZ 0x04 #define BKPIMAGE_COMPRESS_LZ4 0x08 #define BKPIMAGE_COMPRESS_ZSTD 0x10 #define BKPIMAGE_COMPRESSED(info) \ - ((info & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \ + (((info) & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \ BKPIMAGE_COMPRESS_ZSTD)) != 0) /* @@ -178,6 +176,21 @@ typedef struct XLogRecordBlockCompressHeader #define SizeOfXLogRecordBlockCompressHeader \ sizeof(XLogRecordBlockCompressHeader) +/* compression methods supported for whole-record compression */ +#define XLR_COMPRESS_PGLZ 0x04 +#define XLR_COMPRESS_LZ4 0x08 +#define XLR_COMPRESS_ZSTD 0x10 + +/* Header prepended to a whole-record compressed WAL record */ +typedef struct XLogCompressionHeader +{ + XLogRecord record_header; + uint8 method; /* XLR_COMPRESS_* */ + uint32 decompressed_length; +} XLogCompressionHeader; + +#define SizeOfXLogCompressedRecord (offsetof(XLogCompressionHeader, decompressed_length) + sizeof(uint32)) + /* * Maximum size of the header for a block reference. This is used to size a * temporary buffer for constructing the header. 
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index f723668da9e..6b5f60c9b4c 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -169,6 +169,8 @@ extern const char *show_unix_socket_permissions(void); extern bool check_wal_buffers(int *newval, void **extra, GucSource source); extern bool check_wal_consistency_checking(char **newval, void **extra, GucSource source); +extern void assign_wal_compression(int newval, void *extra); +extern void assign_wal_compression_buffer(int newval, void *extra); extern void assign_wal_consistency_checking(const char *newval, void *extra); extern bool check_wal_segment_size(int *newval, void **extra, GucSource source); extern void assign_wal_sync_method(int new_wal_sync_method, void *extra); diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm index 955dfc0e7f8..55e4aa74f49 100644 --- a/src/test/perl/PostgreSQL/Test/Cluster.pm +++ b/src/test/perl/PostgreSQL/Test/Cluster.pm @@ -720,6 +720,13 @@ sub init # TEMP_CONFIG. Otherwise, print it before TEMP_CONFIG, thereby permitting # overrides. Settings that merely improve performance or ease debugging # belong before TEMP_CONFIG. + + # Prevent whole-record WAL compression from triggering in tests that are + # not specifically testing it. Tests that want whole-record compression + # can either lower this with append_conf() after init(), or supply a lower + # value via TEMP_CONFIG. + print $conf "wal_compression_threshold = '1GB'\n"; + print $conf PostgreSQL::Test::Utils::slurp_file($ENV{TEMP_CONFIG}) if defined $ENV{TEMP_CONFIG}; @@ -3145,10 +3152,16 @@ sub emit_wal { my ($self, $size) = @_; + # Disable whole-record WAL compression for this emission. Tests call + # emit_wal() to place an exact number of bytes in WAL (e.g. to reach a + # specific page offset); whole-record compression would shrink the record + # and break those position-sensitive calculations. 
return int( $self->safe_psql( 'postgres', - "SELECT pg_logical_emit_message(true, '', repeat('a', $size)) - '0/0'" + "SET wal_compression_threshold = 2147483647; + SELECT pg_logical_emit_message(true, '', repeat('a', $size)) - '0/0'; + RESET wal_compression_threshold" )); } diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile index d41aaaf8ae1..f4c9cd89811 100644 --- a/src/test/recovery/Makefile +++ b/src/test/recovery/Makefile @@ -24,6 +24,16 @@ export enable_injection_points REGRESS_SHLIB=$(abs_top_builddir)/src/test/regress/regress$(DLSUFFIX) export REGRESS_SHLIB +# Exercise WAL compression in recovery tests. Set USE_WAL_COMPRESSION_CONFIG= +# to disable, or TEMP_CONFIG=/path to use a different config. +USE_WAL_COMPRESSION_CONFIG ?= 1 +ifneq ($(USE_WAL_COMPRESSION_CONFIG),) +TEMP_CONFIG ?= $(srcdir)/wal_compression.conf +export TEMP_CONFIG +else +unexport TEMP_CONFIG +endif + check: $(prove_check) diff --git a/src/test/recovery/t/046_checkpoint_logical_slot.pl b/src/test/recovery/t/046_checkpoint_logical_slot.pl index 0bf7f024177..633b7a6ac9a 100644 --- a/src/test/recovery/t/046_checkpoint_logical_slot.pl +++ b/src/test/recovery/t/046_checkpoint_logical_slot.pl @@ -116,10 +116,9 @@ $node->safe_psql('postgres', q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())} ); -# Generate a long WAL record, spawning at least two pages for the follow-up +# Generate a long WAL record, spanning at least two pages for the follow-up # post-recovery check. -$node->safe_psql('postgres', - q{select pg_logical_emit_message(false, '', repeat('123456789', 1000))}); +$node->emit_wal(9000); # Continue the checkpoint and wait for its completion. 
my $log_offset = -s $node->logfile; diff --git a/src/test/recovery/t/052_wal_compression.pl b/src/test/recovery/t/052_wal_compression.pl new file mode 100644 index 00000000000..ea772c7eb5e --- /dev/null +++ b/src/test/recovery/t/052_wal_compression.pl @@ -0,0 +1,93 @@ + +# Copyright (c) 2025-2026, PostgreSQL Global Development Group +# +# Test whole-record WAL compression via wal_compression and +# wal_compression_threshold. Exercises compression during replication +# (walsender path) and decompression during crash recovery (startup path). +# + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Determine which compression methods are compiled in, preferring the +# faster hardware-accelerated codecs first for quicker failure feedback. +my @methods = (); +push @methods, 'zstd' if check_pg_config('#define HAVE_LIBZSTD 1'); +push @methods, 'lz4' if check_pg_config('#define HAVE_LIBLZ4 1'); +push @methods, 'pglz'; + +# Test whole-record WAL compression with a specific method. +# +# Creates a primary with the method enabled and a low threshold so that +# every non-trivial WAL record is compressed. A streaming standby is +# used to verify that compressed WAL is transmitted and decoded correctly. +# Then the primary is stopped immediately (simulating a crash) and +# restarted to verify that startup recovery can decompress WAL records. +sub test_wal_compression +{ + my ($method) = @_; + + note "testing wal_compression = $method"; + + my $primary = PostgreSQL::Test::Cluster->new("primary_$method"); + $primary->init(allows_streaming => 1); + + # Use the minimum threshold so virtually every record gets compressed, + # and leave wal_compression_buffer at its default (set by Cluster.pm). + $primary->append_conf( + 'postgresql.conf', + "wal_compression = '$method'\n" + . 
"wal_compression_threshold = 32\n"); + $primary->start; + + my $backup_name = "backup_$method"; + $primary->backup($backup_name); + + my $standby = PostgreSQL::Test::Cluster->new("standby_$method"); + $standby->init_from_backup($primary, $backup_name, has_streaming => 1); + $standby->start; + + # Generate WAL with records that exceed the compression threshold. + # Each row's WAL record includes 200-byte payload well above 32 bytes. + $primary->safe_psql( + 'postgres', + "CREATE TABLE t AS + SELECT g, repeat('x', 200) AS d + FROM generate_series(1, 100) AS g"); + + $primary->wait_for_replay_catchup($standby); + + is( $standby->safe_psql('postgres', 'SELECT count(*) FROM t'), + '100', + "compressed WAL replicated via streaming ($method)"); + + # Insert another batch that will be recovered after the simulated crash. + $primary->safe_psql( + 'postgres', + "INSERT INTO t + SELECT g, repeat('y', 200) + FROM generate_series(101, 200) AS g"); + + # Stop without a clean shutdown. On restart PostgreSQL will replay WAL + # from the last checkpoint, exercising XLogDecompressRecordIfNeeded for + # every compressed record generated since that checkpoint. + $primary->stop('immediate'); + $primary->start; + + is( $primary->safe_psql('postgres', 'SELECT count(*) FROM t'), + '200', + "crash recovery replays compressed WAL ($method)"); + + $primary->stop; + $standby->stop; +} + +foreach my $method (@methods) +{ + test_wal_compression($method); +} + +done_testing(); diff --git a/src/test/recovery/wal_compression.conf b/src/test/recovery/wal_compression.conf new file mode 100644 index 00000000000..6867f370ef2 --- /dev/null +++ b/src/test/recovery/wal_compression.conf @@ -0,0 +1,3 @@ +# Enable WAL compression for recovery tests. +# lz4 is used here; 052_wal_compression.pl separately tests all methods. +wal_compression = 'lz4' -- 2.51.2