From 323482e965096afb5fc2a0ddefb4a075f2716f28 Mon Sep 17 00:00:00 2001 From: Imran Zaheer Date: Tue, 17 Mar 2026 21:48:05 +0500 Subject: [PATCH v2 2/2] Pipelined Recovery - Consumer This includes the consumer-specific code for the producer-consumer architecture for WAL replay that separates WAL decoding from the recovery process, enabling parallel processing between different steps of replay. The consumer receives the decoded record from the shared memory message queue. Both producer and consumer should be aware of any state changes, so variable states are synced through shared memory `XLogRecoveryCtlData`. Also, constant states are shared from the consumer to the producer using `WalPipelineParams`. Author: Imran Zaheer Idea by: Ants Aasma --- src/backend/access/transam/xlog.c | 5 +- src/backend/access/transam/xlogpipeline.c | 525 ++++++++++++++++++++ src/backend/access/transam/xlogprefetcher.c | 3 +- src/backend/access/transam/xlogrecovery.c | 469 +++++++++++++++-- src/backend/storage/ipc/standby.c | 1 + src/include/access/xlogpipeline.h | 15 + src/include/access/xlogrecovery.h | 50 ++ 7 files changed, 1028 insertions(+), 40 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f5c9a34374d..3e1b7d78055 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -59,6 +59,7 @@ #include "access/xlog_internal.h" #include "access/xlogarchive.h" #include "access/xloginsert.h" +#include "access/xlogpipeline.h" #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" @@ -5924,6 +5925,7 @@ StartupXLOG(void) ProcArrayApplyRecoveryInfo(&running); } + SetSharedHotStandbyState(); } /* @@ -8488,6 +8490,7 @@ xlog_redo(XLogReaderState *record) running.xids = xids; ProcArrayApplyRecoveryInfo(&running); + SetSharedHotStandbyState(); } /* ControlFile->checkPointCopy always tracks the latest ckpt XID */ @@ -9667,7 +9670,7 @@ GetOldestRestartPoint(XLogRecPtr *oldrecptr, TimeLineID *oldtli) void XLogShutdownWalRcv(void) { - Assert(AmStartupProcess() || !IsUnderPostmaster); + Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline()); ShutdownWalRcv(); ResetInstallXLogFileSegmentActive(); diff --git a/src/backend/access/transam/xlogpipeline.c b/src/backend/access/transam/xlogpipeline.c index 4b95a11d16b..5bb575b1f14 100644 --- a/src/backend/access/transam/xlogpipeline.c +++ b/src/backend/access/transam/xlogpipeline.c @@ -52,6 +52,16 @@ #include "utils/timeout.h" +/* + * Convert values of GUCs measured in megabytes to bytes + */ +#define MBToBytes(mbvar) (mbvar * 1024 * 1024) + +/* + * Waiting for consumer before exiting gracefully. + */ +#define MAX_SHUTDOWN_WAIT_ITERS 1000 /* 1000 * 10ms = 10 seconds */ + /* Global shared memory control structure */ WalPipelineShmCtl *WalPipelineShm = NULL; @@ -62,6 +72,10 @@ static dsm_segment *producer_dsm_seg = NULL; static shm_mq *producer_mq = NULL; static shm_mq_handle *producer_mq_handle = NULL; +/* Local state for consumer */ +static dsm_segment *consumer_dsm_seg = NULL; +static shm_mq *consumer_mq = NULL; +static shm_mq_handle *consumer_mq_handle = NULL; /* * Flags set by interrupt handlers for later service in the redo loop. @@ -74,7 +88,9 @@ static void PipelineBgwSigHupHandler(SIGNAL_ARGS); /* Forward declarations */ static void wal_pipeline_cleanup_callback(int code, Datum arg); static Size serialize_wal_record(XLogReaderState *record, char **buffer); +static DecodedXLogRecord *deserialize_wal_record(const char *buffer, Size len, XLogReaderState *startup_reader, bool first_iteration); static void cleanup_producer_resources(void); +static void cleanup_consumer_resources(void); static void WalPipeline_WaitForConsumerShutdownRequest(void); /* copied from xlogrecovery.c */ @@ -135,6 +151,168 @@ WalPipelineShmemInit(void) } +/* + * Called by Consumer. + * + * Initialize and start the WAL pipeline. This will be called by the startup + * process (consumer) as a request to start the pipeline. + */ +void +WalPipeline_Start(WalPipelineParams *params) +{ + BackgroundWorker worker; + BackgroundWorkerHandle *handle; + dsm_segment *seg; + shm_toc_estimator e; + shm_toc *toc; + Size segsize; + shm_mq *mq; + WalPipelineParams *shared_params; + pid_t pid; + BgwHandleStatus status; + + SpinLockAcquire(&WalPipelineShm->mutex); + if (WalPipelineShm->initialized) + { + SpinLockRelease(&WalPipelineShm->mutex); + return; /* Already started */ + } + WalPipelineShm->initialized = true; + SpinLockRelease(&WalPipelineShm->mutex); + + shm_toc_initialize_estimator(&e); + shm_toc_estimate_chunk(&e, sizeof(WalPipelineParams)); + shm_toc_estimate_chunk(&e, MBToBytes(wal_pipeline_mq_size_mb)); + shm_toc_estimate_keys(&e, 2); /* key=1 → params, key=2 → mq */ + segsize = shm_toc_estimate(&e); + + seg = dsm_create(segsize, 0); + dsm_pin_segment(seg); + + toc = shm_toc_create(PG_WAL_PIPELINE_MAGIC, + dsm_segment_address(seg), segsize); + + /* + * Pass the startup process vaaraibles state through WalPipelineParams + */ + shared_params = shm_toc_allocate(toc, sizeof(WalPipelineParams)); + shm_toc_insert(toc, 1, shared_params); + *shared_params = *params; + + /* create the message queue */ + mq = shm_mq_create(shm_toc_allocate(toc, MBToBytes(wal_pipeline_mq_size_mb)), + MBToBytes(wal_pipeline_mq_size_mb)); + shm_toc_insert(toc, 2, mq); + + /* update shared state */ + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->dsm_seg_handle = dsm_segment_handle(seg); + WalPipelineShm->consumer_pid = MyProcPid; + SpinLockRelease(&WalPipelineShm->mutex); + + /* Set up consumer side of the queue */ + consumer_dsm_seg = seg; + consumer_mq = mq; + shm_mq_set_receiver(consumer_mq, MyProc); + consumer_mq_handle = shm_mq_attach(consumer_mq, seg, NULL); + + /* Register cleanup callback */ + before_shmem_exit(wal_pipeline_cleanup_callback, (Datum) 0); + + /* Register background worker */ + memset(&worker, 0, sizeof(worker)); + worker.bgw_flags = BGWORKER_SHMEM_ACCESS; + worker.bgw_start_time = BgWorkerStart_PostmasterStart; + worker.bgw_restart_time = BGW_NEVER_RESTART; + sprintf(worker.bgw_library_name, "postgres"); + sprintf(worker.bgw_function_name, "WalPipeline_ProducerMain"); + snprintf(worker.bgw_name, BGW_MAXLEN, "wal pipeline producer"); + snprintf(worker.bgw_type, BGW_MAXLEN, "wal pipeline producer"); + worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + { + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->initialized = false; + SpinLockRelease(&WalPipelineShm->mutex); + + dsm_unpin_segment(dsm_segment_handle(seg)); + dsm_detach(seg); + consumer_dsm_seg = NULL; + consumer_mq = NULL; + consumer_mq_handle = NULL; + + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not register background worker for WAL pipeline"))); + } + + status = WaitForBackgroundWorkerStartup(handle, &pid); + + if (status != BGWH_STARTED) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not start background process"), + errhint("More details may be available in the server log."))); + else + ereport(LOG, (errmsg("[walpipeline] started."))); +} + +/* + * Request producer shutdown. + * This is called by the consumer when it no longer needs records. + */ +static void +WalPipeline_RequestShutdown(void) +{ + if (!WalPipelineShm) + return; + + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->shutdown_requested = true; + SpinLockRelease(&WalPipelineShm->mutex); +} + +/* + * Consumer Function. + * Stop the WAL pipeline. This will also be called be the startup process + * (consumer). This will only be called when consumer don't need any more + * decoded records. This function will also wait until the pipeline workers + * are stopped. + */ +void +WalPipeline_Stop(void) +{ + if (!WalPipelineShm || !WalPipelineShm->initialized) + return; + + /* Ask producer to stop */ + WalPipeline_RequestShutdown(); + + /* Wait for producer to exit (max 10 seconds) */ + for (int i = 0; i < 100; i++) + { + bool producer_alive; + + SpinLockAcquire(&WalPipelineShm->mutex); + producer_alive = (WalPipelineShm->producer_pid != 0); + SpinLockRelease(&WalPipelineShm->mutex); + + if (!producer_alive) + break; + + pg_usleep(100000); /* 100 ms */ + } + + cleanup_consumer_resources(); + + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->initialized = false; + SpinLockRelease(&WalPipelineShm->mutex); + + elog(LOG, "[walpipeline] stopped"); +} /* * Producer Function. @@ -374,6 +552,117 @@ WalPipeline_SendError(int errcode, const char *errmsg) return true; } +/* + * Consumer Function. + * Receive and deserialize a WAL record from the producer + */ +DecodedXLogRecord * +WalPipeline_ReceiveRecord(XLogReaderState *startup_reader, bool first_iteration) +{ + shm_mq_result res; + Size nbytes; + void *data; + char *err_msg; + int err_code; + WalRecordMsgHeader *hdr; + DecodedXLogRecord *record; + + if (!consumer_mq_handle) + return NULL; + + /* Receive message from queue */ + res = shm_mq_receive(consumer_mq_handle, &nbytes, &data, false); + + if (res != SHM_MQ_SUCCESS) + elog(ERROR, "[walpipeline] consumer: failed to receive record"); + + hdr = (WalRecordMsgHeader *) data; + + /* Handle different message types */ + switch (hdr->msg_type) + { + case WAL_MSG_RECORD: + record = deserialize_wal_record((char *) data, nbytes, startup_reader, first_iteration); + + /* Update statistics */ + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->records_received++; + WalPipelineShm->bytes_received += nbytes; + WalPipelineShm->consumer_lsn = hdr->endRecPtr; + SpinLockRelease(&WalPipelineShm->mutex); + + return record; + + case WAL_MSG_SHUTDOWN: + elog(LOG, "[walpipeline] consumer: received shutdown message from the producer"); + return NULL; + + case WAL_MSG_ERROR: + SpinLockAcquire(&WalPipelineShm->mutex); + err_code = WalPipelineShm->error_code; + err_msg = WalPipelineShm->error_message; + SpinLockRelease(&WalPipelineShm->mutex); + + ereport(ERROR, + (errcode(err_code), + errmsg("[walpipeline] consumer: received error from the producer: %s", err_msg))); + return NULL; + + default: + elog(PANIC, "[walpipeline] consumer: unknown message type: %d", + hdr->msg_type); + return NULL; + } +} + +/* + * Consumer Function. + * Check if producer is still running + */ +bool +WalPipeline_CheckProducerAlive(void) +{ + pid_t pid; + bool alive; + + SpinLockAcquire(&WalPipelineShm->mutex); + pid = WalPipelineShm->producer_pid; + SpinLockRelease(&WalPipelineShm->mutex); + + if (pid == 0) + return false; + + alive = (kill(pid, 0) == 0); + + if (!alive) + { + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->producer_pid = 0; + SpinLockRelease(&WalPipelineShm->mutex); + } + + return alive; +} + +/* + * Consumer Function. + * Check if pipeline is active + */ +bool +WalPipeline_IsActive(void) +{ + bool active; + + if (!WalPipelineShm) + return false; + + SpinLockAcquire(&WalPipelineShm->mutex); + active = WalPipelineShm->initialized && !WalPipelineShm->shutdown_requested; + SpinLockRelease(&WalPipelineShm->mutex); + + return active; +} + /* * Producer Function. * Producer may can exit without waiting for the consumer, but its better to @@ -411,6 +700,55 @@ WalPipeline_WaitForConsumerShutdownRequest(void) } } +/* + * Consumer Function. + * Wait unless last sent record by the pipeline is applied by the + * startup process. + */ +void +WalPipeline_WaitForConsumerCatchup(void) +{ + XLogRecPtr producer_lsn; + XLogRecPtr consumer_lsn; + + for (;;) + { + SpinLockAcquire(&WalPipelineShm->mutex); + producer_lsn = WalPipelineShm->producer_lsn; + consumer_lsn = WalPipelineShm->applied_lsn; + SpinLockRelease(&WalPipelineShm->mutex); + + if (producer_lsn == consumer_lsn) + return; + + CHECK_FOR_INTERRUPTS(); + + /* short sleep to avoid busy looping */ + pg_usleep(50); /* 50 microseconds */ + } +} + +/* + * Consumer Function. + * Get pipeline statistics + */ +void +WalPipeline_GetStats(uint64 *records_sent, uint64 *records_received, + XLogRecPtr *producer_lsn, XLogRecPtr *consumer_lsn) +{ + SpinLockAcquire(&WalPipelineShm->mutex); + + if (records_sent) + *records_sent = WalPipelineShm->records_sent; + if (records_received) + *records_received = WalPipelineShm->records_received; + if (producer_lsn) + *producer_lsn = WalPipelineShm->producer_lsn; + if (consumer_lsn) + *consumer_lsn = WalPipelineShm->consumer_lsn; + + SpinLockRelease(&WalPipelineShm->mutex); +} /* * serialize_wal_record (Producer) @@ -539,6 +877,167 @@ serialize_wal_record(XLogReaderState *xlogreader, char **outbuf) return total; } +/* + * deserialize_wal_record (Consumer) + * + * Deserialize a WAL record from a buffer into a DecodedXLogRecord. + * + * Memory layout: + * [DecodedXLogRecord + blocks][main_data][block_images][block_data] + */ +DecodedXLogRecord * +deserialize_wal_record(const char *buf, Size len, XLogReaderState *startup_reader, bool first_iteration) +{ + const char *ptr = buf; + const char *end = buf + len; + WalRecordMsgHeader hdr; + DecodedXLogRecord *dec = NULL; + char *alloc_ptr; + int nblocks; + Size total; + + if (len < sizeof(WalRecordMsgHeader)) + return NULL; + + memcpy(&hdr, ptr, sizeof(hdr)); + ptr += sizeof(hdr); + + if (hdr.decoded_size != len - sizeof(WalRecordMsgHeader)) + return NULL; + + nblocks = (hdr.max_block_id >= 0) ? hdr.max_block_id + 1 : 0; + + /* ---- space allocation ---- */ + total = MAXALIGN(offsetof(DecodedXLogRecord, blocks) + nblocks * sizeof(DecodedBkpBlock)) + + MAXALIGN(hdr.decoded_size); + + dec = palloc(total); + memset(dec, 0, total); + + alloc_ptr = (char *)dec + MAXALIGN(offsetof(DecodedXLogRecord, blocks) + nblocks * sizeof(DecodedBkpBlock)); + + /* ---- record metadata ---- */ + dec->lsn = hdr.readRecPtr; + dec->next_lsn = hdr.endRecPtr; + dec->max_block_id = hdr.max_block_id; + dec->main_data_len = hdr.main_data_len; + dec->toplevel_xid = hdr.toplevel_xid; + dec->record_origin = hdr.record_origin; + + /* ---- XLogRecord ---- */ + if (ptr + sizeof(XLogRecord) > end) + goto fail; + + memcpy(&dec->header, ptr, sizeof(XLogRecord)); + ptr += sizeof(XLogRecord); + + /* ---- main data ---- */ + if (hdr.main_data_len > 0) + { + if (ptr + hdr.main_data_len > end) + goto fail; + + dec->main_data = alloc_ptr; + memcpy(dec->main_data, ptr, hdr.main_data_len); + ptr += hdr.main_data_len; + alloc_ptr += MAXALIGN(hdr.main_data_len); + } + + /* ---- blocks ---- */ + for (int i = 0; i < nblocks && ptr < end; i++) + { + SerializedBlockMeta meta; + DecodedBkpBlock *blk; + + if (ptr + sizeof(meta) > end) + break; + + memcpy(&meta, ptr, sizeof(meta)); + ptr += sizeof(meta); + + if (!meta.in_use) + continue; + + if (meta.block_id < 0 || meta.block_id >= nblocks) + goto fail; + + blk = &dec->blocks[meta.block_id]; + + blk->in_use = true; + blk->rlocator = meta.rlocator; + blk->forknum = meta.forknum; + blk->blkno = meta.blkno; + blk->flags = meta.flags; + blk->has_image = meta.has_image; + blk->apply_image = meta.apply_image; + blk->has_data = meta.has_data; + blk->bimg_len = meta.bimg_len; + blk->bimg_info = meta.bimg_info; + blk->hole_offset = meta.hole_offset; + blk->hole_length = meta.hole_length; + blk->data_len = meta.data_len; + + if (blk->has_image && blk->bimg_len > 0) + { + if (ptr + blk->bimg_len > end) + goto fail; + + blk->bkp_image = alloc_ptr; + memcpy(blk->bkp_image, ptr, blk->bimg_len); + ptr += blk->bimg_len; + alloc_ptr += MAXALIGN(blk->bimg_len); + } + + if (blk->has_data && blk->data_len > 0) + { + if (ptr + blk->data_len > end) + goto fail; + + blk->data = alloc_ptr; + memcpy(blk->data, ptr, blk->data_len); + ptr += blk->data_len; + alloc_ptr += MAXALIGN(blk->data_len); + } + } + + dec->size = alloc_ptr - (char *)dec; + dec->oversized = false; + + /* + * The previous decoded record has been deserialized from + * from the pipeline and hence need to free the memory after + * use. + * + * But for the first iteration memory space for `reader->record` + * was allocated from the `decode_buffer`, and freeing this + * memory can be fatal. This memory will be freed automatically + * at the end of the recovery in `finishwalrecovery()`. So we + * will skip pfree for the first iteration (apply). + */ + if (startup_reader->record && !first_iteration) + pfree(startup_reader->record); + + /* Attach to reader, only updating the public parameters */ + startup_reader->record = dec; + startup_reader->ReadRecPtr = dec->lsn; + startup_reader->DecodeRecPtr = dec->lsn; + startup_reader->EndRecPtr = dec->next_lsn; + startup_reader->NextRecPtr = dec->next_lsn; + startup_reader->decode_queue_head = dec; + startup_reader->decode_queue_tail = dec; + startup_reader->missingContrecPtr = hdr.missingContrecPtr; + startup_reader->abortedRecPtr = hdr.abortedRecPtr; + startup_reader->overwrittenRecPtr = hdr.overwrittenRecPtr; + + return dec; + +fail: + if (dec) + pfree(dec); + + elog(LOG, "deserialize_wal_record: failed"); + return NULL; +} /* * We need to put some assertion that only pipeline worker should be touching @@ -581,6 +1080,32 @@ cleanup_producer_resources(void) SpinLockRelease(&WalPipelineShm->mutex); } +/* + * Clean up consumer-side resources + */ +static void +cleanup_consumer_resources(void) +{ + if (consumer_mq_handle) + { + shm_mq_detach(consumer_mq_handle); + consumer_mq_handle = NULL; + } + + if (consumer_dsm_seg) + { + dsm_unpin_segment(dsm_segment_handle(consumer_dsm_seg)); + dsm_detach(consumer_dsm_seg); + consumer_dsm_seg = NULL; + } + + consumer_mq = NULL; + + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->consumer_pid = 0; + WalPipelineShm->dsm_seg_handle = DSM_HANDLE_INVALID; + SpinLockRelease(&WalPipelineShm->mutex); +} /* * Cleanup callback for process exit diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c index c235eca7c51..1536ea34c41 100644 --- a/src/backend/access/transam/xlogprefetcher.c +++ b/src/backend/access/transam/xlogprefetcher.c @@ -27,6 +27,7 @@ #include "postgres.h" +#include "access/xlogpipeline.h" #include "access/xlogprefetcher.h" #include "access/xlogreader.h" #include "catalog/pg_control.h" @@ -352,7 +353,7 @@ XLogPrefetchReconfigure(void) static inline void XLogPrefetchIncrement(pg_atomic_uint64 *counter) { - Assert(AmStartupProcess() || !IsUnderPostmaster); + Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline()); pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1); } diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index b66ec80fa25..49cc0b9aafa 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -35,6 +35,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "access/xlogarchive.h" +#include "access/xlogpipeline.h" #include "access/xlogprefetcher.h" #include "access/xlogreader.h" #include "access/xlogrecovery.h" @@ -99,6 +100,8 @@ int recovery_min_apply_delay = 0; char *PrimaryConnInfo = NULL; char *PrimarySlotName = NULL; bool wal_receiver_create_temp_slot = false; +bool wal_pipeline_enabled = false; +int wal_pipeline_mq_size_mb = 128; /* * recoveryTargetTimeLineGoal: what the user requested, if any @@ -205,17 +208,6 @@ typedef struct XLogPageReadPrivate /* flag to tell XLogPageRead that we have started replaying */ static bool InRedo = false; -/* - * Codes indicating where we got a WAL file from during recovery, or where - * to attempt to get one. - */ -typedef enum -{ - XLOG_FROM_ANY = 0, /* request to read WAL from any source */ - XLOG_FROM_ARCHIVE, /* restored using restore_command */ - XLOG_FROM_PG_WAL, /* existing file in pg_wal */ - XLOG_FROM_STREAM, /* streamed from primary */ -} XLogSource; /* human-readable names for XLogSources, for debugging output */ static const char *const xlogSourceNames[] = {"any", "archive", "pg_wal", "stream"}; @@ -356,12 +348,6 @@ static void recoveryPausesHere(bool endOfRecovery); static bool recoveryApplyDelay(XLogReaderState *record); static void ConfirmRecoveryPaused(void); -static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, - int emode, bool fetching_ckpt, - TimeLineID replayTLI); - -static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf); static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt, @@ -383,6 +369,7 @@ static bool HotStandbyActiveInReplay(void); static void SetCurrentChunkStartTime(TimestampTz xtime); static void SetLatestXTime(TimestampTz xtime); +static void InitializePipelineStartupEnv(WalPipelineParams *params); /* * Initialization of shared memory for WAL recovery @@ -411,9 +398,27 @@ XLogRecoveryShmemInit(void) SpinLockInit(&XLogRecoveryCtl->info_lck); InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch); + InitSharedLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch); ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV); } +/* + * We may not be able to share expectedTLEs list across the sharedmemory. + * For now just trigger the startup process (consumer) to + * reread the timelinehistory file whenever pipeline updates the value for + * expectedTLEs. So the consumer proc will expectedTLEs updated locally. + */ +static void +PipelineConsumerexpectedTLEsUpdateTLI(TimeLineID recoveryTargetTLI) +{ + if (wal_pipeline_enabled) + { + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + XLogRecoveryCtl->expectedTLEsUpdateTLI = recoveryTargetTLI; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + } +} + /* * A thin wrapper to enable StandbyMode and do other preparatory work as * needed. @@ -429,7 +434,8 @@ EnableStandbyMode(void) * startup progress timeout in standby mode to avoid calling * startup_progress_timeout_handler() unnecessarily. */ - disable_startup_progress_timeout(); + if (!AmWalPipeline()) + disable_startup_progress_timeout(); } /* @@ -489,7 +495,10 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr, * recovery, if required. */ if (ArchiveRecoveryRequested) + { + OwnLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch); OwnLatch(&XLogRecoveryCtl->recoveryWakeupLatch); + } /* * Set the WAL reading processor now, as it will be needed when reading @@ -961,6 +970,14 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr, minRecoveryPointTLI = 0; } + /* update shared state. */ + if (wal_pipeline_enabled) + { + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + } + /* * Start recovery assuming that the final record isn't lost. */ @@ -972,6 +989,12 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr, *haveTblspcMap_ptr = haveTblspcMap; } +void DisownRecoveryWakeupLatch() +{ + if (ArchiveRecoveryRequested) + DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch); +} + /* * See if there are any recovery signal files and if so, set state for * recovery. @@ -1420,6 +1443,15 @@ FinishWalRecovery(void) TimeLineID lastRecTLI; XLogRecPtr endOfLog; + if (wal_pipeline_enabled) + { + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + InArchiveRecovery = XLogRecoveryCtl->InArchiveRecovery; + missingContrecPtr = XLogRecoveryCtl->missingContrecPtr; + abortedRecPtr = XLogRecoveryCtl->abortedRecPtr; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + } + /* * Kill WAL receiver, if it's still running, before we continue to write * the startup checkpoint and aborted-contrecord records. It will trump @@ -1477,6 +1509,17 @@ FinishWalRecovery(void) lastRec = XLogRecoveryCtl->lastReplayedReadRecPtr; lastRecTLI = XLogRecoveryCtl->lastReplayedTLI; } + + /* + * Invalidate contents of internal buffer before read attempt. Just set + * the length to 0, rather than a full XLogReaderInvalReadState(). + * + * This is needed because we could be reading from the pipeline reader so + * far, so before moving back the the startup proc readerstate better to + * invalidate it. + */ + xlogreader->readLen = 0; + XLogPrefetcherBeginRead(xlogprefetcher, lastRec); (void) ReadRecord(xlogprefetcher, PANIC, false, lastRecTLI); endOfLog = xlogreader->EndRecPtr; @@ -1599,7 +1642,69 @@ ShutdownWalRecovery(void) * it, but let's do it for the sake of tidiness. */ if (ArchiveRecoveryRequested) - DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch); + { + DisownLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch); + + /* + * Only disown the latch if we were the owner (pipeline disabled). + */ + if (!wal_pipeline_enabled) + DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch); + } + + +} + +/* + * Get next record for redo. + * Use the pipeline if enabled for parallel decoding and receive decoded + * records from a shared queue, else read it directly. + */ +static XLogRecord * +ReceiveRecord(XLogPrefetcher *xlogprefetcher, int emode, + bool fetching_ckpt, TimeLineID replayTLI, + XLogReaderState **localreader, bool first_iteration) +{ + + XLogRecord *record = NULL; + XLogReaderState *reader = *localreader; + + /* + * If pipeline not enabled read the record directly + */ + if (!wal_pipeline_enabled) + { + record = ReadRecord(xlogprefetcher, emode, fetching_ckpt, replayTLI); + return record; + } + + /* + * Get record from the pipeline + */ + if (WalPipeline_IsActive()) + { + DecodedXLogRecord *decoded_record = NULL; + + decoded_record = WalPipeline_ReceiveRecord(reader, first_iteration); + + if (decoded_record) + { + record = &decoded_record->header; + return record; + } + else + { + /* + * We will end up here only when pipeline couldn't read more + * records and have sent a shutdown msg. We will acknowldge this + * and will trigger request to stop the pipeline workers. + */ + WalPipeline_Stop(); + return NULL; + } + } + + elog(PANIC, "[walpipeline] consumer: pipeline not active, even though wal_pipeline is set to on."); } /* @@ -1614,6 +1719,12 @@ PerformWalRecovery(void) bool reachedRecoveryTarget = false; TimeLineID replayTLI; + /* + * standalone backend may exist in case of pg_rewind. + */ + if (!IsUnderPostmaster) + wal_pipeline_enabled = false; + /* * Initialize shared variables for tracking progress of WAL replay, as if * we had just replayed the record before the REDO location (or the @@ -1688,11 +1799,24 @@ PerformWalRecovery(void) { TimestampTz xtime; PGRUsage ru0; + uint64 loop_count = 0; pg_rusage_init(&ru0); InRedo = true; + if(wal_pipeline_enabled) + { + /* + * Startup proc parameters that pipeline shold also be aware of. + */ + WalPipelineParams *params = palloc0(sizeof(WalPipelineParams)); + + params->ReplayTLI = replayTLI; + InitializePipelineStartupEnv(params); + WalPipeline_Start(params); + } + RmgrStartup(); ereport(LOG, @@ -1798,7 +1922,7 @@ PerformWalRecovery(void) } /* Else, try to fetch the next WAL record */ - record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); + record = ReceiveRecord(xlogprefetcher, LOG, false, replayTLI, &xlogreader, loop_count++ == 0); } while (record != NULL); /* @@ -1807,6 +1931,9 @@ PerformWalRecovery(void) if (reachedRecoveryTarget) { + if (wal_pipeline_enabled) + WalPipeline_Stop(); + if (!reachedConsistency) ereport(FATAL, (errmsg("requested recovery stop point is before consistent recovery point"))); @@ -1841,6 +1968,8 @@ PerformWalRecovery(void) RmgrCleanup(); + // XXX: testing purpose only + ereport(DEBUG1, (errmsg("replay loop fiinished with loop count: " UINT64_FORMAT, loop_count))); ereport(LOG, errmsg("redo done at %X/%08X system usage: %s", LSN_FORMAT_ARGS(xlogreader->ReadRecPtr), @@ -1879,7 +2008,9 @@ static void ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *replayTLI) { ErrorContextCallback errcallback; + XLogRecPtr walrcv_flushedupto; bool switchedTLI = false; + bool pipeline_enabled_stanby = false; /* Setup error traceback support for ereport() */ errcallback.callback = rm_redo_error_callback; @@ -1980,6 +2111,8 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl XLogRecoveryCtl->lastReplayedReadRecPtr = xlogreader->ReadRecPtr; XLogRecoveryCtl->lastReplayedEndRecPtr = xlogreader->EndRecPtr; XLogRecoveryCtl->lastReplayedTLI = *replayTLI; + walrcv_flushedupto = XLogRecoveryCtl->flushedUptoRecPtr; + pipeline_enabled_stanby = XLogRecoveryCtl->stanbyEnabled; SpinLockRelease(&XLogRecoveryCtl->info_lck); /* ------ @@ -2018,6 +2151,16 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl WalRcvForceReply(); } + /* + * As the pipeline (producer) was running way ahead of the startup proc + * (consumer), see if the producer asked to wakeup the wal_reciever by + * updating the value of `flushedUptoRecPtr`. + */ + if ((walrcv_flushedupto != InvalidXLogRecPtr) && + ((walrcv_flushedupto == xlogreader->EndRecPtr) || + (walrcv_flushedupto == xlogreader->ReadRecPtr))) + WalRcvForceReply(); + /* Allow read-only connections if we're consistent now */ CheckRecoveryConsistency(); @@ -2033,6 +2176,14 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl /* Reset the prefetcher. */ XLogPrefetchReconfigure(); } + + /* Conusmer should also enable the standby if pipline have */ + if (pipeline_enabled_stanby) + EnableStandbyMode(); + + SpinLockAcquire(&WalPipelineShm->mutex); + WalPipelineShm->applied_lsn = xlogreader->EndRecPtr; + SpinLockRelease(&WalPipelineShm->mutex); } /* @@ -2158,12 +2309,27 @@ CheckRecoveryConsistency(void) Assert(InArchiveRecovery); - /* - * assume that we are called in the startup process, and hence don't need - * a lock to read lastReplayedEndRecPtr - */ - lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr; - lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI; + + if (AmStartupProcess()) + { + /* + * assume that we are called in the startup process, and hence don't need + * a lock to read lastReplayedEndRecPtr + */ + lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr; + lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI; + } + else + { + /* + * We could be in the pipeline worker, so update the shared states. + */ + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr; + lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI; + standbyState = XLogRecoveryCtl->standbyState; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + } /* * Have we reached the point where our base backup was completed? @@ -2350,12 +2516,28 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, TimeLineID prevTLI, TimeLineID replayTLI) { + + /* Check that the record agrees on what the current (old) timeline is */ if (prevTLI != replayTLI) ereport(PANIC, (errmsg("unexpected previous timeline ID %u (current timeline ID %u) in checkpoint record", prevTLI, replayTLI))); + + /* Pipeline may have updated the expectedTLEs */ + if (wal_pipeline_enabled) + { + TimeLineID targetTLI; + + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + targetTLI = XLogRecoveryCtl->expectedTLEsUpdateTLI; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + + if (targetTLI) + expectedTLEs = readTimeLineHistory(targetTLI); + } + /* * The new timeline better be in the list of timelines we expect to see, * according to the timeline history. It should also not decrease. @@ -3003,7 +3185,7 @@ recoveryApplyDelay(XLogReaderState *record) while (true) { - ResetLatch(&XLogRecoveryCtl->recoveryWakeupLatch); + ResetLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch); /* This might change recovery_min_apply_delay. */ ProcessStartupProcInterrupts(); @@ -3028,7 +3210,7 @@ recoveryApplyDelay(XLogReaderState *record) elog(DEBUG2, "recovery apply delay %ld milliseconds", msecs); - (void) WaitLatch(&XLogRecoveryCtl->recoveryWakeupLatch, + (void) WaitLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, msecs, WAIT_EVENT_RECOVERY_APPLY_DELAY); @@ -3100,7 +3282,7 @@ ConfirmRecoveryPaused(void) * (emode must be either PANIC, LOG). In standby mode, retries until a valid * record is available. */ -static XLogRecord * +XLogRecord * ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, bool fetching_ckpt, TimeLineID replayTLI) { @@ -3108,7 +3290,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, XLogReaderState *xlogreader = XLogPrefetcherGetReader(xlogprefetcher); XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; - Assert(AmStartupProcess() || !IsUnderPostmaster); + Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline()); /* Pass through parameters to XLogPageRead */ private->fetching_ckpt = fetching_ckpt; @@ -3143,6 +3325,17 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, { abortedRecPtr = xlogreader->abortedRecPtr; missingContrecPtr = xlogreader->missingContrecPtr; + + /* + * Also update the shared state if necessary + */ + if (wal_pipeline_enabled) + { + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + XLogRecoveryCtl->abortedRecPtr = abortedRecPtr; + XLogRecoveryCtl->missingContrecPtr = missingContrecPtr; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + } } if (readFile >= 0) @@ -3210,9 +3403,31 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, if (!InArchiveRecovery && ArchiveRecoveryRequested && !fetching_ckpt) { + /* + * Wait for the startup process to apply the last sent record + * by the pipeline, otherwise we will fail the consistency + * check as all the records decoded by the pipeline have not + * arrived/consumed by the consumer (statup proc) yet. + */ + if (wal_pipeline_enabled && AmWalPipeline()) + WalPipeline_WaitForConsumerCatchup(); + ereport(DEBUG1, (errmsg_internal("reached end of WAL in pg_wal, entering archive recovery"))); InArchiveRecovery = true; + + if (wal_pipeline_enabled) + { + /* also update the shared state */ + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery; + + /* update startup proc (consumer) about the standbymode */ + if (StandbyModeRequested) + XLogRecoveryCtl->stanbyEnabled = true; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + } + if (StandbyModeRequested) EnableStandbyMode(); @@ -3269,7 +3484,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. */ -static int +int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf) { @@ -3281,7 +3496,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, int r; instr_time io_start; - Assert(AmStartupProcess() || !IsUnderPostmaster); + Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline()); XLByteToSeg(targetPagePtr, targetSegNo, wal_segment_size); targetPageOff = XLogSegmentOffset(targetPagePtr, wal_segment_size); @@ -3707,7 +3922,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, LSN_FORMAT_ARGS(RecPtr)); /* Do background tasks that might benefit us later. */ - KnownAssignedTransactionIdsIdleMaintenance(); + if (AmStartupProcess()) + KnownAssignedTransactionIdsIdleMaintenance(); (void) WaitLatch(&XLogRecoveryCtl->recoveryWakeupLatch, WL_LATCH_SET | WL_TIMEOUT | @@ -3719,6 +3935,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, /* Handle interrupt signals of startup process */ ProcessStartupProcInterrupts(); + if (wal_pipeline_enabled) + ProcessPipelineBgwInterrupts(); } last_fail_time = now; currentSource = XLOG_FROM_ARCHIVE; @@ -3744,6 +3962,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, xlogSourceNames[oldSource], xlogSourceNames[currentSource], lastSourceFailed ? "failure" : "success"); + if (wal_pipeline_enabled) + { + /* also update the shared state */ + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + XLogRecoveryCtl->currentSource = currentSource; + pendingWalRcvRestart = XLogRecoveryCtl->pendingWalRcvRestart; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + } + /* * We've now handled possible failure. Try to read from the chosen * source. @@ -3818,6 +4045,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, } pendingWalRcvRestart = false; + if (wal_pipeline_enabled) + { + /* also update the shared state */ + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + XLogRecoveryCtl->pendingWalRcvRestart = false; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + } + /* * Launch walreceiver if needed. * @@ -3895,6 +4130,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, if (latestChunkStart <= RecPtr) { XLogReceiptTime = GetCurrentTimestamp(); + + if (wal_pipeline_enabled) + { + /* also update the shared state */ + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + } + SetCurrentChunkStartTime(XLogReceiptTime); } } @@ -3923,7 +4167,12 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, if (readFile < 0) { if (!expectedTLEs) + { expectedTLEs = readTimeLineHistory(recoveryTargetTLI); + PipelineConsumerexpectedTLEsUpdateTLI(recoveryTargetTLI); + } + + readFile = XLogFileRead(readSegNo, receiveTLI, XLOG_FROM_STREAM, false); Assert(readFile >= 0); @@ -3933,6 +4182,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, /* just make sure source info is correct... */ readSource = XLOG_FROM_STREAM; XLogReceiptSource = XLOG_FROM_STREAM; + if (wal_pipeline_enabled) + { + /* also update the shared state */ + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + } return XLREAD_SUCCESS; } break; @@ -3970,15 +4226,35 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, */ if (!streaming_reply_sent) { - WalRcvForceReply(); - streaming_reply_sent = true; + if (wal_pipeline_enabled && AmWalPipeline()) + { + /* + * In case of pipeline enabled, we cannot just call + * WalRcvForceReply() directly as the consumer (startup proc) + * haven't actually received/replayed all the wal + * received from the wal_receiver yet. + */ + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + XLogRecoveryCtl->flushedUptoRecPtr = flushedUpto; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + streaming_reply_sent = true; + } + else + { + WalRcvForceReply(); + streaming_reply_sent = true; + } } /* Do any background tasks that might benefit us later. */ - KnownAssignedTransactionIdsIdleMaintenance(); + if (AmStartupProcess()) + KnownAssignedTransactionIdsIdleMaintenance(); /* Update pg_stat_recovery_prefetch before sleeping. */ - XLogPrefetcherComputeStats(xlogprefetcher); + if (AmWalPipeline()) + XLogPrefetcherComputeStats(xlogprefetcher_pipelined); + else + XLogPrefetcherComputeStats(xlogprefetcher); /* * Wait for more WAL to arrive, when we will be woken @@ -4009,6 +4285,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * process. */ ProcessStartupProcInterrupts(); + if (wal_pipeline_enabled) + ProcessPipelineBgwInterrupts(); } return XLREAD_FAIL; /* not reached */ @@ -4173,6 +4451,7 @@ rescanLatestTimeLine(TimeLineID replayTLI, XLogRecPtr replayLSN) recoveryTargetTLI = newtarget; list_free_deep(expectedTLEs); expectedTLEs = newExpectedTLEs; + PipelineConsumerexpectedTLEsUpdateTLI(newtarget); /* * As in StartupXLOG(), try to ensure we have all the history files @@ -4262,6 +4541,15 @@ XLogFileRead(XLogSegNo segno, TimeLineID tli, if (source != XLOG_FROM_STREAM) XLogReceiptTime = GetCurrentTimestamp(); + if (wal_pipeline_enabled) + { + /* also update the shared state */ + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime; + XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + } + return fd; } if (errno != ENOENT || !notfoundOk) /* unexpected failure? */ @@ -4346,7 +4634,10 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source) { elog(DEBUG1, "got WAL segment from archive"); if (!expectedTLEs) + { expectedTLEs = tles; + PipelineConsumerexpectedTLEsUpdateTLI(recoveryTargetTLI); + } return fd; } } @@ -4357,7 +4648,10 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source) if (fd != -1) { if (!expectedTLEs) + { expectedTLEs = tles; + PipelineConsumerexpectedTLEsUpdateTLI(recoveryTargetTLI); + } return fd; } } @@ -4379,16 +4673,52 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source) void StartupRequestWalReceiverRestart(void) { + + /* + * currentSource is also defined as pipeline shared state variable. + * Update the state before procedding. + */ + if (wal_pipeline_enabled) + { + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + currentSource = XLogRecoveryCtl->currentSource; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + } + if (currentSource == XLOG_FROM_STREAM && WalRcvRunning()) { ereport(LOG, (errmsg("WAL receiver process shutdown requested"))); pendingWalRcvRestart = true; + + /* + * pendingWalRcvRestart is also defined as pipeline shared state variable. + * Update the state before procedding. + */ + if (wal_pipeline_enabled) + { + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + XLogRecoveryCtl->pendingWalRcvRestart = pendingWalRcvRestart; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + } } } +/* + * standbyState is also defined as a shared state. Pipeline worker can also + * update its value, so always confirm the shared state before procedding. + */ +void +SetSharedHotStandbyState(void) +{ + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + XLogRecoveryCtl->standbyState = standbyState; + SpinLockRelease(&XLogRecoveryCtl->info_lck); +} + + /* * Has a standby promotion already been triggered? * @@ -4440,7 +4770,7 @@ CheckForStandbyTrigger(void) if (LocalPromoteIsTriggered) return true; - if (IsPromoteSignaled() && CheckPromoteSignal()) + if (CheckPromoteSignal()) { ereport(LOG, (errmsg("received promote request"))); RemovePromoteSignalFiles(); @@ -4483,6 +4813,7 @@ void WakeupRecovery(void) { SetLatch(&XLogRecoveryCtl->recoveryWakeupLatch); + SetLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch); } /* @@ -4652,6 +4983,14 @@ GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream) */ Assert(InRecovery); + if (wal_pipeline_enabled) + { + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + XLogReceiptTime = XLogRecoveryCtl->XLogReceiptTime; + XLogReceiptSource = XLogRecoveryCtl->XLogReceiptSource; + SpinLockRelease(&XLogRecoveryCtl->info_lck); + } + *rtime = XLogReceiptTime; *fromStream = (XLogReceiptSource == XLOG_FROM_STREAM); } @@ -4737,6 +5076,60 @@ RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue } } +static void +InitializePipelineStartupEnv(WalPipelineParams *params) +{ + /* + * These parameters are already set for the startup process but not for our + * pipeline worker. So in order to start decoding through the pipeline, + * these variables should be saved and then restored later. + */ + params->NextRecPtr = xlogreader->NextRecPtr; + params->recoveryTargetTLI = recoveryTargetTLI; + params->StandbyModeRequested = StandbyModeRequested; + params->StandbyMode = StandbyMode; + params->ArchiveRecoveryRequested = ArchiveRecoveryRequested; + params->InArchiveRecovery = InArchiveRecovery; + params->minRecoveryPointTLI = minRecoveryPointTLI; + params->minRecoveryPoint = minRecoveryPoint; + params->InRedo = InRedo; + params->currentSource = currentSource; + params->lastSourceFailed = lastSourceFailed; + params->pendingWalRcvRestart = pendingWalRcvRestart; + params->RedoStartTLI = RedoStartTLI; + params->CheckPointLoc = CheckPointLoc; + params->CheckPointTLI = CheckPointTLI; + params->RedoStartLSN = RedoStartLSN; + params->standbyState = standbyState; + params->flushedUpto = flushedUpto; + params->receiveTLI = receiveTLI; + params->abortedRecPtr = abortedRecPtr; + params->missingContrecPtr = missingContrecPtr; + params->backupEndRequired = backupEndRequired; + params->backupStartPoint = backupStartPoint; + params->backupEndPoint = backupEndPoint; + params->curFileTLI = curFileTLI; + + /* + * The pipeline will do the waiting in this case startup proc should disown + * the latch. + */ + DisownRecoveryWakeupLatch(); + + /* + * Update shared state before starting. + */ + SpinLockAcquire(&XLogRecoveryCtl->info_lck); + XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery; + XLogRecoveryCtl->pendingWalRcvRestart = pendingWalRcvRestart; + XLogRecoveryCtl->abortedRecPtr = abortedRecPtr; + XLogRecoveryCtl->missingContrecPtr = missingContrecPtr; + XLogRecoveryCtl->currentSource = currentSource; + XLogRecoveryCtl->standbyState = standbyState; + XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource; + XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime; + SpinLockRelease(&XLogRecoveryCtl->info_lck); +} /* * Pipeline bgw should be aware of all the parameters thats been initialized by diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index f3ad90c7c7a..dbf0d39212a 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -1196,6 +1196,7 @@ standby_redo(XLogReaderState *record) running.xids = xlrec->xids; ProcArrayApplyRecoveryInfo(&running); + SetSharedHotStandbyState(); /* * The startup process currently has no convenient way to schedule diff --git a/src/include/access/xlogpipeline.h b/src/include/access/xlogpipeline.h index 5740b05f79c..e2be9f9e395 100644 --- a/src/include/access/xlogpipeline.h +++ b/src/include/access/xlogpipeline.h @@ -173,12 +173,27 @@ extern PGDLLIMPORT XLogPrefetcher *xlogprefetcher_pipelined; extern Size WalPipelineShmemSize(void); extern void WalPipelineShmemInit(void); +/* Start/stop the pipeline */ +extern void WalPipeline_Start(WalPipelineParams *params); +extern void WalPipeline_Stop(void); + /* Producer functions (called by background worker) */ extern void WalPipeline_ProducerMain(Datum main_arg); extern bool WalPipeline_SendRecord(XLogReaderState *record); extern bool WalPipeline_SendShutdown(void); extern bool WalPipeline_SendError(int errcode, const char *errmsg); +/* Consumer functions (called by startup process) */ +extern DecodedXLogRecord *WalPipeline_ReceiveRecord(XLogReaderState *startup_reader, bool first_iteration); +extern bool WalPipeline_CheckProducerAlive(void); + +/* Status and monitoring */ +extern bool WalPipeline_IsActive(void); +extern void WalPipeline_WaitForConsumerCatchup(void); +extern void WalPipeline_GetStats(uint64 *records_sent, uint64 *records_received, + XLogRecPtr *producer_lsn, XLogRecPtr *consumer_lsn); +extern bool AmWalPipeline(void); + extern void ProcessPipelineBgwInterrupts(void); diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index e675ab8353d..b61aad08627 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -13,6 +13,7 @@ #include "access/xlogprefetcher.h" #include "access/xlogreader.h" +#include "access/xlogutils.h" #include "catalog/pg_control.h" #include "lib/stringinfo.h" #include "storage/condition_variable.h" @@ -106,6 +107,13 @@ typedef struct XLogRecoveryCtlData */ Latch recoveryWakeupLatch; + /* + * In case pipeline enabled we will need two latches. One that can be used + * by the pipeline for WAL waiting and other that can be used by the + * startup process for the apply delay + */ + Latch recoveryApplyDelayLatch; + /* * Last record successfully replayed. */ @@ -133,6 +141,46 @@ typedef struct XLogRecoveryCtlData ConditionVariable recoveryNotPausedCV; slock_t info_lck; /* locks shared variables shown above */ + + /* ------------------------------------------------------------------ + * Variables use for IPC between pipeline and the startup proc. + * These are also the static variables in xlogrecovery.c but there values + * keep on changing. So we added then in the shared memory so that both + * the pipeline and the startup proc stay synced on any of this state + * change + * ------------------------------------------------------------------ + */ + + /* + * Pipeline could be waiting for the startup process to catchup with the + * decoder. This could happend when no wait wal is available from the + * current resource and now pipline have change the wal srouce + * i.e enabling standby if requested. + */ + bool pipeline_waiting; + bool InArchiveRecovery; + bool pendingWalRcvRestart; + bool stanbyEnabled; + + /* The target TLI for which expectedTLEs should be recomputed */ + TimeLineID expectedTLEsUpdateTLI; + + /* + * Normaly we wakeup walrcvr after specific records have been applied, as + * reads are sequential so we wkaeup after specific read. But in case of pipeline + * reads (decoded records) could be way ahead of the consumer. We cannot wakeup + * wal rcvr based on read, so we tell consumer to wakup after apllied records + * upto flushedUptoRecPtr + */ + XLogRecPtr flushedUptoRecPtr; + XLogRecPtr abortedRecPtr; + XLogRecPtr missingContrecPtr; + + XLogSource currentSource; + XLogSource XLogReceiptSource; + + HotStandbyState standbyState; + TimestampTz XLogReceiptTime; } XLogRecoveryCtlData; extern PGDLLIMPORT XLogRecoveryCtlData *XLogRecoveryCtl; @@ -242,6 +290,8 @@ extern int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, i extern bool PromoteIsTriggered(void); extern bool CheckPromoteSignal(void); extern void WakeupRecovery(void); +extern void DisownRecoveryWakeupLatch(void); +extern void SetSharedHotStandbyState(void); extern void StartupRequestWalReceiverRestart(void); extern void XLogRequestWalReceiverReply(void); -- 2.34.1