From a2f7a2d85772011077082cd06b7f8fa54c324035 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 22 Jan 2025 16:08:58 -0500 Subject: [PATCH v2.4 19/29] bufmgr: Implement AIO read support As of this commit there are no users of these AIO facilities, that'll come in later commits. Author: Reviewed-By: Discussion: https://postgr.es/m/ Backpatch: --- src/include/storage/aio.h | 4 + src/include/storage/buf_internals.h | 6 + src/include/storage/bufmgr.h | 8 + src/backend/storage/aio/aio_callback.c | 5 + src/backend/storage/buffer/buf_init.c | 3 + src/backend/storage/buffer/bufmgr.c | 376 ++++++++++++++++++++++++- src/backend/storage/buffer/localbuf.c | 77 +++++ 7 files changed, 472 insertions(+), 7 deletions(-) diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index 4c5fb7bcfce..6b34422607c 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -178,6 +178,10 @@ typedef enum PgAioHandleCallbackID PGAIO_HCB_MD_READV, PGAIO_HCB_MD_WRITEV, + + PGAIO_HCB_SHARED_BUFFER_READV, + + PGAIO_HCB_LOCAL_BUFFER_READV, } PgAioHandleCallbackID; diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 1a65342177d..2a0c70c9998 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -17,6 +17,7 @@ #include "pgstat.h" #include "port/atomics.h" +#include "storage/aio_types.h" #include "storage/buf.h" #include "storage/bufmgr.h" #include "storage/condition_variable.h" @@ -251,6 +252,8 @@ typedef struct BufferDesc int wait_backend_pgprocno; /* backend of pin-count waiter */ int freeNext; /* link in freelist chain */ + + PgAioWaitRef io_wref; LWLock content_lock; /* to lock access to buffer contents */ } BufferDesc; @@ -464,4 +467,7 @@ extern void DropRelationLocalBuffers(RelFileLocator rlocator, extern void DropRelationAllLocalBuffers(RelFileLocator rlocator); extern void AtEOXact_LocalBuffers(bool isCommit); + +extern PgAioResult LocalBufferCompleteRead(int buf_off, Buffer buffer, int mode, bool failed); + #endif /* BUFMGR_INTERNALS_H */ diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 4a035f59a7d..efba4d88d7d 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -176,6 +176,12 @@ extern PGDLLIMPORT int NLocBuffer; extern PGDLLIMPORT Block *LocalBufferBlockPointers; extern PGDLLIMPORT int32 *LocalRefCount; + +struct PgAioHandleCallbacks; +extern const struct PgAioHandleCallbacks aio_shared_buffer_readv_cb; +extern const struct PgAioHandleCallbacks aio_local_buffer_readv_cb; + + /* upper limit for effective_io_concurrency */ #define MAX_IO_CONCURRENCY 1000 @@ -193,6 +199,8 @@ extern PGDLLIMPORT int32 *LocalRefCount; /* * prototypes for functions in bufmgr.c */ +struct PgAioHandle; + extern PrefetchBufferResult PrefetchSharedBuffer(struct SMgrRelationData *smgr_reln, ForkNumber forkNum, BlockNumber blockNum); diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c index adb8050eb58..6afdaaa434b 100644 --- a/src/backend/storage/aio/aio_callback.c +++ b/src/backend/storage/aio/aio_callback.c @@ -18,6 +18,7 @@ #include "miscadmin.h" #include "storage/aio.h" #include "storage/aio_internal.h" +#include "storage/bufmgr.h" #include "storage/md.h" #include "utils/memutils.h" @@ -42,6 +43,10 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = { CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb), CALLBACK_ENTRY(PGAIO_HCB_MD_WRITEV, aio_md_writev_cb), + + CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb), + + CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_READV, aio_local_buffer_readv_cb), #undef CALLBACK_ENTRY }; diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c index ed1f8e03190..ed1dc488a42 100644 --- a/src/backend/storage/buffer/buf_init.c +++ b/src/backend/storage/buffer/buf_init.c @@ -14,6 +14,7 @@ */ #include "postgres.h" +#include "storage/aio.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" @@ -125,6 +126,8 @@ BufferManagerShmemInit(void) buf->buf_id = i; + pgaio_wref_clear(&buf->io_wref); + /* * Initially link all the buffers together as unused. Subsequent * management of this list is done by freelist.c. diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index ec308557179..96b54f7abdf 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -48,6 +48,7 @@ #include "pg_trace.h" #include "pgstat.h" #include "postmaster/bgwriter.h" +#include "storage/aio.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/fd.h" @@ -58,6 +59,7 @@ #include "storage/smgr.h" #include "storage/standby.h" #include "utils/memdebug.h" +#include "utils/memutils.h" #include "utils/ps_status.h" #include "utils/rel.h" #include "utils/resowner.h" @@ -516,7 +518,8 @@ static int SyncOneBuffer(int buf_id, bool skip_recently_used, static void WaitIO(BufferDesc *buf); static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, - uint32 set_flag_bits, bool forget_owner); + uint32 set_flag_bits, bool forget_owner, + bool syncio); static void AbortBufferIO(Buffer buffer); static void shared_buffer_write_error_callback(void *arg); static void local_buffer_write_error_callback(void *arg); @@ -1083,7 +1086,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid) else { /* Set BM_VALID, terminate IO, and wake up any waiters */ - TerminateBufferIO(bufHdr, false, BM_VALID, true); + TerminateBufferIO(bufHdr, false, BM_VALID, true, true); } } else if (!isLocalBuf) @@ -1619,7 +1622,7 @@ WaitReadBuffers(ReadBuffersOperation *operation) else { /* Set BM_VALID, terminate IO, and wake up any waiters */ - TerminateBufferIO(bufHdr, false, BM_VALID, true); + TerminateBufferIO(bufHdr, false, BM_VALID, true, true); } /* Report I/Os as completing individually. */ @@ -2530,7 +2533,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, if (lock) LWLockAcquire(BufferDescriptorGetContentLock(buf_hdr), LW_EXCLUSIVE); - TerminateBufferIO(buf_hdr, false, BM_VALID, true); + TerminateBufferIO(buf_hdr, false, BM_VALID, true, true); } pgBufferUsage.shared_blks_written += extend_by; @@ -3989,7 +3992,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and * end the BM_IO_IN_PROGRESS state. */ - TerminateBufferIO(buf, true, 0, true); + TerminateBufferIO(buf, true, 0, true, true); TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&buf->tag), buf->tag.blockNum, @@ -5569,6 +5572,7 @@ WaitIO(BufferDesc *buf) for (;;) { uint32 buf_state; + PgAioWaitRef iow; /* * It may not be necessary to acquire the spinlock to check the flag @@ -5576,10 +5580,19 @@ WaitIO(BufferDesc *buf) * play it safe. */ buf_state = LockBufHdr(buf); + iow = buf->io_wref; UnlockBufHdr(buf, buf_state); if (!(buf_state & BM_IO_IN_PROGRESS)) break; + + if (pgaio_wref_valid(&iow)) + { + pgaio_wref_wait(&iow); + ConditionVariablePrepareToSleep(cv); + continue; + } + ConditionVariableSleep(cv, WAIT_EVENT_BUFFER_IO); } ConditionVariableCancelSleep(); @@ -5668,7 +5681,7 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) */ static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits, - bool forget_owner) + bool forget_owner, bool syncio) { uint32 buf_state; @@ -5680,6 +5693,13 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits, if (clear_dirty && !(buf_state & BM_JUST_DIRTIED)) buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); + if (!syncio) + { + /* release ownership by the AIO subsystem */ + buf_state -= BUF_REFCOUNT_ONE; + pgaio_wref_clear(&buf->io_wref); + } + buf_state |= set_flag_bits; UnlockBufHdr(buf, buf_state); @@ -5688,6 +5708,40 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits, BufferDescriptorGetBuffer(buf)); ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf)); + + /* + * If we just released a pin, need to do BM_PIN_COUNT_WAITER handling. + * Most of the time the current backend will hold another pin preventing + * that from happening, but that's e.g. not the case when completing an IO + * another backend started. + * + * AFIXME: Deduplicate with UnpinBufferNoOwner() or just replace + * BM_PIN_COUNT_WAITER with something saner. + */ + /* Support LockBufferForCleanup() */ + if (buf_state & BM_PIN_COUNT_WAITER) + { + /* + * Acquire the buffer header lock, re-check that there's a waiter. + * Another backend could have unpinned this buffer, and already woken + * up the waiter. There's no danger of the buffer being replaced + * after we unpinned it above, as it's pinned by the waiter. + */ + buf_state = LockBufHdr(buf); + + if ((buf_state & BM_PIN_COUNT_WAITER) && + BUF_STATE_GET_REFCOUNT(buf_state) == 1) + { + /* we just released the last pin other than the waiter's */ + int wait_backend_pgprocno = buf->wait_backend_pgprocno; + + buf_state &= ~BM_PIN_COUNT_WAITER; + UnlockBufHdr(buf, buf_state); + ProcSendSignal(wait_backend_pgprocno); + } + else + UnlockBufHdr(buf, buf_state); + } } /* @@ -5739,7 +5793,7 @@ AbortBufferIO(Buffer buffer) } } - TerminateBufferIO(buf_hdr, false, BM_IO_ERROR, false); + TerminateBufferIO(buf_hdr, false, BM_IO_ERROR, false, true); } /* @@ -6198,3 +6252,311 @@ EvictUnpinnedBuffer(Buffer buf) return result; } + +static PgAioResult +SharedBufferCompleteRead(int buf_off, Buffer buffer, int mode, bool failed) +{ + BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1); + BufferTag tag = bufHdr->tag; + char *bufdata = BufferGetBlock(buffer); + PgAioResult result; + + Assert(BufferIsValid(buffer)); + +#ifdef USE_ASSERT_CHECKING + { + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + + Assert(buf_state & BM_TAG_VALID); + Assert(!(buf_state & BM_VALID)); + Assert(buf_state & BM_IO_IN_PROGRESS); + Assert(!(buf_state & BM_DIRTY)); + } +#endif + + result.status = ARS_OK; + + /* check for garbage data */ + if (!failed && + !PageIsVerifiedExtended((Page) bufdata, tag.blockNum, + PIV_LOG_WARNING | PIV_REPORT_STAT)) + { + RelFileLocator rlocator = BufTagGetRelFileLocator(&tag); + + /* AFIXME: relpathperm allocates memory */ + MemoryContextSwitchTo(ErrorContext); + if (mode == READ_BUFFERS_ZERO_ON_ERROR || zero_damaged_pages) + { + ereport(LOG, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s; zeroing out page", + tag.blockNum, + relpathperm(rlocator, tag.forkNum)))); + memset(bufdata, 0, BLCKSZ); + } + else + { + /* mark buffer as having failed */ + failed = true; + + /* encode error for buffer_readv_report */ + result.status = ARS_ERROR; + result.id = PGAIO_HCB_SHARED_BUFFER_READV; + result.error_data = buf_off; + } + } + + /* Terminate I/O and set BM_VALID. */ + TerminateBufferIO(bufHdr, false, + failed ? BM_IO_ERROR : BM_VALID, + false, false); + + TRACE_POSTGRESQL_BUFFER_READ_DONE(tag.forkNum, + tag.blockNum, + tag.spcOid, + tag.dbOid, + tag.relNumber, + INVALID_PROC_NUMBER, + false); + + return result; +} + +/* + * Helper to prepare IO on shared buffers for execution, shared between reads + * and writes. + */ +static void +shared_buffer_stage_common(PgAioHandle *ioh, bool is_write) +{ + uint64 *io_data; + uint8 handle_data_len; + PgAioWaitRef io_ref; + BufferTag first PG_USED_FOR_ASSERTS_ONLY = {0}; + + io_data = pgaio_io_get_handle_data(ioh, &handle_data_len); + + pgaio_io_get_wref(ioh, &io_ref); + + for (int i = 0; i < handle_data_len; i++) + { + Buffer buf = (Buffer) io_data[i]; + BufferDesc *bufHdr; + uint32 buf_state; + + bufHdr = GetBufferDescriptor(buf - 1); + + if (i == 0) + first = bufHdr->tag; + else + { + Assert(bufHdr->tag.relNumber == first.relNumber); + Assert(bufHdr->tag.blockNum == first.blockNum + i); + } + + + buf_state = LockBufHdr(bufHdr); + + Assert(buf_state & BM_TAG_VALID); + if (is_write) + { + Assert(buf_state & BM_VALID); + Assert(buf_state & BM_DIRTY); + } + else + Assert(!(buf_state & BM_VALID)); + + Assert(buf_state & BM_IO_IN_PROGRESS); + Assert(BUF_STATE_GET_REFCOUNT(buf_state) >= 1); + + buf_state += BUF_REFCOUNT_ONE; + bufHdr->io_wref = io_ref; + + UnlockBufHdr(bufHdr, buf_state); + + if (is_write) + { + LWLock *content_lock; + + content_lock = BufferDescriptorGetContentLock(bufHdr); + + Assert(LWLockHeldByMe(content_lock)); + + /* + * Lock is now owned by AIO subsystem. + */ + LWLockDisown(content_lock); + } + + /* + * Stop tracking this buffer via the resowner - the AIO system now + * keeps track. + */ + ResourceOwnerForgetBufferIO(CurrentResourceOwner, buf); + } +} + +static void +shared_buffer_readv_stage(PgAioHandle *ioh) +{ + shared_buffer_stage_common(ioh, false); +} + +static void +buffer_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel) +{ + MemoryContext oldContext = CurrentMemoryContext; + ProcNumber errProc; + + if (target_data->smgr.is_temp) + errProc = MyProcNumber; + else + errProc = INVALID_PROC_NUMBER; + + /* + * AFIXME: need infrastructure to allow memory allocation for error + * reporting + */ + oldContext = MemoryContextSwitchTo(ErrorContext); + + ereport(elevel, + errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s", + target_data->smgr.blockNum + result.error_data, + relpathbackend(target_data->smgr.rlocator, errProc, target_data->smgr.forkNum) + ) + ); + MemoryContextSwitchTo(oldContext); +} + +/* + * Perform completion handling of a single AIO read. This read may cover + * multiple blocks / buffers. + * + * Shared between shared and local buffers, to reduce code duplication. + */ +static PgAioResult +buffer_readv_complete_common(PgAioHandle *ioh, PgAioResult prior_result, bool is_temp) +{ + PgAioResult result = prior_result; + PgAioTargetData *td = pgaio_io_get_target_data(ioh); + int mode = td->smgr.mode; + uint64 *io_data; + uint8 handle_data_len; + + if (is_temp) + { + Assert(td->smgr.is_temp); + Assert(pgaio_io_get_owner(ioh) == MyProcNumber); + } + else + Assert(!td->smgr.is_temp); + + /* + * Iterate over all the buffers affected by this IO and call appropriate + * per-buffer completion function for each buffer. + */ + io_data = pgaio_io_get_handle_data(ioh, &handle_data_len); + for (int buf_off = 0; buf_off < handle_data_len; buf_off++) + { + Buffer buf = io_data[buf_off]; + PgAioResult buf_result; + bool failed; + + /* + * If the entire failed on a lower-level, each buffer needs to be + * marked as failed. In case of a partial read, some buffers may be + * ok. + */ + failed = + prior_result.status == ARS_ERROR + || prior_result.result <= buf_off; + + if (is_temp) + buf_result = LocalBufferCompleteRead(buf_off, buf, mode, failed); + else + buf_result = SharedBufferCompleteRead(buf_off, buf, mode, failed); + + /* + * If there wasn't any prior error and the IO for this page failed in + * some form, set the whole IO's to the page's result. + */ + if (result.status != ARS_ERROR && buf_result.status != ARS_OK) + { + buffer_readv_report(result, td, LOG); + result = buf_result; + } + } + + return result; +} + +static PgAioResult +shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result) +{ + return buffer_readv_complete_common(ioh, prior_result, false); +} + +/* + * Helper to stage IO on local buffers for execution, shared between reads + * and writes. + */ +static void +local_buffer_readv_stage(PgAioHandle *ioh) +{ + uint64 *io_data; + uint8 handle_data_len; + PgAioWaitRef io_wref; + + io_data = pgaio_io_get_handle_data(ioh, &handle_data_len); + + pgaio_io_get_wref(ioh, &io_wref); + + for (int i = 0; i < handle_data_len; i++) + { + Buffer buf = (Buffer) io_data[i]; + BufferDesc *bufHdr; + uint32 buf_state; + + bufHdr = GetLocalBufferDescriptor(-buf - 1); + + buf_state = pg_atomic_read_u32(&bufHdr->state); + + bufHdr->io_wref = io_wref; + + /* + * Track pin by AIO subsystem in BufferDesc, not in LocalRefCount as + * one might initially think. This is necessary to handle this backend + * erroring out while AIO is still in progress. + */ + buf_state += BUF_REFCOUNT_ONE; + + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } +} + +static PgAioResult +local_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result) +{ + return buffer_readv_complete_common(ioh, prior_result, true); + +} + + +const struct PgAioHandleCallbacks aio_shared_buffer_readv_cb = { + .stage = shared_buffer_readv_stage, + .complete_shared = shared_buffer_readv_complete, + .report = buffer_readv_report, +}; +const struct PgAioHandleCallbacks aio_local_buffer_readv_cb = { + .stage = local_buffer_readv_stage, + + /* + * Note that this, in contrast to the shared_buffers case, uses + * complete_local, as only the issuing backend has access to the required + * datastructures. This is important in case the IO completion may be + * consumed incidentally by another backend. + */ + .complete_local = local_buffer_readv_complete, + .report = buffer_readv_report, +}; diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 92c45611e0f..d997d8e8632 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -17,7 +17,9 @@ #include "access/parallel.h" #include "executor/instrument.h" +#include "pg_trace.h" #include "pgstat.h" +#include "storage/aio.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/fd.h" @@ -649,6 +651,8 @@ InitLocalBuffers(void) */ buf->buf_id = -i - 2; + pgaio_wref_clear(&buf->io_wref); + /* * Intentionally do not initialize the buffer's atomic variable * (besides zeroing the underlying memory above). That way we get @@ -876,3 +880,76 @@ AtProcExit_LocalBuffers(void) */ CheckForLocalBufferLeaks(); } + +PgAioResult +LocalBufferCompleteRead(int buf_off, Buffer buffer, int mode, bool failed) +{ + BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1); + BufferTag tag = bufHdr->tag; + char *bufdata = BufferGetBlock(buffer); + PgAioResult result; + + Assert(BufferIsValid(buffer)); + + result.status = ARS_OK; + + /* check for garbage data */ + if (!failed && + !PageIsVerifiedExtended((Page) bufdata, tag.blockNum, + PIV_LOG_WARNING | PIV_REPORT_STAT)) + { + RelFileLocator rlocator = BufTagGetRelFileLocator(&tag); + BlockNumber forkNum = tag.forkNum; + + MemoryContextSwitchTo(ErrorContext); + + if (mode == READ_BUFFERS_ZERO_ON_ERROR || zero_damaged_pages) + { + + ereport(LOG, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s; zeroing out page", + tag.blockNum, + relpathbackend(rlocator, MyProcNumber, forkNum)))); + memset(bufdata, 0, BLCKSZ); + } + else + { + /* mark buffer as having failed */ + failed = true; + + /* encode error for buffer_readv_report */ + result.status = ARS_ERROR; + result.id = PGAIO_HCB_LOCAL_BUFFER_READV; + result.error_data = buf_off; + } + } + + /* Terminate I/O and set BM_VALID. */ + pgaio_wref_clear(&bufHdr->io_wref); + + { + uint32 buf_state; + + buf_state = pg_atomic_read_u32(&bufHdr->state); + buf_state |= BM_VALID; + + /* + * Release pin held by IO subsystem, see also + * local_buffer_readv_prepare(). + */ + buf_state -= BUF_REFCOUNT_ONE; + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } + + + TRACE_POSTGRESQL_BUFFER_READ_DONE(tag.forkNum, + tag.blockNum, + tag.spcOid, + tag.dbOid, + tag.relNumber, + INVALID_PROC_NUMBER, + false); + + return result; +} -- 2.48.1.76.g4e746b1a31.dirty