From 87797eb0d3334415e13a9c7e4037369f7f8e9511 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Fri, 23 Jan 2026 14:00:31 -0500 Subject: [PATCH v11 10/12] Don't wait for already in-progress IO MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a backend attempts to start a read on a buffer and finds that I/O is already in progress, it previously waited for that I/O to complete before initiating reads for any other buffers. Although the backend must still wait for the I/O to finish when later acquiring the buffer, it should not need to wait at read start time. Other buffers may be available for I/O, and in some workloads this waiting significantly reduces concurrency. For example, index scans may repeatedly request the same heap block. If the backend waits each time it encounters an in-progress read, the access pattern effectively degenerates into synchronous I/O. By introducing the concept of foreign I/O operations, a backend can record the buffer’s wait reference and defer waiting until WaitReadBuffers() when it actually acquires the buffer. In rare cases, a backend may still need to wait when starting a read if it encounters a buffer after another backend has set BM_IO_IN_PROGRESS but before the buffer descriptor’s wait reference has been set. Such windows should be brief and uncommon. 
--- src/include/storage/bufmgr.h | 1 + src/backend/storage/buffer/bufmgr.c | 491 ++++++++++++++++++---------- src/tools/pgindent/typedefs.list | 1 + 3 files changed, 325 insertions(+), 168 deletions(-) diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index a40adf6b2..1358fc7fa 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -147,6 +147,7 @@ struct ReadBuffersOperation int flags; int16 nblocks; int16 nblocks_done; + bool foreign_io; PgAioWaitRef io_wref; PgAioReturn io_return; }; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 37bb7c824..557b05e18 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -170,6 +170,21 @@ typedef struct SMgrSortArray SMgrRelation srel; } SMgrSortArray; + +/* + * In AsyncReadBuffers(), when preparing a buffer for reading and setting + * BM_IO_IN_PROGRESS, the buffer may already have I/O in progress or may + * already contain the desired block. AsyncReadBuffers() must distinguish + * between these cases (and the case where it should initiate I/O) so it can + * mark an in-progress buffer as foreign I/O rather than waiting on it. + */ +typedef enum PrepareReadBuffer_Status +{ + READ_BUFFER_ALREADY_DONE, + READ_BUFFER_IN_PROGRESS, + READ_BUFFER_READY_FOR_IO, +} PrepareReadBuffer_Status; + /* GUC variables */ bool zero_damaged_pages = false; int bgwriter_lru_maxpages = 100; @@ -1619,45 +1634,6 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete) #endif } -/* helper for ReadBuffersCanStartIO(), to avoid repetition */ -static inline bool -ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait) -{ - if (BufferIsLocal(buffer)) - return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), - true, nowait); - else - return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); -} - -/* - * Helper for AsyncReadBuffers that tries to get the buffer ready for IO. 
- */ -static inline bool -ReadBuffersCanStartIO(Buffer buffer, bool nowait) -{ - /* - * If this backend currently has staged IO, we need to submit the pending - * IO before waiting for the right to issue IO, to avoid the potential for - * deadlocks (and, more commonly, unnecessary delays for other backends). - */ - if (!nowait && pgaio_have_staged()) - { - if (ReadBuffersCanStartIOOnce(buffer, true)) - return true; - - /* - * Unfortunately StartBufferIO() returning false doesn't allow to - * distinguish between the buffer already being valid and IO already - * being in progress. Since IO already being in progress is quite - * rare, this approach seems fine. - */ - pgaio_submit_staged(); - } - - return ReadBuffersCanStartIOOnce(buffer, nowait); -} - /* * We track various stats related to buffer hits. Because this is done in a * few separate places, this helper exists for convenience. @@ -1807,7 +1783,7 @@ WaitReadBuffers(ReadBuffersOperation *operation) * * we first check if we already know the IO is complete. */ - if (aio_ret->result.status == PGAIO_RS_UNKNOWN && + if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) && !pgaio_wref_check_done(&operation->io_wref)) { instr_time io_start = pgstat_prepare_io_time(track_io_timing); @@ -1826,11 +1802,33 @@ WaitReadBuffers(ReadBuffersOperation *operation) Assert(pgaio_wref_check_done(&operation->io_wref)); } - /* - * We now are sure the IO completed. Check the results. This - * includes reporting on errors if there were any. - */ - ProcessReadBuffersResult(operation); + if (unlikely(operation->foreign_io)) + { + Buffer buffer = operation->buffers[operation->nblocks_done]; + BufferDesc *desc = BufferIsLocal(buffer) ? 
+ GetLocalBufferDescriptor(-buffer - 1) : + GetBufferDescriptor(buffer - 1); + uint64 buf_state = pg_atomic_read_u64(&desc->state); + + if (buf_state & BM_VALID) + { + operation->nblocks_done += 1; + Assert(operation->nblocks_done <= operation->nblocks); + + ProcessBufferHit(operation->strategy, + operation->rel, operation->persistence, + operation->smgr, operation->forknum, + operation->blocknum + operation->nblocks_done); + } + } + else + { + /* + * We now are sure the IO completed. Check the results. This + * includes reporting on errors if there were any. + */ + ProcessReadBuffersResult(operation); + } } /* @@ -1861,6 +1859,159 @@ WaitReadBuffers(ReadBuffersOperation *operation) /* NB: READ_DONE tracepoint was already executed in completion callback */ } +/* + * Local version of PrepareNewReadBufferIO(). Here instead of localbuf.c to + * avoid an external function call. + */ +static PrepareReadBuffer_Status +PrepareNewLocalReadBufferIO(ReadBuffersOperation *operation, + Buffer buffer) +{ + BufferDesc *desc = GetLocalBufferDescriptor(-buffer - 1); + uint64 buf_state = pg_atomic_read_u64(&desc->state); + + /* Already valid, no work to do */ + if (buf_state & BM_VALID) + { + pgaio_wref_clear(&operation->io_wref); + return READ_BUFFER_ALREADY_DONE; + } + + pgaio_submit_staged(); + + if (pgaio_wref_valid(&desc->io_wref)) + { + operation->io_wref = desc->io_wref; + operation->foreign_io = true; + return READ_BUFFER_IN_PROGRESS; + } + + return READ_BUFFER_READY_FOR_IO; +} + +/* + * Try to start IO on the first buffer in a new run of blocks. If AIO is in + * progress, be it in this backend or another backend, we just associate the + * wait reference with the operation and wait in WaitReadBuffers(). This turns + * out to be important for performance in two workloads: + * + * 1) A read stream that has to read the same block multiple times within the + * readahead distance. This can happen e.g. for the table accesses of an + * index scan. 
+ * + * 2) Concurrent scans by multiple backends on the same relation. + * + * If we were to synchronously wait for the in-progress IO, we'd not be able + * to keep enough I/O in flight. + * + * If we do find there is ongoing I/O for the buffer, we set up a 1-block + * ReadBuffersOperation that WaitReadBuffers then can wait on. + * + * It's possible that another backend has started IO on the buffer but not yet + * set its wait reference. In this case, we have no choice but to wait for + * either the wait reference to be valid or the IO to be done. + */ +static PrepareReadBuffer_Status +PrepareNewReadBufferIO(ReadBuffersOperation *operation, + Buffer buffer) +{ + uint64 buf_state; + BufferDesc *desc; + + if (BufferIsLocal(buffer)) + return PrepareNewLocalReadBufferIO(operation, buffer); + + ResourceOwnerEnlarge(CurrentResourceOwner); + desc = GetBufferDescriptor(buffer - 1); + + for (;;) + { + buf_state = LockBufHdr(desc); + + /* Already valid, no work to do */ + if (buf_state & BM_VALID) + { + UnlockBufHdr(desc); + pgaio_wref_clear(&operation->io_wref); + return READ_BUFFER_ALREADY_DONE; + } + + if (buf_state & BM_IO_IN_PROGRESS) + { + /* Join existing read */ + if (pgaio_wref_valid(&desc->io_wref)) + { + operation->io_wref = desc->io_wref; + operation->foreign_io = true; + UnlockBufHdr(desc); + return READ_BUFFER_IN_PROGRESS; + } + + /* + * If the wait ref is not valid but the IO is in progress, someone + * else started IO but hasn't set the wait ref yet. We have no + * choice but to wait until the wait ref is set or the IO + * completes. + */ + UnlockBufHdr(desc); + pgaio_submit_staged(); + WaitIO(desc); + continue; + } + + /* + * No IO in progress and not already valid; We will start IO. It's + * possible that the IO was in progress and never became valid because + * the IO errored out. We'll do the IO ourselves. 
+ */ + UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0); + ResourceOwnerRememberBufferIO(CurrentResourceOwner, + BufferDescriptorGetBuffer(desc)); + + return READ_BUFFER_READY_FOR_IO; + } +} + + +/* + * When building a new IO from multiple buffers, we won't include buffers + * that are already valid or already in progress. This function should only be + * used for additional adjacent buffers following the head buffer in a new IO. + * + * Returns true if the buffer was successfully prepared for IO and false if it + * is rejected and the read IO should not include this buffer. + */ +static bool +PrepareAdditionalReadBuffer(Buffer buffer) +{ + uint64 buf_state; + BufferDesc *desc; + + if (BufferIsLocal(buffer)) + { + desc = GetLocalBufferDescriptor(-buffer - 1); + buf_state = pg_atomic_read_u64(&desc->state); + /* Local buffers don't use BM_IO_IN_PROGRESS */ + if (buf_state & BM_VALID || pgaio_wref_valid(&desc->io_wref)) + return false; + } + else + { + ResourceOwnerEnlarge(CurrentResourceOwner); + desc = GetBufferDescriptor(buffer - 1); + buf_state = LockBufHdr(desc); + if (buf_state & (BM_VALID | BM_IO_IN_PROGRESS)) + { + UnlockBufHdr(desc); + return false; + } + UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0); + ResourceOwnerRememberBufferIO(CurrentResourceOwner, buffer); + } + + return true; +} + /* * Initiate IO for the ReadBuffersOperation * @@ -1894,7 +2045,75 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) void *io_pages[MAX_IO_COMBINE_LIMIT]; IOContext io_context; IOObject io_object; - bool did_start_io; + instr_time io_start; + PrepareReadBuffer_Status status; + + /* + * We must get an IO handle before PrepareNewReadBufferIO(), as + * pgaio_io_acquire() might block, which we don't want after setting + * IO_IN_PROGRESS. If we don't need to do the IO, we'll release the + * handle. 
+ * + * If we need to wait for IO before we can get a handle, submit + * already-staged IO first, so that other backends don't need to wait. + * There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to + * wait for already submitted IO, which doesn't require additional locks, + * but it could still cause undesirable waits. + * + * A secondary benefit is that this would allow us to measure the time in + * pgaio_io_acquire() without causing undue timer overhead in the common, + * non-blocking, case. However, currently the pgstats infrastructure + * doesn't really allow that, as it a) asserts that an operation can't + * have time without operations b) doesn't have an API to report + * "accumulated" time. + */ + ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return); + if (unlikely(!ioh)) + { + pgaio_submit_staged(); + ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return); + } + + operation->foreign_io = false; + + /* Check if we can start IO on the first to-be-read buffer */ + if ((status = PrepareNewReadBufferIO(operation, buffers[nblocks_done])) < + READ_BUFFER_READY_FOR_IO) + { + pgaio_io_release(ioh); + *nblocks_progress = 1; + if (status == READ_BUFFER_ALREADY_DONE) + { + /* + * Someone else has already completed this block, we're done. + * + * When IO is necessary, ->nblocks_done is updated in + * ProcessReadBuffersResult(), but that is not called if no IO is + * necessary. Thus update here. + */ + operation->nblocks_done += 1; + Assert(operation->nblocks_done <= operation->nblocks); + + /* + * Report and track this as a 'hit' for this backend, even though + * it must have started out as a miss in PinBufferForBlock(). The + * other backend will track this as a 'read'. 
+ */ + ProcessBufferHit(operation->strategy, + operation->rel, operation->persistence, + operation->smgr, operation->forknum, + operation->blocknum + operation->nblocks_done); + return false; + } + + /* The IO is already in-progress */ + Assert(status == READ_BUFFER_IN_PROGRESS); + CheckReadBuffersOperation(operation, false); + return true; + } + + /* We can read in at least the head buffer. */ + Assert(status == READ_BUFFER_READY_FOR_IO); /* * When this IO is executed synchronously, either because the caller will @@ -1945,138 +2164,74 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) */ pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid); - /* - * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire() - * might block, which we don't want after setting IO_IN_PROGRESS. - * - * If we need to wait for IO before we can get a handle, submit - * already-staged IO first, so that other backends don't need to wait. - * There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to - * wait for already submitted IO, which doesn't require additional locks, - * but it could still cause undesirable waits. - * - * A secondary benefit is that this would allow us to measure the time in - * pgaio_io_acquire() without causing undue timer overhead in the common, - * non-blocking, case. However, currently the pgstats infrastructure - * doesn't really allow that, as it a) asserts that an operation can't - * have time without operations b) doesn't have an API to report - * "accumulated" time. - */ - ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return); - if (unlikely(!ioh)) - { - pgaio_submit_staged(); - - ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return); - } + Assert(io_buffers[0] == buffers[nblocks_done]); + io_pages[0] = BufferGetBlock(buffers[nblocks_done]); + io_buffers_len = 1; /* - * Check if we can start IO on the first to-be-read buffer. 
- * - * If an I/O is already in progress in another backend, we want to wait - * for the outcome: either done, or something went wrong and we will - * retry. + * How many neighboring-on-disk blocks can we scatter-read into other + * buffers at the same time? In this case we don't wait if we see an I/O + * already in progress. We already set BM_IO_IN_PROGRESS for the head + * block, so we should get on with that I/O as soon as possible. */ - if (!ReadBuffersCanStartIO(buffers[nblocks_done], false)) + for (int i = nblocks_done + 1; i < operation->nblocks; i++) { - /* - * Someone else has already completed this block, we're done. - * - * When IO is necessary, ->nblocks_done is updated in - * ProcessReadBuffersResult(), but that is not called if no IO is - * necessary. Thus update here. - */ - operation->nblocks_done += 1; - *nblocks_progress = 1; + if (!PrepareAdditionalReadBuffer(buffers[i])) + break; + /* Must be consecutive block numbers. */ + Assert(BufferGetBlockNumber(buffers[i - 1]) == + BufferGetBlockNumber(buffers[i]) - 1); + Assert(io_buffers[io_buffers_len] == buffers[i]); - pgaio_io_release(ioh); - pgaio_wref_clear(&operation->io_wref); - did_start_io = false; - - /* - * Report and track this as a 'hit' for this backend, even though it - * must have started out as a miss in PinBufferForBlock(). The other - * backend will track this as a 'read'. - */ - ProcessBufferHit(operation->strategy, operation->rel, persistence, - operation->smgr, forknum, - blocknum + operation->nblocks_done); + io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); } + + /* get a reference to wait for in WaitReadBuffers() */ + pgaio_io_get_wref(ioh, &operation->io_wref); + + /* provide the list of buffers to the completion callbacks */ + pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len); + + pgaio_io_register_callbacks(ioh, + persistence == RELPERSISTENCE_TEMP ? 
+ PGAIO_HCB_LOCAL_BUFFER_READV : + PGAIO_HCB_SHARED_BUFFER_READV, + flags); + + pgaio_io_set_flag(ioh, ioh_flags); + + /* --- + * Even though we're trying to issue IO asynchronously, track the time + * in smgrstartreadv(): + * - if io_method == IOMETHOD_SYNC, we will always perform the IO + * immediately + * - the io method might not support the IO (e.g. worker IO for a temp + * table) + * --- + */ + io_start = pgstat_prepare_io_time(track_io_timing); + smgrstartreadv(ioh, operation->smgr, forknum, + blocknum + nblocks_done, + io_pages, io_buffers_len); + pgstat_count_io_op_time(io_object, io_context, IOOP_READ, + io_start, 1, io_buffers_len * BLCKSZ); + + if (persistence == RELPERSISTENCE_TEMP) + pgBufferUsage.local_blks_read += io_buffers_len; else - { - instr_time io_start; + pgBufferUsage.shared_blks_read += io_buffers_len; - /* We found a buffer that we need to read in. */ - Assert(io_buffers[0] == buffers[nblocks_done]); - io_pages[0] = BufferGetBlock(buffers[nblocks_done]); - io_buffers_len = 1; + /* + * Track vacuum cost when issuing IO, not after waiting for it. Otherwise + * we could end up issuing a lot of IO in a short timespan, despite a low + * cost limit. + */ + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; - /* - * How many neighboring-on-disk blocks can we scatter-read into other - * buffers at the same time? In this case we don't wait if we see an - * I/O already in progress. We already set BM_IO_IN_PROGRESS for the - * head block, so we should get on with that I/O as soon as possible. - */ - for (int i = nblocks_done + 1; i < operation->nblocks; i++) - { - if (!ReadBuffersCanStartIO(buffers[i], true)) - break; - /* Must be consecutive block numbers. 
*/ - Assert(BufferGetBlockNumber(buffers[i - 1]) == - BufferGetBlockNumber(buffers[i]) - 1); - Assert(io_buffers[io_buffers_len] == buffers[i]); + *nblocks_progress = io_buffers_len; - io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); - } - - /* get a reference to wait for in WaitReadBuffers() */ - pgaio_io_get_wref(ioh, &operation->io_wref); - - /* provide the list of buffers to the completion callbacks */ - pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len); - - pgaio_io_register_callbacks(ioh, - persistence == RELPERSISTENCE_TEMP ? - PGAIO_HCB_LOCAL_BUFFER_READV : - PGAIO_HCB_SHARED_BUFFER_READV, - flags); - - pgaio_io_set_flag(ioh, ioh_flags); - - /* --- - * Even though we're trying to issue IO asynchronously, track the time - * in smgrstartreadv(): - * - if io_method == IOMETHOD_SYNC, we will always perform the IO - * immediately - * - the io method might not support the IO (e.g. worker IO for a temp - * table) - * --- - */ - io_start = pgstat_prepare_io_time(track_io_timing); - smgrstartreadv(ioh, operation->smgr, forknum, - blocknum + nblocks_done, - io_pages, io_buffers_len); - pgstat_count_io_op_time(io_object, io_context, IOOP_READ, - io_start, 1, io_buffers_len * BLCKSZ); - - if (persistence == RELPERSISTENCE_TEMP) - pgBufferUsage.local_blks_read += io_buffers_len; - else - pgBufferUsage.shared_blks_read += io_buffers_len; - - /* - * Track vacuum cost when issuing IO, not after waiting for it. - * Otherwise we could end up issuing a lot of IO in a short timespan, - * despite a low cost limit. 
- */ - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; - - *nblocks_progress = io_buffers_len; - did_start_io = true; - } - - return did_start_io; + return true; } /* diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ba52cf502..1a0f9ea60 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2346,6 +2346,7 @@ PredicateLockData PredicateLockTargetType PrefetchBufferResult PrepParallelRestorePtrType +PrepareReadBuffer_Status PrepareStmt PreparedStatement PresortedKeyData -- 2.51.0