From 6c8be932d24a54d1208b2356960f6352d9808b7d Mon Sep 17 00:00:00 2001
From: Melanie Plageman
Date: Wed, 15 Oct 2025 13:15:43 -0400
Subject: [PATCH v6 3/7] Eagerly flush bulkwrite strategy ring

Operations using BAS_BULKWRITE (COPY FROM and createdb) will inevitably
need to flush buffers in the strategy ring in order to reuse them. By
eagerly flushing the buffers in a larger run, we encourage larger
writes at the kernel level and less interleaving of WAL flushes and
data file writes.

The effect is mainly noticeable with multiple parallel COPY FROMs. In
this case, client backends achieve higher write throughput and end up
spending less time waiting on acquiring the lock to flush WAL. Larger
flush operations also mean less time waiting for flush operations at
the kernel level.

The heuristic for eager eviction is to only flush buffers in the
strategy ring which do not require a WAL flush.

This patch is also a step toward AIO writes.

Reviewed-by: Chao Li
Reviewed-by: Nazir Bilal Yavuz
Earlier version Reviewed-by: Kirill Reshke
Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com
Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com
---
 src/backend/storage/buffer/bufmgr.c   | 238 +++++++++++++++++++++++++-
 src/backend/storage/buffer/freelist.c |  47 +++++
 src/include/storage/buf_internals.h   |   4 +
 3 files changed, 281 insertions(+), 8 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 474afdcb4fe..41690fd9165 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -531,14 +531,25 @@ static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_c
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
 								IOObject io_object, IOContext io_context);
+
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
-static bool PrepareFlushBuffer(BufferDesc *bufdesc, XLogRecPtr *lsn);
+static BufferDesc *NextStratBufToFlush(BufferAccessStrategy strategy,
+									   Buffer sweep_end,
+									   XLogRecPtr *lsn, int *sweep_cursor);
+
+static bool BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn);
+static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require,
+												   RelFileLocator *rlocator, bool skip_pinned,
+												   XLogRecPtr *max_lsn);
+static bool PrepareFlushBuffer(BufferDesc *bufdesc,
+							   XLogRecPtr *lsn);
 static void DoFlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						  IOObject io_object, IOContext io_context,
 						  XLogRecPtr buffer_lsn);
-static void CleanVictimBuffer(BufferDesc *bufdesc, bool from_ring,
-							  IOContext io_context);
+static void CleanVictimBuffer(BufferAccessStrategy strategy,
+							  BufferDesc *bufdesc,
+							  bool from_ring, IOContext io_context);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
 									   ForkNumber forkNum,
 									   BlockNumber nForkBlock,
@@ -2395,7 +2406,7 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context)
 	}
 
 	/* Content lock is released inside CleanVictimBuffer */
-	CleanVictimBuffer(buf_hdr, from_ring, io_context);
+	CleanVictimBuffer(strategy, buf_hdr, from_ring, io_context);
 }
 
 
@@ -4279,6 +4290,61 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	DoFlushBuffer(buf, reln, io_object, io_context, lsn);
 }
 
+/*
+ * Returns true if the buffer needs WAL flushed before it can be written out.
+ * Caller must not already hold the buffer header spinlock.
+ */
+static bool
+BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn)
+{
+	uint32		buf_state = LockBufHdr(bufdesc);
+
+	*lsn = BufferGetLSN(bufdesc);
+
+	UnlockBufHdr(bufdesc, buf_state);
+
+	/*
+	 * See buffer flushing code for more details on why we condition this on
+	 * the relation being logged.
+	 */
+	return buf_state & BM_PERMANENT && XLogNeedsFlush(*lsn);
+}
+
+
+/*
+ * Returns the buffer descriptor of the buffer containing the next block we
+ * should eagerly flush or NULL when there are no further buffers to consider
+ * writing out.
+ */
+static BufferDesc *
+NextStratBufToFlush(BufferAccessStrategy strategy,
+					Buffer sweep_end,
+					XLogRecPtr *lsn, int *sweep_cursor)
+{
+	Buffer		bufnum;
+	BufferDesc *bufdesc;
+
+	while ((bufnum =
+			StrategySweepNextBuffer(strategy, sweep_cursor)) != sweep_end)
+	{
+		/*
+		 * For BAS_BULKWRITE, once you hit an InvalidBuffer, the remaining
+		 * buffers in the ring will be invalid.
+		 */
+		if (!BufferIsValid(bufnum))
+			break;
+
+		if ((bufdesc = PrepareOrRejectEagerFlushBuffer(bufnum,
+													   InvalidBlockNumber,
+													   NULL,
+													   true,
+													   lsn)) != NULL)
+			return bufdesc;
+	}
+
+	return NULL;
+}
+
 /*
  * Prepare and write out a dirty victim buffer.
  *
@@ -4289,22 +4355,178 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
  * bufdesc may be modified.
  */
 static void
