From c2ca448331460c813085a0a6ff09d53ef9f14b37 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 12 Feb 2021 14:19:10 -0800 Subject: [PATCH v1 10/14] WIP: Use streaming reads in nbtree vacuum scan. XXX Cherry-picked from https://github.com/anarazel/postgres/tree/aio and lightly modified by TM, for demonstration purposes. Author: Andres Freund --- src/backend/access/nbtree/nbtree.c | 77 +++++++++++++++++++++++++----- src/include/access/nbtree.h | 3 ++ 2 files changed, 67 insertions(+), 13 deletions(-) diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 62bc9917f1..5b72144fd4 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -34,6 +34,7 @@ #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/smgr.h" +#include "storage/streaming_read.h" #include "utils/builtins.h" #include "utils/index_selfuncs.h" #include "utils/memutils.h" @@ -81,7 +82,7 @@ typedef struct BTParallelScanDescData *BTParallelScanDesc; static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, IndexBulkDeleteCallback callback, void *callback_state, BTCycleId cycleid); -static void btvacuumpage(BTVacState *vstate, BlockNumber scanblkno); +static void btvacuumpage(BTVacState *vstate, Buffer buf, BlockNumber scanblkno); static BTVacuumPosting btreevacuumposting(BTVacState *vstate, IndexTuple posting, OffsetNumber updatedoffset, @@ -890,6 +891,31 @@ btvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) return stats; } +static bool +btvacuumscan_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private, + void *io_private, + BufferManagerRelation *bmr, ForkNumber *fork, + BlockNumber *block, ReadBufferMode *mode) +{ + BTVacState *vstate = (BTVacState *) pgsr_private; + + if (vstate->nextblock == vstate->num_pages) + return false; + + /* + * We can't use _bt_getbuf() here because it always applies + * _bt_checkpage(), which will barf on an all-zero page. We want to + * recycle all-zero pages, not fail. Also, we want to use a nondefault + * buffer access strategy. Also want to use AIO. + */ + *bmr = BMR_REL(vstate->info->index); + *fork = MAIN_FORKNUM; + *block = vstate->nextblock++; + *mode = RBM_NORMAL; + + return true; +} + /* * btvacuumscan --- scan the index for VACUUMing purposes * @@ -983,6 +1009,8 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, scanblkno = BTREE_METAPAGE + 1; for (;;) { + PgStreamingRead *pgsr; + /* Get the current relation length */ if (needLock) LockRelationForExtension(rel, ExclusiveLock); @@ -997,14 +1025,36 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, /* Quit if we've scanned the whole relation */ if (scanblkno >= num_pages) break; + + vstate.nextblock = scanblkno; + vstate.num_pages = num_pages; + + pgsr = pg_streaming_read_buffer_alloc(512, 0, (uintptr_t) &vstate, + vstate.info->strategy, + btvacuumscan_pgsr_next); + /* Iterate over pages, then loop back to recheck length */ for (; scanblkno < num_pages; scanblkno++) { - btvacuumpage(&vstate, scanblkno); + Buffer buf = pg_streaming_read_buffer_get_next(pgsr, NULL); + + Assert(BufferIsValid(buf)); + Assert(BufferGetBlockNumber(buf) == scanblkno); + + btvacuumpage(&vstate, buf, scanblkno); + if (info->report_progress) pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_DONE, scanblkno); + + /* call vacuum_delay_point while not holding any buffer lock */ + vacuum_delay_point(); } + + if (pg_streaming_read_buffer_get_next(pgsr, NULL) != 0) + elog(ERROR, "aio and plain pos out of sync"); + + pg_streaming_read_free(pgsr); } /* Set statistics num_pages field to final size of index */ @@ -1037,7 +1087,7 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, * recycled (i.e. before the page split). */ static void -btvacuumpage(BTVacState *vstate, BlockNumber scanblkno) +btvacuumpage(BTVacState *vstate, Buffer buf, BlockNumber scanblkno) { IndexVacuumInfo *info = vstate->info; IndexBulkDeleteResult *stats = vstate->stats; @@ -1048,28 +1098,28 @@ btvacuumpage(BTVacState *vstate, BlockNumber scanblkno) bool attempt_pagedel; BlockNumber blkno, backtrack_to; - Buffer buf; Page page; BTPageOpaque opaque; blkno = scanblkno; + Assert(BufferIsValid(buf)); backtrack: attempt_pagedel = false; backtrack_to = P_NONE; - /* call vacuum_delay_point while not holding any buffer lock */ - vacuum_delay_point(); - /* - * We can't use _bt_getbuf() here because it always applies - * _bt_checkpage(), which will barf on an all-zero page. We want to - * recycle all-zero pages, not fail. Also, we want to use a nondefault - * buffer access strategy. + * If we're not backtracking we already read the page asynchronously. Not + * using AIO when backtracking isn't too tragic - it's relatively rare, + * and the buffer quite possibly is still in shared buffers. */ - buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, - info->strategy); + if (!BufferIsValid(buf)) + buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, + info->strategy); + + Assert(BufferGetBlockNumber(buf) == blkno); + _bt_lockbuf(rel, buf, BT_READ); page = BufferGetPage(buf); opaque = NULL; @@ -1356,6 +1406,7 @@ backtrack: if (backtrack_to != P_NONE) { blkno = backtrack_to; + buf = InvalidBuffer; goto backtrack; } } diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index 8891fa7973..51009d24bd 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -343,6 +343,9 @@ typedef struct BTVacState int maxbufsize; /* max bufsize that respects work_mem */ BTPendingFSM *pendingpages; /* One entry per newly deleted page */ int npendingpages; /* current # valid pendingpages */ + + BlockNumber nextblock; + BlockNumber num_pages; } BTVacState; /* -- 2.39.2