From e8619cb11f065a8a86495a9a7aa16c073cd6e37e Mon Sep 17 00:00:00 2001 From: Imran Zaheer Date: Tue, 17 Mar 2026 21:47:56 +0500 Subject: [PATCH v2 1/2] Pipelined Recovery - Producer This includes the producer specific code for the producer-consumer architecture for WAL replay that separates WAL decoding from the recovery process, enabling parallel processing between different steps of replay. The producer includes a background worker that reads and decodes WAL records, then sends them to the startup process for the redo. IPC happens via shared memory message queues (shm_mq), allowing the decoder to run ahead of the apply process. This provides some improvement in recovery performance for CPU-bound workloads. New GUCs: wal_pipeline (default: off), wal_pipeline_queue_size (default: 128MB) Author: Imran Zaheer Idea by: Ants Aasma --- src/backend/access/transam/Makefile | 1 + src/backend/access/transam/meson.build | 1 + src/backend/access/transam/xlogpipeline.c | 677 ++++++++++++++++++ src/backend/access/transam/xlogrecovery.c | 43 ++ src/backend/postmaster/bgworker.c | 5 + src/backend/storage/ipc/ipci.c | 5 + src/backend/utils/misc/guc_parameters.dat | 15 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/access/xlog.h | 2 + src/include/access/xlogpipeline.h | 188 +++++ src/include/access/xlogrecovery.h | 19 + src/test/recovery/meson.build | 1 + src/test/recovery/t/053_walpipeline.pl | 208 ++++++ 13 files changed, 1167 insertions(+) create mode 100644 src/backend/access/transam/xlogpipeline.c create mode 100644 src/include/access/xlogpipeline.h create mode 100644 src/test/recovery/t/053_walpipeline.pl diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index a32f473e0a2..ba0bf343769 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -32,6 +32,7 @@ OBJS = \ xlogbackup.o \ xlogfuncs.o \ xloginsert.o \ + xlogpipeline.o \ xlogprefetcher.o \ xlogreader.o \ xlogrecovery.o \ diff --git a/src/backend/access/transam/meson.build 
b/src/backend/access/transam/meson.build index 06aadc7f315..be37b40581d 100644 --- a/src/backend/access/transam/meson.build +++ b/src/backend/access/transam/meson.build @@ -20,6 +20,7 @@ backend_sources += files( 'xlogbackup.c', 'xlogfuncs.c', 'xloginsert.c', + 'xlogpipeline.c', 'xlogprefetcher.c', 'xlogrecovery.c', 'xlogstats.c', diff --git a/src/backend/access/transam/xlogpipeline.c b/src/backend/access/transam/xlogpipeline.c new file mode 100644 index 00000000000..4b95a11d16b --- /dev/null +++ b/src/backend/access/transam/xlogpipeline.c @@ -0,0 +1,677 @@ +/*------------------------------------------------------------------------- + * + * xlogpipeline.c + * WAL replay pipeline implementation + * + * This module implements a producer-consumer pipeline for WAL replay. + * The producer (background worker) reads and decodes WAL records in parallel + * with the consumer (startup process) that applies them. + * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/transam/xlogpipeline.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include +#include + +#include "access/heapam_xlog.h" +#include "access/rmgr.h" +#include "access/xlog.h" +#include "access/xlogpipeline.h" +#include "access/xlogprefetcher.h" +#include "access/xlogreader.h" +#include "access/xlogrecord.h" +#include "access/xlogrecovery.h" +#include "access/xlogutils.h" +#include "access/xlog_internal.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/startup.h" +#include "storage/bufmgr.h" +#include "storage/dsm.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/md.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/procsignal.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "storage/smgr.h" +#include "tcop/tcopprot.h" +#include "utils/elog.h" 
+#include "utils/memutils.h" +#include "utils/resowner.h" +#include "utils/rel.h" +#include "utils/timeout.h" + + +/* Global shared memory control structure */ +WalPipelineShmCtl *WalPipelineShm = NULL; + +XLogPrefetcher *xlogprefetcher_pipelined = NULL; + +/* Local state for producer */ +static dsm_segment *producer_dsm_seg = NULL; +static shm_mq *producer_mq = NULL; +static shm_mq_handle *producer_mq_handle = NULL; + + +/* + * Flags set by interrupt handlers for later service in the redo loop. + */ +static volatile sig_atomic_t got_SIGHUP = false; + +/* Signal handlers */ +static void PipelineBgwSigHupHandler(SIGNAL_ARGS); + +/* Forward declarations */ +static void wal_pipeline_cleanup_callback(int code, Datum arg); +static Size serialize_wal_record(XLogReaderState *record, char **buffer); +static void cleanup_producer_resources(void); +static void WalPipeline_WaitForConsumerShutdownRequest(void); + +/* copied from xlogrecovery.c */ +/* Parameters passed down from ReadRecord to the XLogPageRead callback. */ +typedef struct XLogPageReadPrivate +{ + int emode; + bool fetching_ckpt; /* are we fetching a checkpoint record? 
*/ + bool randAccess; + TimeLineID replayTLI; +} XLogPageReadPrivate; + +/* + * Compute space needed for WAL pipeline shared memory + */ +Size +WalPipelineShmemSize(void) +{ + Size size = 0; + + size = add_size(size, sizeof(WalPipelineShmCtl)); + + return size; +} + +/* + * Initialize WAL pipeline shared memory structures + */ +void +WalPipelineShmemInit(void) +{ + bool found; + + WalPipelineShm = (WalPipelineShmCtl *) + ShmemInitStruct("WAL Pipeline Control", + sizeof(WalPipelineShmCtl), + &found); + + if (!found) + { + /* First time through, initialize */ + SpinLockInit(&WalPipelineShm->mutex); + WalPipelineShm->initialized = false; + WalPipelineShm->shutdown_requested = false; + WalPipelineShm->producer_exited = false; + WalPipelineShm->producer_pid = 0; + WalPipelineShm->consumer_pid = 0; + WalPipelineShm->producer_lsn = InvalidXLogRecPtr; + WalPipelineShm->consumer_lsn = InvalidXLogRecPtr; + WalPipelineShm->dsm_seg_handle = DSM_HANDLE_INVALID; + WalPipelineShm->records_sent = 0; + WalPipelineShm->records_received = 0; + WalPipelineShm->bytes_sent = 0; + WalPipelineShm->bytes_received = 0; + WalPipelineShm->error_code = 0; + WalPipelineShm->error_message[0] = '\0'; + } +} + + + +/* + * Producer Function. + * Main loop for the producer background worker. + */ +void +WalPipeline_ProducerMain(Datum main_arg) +{ + dsm_handle handle = DatumGetUInt32(main_arg); + dsm_segment *seg; + shm_toc *toc; + WalPipelineParams *params; + XLogReaderState *xlogreader; + XLogPageReadPrivate *private; + XLogRecord *record; + TimeLineID replayTLI = 0; + bool end_of_wal = false; + uint64 records_sent; + uint64 records_received; + + /* + * Properly accept or ignore signals the postmaster might send us. 
+ */ + pqsignal(SIGHUP, PipelineBgwSigHupHandler); /* reload config file */ + + /* Register cleanup callback */ + before_shmem_exit(wal_pipeline_cleanup_callback, (Datum) 0); + + seg = dsm_attach(handle); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("[walpipeline] producer: could not map dynamic shared memory segment"))); + + toc = shm_toc_attach(PG_WAL_PIPELINE_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("[walpipeline] producer: bad magic number in dynamic shared memory segment"))); + + /* Lookup params and queue */ + params = shm_toc_lookup(toc, 1, false); + producer_mq = shm_toc_lookup(toc, 2, false); + + /* Set up producer side of queue */ + producer_dsm_seg = seg; + shm_mq_set_sender(producer_mq, MyProc); + producer_mq_handle = shm_mq_attach(producer_mq, seg, NULL); + + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->producer_pid = MyProcPid; + SpinLockRelease(&WalPipelineShm->mutex); + + /* DSM is now attached, so safe to unblock the signals */ + BackgroundWorkerUnblockSignals(); + + /* Set up WAL reading processor */ + private = palloc0(sizeof(XLogPageReadPrivate)); + xlogreader = + XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &XLogPageRead, + .segment_open = NULL, + .segment_close = wal_segment_close), + private); + + if (!xlogreader) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed while allocating a WAL reading processor."))); + xlogreader->system_identifier = GetSystemIdentifier(); + + /* + * Set the WAL decode buffer size. This limits how far ahead we can read + * in the WAL. + */ + XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size); + + /* Init some important globals before starting */ + replayTLI = params->ReplayTLI; + InitializePipelineRecoveryEnv(params); + + /* Reinit the WAL prefetcher. 
*/ + xlogprefetcher_pipelined = XLogPrefetcherAllocate(xlogreader); + + + elog(LOG, "[walpipeline] producer: started at %X/%X, TLI %u", + LSN_FORMAT_ARGS(params->NextRecPtr), replayTLI); + + XLogPrefetcherBeginRead(xlogprefetcher_pipelined, params->NextRecPtr); + + /* Main decoding loop */ + while (true) + { + bool shutdown_requested; + + /* Check if consumer requested shutdown */ + SpinLockAcquire(&WalPipelineShm->mutex); + shutdown_requested = WalPipelineShm->shutdown_requested; + SpinLockRelease(&WalPipelineShm->mutex); + + if (shutdown_requested) + { + elog(DEBUG1, "[walpipeline] producer: shutdown requested by consumer"); + break; + } + + /* Read next WAL record */ + record = ReadRecord(xlogprefetcher_pipelined, LOG, false, replayTLI); + + if (record == NULL) + { + end_of_wal = true; + elog(DEBUG1, "[walpipeline] producer: reached end of WAL"); + break; + } + + /* + * Successfully decoded a record. Send it to the consumer. + */ + if (!WalPipeline_SendRecord(xlogreader)) + { + elog(WARNING, "[walpipeline] producer: failed to send record, queue full or detached"); + break; + } + + /* Update our position for monitoring */ + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->producer_lsn = xlogreader->EndRecPtr; + SpinLockRelease(&WalPipelineShm->mutex); + + CHECK_FOR_INTERRUPTS(); + } + + + if (end_of_wal) + { + /* Notify consumer we need to exit early */ + WalPipeline_SendShutdown(); + + /* wait until consumer set the flag */ + WalPipeline_WaitForConsumerShutdownRequest(); + } + + SpinLockAcquire(&WalPipelineShm->mutex); + records_sent = WalPipelineShm->records_sent; + records_received = WalPipelineShm->records_received; + SpinLockRelease(&WalPipelineShm->mutex); + + elog(LOG, "[walpipeline] producer: exiting: sent=" UINT64_FORMAT " received=" UINT64_FORMAT, + records_sent, records_received); + + /* Cleanup */ + XLogReaderFree(xlogreader); + DisownRecoveryWakeupLatch(); + pfree(private); + cleanup_producer_resources(); +} + +/* + * Producer Function. 
+ * Send a decoded WAL record to the consumer + */ +bool +WalPipeline_SendRecord(XLogReaderState *record) +{ + char *buffer = NULL; + Size msglen; + shm_mq_result res; + + + if (!producer_mq_handle) + return false; + + /* Serialize the record */ + msglen = serialize_wal_record(record, &buffer); + + res = shm_mq_send(producer_mq_handle, msglen, buffer, false, true); + + if (res == SHM_MQ_SUCCESS) + { + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->records_sent++; + WalPipelineShm->bytes_sent += msglen; + SpinLockRelease(&WalPipelineShm->mutex); + + pfree(buffer); + return true; + } + + if (res == SHM_MQ_DETACHED) + { + elog(PANIC, "[walpipeline] producer: consumer detached"); + pfree(buffer); + return false; + } + + /* Some other error (NOTE(review): elog(PANIC) does not return, so the cleanup below is dead code) */ + elog(PANIC, "[walpipeline] producer: shm_mq_send failed with result %d", res); + pfree(buffer); + return false; +} + +/* + * Producer Function. + * Send a WAL_MSG_SHUTDOWN header, with every other field zeroed, to the consumer + */ +bool +WalPipeline_SendShutdown(void) +{ + WalRecordMsgHeader hdr; + shm_mq_result res; + + if (!producer_mq_handle) + return false; + + memset(&hdr, 0, sizeof(hdr)); + hdr.msg_type = WAL_MSG_SHUTDOWN; + hdr.endRecPtr = InvalidXLogRecPtr; + res = shm_mq_send(producer_mq_handle, sizeof(hdr), &hdr, false, true); + return (res == SHM_MQ_SUCCESS); +} + +/* + * Producer Function. + * Record an error code and message in shared memory for the consumer + */ +bool +WalPipeline_SendError(int errcode, const char *errmsg) +{ + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->error_code = errcode; + strlcpy(WalPipelineShm->error_message, errmsg, + sizeof(WalPipelineShm->error_message)); + SpinLockRelease(&WalPipelineShm->mutex); + + return true; +} + +/* + * Producer Function. + * The producer could exit without waiting for the consumer, but it is better to + * wait until the consumer requests shutdown. This way the log messages will show + * records_sent and records_received equal to each other. 
+ */ +static void +WalPipeline_WaitForConsumerShutdownRequest(void) +{ + int iters = 0; + + while (true) + { + bool shutdown_requested; + + SpinLockAcquire(&WalPipelineShm->mutex); + shutdown_requested = WalPipelineShm->shutdown_requested; + SpinLockRelease(&WalPipelineShm->mutex); + + if (shutdown_requested) + break; + + if (++iters >= MAX_SHUTDOWN_WAIT_ITERS) + { + elog(WARNING, + "[walpipeline] producer: timed out waiting for consumer " + "to acknowledge shutdown, exiting anyway"); + break; + } + + /* Allow SIGTERM / SIGHUP to interrupt the wait */ + ProcessPipelineBgwInterrupts(); + + pg_usleep(10000); /* sleep 10ms */ + } +} + + +/* + * serialize_wal_record (Producer) + * + * Serialize the currently decoded WAL record owned by the given + * XLogReaderState into a contiguous buffer. + * + * Allocation/layout of the output buffer: + * + * [WalRecordMsgHeader] + * [XLogRecord] + * [main_data] + * Repeated for each in-use block: + * [SerializedBlockMeta] + * [backup image bytes] (optional) + * [block data bytes] (optional) + * + * The resulting buffer contains no pointers and is safe to transfer + * across processes or store externally. 
+ */ +static Size +serialize_wal_record(XLogReaderState *xlogreader, char **outbuf) +{ + DecodedXLogRecord *dec = xlogreader->record; + WalRecordMsgHeader hdr; + Size total; + char *ptr; + + if (dec == NULL) + return 0; + + /* ---- size calculation ---- */ + total = + sizeof(WalRecordMsgHeader) + + sizeof(XLogRecord) + + dec->main_data_len; + + for (int i = 0; i <= dec->max_block_id; i++) + { + const DecodedBkpBlock *blk = &dec->blocks[i]; + + if (!blk->in_use) + continue; + + total += sizeof(SerializedBlockMeta); + + if (blk->has_image && blk->bkp_image && blk->bimg_len > 0) + total += blk->bimg_len; + + if (blk->has_data && blk->data && blk->data_len > 0) + total += blk->data_len; + } + + ptr = *outbuf = palloc(total); + + /* ---- message header ---- */ + memset(&hdr, 0, sizeof(hdr)); + hdr.msg_type = WAL_MSG_RECORD; + hdr.readRecPtr = xlogreader->ReadRecPtr; + hdr.abortedRecPtr = xlogreader->abortedRecPtr; + hdr.missingContrecPtr = xlogreader->missingContrecPtr; + hdr.overwrittenRecPtr = xlogreader->overwrittenRecPtr; + hdr.endRecPtr = xlogreader->EndRecPtr; + hdr.decoded_size = total - sizeof(WalRecordMsgHeader); + hdr.max_block_id = dec->max_block_id; + hdr.main_data_len = dec->main_data_len; + hdr.toplevel_xid = dec->toplevel_xid; + hdr.record_origin = dec->record_origin; + + memcpy(ptr, &hdr, sizeof(hdr)); + ptr += sizeof(hdr); + + /* ---- XLogRecord ---- */ + memcpy(ptr, &dec->header, sizeof(XLogRecord)); + ptr += sizeof(XLogRecord); + + /* ---- main data ---- */ + if (dec->main_data_len > 0) + { + memcpy(ptr, dec->main_data, dec->main_data_len); + ptr += dec->main_data_len; + } + + /* ---- blocks ---- */ + for (int i = 0; i <= dec->max_block_id; i++) + { + const DecodedBkpBlock *blk = &dec->blocks[i]; + SerializedBlockMeta meta; + + if (!blk->in_use) + continue; + + memset(&meta, 0, sizeof(meta)); + meta.block_id = i; + meta.in_use = true; + meta.rlocator = blk->rlocator; + meta.forknum = blk->forknum; + meta.blkno = blk->blkno; + meta.flags = blk->flags; 
+ meta.has_image = blk->has_image; + meta.apply_image = blk->apply_image; + meta.has_data = blk->has_data; + meta.bimg_len = blk->bimg_len; + meta.bimg_info = blk->bimg_info; + meta.hole_offset = blk->hole_offset; + meta.hole_length = blk->hole_length; + meta.data_len = blk->data_len; + + memcpy(ptr, &meta, sizeof(meta)); + ptr += sizeof(meta); + + if (blk->has_image && blk->bkp_image && blk->bimg_len > 0) + { + memcpy(ptr, blk->bkp_image, blk->bimg_len); + ptr += blk->bimg_len; + } + + if (blk->has_data && blk->data && blk->data_len > 0) + { + memcpy(ptr, blk->data, blk->data_len); + ptr += blk->data_len; + } + } + + Assert(ptr - *outbuf == total); + return total; +} + + +/* + * Report whether this process is the WAL pipeline worker: a background worker + * whose bgw_name starts with "wal pipeline". + */ +bool AmWalPipeline(void) +{ + if (MyBackendType == B_BG_WORKER && MyBgworkerEntry) + { + if (strncmp(MyBgworkerEntry->bgw_name, "wal pipeline", 12) == 0) + return true; + } + + return false; +} + +/* + * Clean up producer-side resources (safe to call more than once) + */ +static void +cleanup_producer_resources(void) +{ + if (producer_mq_handle) + { + shm_mq_detach(producer_mq_handle); + producer_mq_handle = NULL; + } + + if (producer_dsm_seg) + { + dsm_detach(producer_dsm_seg); + producer_dsm_seg = NULL; + } + + producer_mq = NULL; + + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->producer_pid = 0; + WalPipelineShm->producer_exited = true; + SpinLockRelease(&WalPipelineShm->mutex); +} + + +/* + * before_shmem_exit callback: release this process's side of the pipeline + */ +static void +wal_pipeline_cleanup_callback(int code, Datum arg) +{ + /* producer_pid is 0 before it is published and after producer cleanup ran */ + bool is_producer = AmWalPipeline(); + + if (!is_producer && WalPipelineShm) + { + SpinLockAcquire(&WalPipelineShm->mutex); + is_producer = (WalPipelineShm->producer_pid == MyProcPid); + SpinLockRelease(&WalPipelineShm->mutex); + } + + if (is_producer) + cleanup_producer_resources(); + else + cleanup_consumer_resources(); +} + +/* -------------------------------- + * signal handler routines + * 
-------------------------------- + */ + +/* SIGHUP: set flag to re-read config file at next convenient time */ +static void +PipelineBgwSigHupHandler(SIGNAL_ARGS) +{ + got_SIGHUP = true; + WakeupRecovery(); +} + +/* + * Re-read the config file. + * + * If one of the critical walreceiver options has changed, flag xlogrecovery.c + * to restart it. + */ +static void +PipelineRereadConfig(void) +{ + char *conninfo = pstrdup(PrimaryConnInfo); + char *slotname = pstrdup(PrimarySlotName); + bool tempSlot = wal_receiver_create_temp_slot; + bool conninfoChanged; + bool slotnameChanged; + bool tempSlotChanged = false; + + ProcessConfigFile(PGC_SIGHUP); + + conninfoChanged = strcmp(conninfo, PrimaryConnInfo) != 0; + slotnameChanged = strcmp(slotname, PrimarySlotName) != 0; + + /* + * wal_receiver_create_temp_slot is used only when we have no slot + * configured. We do not need to track this change if it has no effect. + */ + if (!slotnameChanged && strcmp(PrimarySlotName, "") == 0) + tempSlotChanged = tempSlot != wal_receiver_create_temp_slot; + pfree(conninfo); + pfree(slotname); + + if (conninfoChanged || slotnameChanged || tempSlotChanged) + StartupRequestWalReceiverRestart(); +} + +/* + * Process any requests or signals received recently. 
+ */ +void +ProcessPipelineBgwInterrupts(void) +{ + + bool shutdown_requested; + + if (got_SIGHUP) + { + got_SIGHUP = false; + PipelineRereadConfig(); + } + + SpinLockAcquire(&WalPipelineShm->mutex); + shutdown_requested = WalPipelineShm->shutdown_requested; + SpinLockRelease(&WalPipelineShm->mutex); + + if (shutdown_requested) + proc_exit(0); + + CHECK_FOR_INTERRUPTS(); +} \ No newline at end of file diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 6d2c4a86b96..b66ec80fa25 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -4738,6 +4738,49 @@ RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue } +/* + * The pipeline bgw must be aware of all the parameters that have been initialized by + * the startup process before performing the actual recovery. Other than this, + * there are also some variables that keep on changing. The pipeline and the startup + * process should be aware of the state changes of such variables; we can use shared + * memory for such variables. 
+ */ +void +InitializePipelineRecoveryEnv(WalPipelineParams *params) +{ + StandbyMode = params->StandbyMode; + StandbyModeRequested = params->StandbyModeRequested; + ArchiveRecoveryRequested = params->ArchiveRecoveryRequested; + InArchiveRecovery = params->InArchiveRecovery; + recoveryTargetTLI = params->recoveryTargetTLI; + minRecoveryPointTLI = params->minRecoveryPointTLI; + minRecoveryPoint = params->minRecoveryPoint; + currentSource = params->currentSource; + lastSourceFailed = params->lastSourceFailed; + pendingWalRcvRestart = params->pendingWalRcvRestart; + RedoStartTLI = params->RedoStartTLI; + RedoStartLSN = params->RedoStartLSN; + standbyState = params->standbyState; + CheckPointLoc = params->CheckPointLoc; + CheckPointTLI = params->CheckPointTLI; + flushedUpto = params->flushedUpto; + receiveTLI = params->receiveTLI; + abortedRecPtr = params->abortedRecPtr; + missingContrecPtr = params->missingContrecPtr; + InRedo = params->InRedo; + backupEndRequired = params->backupEndRequired; + backupStartPoint = params->backupStartPoint; + backupEndPoint = params->backupEndPoint; + curFileTLI = params->curFileTLI; + InRecovery = true; + + /* + * As pipeline will be reading the wal, so better to own the latch to wait at. 
+ */ + if (ArchiveRecoveryRequested) + OwnLatch(&XLogRecoveryCtl->recoveryWakeupLatch); +} + /* * GUC check_hook for primary_slot_name */ diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 0104a86b9ec..192295ad695 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -13,6 +13,7 @@ #include "postgres.h" #include "access/parallel.h" +#include "access/xlogpipeline.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" @@ -143,6 +144,10 @@ static const struct { .fn_name = "SequenceSyncWorkerMain", .fn_addr = SequenceSyncWorkerMain + }, + { + .fn_name = "WalPipeline_ProducerMain", + .fn_addr = WalPipeline_ProducerMain } }; diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index a4785daf1e5..f5eaff675f0 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -22,6 +22,7 @@ #include "access/syncscan.h" #include "access/transam.h" #include "access/twophase.h" +#include "access/xlogpipeline.h" #include "access/xlogprefetcher.h" #include "access/xlogrecovery.h" #include "access/xlogwait.h" @@ -142,6 +143,7 @@ CalculateShmemSize(void) size = add_size(size, AioShmemSize()); size = add_size(size, WaitLSNShmemSize()); size = add_size(size, LogicalDecodingCtlShmemSize()); + size = add_size(size, WalPipelineShmemSize()); /* include additional requested shmem from preload libraries */ size = add_size(size, total_addin_request); @@ -224,6 +226,9 @@ CreateSharedMemoryAndSemaphores(void) /* Initialize dynamic shared memory facilities. 
*/ dsm_postmaster_startup(shim); + /* Initialize WAL pipeline module */ + WalPipelineShmemInit(); + /* * Now give loadable modules a chance to set up their shmem allocations */ diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index a5a0edf2534..3523e7d459f 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -3398,6 +3398,21 @@ boot_val => 'false', }, +{ name => 'wal_pipeline', type => 'bool', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY', + short_desc => 'Use parallel workers to speedup recovery.', + variable => 'wal_pipeline_enabled', + boot_val => 'false', +}, + +{ name => 'wal_pipeline_queue_size', type => 'int', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY', + short_desc => 'Size of the shared memory queue used by the WAL pipeline.', + flags => 'GUC_UNIT_MB', + variable => 'wal_pipeline_mq_size_mb', + boot_val => '128', + min => '1', + max => '1024', +}, + { name => 'wal_receiver_create_temp_slot', type => 'bool', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY', short_desc => 'Sets whether a WAL receiver should create a temporary replication slot if no permanent slot is configured.', variable => 'wal_receiver_create_temp_slot', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index e686d88afc4..1d2fdb9747a 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -277,6 +277,8 @@ #recovery_prefetch = try # prefetch pages referenced in the WAL? 
#wal_decode_buffer_size = 512kB # lookahead window used for prefetching # (change requires restart) +#wal_pipeline = off # decode in parallel +#wal_pipeline_queue_size = 128MB # - Archiving - diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index dcc12eb8cbe..a0c26f8005f 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -40,6 +40,8 @@ extern PGDLLIMPORT int min_wal_size_mb; extern PGDLLIMPORT int max_wal_size_mb; extern PGDLLIMPORT int wal_keep_size_mb; extern PGDLLIMPORT int max_slot_wal_keep_size_mb; +extern PGDLLIMPORT int wal_pipeline_mq_size_mb; +extern PGDLLIMPORT bool wal_pipeline_enabled; extern PGDLLIMPORT int XLOGbuffers; extern PGDLLIMPORT int XLogArchiveTimeout; extern PGDLLIMPORT int wal_retrieve_retry_interval; diff --git a/src/include/access/xlogpipeline.h b/src/include/access/xlogpipeline.h new file mode 100644 index 00000000000..5740b05f79c --- /dev/null +++ b/src/include/access/xlogpipeline.h @@ -0,0 +1,188 @@ +/*------------------------------------------------------------------------- + * + * xlogpipeline.h + * WAL replay pipeline for parallel recovery + * + * This module implements a producer-consumer pipeline for WAL replay: + * - Producer: background worker that reads and decodes WAL records + * - Consumer: startup process: core redo loop + * + * The pipeline uses shared memory queues (shm_mq) to pass decoded WAL + * records from producer to consumer, enabling parallelism while + * maintaining sequential replay semantics. 
+ * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * + * src/include/access/xlogpipeline.h + * + *------------------------------------------------------------------------- + */ +#ifndef WAL_PIPELINE_H +#define WAL_PIPELINE_H + +#include "access/xlogreader.h" +#include "access/xlogrecovery.h" +#include "access/xlogutils.h" +#include "storage/dsm.h" +#include "storage/shm_mq.h" +#include "storage/spin.h" + +/* + * Magic number for shared memory TOC + */ +#define PG_WAL_PIPELINE_MAGIC 0x57414C50 /* "WALP" */ + +/* + * Message types sent through the pipeline + */ +typedef enum WalMsgType +{ + WAL_MSG_INVALID = 0, + WAL_MSG_RECORD, /* Decoded WAL record */ + WAL_MSG_SHUTDOWN, /* Graceful shutdown request */ + WAL_MSG_ERROR /* Error occurred in producer */ +} WalMsgType; + +/* Wire header for a serialized WAL message */ +typedef struct WalRecordMsgHeader +{ + WalMsgType msg_type; /* WAL_MSG_RECORD etc */ + XLogRecPtr readRecPtr; /* XLogReaderState->ReadRecPtr */ + XLogRecPtr abortedRecPtr; /* XLogReaderState->abortedRecPtr */ + XLogRecPtr missingContrecPtr; /* XLogReaderState->missingContrecPtr */ + XLogRecPtr overwrittenRecPtr; /* XLogReaderState->overwrittenRecPtr */ + ReplOriginId record_origin; /* DecodedXLogRecord->record_origin */ + TransactionId toplevel_xid; /* DecodedXLogRecord->toplevel_xid */ + XLogRecPtr endRecPtr; /* EndRecPtr */ + uint32 decoded_size; /* total size of decoded record payload (not including this header) */ + int32 max_block_id; /* highest block id (could be -1) */ + uint32 main_data_len; /* length of main_data */ + /* followed by decoded payload bytes */ +} WalRecordMsgHeader; + +/* Per-block metadata on the wire (no pointers) */ +typedef struct SerializedBlockMeta +{ + int32 block_id; + bool in_use; + RelFileLocator rlocator; /* same as decoded block */ + ForkNumber forknum; + BlockNumber blkno; + uint8 flags; + bool has_image; + bool apply_image; + bool has_data; + uint16 bimg_len; + uint8 bimg_info; + uint16 
hole_offset; + uint16 hole_length; + uint16 data_len; + /* followed by bimg bytes then data bytes */ +} SerializedBlockMeta; + +/* + * Parameters passed from StartupXLOG (consumer side) + * to the WAL pipeline producer background worker. + */ +typedef struct WalPipelineParams +{ + bool StandbyMode; + bool StandbyModeRequested; + bool ArchiveRecoveryRequested; + bool InArchiveRecovery; + bool InRedo; + bool lastSourceFailed; + bool pendingWalRcvRestart; + bool backupEndRequired; + + TimeLineID RedoStartTLI; + TimeLineID CheckPointTLI; + TimeLineID recoveryTargetTLI; + TimeLineID minRecoveryPointTLI; + TimeLineID ReplayTLI; + TimeLineID receiveTLI; + + XLogRecPtr backupStartPoint; + XLogRecPtr backupEndPoint; + XLogRecPtr CheckPointLoc; + XLogRecPtr RedoStartLSN; + XLogRecPtr NextRecPtr; + XLogRecPtr minRecoveryPoint; + XLogRecPtr flushedUpto; + XLogRecPtr abortedRecPtr; + XLogRecPtr missingContrecPtr; + + int readFile; + XLogSegNo readSegNo; + uint32 readOff; + uint32 readLen; + XLogSource readSource; + TimeLineID curFileTLI; + + + HotStandbyState standbyState; + XLogSource currentSource; + +} WalPipelineParams; + +/* + * Shared memory control structure for the WAL pipeline + */ +typedef struct WalPipelineShmCtl +{ + /* Lifecycle management */ + slock_t mutex; + bool initialized; + bool shutdown_requested; + bool producer_exited; + + /* Producer state */ + pid_t producer_pid; + ProcNumber producer_procnum; + XLogRecPtr producer_lsn; /* Last LSN read by producer */ + + /* Consumer state */ + pid_t consumer_pid; + XLogRecPtr consumer_lsn; /* Last LSN received by consumer */ + XLogRecPtr applied_lsn; /* Last LSN applied by consumer */ + + /* Queue handles (NOTE(review): these are pointers kept in shared memory - confirm no other process dereferences them) */ + dsm_handle dsm_seg_handle; + shm_mq_handle *producer_mq_handle; + shm_mq_handle *consumer_mq_handle; + + /* Statistics */ + uint64 records_sent; + uint64 records_received; + uint64 bytes_sent; + uint64 bytes_received; + + /* Error state */ + int error_code; + char error_message[256]; +} WalPipelineShmCtl; + +/* 
consumer may have to compute prefetcher stats */ +extern PGDLLIMPORT XLogPrefetcher *xlogprefetcher_pipelined; + +/* + * Public API functions + */ + +/* Initialize the WAL pipeline shared memory structures */ +extern Size WalPipelineShmemSize(void); +extern void WalPipelineShmemInit(void); + +/* Producer functions (called by background worker) */ +extern void WalPipeline_ProducerMain(Datum main_arg); +extern bool WalPipeline_SendRecord(XLogReaderState *record); +extern bool WalPipeline_SendShutdown(void); +extern bool WalPipeline_SendError(int errcode, const char *errmsg); + + +extern void ProcessPipelineBgwInterrupts(void); + +/* Global shared memory pointer */ +extern WalPipelineShmCtl *WalPipelineShm; + +#endif /* WAL_PIPELINE_H */ \ No newline at end of file diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index 2842106b285..e675ab8353d 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -11,6 +11,7 @@ #ifndef XLOGRECOVERY_H #define XLOGRECOVERY_H +#include "access/xlogprefetcher.h" #include "access/xlogreader.h" #include "catalog/pg_control.h" #include "lib/stringinfo.h" @@ -60,6 +61,17 @@ typedef enum RecoveryPauseState RECOVERY_PAUSED, /* recovery is paused */ } RecoveryPauseState; +/* Codes indicating where we got a WAL file from during recovery, or where + * to attempt to get one. + */ +typedef enum +{ + XLOG_FROM_ANY = 0, /* request to read WAL from any source */ + XLOG_FROM_ARCHIVE, /* restored using restore_command */ + XLOG_FROM_PG_WAL, /* existing file in pg_wal */ + XLOG_FROM_STREAM, /* streamed from primary */ +} XLogSource; + /* * Shared-memory state for WAL recovery. 
*/ @@ -208,6 +220,8 @@ typedef struct bool recovery_signal_file_found; } EndOfWalRecoveryInfo; +struct WalPipelineParams; /* forward declaration */ + extern EndOfWalRecoveryInfo *FinishWalRecovery(void); extern void ShutdownWalRecovery(void); extern void RemovePromoteSignalFiles(void); @@ -220,6 +234,10 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern TimestampTz GetLatestXTime(void); extern TimestampTz GetCurrentChunkReplayStartTime(void); extern XLogRecPtr GetCurrentReplayRecPtr(TimeLineID *replayEndTLI); +extern XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, + bool fetching_ckpt, TimeLineID replayTLI); +extern int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *readBuf); extern bool PromoteIsTriggered(void); extern bool CheckPromoteSignal(void); @@ -229,6 +247,7 @@ extern void StartupRequestWalReceiverRestart(void); extern void XLogRequestWalReceiverReply(void); extern void RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue); +extern void InitializePipelineRecoveryEnv(struct WalPipelineParams *params); extern void xlog_outdesc(StringInfo buf, XLogReaderState *record); diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 36d789720a3..fc23acbaec2 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -61,6 +61,7 @@ tests += { 't/050_redo_segment_missing.pl', 't/051_effective_wal_level.pl', 't/052_checkpoint_segment_missing.pl', + 't/053_walpipeline.pl', ], }, } diff --git a/src/test/recovery/t/053_walpipeline.pl b/src/test/recovery/t/053_walpipeline.pl new file mode 100644 index 00000000000..2fb790aadc5 --- /dev/null +++ b/src/test/recovery/t/053_walpipeline.pl @@ -0,0 +1,208 @@ +# Copyright (c) 2025-2026, PostgreSQL Global Development Group +# +# Tests for the WAL pipeline feature (wal_pipeline GUC). 
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# ----------
+# Helpers
+# ----------
+
+# Return the server log of $node as a list of lines (terminators kept).  Uses
+# slurp_file(), which also copes with a log the server holds open on Windows.
+sub slurp_log
+{
+	my ($node) = @_;
+	return split /^/, slurp_file($node->logfile);
+}
+
+# Return the log lines of $node matching $re; in scalar context (as used with
+# ok() below) this is the number of matching lines.
+sub log_matches
+{
+	my ($node, $re) = @_;
+	return grep { /$re/ } slurp_log($node);
+}
+
+# ########################################
+# wal_pipeline = on, basic recovery
+# ########################################
+
+my $node1 = PostgreSQL::Test::Cluster->new('p1-recovery');
+$node1->init;
+$node1->start;
+
+$node1->safe_psql('postgres', q{
+	CREATE TABLE t (id serial PRIMARY KEY, v text);
+	INSERT INTO t (v)
+	SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# generate more WAL
+$node1->safe_psql('postgres', q{
+	INSERT INTO t (v)
+	SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# crash stop to force WAL recovery
+$node1->stop('immediate');
+
+# restart; crash recovery now runs with the pipeline enabled
+$node1->append_conf('postgresql.conf', "wal_pipeline = on");
+$node1->start;
+
+
+# Producer started
+ok(scalar log_matches($node1, qr/\[walpipeline\] producer: started at/),
+	'producer started message found in log');
+
+# Pipeline stopped cleanly
+ok(scalar log_matches($node1, qr/\[walpipeline\] stopped/),
+	'pipeline stopped message found in log');
+
+# Consumer received shutdown from producer
+ok(scalar log_matches($node1, qr/\[walpipeline\] consumer: received shutdown message/),
+	'consumer received shutdown message from producer');
+
+# sent == received; a missing exit line must fail the checks, not warn or pass
+my @exit_lines = log_matches($node1,
+	qr/\[walpipeline\] producer: exiting: sent=\d+ received=\d+/);
+ok(scalar @exit_lines >= 1, 'producer exiting line found in log');
+
+my ($sent, $recv) = ($exit_lines[-1] // '') =~ /sent=(\d+) received=(\d+)/;
+cmp_ok($sent // 0, '>', 0, 'sent count is positive');
+cmp_ok($recv // 0, '>', 0, 'received count is positive');
+cmp_ok($sent // -1, '==', $recv // -2, 'no records lost in pipeline queue');
+
+# No PANIC
+ok(!(scalar log_matches($node1, qr/\bPANIC\b/)),
+	'no PANIC messages during pipeline recovery');
+
+# Data integrity
+my $count = $node1->safe_psql('postgres', 'SELECT count(*) FROM t');
+is($count + 0, 100_000, 'all 100000 rows visible after pipeline recovery');
+
+$node1->stop;
+
+# ##############################################################
+# wal_pipeline = off (baseline, no pipeline log messages)
+# ##############################################################
+
+my $node2 = PostgreSQL::Test::Cluster->new('p0-recovery');
+$node2->init;
+$node2->start;
+
+$node2->safe_psql('postgres', q{
+	CREATE TABLE t (id serial PRIMARY KEY, v text);
+	INSERT INTO t (v)
+	SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# generate more WAL
+$node2->safe_psql('postgres', q{
+	INSERT INTO t (v)
+	SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# crash stop to force WAL recovery
+$node2->stop('immediate');
+
+# restart; crash recovery now runs with the pipeline disabled
+$node2->append_conf('postgresql.conf', "wal_pipeline = off");
+$node2->start;
+
+ok(!(scalar log_matches($node2, qr/\[walpipeline\] producer: started/)),
+	'no pipeline log messages when wal_pipeline = off');
+
+my $count2 = $node2->safe_psql('postgres', 'SELECT count(*) FROM t');
+is($count2 + 0, 100_000, 'all rows present after non-pipeline recovery');
+
+$node2->stop;
+
+
+
+# ###################################################################
+# pipeline on vs off produce identical data (checksum comparison)
+# ###################################################################
+
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 1);
+$primary->start;
+
+$primary->safe_psql('postgres', q{
+	CREATE TABLE t (id serial PRIMARY KEY, v text);
+	INSERT INTO t (v)
+	SELECT md5(i::text) FROM generate_series(1, 30000) i;
+});
+
+$primary->backup('backup3');
+
+$primary->safe_psql('postgres', q{
+	INSERT INTO t (v)
+	SELECT md5(i::text) FROM generate_series(1, 30000) i;
+	UPDATE t SET v = 'x' WHERE id % 10 = 0;
+});
+
+# Switch to a new WAL segment (the "WAL boundary") before creating standbys.
+# No LSN is captured here: wait_for_catchup() below is called without a target.
+$primary->safe_psql('postgres', 'SELECT pg_switch_wal()');
+
+my $replica_on = PostgreSQL::Test::Cluster->new('replica_p1');
+$replica_on->init_from_backup($primary, 'backup3',
+	has_streaming => 1);
+$replica_on->append_conf('postgresql.conf', "wal_pipeline = on\n");
+$replica_on->start;
+
+my $replica_off = PostgreSQL::Test::Cluster->new('replica_p0');
+$replica_off->init_from_backup($primary, 'backup3',
+	has_streaming => 1);
+$replica_off->append_conf('postgresql.conf', "wal_pipeline = off\n");
+$replica_off->start;
+
+# wait for replicas to catch up
+$primary->wait_for_catchup($replica_on);
+$primary->wait_for_catchup($replica_off);
+
+my $md5_on = $replica_on->safe_psql('postgres',
+	"SELECT md5(string_agg(id::text||v, ',' ORDER BY id)) FROM t");
+
+my $md5_off = $replica_off->safe_psql('postgres',
+	"SELECT md5(string_agg(id::text||v, ',' ORDER BY id)) FROM t");
+
+is($md5_on, $md5_off,
+	'table checksum identical between pipeline=on and pipeline=off');
+
+$replica_on->stop;
+$replica_off->stop;
+$primary->stop('fast');
+
+
+
+# #################################
+# pipeline when on tiny replay
+# #################################
+
+my $node3 = PostgreSQL::Test::Cluster->new('p1-small-replay');
+$node3->init;
+$node3->start;
+
+# crash stop to force WAL recovery
+$node3->stop('immediate');
+
+# restart; crash recovery now runs with the pipeline enabled
+$node3->append_conf('postgresql.conf', "wal_pipeline = on");
+$node3->start;
+
+ok(scalar log_matches($node3, qr/\[walpipeline\] producer: exiting: sent=0 received=0/),
+	'pipeline producer sent zero records');
+
+ok((scalar log_matches($node3, qr/redo done at/)),
+	'pipeline redo done even with tiny replay');
+
+$node3->stop;
+
+done_testing();
-- 
2.34.1