From 425ccad723c34ef8eeeb1d6d80400b1705c4ec2a Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Fri, 25 Apr 2025 12:34:15 +0200 Subject: [PATCH v20250709 3/6] prefetch for hash indexes Implements the hash_stream_read_next() callback, returning blocks from HashScanOpaque. --- src/backend/access/hash/hash.c | 105 +++++++++++++++++++++++++++ src/backend/access/hash/hashsearch.c | 37 ++++++++++ src/include/access/hash.h | 8 ++ 3 files changed, 150 insertions(+) diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c index 2133e454e9b..0884f0e05d9 100644 --- a/src/backend/access/hash/hash.c +++ b/src/backend/access/hash/hash.c @@ -22,12 +22,14 @@ #include "access/hash_xlog.h" #include "access/relscan.h" #include "access/stratnum.h" +#include "access/table.h" #include "access/tableam.h" #include "access/xloginsert.h" #include "commands/progress.h" #include "commands/vacuum.h" #include "miscadmin.h" #include "nodes/execnodes.h" +#include "optimizer/cost.h" #include "optimizer/plancat.h" #include "pgstat.h" #include "utils/fmgrprotos.h" @@ -366,6 +368,78 @@ hashgetbitmap(IndexScanDesc scan, TIDBitmap *tbm) return ntids; } +/* + * hash_stream_read_next + * Return the next block to read from the read stream. + * + * Returns the next block from the current index page. The first block is + * requested when the first tuple is accessed, after the TID has already been + * received from the index (for the item itemIndex points at). + * + * Returns the block number to get from the read stream. InvalidBlockNumber + * means we've run out of items on the current index page - the stream will + * end, and we'll need to reset it after reading the next page (or after + * changing the scan direction). + * + * XXX Only immediately repeated blocks are skipped (see lastBlock). More + * general duplicate elimination is not implemented yet.
+ */ +static BlockNumber +hash_stream_read_next(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + IndexScanDesc scan = (IndexScanDesc) callback_private_data; + HashScanOpaque so = (HashScanOpaque) scan->opaque; + BlockNumber block = InvalidBlockNumber; + + /* + * Is this the first request for the read stream (possibly after a reset)? + * If yes, start the stream at the current item (itemIndex). + */ + if (so->currPos.streamIndex == -1) + so->currPos.streamIndex = so->currPos.itemIndex; + + /* + * Find the next block to read. We return the block of the very next item, + * except that a block equal to the one returned last is skipped. + */ + while ((so->currPos.streamIndex >= so->currPos.firstItem) && + (so->currPos.streamIndex <= so->currPos.lastItem)) + { + ItemPointer tid; + HashScanPosItem *item; + + item = &so->currPos.items[so->currPos.streamIndex]; + + tid = &item->heapTid; + block = ItemPointerGetBlockNumber(tid); + + /* + * Advance per scan direction, but never onto -1 ("not started"). + */ + if (ScanDirectionIsForward(so->currPos.dir)) + so->currPos.streamIndex++; + else if (so->currPos.streamIndex > 0) + so->currPos.streamIndex--; + else + so->currPos.streamIndex = -2; /* out of range: ends the stream */ + + /* don't return the same block twice in a row (lastBlock is the previous one) */ + if (so->lastBlock == block) + block = InvalidBlockNumber; + + /* Did we find a valid block? If yes, we're done. */ + if (block != InvalidBlockNumber) + break; + } + + /* remember the block we're returning (InvalidBlockNumber at end of page) */ + so->lastBlock = block; + + return block; +} + /* * hashbeginscan() -- start a scan on a hash index @@ -394,6 +468,25 @@ hashbeginscan(Relation heap, Relation index, int nkeys, int norderbys) scan->opaque = so; + /* no block returned by the stream yet */ + so->lastBlock = InvalidBlockNumber; + + /* + * Initialize the read stream, to opt in to prefetching. + * + * XXX See comments in btbeginscan().
+ */ + if (enable_indexscan_prefetch && heap) + { + scan->xs_rs = read_stream_begin_relation(READ_STREAM_DEFAULT, + NULL, + heap, + MAIN_FORKNUM, + hash_stream_read_next, + scan, + 0); + } + return scan; } @@ -425,6 +518,14 @@ hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, so->hashso_buc_populated = false; so->hashso_buc_split = false; + + /* reset the stream and its position tracking, to start over */ + if (scan->xs_rs) + { + so->currPos.streamIndex = -1; + so->lastBlock = InvalidBlockNumber; + read_stream_reset(scan->xs_rs); + } } /* @@ -449,6 +550,10 @@ hashendscan(IndexScanDesc scan) pfree(so->killedItems); pfree(so); scan->opaque = NULL; + + /* terminate the read stream, if one was started */ + if (scan->xs_rs) + read_stream_end(scan->xs_rs); } /* diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c index 92c15a65be2..d5b045deb8c 100644 --- a/src/backend/access/hash/hashsearch.c +++ b/src/backend/access/hash/hashsearch.c @@ -54,6 +54,28 @@ _hash_next(IndexScanDesc scan, ScanDirection dir) Buffer buf; bool end_of_scan = false; + /* + * We need to reset the read stream when the scan direction changes. Hash + * indexes are not ordered, but there are still scrollable cursors, and those + * do have a direction. So handle that here, and also remember the direction, + * so that the read_next callback can take it into account. + * + * XXX we can't do that in the read_next callback, because we might have + * already hit the end of the stream (returned InvalidBlockNumber), in + * which case the callback won't be called. + */ + if (so->currPos.dir != dir) + { + so->currPos.dir = dir; + + if (scan->xs_rs) + { + so->currPos.streamIndex = -1; + so->lastBlock = InvalidBlockNumber; + read_stream_reset(scan->xs_rs); + } + } + /* * Advance to the next tuple on the current page; or if done, try to read * data from the next or previous page based on the scan direction.
Before @@ -592,6 +614,28 @@ _hash_readpage(IndexScanDesc scan, Buffer *bufP, ScanDirection dir) so->currPos.buf = InvalidBuffer; } + /* + * Remember the direction this page was read in. The read_next callback + * walks items[] according to currPos.dir, so it must be valid as soon as + * the page's items are. + */ + so->currPos.dir = dir; + + /* + * Restart the stream for this page. + * + * XXX Maybe we should not reset prefetch distance to 0, but start from + * a somewhat higher value. We're merely continuing the same scan as + * before ... maybe reduce it a bit, to not harm LIMIT queries, but not + * reset it all the way to 0. + */ + if (scan->xs_rs) + { + so->currPos.streamIndex = -1; + so->lastBlock = InvalidBlockNumber; + read_stream_reset(scan->xs_rs); + } + Assert(so->currPos.firstItem <= so->currPos.lastItem); return true; } diff --git a/src/include/access/hash.h b/src/include/access/hash.h index 6befa3ebf60..3916cf746c6 100644 --- a/src/include/access/hash.h +++ b/src/include/access/hash.h @@ -113,6 +113,9 @@ typedef struct HashScanPosData BlockNumber nextPage; /* next overflow page */ BlockNumber prevPage; /* prev overflow or bucket page */ + /* scan direction for the saved position's call to _hash_next */ + ScanDirection dir; + /* * The items array is always ordered in index order (ie, increasing * indexoffset).
When scanning backwards it is convenient to fill the @@ -123,6 +126,7 @@ typedef struct HashScanPosData int firstItem; /* first valid index in items[] */ int lastItem; /* last valid index in items[] */ int itemIndex; /* current index in items[] */ + int streamIndex; /* next items[] index for the read stream callback, -1 = not started */ HashScanPosItem items[MaxIndexTuplesPerPage]; /* MUST BE LAST */ } HashScanPosData; @@ -150,6 +154,7 @@ typedef struct HashScanPosData (scanpos).firstItem = 0; \ (scanpos).lastItem = 0; \ (scanpos).itemIndex = 0; \ + (scanpos).dir = NoMovementScanDirection; \ } while (0) /* @@ -182,6 +187,9 @@ typedef struct HashScanOpaqueData int *killedItems; /* currPos.items indexes of killed items */ int numKilled; /* number of currently stored items */ + /* last block returned by the read_next stream callback, or InvalidBlockNumber if none */ + BlockNumber lastBlock; + /* * Identify all the matching items on a page and save them in * HashScanPosData -- 2.50.0