From b5b766176d1d4a993af545a11b9e0ef848e9abcb Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Mon, 26 Feb 2024 23:48:31 +1300 Subject: [PATCH v5 1/7] Provide vectored variant of ReadBuffer(). Break ReadBuffer() up into two steps: StartReadBuffers() and WaitReadBuffers(). This has two advantages: 1. Multiple consecutive blocks can be read with one system call. 2. Advice (hints of future reads) can optionally be issued to the kernel. The traditional ReadBuffer() function is now implemented in terms of those functions, to avoid duplication. For now we still only read a block at a time so there is no change to generated system calls yet, but later commits will provide infrastructure to help build up larger calls. With some more infrastructure in later work, StartReadBuffers() should be extended to start real asynchronous I/O instead of advice. Reviewed-by: Melanie Plageman Reviewed-by: Heikki Linnakangas Reviewed-by: Nazir Bilal Yavuz Discussion: https://postgr.es/m/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6uT5TUm2gkvA@mail.gmail.com --- src/backend/storage/buffer/bufmgr.c | 641 ++++++++++++++++++-------- src/backend/storage/buffer/localbuf.c | 14 +- src/include/storage/bufmgr.h | 45 ++ src/tools/pgindent/typedefs.list | 1 + 4 files changed, 492 insertions(+), 209 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index bdf89bbc4dc..3b1b0ad99df 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -19,6 +19,11 @@ * and pin it so that no one can destroy it while this process * is using it. * + * StartReadBuffers() -- as above, but for multiple contiguous blocks in + * two steps. + * + * WaitReadBuffers() -- second step of StartReadBuffers(). + * * ReleaseBuffer() -- unpin a buffer * * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty". 
@@ -472,10 +477,9 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref) ) -static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, +static Buffer ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum, BlockNumber blockNum, - ReadBufferMode mode, BufferAccessStrategy strategy, - bool *hit); + ReadBufferMode mode, BufferAccessStrategy strategy); static BlockNumber ExtendBufferedRelCommon(BufferManagerRelation bmr, ForkNumber fork, BufferAccessStrategy strategy, @@ -501,7 +505,7 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf); static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context); static void WaitIO(BufferDesc *buf); -static bool StartBufferIO(BufferDesc *buf, bool forInput); +static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits, bool forget_owner); static void AbortBufferIO(Buffer buffer); @@ -782,7 +786,6 @@ Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy) { - bool hit; Buffer buf; /* @@ -795,15 +798,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot access temporary tables of other sessions"))); - /* - * Read the buffer, and update pgstat counters to reflect a cache hit or - * miss. 
- */ - pgstat_count_buffer_read(reln); - buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence, - forkNum, blockNum, mode, strategy, &hit); - if (hit) - pgstat_count_buffer_hit(reln); + buf = ReadBuffer_common(BMR_REL(reln), + forkNum, blockNum, mode, strategy); + return buf; } @@ -823,13 +820,12 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool permanent) { - bool hit; - SMgrRelation smgr = smgropen(rlocator, InvalidBackendId); - return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT : - RELPERSISTENCE_UNLOGGED, forkNum, blockNum, - mode, strategy, &hit); + return ReadBuffer_common(BMR_SMGR(smgr, permanent ? RELPERSISTENCE_PERMANENT : + RELPERSISTENCE_UNLOGGED), + forkNum, blockNum, + mode, strategy); } /* @@ -995,35 +991,68 @@ ExtendBufferedRelTo(BufferManagerRelation bmr, */ if (buffer == InvalidBuffer) { - bool hit; - Assert(extended_by == 0); - buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence, - fork, extend_to - 1, mode, strategy, - &hit); + buffer = ReadBuffer_common(bmr, fork, extend_to - 1, mode, strategy); } return buffer; } +/* + * Zero a buffer and lock it, as part of the implementation of + * RBM_ZERO_AND_LOCK or RBM_ZERO_AND_CLEANUP_LOCK. The buffer must be already + * pinned. It does not have to be valid, but it is valid and locked on + * return. 
+ */ +static void +ZeroBuffer(Buffer buffer, ReadBufferMode mode) +{ + BufferDesc *bufHdr; + uint32 buf_state; + + Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK); + + if (BufferIsLocal(buffer)) + bufHdr = GetLocalBufferDescriptor(-buffer - 1); + else + { + bufHdr = GetBufferDescriptor(buffer - 1); + if (mode == RBM_ZERO_AND_LOCK) + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + else + LockBufferForCleanup(buffer); + } + + memset(BufferGetPage(buffer), 0, BLCKSZ); + + if (BufferIsLocal(buffer)) + { + buf_state = pg_atomic_read_u32(&bufHdr->state); + buf_state |= BM_VALID; + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } + else + { + buf_state = LockBufHdr(bufHdr); + buf_state |= BM_VALID; + UnlockBufHdr(bufHdr, buf_state); + } +} + /* * ReadBuffer_common -- common logic for all ReadBuffer variants * * *hit is set to true if the request was satisfied from shared buffer cache. */ static Buffer -ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, +ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, - BufferAccessStrategy strategy, bool *hit) + BufferAccessStrategy strategy) { - BufferDesc *bufHdr; - Block bufBlock; - bool found; - IOContext io_context; - IOObject io_object; - bool isLocalBuf = SmgrIsTemp(smgr); - - *hit = false; + ReadBuffersOperation operation; + Buffer buffer; + int nblocks; + int flags; /* * Backward compatibility path, most code should use ExtendBufferedRel() @@ -1042,181 +1071,404 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) flags |= EB_LOCK_FIRST; - return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence), - forkNum, strategy, flags); + return ExtendBufferedRel(bmr, forkNum, strategy, flags); } - TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - 
smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend); + nblocks = 1; + if (mode == RBM_ZERO_ON_ERROR) + flags = READ_BUFFERS_ZERO_ON_ERROR; + else + flags = 0; + if (StartReadBuffers(bmr, + &buffer, + forkNum, + blockNum, + &nblocks, + strategy, + flags, + &operation)) + WaitReadBuffers(&operation); + Assert(nblocks == 1); /* single block can't be short */ + + if (mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO_AND_LOCK) + ZeroBuffer(buffer, mode); + + return buffer; +} + +static Buffer +PrepareReadBuffer(BufferManagerRelation bmr, + ForkNumber forkNum, + BlockNumber blockNum, + BufferAccessStrategy strategy, + bool *foundPtr) +{ + BufferDesc *bufHdr; + bool isLocalBuf; + IOContext io_context; + IOObject io_object; + + Assert(blockNum != P_NEW); + Assert(bmr.smgr); + + isLocalBuf = SmgrIsTemp(bmr.smgr); if (isLocalBuf) { - /* - * We do not use a BufferAccessStrategy for I/O of temporary tables. - * However, in some cases, the "strategy" may not be NULL, so we can't - * rely on IOContextForStrategy() to set the right IOContext for us. - * This may happen in cases like CREATE TEMPORARY TABLE AS... - */ io_context = IOCONTEXT_NORMAL; io_object = IOOBJECT_TEMP_RELATION; - bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found); - if (found) - pgBufferUsage.local_blks_hit++; - else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG || - mode == RBM_ZERO_ON_ERROR) - pgBufferUsage.local_blks_read++; } else { - /* - * lookup the buffer. IO_IN_PROGRESS is set if the requested block is - * not currently in memory. - */ io_context = IOContextForStrategy(strategy); io_object = IOOBJECT_RELATION; - bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum, - strategy, &found, io_context); - if (found) - pgBufferUsage.shared_blks_hit++; - else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG || - mode == RBM_ZERO_ON_ERROR) - pgBufferUsage.shared_blks_read++; } - /* At this point we do NOT hold any locks. 
*/ + TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend); - /* if it was already in the buffer pool, we're done */ - if (found) + ResourceOwnerEnlarge(CurrentResourceOwner); + if (isLocalBuf) + { + bufHdr = LocalBufferAlloc(bmr.smgr, forkNum, blockNum, foundPtr); + if (*foundPtr) + pgBufferUsage.local_blks_hit++; + } + else + { + bufHdr = BufferAlloc(bmr.smgr, bmr.relpersistence, forkNum, blockNum, + strategy, foundPtr, io_context); + if (*foundPtr) + pgBufferUsage.shared_blks_hit++; + } + if (bmr.rel) + { + /* + * While pgBufferUsage's "read" counter isn't bumped unless we reach + * WaitReadBuffers() (so, not for hits, and not for buffers that are + * zeroed instead), the per-relation stats always count them. + */ + pgstat_count_buffer_read(bmr.rel); + if (*foundPtr) + pgstat_count_buffer_hit(bmr.rel); + } + if (*foundPtr) { - /* Just need to update stats before we exit */ - *hit = true; VacuumPageHit++; pgstat_count_io_op(io_object, io_context, IOOP_HIT); - if (VacuumCostActive) VacuumCostBalance += VacuumCostPageHit; TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - found); + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend, + true); + } - /* - * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked - * on return. 
- */ - if (!isLocalBuf) - { - if (mode == RBM_ZERO_AND_LOCK) - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), - LW_EXCLUSIVE); - else if (mode == RBM_ZERO_AND_CLEANUP_LOCK) - LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr)); - } + return BufferDescriptorGetBuffer(bufHdr); +} - return BufferDescriptorGetBuffer(bufHdr); +/* + * Begin reading a range of blocks beginning at blockNum and extending for + * *nblocks. On return, up to *nblocks pinned buffers holding those blocks + * are written into the buffers array, and *nblocks is updated to contain the + * actual number, which may be fewer than requested. + * + * If false is returned, no I/O is necessary and WaitReadBuffers() is not + * necessary. If true is returned, one I/O has been started, and + * WaitReadBuffers() must be called with the same operation object before the + * buffers are accessed. Along with the operation object, the caller-supplied + * array of buffers must remain valid until WaitReadBuffers() is called. + * + * Currently the I/O is only started with optional operating system advice, + * and the real I/O happens in WaitReadBuffers(). In future work, true I/O + * could be initiated here. + */ +bool +StartReadBuffers(BufferManagerRelation bmr, + Buffer *buffers, + ForkNumber forkNum, + BlockNumber blockNum, + int *nblocks, + BufferAccessStrategy strategy, + int flags, + ReadBuffersOperation *operation) +{ + int actual_nblocks = *nblocks; + + if (bmr.rel) + { + bmr.smgr = RelationGetSmgr(bmr.rel); + bmr.relpersistence = bmr.rel->rd_rel->relpersistence; } - /* - * if we have gotten to this point, we have allocated a buffer for the - * page but its contents are not yet valid. IO_IN_PROGRESS is set for it, - * if it's a shared buffer. 
- */ - Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ + operation->bmr = bmr; + operation->forknum = forkNum; + operation->blocknum = blockNum; + operation->buffers = buffers; + operation->nblocks = actual_nblocks; + operation->strategy = strategy; + operation->flags = flags; - bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); + operation->io_buffers_len = 0; - /* - * Read in the page, unless the caller intends to overwrite it and just - * wants us to allocate a buffer. - */ - if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) - MemSet((char *) bufBlock, 0, BLCKSZ); - else + for (int i = 0; i < actual_nblocks; ++i) { - instr_time io_start = pgstat_prepare_io_time(track_io_timing); + bool found; - smgrread(smgr, forkNum, blockNum, bufBlock); + buffers[i] = PrepareReadBuffer(bmr, + forkNum, + blockNum + i, + strategy, + &found); - pgstat_count_io_op_time(io_object, io_context, - IOOP_READ, io_start, 1); + if (found) + { + /* + * Terminate the read as soon as we get a hit. It could be a + * single buffer hit, or it could be a hit that follows a readable + * range. We don't want to create more than one readable range, + * so we stop here. + */ + actual_nblocks = operation->nblocks = *nblocks = i + 1; + } + else + { + /* Extend the readable range to cover this block. 
*/ + operation->io_buffers_len++; + } + } - /* check for garbage data */ - if (!PageIsVerifiedExtended((Page) bufBlock, blockNum, - PIV_LOG_WARNING | PIV_REPORT_STAT)) + if (operation->io_buffers_len > 0) + { + if (flags & READ_BUFFERS_ISSUE_ADVICE) { - if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages) - { - ereport(WARNING, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s; zeroing out page", - blockNum, - relpath(smgr->smgr_rlocator, forkNum)))); - MemSet((char *) bufBlock, 0, BLCKSZ); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s", - blockNum, - relpath(smgr->smgr_rlocator, forkNum)))); + /* + * In theory we should only do this if PrepareReadBuffer() had to + * allocate new buffers above. That way, if two calls to + * StartReadBuffers() were made for the same blocks before + * WaitReadBuffers(), only the first would issue the advice. + * That'd be a better simulation of true asynchronous I/O, which + * would only start the I/O once, but isn't done here for + * simplicity. Note also that the following call might actually + * issue two advice calls if we cross a segment boundary; in a + * true asynchronous version we might choose to process only one + * real I/O at a time in that case. + */ + smgrprefetch(bmr.smgr, forkNum, blockNum, operation->io_buffers_len); } + + /* Indicate that WaitReadBuffers() should be called. */ + return true; } + else + { + return false; + } +} - /* - * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer - * content lock before marking the page as valid, to make sure that no - * other backend sees the zeroed page before the caller has had a chance - * to initialize it. - * - * Since no-one else can be looking at the page contents yet, there is no - * difference between an exclusive lock and a cleanup-strength lock. 
(Note - * that we cannot use LockBuffer() or LockBufferForCleanup() here, because - * they assert that the buffer is already valid.) - */ - if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) && - !isLocalBuf) +static inline bool +WaitReadBuffersCanStartIO(Buffer buffer, bool nowait) +{ + if (BufferIsLocal(buffer)) { - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE); + BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1); + + return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0; } + else + return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); +} + +void +WaitReadBuffers(ReadBuffersOperation *operation) +{ + BufferManagerRelation bmr; + Buffer *buffers; + int nblocks; + BlockNumber blocknum; + ForkNumber forknum; + bool isLocalBuf; + IOContext io_context; + IOObject io_object; + + /* + * Currently operations are only allowed to include a read of some range, + * with an optional extra buffer that is already pinned at the end. So + * nblocks can be at most one more than io_buffers_len. + */ + Assert((operation->nblocks == operation->io_buffers_len) || + (operation->nblocks == operation->io_buffers_len + 1)); + /* Find the range of the physical read we need to perform. 
*/ + nblocks = operation->io_buffers_len; + if (nblocks == 0) + return; /* nothing to do */ + + buffers = &operation->buffers[0]; + blocknum = operation->blocknum; + forknum = operation->forknum; + bmr = operation->bmr; + + isLocalBuf = SmgrIsTemp(bmr.smgr); if (isLocalBuf) { - /* Only need to adjust flags */ - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); - - buf_state |= BM_VALID; - pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; } else { - /* Set BM_VALID, terminate IO, and wake up any waiters */ - TerminateBufferIO(bufHdr, false, BM_VALID, true); + io_context = IOContextForStrategy(operation->strategy); + io_object = IOOBJECT_RELATION; } - VacuumPageMiss++; - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageMiss; + /* + * We count all these blocks as read by this backend. This is traditional + * behavior, but might turn out to be not true if we find that someone + * else has beaten us and completed the read of some of these blocks. In + * that case the system globally double-counts, but we traditionally don't + * count this as a "hit", and we don't have a separate counter for "miss, + * but another backend completed the read". + */ + if (isLocalBuf) + pgBufferUsage.local_blks_read += nblocks; + else + pgBufferUsage.shared_blks_read += nblocks; - TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - found); + for (int i = 0; i < nblocks; ++i) + { + int io_buffers_len; + Buffer io_buffers[MAX_BUFFERS_PER_TRANSFER]; + void *io_pages[MAX_BUFFERS_PER_TRANSFER]; + instr_time io_start; + BlockNumber io_first_block; - return BufferDescriptorGetBuffer(bufHdr); + /* + * Skip this block if someone else has already completed it. 
If an + * I/O is already in progress in another backend, this will wait for + * the outcome: either done, or something went wrong and we will + * retry. + */ + if (!WaitReadBuffersCanStartIO(buffers[i], false)) + { + /* + * Report this as a 'hit' for this backend, even though it must + * have started out as a miss in PrepareReadBuffer(). + */ + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i, + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend, + true); + continue; + } + + /* We found a buffer that we need to read in. */ + io_buffers[0] = buffers[i]; + io_pages[0] = BufferGetBlock(buffers[i]); + io_first_block = blocknum + i; + io_buffers_len = 1; + + /* + * How many neighboring-on-disk blocks can we scatter-read into + * other buffers at the same time? In this case we don't wait if we + * see an I/O already in progress. We already hold BM_IO_IN_PROGRESS + * for the head block, so we should get on with that I/O as soon as + * possible. We'll come back to this block again, above. + */ + while ((i + 1) < nblocks && + WaitReadBuffersCanStartIO(buffers[i + 1], true)) + { + /* Must be consecutive block numbers. */ + Assert(BufferGetBlockNumber(buffers[i + 1]) == + BufferGetBlockNumber(buffers[i]) + 1); + + io_buffers[io_buffers_len] = buffers[++i]; + io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); + } + + io_start = pgstat_prepare_io_time(track_io_timing); + smgrreadv(bmr.smgr, forknum, io_first_block, io_pages, io_buffers_len); + pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start, + io_buffers_len); + + /* Verify each block we read, and terminate the I/O. 
*/ + for (int j = 0; j < io_buffers_len; ++j) + { + BufferDesc *bufHdr; + Block bufBlock; + + if (isLocalBuf) + { + bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1); + bufBlock = LocalBufHdrGetBlock(bufHdr); + } + else + { + bufHdr = GetBufferDescriptor(io_buffers[j] - 1); + bufBlock = BufHdrGetBlock(bufHdr); + } + + /* check for garbage data */ + if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j, + PIV_LOG_WARNING | PIV_REPORT_STAT)) + { + if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages) + { + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s; zeroing out page", + io_first_block + j, + relpath(bmr.smgr->smgr_rlocator, forknum)))); + memset(bufBlock, 0, BLCKSZ); + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s", + io_first_block + j, + relpath(bmr.smgr->smgr_rlocator, forknum)))); + } + + /* Terminate I/O and set BM_VALID. */ + if (isLocalBuf) + { + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + + buf_state |= BM_VALID; + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } + else + { + /* Set BM_VALID, terminate IO, and wake up any waiters */ + TerminateBufferIO(bufHdr, false, BM_VALID, true); + } + + /* Report I/Os as completing individually. */ + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j, + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend, + false); + } + + VacuumPageMiss += io_buffers_len; + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; + } } /* - * BufferAlloc -- subroutine for ReadBuffer. Handles lookup of a shared - * buffer. If no buffer exists already, selects a replacement - * victim and evicts the old page, but does NOT read in new page. + * BufferAlloc -- subroutine for StartReadBuffers. 
Handles lookup of a shared + * buffer. If no buffer exists already, selects a replacement victim and + * evicts the old page, but does NOT read in new page. * * "strategy" can be a buffer replacement strategy object, or NULL for * the default strategy. The selected buffer's usage_count is advanced when @@ -1224,11 +1476,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * * The returned buffer is pinned and is already marked as holding the * desired page. If it already did have the desired page, *foundPtr is - * set true. Otherwise, *foundPtr is set false and the buffer is marked - * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it. - * - * *foundPtr is actually redundant with the buffer's BM_VALID flag, but - * we keep it for simplicity in ReadBuffer. + * set true. Otherwise, *foundPtr is set false. * * io_context is passed as an output parameter to avoid calling * IOContextForStrategy() when there is a shared buffers hit and no IO @@ -1287,19 +1535,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, { /* * We can only get here if (a) someone else is still reading in - * the page, or (b) a previous read attempt failed. We have to - * wait for any active read attempt to finish, and then set up our - * own read attempt if the page is still not BM_VALID. - * StartBufferIO does it all. + * the page, (b) a previous read attempt failed, or (c) someone + * called StartReadBuffers() but not yet WaitReadBuffers(). */ - if (StartBufferIO(buf, true)) - { - /* - * If we get here, previous attempts to read the buffer must - * have failed ... but we shall bravely try again. - */ - *foundPtr = false; - } + *foundPtr = false; } return buf; @@ -1364,19 +1603,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, { /* * We can only get here if (a) someone else is still reading in - * the page, or (b) a previous read attempt failed. 
We have to - * wait for any active read attempt to finish, and then set up our - * own read attempt if the page is still not BM_VALID. - * StartBufferIO does it all. + * the page, (b) a previous read attempt failed, or (c) someone + * called StartReadBuffers() but not yet WaitReadBuffers(). */ - if (StartBufferIO(existing_buf_hdr, true)) - { - /* - * If we get here, previous attempts to read the buffer must - * have failed ... but we shall bravely try again. - */ - *foundPtr = false; - } + *foundPtr = false; } return existing_buf_hdr; @@ -1408,15 +1638,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, LWLockRelease(newPartitionLock); /* - * Buffer contents are currently invalid. Try to obtain the right to - * start I/O. If StartBufferIO returns false, then someone else managed - * to read it before we did, so there's nothing left for BufferAlloc() to - * do. + * Buffer contents are currently invalid. */ - if (StartBufferIO(victim_buf_hdr, true)) - *foundPtr = false; - else - *foundPtr = true; + *foundPtr = false; return victim_buf_hdr; } @@ -1770,7 +1994,7 @@ again: * pessimistic, but outside of toy-sized shared_buffers it should allow * sufficient pins. */ -static void +void LimitAdditionalPins(uint32 *additional_pins) { uint32 max_backends; @@ -2035,7 +2259,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, buf_state &= ~BM_VALID; UnlockBufHdr(existing_hdr, buf_state); - } while (!StartBufferIO(existing_hdr, true)); + } while (!StartBufferIO(existing_hdr, true, false)); } else { @@ -2058,7 +2282,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, LWLockRelease(partition_lock); /* XXX: could combine the locked operations in it with the above */ - StartBufferIO(victim_buf_hdr, true); + StartBufferIO(victim_buf_hdr, true, false); } } @@ -2373,7 +2597,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) else { /* - * If we previously pinned the buffer, it must surely be valid. 
+ * If we previously pinned the buffer, it is likely to be valid, but + * it may not be if StartReadBuffers() was called and + * WaitReadBuffers() hasn't been called yet. We'll check by loading + * the flags without locking. This is racy, but it's OK to return + * false spuriously: when WaitReadBuffers() calls StartBufferIO(), + * it'll see that it's now valid. * * Note: We deliberately avoid a Valgrind client request here. * Individual access methods can optionally superimpose buffer page @@ -2382,7 +2611,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) * that the buffer page is legitimately non-accessible here. We * cannot meddle with that. */ - result = true; + result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0; } ref->refcount++; @@ -3450,7 +3679,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * someone else flushed the buffer before we could, so we need not do * anything. */ - if (!StartBufferIO(buf, false)) + if (!StartBufferIO(buf, false, false)) return; /* Setup error traceback support for ereport() */ @@ -5185,9 +5414,15 @@ WaitIO(BufferDesc *buf) * * Returns true if we successfully marked the buffer as I/O busy, * false if someone else already did the work. + * + * If nowait is true, then we don't wait for an I/O to be finished by another + * backend. In that case, false indicates either that the I/O was already + * finished, or is still in progress. This is useful for callers that want to + * find out if they can perform the I/O as part of a larger operation, without + * waiting for the answer or distinguishing the reasons why not. 
*/ static bool -StartBufferIO(BufferDesc *buf, bool forInput) +StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) { uint32 buf_state; @@ -5200,6 +5435,8 @@ StartBufferIO(BufferDesc *buf, bool forInput) if (!(buf_state & BM_IO_IN_PROGRESS)) break; UnlockBufHdr(buf, buf_state); + if (nowait) + return false; WaitIO(buf); } diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 1f02fed250e..6956d4e5b49 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -109,10 +109,9 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum, * LocalBufferAlloc - * Find or create a local buffer for the given page of the given relation. * - * API is similar to bufmgr.c's BufferAlloc, except that we do not need - * to do any locking since this is all local. Also, IO_IN_PROGRESS - * does not get set. Lastly, we support only default access strategy - * (hence, usage_count is always advanced). + * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do + * any locking since this is all local. We support only default access + * strategy (hence, usage_count is always advanced). */ BufferDesc * LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, @@ -288,7 +287,7 @@ GetLocalVictimBuffer(void) } /* see LimitAdditionalPins() */ -static void +void LimitAdditionalLocalPins(uint32 *additional_pins) { uint32 max_pins; @@ -298,9 +297,10 @@ LimitAdditionalLocalPins(uint32 *additional_pins) /* * In contrast to LimitAdditionalPins() other backends don't play a role - * here. We can allow up to NLocBuffer pins in total. + * here. We can allow up to NLocBuffer pins in total, but it might not be + * initialized yet so read num_temp_buffers. 
*/ - max_pins = (NLocBuffer - NLocalPinnedBuffers); + max_pins = (num_temp_buffers - NLocalPinnedBuffers); if (*additional_pins >= max_pins) *additional_pins = max_pins; diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index d51d46d3353..b57f71f97e3 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -14,6 +14,7 @@ #ifndef BUFMGR_H #define BUFMGR_H +#include "port/pg_iovec.h" #include "storage/block.h" #include "storage/buf.h" #include "storage/bufpage.h" @@ -158,6 +159,11 @@ extern PGDLLIMPORT int32 *LocalRefCount; #define BUFFER_LOCK_SHARE 1 #define BUFFER_LOCK_EXCLUSIVE 2 +/* + * Maximum number of buffers for multi-buffer I/O functions. This is set to + * allow 128kB transfers, unless BLCKSZ and IOV_MAX imply a smaller maximum. + */ +#define MAX_BUFFERS_PER_TRANSFER Min(PG_IOV_MAX, (128 * 1024) / BLCKSZ) /* * prototypes for functions in bufmgr.c @@ -177,6 +183,42 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool permanent); + +#define READ_BUFFERS_ZERO_ON_ERROR 0x01 +#define READ_BUFFERS_ISSUE_ADVICE 0x02 + +/* + * Private state used by StartReadBuffers() and WaitReadBuffers(). Declared + * in public header only to allow inclusion in other structs, but contents + * should not be accessed. + */ +struct ReadBuffersOperation +{ + /* Parameters passed in to StartReadBuffers(). */ + BufferManagerRelation bmr; + Buffer *buffers; + ForkNumber forknum; + BlockNumber blocknum; + int nblocks; + BufferAccessStrategy strategy; + int flags; + + /* Range of buffers, if we need to perform a read. 
*/ + int io_buffers_len; +}; + +typedef struct ReadBuffersOperation ReadBuffersOperation; + +extern bool StartReadBuffers(BufferManagerRelation bmr, + Buffer *buffers, + ForkNumber forknum, + BlockNumber blocknum, + int *nblocks, + BufferAccessStrategy strategy, + int flags, + ReadBuffersOperation *operation); +extern void WaitReadBuffers(ReadBuffersOperation *operation); + extern void ReleaseBuffer(Buffer buffer); extern void UnlockReleaseBuffer(Buffer buffer); extern bool BufferIsExclusiveLocked(Buffer buffer); @@ -250,6 +292,9 @@ extern bool HoldingBufferPinThatDelaysRecovery(void); extern bool BgBufferSync(struct WritebackContext *wb_context); +extern void LimitAdditionalPins(uint32 *additional_pins); +extern void LimitAdditionalLocalPins(uint32 *additional_pins); + /* in buf_init.c */ extern void InitBufferPool(void); extern Size BufferShmemSize(void); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index fc8b15d0cf2..6deb18234ac 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2267,6 +2267,7 @@ ReInitializeDSMForeignScan_function ReScanForeignScan_function ReadBufPtrType ReadBufferMode +ReadBuffersOperation ReadBytePtrType ReadExtraTocPtrType ReadFunc -- 2.39.2