From 38ad5ab077cdbba64da8ea019293030b81b8394e Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Wed, 19 Jul 2023 14:50:41 +1200 Subject: [PATCH v4 3/8] Provide vectored variants of smgrread() and smgrwrite(). smgrreadv() and smgrwritev() map to FileReadV() and FileWriteV(), and thence the OS's vector I/O routines, after dealing with segmentation. These functions allow for contiguous regions of relation files to be transferred into and out of non-neighboring buffers with a single system call. The traditional smgrread() and smgrwrite() functions are implemented in terms of the new functions. The md.c implementation automatically handles short transfers by retrying, where previously it would fail. This is in line with what xlog.c does with its writes, based on the suspicion that larger I/Os might be more likely to be partially completed. Reviewed-by: Heikki Linnakangas Discussion: https://postgr.es/m/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6uT5TUm2gkvA@mail.gmail.com --- doc/src/sgml/monitoring.sgml | 4 +- src/backend/storage/smgr/md.c | 327 ++++++++++++++++++++++---------- src/backend/storage/smgr/smgr.c | 37 ++-- src/include/storage/md.h | 9 +- src/include/storage/smgr.h | 25 ++- 5 files changed, 280 insertions(+), 122 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 42509042ad..4f8058d8b1 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -6868,7 +6868,7 @@ FROM pg_stat_get_backend_idset() AS backendid; arg5 is the ID of the backend which created the temporary relation for a local buffer, or InvalidBackendId (-1) for a shared buffer. arg6 is the number of bytes actually read, while arg7 is the number - requested (if these are different it indicates trouble). + requested (if these are different it indicates a short read). smgr-md-write-start @@ -6890,7 +6890,7 @@ FROM pg_stat_get_backend_idset() AS backendid; arg5 is the ID of the backend which created the temporary relation for a local buffer, or InvalidBackendId (-1) for a shared buffer. arg6 is the number of bytes actually written, while arg7 is the number - requested (if these are different it indicates trouble). + requested (if these are different it indicates a short write). sort-start diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index fdecbad170..8e58637792 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -28,6 +28,7 @@ #include "access/xlog.h" #include "access/xlogutils.h" #include "commands/tablespace.h" +#include "common/file_utils.h" #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" @@ -452,7 +453,7 @@ mdunlinkfork(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo) /* * mdextend() -- Add a block to the specified relation. * - * The semantics are nearly the same as mdwrite(): write at the + * The semantics are nearly the same as mdwritev(): write at the * specified position. However, this is to be used for the case of * extending a relation (i.e., blocknum is at or beyond the current * EOF). Note that we assume writing a block beyond current EOF @@ -737,138 +738,274 @@ mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) } /* - * mdread() -- Read the specified block from a relation. + * Convert an array of buffer address into an array of iovec objects, and + * return the number that were required. 'iov' must have enough space for up + * to 'nblocks' elements, but the number used may be less depending on + * merging. In the case of a run of fully contiguous buffers, a single iovec + * will be populated that can be handled as a plain non-vectored I/O. */ -void -mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, - void *buffer) +static int +buffers_to_iovec(struct iovec *iov, void **buffers, int nblocks) { - off_t seekpos; - int nbytes; - MdfdVec *v; + struct iovec *iovp; + int iovcnt; - /* If this build supports direct I/O, the buffer must be I/O aligned. */ - if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ) - Assert((uintptr_t) buffer == TYPEALIGN(PG_IO_ALIGN_SIZE, buffer)); - - TRACE_POSTGRESQL_SMGR_MD_READ_START(forknum, blocknum, - reln->smgr_rlocator.locator.spcOid, - reln->smgr_rlocator.locator.dbOid, - reln->smgr_rlocator.locator.relNumber, - reln->smgr_rlocator.backend); + Assert(nblocks >= 1); - v = _mdfd_getseg(reln, forknum, blocknum, false, - EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); + /* If this build supports direct I/O, buffers must be I/O aligned. */ + for (int i = 0; i < nblocks; ++i) + { + if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ) + Assert((uintptr_t) buffers[i] == + TYPEALIGN(PG_IO_ALIGN_SIZE, buffers[i])); + } - seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + /* Start the first iovec off with the first buffer. */ + iovp = &iov[0]; + iovp->iov_base = buffers[0]; + iovp->iov_len = BLCKSZ; + iovcnt = 1; - Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + /* Try to merge the rest. */ + for (int i = 1; i < nblocks; ++i) + { + void *buffer = buffers[i]; - nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ); + if (((char *) iovp->iov_base + iovp->iov_len) == buffer) + { + /* Contiguous with the last iovec. */ + iovp->iov_len += BLCKSZ; + } + else + { + /* Need a new iovec. */ + iovp++; + iovp->iov_base = buffer; + iovp->iov_len = BLCKSZ; + iovcnt++; + } + } - TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum, - reln->smgr_rlocator.locator.spcOid, - reln->smgr_rlocator.locator.dbOid, - reln->smgr_rlocator.locator.relNumber, - reln->smgr_rlocator.backend, - nbytes, - BLCKSZ); + return iovcnt; +} - if (nbytes != BLCKSZ) +/* + * mdreadv() -- Read the specified blocks from a relation. + */ +void +mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + void **buffers, BlockNumber nblocks) +{ + while (nblocks > 0) { - if (nbytes < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read block %u in file \"%s\": %m", - blocknum, FilePathName(v->mdfd_vfd)))); + struct iovec iov[PG_IOV_MAX]; + int iovcnt; + off_t seekpos; + int nbytes; + MdfdVec *v; + BlockNumber nblocks_this_segment; + size_t transferred_this_segment; + size_t size_this_segment; + + v = _mdfd_getseg(reln, forknum, blocknum, false, + EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); + + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + + nblocks_this_segment = + Min(nblocks, + RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE))); + nblocks_this_segment = Min(nblocks_this_segment, lengthof(iov)); + + iovcnt = buffers_to_iovec(iov, buffers, nblocks_this_segment); + size_this_segment = nblocks_this_segment * BLCKSZ; + transferred_this_segment = 0; /* - * Short read: we are at or past EOF, or we read a partial block at - * EOF. Normally this is an error; upper levels should never try to - * read a nonexistent block. However, if zero_damaged_pages is ON or - * we are InRecovery, we should instead return zeroes without - * complaining. This allows, for example, the case of trying to - * update a block that was later truncated away. + * Inner loop to continue after a short read. We'll keep going until + * we hit EOF rather than assuming that a short read means we hit the + * end. */ - if (zero_damaged_pages || InRecovery) - MemSet(buffer, 0, BLCKSZ); - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read block %u in file \"%s\": read only %d of %d bytes", - blocknum, FilePathName(v->mdfd_vfd), - nbytes, BLCKSZ))); + for (;;) + { + TRACE_POSTGRESQL_SMGR_MD_READ_START(forknum, blocknum, + reln->smgr_rlocator.locator.spcOid, + reln->smgr_rlocator.locator.dbOid, + reln->smgr_rlocator.locator.relNumber, + reln->smgr_rlocator.backend); + nbytes = FileReadV(v->mdfd_vfd, iov, iovcnt, seekpos, + WAIT_EVENT_DATA_FILE_READ); + TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum, + reln->smgr_rlocator.locator.spcOid, + reln->smgr_rlocator.locator.dbOid, + reln->smgr_rlocator.locator.relNumber, + reln->smgr_rlocator.backend, + nbytes, + size_this_segment - transferred_this_segment); + +#ifdef SIMULATE_SHORT_READ + nbytes = Min(nbytes, 4096); +#endif + + if (nbytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read blocks %u..%u in file \"%s\": %m", + blocknum, + blocknum + nblocks_this_segment - 1, + FilePathName(v->mdfd_vfd)))); + + if (nbytes == 0) + { + /* + * We are at or past EOF, or we read a partial block at EOF. + * Normally this is an error; upper levels should never try to + * read a nonexistent block. However, if zero_damaged_pages + * is ON or we are InRecovery, we should instead return zeroes + * without complaining. This allows, for example, the case of + * trying to update a block that was later truncated away. + */ + if (zero_damaged_pages || InRecovery) + { + for (BlockNumber i = transferred_this_segment / BLCKSZ; + i < nblocks_this_segment; + ++i) + memset(buffers[i], 0, BLCKSZ); + break; + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read blocks %u..%u in file \"%s\": read only %zu of %zu bytes", + blocknum, + blocknum + nblocks_this_segment - 1, + FilePathName(v->mdfd_vfd), + transferred_this_segment, + size_this_segment))); + } + + /* One loop should usually be enough. */ + transferred_this_segment += nbytes; + Assert(transferred_this_segment <= size_this_segment); + if (transferred_this_segment == size_this_segment) + break; + + /* Adjust position and vectors after a short read. */ + seekpos += nbytes; + iovcnt = compute_remaining_iovec(iov, iov, iovcnt, nbytes); + } + + nblocks -= nblocks_this_segment; + buffers += nblocks_this_segment; + blocknum += nblocks_this_segment; } } /* - * mdwrite() -- Write the supplied block at the appropriate location. + * mdwritev() -- Write the supplied blocks at the appropriate location. * * This is to be used only for updating already-existing blocks of a * relation (ie, those before the current EOF). To extend a relation, * use mdextend(). */ void -mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, - const void *buffer, bool skipFsync) +mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, bool skipFsync) { - off_t seekpos; - int nbytes; - MdfdVec *v; - - /* If this build supports direct I/O, the buffer must be I/O aligned. */ - if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ) - Assert((uintptr_t) buffer == TYPEALIGN(PG_IO_ALIGN_SIZE, buffer)); - /* This assert is too expensive to have on normally ... */ #ifdef CHECK_WRITE_VS_EXTEND Assert(blocknum < mdnblocks(reln, forknum)); #endif - TRACE_POSTGRESQL_SMGR_MD_WRITE_START(forknum, blocknum, - reln->smgr_rlocator.locator.spcOid, - reln->smgr_rlocator.locator.dbOid, - reln->smgr_rlocator.locator.relNumber, - reln->smgr_rlocator.backend); + while (nblocks > 0) + { + struct iovec iov[PG_IOV_MAX]; + int iovcnt; + off_t seekpos; + int nbytes; + MdfdVec *v; + BlockNumber nblocks_this_segment; + size_t transferred_this_segment; + size_t size_this_segment; - v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, - EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); + v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, + EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); - seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); - Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); - nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE); + nblocks_this_segment = + Min(nblocks, + RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE))); + nblocks_this_segment = Min(nblocks_this_segment, lengthof(iov)); - TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum, - reln->smgr_rlocator.locator.spcOid, - reln->smgr_rlocator.locator.dbOid, - reln->smgr_rlocator.locator.relNumber, - reln->smgr_rlocator.backend, - nbytes, - BLCKSZ); + iovcnt = buffers_to_iovec(iov, (void **) buffers, nblocks_this_segment); + size_this_segment = nblocks_this_segment * BLCKSZ; + transferred_this_segment = 0; - if (nbytes != BLCKSZ) - { - if (nbytes < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not write block %u in file \"%s\": %m", - blocknum, FilePathName(v->mdfd_vfd)))); - /* short write: complain appropriately */ - ereport(ERROR, - (errcode(ERRCODE_DISK_FULL), - errmsg("could not write block %u in file \"%s\": wrote only %d of %d bytes", - blocknum, - FilePathName(v->mdfd_vfd), - nbytes, BLCKSZ), - errhint("Check free disk space."))); - } + /* + * Inner loop to continue after a short write. If the reason is that + * we're out of disk space, a future attempt should get an ENOSPC + * error from the kernel. + */ + for (;;) + { + TRACE_POSTGRESQL_SMGR_MD_WRITE_START(forknum, blocknum, + reln->smgr_rlocator.locator.spcOid, + reln->smgr_rlocator.locator.dbOid, + reln->smgr_rlocator.locator.relNumber, + reln->smgr_rlocator.backend); + nbytes = FileWriteV(v->mdfd_vfd, iov, iovcnt, seekpos, + WAIT_EVENT_DATA_FILE_WRITE); + TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum, + reln->smgr_rlocator.locator.spcOid, + reln->smgr_rlocator.locator.dbOid, + reln->smgr_rlocator.locator.relNumber, + reln->smgr_rlocator.backend, + nbytes, + size_this_segment - transferred_this_segment); + +#ifdef SIMULATE_SHORT_WRITE + nbytes = Min(nbytes, 4096); +#endif - if (!skipFsync && !SmgrIsTemp(reln)) - register_dirty_segment(reln, forknum, v); + if (nbytes < 0) + { + bool enospc = errno == ENOSPC; + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write blocks %u..%u in file \"%s\": %m", + blocknum, + blocknum + nblocks_this_segment - 1, + FilePathName(v->mdfd_vfd)), + enospc ? errhint("Check free disk space.") : 0)); + } + + /* One loop should usually be enough. */ + transferred_this_segment += nbytes; + Assert(transferred_this_segment <= size_this_segment); + if (transferred_this_segment == size_this_segment) + break; + + /* Adjust position and iovecs after a short write. */ + seekpos += nbytes; + iovcnt = compute_remaining_iovec(iov, iov, iovcnt, nbytes); + } + + if (!skipFsync && !SmgrIsTemp(reln)) + register_dirty_segment(reln, forknum, v); + + nblocks -= nblocks_this_segment; + buffers += nblocks_this_segment; + blocknum += nblocks_this_segment; + } } + /* * mdwriteback() -- Tell the kernel to write pages back to storage. * diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 4c55264933..15633838c9 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -55,10 +55,13 @@ typedef struct f_smgr BlockNumber blocknum, int nblocks, bool skipFsync); bool (*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); - void (*smgr_read) (SMgrRelation reln, ForkNumber forknum, - BlockNumber blocknum, void *buffer); - void (*smgr_write) (SMgrRelation reln, ForkNumber forknum, - BlockNumber blocknum, const void *buffer, bool skipFsync); + void (*smgr_readv) (SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + void **buffers, BlockNumber nblocks); + void (*smgr_writev) (SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, + bool skipFsync); void (*smgr_writeback) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum); @@ -80,8 +83,8 @@ static const f_smgr smgrsw[] = { .smgr_extend = mdextend, .smgr_zeroextend = mdzeroextend, .smgr_prefetch = mdprefetch, - .smgr_read = mdread, - .smgr_write = mdwrite, + .smgr_readv = mdreadv, + .smgr_writev = mdwritev, .smgr_writeback = mdwriteback, .smgr_nblocks = mdnblocks, .smgr_truncate = mdtruncate, @@ -553,22 +556,23 @@ smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) } /* - * smgrread() -- read a particular block from a relation into the supplied - * buffer. + * smgrreadv() -- read a particular block range from a relation into the + * supplied buffers. * * This routine is called from the buffer manager in order to * instantiate pages in the shared buffer cache. All storage managers * return pages in the format that POSTGRES expects. */ void -smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, - void *buffer) +smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + void **buffers, BlockNumber nblocks) { - smgrsw[reln->smgr_which].smgr_read(reln, forknum, blocknum, buffer); + smgrsw[reln->smgr_which].smgr_readv(reln, forknum, blocknum, buffers, + nblocks); } /* - * smgrwrite() -- Write the supplied buffer out. + * smgrwritev() -- Write the supplied buffers out. * * This is to be used only for updating already-existing blocks of a * relation (ie, those before the current EOF). To extend a relation, @@ -583,14 +587,13 @@ smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, * do not require fsync. */ void -smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, - const void *buffer, bool skipFsync) +smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, bool skipFsync) { - smgrsw[reln->smgr_which].smgr_write(reln, forknum, blocknum, - buffer, skipFsync); + smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum, + buffers, nblocks, skipFsync); } - /* * smgrwriteback() -- Trigger kernel writeback for the supplied range of * blocks. diff --git a/src/include/storage/md.h b/src/include/storage/md.h index 941879ee6a..0f73d9af49 100644 --- a/src/include/storage/md.h +++ b/src/include/storage/md.h @@ -32,10 +32,11 @@ extern void mdzeroextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, int nblocks, bool skipFsync); extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); -extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, - void *buffer); -extern void mdwrite(SMgrRelation reln, ForkNumber forknum, - BlockNumber blocknum, const void *buffer, bool skipFsync); +extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + void **buffers, BlockNumber nblocks); +extern void mdwritev(SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, bool skipFsync); extern void mdwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum); diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index a9a179aaba..ae93910132 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -96,10 +96,13 @@ extern void smgrzeroextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, int nblocks, bool skipFsync); extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); -extern void smgrread(SMgrRelation reln, ForkNumber forknum, - BlockNumber blocknum, void *buffer); -extern void smgrwrite(SMgrRelation reln, ForkNumber forknum, - BlockNumber blocknum, const void *buffer, bool skipFsync); +extern void smgrreadv(SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + void **buffer, BlockNumber nblocks); +extern void smgrwritev(SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + const void **buffer, BlockNumber nblocks, + bool skipFsync); extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum); @@ -110,4 +113,18 @@ extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum); extern void AtEOXact_SMgr(void); extern bool ProcessBarrierSmgrRelease(void); +static inline void +smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + void *buffer) +{ + smgrreadv(reln, forknum, blocknum, &buffer, 1); +} + +static inline void +smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + const void *buffer, bool skipFsync) +{ + smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync); +} + #endif /* SMGR_H */ -- 2.39.3 (Apple Git-145)