From e5c31a39c1be0e6d4d04be54ea83d9a640dd7a8d Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Wed, 7 Jan 2026 14:56:49 -0500 Subject: [PATCH v14 05/12] Eagerly flush bulkwrite strategy ring Operations using BAS_BULKWRITE (COPY FROM and createdb) will inevitably need to flush buffers in the strategy ring in order to reuse them. By eagerly flushing the buffers in a larger run, we encourage larger writes at the kernel level and less interleaving of WAL flushes and data file writes. The effect is mainly noticeable with multiple parallel COPY FROMs. In this case, client backends achieve higher write throughput and end up spending less time waiting on acquiring the lock to flush WAL. Larger flush operations also mean less time waiting for flush operations at the kernel level. The heuristic for eager eviction is to only flush buffers in the strategy ring which do not require a WAL flush. This patch also is a step toward AIO writes, as it lines up multiple buffers that can be issued asynchronously once the infrastructure exists. 
Author: Melanie Plageman Reviewed-by: Chao Li Reviewed-by: Nazir Bilal Yavuz Earlier version Reviewed-by: Kirill Reshke Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com --- src/backend/storage/buffer/bufmgr.c | 130 +++++++++++++++++++++++++- src/backend/storage/buffer/freelist.c | 48 ++++++++++ src/include/storage/buf_internals.h | 4 + 3 files changed, 181 insertions(+), 1 deletion(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 902b91ab6c2..9cb84328d89 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -653,6 +653,7 @@ static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); static void ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context, BufferTag *tag); +static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum); static void FindAndDropRelationBuffers(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber nForkBlock, @@ -2547,8 +2548,54 @@ again: ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, &buf_hdr->tag); - } + if (from_ring && StrategySupportsEagerFlush(strategy)) + { + BufferDesc *next_bufdesc; + Buffer next_buf; + Buffer sweep_end = buf; + int cursor = StrategyGetCurrentIndex(strategy); + + /* + * Loop around strategy ring one time eagerly flushing all of the + * eligible buffers. + */ + for (;;) + { + next_buf = StrategyNextBuffer(strategy, &cursor); + + /* Completed one sweep of the strategy ring */ + if (next_buf == sweep_end) + break; + + /* + * For strategies currently supporting eager flush + * (BAS_BULKWRITE, eventually BAS_VACUUM), once you hit an + * InvalidBuffer, the remaining buffers in the ring will be + * invalid. If BAS_BULKREAD is someday supported, this logic + * will have to change. 
+ */ + if (!BufferIsValid(next_buf)) + break; + + /* + * Check buffer eager flush eligibility. If the buffer is + * ineligible, we'll keep looking until we complete one full + * sweep around the ring. + */ + next_bufdesc = PrepareOrRejectEagerFlushBuffer(next_buf); + + if (next_bufdesc) + { + FlushBuffer(next_bufdesc, NULL, IOOBJECT_RELATION, io_context); + BufferLockUnlock(next_buf, next_bufdesc); + ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, + &next_bufdesc->tag); + UnpinBuffer(next_bufdesc); + } + } + } if (buf_state & BM_VALID) { @@ -4595,6 +4642,87 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, error_context_stack = errcallback.previous; } +/* + * Prepare bufdesc for eager flushing. + * + * Given bufnum, return the buffer descriptor of the buffer to eagerly flush, + * pinned and share-locked, or NULL if this buffer does not contain a block + * that should be flushed. BM_IO_IN_PROGRESS is not set here; that happens + * later, when the caller actually flushes the buffer. + */ +static BufferDesc * +PrepareOrRejectEagerFlushBuffer(Buffer bufnum) +{ + BufferDesc *bufdesc; + uint64 buf_state; + + if (!BufferIsValid(bufnum)) + goto reject_buffer; + + Assert(!BufferIsLocal(bufnum)); + + bufdesc = GetBufferDescriptor(bufnum - 1); + buf_state = pg_atomic_read_u64(&bufdesc->state); + + /* + * Quick racy check to see if the buffer is clean, in which case we don't + * need to flush it. We'll recheck if it is dirty again later before + * actually setting BM_IO_IN_PROGRESS. + */ + if (!(buf_state & BM_DIRTY)) + goto reject_buffer; + + /* + * Quick check to see if the buffer is pinned, in which case it is more + * likely to be dirtied again soon, and we don't want to eagerly flush it. + * We don't care if it has a non-zero usage count because we don't need to + * reuse it right away and a non-zero usage count doesn't necessarily mean + * it will be dirtied again soon. 
+ */ + if (BUF_STATE_GET_REFCOUNT(buf_state) > 0) + goto reject_buffer; + + /* + * Don't eagerly flush buffers requiring WAL flush. We must check this + * again later while holding the buffer content lock for correctness. + */ + if (BufferNeedsWALFlush(bufdesc, false)) + goto reject_buffer; + + /* + * Ensure that there's a free refcount entry and resource owner slot for + * the pin before pinning the buffer. While this may leak a refcount and + * slot if we return without a buffer, that slot will be reused. + */ + ResourceOwnerEnlarge(CurrentResourceOwner); + ReservePrivateRefCountEntry(); + + /* There is no need to flush the buffer if it is not BM_VALID */ + if (!PinBuffer(bufdesc, BUC_ZERO, true /* skip_if_not_valid */ )) + goto reject_buffer; + + CheckBufferIsPinnedOnce(bufnum); + + if (!BufferLockConditional(bufnum, bufdesc, BUFFER_LOCK_SHARE)) + goto reject_buffer_unpin; + + /* Now that we have the lock, recheck if it needs WAL flush */ + if (BufferNeedsWALFlush(bufdesc, false)) + goto reject_buffer_unlock; + + return bufdesc; + +reject_buffer_unlock: + BufferLockUnlock(bufnum, bufdesc); + +reject_buffer_unpin: + UnpinBuffer(bufdesc); + +reject_buffer: + return NULL; +} + /* * Convenience wrapper around FlushBuffer() that locks/unlocks the buffer * before/after calling FlushBuffer(). diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 37398865d19..9623f04b7e4 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -155,6 +155,31 @@ ClockSweepTick(void) return victim; } +/* + * Some BufferAccessStrategies support eager flushing -- which is flushing + * buffers in the ring before they are needed. This can lead to better I/O + * patterns than lazily flushing buffers immediately before reusing them. 
+ */ +bool +StrategySupportsEagerFlush(BufferAccessStrategy strategy) +{ + Assert(strategy); + + switch (strategy->btype) + { + case BAS_BULKWRITE: + return true; + case BAS_VACUUM: + case BAS_NORMAL: + case BAS_BULKREAD: + return false; + default: + elog(ERROR, "unrecognized buffer access strategy: %d", + (int) strategy->btype); + return false; + } +} + /* * StrategyGetBuffer * @@ -306,6 +331,29 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint64 *buf_state, bool *from_r } } +/* + * Returns the next buffer in the ring after the one at cursor and increments + * cursor. + */ +Buffer +StrategyNextBuffer(BufferAccessStrategy strategy, int *cursor) +{ + if (++(*cursor) >= strategy->nbuffers) + *cursor = 0; + + return strategy->buffers[*cursor]; +} + +/* + * Return the current slot in the strategy ring. + */ +int +StrategyGetCurrentIndex(BufferAccessStrategy strategy) +{ + return strategy->current; +} + + /* * StrategySyncStart -- tell BgBufferSync where to start syncing * diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 0899be8da48..088f2fbdde4 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -561,6 +561,10 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint64 set_flag /* freelist.c */ +extern bool StrategySupportsEagerFlush(BufferAccessStrategy strategy); +extern Buffer StrategyNextBuffer(BufferAccessStrategy strategy, + int *cursor); +extern int StrategyGetCurrentIndex(BufferAccessStrategy strategy); extern IOContext IOContextForStrategy(BufferAccessStrategy strategy); extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, uint64 *buf_state, bool *from_ring); -- 2.43.0