From 9302f6c197275f8d0dd38f491722a293e82105f1 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Wed, 15 Oct 2025 15:23:16 -0400 Subject: [PATCH v11 6/7] Implement checkpointer data write combining When the checkpointer writes out dirty buffers, writing multiple contiguous blocks as a single IO is a substantial performance improvement. The checkpointer is usually bottlenecked on IO, so issuing larger IOs leads to increased write throughput and faster checkpoints. Author: Melanie Plageman Reviewed-by: Chao Li Reviewed-by: Soumya Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com --- src/backend/storage/buffer/bufmgr.c | 224 ++++++++++++++++++++++++---- src/backend/utils/probes.d | 2 +- 2 files changed, 198 insertions(+), 28 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 68f6d4f2f45..8cc2fc06646 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -513,6 +513,7 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy, static void PinBuffer_Locked(BufferDesc *buf); static void UnpinBuffer(BufferDesc *buf); static void UnpinBufferNoOwner(BufferDesc *buf); +static uint32 CheckpointerMaxBatchSize(void); static void BufferSync(int flags); static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context); @@ -3346,7 +3347,6 @@ TrackNewBufferPin(Buffer buf) static void BufferSync(int flags) { - uint32 buf_state; int buf_id; int num_to_scan; int num_spaces; @@ -3358,6 +3358,8 @@ BufferSync(int flags) int i; uint32 mask = BM_DIRTY; WritebackContext wb_context; + uint32 max_batch_size; + BufWriteBatch batch; /* * Unless this is a shutdown checkpoint or we have been explicitly told, @@ -3389,6 +3391,7 @@ BufferSync(int flags) { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); uint32 set_bits = 0; + uint32 buf_state; /* * Header spinlock is enough to examine BM_DIRTY, see comment in @@ -3531,48 +3534,199 @@ 
BufferSync(int flags) */ num_processed = 0; num_written = 0; + max_batch_size = CheckpointerMaxBatchSize(); while (!binaryheap_empty(ts_heap)) { + BlockNumber limit = max_batch_size; BufferDesc *bufHdr = NULL; CkptTsStatus *ts_stat = (CkptTsStatus *) DatumGetPointer(binaryheap_first(ts_heap)); + int ts_end = ts_stat->index - ts_stat->num_scanned + ts_stat->num_to_scan; + int processed = 0; - buf_id = CkptBufferIds[ts_stat->index].buf_id; - Assert(buf_id != -1); + batch.start = InvalidBlockNumber; + batch.max_lsn = InvalidXLogRecPtr; + batch.n = 0; - bufHdr = GetBufferDescriptor(buf_id); + while (batch.n < limit) + { + uint32 buf_state; + XLogRecPtr lsn = InvalidXLogRecPtr; + LWLock *content_lock; + CkptSortItem item; - num_processed++; + if (ProcSignalBarrierPending) + ProcessProcSignalBarrier(); - /* - * We don't need to acquire the lock here, because we're only looking - * at a single bit. It's possible that someone else writes the buffer - * and clears the flag right after we check, but that doesn't matter - * since SyncOneBuffer will then do nothing. However, there is a - * further race condition: it's conceivable that between the time we - * examine the bit here and the time SyncOneBuffer acquires the lock, - * someone else not only wrote the buffer but replaced it with another - * page and dirtied it. In that improbable case, SyncOneBuffer will - * write the buffer though we didn't need to. It doesn't seem worth - * guarding against this, though. - */ - if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) - { - if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) + /* Check if we are done with this tablespace */ + if (ts_stat->index + processed >= ts_end) + break; + + item = CkptBufferIds[ts_stat->index + processed]; + + buf_id = item.buf_id; + Assert(buf_id != -1); + + bufHdr = GetBufferDescriptor(buf_id); + + /* + * If this is the first block of the batch, then check if we need + * to open a new relation. 
Open the relation now because we have + * to determine the maximum IO size based on how many blocks + * remain in the file. + */ + if (!BlockNumberIsValid(batch.start)) { - TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); - PendingCheckpointerStats.buffers_written++; - num_written++; + Assert(batch.max_lsn == InvalidXLogRecPtr && batch.n == 0); + batch.rlocator.spcOid = item.tsId; + batch.rlocator.dbOid = item.dbId; + batch.rlocator.relNumber = item.relNumber; + batch.forkno = item.forkNum; + batch.start = item.blockNum; + batch.reln = smgropen(batch.rlocator, INVALID_PROC_NUMBER); + limit = smgrmaxcombine(batch.reln, batch.forkno, batch.start); + limit = Min(max_batch_size, limit); + limit = Min(GetAdditionalPinLimit(), limit); + /* Guarantee progress */ + limit = Max(limit, 1); } + + /* + * Once we hit blocks from the next relation or fork of the + * relation, break out of the loop and issue the IO we've built up + * so far. It is important that we don't increment processed + * because we want to start the next IO with this item. + */ + if (item.dbId != batch.rlocator.dbOid) + break; + + if (item.relNumber != batch.rlocator.relNumber) + break; + + if (item.forkNum != batch.forkno) + break; + + Assert(item.tsId == batch.rlocator.spcOid); + + /* + * If the next block is not contiguous, we can't include it in the + * IO we will issue. Break out of the loop and issue what we have + * so far. Do not count this item as processed -- otherwise we + * will end up skipping it. + */ + if (item.blockNum != batch.start + batch.n) + break; + + /* + * We don't need to acquire the lock here, because we're only + * looking at a few bits. It's possible that someone else writes + * the buffer and clears the flag right after we check, but that + * doesn't matter since StartBufferIO will then return false. + * + * If the buffer doesn't need checkpointing, don't include it in + * the batch we are building. 
And if the buffer doesn't need + * flushing, we're done with the item, so count it as processed + * and break out of the loop to issue the IO so far. + */ + buf_state = pg_atomic_read_u32(&bufHdr->state); + if ((buf_state & (BM_CHECKPOINT_NEEDED | BM_VALID | BM_DIRTY)) != + (BM_CHECKPOINT_NEEDED | BM_VALID | BM_DIRTY)) + { + processed++; + break; + } + + ReservePrivateRefCountEntry(); + ResourceOwnerEnlarge(CurrentResourceOwner); + PinBuffer(bufHdr, NULL, false); + + /* + * There is a race condition here: it's conceivable that between + * the time we examine the buffer header for BM_CHECKPOINT_NEEDED + * above and the time we now acquire the lock, someone else + * not only wrote the buffer but replaced it with another page and + * dirtied it. In that improbable case, we will write the buffer + * though we didn't need to. It doesn't seem worth guarding + * against this, though. + */ + content_lock = BufferDescriptorGetContentLock(bufHdr); + + /* + * We are willing to wait for the content lock on the first IO in + * the batch. However, for subsequent IOs, waiting could lead to + * deadlock. We have to eventually flush all eligible buffers, + * though. So, if we fail to acquire the lock on a subsequent + * buffer, we break out and issue the IO we've built up so far. + * Then we come back and start a new IO with that buffer as the + * starting buffer. As such, we must not count the item as + * processed if we end up failing to acquire the content lock. + */ + if (batch.n == 0) + LWLockAcquire(content_lock, LW_SHARED); + else if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) + { + UnpinBuffer(bufHdr); + break; + } + + /* + * If the buffer doesn't need IO, count the item as processed, + * release the buffer, and break out of the loop to issue the IO + * we have built up so far. 
+ */ + if (!StartBufferIO(bufHdr, false, true)) + { + processed++; + LWLockRelease(content_lock); + UnpinBuffer(bufHdr); + break; + } + + /* + * Lock buffer header lock before examining LSN because we only + * have a shared lock on the buffer. + */ + buf_state = LockBufHdr(bufHdr); + lsn = BufferGetLSN(bufHdr); + UnlockBufHdrExt(bufHdr, buf_state, 0, BM_JUST_DIRTIED, 0); + + /* + * Keep track of the max LSN so that we can be sure to flush + * enough WAL before flushing data from the buffers. See comment + * in DoFlushBuffer() for more on why we don't consider the LSNs + * of unlogged relations. + */ + if (buf_state & BM_PERMANENT && lsn > batch.max_lsn) + batch.max_lsn = lsn; + + batch.bufdescs[batch.n++] = bufHdr; + processed++; } /* * Measure progress independent of actually having to flush the buffer - * - otherwise writing become unbalanced. + * - otherwise writing becomes unbalanced. */ - ts_stat->progress += ts_stat->progress_slice; - ts_stat->num_scanned++; - ts_stat->index++; + num_processed += processed; + ts_stat->progress += ts_stat->progress_slice * processed; + ts_stat->num_scanned += processed; + ts_stat->index += processed; + + /* + * If we built up an IO, issue it. There's a chance we didn't find any + * items referencing buffers that needed flushing this time, but we + * still want to check if we should update the heap if we examined and + * processed the items. + */ + if (batch.n > 0) + { + FlushBufferBatch(&batch, IOCONTEXT_NORMAL); + CompleteWriteBatchIO(&batch, IOCONTEXT_NORMAL, &wb_context); + + TRACE_POSTGRESQL_BUFFER_BATCH_SYNC_WRITTEN(batch.n); + PendingCheckpointerStats.buffers_written += batch.n; + num_written += batch.n; + } /* Have all the buffers from the tablespace been processed? */ if (ts_stat->num_scanned == ts_stat->num_to_scan) @@ -6421,6 +6575,22 @@ IsBufferCleanupOK(Buffer buffer) return false; } +/* + * The maximum number of blocks that can be written out in a single batch by + * the checkpointer. 
+ */ +static uint32 +CheckpointerMaxBatchSize(void) +{ + uint32 result; + uint32 pin_limit = GetPinLimit(); + + result = Min(pin_limit, io_combine_limit); + result = Min(result, MAX_IO_COMBINE_LIMIT); + result = Max(result, 1); + return result; +} + /* * Functions for buffer I/O handling diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d index 36dd4f8375b..d6970731ba9 100644 --- a/src/backend/utils/probes.d +++ b/src/backend/utils/probes.d @@ -68,7 +68,7 @@ provider postgresql { probe buffer__checkpoint__sync__start(); probe buffer__checkpoint__done(); probe buffer__sync__start(int, int); - probe buffer__sync__written(int); + probe buffer__batch__sync__written(BlockNumber); probe buffer__sync__done(int, int, int); probe deadlock__found(); -- 2.43.0