-CleanVictimBuffer(BufferDesc *bufdesc,
+CleanVictimBuffer(BufferAccessStrategy strategy,
+				  BufferDesc *bufdesc,
 				  bool from_ring, IOContext io_context)
 {
 	XLogRecPtr	max_lsn = InvalidXLogRecPtr;
 	LWLock	   *content_lock;
+	bool		first_buffer = true;
 
 	/* Set up this victim buffer to be flushed */
 	if (!PrepareFlushBuffer(bufdesc, &max_lsn))
 		return;
 
-	DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+	if (from_ring && StrategySupportsEagerFlush(strategy))
+	{
+		Buffer		sweep_end = BufferDescriptorGetBuffer(bufdesc);
+		int			cursor = StrategySweepStart(strategy);
+
+		/* Clean victim buffer and find more to flush opportunistically */
+		do
+		{
+			DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+			content_lock = BufferDescriptorGetContentLock(bufdesc);
+			LWLockRelease(content_lock);
+			ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+										  &bufdesc->tag);
+			/* We leave the first buffer pinned for the caller */
+			if (!first_buffer)
+				UnpinBuffer(bufdesc);
+			first_buffer = false;
+		} while ((bufdesc = NextStratBufToFlush(strategy, sweep_end,
+												&max_lsn, &cursor)) != NULL);
+	}
+	else
+	{
+		DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+		content_lock = BufferDescriptorGetContentLock(bufdesc);
+		LWLockRelease(content_lock);
+		ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+									  &bufdesc->tag);
+	}
+}
+
+/*
+ * Prepare bufdesc for eager flushing.
+ *
+ * Given bufnum, return the buffer descriptor of the buffer to eagerly flush,
+ * pinned and locked, or NULL if this buffer does not contain a block that
+ * should be flushed.
+ *
+ * require is the BlockNumber required by the caller. Some callers may require
+ * a specific BlockNumber to be in bufnum because they are assembling a
+ * contiguous run of blocks.
+ *
+ * If the caller needs the block to be from a specific relation, rlocator will
+ * be provided.
+ */
+static BufferDesc *
+PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require,
+								RelFileLocator *rlocator, bool skip_pinned,
+								XLogRecPtr *max_lsn)
+{
+	BufferDesc *bufdesc;
+	uint32		old_buf_state;
+	uint32		buf_state;
+	XLogRecPtr	lsn;
+	BlockNumber blknum;
+	LWLock	   *content_lock;
+
+	if (!BufferIsValid(bufnum))
+		return NULL;
+
+	Assert(!BufferIsLocal(bufnum));
+
+	bufdesc = GetBufferDescriptor(bufnum - 1);
+
+	/* Block may need to be in a specific relation */
+	if (rlocator &&
+		!RelFileLocatorEquals(BufTagGetRelFileLocator(&bufdesc->tag),
+							  *rlocator))
+		return NULL;
+
+	/*
+	 * Ensure that there's a free refcount entry and resource owner slot for
+	 * the pin before pinning the buffer. While this may leak a refcount and
+	 * slot if we return without a buffer, we should use that slot the next
+	 * time we try to reserve a spot.
+	 */
+	ResourceOwnerEnlarge(CurrentResourceOwner);
+	ReservePrivateRefCountEntry();
+
+	/*
+	 * Check whether the buffer can be used and pin it if so. Do this using a
+	 * CAS loop, to avoid having to lock the buffer header. We have to lock
+	 * the buffer header later if we succeed in pinning the buffer here, but
+	 * avoiding locking the buffer header if the buffer is in use is worth it.
+	 */
+	old_buf_state = pg_atomic_read_u32(&bufdesc->state);
+
+	for (;;)
+	{
+		buf_state = old_buf_state;
+
+		if (!(buf_state & BM_DIRTY) || !(buf_state & BM_VALID))
+			return NULL;
+
+		/* We don't eagerly flush buffers used by others */
+		if (skip_pinned &&
+			(BUF_STATE_GET_REFCOUNT(buf_state) > 0 ||
+			 BUF_STATE_GET_USAGECOUNT(buf_state) > 1))
+			return NULL;
+
+		if (unlikely(buf_state & BM_LOCKED))
+		{
+			old_buf_state = WaitBufHdrUnlocked(bufdesc);
+			continue;
+		}
+
+		/* pin the buffer if the CAS succeeds */
+		buf_state += BUF_REFCOUNT_ONE;
+
+		if (pg_atomic_compare_exchange_u32(&bufdesc->state, &old_buf_state,
+										   buf_state))
+		{
+			TrackNewBufferPin(BufferDescriptorGetBuffer(bufdesc));
+			break;
+		}
+	}
+
+	CheckBufferIsPinnedOnce(bufnum);
+
+	blknum = BufferGetBlockNumber(bufnum);
+	Assert(BlockNumberIsValid(blknum));
+
+	/* We only include contiguous blocks in the run */
+	if (BlockNumberIsValid(require) && blknum != require)
+		goto except_unpin_buffer;
+
+	/* Don't eagerly flush buffers requiring WAL flush */
+	if (BufferNeedsWALFlush(bufdesc, &lsn))
+		goto except_unpin_buffer;
 
 	content_lock = BufferDescriptorGetContentLock(bufdesc);
+
+	if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+		goto except_unpin_buffer;
+
+	/*
+	 * Now that we have the content lock, we need to recheck if we need to
+	 * flush WAL.
+	 */
+	if (BufferNeedsWALFlush(bufdesc, &lsn))
+		goto except_unpin_buffer;
+
+	/* Try to start an I/O operation */
+	if (!StartBufferIO(bufdesc, false, true))
+		goto except_unlock_content;
+
+	if (lsn > *max_lsn)
+		*max_lsn = lsn;
+
+	buf_state = LockBufHdr(bufdesc);
+	buf_state &= ~BM_JUST_DIRTIED;
+	UnlockBufHdr(bufdesc, buf_state);
+
+	return bufdesc;
+
+except_unlock_content:
 	LWLockRelease(content_lock);
-	ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
-								  &bufdesc->tag);
+
+except_unpin_buffer:
+	UnpinBuffer(bufdesc);
+	return NULL;
 }
 
 /*
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index b76be264eb5..71fed9d6ebd 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -156,6 +156,31 @@ ClockSweepTick(void)
 	return victim;
 }
 
+/*
+ * Some BufferAccessStrategies support eager flushing -- which is flushing
+ * buffers in the ring before they are needed. This can lead to better I/O
+ * patterns than lazily flushing buffers immediately before reusing them.
+ */
+bool
+StrategySupportsEagerFlush(BufferAccessStrategy strategy)
+{
+	Assert(strategy);
+
+	switch (strategy->btype)
+	{
+		case BAS_BULKWRITE:
+			return true;
+		case BAS_VACUUM:
+		case BAS_NORMAL:
+		case BAS_BULKREAD:
+			return false;
+		default:
+			elog(ERROR, "unrecognized buffer access strategy: %d",
+				 (int) strategy->btype);
+			return false;
+	}
+}
+
 /*
  * StrategyGetBuffer
  *
@@ -307,6 +332,28 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 	}
 }
 
+/*
+ * Return the next buffer in the ring
+ */
+Buffer
+StrategySweepNextBuffer(BufferAccessStrategy strategy, int *sweep_cursor)
+{
+	if (++(*sweep_cursor) >= strategy->nbuffers)
+		*sweep_cursor = 0;
+
+	return strategy->buffers[*sweep_cursor];
+}
+
+/*
+ * Return the starting position of a sweep of the strategy ring
+ */
+int
+StrategySweepStart(BufferAccessStrategy strategy)
+{
+	return strategy->current;
+}
+
+
 /*
  * StrategySyncStart -- tell BgBufferSync where to start syncing
  *
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 7e258383048..b48dece3e63 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -442,6 +442,10 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag
 
 
 /* freelist.c */
+extern bool StrategySupportsEagerFlush(BufferAccessStrategy strategy);
+extern Buffer StrategySweepNextBuffer(BufferAccessStrategy strategy,
+									  int *sweep_cursor);
+extern int StrategySweepStart(BufferAccessStrategy strategy);
 extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
 extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
 									 uint32 *buf_state, bool *from_ring);
-- 
2.43.0