From 3164ce4ebea1f7ad05f252f335f449c382e6fd8f Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Mon, 25 Aug 2025 14:13:04 -0400 Subject: [PATCH v2 1/2] Eagerly flush bulkwrite strategy ring Operations using BAS_BULKWRITE (COPY FROM and createdb) will inevitably need to flush buffers in the strategy ring in order to reuse them. By eagerly flushing the buffers in a larger batch, we encourage larger writes at the kernel level and less interleaving of WAL flushes and data file writes. The effect is mainly noticeable with multiple parallel COPY FROMs. In this case, client backends achieve higher write throughput and end up spending less time waiting on acquiring the lock to flush WAL. Larger flush operations also mean less time waiting for flush operations at the kernel level. The heuristic for eager eviction is to flush only those buffers in the strategy ring whose flushing does not require first flushing WAL. This patch is also a stepping stone toward using AIO for COPY FROM. Reviewed-by: Kirill Reshke Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com --- src/backend/storage/buffer/bufmgr.c | 71 +++++++++++++++++++++++++++ src/backend/storage/buffer/freelist.c | 67 +++++++++++++++++++++++++ src/include/storage/buf_internals.h | 1 + src/include/storage/bufmgr.h | 2 + 4 files changed, 141 insertions(+) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index fd7e21d96d3..46fe206ddfd 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -2341,6 +2341,74 @@ InvalidateVictimBuffer(BufferDesc *buf_hdr) return true; } +/* + * Pin and lock a shared buffer and then flush it. Don't flush the buffer if + * doing so would mean we have to flush WAL. We only evict the buffer if doing + * so is "cheap", i.e. we're able to lock the buffer and we don't have to + * flush WAL. 
This is appropriate for occasions in which we don't need to + * guarantee that the buffer is flushed. + */ +void +QuickCleanBuffer(Buffer buffer, IOContext io_context) +{ + uint32 buf_state; + XLogRecPtr lsn; + LWLock *content_lock; + BufferDesc *bufdesc; + + Assert(BufferIsValid(buffer)); + Assert(!BufferIsLocal(buffer)); + + bufdesc = GetBufferDescriptor(buffer - 1); + + buf_state = LockBufHdr(bufdesc); + + /* + * No need to flush the buffer if it isn't dirty. We won't flush buffers + * in use by other backends. + */ + if (!(buf_state & BM_DIRTY) || + BUF_STATE_GET_REFCOUNT(buf_state) > 0 || + BUF_STATE_GET_USAGECOUNT(buf_state) > 1) + { + UnlockBufHdr(bufdesc, buf_state); + return; + } + + ReservePrivateRefCountEntry(); + ResourceOwnerEnlarge(CurrentResourceOwner); + + /* Releases buffer header lock before acquiring content lock */ + PinBuffer_Locked(bufdesc); + content_lock = BufferDescriptorGetContentLock(bufdesc); + if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) + { + UnpinBuffer(bufdesc); + return; + } + + CheckBufferIsPinnedOnce(buffer); + + /* Need buffer header lock to get the LSN */ + buf_state = LockBufHdr(bufdesc); + lsn = BufferGetLSN(bufdesc); + UnlockBufHdr(bufdesc, buf_state); + + if (XLogNeedsFlush(lsn)) + { + UnlockReleaseBuffer(buffer); + return; + } + + FlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context); + + LWLockRelease(content_lock); + UnpinBuffer(bufdesc); + + ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, + &bufdesc->tag); +} + static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context) { @@ -2446,6 +2514,9 @@ again: ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, &buf_hdr->tag); + + if (strategy && from_ring) + EagerFlushStrategyRing(strategy, io_context); } diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 01909be0272..a4fefb599c7 100644 --- a/src/backend/storage/buffer/freelist.c +++ 
b/src/backend/storage/buffer/freelist.c @@ -98,6 +98,10 @@ static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy, static void AddBufferToRing(BufferAccessStrategy strategy, BufferDesc *buf); +static bool StrategySupportsEagerFlush(BufferAccessStrategy strategy); + +static int StrategySweepNext(BufferAccessStrategy strategy, int current); + /* * ClockSweepTick - Helper routine for StrategyGetBuffer() * @@ -180,6 +184,31 @@ have_free_buffer(void) return false; } +/* + * Some BufferAccessStrategies support eager flushing -- which is flushing + * data in buffers in the ring before they are needed. This can lead to better + * I/O patterns than lazily flushing buffers directly before reusing them. + */ +bool +StrategySupportsEagerFlush(BufferAccessStrategy strategy) +{ + Assert(strategy); + + switch (strategy->btype) + { + case BAS_BULKWRITE: + return true; + case BAS_VACUUM: + case BAS_NORMAL: + case BAS_BULKREAD: + return false; + default: + elog(ERROR, "unrecognized buffer access strategy: %d", + (int) strategy->btype); + return false; + } +} + /* * StrategyGetBuffer * @@ -780,6 +809,44 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state) return NULL; } +static int +StrategySweepNext(BufferAccessStrategy strategy, int current) +{ + int next = current + 1; + + if (next >= strategy->nbuffers) + next = 0; + return next; +} + +/* + * Flush all the buffers we can in the strategy ring. This encourages write + * batching at the kernel level and leaves a ring full of clean buffers. We'll + * skip flushing buffers that would require us to flush WAL first. 
+ */ +void +EagerFlushStrategyRing(BufferAccessStrategy strategy, IOContext io_context) +{ + int sweep_start, + sweep_current; + + if (!StrategySupportsEagerFlush(strategy)) + return; + + sweep_start = StrategySweepNext(strategy, strategy->current); + sweep_current = sweep_start; + + do + { + Buffer bufnum = strategy->buffers[sweep_current]; + + if (bufnum != InvalidBuffer) + QuickCleanBuffer(bufnum, io_context); + sweep_current = StrategySweepNext(strategy, sweep_current); + } while (sweep_start != sweep_current); +} + + /* * AddBufferToRing -- add a buffer to the buffer ring * diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 52a71b138f7..974f494557a 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -433,6 +433,7 @@ extern void WritebackContextInit(WritebackContext *context, int *max_pending); extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context); extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context, BufferTag *tag); +extern void QuickCleanBuffer(Buffer buffer, IOContext io_context); /* solely to make it easier to write tests */ extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 41fdc1e7693..8214ef982aa 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -14,6 +14,7 @@ #ifndef BUFMGR_H #define BUFMGR_H +#include "pgstat.h" #include "port/pg_iovec.h" #include "storage/aio_types.h" #include "storage/block.h" @@ -331,6 +332,7 @@ extern BufferAccessStrategy GetAccessStrategyWithSize(BufferAccessStrategyType b extern int GetAccessStrategyBufferCount(BufferAccessStrategy strategy); extern int GetAccessStrategyPinLimit(BufferAccessStrategy strategy); +extern void EagerFlushStrategyRing(BufferAccessStrategy strategy, IOContext io_context); extern void 
FreeAccessStrategy(BufferAccessStrategy strategy); -- 2.43.0