From db429b6dd4a18f3cf5343f44c9f553328e697f13 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Fri, 25 Apr 2025 13:22:38 +0200 Subject: [PATCH v20250709 4/6] prefetch for gist indexes Implements gist_stream_read_next() and gist_ordered_stream_read_next() callbacks, for different types of scans: * gist_stream_read_next() is for traditional index scans, returning blocks from GISTScanOpaque. * gist_ordered_stream_read_next() is for scans with results ordered by distance, etc. The ordered scans rely on a pairing heap - the items are fed into it, but then are read and returned one by one. That would make prefetching quite useless, so the patch introduces a small queue on top of the pairing heap, and the items are loaded in batches. This is what getNextNearestPrefetch() is responsible for. Note: Right now the batches are always 32 items, which may regress queries with LIMIT clauses, etc. It should start at 1 and gradually increase the batch size. Similarly to how prefetch distance grows. FIXME The memory management of the batches is almost certainly leaky, needs to be cleaned up. --- src/backend/access/gist/gistget.c | 160 +++++++++++++++++++- src/backend/access/gist/gistscan.c | 225 +++++++++++++++++++++++++++++ src/include/access/gist_private.h | 16 ++ 3 files changed, 400 insertions(+), 1 deletion(-) diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index 387d9972345..7e292ebb442 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -21,7 +21,9 @@ #include "miscadmin.h" #include "pgstat.h" #include "storage/predicate.h" +#include "utils/datum.h" #include "utils/float.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/rel.h" @@ -395,6 +397,7 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, } so->nPageData = so->curPageData = 0; + so->streamPageData = -1; scan->xs_hitup = NULL; /* might point into pageDataCxt */ if (so->pageDataCxt) MemoryContextReset(so->pageDataCxt); @@ -460,6 +463,8 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, so->pageData[so->nPageData].heapPtr = it->t_tid; so->pageData[so->nPageData].recheck = recheck; so->pageData[so->nPageData].offnum = i; + so->pageData[so->nPageData].allVisible = false; + so->pageData[so->nPageData].allVisibleSet = false; /* * In an index-only scan, also fetch the data from the tuple. The @@ -496,6 +501,8 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, item->data.heap.heapPtr = it->t_tid; item->data.heap.recheck = recheck; item->data.heap.recheckDistances = recheck_distances; + item->data.heap.allVisible = false; + item->data.heap.allVisibleSet = false; /* * In an index-only scan, also fetch the data from the tuple. @@ -589,6 +596,22 @@ getNextNearest(IndexScanDesc scan) /* in an index-only scan, also return the reconstructed tuple. */ if (scan->xs_want_itup) scan->xs_hitup = item->data.heap.recontup; + + /* + * If this is index-only scan, determine the VM status, so that + * we can set xs_visible correctly. + */ + if (scan->xs_want_itup && ! item->data.heap.allVisibleSet) + { + item->data.heap.allVisibleSet = true; + item->data.heap.allVisible + = VM_ALL_VISIBLE(scan->heapRelation, + ItemPointerGetBlockNumber(&item->data.heap.heapPtr), + &so->vmBuffer); + } + + scan->xs_visible = item->data.heap.allVisible; + res = true; } else @@ -605,6 +628,119 @@ getNextNearest(IndexScanDesc scan) return res; } +/* + * A variant of getNextNearest() that stashes the items into a small buffer, so + * that the prefetching can work (getNextNearest returns items one by one). + * + * XXX Uses a small secondary queue, because getNextNearest() may be modifying + * the regular pageData[] buffer. + */ +static bool +getNextNearestPrefetch(IndexScanDesc scan) +{ + GISTScanOpaque so = (GISTScanOpaque) scan->opaque; + GISTSearchHeapItem *item; + + /* did we use all items from the queue */ + if (so->queueItem == so->queueUsed) + { + /* grow the number of items */ + int maxitems = Min(Max(1, 2 * so->queueUsed), 32); + + /* FIXME gradually incresse the number of items, not 32 all the time */ + maxitems = 32; + + so->queueItem = 0; + so->queueUsed = 0; + + while (so->queueUsed < maxitems) + { + if (!getNextNearest(scan)) + break; + + item = &so->queueItems[so->queueUsed++]; + + item->recheck = scan->xs_recheck; + item->heapPtr = scan->xs_heaptid; + item->recontup = scan->xs_hitup; + + /* + * FIXME free the memory (for tuples and orderbyvals/orderbynulls) + * it's leaking now. + */ + item->orderbyvals = palloc0(sizeof(Datum) * scan->numberOfOrderBys); + item->orderbynulls = palloc0(sizeof(bool) * scan->numberOfOrderBys); + + /* + * copy the distances - might be float8, which may be byref, so use + * datumCopy, otherwise it gets clobbered by other items + */ + for (int i = 0; i < scan->numberOfOrderBys; i++) + { + int16 typlen; + bool typbyval; + + /* don't copy NULL values */ + if (scan->xs_orderbynulls[i]) + continue; + + get_typlenbyval(so->orderByTypes[i], &typlen, &typbyval); + + item->orderbyvals[i] = datumCopy(scan->xs_orderbyvals[i], + typbyval, typlen); + } + + memcpy(item->orderbynulls, + scan->xs_orderbynulls, + sizeof(bool) * scan->numberOfOrderBys); + + /* reset, so that we don't free it accidentally */ + scan->xs_hitup = NULL; + } + + /* found no new items, we're done */ + if (so->queueUsed == 0) + return false; + + /* restart the stream for the new queue */ + so->queueStream = -1; + so->lastBlock = InvalidBlockNumber; /* XXX needed? */ + read_stream_reset(scan->xs_rs); + } + + /* next item to return */ + item = &so->queueItems[so->queueItem++]; + + scan->xs_heaptid = item->heapPtr; + scan->xs_recheck = item->recheck; + + /* here it's fine to copy the datum (even if byref pointers) */ + memcpy(scan->xs_orderbyvals, + item->orderbyvals, + sizeof(Datum) * scan->numberOfOrderBys); + + memcpy(scan->xs_orderbynulls, + item->orderbynulls, + sizeof(bool) * scan->numberOfOrderBys); + + /* in an index-only scan, also return the reconstructed tuple. */ + if (scan->xs_want_itup) + scan->xs_hitup = item->recontup; + + if (scan->xs_want_itup && ! item->allVisibleSet) + { + item->allVisibleSet = true; + item->allVisible + = VM_ALL_VISIBLE(scan->heapRelation, + ItemPointerGetBlockNumber(&item->heapPtr), + &so->vmBuffer); + } + + scan->xs_visible = item->allVisible; + + return true; +} + /* * gistgettuple() -- Get the next tuple in the scan */ @@ -642,7 +778,10 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir) if (scan->numberOfOrderBys > 0) { /* Must fetch tuples in strict distance order */ - return getNextNearest(scan); + if (scan->xs_rs) + return getNextNearestPrefetch(scan); + else + return getNextNearest(scan); } else { @@ -677,6 +816,18 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir) if (scan->xs_want_itup) scan->xs_hitup = so->pageData[so->curPageData].recontup; + /* determine VM status, if not done already */ + if (scan->xs_want_itup && !so->pageData[so->curPageData].allVisibleSet) + { + so->pageData[so->curPageData].allVisibleSet = true; + so->pageData[so->curPageData].allVisible + = VM_ALL_VISIBLE(scan->heapRelation, + ItemPointerGetBlockNumber(&scan->xs_heaptid), + &so->vmBuffer); + } + + scan->xs_visible = so->pageData[so->curPageData].allVisible; + so->curPageData++; return true; @@ -734,6 +885,13 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir) pfree(item); } while (so->nPageData == 0); + + if (scan->xs_rs) + { + so->streamPageData = -1; + so->lastBlock = InvalidBlockNumber; + read_stream_reset(scan->xs_rs); + } } } } diff --git a/src/backend/access/gist/gistscan.c b/src/backend/access/gist/gistscan.c index d8ba7f7eff5..df05f282aa1 100644 --- a/src/backend/access/gist/gistscan.c +++ b/src/backend/access/gist/gistscan.c @@ -17,6 +17,8 @@ #include "access/gist_private.h" #include "access/gistscan.h" #include "access/relscan.h" +#include "access/table.h" +#include "optimizer/cost.h" #include "utils/float.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -70,6 +72,176 @@ pairingheap_GISTSearchItem_cmp(const pairingheap_node *a, const pairingheap_node * Index AM API functions for scanning GiST indexes */ +/* + * gist_stream_read_next + * Return the next block to read from the read stream. + * + * Returns the next block from the current leaf page. The first block is + * when accessing the first tuple, after already receiving the TID from the + * index (for the item itemIndex points at). + * + * With index-only scans this skips all-visible pages. The visibility info + * is stored, so that we can later pass it to the scan (we must not access + * the VM again, the bit might have changes, and the read stream would get + * out of sync (we'd get different blocks than we expect expect). + * + * Returns the block number to get from the read stream. InvalidBlockNumber + * means we've ran out of item on the current leaf page - the stream will + * end, and we'll need to reset it after reading the next page (or after + * changing the scan direction). + * + * XXX Should skip duplicate blocks (for correlated indexes). But that's + * not implemented yet. + */ +static BlockNumber +gist_stream_read_next(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + IndexScanDesc scan = (IndexScanDesc) callback_private_data; + GISTScanOpaque so = (GISTScanOpaque) scan->opaque; + BlockNumber block = InvalidBlockNumber; + + /* + * Is this the first request for the read stream (possibly after a reset)? + * If yes, initialize the stream to the current item (itemIndex). + */ + if (so->streamPageData == (OffsetNumber) - 1) + so->streamPageData = (so->curPageData - 1); + + /* + * Find the next block to read. For plain index scans we will return the + * very next item, but with index-only scans we skip TIDs from all-visible + * pages (because we won't read those). + */ + while (so->streamPageData < so->nPageData) + { + ItemPointer tid; + GISTSearchHeapItem *item; + + item = &so->pageData[so->streamPageData]; + + tid = &item->heapPtr; + block = ItemPointerGetBlockNumber(tid); + + /* + * For index-only scans, check the VM and remember the result. If the page + * is all-visible, don't return the block number, try reading the next one. + * + * XXX Maybe this could use the same logic to check for duplicate blocks, + * and reuse the VM result if possible. + */ + if (scan->xs_want_itup) + { + if (!item->allVisibleSet) + { + item->allVisibleSet = true; + item->allVisible = VM_ALL_VISIBLE(scan->heapRelation, + ItemPointerGetBlockNumber(tid), + &so->vmBuffer); + } + + /* don't prefetch this all-visible block, try the next one */ + if (item->allVisible) + block = InvalidBlockNumber; + } + + /* advance to the next item, assuming the current scan direction */ + so->streamPageData++; + + /* don't return the same block twice (and remember this one) */ + if (so->lastBlock == block) + block = InvalidBlockNumber; + + /* Did we find a valid block? If yes, we're done. */ + if (block != InvalidBlockNumber) + break; + } + + /* remember the block we're returning */ + so->lastBlock = block; + + return block; +} + +/* + * gist_ordered_stream_read_next + * Return the next block to read from the read stream. + * + * A variant of gist_stream_read_next for ordered scans, returning items from + * a small secondary queue. + */ +static BlockNumber +gist_ordered_stream_read_next(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + IndexScanDesc scan = (IndexScanDesc) callback_private_data; + GISTScanOpaque so = (GISTScanOpaque) scan->opaque; + BlockNumber block = InvalidBlockNumber; + + /* + * Is this the first request for the read stream (possibly after a reset)? + * If yes, initialize the stream to the current item (itemIndex). + */ + if (so->queueStream == - 1) + so->queueStream = (so->queueItem - 1); + + /* + * Find the next block to read. For plain index scans we will return the + * very next item, but with index-only scans we skip TIDs from all-visible + * pages (because we won't read those). + */ + while (so->queueStream < so->queueUsed) + { + ItemPointer tid; + GISTSearchHeapItem *item; + + item = &so->queueItems[so->queueStream]; + + tid = &item->heapPtr; + block = ItemPointerGetBlockNumber(tid); + + /* + * For index-only scans, check the VM and remember the result. If the page + * is all-visible, don't return the block number, try reading the next one. + * + * XXX Maybe this could use the same logic to check for duplicate blocks, + * and reuse the VM result if possible. + */ + if (scan->xs_want_itup) + { + if (!item->allVisibleSet) + { + item->allVisibleSet = true; + item->allVisible = VM_ALL_VISIBLE(scan->heapRelation, + ItemPointerGetBlockNumber(tid), + &so->vmBuffer); + } + + /* don't prefetch this all-visible block, try the next one */ + if (item->allVisible) + block = InvalidBlockNumber; + } + + /* advance to the next item, assuming the current scan direction */ + so->queueStream++; + + /* don't return the same block twice (and remember this one) */ + if (so->lastBlock == block) + block = InvalidBlockNumber; + + /* Did we find a valid block? If yes, we're done. */ + if (block != InvalidBlockNumber) + break; + } + + /* remember the block we're returning */ + so->lastBlock = block; + + return block; +} + IndexScanDesc gistbeginscan(Relation heap, Relation index, int nkeys, int norderbys) { @@ -110,6 +282,14 @@ gistbeginscan(Relation heap, Relation index, int nkeys, int norderbys) so->numKilled = 0; so->curBlkno = InvalidBlockNumber; so->curPageLSN = InvalidXLogRecPtr; + so->vmBuffer = InvalidBuffer; + + /* initialize small prefetch queue */ + so->queueUsed = 0; + so->queueItem = 0; + + /* nothing returned */ + so->lastBlock = InvalidBlockNumber; scan->opaque = so; @@ -120,6 +300,31 @@ gistbeginscan(Relation heap, Relation index, int nkeys, int norderbys) MemoryContextSwitchTo(oldCxt); + /* + * Initialize the read stream to opt-in into prefetching. + * + * XXX See the comments in btbeginscan(). + */ + if (enable_indexscan_prefetch && heap) + { + if (scan->numberOfOrderBys == 0) + scan->xs_rs = read_stream_begin_relation(READ_STREAM_DEFAULT, + NULL, + heap, + MAIN_FORKNUM, + gist_stream_read_next, + scan, + 0); + else + scan->xs_rs = read_stream_begin_relation(READ_STREAM_DEFAULT, + NULL, + heap, + MAIN_FORKNUM, + gist_ordered_stream_read_next, + scan, + 0); + } + return scan; } @@ -341,6 +546,15 @@ gistrescan(IndexScanDesc scan, ScanKey key, int nkeys, /* any previous xs_hitup will have been pfree'd in context resets above */ scan->xs_hitup = NULL; + + /* reset stream */ + if (scan->xs_rs) + { + so->streamPageData = -1; + so->lastBlock = InvalidBlockNumber; + read_stream_reset(scan->xs_rs); + so->queueItem = so->queueUsed = so->queueStream = 0; + } } void @@ -348,9 +562,20 @@ gistendscan(IndexScanDesc scan) { GISTScanOpaque so = (GISTScanOpaque) scan->opaque; + /* needs to happen before freeGISTstate */ + if (so->vmBuffer != InvalidBuffer) + { + ReleaseBuffer(so->vmBuffer); + so->vmBuffer = InvalidBuffer; + } + /* * freeGISTstate is enough to clean up everything made by gistbeginscan, * as well as the queueCxt if there is a separate context for it. */ freeGISTstate(so->giststate); + + /* reset stream */ + if (scan->xs_rs) + read_stream_end(scan->xs_rs); } diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h index 39404ec7cdb..924bbae22e2 100644 --- a/src/include/access/gist_private.h +++ b/src/include/access/gist_private.h @@ -17,6 +17,7 @@ #include "access/amapi.h" #include "access/gist.h" #include "access/itup.h" +#include "access/visibilitymap.h" #include "lib/pairingheap.h" #include "storage/bufmgr.h" #include "storage/buffile.h" @@ -124,6 +125,10 @@ typedef struct GISTSearchHeapItem * index-only scans */ OffsetNumber offnum; /* track offset in page to mark tuple as * LP_DEAD */ + bool allVisible; + bool allVisibleSet; + Datum *orderbyvals; + bool *orderbynulls; } GISTSearchHeapItem; /* Unvisited item, either index page or heap tuple */ @@ -169,13 +174,24 @@ typedef struct GISTScanOpaqueData int numKilled; /* number of currently stored items */ BlockNumber curBlkno; /* current number of block */ GistNSN curPageLSN; /* pos in the WAL stream when page was read */ + Buffer vmBuffer; /* In a non-ordered search, returnable heap items are stored here: */ GISTSearchHeapItem pageData[BLCKSZ / sizeof(IndexTupleData)]; OffsetNumber nPageData; /* number of valid items in array */ OffsetNumber curPageData; /* next item to return */ + OffsetNumber streamPageData; /* next item to queue */ MemoryContext pageDataCxt; /* context holding the fetched tuples, for * index-only scans */ + + /* last block returned by the read_next stream callback */ + BlockNumber lastBlock; + + /* queue to allow prefetching with ordered scans */ + GISTSearchHeapItem queueItems[32]; + int queueItem; + int queueStream; + int queueUsed; } GISTScanOpaqueData; typedef GISTScanOpaqueData *GISTScanOpaque; -- 2.50.0