From d954d754656b9bc05da4c2edc0a4bad8b3118091 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Tue, 2 Sep 2025 12:43:24 -0400 Subject: [PATCH v3 3/9] Eagerly flush bulkwrite strategy ring Operations using BAS_BULKWRITE (COPY FROM and createdb) will inevitably need to flush buffers in the strategy ring in order to reuse them. By eagerly flushing the buffers in a larger batch, we encourage larger writes at the kernel level and less interleaving of WAL flushes and data file writes. The effect is mainly noticeable with multiple parallel COPY FROMs. In this case, client backends achieve higher write throughput and end up spending less time waiting on acquiring the lock to flush WAL. Larger flush operations also mean less time waiting for flush operations at the kernel level as well. The heuristic for eager eviction is to only flush buffers in the strategy ring whose flushing does not require flushing WAL. This patch also is a stepping stone toward AIO writes. Earlier version Reviewed-by: Kirill Reshke Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com --- src/backend/storage/buffer/bufmgr.c | 166 +++++++++++++++++++++++++- src/backend/storage/buffer/freelist.c | 63 ++++++++++ src/include/storage/buf_internals.h | 3 + 3 files changed, 229 insertions(+), 3 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 84ff5e0f1bf..90f36a04c19 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -534,6 +534,11 @@ static void DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object IOContext io_context, XLogRecPtr buffer_lsn); static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); +static BufferDesc *next_strat_buf_to_flush(BufferAccessStrategy strategy, XLogRecPtr *lsn); +static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require, + 
RelFileLocator *rlocator, + bool skip_pinned, + XLogRecPtr *max_lsn); static void CleanVictimBuffer(BufferAccessStrategy strategy, BufferDesc *bufdesc, uint32 *buf_state, bool from_ring); static void FindAndDropRelationBuffers(RelFileLocator rlocator, @@ -4253,6 +4258,31 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, DoFlushBuffer(buf, reln, io_object, io_context, lsn); } +/* + * Returns the buffer descriptor of the buffer containing the next block we + * should eagerly flush or NULL when there are no further buffers to + * consider writing out. + */ +static BufferDesc * +next_strat_buf_to_flush(BufferAccessStrategy strategy, + XLogRecPtr *lsn) +{ + Buffer bufnum; + BufferDesc *bufdesc; + + while ((bufnum = StrategySweepNextBuffer(strategy)) != InvalidBuffer) + { + if ((bufdesc = PrepareOrRejectEagerFlushBuffer(bufnum, + InvalidBlockNumber, + NULL, + true, + lsn)) != NULL) + return bufdesc; + } + + return NULL; +} + /* + * Prepare to write and write a dirty victim buffer. 
*/ @@ -4263,6 +4293,7 @@ CleanVictimBuffer(BufferAccessStrategy strategy, XLogRecPtr max_lsn = InvalidXLogRecPtr; LWLock *content_lock; + bool first_buffer = true; IOContext io_context = IOContextForStrategy(strategy); Assert(*buf_state & BM_DIRTY); @@ -4271,11 +4302,140 @@ CleanVictimBuffer(BufferAccessStrategy strategy, if (!PrepareFlushBuffer(bufdesc, buf_state, &max_lsn)) return; - DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn); + if (from_ring && strategy_supports_eager_flush(strategy)) + { + /* Clean victim buffer and find more to flush opportunistically */ + StartStrategySweep(strategy); + do + { + DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn); + content_lock = BufferDescriptorGetContentLock(bufdesc); + LWLockRelease(content_lock); + ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, + &bufdesc->tag); + /* We leave the first buffer pinned for the caller */ + if (!first_buffer) + UnpinBuffer(bufdesc); + first_buffer = false; + } while ((bufdesc = next_strat_buf_to_flush(strategy, &max_lsn)) != NULL); + } + else + { + DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn); + content_lock = BufferDescriptorGetContentLock(bufdesc); + LWLockRelease(content_lock); + ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, + &bufdesc->tag); + } +} + +/* + * Prepare bufdesc for eager flushing. + * + * Given bufnum, returns the buffer descriptor of the buffer we will + * opportunistically flush, or NULL if this buffer does not contain a block + * that should be flushed. + * + * require is the BlockNumber required by the caller. Some callers may require + * a specific BlockNumber to be in bufnum because they are assembling a + * contiguous run of blocks. + * + * If the caller needs the block to be from a specific relation, rlocator will + * be provided. 
+ */ +BufferDesc * +PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require, + RelFileLocator *rlocator, bool skip_pinned, + XLogRecPtr *max_lsn) +{ + BufferDesc *bufdesc; + uint32 buf_state; + XLogRecPtr lsn; + BlockNumber blknum; + LWLock *content_lock; + + if (!BufferIsValid(bufnum)) + return NULL; + + Assert(!BufferIsLocal(bufnum)); + + bufdesc = GetBufferDescriptor(bufnum - 1); + + /* Block may need to be in a specific relation */ + if (rlocator && + !RelFileLocatorEquals(BufTagGetRelFileLocator(&bufdesc->tag), + *rlocator)) + return NULL; + + /* Must do this before taking the buffer header spinlock. */ + ResourceOwnerEnlarge(CurrentResourceOwner); + ReservePrivateRefCountEntry(); + + buf_state = LockBufHdr(bufdesc); + + if (!(buf_state & BM_DIRTY) || !(buf_state & BM_VALID)) + goto except_unlock_header; + + /* We don't include used buffers in batches */ + if (skip_pinned && + (BUF_STATE_GET_REFCOUNT(buf_state) > 0 || + BUF_STATE_GET_USAGECOUNT(buf_state) > 1)) + goto except_unlock_header; + + /* Get page LSN while holding header lock */ + lsn = BufferGetLSN(bufdesc); + + PinBuffer_Locked(bufdesc); + CheckBufferIsPinnedOnce(bufnum); + + blknum = BufferGetBlockNumber(bufnum); + Assert(BlockNumberIsValid(blknum)); + + /* If we'll have to flush WAL to flush the block, we're done */ + if (buf_state & BM_PERMANENT && XLogNeedsFlush(lsn)) + goto except_unpin_buffer; + + /* We only include contiguous blocks in the run */ + if (BlockNumberIsValid(require) && blknum != require) + goto except_unpin_buffer; + content_lock = BufferDescriptorGetContentLock(bufdesc); + if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) + goto except_unpin_buffer; + + /* + * Now that we have the content lock, we need to recheck if we need to + * flush WAL. 
+ */ + buf_state = LockBufHdr(bufdesc); + lsn = BufferGetLSN(bufdesc); + UnlockBufHdr(bufdesc, buf_state); + + if (buf_state & BM_PERMANENT && XLogNeedsFlush(lsn)) + goto except_unlock_content; + + /* Try to start an I/O operation. */ + if (!StartBufferIO(bufdesc, false, true)) + goto except_unlock_content; + + if (lsn > *max_lsn) + *max_lsn = lsn; + buf_state = LockBufHdr(bufdesc); + buf_state &= ~BM_JUST_DIRTIED; + UnlockBufHdr(bufdesc, buf_state); + + return bufdesc; + +except_unlock_content: LWLockRelease(content_lock); - ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, - &bufdesc->tag); + +except_unpin_buffer: + UnpinBuffer(bufdesc); + return NULL; + +except_unlock_header: + UnlockBufHdr(bufdesc, buf_state); + return NULL; } /* diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index a90a7ed4e16..e26a546bc99 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -75,6 +75,15 @@ typedef struct BufferAccessStrategyData */ int current; + /* + * If the strategy supports eager flushing, we may initiate a sweep of the + * strategy ring, flushing all the dirty buffers we can cheaply flush. + * sweep_start and sweep_current keep track of a given sweep so we don't + * loop around the ring infinitely. + */ + int sweep_start; + int sweep_current; + /* * Array of buffer numbers. InvalidBuffer (that is, zero) indicates we * have not yet selected a buffer for this ring slot. For allocation @@ -156,6 +165,31 @@ ClockSweepTick(void) return victim; } +/* + * Some BufferAccessStrategies support eager flushing -- which is flushing + * buffers in the ring before they are needed. This can lead to better I/O + * patterns than lazily flushing buffers directly before reusing them. 
+ */ +bool +strategy_supports_eager_flush(BufferAccessStrategy strategy) +{ + Assert(strategy); + + switch (strategy->btype) + { + case BAS_BULKWRITE: + return true; + case BAS_VACUUM: + case BAS_NORMAL: + case BAS_BULKREAD: + return false; + default: + elog(ERROR, "unrecognized buffer access strategy: %d", + (int) strategy->btype); + return false; + } +} + /* * StrategyGetBuffer * @@ -270,6 +304,35 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r } } +/* + * Return the next buffer in the ring or InvalidBuffer if the current sweep is + * over. + */ +Buffer +StrategySweepNextBuffer(BufferAccessStrategy strategy) +{ + strategy->sweep_current++; + if (strategy->sweep_current >= strategy->nbuffers) + strategy->sweep_current = 0; + + if (strategy->sweep_current == strategy->sweep_start) + return InvalidBuffer; + + return strategy->buffers[strategy->sweep_current]; +} + +/* + * Start a sweep of the strategy ring. + */ +void +StartStrategySweep(BufferAccessStrategy strategy) +{ + if (!strategy) + return; + strategy->sweep_start = strategy->sweep_current = strategy->current; +} + + /* * StrategySyncStart -- tell BgBufferSync where to start syncing * diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index b1b81f31419..7963d1189a6 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -437,6 +437,9 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag /* freelist.c */ +extern bool strategy_supports_eager_flush(BufferAccessStrategy strategy); +extern Buffer StrategySweepNextBuffer(BufferAccessStrategy strategy); +extern void StartStrategySweep(BufferAccessStrategy strategy); extern IOContext IOContextForStrategy(BufferAccessStrategy strategy); extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_ring); -- 2.43.0