From 517e55c26298decd26eab0dec4da220aa084ad35 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 12 Feb 2025 14:19:20 -0500 Subject: [PATCH v2.4 20/29] bufmgr: Use aio for StartReadBuffers() Author: Reviewed-By: Discussion: https://postgr.es/m/ Backpatch: --- src/include/storage/bufmgr.h | 7 + src/backend/storage/buffer/bufmgr.c | 411 +++++++++++++++++++++------- 2 files changed, 317 insertions(+), 101 deletions(-) diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index efba4d88d7d..dc8fe197d6f 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -15,6 +15,7 @@ #define BUFMGR_H #include "port/pg_iovec.h" +#include "storage/aio_types.h" #include "storage/block.h" #include "storage/buf.h" #include "storage/bufpage.h" @@ -111,6 +112,9 @@ typedef struct BufferManagerRelation #define READ_BUFFERS_ZERO_ON_ERROR (1 << 0) /* Call smgrprefetch() if I/O necessary. */ #define READ_BUFFERS_ISSUE_ADVICE (1 << 1) +/* IO will immediately be waited for */ +#define READ_BUFFERS_SYNCHRONOUSLY (1 << 2) + struct ReadBuffersOperation { @@ -130,6 +134,9 @@ struct ReadBuffersOperation BlockNumber blocknum; int flags; int16 nblocks; + + PgAioWaitRef io_wref; + PgAioReturn io_return; }; typedef struct ReadBuffersOperation ReadBuffersOperation; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 96b54f7abdf..ee9a9f70167 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -529,6 +529,8 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr, BlockNumber blockNum, BufferAccessStrategy strategy, bool *foundPtr, IOContext io_context); +static bool AsyncReadBuffers(ReadBuffersOperation *operation, + int *nblocks); static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context); static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); @@ -1237,10 +1239,9 @@ ReadBuffer_common(Relation rel, 
SMgrRelation smgr, char smgr_persistence, return buffer; } + flags = READ_BUFFERS_SYNCHRONOUSLY; if (mode == RBM_ZERO_ON_ERROR) - flags = READ_BUFFERS_ZERO_ON_ERROR; - else - flags = 0; + flags |= READ_BUFFERS_ZERO_ON_ERROR; operation.smgr = smgr; operation.rel = rel; operation.persistence = persistence; @@ -1268,6 +1269,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, Assert(*nblocks > 0); Assert(*nblocks <= MAX_IO_COMBINE_LIMIT); + Assert(*nblocks == 1 || allow_forwarding); for (int i = 0; i < actual_nblocks; ++i) { @@ -1307,6 +1309,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, else bufHdr = GetBufferDescriptor(buffers[i] - 1); found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID; + + ereport(DEBUG3, + errmsg("found forwarded buffer %d", + buffers[i]), + errhidestmt(true), errhidecontext(true)); } else { @@ -1372,25 +1379,59 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, operation->blocknum = blockNum; operation->flags = flags; operation->nblocks = actual_nblocks; + pgaio_wref_clear(&operation->io_wref); - if (flags & READ_BUFFERS_ISSUE_ADVICE) + /* + * When using AIO, start the IO in the background. If not, issue prefetch + * requests if desired by the caller. + * + * The reason we have a dedicated path for IOMETHOD_SYNC here is to + * de-risk the introduction of AIO somewhat. It's a large architectural + * change, with lots of chances for unanticipated performance effects. + * + * Use of IOMETHOD_SYNC already leads to not actually performing IO + * asynchronously, but without the check here we'd execute IO earlier than + * we used to. Eventually this IOMETHOD_SYNC specific path should go away. + */ + if (io_method != IOMETHOD_SYNC) { /* - * In theory we should only do this if PinBufferForBlock() had to - * allocate new buffers above. That way, if two calls to - * StartReadBuffers() were made for the same blocks before - * WaitReadBuffers(), only the first would issue the advice. 
That'd be - * a better simulation of true asynchronous I/O, which would only - * start the I/O once, but isn't done here for simplicity. + * Try to start IO asynchronously. It's possible that no IO needs to + * be started, if another backend already performed the IO. + * + * Note that if an IO is started, it might not cover the entire + * requested range, e.g. because an intermediary block has been read + * in by another backend. In that case any "trailing" buffers we + * already pinned above will be "forwarded" by read_stream.c to the + * next call to StartReadBuffers(). This is signalled to the caller by + * decrementing *nblocks. */ - smgrprefetch(operation->smgr, - operation->forknum, - blockNum, - actual_nblocks); + return AsyncReadBuffers(operation, nblocks); } + else + { + operation->flags |= READ_BUFFERS_SYNCHRONOUSLY; - /* Indicate that WaitReadBuffers() should be called. */ - return true; + if (flags & READ_BUFFERS_ISSUE_ADVICE) + { + /* + * In theory we should only do this if PinBufferForBlock() had to + * allocate new buffers above. That way, if two calls to + * StartReadBuffers() were made for the same blocks before + * WaitReadBuffers(), only the first would issue the advice. + * That'd be a better simulation of true asynchronous I/O, which + * would only start the I/O once, but isn't done here for + * simplicity. + */ + smgrprefetch(operation->smgr, + operation->forknum, + blockNum, + actual_nblocks); + } + + /* Indicate that WaitReadBuffers() should be called. */ + return true; + } } /* @@ -1458,12 +1499,31 @@ StartReadBuffer(ReadBuffersOperation *operation, } static inline bool -WaitReadBuffersCanStartIO(Buffer buffer, bool nowait) +ReadBuffersCanStartIO(Buffer buffer, bool nowait) { if (BufferIsLocal(buffer)) { BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1); + /* + * The buffer could have IO in progress by another scan. Right now + * localbuf.c doesn't use IO_IN_PROGRESS, which is why we need this + * hack. 
+ * + * TODO: localbuf.c should use IO_IN_PROGRESS / have an equivalent of + * StartBufferIO(). + */ + if (pgaio_wref_valid(&bufHdr->io_wref)) + { + PgAioWaitRef iow = bufHdr->io_wref; + + ereport(DEBUG3, + errmsg("waiting for temp buffer IO in CSIO"), + errhidestmt(true), errhidecontext(true)); + pgaio_wref_wait(&iow); + return false; + } + return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0; } else @@ -1473,28 +1533,163 @@ WaitReadBuffersCanStartIO(Buffer buffer, bool nowait) void WaitReadBuffers(ReadBuffersOperation *operation) { - Buffer *buffers; + IOContext io_context; + IOObject io_object; int nblocks; - BlockNumber blocknum; - ForkNumber forknum; - IOContext io_context; - IOObject io_object; - char persistence; + PgAioReturn *aio_ret; + + /* + * If we get here without an IO operation having been issued, io_method == + * IOMETHOD_SYNC path must have been used. In that case, we start - as we + * used to before - the IO now, just before waiting. + * + * This path is expected to eventually go away. + */ + if (!pgaio_wref_valid(&operation->io_wref)) + { + Assert(io_method == IOMETHOD_SYNC); + + while (true) + { + nblocks = operation->nblocks; + + if (!AsyncReadBuffers(operation, &nblocks)) + { + /* all blocks were already read in concurrently */ + Assert(nblocks == operation->nblocks); + return; + } + + Assert(nblocks > 0 && nblocks <= operation->nblocks); + + if (nblocks == operation->nblocks) + { + /* will wait below as if this had been normal AIO */ + break; + } + + /* + * It's unlikely, but possible, that AsyncReadBuffers() wasn't + * able to initiate IO for all the relevant buffers. In that case + * we need to wait for the prior IO before issuing more IO. 
+ */ + WaitReadBuffers(operation); + } + } + + if (operation->persistence == RELPERSISTENCE_TEMP) + { + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; + } + else + { + io_context = IOContextForStrategy(operation->strategy); + io_object = IOOBJECT_RELATION; + } + +restart: /* Find the range of the physical read we need to perform. */ nblocks = operation->nblocks; - buffers = &operation->buffers[0]; - blocknum = operation->blocknum; - forknum = operation->forknum; - persistence = operation->persistence; - Assert(nblocks > 0); Assert(nblocks <= MAX_IO_COMBINE_LIMIT); + aio_ret = &operation->io_return; + + /* + * For IO timing we just count the time spent waiting for the IO. + * + * XXX: We probably should track the IO operation, rather than its time, + * separately, when initiating the IO. But right now that's not quite + * allowed by the interface. + */ + + /* + * Tracking a wait even if we don't actually need to wait + * + * a) is not cheap + * + * b) reports some time as waiting, even if we never waited. + */ + if (aio_ret->result.status == ARS_UNKNOWN && + !pgaio_wref_check_done(&operation->io_wref)) + { + instr_time io_start = pgstat_prepare_io_time(track_io_timing); + + pgaio_wref_wait(&operation->io_wref); + + /* + * The IO operation itself was already counted earlier, in + * AsyncReadBuffers(). + */ + pgstat_count_io_op_time(io_object, io_context, IOOP_READ, + io_start, 0, 0); + } + else + { + Assert(pgaio_wref_check_done(&operation->io_wref)); + } + + if (aio_ret->result.status == ARS_PARTIAL) + { + /* + * We'll retry below, so we just emit a debug message to the server log + * (or not even that in prod scenarios). + */ + pgaio_result_report(aio_ret->result, &aio_ret->target_data, DEBUG1); + + /* + * Try to perform the rest of the IO. Buffers for which IO has + * completed successfully will be discovered as such and not retried. 
+ */ + nblocks = operation->nblocks; + + elog(DEBUG3, "retrying IO after partial failure"); + CHECK_FOR_INTERRUPTS(); + AsyncReadBuffers(operation, &nblocks); + goto restart; + } + else if (aio_ret->result.status != ARS_OK) + pgaio_result_report(aio_ret->result, &aio_ret->target_data, ERROR); + + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageMiss * nblocks; + + /* NB: READ_DONE tracepoint is executed in IO completion callback */ +} + +/* + * Initiate IO for the ReadBuffersOperation. If IO is only initiated for a + * subset of the blocks, *nblocks is updated to reflect that. + * + * Returns true if IO was initiated, false if no IO was necessary. + */ +static bool +AsyncReadBuffers(ReadBuffersOperation *operation, + int *nblocks) +{ + int io_buffers_len = 0; + Buffer *buffers = &operation->buffers[0]; + int flags = operation->flags; + BlockNumber blocknum = operation->blocknum; + ForkNumber forknum = operation->forknum; + bool did_start_io = false; + PgAioHandle *ioh = NULL; + uint32 ioh_flags = 0; + IOContext io_context; + IOObject io_object; + char persistence; + + persistence = operation->rel + ? operation->rel->rd_rel->relpersistence + : RELPERSISTENCE_PERMANENT; + if (persistence == RELPERSISTENCE_TEMP) { io_context = IOCONTEXT_NORMAL; io_object = IOOBJECT_TEMP_RELATION; + ioh_flags |= PGAIO_HF_REFERENCES_LOCAL; } else { @@ -1502,6 +1697,14 @@ WaitReadBuffers(ReadBuffersOperation *operation) io_object = IOOBJECT_RELATION; } + /* + * When this IO is executed synchronously, either because the caller will + * immediately block waiting for the IO or because IOMETHOD_SYNC is used, + * the AIO subsystem needs to know. + */ + if (flags & READ_BUFFERS_SYNCHRONOUSLY) + ioh_flags |= PGAIO_HF_SYNCHRONOUS; + /* * We count all these blocks as read by this backend. 
This is traditional * behavior, but might turn out to be not true if we find that someone @@ -1511,25 +1714,53 @@ WaitReadBuffers(ReadBuffersOperation *operation) * but another backend completed the read". */ if (persistence == RELPERSISTENCE_TEMP) - pgBufferUsage.local_blks_read += nblocks; + pgBufferUsage.local_blks_read += *nblocks; else - pgBufferUsage.shared_blks_read += nblocks; + pgBufferUsage.shared_blks_read += *nblocks; - for (int i = 0; i < nblocks; ++i) + pgaio_wref_clear(&operation->io_wref); + + /* + * Loop until we have started one IO or we discover that all buffers are + * already valid. + */ + for (int i = 0; i < *nblocks; ++i) { - int io_buffers_len; Buffer io_buffers[MAX_IO_COMBINE_LIMIT]; void *io_pages[MAX_IO_COMBINE_LIMIT]; - instr_time io_start; BlockNumber io_first_block; /* - * Skip this block if someone else has already completed it. If an - * I/O is already in progress in another backend, this will wait for - * the outcome: either done, or something went wrong and we will - * retry. + * Get IO before ReadBuffersCanStartIO, as pgaio_io_acquire() might + * block, which we don't want after setting IO_IN_PROGRESS. + * + * XXX: Should we attribute the time spent in here to the IO? If there + * already are a lot of IO operations in progress, getting an IO + * handle will block waiting for some other IO operation to finish. + * + * In most cases it'll be free to get the IO, so a timer would be + * overhead. Perhaps we should use pgaio_io_acquire_nb() and only + * account IO time when pgaio_io_acquire_nb() returned false? */ - if (!WaitReadBuffersCanStartIO(buffers[i], false)) + if (likely(!ioh)) + ioh = pgaio_io_acquire(CurrentResourceOwner, + &operation->io_return); + + /* + * Skip this block if someone else has already completed it. + * + * If an I/O is already in progress in another backend, this will wait + * for the outcome: either done, or something went wrong and we will + * retry. 
But don't wait if we have staged, but haven't issued, + * another IO. + * + * It's safe to start IO while we have unsubmitted IO, but it'd be + * better to first submit it. But right now the boolean return value + * from ReadBuffersCanStartIO()/StartBufferIO() doesn't allow us to + * distinguish between a failure caused by nowait=true and the buffer + * already being valid. + */ + if (!ReadBuffersCanStartIO(buffers[i], false)) { /* * Report this as a 'hit' for this backend, even though it must @@ -1541,6 +1772,11 @@ WaitReadBuffers(ReadBuffersOperation *operation) operation->smgr->smgr_rlocator.locator.relNumber, operation->smgr->smgr_rlocator.backend, true); + + ereport(DEBUG3, + errmsg("can't start io for first buffer %u: %s", + buffers[i], DebugPrintBufferRefcount(buffers[i])), + errhidestmt(true), errhidecontext(true)); continue; } @@ -1550,6 +1786,11 @@ WaitReadBuffers(ReadBuffersOperation *operation) io_first_block = blocknum + i; io_buffers_len = 1; + ereport(DEBUG5, + errmsg("first prepped for io: %s, offset %d", + DebugPrintBufferRefcount(io_buffers[0]), i), + errhidestmt(true), errhidecontext(true)); + /* * How many neighboring-on-disk blocks can we scatter-read into other * buffers at the same time? In this case we don't wait if we see an @@ -1557,86 +1798,54 @@ WaitReadBuffers(ReadBuffersOperation *operation) * head block, so we should get on with that I/O as soon as possible. * We'll come back to this block again, above. */ - while ((i + 1) < nblocks && - WaitReadBuffersCanStartIO(buffers[i + 1], true)) + while ((i + 1) < *nblocks && + ReadBuffersCanStartIO(buffers[i + 1], true)) { /* Must be consecutive block numbers. 
*/ Assert(BufferGetBlockNumber(buffers[i + 1]) == BufferGetBlockNumber(buffers[i]) + 1); + ereport(DEBUG5, + errmsg("seq prepped for io: %s, offset %d", + DebugPrintBufferRefcount(buffers[i + 1]), + i + 1), + errhidestmt(true), errhidecontext(true)); + io_buffers[io_buffers_len] = buffers[++i]; io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); } - io_start = pgstat_prepare_io_time(track_io_timing); - smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len); - pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start, - 1, io_buffers_len * BLCKSZ); + pgaio_io_get_wref(ioh, &operation->io_wref); - /* Verify each block we read, and terminate the I/O. */ - for (int j = 0; j < io_buffers_len; ++j) - { - BufferDesc *bufHdr; - Block bufBlock; + pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len); - if (persistence == RELPERSISTENCE_TEMP) - { - bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1); - bufBlock = LocalBufHdrGetBlock(bufHdr); - } - else - { - bufHdr = GetBufferDescriptor(io_buffers[j] - 1); - bufBlock = BufHdrGetBlock(bufHdr); - } + if (persistence == RELPERSISTENCE_TEMP) + pgaio_io_register_callbacks(ioh, PGAIO_HCB_LOCAL_BUFFER_READV); + else + pgaio_io_register_callbacks(ioh, PGAIO_HCB_SHARED_BUFFER_READV); - /* check for garbage data */ - if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j, - PIV_LOG_WARNING | PIV_REPORT_STAT)) - { - if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages) - { - ereport(WARNING, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s; zeroing out page", - io_first_block + j, - relpath(operation->smgr->smgr_rlocator, forknum)))); - memset(bufBlock, 0, BLCKSZ); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s", - io_first_block + j, - relpath(operation->smgr->smgr_rlocator, forknum)))); - } + pgaio_io_set_flag(ioh, ioh_flags); - /* Terminate I/O 
and set BM_VALID. */ - if (persistence == RELPERSISTENCE_TEMP) - { - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + did_start_io = true; + smgrstartreadv(ioh, operation->smgr, forknum, io_first_block, + io_pages, io_buffers_len); + ioh = NULL; - buf_state |= BM_VALID; - pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); - } - else - { - /* Set BM_VALID, terminate IO, and wake up any waiters */ - TerminateBufferIO(bufHdr, false, BM_VALID, true, true); - } + /* not obvious what we'd use for time */ + pgstat_count_io_op(io_object, io_context, IOOP_READ, + 1, io_buffers_len * BLCKSZ); - /* Report I/Os as completing individually. */ - TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j, - operation->smgr->smgr_rlocator.locator.spcOid, - operation->smgr->smgr_rlocator.locator.dbOid, - operation->smgr->smgr_rlocator.locator.relNumber, - operation->smgr->smgr_rlocator.backend, - false); - } + *nblocks = io_buffers_len; + break; + } - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; + if (ioh) + { + pgaio_io_release(ioh); + ioh = NULL; } + + return did_start_io; } /* -- 2.48.1.76.g4e746b1a31.dirty