From ea0abea26bcd6ddd05c8c4ff952557ea4861c0cd Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 22 Jan 2025 16:06:51 -0500 Subject: [PATCH v2.5 08/30] aio: Implement smgr/md/fd read support --- src/include/storage/aio.h | 6 +- src/include/storage/aio_types.h | 10 +- src/include/storage/fd.h | 4 + src/include/storage/md.h | 7 + src/include/storage/smgr.h | 17 +++ src/backend/storage/aio/aio_callback.c | 3 + src/backend/storage/aio/aio_target.c | 2 + src/backend/storage/file/fd.c | 40 ++++++ src/backend/storage/smgr/md.c | 175 +++++++++++++++++++++++++ src/backend/storage/smgr/smgr.c | 106 +++++++++++++++ 10 files changed, 366 insertions(+), 4 deletions(-) diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index d45d70d12dc..8fbd206c343 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -108,9 +108,10 @@ typedef enum PgAioTargetID { /* intentionally the zero value, to help catch zeroed memory etc */ PGAIO_TID_INVALID = 0, + PGAIO_TID_SMGR, } PgAioTargetID; -#define PGAIO_TID_COUNT (PGAIO_TID_INVALID + 1) +#define PGAIO_TID_COUNT (PGAIO_TID_SMGR + 1) /* @@ -174,6 +175,9 @@ typedef struct PgAioTargetInfo typedef enum PgAioHandleCallbackID { PGAIO_HCB_INVALID, + + PGAIO_HCB_MD_READV, + PGAIO_HCB_MD_WRITEV, } PgAioHandleCallbackID; diff --git a/src/include/storage/aio_types.h b/src/include/storage/aio_types.h index d2617139a25..3ff9282bb1b 100644 --- a/src/include/storage/aio_types.h +++ b/src/include/storage/aio_types.h @@ -58,11 +58,15 @@ typedef struct PgAioWaitRef */ typedef union PgAioTargetData { - /* just as an example placeholder for later */ struct { - uint32 queue_id; - } wal; + RelFileLocator rlocator; /* physical relation identifier */ + BlockNumber blockNum; /* blknum relative to begin of reln */ + BlockNumber nblocks; + ForkNumber forkNum:8; /* don't waste 4 byte for four values */ + bool is_temp:1; /* proc can be inferred by owning AIO */ + bool skip_fsync:1; + } smgr; } PgAioTargetData; diff --git 
a/src/include/storage/fd.h b/src/include/storage/fd.h index e3067ab6597..f44f3b939d6 100644 --- a/src/include/storage/fd.h +++ b/src/include/storage/fd.h @@ -101,6 +101,8 @@ extern PGDLLIMPORT int max_safe_fds; * prototypes for functions in fd.c */ +struct PgAioHandle; + /* Operations on virtual Files --- equivalent to Unix kernel file ops */ extern File PathNameOpenFile(const char *fileName, int fileFlags); extern File PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode); @@ -109,6 +111,7 @@ extern void FileClose(File file); extern int FilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event_info); extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info); extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info); +extern int FileStartReadV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info); extern int FileSync(File file, uint32 wait_event_info); extern int FileZero(File file, off_t offset, off_t amount, uint32 wait_event_info); extern int FileFallocate(File file, off_t offset, off_t amount, uint32 wait_event_info); diff --git a/src/include/storage/md.h b/src/include/storage/md.h index 05bf537066e..bf714a8896d 100644 --- a/src/include/storage/md.h +++ b/src/include/storage/md.h @@ -19,6 +19,9 @@ #include "storage/smgr.h" #include "storage/sync.h" +struct PgAioHandleCallbacks; +extern const struct PgAioHandleCallbacks aio_md_readv_cb; + /* md storage manager functionality */ extern void mdinit(void); extern void mdopen(SMgrRelation reln); @@ -36,6 +39,9 @@ extern uint32 mdmaxcombine(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, void **buffers, BlockNumber nblocks); +extern void 
mdstartreadv(struct PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + void **buffers, BlockNumber nblocks); extern void mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void **buffers, BlockNumber nblocks, bool skipFsync); @@ -46,6 +52,7 @@ extern void mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, BlockNumber nblocks); extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum); extern void mdregistersync(SMgrRelation reln, ForkNumber forknum); +extern int mdfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off); extern void ForgetDatabaseSyncRequests(Oid dbid); extern void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo); diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index 4016b206ad6..85562fa0cc0 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -73,6 +73,11 @@ typedef SMgrRelationData *SMgrRelation; #define SmgrIsTemp(smgr) \ RelFileLocatorBackendIsTemp((smgr)->smgr_rlocator) +struct PgAioHandle; +struct PgAioTargetInfo; + +extern const struct PgAioTargetInfo aio_smgr_target_info; + extern void smgrinit(void); extern SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend); extern bool smgrexists(SMgrRelation reln, ForkNumber forknum); @@ -97,6 +102,10 @@ extern uint32 smgrmaxcombine(SMgrRelation reln, ForkNumber forknum, extern void smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, void **buffers, BlockNumber nblocks); +extern void smgrstartreadv(struct PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + void **buffers, BlockNumber nblocks); extern void smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void **buffers, BlockNumber nblocks, @@ -110,6 +119,7 @@ extern void smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nblocks); extern void 
smgrimmedsync(SMgrRelation reln, ForkNumber forknum); extern void smgrregistersync(SMgrRelation reln, ForkNumber forknum); +extern int smgrfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off); extern void AtEOXact_SMgr(void); extern bool ProcessBarrierSmgrRelease(void); @@ -127,4 +137,11 @@ smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync); } +extern void pgaio_io_set_target_smgr(struct PgAioHandle *ioh, + SMgrRelationData *smgr, + ForkNumber forknum, + BlockNumber blocknum, + int nblocks, + bool skip_fsync); + #endif /* SMGR_H */ diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c index 3071bf19f23..3112b935676 100644 --- a/src/backend/storage/aio/aio_callback.c +++ b/src/backend/storage/aio/aio_callback.c @@ -18,6 +18,7 @@ #include "miscadmin.h" #include "storage/aio.h" #include "storage/aio_internal.h" +#include "storage/md.h" /* just to have something to put into the aio_handle_cbs */ @@ -37,6 +38,8 @@ typedef struct PgAioHandleCallbacksEntry static const PgAioHandleCallbacksEntry aio_handle_cbs[] = { #define CALLBACK_ENTRY(id, callback) [id] = {.cb = &callback, .name = #callback} CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb), + + CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb), #undef CALLBACK_ENTRY }; diff --git a/src/backend/storage/aio/aio_target.c b/src/backend/storage/aio/aio_target.c index 637e3aa0928..45a4af68f24 100644 --- a/src/backend/storage/aio/aio_target.c +++ b/src/backend/storage/aio/aio_target.c @@ -16,6 +16,7 @@ #include "storage/aio.h" #include "storage/aio_internal.h" +#include "storage/smgr.h" /* @@ -29,6 +30,7 @@ static const PgAioTargetInfo *pgaio_target_info[] = { [PGAIO_TID_INVALID] = &(PgAioTargetInfo) { .name = "invalid", }, + [PGAIO_TID_SMGR] = &aio_smgr_target_info, }; diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index e454db4c020..996458fac71 100644 
--- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -94,6 +94,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/startup.h" +#include "storage/aio.h" #include "storage/fd.h" #include "storage/ipc.h" #include "utils/guc.h" @@ -1294,6 +1295,8 @@ LruDelete(File file) vfdP = &VfdCache[file]; + pgaio_closing_fd(vfdP->fd); + /* * Close the file. We aren't expecting this to fail; if it does, better * to leak the FD than to mess up our internal state. @@ -1987,6 +1990,8 @@ FileClose(File file) if (!FileIsNotOpen(file)) { + pgaio_closing_fd(vfdP->fd); + /* close the file */ if (close(vfdP->fd) != 0) { @@ -2210,6 +2215,32 @@ retry: return returnCode; } +int +FileStartReadV(struct PgAioHandle *ioh, File file, + int iovcnt, off_t offset, + uint32 wait_event_info) +{ + int returnCode; + Vfd *vfdP; + + Assert(FileIsValid(file)); + + DO_DB(elog(LOG, "FileStartReadV: %d (%s) " INT64_FORMAT " %d", + file, VfdCache[file].fileName, + (int64) offset, + iovcnt)); + + returnCode = FileAccess(file); + if (returnCode < 0) + return returnCode; + + vfdP = &VfdCache[file]; + + pgaio_io_prep_readv(ioh, vfdP->fd, iovcnt, offset); + + return 0; +} + ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info) @@ -2498,6 +2529,12 @@ FilePathName(File file) int FileGetRawDesc(File file) { + int returnCode; + + returnCode = FileAccess(file); + if (returnCode < 0) + return returnCode; + Assert(FileIsValid(file)); return VfdCache[file].fd; } @@ -2778,6 +2815,7 @@ FreeDesc(AllocateDesc *desc) result = closedir(desc->desc.dir); break; case AllocateDescRawFD: + pgaio_closing_fd(desc->desc.fd); result = close(desc->desc.fd); break; default: @@ -2846,6 +2884,8 @@ CloseTransientFile(int fd) /* Only get here if someone passes us a file not in allocatedDescs */ elog(WARNING, "fd passed to CloseTransientFile was not obtained from OpenTransientFile"); + pgaio_closing_fd(fd); + return close(fd); } diff --git 
a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index f3220f98dc4..d01ae5f6a09 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -31,6 +31,7 @@ #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" +#include "storage/aio.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/md.h" @@ -152,6 +153,15 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forknum, static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg); +static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data); +static void md_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel); + +const struct PgAioHandleCallbacks aio_md_readv_cb = { + .complete_shared = md_readv_complete, + .report = md_readv_report, +}; + + static inline int _mdfd_open_flags(void) { @@ -937,6 +947,73 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, } } +/* + * mdstartreadv() -- Asynchronous version of mdreadv(). 
+ */ +void +mdstartreadv(PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + void **buffers, BlockNumber nblocks) +{ + off_t seekpos; + MdfdVec *v; + BlockNumber nblocks_this_segment; + struct iovec *iov; + int iovcnt; + int ret; + + v = _mdfd_getseg(reln, forknum, blocknum, false, + EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); + + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + + nblocks_this_segment = + Min(nblocks, + RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE))); + + if (nblocks_this_segment != nblocks) + elog(ERROR, "read crossing segment boundary"); + + iovcnt = pgaio_io_get_iovec(ioh, &iov); + + Assert(nblocks <= iovcnt); + + iovcnt = buffers_to_iovec(iov, buffers, nblocks_this_segment); + + Assert(iovcnt <= nblocks_this_segment); + + if (!(io_direct_flags & IO_DIRECT_DATA)) + pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED); + + pgaio_io_set_target_smgr(ioh, + reln, + forknum, + blocknum, + nblocks, + false); + pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_READV, 0); + + /* + * FileStartReadV() returns nonzero if FileAccess() failed, in which case + * no IO was prepared on the handle; don't silently continue. + */ + ret = FileStartReadV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_READ); + if (ret != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not start reading blocks %u..%u in file \"%s\": %m", + blocknum, + blocknum + nblocks_this_segment - 1, + FilePathName(v->mdfd_vfd)))); + + /* + * The error checks corresponding to the post-read checks in mdreadv() are + * in md_readv_complete(). + */ +} + /* * mdwritev() -- Write the supplied blocks at the appropriate location. 
* @@ -1365,6 +1442,21 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum) } } +int +mdfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off) +{ + MdfdVec *v = mdopenfork(reln, forknum, EXTENSION_FAIL); + + v = _mdfd_getseg(reln, forknum, blocknum, false, + EXTENSION_FAIL); + + *off = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + + Assert(*off < (off_t) BLCKSZ * RELSEG_SIZE); + + return FileGetRawDesc(v->mdfd_vfd); +} + /* * register_dirty_segment() -- Mark a relation segment as needing fsync * @@ -1841,3 +1933,98 @@ mdfiletagmatches(const FileTag *ftag, const FileTag *candidate) */ return ftag->rlocator.dbOid == candidate->rlocator.dbOid; } + +/* + * AIO completion callback for mdstartreadv(). + */ +static PgAioResult +md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data) +{ + PgAioTargetData *td = pgaio_io_get_target_data(ioh); + PgAioResult result = prior_result; + + if (prior_result.result < 0) + { + result.status = ARS_ERROR; + result.id = PGAIO_HCB_MD_READV; + /* For "hard" errors, track the error number in error_data */ + result.error_data = -prior_result.result; + result.result = 0; + + md_readv_report(result, td, LOG); + + return result; + } + + /* + * The smgr API operates in blocks, therefore convert the result from + * bytes to blocks. + */ + result.result /= BLCKSZ; + + if (result.result == 0) + { + /* consider 0 blocks read a failure */ + result.status = ARS_ERROR; + result.id = PGAIO_HCB_MD_READV; + result.error_data = 0; + + md_readv_report(result, td, LOG); + + return result; + } + + if (result.status != ARS_ERROR && + result.result < td->smgr.nblocks) + { + /* partial reads should be retried at upper level */ + result.status = ARS_PARTIAL; + result.id = PGAIO_HCB_MD_READV; + } + + return result; +} + +/* + * AIO error reporting callback for mdstartreadv(). 
+ */ +static void +md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel) +{ + RelPathStr path; + + path = relpathbackend(td->smgr.rlocator, + td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER, + td->smgr.forkNum); + + if (result.error_data != 0) + { + errno = result.error_data; /* for errcode_for_file_access() */ + + ereport(elevel, + errcode_for_file_access(), + errmsg("could not read blocks %u..%u in file \"%s\": %m", + td->smgr.blockNum, + td->smgr.blockNum + td->smgr.nblocks - 1, + path.str + ) + ); + } + else + { + /* + * NB: This will typically only be output in debug messages, while + * retrying a partial IO. + */ + ereport(elevel, + errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read blocks %u..%u in file \"%s\": read only %zu of %zu bytes", + td->smgr.blockNum, + td->smgr.blockNum + td->smgr.nblocks - 1, + path.str, + result.result * (size_t) BLCKSZ, + td->smgr.nblocks * (size_t) BLCKSZ + ) + ); + } +} diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index ebe35c04de5..ea8b42ee4e8 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -53,6 +53,7 @@ #include "access/xlogutils.h" #include "lib/ilist.h" +#include "storage/aio.h" #include "storage/bufmgr.h" #include "storage/ipc.h" #include "storage/md.h" @@ -93,6 +94,10 @@ typedef struct f_smgr void (*smgr_readv) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, void **buffers, BlockNumber nblocks); + void (*smgr_startreadv) (struct PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + void **buffers, BlockNumber nblocks); void (*smgr_writev) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void **buffers, BlockNumber nblocks, @@ -104,6 +109,7 @@ typedef struct f_smgr BlockNumber old_blocks, BlockNumber nblocks); void (*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum); void (*smgr_registersync) (SMgrRelation reln, ForkNumber forknum); + int (*smgr_fd) 
(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off); } f_smgr; static const f_smgr smgrsw[] = { @@ -121,12 +127,14 @@ static const f_smgr smgrsw[] = { .smgr_prefetch = mdprefetch, .smgr_maxcombine = mdmaxcombine, .smgr_readv = mdreadv, + .smgr_startreadv = mdstartreadv, .smgr_writev = mdwritev, .smgr_writeback = mdwriteback, .smgr_nblocks = mdnblocks, .smgr_truncate = mdtruncate, .smgr_immedsync = mdimmedsync, .smgr_registersync = mdregistersync, + .smgr_fd = mdfd, } }; @@ -145,6 +153,16 @@ static void smgrshutdown(int code, Datum arg); static void smgrdestroy(SMgrRelation reln); +static void smgr_aio_reopen(PgAioHandle *ioh); +static char *smgr_aio_describe_identity(const PgAioTargetData *sd); + +const struct PgAioTargetInfo aio_smgr_target_info = { + .name = "smgr", + .reopen = smgr_aio_reopen, + .describe_identity = smgr_aio_describe_identity, +}; + + /* * smgrinit(), smgrshutdown() -- Initialize or shut down storage * managers. @@ -623,6 +641,22 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, nblocks); } +/* + * smgrstartreadv() -- asynchronous version of smgrreadv() + * + * This starts an asynchronous readv IO with the IO handle `ioh`. Other than + * `ioh` all parameters are the same as smgrreadv(). + */ +void +smgrstartreadv(struct PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + void **buffers, BlockNumber nblocks) +{ + smgrsw[reln->smgr_which].smgr_startreadv(ioh, + reln, forknum, blocknum, buffers, + nblocks); +} + /* * smgrwritev() -- Write the supplied buffers out. 
* @@ -819,6 +853,12 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum) smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum); } +int +smgrfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off) +{ + return smgrsw[reln->smgr_which].smgr_fd(reln, forknum, blocknum, off); +} + /* * AtEOXact_SMgr * @@ -847,3 +887,69 @@ ProcessBarrierSmgrRelease(void) smgrreleaseall(); return true; } + +void +pgaio_io_set_target_smgr(PgAioHandle *ioh, + struct SMgrRelationData *smgr, + ForkNumber forknum, + BlockNumber blocknum, + int nblocks, + bool skip_fsync) +{ + PgAioTargetData *sd = pgaio_io_get_target_data(ioh); + + pgaio_io_set_target(ioh, PGAIO_TID_SMGR); + + /* backend is implied via IO owner */ + sd->smgr.rlocator = smgr->smgr_rlocator.locator; + sd->smgr.forkNum = forknum; + sd->smgr.blockNum = blocknum; + sd->smgr.nblocks = nblocks; + sd->smgr.is_temp = SmgrIsTemp(smgr); + /* Temp relations should never be fsync'd */ + sd->smgr.skip_fsync = skip_fsync && !SmgrIsTemp(smgr); +} + +static void +smgr_aio_reopen(PgAioHandle *ioh) +{ + PgAioTargetData *sd = pgaio_io_get_target_data(ioh); + PgAioOpData *od = pgaio_io_get_op_data(ioh); + SMgrRelation reln; + ProcNumber procno; + uint32 off; + + if (sd->smgr.is_temp) + procno = pgaio_io_get_owner(ioh); + else + procno = INVALID_PROC_NUMBER; + + reln = smgropen(sd->smgr.rlocator, procno); + od->read.fd = smgrfd(reln, sd->smgr.forkNum, sd->smgr.blockNum, &off); + Assert(off == od->read.offset); +} + +static char * +smgr_aio_describe_identity(const PgAioTargetData *sd) +{ + RelPathStr path; + char *desc; + + path = relpathbackend(sd->smgr.rlocator, + sd->smgr.is_temp ? 
MyProcNumber : INVALID_PROC_NUMBER, + sd->smgr.forkNum); + + if (sd->smgr.nblocks == 0) + desc = psprintf(_("file \"%s\""), path.str); + else if (sd->smgr.nblocks == 1) + desc = psprintf(_("block %u in file \"%s\""), + sd->smgr.blockNum, + path.str); + else + desc = psprintf(_("blocks %u..%u in file \"%s\""), + sd->smgr.blockNum, + sd->smgr.blockNum + sd->smgr.nblocks - 1, + path.str); + + return desc; +} -- 2.48.1.76.g4e746b1a31.dirty