From d484f180d0193c77cf6034751fa4e8c81833a605 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Tue, 2 Sep 2025 12:56:38 -0400 Subject: [PATCH v3 4/9] Write combining for BAS_BULKWRITE Implement write combining for users of the bulkwrite buffer access strategy (e.g. COPY FROM). When the buffer access strategy needs to clean a buffer for reuse, it already opportunistically flushes some other buffers. Now, combine any contiguous blocks from the same relation into larger writes and issue them with smgrwritev(). The performance benefit for COPY FROM is mostly noticeable for multiple concurrent COPY FROMs because a single COPY FROM is either CPU bound or bound by WAL writes. The infrastructure for flushing larger batches of IOs will be reused by checkpointer and other processes doing writes of dirty data. --- src/backend/storage/buffer/bufmgr.c | 198 ++++++++++++++++++++++++-- src/backend/storage/buffer/freelist.c | 26 ++++ src/backend/storage/page/bufpage.c | 20 +++ src/backend/utils/probes.d | 2 + src/include/storage/buf_internals.h | 32 +++++ src/include/storage/bufpage.h | 1 + src/tools/pgindent/typedefs.list | 1 + 7 files changed, 269 insertions(+), 11 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 90f36a04c19..ade83adca59 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -539,6 +539,8 @@ static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber re RelFileLocator *rlocator, bool skip_pinned, XLogRecPtr *max_lsn); +static void FindFlushAdjacents(BufferAccessStrategy strategy, BufferDesc *start, + uint32 max_batch_size, BufWriteBatch *batch); static void CleanVictimBuffer(BufferAccessStrategy strategy, BufferDesc *bufdesc, uint32 *buf_state, bool from_ring); static void FindAndDropRelationBuffers(RelFileLocator rlocator, @@ -4258,10 +4260,73 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, DoFlushBuffer(buf, reln, 
io_object, io_context, lsn); } + +/* + * Given a buffer descriptor, start, from a strategy ring, strategy, that + * supports eager flushing, find additional buffers from the ring that can be + * combined into a single write batch with this buffer. + * + * max_batch_size is the maximum number of blocks that can be combined into a + * single write in general. This function, based on the block number of start, + * will determine the maximum IO size for this particular write given how much + * of the file remains. max_batch_size is provided by the caller so it doesn't + * have to be recalculated for each write. + * + * batch is an output parameter that this function will fill with the needed + * information to write this IO. + * + * This function will pin and content lock all of the buffers that it + * assembles for the IO batch. The caller is responsible for issuing the IO. + */ +static void +FindFlushAdjacents(BufferAccessStrategy strategy, BufferDesc *start, + uint32 max_batch_size, BufWriteBatch *batch) +{ + BlockNumber limit; + uint32 buf_state; + + Assert(start); + batch->bufdescs[0] = start; + + buf_state = LockBufHdr(start); + batch->max_lsn = BufferGetLSN(start); + UnlockBufHdr(start, buf_state); + + batch->start = batch->bufdescs[0]->tag.blockNum; + batch->forkno = BufTagGetForkNum(&batch->bufdescs[0]->tag); + batch->rlocator = BufTagGetRelFileLocator(&batch->bufdescs[0]->tag); + batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER); + + Assert(BlockNumberIsValid(batch->start)); + + limit = smgrmaxcombine(batch->reln, batch->forkno, batch->start); + limit = Max(limit, 1); + limit = Min(max_batch_size, limit); + + /* Now assemble a run of blocks to write out. 
*/ + for (batch->n = 1; batch->n < limit; batch->n++) + { + Buffer bufnum; + + if ((bufnum = StrategySweepNextBuffer(strategy)) == InvalidBuffer) + break; + + /* Stop when we encounter a buffer that will break the run */ + if ((batch->bufdescs[batch->n] = + PrepareOrRejectEagerFlushBuffer(bufnum, + batch->start + batch->n, + &batch->rlocator, + true, + &batch->max_lsn)) == NULL) + break; + } +} + /* * Returns the buffer descriptor of the buffer containing the next block we * should eagerly flush or or NULL when there are no further buffers to - * consider writing out. + * consider writing out. This will be the start of a new batch of buffers to + * write out. */ static BufferDesc * next_strat_buf_to_flush(BufferAccessStrategy strategy, @@ -4293,7 +4358,6 @@ CleanVictimBuffer(BufferAccessStrategy strategy, XLogRecPtr max_lsn = InvalidXLogRecPtr; LWLock *content_lock; - bool first_buffer = true; IOContext io_context = IOContextForStrategy(strategy); Assert(*buf_state & BM_DIRTY); @@ -4304,19 +4368,22 @@ CleanVictimBuffer(BufferAccessStrategy strategy, if (from_ring && strategy_supports_eager_flush(strategy)) { + uint32 max_batch_size = max_write_batch_size_for_strategy(strategy); + + /* Pin our victim again so it stays ours even after batch released */ + ReservePrivateRefCountEntry(); + ResourceOwnerEnlarge(CurrentResourceOwner); + IncrBufferRefCount(BufferDescriptorGetBuffer(bufdesc)); + /* Clean victim buffer and find more to flush opportunistically */ StartStrategySweep(strategy); do { - DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn); - content_lock = BufferDescriptorGetContentLock(bufdesc); - LWLockRelease(content_lock); - ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, - &bufdesc->tag); - /* We leave the first buffer pinned for the caller */ - if (!first_buffer) - UnpinBuffer(bufdesc); - first_buffer = false; + BufWriteBatch batch; + + FindFlushAdjacents(strategy, bufdesc, max_batch_size, &batch); + 
FlushBufferBatch(&batch, io_context); + CompleteWriteBatchIO(&batch, &BackendWritebackContext, io_context); } while ((bufdesc = next_strat_buf_to_flush(strategy, &max_lsn)) != NULL); } else @@ -4438,6 +4505,73 @@ except_unlock_header: return NULL; } +/* + * Given a prepared batch of buffers write them out as a vector. + */ +void +FlushBufferBatch(BufWriteBatch *batch, + IOContext io_context) +{ + BlockNumber blknums[MAX_IO_COMBINE_LIMIT]; + Block blocks[MAX_IO_COMBINE_LIMIT]; + instr_time io_start; + ErrorContextCallback errcallback = + { + .callback = shared_buffer_write_error_callback, + .previous = error_context_stack, + }; + + error_context_stack = &errcallback; + + if (!XLogRecPtrIsInvalid(batch->max_lsn)) + XLogFlush(batch->max_lsn); + + if (batch->reln == NULL) + batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER); + +#ifdef USE_ASSERT_CHECKING + for (uint32 i = 0; i < batch->n; i++) + { + BufferDesc *bufdesc = batch->bufdescs[i]; + uint32 buf_state = LockBufHdr(bufdesc); + XLogRecPtr lsn = BufferGetLSN(bufdesc); + + UnlockBufHdr(bufdesc, buf_state); + Assert(!(buf_state & BM_PERMANENT) || !XLogNeedsFlush(lsn)); + } +#endif + + TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_START(batch->forkno, + batch->reln->smgr_rlocator.locator.spcOid, + batch->reln->smgr_rlocator.locator.dbOid, + batch->reln->smgr_rlocator.locator.relNumber, + batch->reln->smgr_rlocator.backend, + batch->n); + + /* + * XXX: All blocks should be copied and then checksummed but doing so + * takes a lot of extra memory and a future patch will eliminate this + * requirement. 
+ */ + for (BlockNumber i = 0; i < batch->n; i++) + { + blknums[i] = batch->start + i; + blocks[i] = BufHdrGetBlock(batch->bufdescs[i]); + } + + PageSetBatchChecksumInplace((Page *) blocks, blknums, batch->n); + + io_start = pgstat_prepare_io_time(track_io_timing); + + smgrwritev(batch->reln, batch->forkno, + batch->start, (const void **) blocks, batch->n, false); + + pgstat_count_io_op_time(IOOBJECT_RELATION, io_context, IOOP_WRITE, + io_start, batch->n, BLCKSZ); + + error_context_stack = errcallback.previous; +} + /* * Prepare the buffer with budesc for writing. buf_state and lsn are output * parameters. Returns true if the buffer acutally needs writing and false @@ -4583,6 +4717,48 @@ DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, error_context_stack = errcallback.previous; } +/* + * Given a previously initialized batch with buffers that have already been + * flushed, terminate the IO on each buffer and then unlock and unpin them. + * This assumes all the buffers were locked and pinned. wb_context will be + * modified. + */ +void +CompleteWriteBatchIO(BufWriteBatch *batch, + WritebackContext *wb_context, IOContext io_context) +{ + ErrorContextCallback errcallback = + { + .callback = shared_buffer_write_error_callback, + .previous = error_context_stack, + }; + + error_context_stack = &errcallback; + pgBufferUsage.shared_blks_written += batch->n; + + for (uint32 i = 0; i < batch->n; i++) + { + Buffer buffer = BufferDescriptorGetBuffer(batch->bufdescs[i]); + + errcallback.arg = batch->bufdescs[i]; + + /* Mark the buffer as clean and end the BM_IO_IN_PROGRESS state. 
*/ + TerminateBufferIO(batch->bufdescs[i], true, 0, true, false); + LWLockRelease(BufferDescriptorGetContentLock(batch->bufdescs[i])); + ScheduleBufferTagForWriteback(wb_context, io_context, + &batch->bufdescs[i]->tag); + ReleaseBuffer(buffer); /* unpin last: the tag is only stable while we hold a pin */ + } + + TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_DONE(batch->forkno, + batch->reln->smgr_rlocator.locator.spcOid, + batch->reln->smgr_rlocator.locator.dbOid, + batch->reln->smgr_rlocator.locator.relNumber, + batch->reln->smgr_rlocator.backend, + batch->n, batch->start); + error_context_stack = errcallback.previous; +} + /* * RelationGetNumberOfBlocksInFork * Determines the current number of pages in the specified relation fork. diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index e26a546bc99..1c94e95bf66 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -727,6 +727,32 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state) return NULL; } + +/* + * Determine the largest IO we can assemble from the given strategy ring given + * strategy-specific as well as global constraints on the number of pinned + * buffers and max IO size.
+ */ +uint32 +max_write_batch_size_for_strategy(BufferAccessStrategy strategy) +{ + uint32 max_possible_buffer_limit; + uint32 max_write_batch_size; + int strategy_pin_limit; + + max_write_batch_size = io_combine_limit; + + strategy_pin_limit = GetAccessStrategyPinLimit(strategy); + max_possible_buffer_limit = GetPinLimit(); + + max_write_batch_size = Min(strategy_pin_limit, max_write_batch_size); + max_write_batch_size = Min(max_possible_buffer_limit, max_write_batch_size); + max_write_batch_size = Max(1, max_write_batch_size); + max_write_batch_size = Min(max_write_batch_size, io_combine_limit); + Assert(max_write_batch_size <= MAX_IO_COMBINE_LIMIT); /* batch arrays hold MAX_IO_COMBINE_LIMIT entries */ + return max_write_batch_size; +} + /* * AddBufferToRing -- add a buffer to the buffer ring * diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c index dbb49ed9197..fc749dd5a50 100644 --- a/src/backend/storage/page/bufpage.c +++ b/src/backend/storage/page/bufpage.c @@ -1546,3 +1546,23 @@ PageSetChecksumInplace(Page page, BlockNumber blkno) ((PageHeader) page)->pd_checksum = pg_checksum_page(page, blkno); } + +/* + * A helper to set multiple blocks' checksums.
+ */ +void +PageSetBatchChecksumInplace(Page *pages, BlockNumber *blknos, uint32 length) +{ + /* If we don't need a checksum, just return */ + if (!DataChecksumsEnabled()) + return; + + for (uint32 i = 0; i < length; i++) + { + Page page = pages[i]; + + if (PageIsNew(page)) + continue; + ((PageHeader) page)->pd_checksum = pg_checksum_page(page, blknos[i]); + } +} diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d index e9e413477ba..36dd4f8375b 100644 --- a/src/backend/utils/probes.d +++ b/src/backend/utils/probes.d @@ -61,6 +61,8 @@ provider postgresql { probe buffer__flush__done(ForkNumber, BlockNumber, Oid, Oid, Oid); probe buffer__extend__start(ForkNumber, Oid, Oid, Oid, int, unsigned int); probe buffer__extend__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber); + probe buffer__batch__flush__start(ForkNumber, Oid, Oid, Oid, int, unsigned int); + probe buffer__batch__flush__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber); probe buffer__checkpoint__start(int); probe buffer__checkpoint__sync__start(); diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 7963d1189a6..d1f0ecb7ca4 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -416,6 +416,34 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer) ResourceOwnerForget(owner, Int32GetDatum(buffer), &buffer_io_resowner_desc); } +/* + * Used to write out multiple blocks at a time in a combined IO. bufdescs + * contains buffer descriptors for buffers containing adjacent blocks of the + * same fork of the same relation. + */ +typedef struct BufWriteBatch +{ + RelFileLocator rlocator; + ForkNumber forkno; + SMgrRelation reln; + + /* + * The BlockNumber of the first block in the run of contiguous blocks to + * be written out as a single IO. 
+ */ + BlockNumber start; + + /* + * While assembling the buffers, we keep track of the maximum LSN so that + * we can flush WAL through this LSN before flushing the buffers. + */ + XLogRecPtr max_lsn; + + /* The number of valid buffers in bufdescs */ + uint32 n; + BufferDesc *bufdescs[MAX_IO_COMBINE_LIMIT]; +} BufWriteBatch; + /* * Internal buffer management routines */ @@ -429,6 +457,7 @@ extern void WritebackContextInit(WritebackContext *context, int *max_pending); extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context); extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context, BufferTag *tag); +extern void FlushBufferBatch(BufWriteBatch *batch, IOContext io_context); /* solely to make it easier to write tests */ extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); @@ -438,8 +467,11 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag /* freelist.c */ extern bool strategy_supports_eager_flush(BufferAccessStrategy strategy); +extern uint32 max_write_batch_size_for_strategy(BufferAccessStrategy strategy); extern Buffer StrategySweepNextBuffer(BufferAccessStrategy strategy); extern void StartStrategySweep(BufferAccessStrategy strategy); +extern void CompleteWriteBatchIO(BufWriteBatch *batch, WritebackContext *wb_context, + IOContext io_context); extern IOContext IOContextForStrategy(BufferAccessStrategy strategy); extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_ring); diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h index aeb67c498c5..1020cb3ac78 100644 --- a/src/include/storage/bufpage.h +++ b/src/include/storage/bufpage.h @@ -507,5 +507,6 @@ extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum, Item newtup, Size newsize); extern char *PageSetChecksumCopy(Page page, BlockNumber blkno); extern void PageSetChecksumInplace(Page page, BlockNumber blkno); +extern 
void PageSetBatchChecksumInplace(Page *pages, BlockNumber *blknos, uint32 length); #endif /* BUFPAGE_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index a13e8162890..9492adeee58 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -349,6 +349,7 @@ BufferManagerRelation BufferStrategyControl BufferTag BufferUsage +BufWriteBatch BuildAccumulator BuiltinScript BulkInsertState -- 2.43.0