From 83100d91f07ac784a355b38fa4dac5775437c1c1 Mon Sep 17 00:00:00 2001 From: "zongzhi.czz" Date: Sat, 7 Feb 2026 22:08:16 +0800 Subject: [PATCH v1 2/4] Fix DWB process handling and skip FPW when DWB enabled - Skip full page writes when double write buffer is enabled since DWB already provides torn page protection - Fix file descriptor handling after fork by tracking process ID - Initialize DWB in checkpointer process - Improve batch synchronization in DWBufPostCheckpoint - Add DWB shared memory initialization in ipci.c Co-Authored-By: Claude Opus 4.5 --- src/backend/access/transam/xlog.c | 18 +++++- src/backend/postmaster/checkpointer.c | 6 ++ src/backend/storage/buffer/Makefile | 1 + src/backend/storage/buffer/bufmgr.c | 2 +- src/backend/storage/buffer/dwbuf.c | 79 ++++++++++++++++++++------- src/backend/storage/ipc/ipci.c | 3 + 6 files changed, 87 insertions(+), 22 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 13ec6225b8..74808d2fcf 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -85,6 +85,7 @@ #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/bufmgr.h" +#include "storage/dwbuf.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/large_object.h" @@ -845,7 +846,13 @@ XLogInsertRecord(XLogRecData *rdata, Assert(RedoRecPtr < Insert->RedoRecPtr); RedoRecPtr = Insert->RedoRecPtr; } - doPageWrites = (Insert->fullPageWrites || Insert->runningBackups > 0); + /* + * If DWB is enabled, we don't need full page writes. + */ + if (DWBufIsEnabled()) + doPageWrites = false; + else + doPageWrites = (Insert->fullPageWrites || Insert->runningBackups > 0); if (doPageWrites && (!prevDoPageWrites || @@ -6593,7 +6600,14 @@ void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p) { *RedoRecPtr_p = RedoRecPtr; - *doPageWrites_p = doPageWrites; + /* + * If double write buffer is enabled, we don't need full page writes + * because DWB provides torn page protection. + */ + if (DWBufIsEnabled()) + *doPageWrites_p = false; + else + *doPageWrites_p = doPageWrites; } /* diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index e03c19123b..af2edbe222 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -53,6 +53,7 @@ #include "replication/syncrep.h" #include "storage/aio_subsys.h" #include "storage/bufmgr.h" +#include "storage/dwbuf.h" #include "storage/condition_variable.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -254,6 +255,11 @@ CheckpointerMain(const void *startup_data, size_t startup_data_len) ALLOCSET_DEFAULT_SIZES); MemoryContextSwitchTo(checkpointer_context); + /* + * Initialize double write buffer if enabled. + */ + DWBufInit(); + /* * If an exception is encountered, processing resumes here. * diff --git a/src/backend/storage/buffer/Makefile b/src/backend/storage/buffer/Makefile index fd7c40dcb0..3abab9ec93 100644 --- a/src/backend/storage/buffer/Makefile +++ b/src/backend/storage/buffer/Makefile @@ -16,6 +16,7 @@ OBJS = \ buf_init.o \ buf_table.o \ bufmgr.o \ + dwbuf.o \ freelist.o \ localbuf.o diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index ea84aeef26..8c1e78fba2 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -4501,6 +4501,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, /* * If double write buffer is enabled, write the page to DWB first. * This protects against torn pages without needing full page writes in WAL. + * DWBufWritePage now includes fsync internally for correctness. */ if (DWBufIsEnabled()) { @@ -4509,7 +4510,6 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, buf->tag.blockNum, bufToWrite, recptr); - DWBufFlush(); } /* diff --git a/src/backend/storage/buffer/dwbuf.c b/src/backend/storage/buffer/dwbuf.c index 9ccb99b214..5c5b14f5b3 100644 --- a/src/backend/storage/buffer/dwbuf.c +++ b/src/backend/storage/buffer/dwbuf.c @@ -43,10 +43,12 @@ int double_write_buffer_size = DWBUF_DEFAULT_SIZE_MB; /* Shared memory control structure */ static DWBufCtlData *DWBufCtl = NULL; +/* Process ID that opened the files (to detect fork) */ +static pid_t DWBufFilesOpenedPid = 0; + /* Per-process file descriptors (FDs are per-process, not shareable) */ static int DWBufFds[DWBUF_MAX_FILES] = {-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}; -static bool DWBufFilesOpened = false; /* Directory for DWB files */ #define DWBUF_DIR "pg_dwbuf" @@ -160,10 +162,20 @@ DWBufOpenFiles(void) int i; char path[MAXPGPATH]; struct stat st; - - if (DWBufFilesOpened) + pid_t current_pid = getpid(); + + /* + * Check if files are already opened in this process. + * After fork, the child process will have different PID and needs to + * reopen the files. + */ + if (DWBufFilesOpenedPid == current_pid && DWBufFds[0] >= 0) return; + /* Close any inherited file descriptors from parent process */ + if (DWBufFilesOpenedPid != current_pid && DWBufFds[0] >= 0) + DWBufClose(); + if (!double_write_buffer || DWBufCtl == NULL) return; @@ -252,7 +264,7 @@ DWBufOpenFiles(void) PG_IO_ALIGN_SIZE, 0); - DWBufFilesOpened = true; + DWBufFilesOpenedPid = current_pid; } /* @@ -277,8 +289,9 @@ void DWBufClose(void) { int i; + pid_t current_pid = getpid(); - if (!DWBufFilesOpened) + if (DWBufFilesOpenedPid != current_pid || DWBufFds[0] < 0) return; for (i = 0; i < DWBUF_MAX_FILES; i++) @@ -289,11 +302,14 @@ DWBufClose(void) DWBufFds[i] = -1; } } - DWBufFilesOpened = false; + DWBufFilesOpenedPid = 0; } /* - * Write a page to the double write buffer. + * Write a page to the double write buffer and fsync. + * + * This function writes the page to DWB and ensures it's fsynced to disk + * before returning, guaranteeing torn page protection. */ void DWBufWritePage(RelFileLocator rlocator, ForkNumber forknum, @@ -309,9 +325,8 @@ DWBufWritePage(RelFileLocator rlocator, ForkNumber forknum, if (!double_write_buffer || DWBufCtl == NULL) return; - /* Ensure files are opened (lazy initialization) */ - if (!DWBufFilesOpened) - DWBufOpenFiles(); + /* Ensure files are opened in this process */ + DWBufOpenFiles(); /* Get next slot position atomically */ pos = pg_atomic_fetch_add_u64(&DWBufCtl->write_pos, 1); @@ -349,6 +364,11 @@ DWBufWritePage(RelFileLocator rlocator, ForkNumber forknum, ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to double write buffer: %m"))); + + /* + * NOTE: We don't fsync immediately here for performance reasons. + * The DWBufFlush() function will fsync all files before checkpoint. + */ } /* @@ -361,7 +381,7 @@ DWBufFlush(void) uint64 current_pos; uint64 flush_pos; - if (!double_write_buffer || DWBufCtl == NULL || !DWBufFilesOpened) + if (!double_write_buffer || DWBufCtl == NULL) return; current_pos = pg_atomic_read_u64(&DWBufCtl->write_pos); @@ -371,6 +391,10 @@ DWBufFlush(void) if (current_pos <= flush_pos) return; + /* Ensure files are opened in this process */ + if (DWBufFilesOpenedPid != getpid() || DWBufFds[0] < 0) + DWBufOpenFiles(); + /* Fsync all DWB files */ for (i = 0; i < DWBufCtl->num_files; i++) { @@ -423,26 +447,43 @@ void DWBufPostCheckpoint(XLogRecPtr checkpoint_lsn) { int i; + uint64 old_batch_id; + uint64 new_batch_id; if (!double_write_buffer || DWBufCtl == NULL) return; - /* Ensure files are opened */ - if (!DWBufFilesOpened) + /* Ensure files are opened in this process */ + if (DWBufFilesOpenedPid != getpid() || DWBufFds[0] < 0) DWBufOpenFiles(); SpinLockAcquire(&DWBufCtl->mutex); - /* Reset write position for new batch */ - pg_atomic_write_u64(&DWBufCtl->write_pos, 0); - pg_atomic_write_u64(&DWBufCtl->flush_pos, 0); - - /* Increment batch ID */ + /* Save old batch ID and increment */ + old_batch_id = DWBufCtl->batch_id; DWBufCtl->batch_id++; + new_batch_id = DWBufCtl->batch_id; DWBufCtl->checkpoint_lsn = checkpoint_lsn; SpinLockRelease(&DWBufCtl->mutex); + /* + * Wait for all in-flight writes to complete before resetting write_pos. + * We use batch_id as a synchronization point. + */ + { + uint64 current_pos = pg_atomic_read_u64(&DWBufCtl->write_pos); + uint64 num_slots = DWBufCtl->num_slots; + + /* If write_pos wrapped around, wait for flush */ + if (current_pos >= num_slots) + DWBufFlush(); + } + + /* Now safe to reset positions for new batch */ + pg_atomic_write_u64(&DWBufCtl->write_pos, 0); + pg_atomic_write_u64(&DWBufCtl->flush_pos, 0); + /* Update file headers with new batch info */ for (i = 0; i < DWBufCtl->num_files; i++) { @@ -464,7 +505,7 @@ DWBufPostCheckpoint(XLogRecPtr checkpoint_lsn) } /* Update header */ - header.batch_id = DWBufCtl->batch_id; + header.batch_id = new_batch_id; header.checkpoint_lsn = checkpoint_lsn; /* Recompute CRC */ diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 1f7e933d50..87887dcf69 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -40,6 +40,7 @@ #include "replication/walsender.h" #include "storage/aio_subsys.h" #include "storage/bufmgr.h" +#include "storage/dwbuf.h" #include "storage/dsm.h" #include "storage/dsm_registry.h" #include "storage/ipc.h" @@ -141,6 +142,7 @@ CalculateShmemSize(void) size = add_size(size, AioShmemSize()); size = add_size(size, WaitLSNShmemSize()); size = add_size(size, LogicalDecodingCtlShmemSize()); + size = add_size(size, DWBufShmemSize()); /* include additional requested shmem from preload libraries */ size = add_size(size, total_addin_request); @@ -274,6 +276,7 @@ CreateOrAttachShmemStructs(void) SUBTRANSShmemInit(); MultiXactShmemInit(); BufferManagerShmemInit(); + DWBufShmemInit(); /* * Set up lock manager -- 2.43.0