From 9b79a6489e9a7721fb947306a5e248aac52b3656 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 18 Mar 2025 14:40:06 -0400 Subject: [PATCH v2.12 23/28] bufmgr: use AIO in checkpointer, bgwriter This is far from ready - just included to be able to exercise AIO writes and get some preliminary numbers. In all likelihood this will instead be based on-top of work by Thomas Munro instead of the preceding commit. --- src/include/postmaster/bgwriter.h | 3 +- src/include/storage/buf_internals.h | 2 + src/include/storage/bufmgr.h | 3 +- src/include/storage/bufpage.h | 1 + src/backend/postmaster/bgwriter.c | 19 +- src/backend/postmaster/checkpointer.c | 11 +- src/backend/storage/buffer/bufmgr.c | 594 +++++++++++++++++++++++--- src/backend/storage/page/bufpage.c | 10 + src/tools/pgindent/typedefs.list | 1 + 9 files changed, 586 insertions(+), 58 deletions(-) diff --git a/src/include/postmaster/bgwriter.h b/src/include/postmaster/bgwriter.h index 800ecbfd13b..a8081d411b6 100644 --- a/src/include/postmaster/bgwriter.h +++ b/src/include/postmaster/bgwriter.h @@ -31,7 +31,8 @@ pg_noreturn extern void BackgroundWriterMain(const void *startup_data, size_t st pg_noreturn extern void CheckpointerMain(const void *startup_data, size_t startup_data_len); extern void RequestCheckpoint(int flags); -extern void CheckpointWriteDelay(int flags, double progress); +struct IOQueue; +extern void CheckpointWriteDelay(struct IOQueue *ioq, int flags, double progress); extern bool ForwardSyncRequest(const FileTag *ftag, SyncRequestType type); diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 6821a710e46..234e093cd04 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -21,6 +21,8 @@ #include "storage/buf.h" #include "storage/bufmgr.h" #include "storage/condition_variable.h" +#include "storage/io_queue.h" +#include "storage/latch.h" #include "storage/lwlock.h" #include "storage/procnumber.h" #include "storage/shmem.h" diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index ba9bf247ddb..af5035317b7 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -295,7 +295,8 @@ extern bool ConditionalLockBufferForCleanup(Buffer buffer); extern bool IsBufferCleanupOK(Buffer buffer); extern bool HoldingBufferPinThatDelaysRecovery(void); -extern bool BgBufferSync(struct WritebackContext *wb_context); +struct IOQueue; +extern bool BgBufferSync(struct IOQueue *ioq, struct WritebackContext *wb_context); extern uint32 GetPinLimit(void); extern uint32 GetLocalPinLimit(void); diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h index 6646b6f6371..9c045e81857 100644 --- a/src/include/storage/bufpage.h +++ b/src/include/storage/bufpage.h @@ -509,5 +509,6 @@ extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum, Item newtup, Size newsize); extern char *PageSetChecksumCopy(Page page, BlockNumber blkno); extern void PageSetChecksumInplace(Page page, BlockNumber blkno); +extern bool PageNeedsChecksumCopy(Page page); #endif /* BUFPAGE_H */ diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c index 72f5acceec7..6e8801a39e3 100644 --- a/src/backend/postmaster/bgwriter.c +++ b/src/backend/postmaster/bgwriter.c @@ -38,11 +38,13 @@ #include "postmaster/auxprocess.h" #include "postmaster/bgwriter.h" #include "postmaster/interrupt.h" +#include "storage/aio.h" #include "storage/aio_subsys.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/condition_variable.h" #include "storage/fd.h" +#include "storage/io_queue.h" #include "storage/lwlock.h" #include "storage/proc.h" #include "storage/procsignal.h" @@ -90,6 +92,7 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len) sigjmp_buf local_sigjmp_buf; MemoryContext bgwriter_context; bool prev_hibernate; + IOQueue *ioq; WritebackContext wb_context; Assert(startup_data_len == 0); @@ -131,6 +134,7 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len) ALLOCSET_DEFAULT_SIZES); MemoryContextSwitchTo(bgwriter_context); + ioq = io_queue_create(128, 0); WritebackContextInit(&wb_context, &bgwriter_flush_after); /* @@ -228,12 +232,22 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len) /* Clear any already-pending wakeups */ ResetLatch(MyLatch); + /* + * FIXME: this is theoretically racy, but I didn't want to copy + * ProcessMainLoopInterrupts() remaining body here. + */ + if (ShutdownRequestPending) + { + io_queue_wait_all(ioq); + io_queue_free(ioq); + } + ProcessMainLoopInterrupts(); /* * Do one cycle of dirty-buffer writing. */ - can_hibernate = BgBufferSync(&wb_context); + can_hibernate = BgBufferSync(ioq, &wb_context); /* Report pending statistics to the cumulative stats system */ pgstat_report_bgwriter(); @@ -250,6 +264,9 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len) smgrdestroyall(); } + /* finish IO before sleeping, to avoid blocking other backends */ + io_queue_wait_all(ioq); + /* * Log a new xl_running_xacts every now and then so replication can * get into a consistent state faster (think of suboverflowed diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index fda91ffd1ce..904fe167eb4 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -49,10 +49,12 @@ #include "postmaster/bgwriter.h" #include "postmaster/interrupt.h" #include "replication/syncrep.h" +#include "storage/aio.h" #include "storage/aio_subsys.h" #include "storage/bufmgr.h" #include "storage/condition_variable.h" #include "storage/fd.h" +#include "storage/io_queue.h" #include "storage/ipc.h" #include "storage/lwlock.h" #include "storage/pmsignal.h" @@ -766,7 +768,7 @@ ImmediateCheckpointRequested(void) * fraction between 0.0 meaning none, and 1.0 meaning all done. */ void -CheckpointWriteDelay(int flags, double progress) +CheckpointWriteDelay(IOQueue *ioq, int flags, double progress) { static int absorb_counter = WRITES_PER_ABSORB; @@ -800,6 +802,13 @@ CheckpointWriteDelay(int flags, double progress) /* Report interim statistics to the cumulative stats system */ pgstat_report_checkpointer(); + /* + * Ensure all pending IO is submitted to avoid unnecessary delays for + * other processes. + */ + io_queue_wait_all(ioq); + + /* * This sleep used to be connected to bgwriter_delay, typically 200ms. * That resulted in more frequent wakeups if not much work to do. diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index d037aa76489..8702614b5f3 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -52,6 +52,7 @@ #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/fd.h" +#include "storage/io_queue.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -76,6 +77,7 @@ /* Bits in SyncOneBuffer's return value */ #define BUF_WRITTEN 0x01 #define BUF_REUSABLE 0x02 +#define BUF_CANT_MERGE 0x04 #define RELS_BSEARCH_THRESHOLD 20 @@ -515,8 +517,6 @@ static void UnpinBuffer(BufferDesc *buf); static void UnpinBufferNoOwner(BufferDesc *buf); static void BufferSync(int flags); static uint32 WaitBufHdrUnlocked(BufferDesc *buf); -static int SyncOneBuffer(int buf_id, bool skip_recently_used, - WritebackContext *wb_context); static void WaitIO(BufferDesc *buf); static void AbortBufferIO(Buffer buffer); static void shared_buffer_write_error_callback(void *arg); @@ -532,6 +532,7 @@ static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_c static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context); static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); + static void FindAndDropRelationBuffers(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber nForkBlock, @@ -3299,6 +3300,57 @@ UnpinBufferNoOwner(BufferDesc *buf) } } +typedef struct BuffersToWrite +{ + int nbuffers; + BufferTag start_at_tag; + uint32 max_combine; + + XLogRecPtr max_lsn; + + PgAioHandle *ioh; + PgAioWaitRef iow; + + uint64 total_writes; + + Buffer buffers[IOV_MAX]; + PgAioBounceBuffer *bounce_buffers[IOV_MAX]; + const void *data_ptrs[IOV_MAX]; +} BuffersToWrite; + +static int PrepareToWriteBuffer(BuffersToWrite *to_write, Buffer buf, + bool skip_recently_used, + IOQueue *ioq, WritebackContext *wb_context); + +static void WriteBuffers(BuffersToWrite *to_write, + IOQueue *ioq, WritebackContext *wb_context); + +static void +BuffersToWriteInit(BuffersToWrite *to_write, + IOQueue *ioq, WritebackContext *wb_context) +{ + to_write->total_writes = 0; + to_write->nbuffers = 0; + to_write->ioh = NULL; + pgaio_wref_clear(&to_write->iow); + to_write->max_lsn = InvalidXLogRecPtr; + + pgaio_enter_batchmode(); +} + +static void +BuffersToWriteEnd(BuffersToWrite *to_write) +{ + if (to_write->ioh != NULL) + { + pgaio_io_release(to_write->ioh); + to_write->ioh = NULL; + } + + pgaio_exit_batchmode(); +} + + #define ST_SORT sort_checkpoint_bufferids #define ST_ELEMENT_TYPE CkptSortItem #define ST_COMPARE(a, b) ckpt_buforder_comparator(a, b) @@ -3330,7 +3382,10 @@ BufferSync(int flags) binaryheap *ts_heap; int i; int mask = BM_DIRTY; + IOQueue *ioq; WritebackContext wb_context; + BuffersToWrite to_write; + int max_combine; /* * Unless this is a shutdown checkpoint or we have been explicitly told, @@ -3392,7 +3447,9 @@ BufferSync(int flags) if (num_to_scan == 0) return; /* nothing to do */ + ioq = io_queue_create(512, 0); WritebackContextInit(&wb_context, &checkpoint_flush_after); + max_combine = Min(io_bounce_buffers, io_combine_limit); TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan); @@ -3500,48 +3557,91 @@ BufferSync(int flags) */ num_processed = 0; num_written = 0; + + BuffersToWriteInit(&to_write, ioq, &wb_context); + while (!binaryheap_empty(ts_heap)) { BufferDesc *bufHdr = NULL; CkptTsStatus *ts_stat = (CkptTsStatus *) DatumGetPointer(binaryheap_first(ts_heap)); + bool batch_continue = true; - buf_id = CkptBufferIds[ts_stat->index].buf_id; - Assert(buf_id != -1); - - bufHdr = GetBufferDescriptor(buf_id); - - num_processed++; + Assert(ts_stat->num_scanned <= ts_stat->num_to_scan); /* - * We don't need to acquire the lock here, because we're only looking - * at a single bit. It's possible that someone else writes the buffer - * and clears the flag right after we check, but that doesn't matter - * since SyncOneBuffer will then do nothing. However, there is a - * further race condition: it's conceivable that between the time we - * examine the bit here and the time SyncOneBuffer acquires the lock, - * someone else not only wrote the buffer but replaced it with another - * page and dirtied it. In that improbable case, SyncOneBuffer will - * write the buffer though we didn't need to. It doesn't seem worth - * guarding against this, though. + * Collect a batch of buffers to write out from the current + * tablespace. That causes some imbalance between the tablespaces, but + * that's more than outweighed by the efficiency gain due to batching. */ - if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) + while (batch_continue && + to_write.nbuffers < max_combine && + ts_stat->num_scanned < ts_stat->num_to_scan) { - if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) + buf_id = CkptBufferIds[ts_stat->index].buf_id; + Assert(buf_id != -1); + + bufHdr = GetBufferDescriptor(buf_id); + + num_processed++; + + /* + * We don't need to acquire the lock here, because we're only + * looking at a single bit. It's possible that someone else writes + * the buffer and clears the flag right after we check, but that + * doesn't matter since SyncOneBuffer will then do nothing. + * However, there is a further race condition: it's conceivable + * that between the time we examine the bit here and the time + * SyncOneBuffer acquires the lock, someone else not only wrote + * the buffer but replaced it with another page and dirtied it. In + * that improbable case, SyncOneBuffer will write the buffer + * though we didn't need to. It doesn't seem worth guarding + * against this, though. + */ + if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) { - TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); - PendingCheckpointerStats.buffers_written++; - num_written++; + int result = PrepareToWriteBuffer(&to_write, buf_id + 1, false, + ioq, &wb_context); + + if (result & BUF_CANT_MERGE) + { + Assert(to_write.nbuffers > 0); + WriteBuffers(&to_write, ioq, &wb_context); + + result = PrepareToWriteBuffer(&to_write, buf_id + 1, false, + ioq, &wb_context); + Assert(result != BUF_CANT_MERGE); + } + + if (result & BUF_WRITTEN) + { + TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); + PendingCheckpointerStats.buffers_written++; + num_written++; + } + else + { + batch_continue = false; + } } + else + { + if (to_write.nbuffers > 0) + WriteBuffers(&to_write, ioq, &wb_context); + } + + /* + * Measure progress independent of actually having to flush the + * buffer - otherwise writing become unbalanced. + */ + ts_stat->progress += ts_stat->progress_slice; + ts_stat->num_scanned++; + ts_stat->index++; } - /* - * Measure progress independent of actually having to flush the buffer - * - otherwise writing become unbalanced. - */ - ts_stat->progress += ts_stat->progress_slice; - ts_stat->num_scanned++; - ts_stat->index++; + if (to_write.nbuffers > 0) + WriteBuffers(&to_write, ioq, &wb_context); + /* Have all the buffers from the tablespace been processed? */ if (ts_stat->num_scanned == ts_stat->num_to_scan) @@ -3559,15 +3659,23 @@ BufferSync(int flags) * * (This will check for barrier events even if it doesn't sleep.) */ - CheckpointWriteDelay(flags, (double) num_processed / num_to_scan); + CheckpointWriteDelay(ioq, flags, (double) num_processed / num_to_scan); } + Assert(to_write.nbuffers == 0); + io_queue_wait_all(ioq); + /* * Issue all pending flushes. Only checkpointer calls BufferSync(), so * IOContext will always be IOCONTEXT_NORMAL. */ IssuePendingWritebacks(&wb_context, IOCONTEXT_NORMAL); + io_queue_wait_all(ioq); /* IssuePendingWritebacks might have added + * more */ + io_queue_free(ioq); + BuffersToWriteEnd(&to_write); + pfree(per_ts_stat); per_ts_stat = NULL; binaryheap_free(ts_heap); @@ -3593,7 +3701,7 @@ BufferSync(int flags) * bgwriter_lru_maxpages to 0.) */ bool -BgBufferSync(WritebackContext *wb_context) +BgBufferSync(IOQueue *ioq, WritebackContext *wb_context) { /* info obtained from freelist.c */ int strategy_buf_id; @@ -3636,6 +3744,9 @@ BgBufferSync(WritebackContext *wb_context) long new_strategy_delta; uint32 new_recent_alloc; + BuffersToWrite to_write; + int max_combine; + /* * Find out where the freelist clock sweep currently is, and how many * buffer allocations have happened since our last call. @@ -3656,6 +3767,8 @@ BgBufferSync(WritebackContext *wb_context) return true; } + max_combine = Min(io_bounce_buffers, io_combine_limit); + /* * Compute strategy_delta = how many buffers have been scanned by the * clock sweep since last time. If first time through, assume none. Then @@ -3812,11 +3925,25 @@ BgBufferSync(WritebackContext *wb_context) num_written = 0; reusable_buffers = reusable_buffers_est; + BuffersToWriteInit(&to_write, ioq, wb_context); + /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { - int sync_state = SyncOneBuffer(next_to_clean, true, - wb_context); + int sync_state; + + sync_state = PrepareToWriteBuffer(&to_write, next_to_clean + 1, + true, ioq, wb_context); + if (sync_state & BUF_CANT_MERGE) + { + Assert(to_write.nbuffers > 0); + + WriteBuffers(&to_write, ioq, wb_context); + + sync_state = PrepareToWriteBuffer(&to_write, next_to_clean + 1, + true, ioq, wb_context); + Assert(sync_state != BUF_CANT_MERGE); + } if (++next_to_clean >= NBuffers) { @@ -3827,6 +3954,13 @@ BgBufferSync(WritebackContext *wb_context) if (sync_state & BUF_WRITTEN) { + Assert(sync_state & BUF_REUSABLE); + + if (to_write.nbuffers == max_combine) + { + WriteBuffers(&to_write, ioq, wb_context); + } + reusable_buffers++; if (++num_written >= bgwriter_lru_maxpages) { @@ -3838,6 +3972,11 @@ BgBufferSync(WritebackContext *wb_context) reusable_buffers++; } + if (to_write.nbuffers > 0) + WriteBuffers(&to_write, ioq, wb_context); + + BuffersToWriteEnd(&to_write); + PendingBgWriterStats.buf_written_clean += num_written; #ifdef BGW_DEBUG @@ -3876,8 +4015,66 @@ BgBufferSync(WritebackContext *wb_context) return (bufs_to_lap == 0 && recent_alloc == 0); } +static inline bool +BufferTagsSameRel(const BufferTag *tag1, const BufferTag *tag2) +{ + return (tag1->spcOid == tag2->spcOid) && + (tag1->dbOid == tag2->dbOid) && + (tag1->relNumber == tag2->relNumber) && + (tag1->forkNum == tag2->forkNum) + ; +} + +static bool +CanMergeWrite(BuffersToWrite *to_write, BufferDesc *cur_buf_hdr) +{ + BlockNumber cur_block = cur_buf_hdr->tag.blockNum; + + Assert(to_write->nbuffers > 0); /* can't merge with nothing */ + Assert(to_write->start_at_tag.relNumber != InvalidOid); + Assert(to_write->start_at_tag.blockNum != InvalidBlockNumber); + + Assert(to_write->ioh != NULL); + + /* + * First check if the blocknumber is one that we could actually merge, + * that's cheaper than checking the tablespace/db/relnumber/fork match. + */ + if (to_write->start_at_tag.blockNum + to_write->nbuffers != cur_block) + return false; + + if (!BufferTagsSameRel(&to_write->start_at_tag, &cur_buf_hdr->tag)) + return false; + + /* + * Need to check with smgr how large a write we're allowed to make. To + * reduce the overhead of the smgr check, only inquire once, when + * processing the first to-be-merged buffer. That avoids the overhead in + * the common case of writing out buffers that definitely not mergeable. + */ + if (to_write->nbuffers == 1) + { + SMgrRelation smgr; + + smgr = smgropen(BufTagGetRelFileLocator(&to_write->start_at_tag), INVALID_PROC_NUMBER); + + to_write->max_combine = smgrmaxcombine(smgr, + to_write->start_at_tag.forkNum, + to_write->start_at_tag.blockNum); + } + else + { + Assert(to_write->max_combine > 0); + } + + if (to_write->start_at_tag.blockNum + to_write->max_combine <= cur_block) + return false; + + return true; +} + /* - * SyncOneBuffer -- process a single buffer during syncing. + * PrepareToWriteBuffer -- process a single buffer during syncing. * * If skip_recently_used is true, we don't write currently-pinned buffers, nor * buffers marked recently used, as these are not replacement candidates. @@ -3886,22 +4083,50 @@ BgBufferSync(WritebackContext *wb_context) * BUF_WRITTEN: we wrote the buffer. * BUF_REUSABLE: buffer is available for replacement, ie, it has * pin count 0 and usage count 0. + * BUF_CANT_MERGE: can't combine this write with prior writes, caller needs + * to issue those first * * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean * after locking it, but we don't care all that much.) */ static int -SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) +PrepareToWriteBuffer(BuffersToWrite *to_write, Buffer buf, + bool skip_recently_used, + IOQueue *ioq, WritebackContext *wb_context) { - BufferDesc *bufHdr = GetBufferDescriptor(buf_id); + BufferDesc *cur_buf_hdr = GetBufferDescriptor(buf - 1); + uint32 buf_state; int result = 0; - uint32 buf_state; - BufferTag tag; + XLogRecPtr cur_buf_lsn; + LWLock *content_lock; + bool may_block; + + /* + * Check if this buffer can be written out together with already prepared + * writes. We check before we have pinned the buffer, so the buffer can be + * written out and replaced between this check and us pinning the buffer - + * we'll recheck below. The reason for the pre-check is that we don't want + * to pin the buffer just to find out that we can't merge the IO. + */ + if (to_write->nbuffers != 0) + { + if (!CanMergeWrite(to_write, cur_buf_hdr)) + { + result |= BUF_CANT_MERGE; + return result; + } + } + else + { + to_write->start_at_tag = cur_buf_hdr->tag; + } /* Make sure we can handle the pin */ ReservePrivateRefCountEntry(); ResourceOwnerEnlarge(CurrentResourceOwner); + /* XXX: Should also check if we are allowed to pin one more buffer */ + /* * Check whether buffer needs writing. * @@ -3911,7 +4136,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) * don't worry because our checkpoint.redo points before log record for * upcoming changes and so we are not required to write such dirty buffer. */ - buf_state = LockBufHdr(bufHdr); + buf_state = LockBufHdr(cur_buf_hdr); if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 && BUF_STATE_GET_USAGECOUNT(buf_state) == 0) @@ -3920,40 +4145,300 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) } else if (skip_recently_used) { +#if 0 + elog(LOG, "at block %d: skip recent with nbuffers %d", + cur_buf_hdr->tag.blockNum, to_write->nbuffers); +#endif /* Caller told us not to write recently-used buffers */ - UnlockBufHdr(bufHdr, buf_state); + UnlockBufHdr(cur_buf_hdr, buf_state); return result; } if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) { /* It's clean, so nothing to do */ - UnlockBufHdr(bufHdr, buf_state); + UnlockBufHdr(cur_buf_hdr, buf_state); return result; } + /* pin the buffer, from now on its identity can't change anymore */ + PinBuffer_Locked(cur_buf_hdr); + + /* + * Acquire IO, if needed, now that it's likely that we'll need to write. + */ + if (to_write->ioh == NULL) + { + /* otherwise we should already have acquired a handle */ + Assert(to_write->nbuffers == 0); + + to_write->ioh = io_queue_acquire_io(ioq); + pgaio_io_get_wref(to_write->ioh, &to_write->iow); + } + + /* + * If we are merging, check if the buffer's identity possibly changed + * while we hadn't yet pinned it. + * + * XXX: It might be worth checking if we still want to write the buffer + * out, e.g. it could have been replaced with a buffer that doesn't have + * BM_CHECKPOINT_NEEDED set. + */ + if (to_write->nbuffers != 0) + { + if (!CanMergeWrite(to_write, cur_buf_hdr)) + { + elog(LOG, "changed identity"); + UnpinBuffer(cur_buf_hdr); + + result |= BUF_CANT_MERGE; + + return result; + } + } + + may_block = to_write->nbuffers == 0 + && !pgaio_have_staged() + && io_queue_is_empty(ioq) + ; + content_lock = BufferDescriptorGetContentLock(cur_buf_hdr); + + if (!may_block) + { + if (LWLockConditionalAcquire(content_lock, LW_SHARED)) + { + /* done */ + } + else if (to_write->nbuffers == 0) + { + /* + * Need to wait for all prior IO to finish before blocking for + * lock acquisition, to avoid the risk a deadlock due to us + * waiting for another backend that is waiting for our unsubmitted + * IO to complete. + */ + pgaio_submit_staged(); + io_queue_wait_all(ioq); + + elog(DEBUG2, "at block %u: can't block, nbuffers = 0", + cur_buf_hdr->tag.blockNum + ); + + may_block = to_write->nbuffers == 0 + && !pgaio_have_staged() + && io_queue_is_empty(ioq) + ; + Assert(may_block); + + LWLockAcquire(content_lock, LW_SHARED); + } + else + { + elog(DEBUG2, "at block %d: can't block nbuffers = %d", + cur_buf_hdr->tag.blockNum, + to_write->nbuffers); + + UnpinBuffer(cur_buf_hdr); + result |= BUF_CANT_MERGE; + Assert(to_write->nbuffers > 0); + + return result; + } + } + else + { + LWLockAcquire(content_lock, LW_SHARED); + } + + if (!may_block) + { + if (!StartBufferIO(cur_buf_hdr, false, !may_block)) + { + pgaio_submit_staged(); + io_queue_wait_all(ioq); + + may_block = io_queue_is_empty(ioq) && to_write->nbuffers == 0 && !pgaio_have_staged(); + + if (!StartBufferIO(cur_buf_hdr, false, !may_block)) + { + elog(DEBUG2, "at block %d: non-waitable StartBufferIO returns false, %d", + cur_buf_hdr->tag.blockNum, + may_block); + + /* + * FIXME: can't tell whether this is because the buffer has + * been cleaned + */ + if (!may_block) + { + result |= BUF_CANT_MERGE; + Assert(to_write->nbuffers > 0); + } + LWLockRelease(content_lock); + UnpinBuffer(cur_buf_hdr); + + return result; + } + } + } + else + { + if (!StartBufferIO(cur_buf_hdr, false, false)) + { + elog(DEBUG2, "waitable StartBufferIO returns false"); + LWLockRelease(content_lock); + UnpinBuffer(cur_buf_hdr); + + /* + * FIXME: Historically we returned BUF_WRITTEN in this case, which + * seems wrong + */ + return result; + } + } + /* - * Pin it, share-lock it, write it. (FlushBuffer will do nothing if the - * buffer is clean by the time we've locked it.) + * Run PageGetLSN while holding header lock, since we don't have the + * buffer locked exclusively in all cases. */ - PinBuffer_Locked(bufHdr); - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); + buf_state = LockBufHdr(cur_buf_hdr); + + cur_buf_lsn = BufferGetLSN(cur_buf_hdr); - FlushBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL); + /* To check if block content changes while flushing. - vadim 01/17/97 */ + buf_state &= ~BM_JUST_DIRTIED; - LWLockRelease(BufferDescriptorGetContentLock(bufHdr)); + UnlockBufHdr(cur_buf_hdr, buf_state); - tag = bufHdr->tag; + to_write->buffers[to_write->nbuffers] = buf; + to_write->nbuffers++; - UnpinBuffer(bufHdr); + if (buf_state & BM_PERMANENT && + (to_write->max_lsn == InvalidXLogRecPtr || to_write->max_lsn < cur_buf_lsn)) + { + to_write->max_lsn = cur_buf_lsn; + } + + result |= BUF_WRITTEN; + + return result; +} + +static void +WriteBuffers(BuffersToWrite *to_write, + IOQueue *ioq, WritebackContext *wb_context) +{ + SMgrRelation smgr; + Buffer first_buf; + BufferDesc *first_buf_hdr; + bool needs_checksum; + + Assert(to_write->nbuffers > 0 && to_write->nbuffers <= io_combine_limit); + + first_buf = to_write->buffers[0]; + first_buf_hdr = GetBufferDescriptor(first_buf - 1); + + smgr = smgropen(BufTagGetRelFileLocator(&first_buf_hdr->tag), INVALID_PROC_NUMBER); /* - * SyncOneBuffer() is only called by checkpointer and bgwriter, so - * IOContext will always be IOCONTEXT_NORMAL. + * Force XLOG flush up to buffer's LSN. This implements the basic WAL + * rule that log updates must hit disk before any of the data-file changes + * they describe do. + * + * However, this rule does not apply to unlogged relations, which will be + * lost after a crash anyway. Most unlogged relation pages do not bear + * LSNs since we never emit WAL records for them, and therefore flushing + * up through the buffer LSN would be useless, but harmless. However, + * GiST indexes use LSNs internally to track page-splits, and therefore + * unlogged GiST pages bear "fake" LSNs generated by + * GetFakeLSNForUnloggedRel. It is unlikely but possible that the fake + * LSN counter could advance past the WAL insertion point; and if it did + * happen, attempting to flush WAL through that location would fail, with + * disastrous system-wide consequences. To make sure that can't happen, + * skip the flush if the buffer isn't permanent. */ - ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag); + if (to_write->max_lsn != InvalidXLogRecPtr) + XLogFlush(to_write->max_lsn); + + /* + * Now it's safe to write the buffer to disk. Note that no one else should + * have been able to write it, while we were busy with log flushing, + * because we got the exclusive right to perform I/O by setting the + * BM_IO_IN_PROGRESS bit. + */ + + for (int nbuf = 0; nbuf < to_write->nbuffers; nbuf++) + { + Buffer cur_buf = to_write->buffers[nbuf]; + BufferDesc *cur_buf_hdr = GetBufferDescriptor(cur_buf - 1); + Block bufBlock; + char *bufToWrite; + + bufBlock = BufHdrGetBlock(cur_buf_hdr); + needs_checksum = PageNeedsChecksumCopy((Page) bufBlock); + + /* + * Update page checksum if desired. Since we have only shared lock on + * the buffer, other processes might be updating hint bits in it, so + * we must copy the page to a bounce buffer if we do checksumming. + */ + if (needs_checksum) + { + PgAioBounceBuffer *bb = pgaio_bounce_buffer_get(); - return result | BUF_WRITTEN; + pgaio_io_assoc_bounce_buffer(to_write->ioh, bb); + + bufToWrite = pgaio_bounce_buffer_buffer(bb); + memcpy(bufToWrite, bufBlock, BLCKSZ); + PageSetChecksumInplace((Page) bufToWrite, cur_buf_hdr->tag.blockNum); + } + else + { + bufToWrite = bufBlock; + } + + to_write->data_ptrs[nbuf] = bufToWrite; + } + + pgaio_io_set_handle_data_32(to_write->ioh, + (uint32 *) to_write->buffers, + to_write->nbuffers); + pgaio_io_register_callbacks(to_write->ioh, PGAIO_HCB_SHARED_BUFFER_WRITEV, 0); + + smgrstartwritev(to_write->ioh, smgr, + BufTagGetForkNum(&first_buf_hdr->tag), + first_buf_hdr->tag.blockNum, + to_write->data_ptrs, + to_write->nbuffers, + false); + pgstat_count_io_op(IOOBJECT_RELATION, IOCONTEXT_NORMAL, + IOOP_WRITE, 1, BLCKSZ * to_write->nbuffers); + + + for (int nbuf = 0; nbuf < to_write->nbuffers; nbuf++) + { + Buffer cur_buf = to_write->buffers[nbuf]; + BufferDesc *cur_buf_hdr = GetBufferDescriptor(cur_buf - 1); + + UnpinBuffer(cur_buf_hdr); + } + + io_queue_track(ioq, &to_write->iow); + to_write->total_writes++; + + /* clear state for next write */ + to_write->nbuffers = 0; + to_write->start_at_tag.relNumber = InvalidOid; + to_write->start_at_tag.blockNum = InvalidBlockNumber; + to_write->max_combine = 0; + to_write->max_lsn = InvalidXLogRecPtr; + to_write->ioh = NULL; + pgaio_wref_clear(&to_write->iow); + + /* + * FIXME: Implement issuing writebacks (note wb_context isn't used here). + * Possibly needs to be integrated with io_queue.c. + */ } /* @@ -4327,6 +4812,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, error_context_stack = errcallback.previous; } + /* * RelationGetNumberOfBlocksInFork * Determines the current number of pages in the specified relation fork. diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c index ecc81aacfc3..71dc6f559dd 100644 --- a/src/backend/storage/page/bufpage.c +++ b/src/backend/storage/page/bufpage.c @@ -1480,6 +1480,16 @@ PageIndexTupleOverwrite(Page page, OffsetNumber offnum, return true; } +bool +PageNeedsChecksumCopy(Page page) +{ + if (PageIsNew(page)) + return false; + + /* If we don't need a checksum, just return the passed-in data */ + return DataChecksumsEnabled(); +} + /* * Set checksum for a page in shared buffers. diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 0e40687bb43..125fcf35bd8 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -348,6 +348,7 @@ BufferManagerRelation BufferStrategyControl BufferTag BufferUsage +BuffersToWrite BuildAccumulator BuiltinScript BulkInsertState -- 2.48.1.76.g4e746b1a31.dirty