From 24e3f3a7a16bfb1c77021954a53b74f402b5d6d4 Mon Sep 17 00:00:00 2001
From: Tomas Vondra
Date: Mon, 30 Sep 2024 22:48:12 +0200
Subject: [PATCH v20250422 1/5] WIP: index prefetching

Allows the index AM to provide items (TIDs and tuples) in batches, which
is then used to implement prefetching of heap tuples in index scans
(including index-only scans). This is similar to the prefetching already
done in bitmap scans, and can result in significant speedups.

The index AM may implement an optional "amgetbatch" callback, returning
a batch of items. The indexam.c code then handles this transparently
through the existing "getnext" interface.

It is up to the index AM to return only batches that it can handle
internally. For example, most of the later patches adding support for
batching to the relevant index AMs (btree, hash, gist, sp-gist) restrict
batches to a single leaf page. This makes the implementation of batching
much simpler, with only minimal changes to the index AMs, but it's not a
hard requirement. The index AM could produce batches spanning an
arbitrary number of leaf pages. This is left as a possible future
improvement.

Most of the batching/prefetching logic happens in indexam.c, so the
executor code can continue to call the interface just like before. The
only "violation" happens in index-only scans, which need to check the
visibility map both when prefetching pages (we don't want to prefetch
pages that are unnecessary) and later when reading the data. For cached
data the visibility map checks can be fairly expensive, so it's
desirable to keep and reuse the result of the first check.

At the moment, the prefetching does not handle mark/restore plans. This
is doable, but requires additional synchronization between the batching
and index AM code in the "opposite direction".

This patch does not actually add batching to any of the index AMs; it's
just the common infrastructure.

TODO

Add the new index AM callback to the sgml docs.

Re-introduce the callback to check the VM and remember the result.

It can happen that the first few batches (leaf pages) are returned from
the index while skipping the heap fetches. That means the read stream
does no reads until much later, after the first batches have already
been freed, because the reads only happen when first reading from the
stream. In that case we need to be careful about initializing the stream
position, because setting it to (0,0) would be wrong as that batch is
already gone. So just initialize it to readPos, which should be
initialized already. Could this happen later, or just on the first call?
Probably only on the first call, as the read stream always looks ahead
for the block that actually needs reading.
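For reference, here is a rough sketch of how the two new optional
callbacks are expected to look, inferred from how indexam.c invokes them
below; the exact declarations added to amapi.h may differ, so treat the
signatures as an assumption rather than the final API:

    /* Read the next batch of items (e.g. one leaf page) in the given
     * scan direction, or return NULL when there are no more batches. */
    IndexScanBatch (*amgetbatch) (IndexScanDesc scan,
                                  ScanDirection direction);

    /* Release a batch previously returned by amgetbatch, e.g. dropping
     * pins and flushing any killed items back to the index. */
    void (*amfreebatch) (IndexScanDesc scan, IndexScanBatch batch);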
--- src/backend/access/heap/heapam_handler.c | 81 +- src/backend/access/index/genam.c | 30 +- src/backend/access/index/indexam.c | 1381 ++++++++++++++++- src/backend/access/table/tableam.c | 2 +- src/backend/commands/constraint.c | 3 +- src/backend/executor/execIndexing.c | 12 +- src/backend/executor/execReplication.c | 9 +- src/backend/executor/nodeIndexonlyscan.c | 133 +- src/backend/executor/nodeIndexscan.c | 32 +- src/backend/utils/adt/selfuncs.c | 7 +- src/backend/utils/misc/guc_tables.c | 10 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/access/amapi.h | 10 + src/include/access/genam.h | 13 +- src/include/access/relscan.h | 160 ++ src/include/access/tableam.h | 12 +- src/include/nodes/execnodes.h | 7 + src/test/regress/expected/sysviews.out | 3 +- src/tools/pgindent/typedefs.list | 5 + 19 files changed, 1872 insertions(+), 39 deletions(-) diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index ac082fefa77..f79d97a8c64 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -79,11 +79,12 @@ heapam_slot_callbacks(Relation relation) */ static IndexFetchTableData * -heapam_index_fetch_begin(Relation rel) +heapam_index_fetch_begin(Relation rel, ReadStream *rs) { IndexFetchHeapData *hscan = palloc0(sizeof(IndexFetchHeapData)); hscan->xs_base.rel = rel; + hscan->xs_base.rs = rs; hscan->xs_cbuf = InvalidBuffer; return &hscan->xs_base; @@ -94,6 +95,9 @@ heapam_index_fetch_reset(IndexFetchTableData *scan) { IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan; + if (scan->rs) + read_stream_reset(scan->rs); + if (BufferIsValid(hscan->xs_cbuf)) { ReleaseBuffer(hscan->xs_cbuf); @@ -108,6 +112,9 @@ heapam_index_fetch_end(IndexFetchTableData *scan) heapam_index_fetch_reset(scan); + if (scan->rs) + read_stream_end(scan->rs); + pfree(hscan); } @@ -130,15 +137,72 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan, /* Switch to correct buffer if we don't have it already */ Buffer prev_buf = hscan->xs_cbuf; - hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf, - hscan->xs_base.rel, - ItemPointerGetBlockNumber(tid)); + /* + * Read the block for the requested TID. With a read stream, simply + * read the next block we queued earlier (from the callback). + * Otherwise just do the regular read using the TID. + * + * XXX It's a bit fragile to just read buffers, expecting the right + * block, which we queued from the callback sometime much earlier. If + * the two streams get out of sync in any way (which can happen + * easily, due to some optimization heuristics), it may misbehave in + * strange ways. + * + * XXX We need to support both the old ReadBuffer and ReadStream, as + * some places are unlikely to benefit from a read stream - e.g. + * because they only fetch a single tuple. So better to support this. + * + * XXX Another reason is that some index AMs may not support the + * batching interface, which is a prerequisite for using read_stream + * API. + */ + if (scan->rs) + hscan->xs_cbuf = read_stream_next_buffer(scan->rs, NULL); + else + hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf, + hscan->xs_base.rel, + ItemPointerGetBlockNumber(tid)); + + /* We should always get a valid buffer for a valid TID. */ + Assert(BufferIsValid(hscan->xs_cbuf)); + + /* + * Did we read the expected block number (per the TID)? For the + * regular buffer reads this should always match, but with the read + * stream it might disagree due to a bug elsewhere (happened + * repeatedly). 
+ */ + Assert(BufferGetBlockNumber(hscan->xs_cbuf) == ItemPointerGetBlockNumber(tid)); /* * Prune page, but only if we weren't already on this page */ if (prev_buf != hscan->xs_cbuf) heap_page_prune_opt(hscan->xs_base.rel, hscan->xs_cbuf); + + /* + * When using the read stream, release the old buffer. + * + * XXX Not sure this is really needed, or maybe this is not the right + * place to do this, and buffers should be released elsewhere. The + * problem is that other place may not really know if the index scan + * uses read stream API. + * + * XXX We need to do this, because otherwise the caller would need to + * do different things depending on whether the read_stream was used + * or not. With the read_stream it'd have to also explicitly release + * the buffers, but doing that for every caller seems error prone + * (easy to forget). It's also not clear whether it would free the + * buffer before or after the index_fetch_tuple call (we don't know if + * the buffer changed until *after* the call, etc.). + * + * XXX Does this do the right thing when reading the same page? That + * should return the same buffer, so won't we release it prematurely? + */ + if (scan->rs && (prev_buf != InvalidBuffer)) + { + ReleaseBuffer(prev_buf); + } } /* Obtain share-lock on the buffer so we can examine visibility */ @@ -753,7 +817,14 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, tableScan = NULL; heapScan = NULL; - indexScan = index_beginscan(OldHeap, OldIndex, SnapshotAny, NULL, 0, 0); + + /* + * XXX Maybe enable batching/prefetch for clustering? Seems like it + * might be a pretty substantial win if the table is not yet well + * clustered by the index. + */ + indexScan = index_beginscan(OldHeap, OldIndex, SnapshotAny, NULL, 0, 0, + false); index_rescan(indexScan, NULL, 0, NULL, 0); } else diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c index 8f532e14590..8266d5e0e87 100644 --- a/src/backend/access/index/genam.c +++ b/src/backend/access/index/genam.c @@ -446,8 +446,21 @@ systable_beginscan(Relation heapRelation, elog(ERROR, "column is not in index"); } + /* + * No batching/prefetch for catalogs. We don't expect that to help + * very much, because we usually need just one row, and even if we + * need multiple rows, they tend to be colocated in heap. + * + * XXX Maybe we could do that, the prefetching only ramps up over time + * anyway? There was a problem with infinite recursion when looking up + * effective_io_concurrency for a tablespace (which may do an index + * scan internally), but the read_stream should care of that. Still, + * we don't expect this to help a lot. + * + * XXX This also means scans on catalogs won't use read_stream. + */ sysscan->iscan = index_beginscan(heapRelation, irel, - snapshot, NULL, nkeys, 0); + snapshot, NULL, nkeys, 0, false); index_rescan(sysscan->iscan, idxkey, nkeys, NULL, 0); sysscan->scan = NULL; @@ -707,8 +720,21 @@ systable_beginscan_ordered(Relation heapRelation, elog(ERROR, "column is not in index"); } + /* + * No batching/prefetch for catalogs. We don't expect that to help very + * much, because we usually need just one row, and even if we need + * multiple rows, they tend to be colocated in heap. + * + * XXX Maybe we could do that, the prefetching only ramps up over time + * anyway? There was a problem with infinite recursion when looking up + * effective_io_concurrency for a tablespace (which may do an index scan + * internally), but the read_stream should care of that. 
Still, we don't + * expect this to help a lot. + * + * XXX This also means scans on catalogs won't use read_stream. + */ sysscan->iscan = index_beginscan(heapRelation, indexRelation, - snapshot, NULL, nkeys, 0); + snapshot, NULL, nkeys, 0, false); index_rescan(sysscan->iscan, idxkey, nkeys, NULL, 0); sysscan->scan = NULL; diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c index 219df1971da..190a112e457 100644 --- a/src/backend/access/index/indexam.c +++ b/src/backend/access/index/indexam.c @@ -44,6 +44,7 @@ #include "postgres.h" #include "access/amapi.h" +#include "access/nbtree.h" /* XXX for MaxTIDsPerBTreePage (should remove) */ #include "access/relation.h" #include "access/reloptions.h" #include "access/relscan.h" @@ -58,6 +59,8 @@ #include "utils/snapmgr.h" #include "utils/syscache.h" +/* enable batching / prefetching during index scans */ +bool enable_indexscan_batching = false; /* ---------------------------------------------------------------- * macros used in index_ routines @@ -109,6 +112,36 @@ static IndexScanDesc index_beginscan_internal(Relation indexRelation, ParallelIndexScanDesc pscan, bool temp_snap); static inline void validate_relation_kind(Relation r); +/* index batching */ +static void index_batch_init(IndexScanDesc scan); +static void index_batch_reset(IndexScanDesc scan, bool complete); +static void index_batch_end(IndexScanDesc scan); +static bool index_batch_getnext(IndexScanDesc scan); +static void index_batch_free(IndexScanDesc scan, IndexScanBatch batch); +static ItemPointer index_batch_getnext_tid(IndexScanDesc scan, + ScanDirection direction); + +static BlockNumber index_scan_stream_read_next(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data); + +static bool index_batch_pos_advance(IndexScanDesc scan, IndexScanBatchPos *pos); +static void index_batch_pos_reset(IndexScanDesc scan, IndexScanBatchPos *pos); +static void index_batch_kill_item(IndexScanDesc scan); + +static void AssertCheckBatchPosValid(IndexScanDesc scan, IndexScanBatchPos *pos); +static void AssertCheckBatch(IndexScanDesc scan, IndexScanBatch batch); +static void AssertCheckBatches(IndexScanDesc scan); + + +#define INDEX_SCAN_BATCH(scan, idx) \ + ((scan)->xs_batches->batches[(idx) % (scan)->xs_batches->maxBatches]) + +#ifdef INDEXAM_DEBUG +#define DEBUG_LOG(...) elog(WARNING, __VA_ARGS__) +#else +#define DEBUG_LOG(...) +#endif /* ---------------------------------------------------------------- * index_ interface functions @@ -250,6 +283,10 @@ index_insert_cleanup(Relation indexRelation, /* * index_beginscan - start a scan of an index with amgettuple * + * enable_batching determines whether the scan should try using the batching + * interface (amgetbatch/amfreebatch), if supported by the index AM, or the + * regular amgettuple interface. + * * Caller must be holding suitable locks on the heap and the index. */ IndexScanDesc @@ -257,8 +294,10 @@ index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, IndexScanInstrumentation *instrument, - int nkeys, int norderbys) + int nkeys, int norderbys, + bool enable_batching) { + ReadStream *rs = NULL; IndexScanDesc scan; Assert(snapshot != InvalidSnapshot); @@ -273,8 +312,45 @@ index_beginscan(Relation heapRelation, scan->xs_snapshot = snapshot; scan->instrument = instrument; + /* + * If explicitly requested and supported by both the index AM and the + * plan, initialize batching info. We only use stream read API with + * batching enabled (so not with systable scans). 
But maybe we should + * change that, and just use different read_next callbacks (or something + * like that)? + * + * XXX Maybe we should have a separate "amcanbatch" call, to let the AM + * decide if batching is supported depending on the scan details. That + * might be needed for certain index AMs, that can do batching only for + * some scans (I'm thinking about GiST/SP-GiST indexes, with ORDER BY). + * + * XXX Do this before initializing xs_heapfetch, so that we can pass the + * read stream to it. + */ + if ((indexRelation->rd_indam->amgetbatch != NULL) && + enable_batching && + enable_indexscan_batching) + { + /* + * XXX We do this after index_beginscan_internal(), which means we + * can't init the batch state in there (it doesn't even know if + * batching will be used at that point). We can't init the read_stream + * there, because it needs the heapRelation. + */ + index_batch_init(scan); + + /* initialize stream */ + rs = read_stream_begin_relation(READ_STREAM_DEFAULT, + NULL, + heapRelation, + MAIN_FORKNUM, + index_scan_stream_read_next, + scan, + 0); + } + /* prepare to fetch index matches from table */ - scan->xs_heapfetch = table_index_fetch_begin(heapRelation); + scan->xs_heapfetch = table_index_fetch_begin(heapRelation, rs); return scan; } @@ -337,6 +413,12 @@ index_beginscan_internal(Relation indexRelation, scan->parallel_scan = pscan; scan->xs_temp_snap = temp_snap; + /* + * No batching by default, so set it to NULL. Will be initialized later if + * batching is requested and AM supports it. + */ + scan->xs_batches = NULL; + return scan; } @@ -370,6 +452,19 @@ index_rescan(IndexScanDesc scan, scan->kill_prior_tuple = false; /* for safety */ scan->xs_heap_continue = false; + /* + * Reset the batching. This makes it look like there are no batches, + * discards reads already scheduled to the read stream, etc. + * + * XXX We do this before calling amrescan, so that it could reinitialize + * everything (this probably does not matter very much, now that we've + * moved all the batching logic to indexam.c, it was more important when + * the index AM was responsible for more of it). + * + * XXX Maybe this should also happen before table_index_fetch_reset? + */ + index_batch_reset(scan, true); + scan->indexRelation->rd_indam->amrescan(scan, keys, nkeys, orderbys, norderbys); } @@ -384,6 +479,9 @@ index_endscan(IndexScanDesc scan) SCAN_CHECKS; CHECK_SCAN_PROCEDURE(amendscan); + /* Cleanup batching, so that the AM can release pins and so on. */ + index_batch_end(scan); + /* Release resources (like buffer pins) from table accesses */ if (scan->xs_heapfetch) { @@ -414,7 +512,46 @@ index_markpos(IndexScanDesc scan) SCAN_CHECKS; CHECK_SCAN_PROCEDURE(ammarkpos); - scan->indexRelation->rd_indam->ammarkpos(scan); + /* + * Without batching, just use the ammarkpos() callback. With batching + * everything is handled at this layer, without calling the AM. + */ + if (scan->xs_batches == NULL) + { + scan->indexRelation->rd_indam->ammarkpos(scan); + } + else + { + IndexScanBatches *batches = scan->xs_batches; + IndexScanBatchPos *pos = &batches->markPos; + IndexScanBatchData *batch = batches->markBatch; + + /* + * Free the previous mark batch (if any), but only if the batch is no + * longer valid (in the current first/next range). This means that if + * we're marking the same batch (different item), we don't really do + * anything. + * + * XXX Should have some macro for this check, I guess. 
+ */ + if ((batch != NULL) && + (pos->batch < batches->firstBatch || pos->batch >= batches->nextBatch)) + { + batches->markBatch = NULL; + index_batch_free(scan, batch); + } + + /* just copy the read position (which has to be valid) */ + batches->markPos = batches->readPos; + batches->markBatch = INDEX_SCAN_BATCH(scan, batches->markPos.batch); + + /* + * FIXME we need to make sure the batch does not get freed during the + * regular advances. + */ + + AssertCheckBatchPosValid(scan, &batches->markPos); + } } /* ---------------- @@ -447,7 +584,58 @@ index_restrpos(IndexScanDesc scan) scan->kill_prior_tuple = false; /* for safety */ scan->xs_heap_continue = false; - scan->indexRelation->rd_indam->amrestrpos(scan); + /* + * Without batching, just use the amrestrpos() callback. With batching + * everything is handled at this layer, without calling the AM. + */ + if (scan->xs_batches == NULL) + scan->indexRelation->rd_indam->amrestrpos(scan); + else + { + IndexScanBatches *batches = scan->xs_batches; + IndexScanBatchPos *pos = &batches->markPos; + IndexScanBatchData *batch = scan->xs_batches->markBatch; + + Assert(batch != NULL); + + /* + * XXX The pos can be invalid, if we already advanced past the the + * marked batch (and stashed it in markBatch instead of freeing). So + * this assert would be incorrect. + */ + /* AssertCheckBatchPosValid(scan, &pos); */ + + /* FIXME we should still check the batch was not freed yet */ + + /* + * Reset the batching state, except for the marked batch, and make it + * look like we have a single batch - the marked one. + * + * XXX This seems a bit ugly / hacky, maybe there's a more elegant way + * to do this? + */ + index_batch_reset(scan, false); + + batches->markPos = *pos; + batches->readPos = *pos; + batches->firstBatch = pos->batch; + batches->nextBatch = (batches->firstBatch + 1); + + INDEX_SCAN_BATCH(scan, batches->markPos.batch) = batch; + + /* + * XXX I really dislike that we have so many definitions of "current" + * batch. We have readPos, streamPos, currentBatch, ... seems very ad + * hoc - I just added a new "current" field when I needed one. We + * should make that somewhat more consistent, or at least explain it + * clearly somewhere. + * + * XXX Do we even need currentBatch? It's not accessed anywhere, at + * least not in this patch. + */ + // batches->currentBatch = batch; + batches->markBatch = batch; /* also remember this */ + } } /* @@ -569,6 +757,18 @@ index_parallelrescan(IndexScanDesc scan) if (scan->xs_heapfetch) table_index_fetch_reset(scan->xs_heapfetch); + /* + * Reset the batching. This makes it look like there are no batches, + * discards reads already scheduled to the read stream, etc. We Do this + * before calling amrescan, so that it can reinitialize everything. + * + * XXX We do this before calling amparallelrescan, so that it could + * reinitialize everything (this probably does not matter very much, now + * that we've moved all the batching logic to indexam.c, it was more + * important when the index AM was responsible for more of it). 
+ */ + index_batch_reset(scan, true); + /* amparallelrescan is optional; assume no-op if not provided by AM */ if (scan->indexRelation->rd_indam->amparallelrescan != NULL) scan->indexRelation->rd_indam->amparallelrescan(scan); @@ -583,10 +783,12 @@ IndexScanDesc index_beginscan_parallel(Relation heaprel, Relation indexrel, IndexScanInstrumentation *instrument, int nkeys, int norderbys, - ParallelIndexScanDesc pscan) + ParallelIndexScanDesc pscan, + bool enable_batching) { Snapshot snapshot; IndexScanDesc scan; + ReadStream *rs = NULL; Assert(RelFileLocatorEquals(heaprel->rd_locator, pscan->ps_locator)); Assert(RelFileLocatorEquals(indexrel->rd_locator, pscan->ps_indexlocator)); @@ -604,8 +806,48 @@ index_beginscan_parallel(Relation heaprel, Relation indexrel, scan->xs_snapshot = snapshot; scan->instrument = instrument; + /* + * If explicitly requested and supported by both the index AM and the + * plan, initialize batching info. We only use stream read API with + * batching enabled (so not with systable scans). But maybe we should + * change that, and just use different read_next callbacks (or something + * like that)? + * + * XXX Maybe we should have a separate "amcanbatch" call, to let the AM + * decide if batching is supported depending on the scan details. That + * might be needed for certain index AMs, that can do batching only for + * some scans (I'm thinking about GiST/SP-GiST indexes, with ORDER BY). + * + * XXX Do this before initializing xs_heapfetch, so that we can pass the + * read stream to it. + * + * XXX Pretty duplicate with the code in index_beginscan(), so maybe move + * into a shared function. + */ + if ((indexrel->rd_indam->amgetbatch != NULL) && + enable_batching && + enable_indexscan_batching) + { + /* + * XXX We do this after index_beginscan_internal(), which means we + * can't init the batch state in there (it doesn't even know if + * batching will be used at that point). We can't init the read_stream + * there, because it needs the heapRelation. + */ + index_batch_init(scan); + + /* initialize stream */ + rs = read_stream_begin_relation(READ_STREAM_DEFAULT, + NULL, + heaprel, + MAIN_FORKNUM, + index_scan_stream_read_next, + scan, + 0); + } + /* prepare to fetch index matches from table */ - scan->xs_heapfetch = table_index_fetch_begin(heaprel); + scan->xs_heapfetch = table_index_fetch_begin(heaprel, rs); return scan; } @@ -628,6 +870,27 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction) /* XXX: we should assert that a snapshot is pushed or registered */ Assert(TransactionIdIsValid(RecentXmin)); + /* + * When using batching (which may be disabled for various reasons - e.g. + * through a GUC, the index AM not supporting it), redirect the code to + * the "batch" variant. If needed (e.g. for the first call) the call may + * read the next batch (leaf page) from the index (but that's driven by + * the read stream). + * + * XXX Maybe we should enable batching based on the plan too, so that we + * don't do batching when it's probably useless (e.g. semijoins or queries + * with LIMIT 1 etc.). The amcanbatch() callback might consider things + * like that, or maybe that should be considered outside AM. However, the + * slow ramp-up (starting with small batches) in read_stream should handle + * this well enough. + * + * XXX Perhaps it'd be possible to do both in index_getnext_slot(), i.e. + * call either the original code without batching, or the new batching + * code if supported/enabled. It's not great to have duplicated code. 
+ */ + if (scan->xs_batches != NULL) + return index_batch_getnext_tid(scan, direction); + /* * The AM's amgettuple proc finds the next index entry matching the scan * keys, and puts the TID into scan->xs_heaptid. It should also set @@ -694,9 +957,22 @@ index_fetch_heap(IndexScanDesc scan, TupleTableSlot *slot) * amgettuple call, in index_getnext_tid). We do not do this when in * recovery because it may violate MVCC to do so. See comments in * RelationGetIndexScan(). + * + * XXX For scans using batching, record the flag in the batch (we will + * pass it to the AM later, when freeing it). Otherwise just pass it to + * the AM using the kill_prior_tuple field. */ if (!scan->xactStartedInRecovery) - scan->kill_prior_tuple = all_dead; + { + if (scan->xs_batches == NULL) + { + scan->kill_prior_tuple = all_dead; + } + else if (all_dead) + { + index_batch_kill_item(scan); + } + } return found; } @@ -1084,3 +1360,1094 @@ index_opclass_options(Relation indrel, AttrNumber attnum, Datum attoptions, return build_local_reloptions(&relopts, attoptions, validate); } + +/* + * INDEX BATCHING (AND PREFETCHING) + * + * The traditional AM interface (amgettuple) is designed to walk the index one + * leaf page at a time, and the state (representing the leaf page) is managed + * by the AM implementation. Before advancing to the next leaf page, the index + * AM forgets the "current" leaf page. This makes it impossible to implement + * features that operate on multiple leaf pages - like for example prefetch. + * + * The batching relaxes this by extending the AM API with two new methods, + * amgetbatch and amfreebatch, that separate the "advance" to the next leaf + * page, and "forgetting" the previous one. This means there may be multiple + * leaf pages loaded at once, if necessary. It's a bit like having multiple + * "positions" within the index. + * + * The AM is no longer responsible for management of these "batches" - once + * a batch is returned from amgetbatch(), it's up to indexam.c to determine + * when it's no longer necessary, and call amfreebatch(). That is, the AM + * can no longer discard a leaf page when advancing to the next one. + * + * This allows operating on "future" index entries, e.g. to prefetch tuples + * from the table. Without the batching, we could do this within the single + * leaf page, which has limitations, e.g. inability to prefetch beyond the + * of the current leaf page, and the prefetch distance drop to 0. (Most + * indexes have many index items per leaf page, so the prefetching would + * be beneficial even with this limitation, but it's not great either.) + * + * Moving the batch management to the indexam.c also means defining a common + * batch state, instead of each index AM defining it's own opaque state. The + * AM merely "fills" the batch, and everything else is handled by code in + * indexam.c (so not AM-specific). Including prefetching. + * + * Without this "common" batch definition, each AM would need to do a fair + * bit of the prefetching on it's own. + * + * + * note: Strictly speaking, the AM may keep a second leaf page because of + * mark/restore may, but that's a minor detail. + * + * note: There are different definitions of "batch" - I use it as a synonym + * for a leaf page, or the index tuples read from one leaf page. Others use + * "batch" when talking about all the leaf pages kept in memory at a given + * moment in time (so in a way, there's a single batch, changing over time). 
+ * It's not my ambition to present a binding definition of a batch, but it's + * good to consider this when reading comments by other people. + * + * note: In theory, how the batch maps to leaf pages is mostly up to the index + * AM - as long as it can "advance" between batches, etc. it could use batches + * that represent a subset of a leaf page, or multiple leaf pages at once. + * + * note: Or maybe it doesn't need to map to leaf pages at all, at least not + * in a simple way. Consider for example ordered scans on SP-GiST indexes, + * or similar cases. I think that could be handled by having "abstract" + * batches - such indexes don't support mark/restore or changing direction, + * so this should be OK. + * + * note: When thinking about an index AM, think about BTREE, unless another + * AM is mentioned explicitly. Most AMs are based on / derived from BTREE, + * and everything about BTREE directly extends to them. + * + * note: In the following text "index AM" refers to an implementation of a + * particular index AM (e.g. BTREE), i.e. code src/backend/access/nbtree), + * while "indexam.c" is the shared executor level used to interact with + * indexes. + * + * + * index scan state + * ---------------- + * With the traditional API (amgettuple), index scan state is stored at the + * scan-level in AM-specific structs - e.g. in BTScanOpaque for BTREE). So + * there can be only a single leaf page "loaded" for a scan at a time. + * + * With the new API (amgetbatch/amfreebatch), an index scan needs to store + * multiple batches - but not in private "scan opaque" struct. Instead, + * the queue of batches and some of the other information was moved to the + * IndexScanDesc, into a common struct. So the AM-specific scan-opaque + * structs get split and moved into three places: + * + * 1) scan-opaque - Fields that are truly related to the scan as a whole + * remain in the struct (which is AM-specific, i.e. each AM method may + * keep something different). Example: scankeys/arraykeys are still + * kept in BTScanOpaque. + * + * 2) batch-opaque - AM-specific information related to a particular leaf + * page are moved to a new batch-level struct. A good example are for + * example the position of the leaf page / batch in the index (current + * page, left/righ pages, etc.). + * + * 3) batch - A significant part of the patch is introducing a common + * representation of a batch, common to all the index AMs. Until now + * each AM had it's own way of representing tuples from a leaf page, + * and accessing it required going through the AM again. The common + * representation allows accessing the batches through the indexam.c + * layer, without having to go through the AM. + * + * + * amgetbatch/amfreebatch + * ---------------------- + * To support batching, the index AM needs to implement two optional + * callbacks - amgetbatch() and amfreebatch(), which load data from the + * "next" leaf page, and then free it when the batch is no longer needed. + * + * For now the amgettuple() callback is still required even for AMs that + * support batching, so that we can fall-back to the non-batched scan + * for cases when batching is not supported (e.g. scans of system tables) + * or when batching is disabled using the enable_indexscan_batching GUC. + * + * + * batch + * ---------------------- + * A good way to visualize batching is a sliding window over the key space of + * an index. 
At any given moment, we have a "window" representing a range of + * the keys, consisting of one or more batches, each with items from a single + * leaf page. + * + * For now, each batch is exactly one whole leaf page. We might allow batches + * to be smaller or larger, but that doesn't seem very useful. It would make + * things more complex, without providing much benefit. Ultimately it's up to + * the index AM - it can produce any batches it wants, as long as it keeps + * necessary information in the batch-opaque struct, and handles this in the + * amgetbatch/amfreebatch callbacks. + * + * + * prefetching: leaf pages vs. heap pages + * -------------------------------------- + * This patch is only about prefetching pages from the indexed relation (e.g. + * heap), not about prefetching index leaf pages etc. The read_next callback + * does read leaf pages when needed (after reaching the end of the current + * batch), but this is synchronous, and the callback will block until the leaf + * page is read. + * + * + * gradual ramp up + * --------------- + * The prefetching is driven by the read_stream API / implementation. There + * are no explicit fadvise calls in the index code, that all happens in the + * read stream. The read stream does the usual gradual ramp up to not regress + * LIMIT 1 queries etc. + * + * + * kill_prior_tuples + * ----------------- + * If we decide a tuple should be "killed" in the index, the a flag is used to + * pass this information to indexam.c - the item is recorded in the batch, and + * the actual killing is postponed until the batch is freed using amfreebatch(). + * The scan flag is reset to false, so that the index AM does not get confused + * and does not do something for a different "current" item. + * + * That is, this is very similar to what happens without batching, except that + * the killed items are accumulated in indexam.c, not in the AM. + */ + +/* + * Maximum number of batches (leaf pages) we can keep in memory. + * + * The value 64 value is arbitrary, it's about 1MB of data with 8KB pages. We + * should not really need this many batches - we need a certain number of TIDs, + * to satisfy the prefetch distance, and there usually are many index tuples + * per page. In the worst case we might have one index tuple per leaf page, + * but even that may not quite work in some cases. + * + * But there may be cases when this does not work - some examples: + * + * a) the index may be bloated, with many pages only have a single index item + * + * b) the index is correlated, and we skip prefetches of duplicate blocks + * + * c) we may be doing index-only scan, and we don't prefetch all-visible pages + * + * So we might need to load huge number of batches before we find the first + * block to load from the table. Or enough pages to satisfy the prefetch + * distance. + * + * XXX Currently, once we hit this number of batches, we fail in the stream + * callback (or rather in index_batch_getnext), because that's where we load + * batches. It'd be nice to "pause" the read stream for a bit instead, but + * there's no built-in way to do that. So we can only "stop" the stream by + * returning InvalidBlockNumber. But we could also remember this, and do + * read_stream_reset() to continue, after consuming all the already scheduled + * blocks. + * + * XXX Maybe 64 is too high - it also defines the maximum amount of overhead + * allowed. In the worst case, reading a single row might trigger reading this + * many leaf pages (e.g. with IOS). 
Which might be an issue with LIMIT queries, + * when we actually won't need most of the leaf pages. + * + * XXX We could/should use a lower value for testing, to make it more likely + * we hit this issue. With 64 the whole check-world passes without hitting + * the limit, wo we wouldn't test it's handled correctly. + */ +#define INDEX_SCAN_MAX_BATCHES 64 + +#define INDEX_SCAN_BATCH_COUNT(scan) \ + ((scan)->xs_batches->nextBatch - (scan)->xs_batches->firstBatch) + +#define INDEX_SCAN_BATCH_LOADED(scan, idx) \ + ((idx) < (scan)->xs_batches->nextBatch) + +#define INDEX_SCAN_BATCH_FULL(scan) \ + (INDEX_SCAN_BATCH_COUNT(scan) == scan->xs_batches->maxBatches) + +/* + * Check that a position (batch,item) is valid with respect to the batches we + * have currently loaded. + * + * XXX The "marked" batch is an exception. The marked batch may get outside + * the range of current batches, so make sure to never check the position + * for that. + */ +static void +AssertCheckBatchPosValid(IndexScanDesc scan, IndexScanBatchPos *pos) +{ +#ifdef USE_ASSERT_CHECKING + IndexScanBatches *batch = scan->xs_batches; + + /* make sure the position is valid for currently loaded batches */ + Assert(pos->batch >= batch->firstBatch); + Assert(pos->batch < batch->nextBatch); +#endif +} + +/* + * Check a single batch is valid. + */ +static void +AssertCheckBatch(IndexScanDesc scan, IndexScanBatch batch) +{ +#ifdef USE_ASSERT_CHECKING + /* there must be valid range of items */ + Assert(batch->firstItem <= batch->lastItem); + Assert(batch->firstItem >= 0); + Assert(batch->lastItem <= MaxTIDsPerBTreePage); /* XXX tied to BTREE */ + + /* we should have items (buffer and pointers) */ + Assert(batch->items != NULL); + // Assert(batch->currTuples != NULL); + + /* + * The number of killed items must be valid, and there must be an array of + * indexes if there are items. + */ + Assert(batch->numKilled >= 0); + Assert(batch->numKilled <= MaxTIDsPerBTreePage); /* XXX tied to BTREE */ + Assert(!((batch->numKilled > 0) && (batch->killedItems == NULL))); + + /* XXX can we check some of the other batch fields? */ +#endif +} + +/* + * Check invariants on current batches + * + * Makes sure the indexes are set as expected, the buffer size is within + * limits, and so on. + */ +static void +AssertCheckBatches(IndexScanDesc scan) +{ +#ifdef USE_ASSERT_CHECKING + IndexScanBatches *batches = scan->xs_batches; + + /* we should have batches initialized */ + Assert(batches != NULL); + + /* We should not have too many batches. */ + Assert((batches->maxBatches > 0) && + (batches->maxBatches <= INDEX_SCAN_MAX_BATCHES)); + + /* + * The first/next indexes should define a valid range (in the cyclic + * buffer, and should not overflow maxBatches. 
+ */ + Assert((batches->firstBatch >= 0) && + (batches->firstBatch <= batches->nextBatch)); + Assert((batches->nextBatch - batches->firstBatch) <= batches->maxBatches); + + /* Check all current batches */ + for (int i = batches->firstBatch; i < batches->nextBatch; i++) + { + IndexScanBatch batch = INDEX_SCAN_BATCH(scan, i); + + AssertCheckBatch(scan, batch); + } +#endif +} + +/* debug: print info about current batches */ +static void +index_batch_print(const char *label, IndexScanDesc scan) +{ +#ifdef INDEXAM_DEBUG + IndexScanBatches *batches = scan->xs_batches; + + if (!scan->xs_batches) + return; + + DEBUG_LOG("%s: batches firstBatch %d nextBatch %d maxBatches %d", + label, + batches->firstBatch, batches->nextBatch, batches->maxBatches); + + for (int i = batches->firstBatch; i < batches->nextBatch; i++) + { + IndexScanBatchData *batch = INDEX_SCAN_BATCH(scan, i); + + DEBUG_LOG("%s: batch %d %p first %d last %d item %d killed %d", + label, i, batch, batch->firstItem, batch->lastItem, + batch->itemIndex, batch->numKilled); + } +#endif +} + +/* + * index_batch_pos_advance + * Advance the position to the next item, depending on scan direction. + * + * Advance the position to the next item, either in the same batch or the + * following one (if already available). + * + * We can advance only if we already have some batches loaded, and there's + * either enough items in the current batch, or some more items in the + * subsequent batches. + * + * If this is the first advance, right after loading the first batch, the + * position is still be undefined. Otherwise we expect the position to be + * valid. + * + * Returns true if the position was advanced, false otherwise. + * + * The poisition is guaranteed to be valid only after an advance. + */ +static bool +index_batch_pos_advance(IndexScanDesc scan, IndexScanBatchPos *pos) +{ + IndexScanBatchData *batch; + ScanDirection direction = scan->xs_batches->direction; + + /* make sure we have batching initialized and consistent */ + AssertCheckBatches(scan); + + /* should know direction by now */ + Assert(direction != NoMovementScanDirection); + + /* We can't advance if there are no batches available. */ + if (INDEX_SCAN_BATCH_COUNT(scan) == 0) + return false; + + /* + * If the position has not been advanced yet, it has to be right after we + * loaded the first batch. In that case just initialize it to the first + * item in the batch (or last item, if it's backwards scaa). + * + * XXX Maybe we should just explicitly initialize the postition after + * loading the first batch, without having to go through the advance. + * + * XXX Add a macro INDEX_SCAN_POS_DEFINED() or something like this, to + * make this easier to understand. + */ + if ((pos->batch == -1) && (pos->index == -1)) + { + /* we should have loaded the very first batch */ + Assert(scan->xs_batches->firstBatch == 0); + + batch = INDEX_SCAN_BATCH(scan, scan->xs_batches->firstBatch); + Assert(batch != NULL); + + pos->batch = 0; + + if (ScanDirectionIsForward(direction)) + pos->index = batch->firstItem; + else + pos->index = batch->lastItem; + + /* the position we just set has to be valid */ + AssertCheckBatchPosValid(scan, pos); + + return true; + } + + /* + * The position is already defined, so we should have some batches loaded + * and the position has to be valid with respect to those. + */ + AssertCheckBatchPosValid(scan, pos); + + /* + * Advance to the next item in the same batch. If the position is for the + * last item in the batch, try advancing to the next batch (if loaded). 
+ */ + batch = INDEX_SCAN_BATCH(scan, pos->batch); + + if (ScanDirectionIsForward(direction)) + { + if (pos->index < batch->lastItem) + { + pos->index++; + + /* the position has to be valid */ + AssertCheckBatchPosValid(scan, pos); + + return true; + } + } + else /* ScanDirectionIsBackward */ + { + if (pos->index > batch->firstItem) + { + pos->index--; + + /* the position has to be valid */ + AssertCheckBatchPosValid(scan, pos); + + return true; + } + } + + /* + * We couldn't advance within the same batch, try advancing to the next + * batch, if it's already loaded. + */ + if (INDEX_SCAN_BATCH_LOADED(scan, pos->batch + 1)) + { + /* advance to the next batch */ + pos->batch++; + + batch = INDEX_SCAN_BATCH(scan, pos->batch); + Assert(batch != NULL); + + if (ScanDirectionIsForward(direction)) + pos->index = batch->firstItem; + else + pos->index = batch->lastItem; + + /* the position has to be valid */ + AssertCheckBatchPosValid(scan, pos); + + return true; + } + + /* can't advance */ + return false; +} + +/* + * index_batch_pos_reset + * Reset the position, so that it looks as if never advanced. + */ +static void +index_batch_pos_reset(IndexScanDesc scan, IndexScanBatchPos *pos) +{ + pos->batch = -1; + pos->index = -1; +} + +/* + * index_scan_stream_read_next + * return the next block to pass to the read stream + * + * This assumes the "current" scan direction, requested by the caller. If + * that changes before consuming all buffers, we'll reset the stream and + * start from scratch. Which may seem inefficient, but it's no worse than + * what we do now, and it's not a very common case. + * + * The position of the read_stream is stored in streamPos, which may be + * ahead of the current readPos (which is what got consumed by the scan). + * + * The scan direction change is checked / handled elsewhere. Here we rely + * on having the correct value in xs_batches->direction. + */ +static BlockNumber +index_scan_stream_read_next(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + IndexScanDesc scan = (IndexScanDesc) callback_private_data; + IndexScanBatchPos *pos = &scan->xs_batches->streamPos; + + /* we should have set the direction already */ + Assert(scan->xs_batches->direction != NoMovementScanDirection); + + /* + * The read position has to be valid, because we initialize/advance it + * before maybe even attempting to read the heap tuple. And it lags behind + * the stream position, so it can't be invalid yet. If this is the first + * time for this callback, we will use the readPos to init streamPos, so + * better check it's valid. + */ + AssertCheckBatchPosValid(scan, &scan->xs_batches->readPos); + + /* + * Try to advance to the next item, and if there's none in the current + * batch, try loading the next batch. + * + * XXX This loop shouldn't happen more than twice, because if we fail to + * advance the position, we'll try to load the next batch and then in the + * next loop the advance has to succeed. + */ + while (true) + { + bool advanced = false; + + /* + * If the stream position is undefined, just use the read position. + * + * It's possible we got here only fairly late in the scan, e.g. if + * many tuples got skipped in the index-only scan, etc. In this case + * just use the read position as a starting point. + * + * The first batch is loaded from index_batch_getnext_tid(), because + * we don't get here until the first index_fetch_heap() call - only + * then can read_stream start loading more batches. 
It's also possible + * to disable prefetching (effective_io_concurrency=0), in which case + * all batches get loaded in index_batch_getnext_tid. + */ + if ((pos->batch == -1) && (pos->index == -1)) + { + *pos = scan->xs_batches->readPos; + advanced = true; + } + else if (index_batch_pos_advance(scan, pos)) + { + advanced = true; + } + + /* FIXME maybe check the streamPos is not behind readPos? */ + + /* If we advanced the position, return the block for the TID. */ + if (advanced) + { + IndexScanBatch batch = INDEX_SCAN_BATCH(scan, pos->batch); + ItemPointer tid = &batch->items[pos->index].heapTid; + + DEBUG_LOG("index_scan_stream_read_next: index %d TID (%u,%u)", + pos->index, + ItemPointerGetBlockNumber(tid), + ItemPointerGetOffsetNumber(tid)); + + /* + * if there's a prefetch callback, use it to decide if we will + * need to read the block + */ + if (scan->xs_batches->prefetchCallback && + !scan->xs_batches->prefetchCallback(scan, scan->xs_batches->prefetchArgument, pos)) + { + DEBUG_LOG("index_scan_stream_read_next: skip block (callback)"); + continue; + } + + return ItemPointerGetBlockNumber(tid); + } + + /* + * Couldn't advance the position, so either there are no more items in + * the current batch, or maybe we don't have any batches yet (if is + * the first time through). Try loading the next batch - if that + * succeeds, try the advance again (and this time the advance should + * work). + * + * If we fail to load the next batch, we're done. + */ + if (!index_batch_getnext(scan)) + break; + } + + /* no more items in this scan */ + return InvalidBlockNumber; +} + +/* ---------------- + * index_batch_getnext - get the next batch of TIDs from a scan + * + * Returns true if we managed to read at least some TIDs into the batch, or + * false if there are no more TIDs in the scan. The batch load may fail for + * multiple reasons - there really may not be more batches in the scan, or + * maybe we reached INDEX_SCAN_MAX_BATCHES. + * + * Returns true if the batch was loaded successfully, false otherwise. + * + * XXX This only loads the TIDs and resets the various batch fields to + * fresh state. It does not set xs_heaptid/xs_itup/xs_hitup, that's the + * responsibility of the following index_batch_getnext_tid() calls. + * ---------------- + */ +static bool +index_batch_getnext(IndexScanDesc scan) +{ + IndexScanBatchData *batch; + ItemPointerData tid; + ScanDirection direction = scan->xs_batches->direction; + IndexTuple itup; + + SCAN_CHECKS; + CHECK_SCAN_PROCEDURE(amgetbatch); + + /* XXX: we should assert that a snapshot is pushed or registered */ + Assert(TransactionIdIsValid(RecentXmin)); + + /* + * If we already used the maximum number of batch slots available, it's + * pointless to try loading another one. This can happen for various + * reasons, e.g. for index-only scans on all-visible table, or skipping + * duplicate blocks on perfectly correlated indexes, etc. + * + * We could enlarge the array to allow more batches, but that's futile, we + * can always construct a case using more memory. Not only it would risk + * OOM, it'd also be inefficient because this happens early in the scan + * (so it'd interfere with LIMIT queries). + * + * XXX For now we just error out, but the correct solution is to pause the + * stream by returning InvalidBlockNumber and then unpause it by doing + * read_stream_reset. 
+ */ + if (INDEX_SCAN_BATCH_FULL(scan)) + { + DEBUG_LOG("index_batch_getnext: ran out of space for batches"); + scan->xs_batches->reset = true; + } + + /* + * Did we fill the batch queue, either in this or some earlier call? + * If yes, we have to consume everything from currently loaded batch + * before we reset the stream and continue. It's a bit like 'finished' + * but it's only a temporary pause, not the end of the stream. + */ + if (scan->xs_batches->reset) + return NULL; + + /* + * Did we already read the last batch for this scan? + * + * We may read the batches in two places, so we need to remember that, + * otherwise the retry restarts the scan. + * + * XXX This comment might be obsolete, from before using the read_stream. + * + * XXX Also, maybe we should do this before calling INDEX_SCAN_BATCH_FULL? + */ + if (scan->xs_batches->finished) + return NULL; + + index_batch_print("index_batch_getnext / start", scan); + + /* + * FIXME btgetbatch calls _bt_returnitem, which however sets xs_heaptid, + * and so would interfere with index scans (because this may get executed + * from the read_stream_next_buffer callback during the scan (fetching + * heap tuples in heapam_index_fetch_tuple). Ultimately we should not do + * _bt_returnitem at all, just functions like _bt_steppage etc. while + * loading the next batch. + * + * XXX I think this is no longer true, the amgetbatch does not do that I + * believe (_bt_returnitem_batch should not set these fields). + */ + tid = scan->xs_heaptid; + itup = scan->xs_itup; + + batch = scan->indexRelation->rd_indam->amgetbatch(scan, direction); + if (batch != NULL) + { + /* + * We got the batch from the AM, but we need to add it to the queue. + * Maybe that should be part of the "batch allocation" that happens in + * the AM? + */ + int batchIndex = scan->xs_batches->nextBatch; + + INDEX_SCAN_BATCH(scan, batchIndex) = batch; + + scan->xs_batches->nextBatch++; + + /* + * XXX Why do we need currentBatch, actually? It doesn't seem to be + * used anywhere, just set ... + */ + // scan->xs_batches->currentBatch = batch; + + DEBUG_LOG("index_batch_getnext firstBatch %d nextBatch %d batch %p", + scan->xs_batches->firstBatch, scan->xs_batches->nextBatch, batch); + } + else + scan->xs_batches->finished = true; + + /* XXX see FIXME above */ + scan->xs_heaptid = tid; + scan->xs_itup = itup; + + AssertCheckBatches(scan); + + index_batch_print("index_batch_getnext / end", scan); + + return (batch != NULL); +} + +/* ---------------- + * index_getnext_batch_tid - get the next TID from the current batch + * + * The calling convention is similar to index_getnext_tid() - NULL means no + * more items in the current batch, and no more batches. + * + * If we advance to the next batch, we release the previous one (unless it's + * tracked for mark/restore). + * + * Returns the next TID, or NULL if no more items (or batches). + * + * FIXME This only sets xs_heaptid and xs_itup (if requested). Not sure if + * we need to do something with xs_hitup. Should this set xs_hitup? + * + * XXX Maybe if we advance the position to the next batch, we could keep the + * batch for a bit more, in case the scan direction changes (as long as it + * fits into maxBatches)? But maybe that's unnecessary complexity for too + * little gain, we'd need to be careful about releasing the batches lazily. 
+ * ---------------- + */ +static ItemPointer +index_batch_getnext_tid(IndexScanDesc scan, ScanDirection direction) +{ + IndexScanBatchPos *pos; + + /* shouldn't get here without batching */ + AssertCheckBatches(scan); + + /* read the next TID from the index */ + pos = &scan->xs_batches->readPos; + + /* FIXME handle change of scan direction (reset stream, ...) */ + scan->xs_batches->direction = direction; + + DEBUG_LOG("index_batch_getnext_tid pos %d %d direction %d", + pos->batch, pos->index, direction); + + /* + * Try advancing the batch position. If that doesn't succeed, it means we + * don't have more items in the current batch, and there's no future batch + * loaded. So try loading another batch, and maybe retry. + * + * FIXME This loop shouldn't happen more than twice. Maybe we should have + * some protection against infinite loops? If the advance/getnext + * functions get to disagree? + */ + while (true) + { + /* + * If we manage to advance to the next items, return it and we're + * done. Otherwise try loading another batch. + */ + if (index_batch_pos_advance(scan, pos)) + { + IndexScanBatchData *batch = INDEX_SCAN_BATCH(scan, pos->batch); + + Assert(batch != NULL); + + /* set the TID / itup for the scan */ + scan->xs_heaptid = batch->items[pos->index].heapTid; + scan->xs_itup = (IndexTuple) (batch->currTuples + batch->items[pos->index].tupleOffset); + + DEBUG_LOG("pos batch %p first %d last %d pos %d/%d TID (%u,%u)", + batch, batch->firstItem, batch->lastItem, + pos->batch, pos->index, + ItemPointerGetBlockNumber(&scan->xs_heaptid), + ItemPointerGetOffsetNumber(&scan->xs_heaptid)); + + /* + * If we advanced to the next batch, release the batch we no + * longer need. The positions is the "read" position, and we can + * compare it to firstBatch. + */ + if (pos->batch != scan->xs_batches->firstBatch) + { + batch = INDEX_SCAN_BATCH(scan, scan->xs_batches->firstBatch); + Assert(batch != NULL); + + /* + * XXX When advancing readPos, the streamPos may get behind as + * we're only advancing it when actually requesting heap blocks. + * But we may not do that often enough - e.g. IOS may not need + * to access all-visible heap blocks, so the read_next callback + * does not get invoked for a long time. It's possible the + * stream gets so mucu behind the position gets invalid, as we + * already removed the batch. But that means we don't need any + * heap blocks until the current read position - if we did, we + * would not be in this situation (or it's a sign of a bug, as + * those two places are expected to be in sync). So if the + * streamPos still points at the batch we're about to free, + * just reset the position - we'll set it to readPos in the + * read_next callback later. + * + * XXX This can happen after the queue gets full, we "pause" + * the stream, and then reset it to continue. But I think that + * just increases the probability of hitting the issue, it's + * just more chance to to not advance the streamPos, which + * depends on when we try to fetch the first heap block after + * calling read_stream_reset(). + */ + if (scan->xs_batches->streamPos.batch == scan->xs_batches->firstBatch) + { + index_batch_pos_reset(scan, &scan->xs_batches->streamPos); + } + + DEBUG_LOG("index_batch_getnext_tid free batch %p firstBatch %d nextBatch %d", + batch, + scan->xs_batches->firstBatch, + scan->xs_batches->nextBatch); + + /* Free the batch (except when it's needed for mark/restore). 
*/ + index_batch_free(scan, batch); + + /* + * In any case, remove the batch from the regular queue, even + * if we kept it for mar/restore. + */ + scan->xs_batches->firstBatch++; + + DEBUG_LOG("index_batch_getnext_tid batch freed firstBatch %d nextBatch %d", + scan->xs_batches->firstBatch, + scan->xs_batches->nextBatch); + + index_batch_print("index_batch_getnext_tid / free old batch", scan); + + /* we can't skip any batches */ + Assert(scan->xs_batches->firstBatch == pos->batch); + } + + return &scan->xs_heaptid; + } + + /* + * We failed to advance, i.e. we ran out of currently loaded batches. + * So if we filled the queue, this is a good time to reset the stream + * (before we try loading the next batch). + */ + if (scan->xs_batches->reset) + { + DEBUG_LOG("resetting read stream pos %d,%d", + scan->xs_batches->readPos.batch, scan->xs_batches->readPos.index); + + scan->xs_batches->reset = false; + + /* + * Need to reset the stream position, it might be too far behind. + * Ultimately we want to set it to readPos, but we can't do that + * yet - readPos still point sat the old batch, so just reset it + * and we'll init it to readPos later in the callback. + */ + index_batch_pos_reset(scan, &scan->xs_batches->streamPos); + + read_stream_reset(scan->xs_heapfetch->rs); + } + + /* + * Failed to advance the read position, so try reading the next batch. + * If this fails, we're done - there's nothing more to load. + * + * Most of the batches should be loaded from read_stream_next_buffer, + * but we need to call index_batch_getnext here too, for two reasons. + * First, the read_stream only gets working after we try fetching the + * first heap tuple, so we need to load the first batch from here. + * Second, while most batches will be preloaded by the stream thank's + * to prefetching, it's possible to set effective_io_concurrency=0, in + * which case all the batch loads happen from here. + */ + if (!index_batch_getnext(scan)) + break; + + DEBUG_LOG("loaded next batch, retry to advance position"); + } + + /* + * If we get here, we failed to advance the position and there are no more + * batches, so we're done. + */ + DEBUG_LOG("no more batches to process"); + + return NULL; +} + +/* + * index_batch_init + * Initialize various fields / arrays needed by batching. + * + * FIXME This is a bit ad-hoc hodge podge, due to how I was adding more and + * more pieces. Some of the fields may be not quite necessary, needs cleanup. + */ +static void +index_batch_init(IndexScanDesc scan) +{ + /* init batching info, assume batching is supported by the AM */ + Assert(scan->indexRelation->rd_indam->amgetbatch != NULL); + Assert(scan->indexRelation->rd_indam->amfreebatch != NULL); + + scan->xs_batches = palloc0(sizeof(IndexScanBatches)); + + /* We don't know direction of the scan yet. */ + scan->xs_batches->direction = NoMovementScanDirection; + + /* Initialize the batch */ + scan->xs_batches->maxBatches = INDEX_SCAN_MAX_BATCHES; + scan->xs_batches->firstBatch = 0; /* first batch */ + scan->xs_batches->nextBatch = 0; /* first batch is empty */ + + scan->xs_batches->batches + = palloc(sizeof(IndexScanBatchData *) * scan->xs_batches->maxBatches); + + /* positions in the queue of batches */ + index_batch_pos_reset(scan, &scan->xs_batches->readPos); + index_batch_pos_reset(scan, &scan->xs_batches->streamPos); + index_batch_pos_reset(scan, &scan->xs_batches->markPos); + + // scan->xs_batches->currentBatch = NULL; +} + +/* + * index_batch_reset + * Reset the batch before reading the next chunk of data. 
+ * + * complete - true means we reset even the marked batch + * + * XXX Should this reset the batch memory context, xs_itup, xs_hitup, etc? + */ +static void +index_batch_reset(IndexScanDesc scan, bool complete) +{ + IndexScanBatches *batches = scan->xs_batches; + + /* bail out if batching not enabled */ + if (!batches) + return; + + AssertCheckBatches(scan); + + index_batch_print("index_batch_reset", scan); + + /* With batching enabled, we should have a read stream. Reset it. */ + Assert(scan->xs_heapfetch); + read_stream_reset(scan->xs_heapfetch->rs); + + /* reset the positions */ + index_batch_pos_reset(scan, &batches->readPos); + index_batch_pos_reset(scan, &batches->streamPos); + + /* + * With "complete" reset, make sure to also free the marked batch, either + * by just forgetting it (if it's still in the queue), or by explicitly + * freeing it. + * + * XXX Do this before the loop, so that it calls the amfreebatch(). + */ + if (complete && (batches->markBatch != NULL)) + { + IndexScanBatchPos *pos = &batches->markPos; + IndexScanBatch batch = batches->markBatch; + + /* always reset the position, forget the marked batch */ + batches->markBatch = NULL; + + /* + * If we've already moved past the marked batch (it's not in the + * current queue), free it explicitly. Otherwise it'll be freed + * later. + */ + if ((pos->batch < batches->firstBatch) || + (pos->batch >= batches->nextBatch)) + { + index_batch_free(scan, batch); + } + + /* reset position only after the queue range check */ + index_batch_pos_reset(scan, &batches->markPos); + } + + /* release all currently loaded batches */ + while (batches->firstBatch < batches->nextBatch) + { + IndexScanBatch batch = INDEX_SCAN_BATCH(scan, batches->firstBatch); + + DEBUG_LOG("freeing batch %d %p", batches->firstBatch, batch); + + index_batch_free(scan, batch); + + /* update the valid range, so that asserts / debugging works */ + batches->firstBatch++; + } + + /* reset relevant IndexScanBatches fields */ + batches->maxBatches = INDEX_SCAN_MAX_BATCHES; + batches->firstBatch = 0; /* first batch */ + batches->nextBatch = 0; /* first batch is empty */ + + batches->finished = false; + batches->reset = false; + // batches->currentBatch = NULL; + + AssertCheckBatches(scan); +} + +static void +index_batch_kill_item(IndexScanDesc scan) +{ + IndexScanBatchPos *pos = &scan->xs_batches->readPos; + IndexScanBatchData *batch = INDEX_SCAN_BATCH(scan, pos->batch); + + /* FIXME mark item at current readPos as deleted */ + AssertCheckBatchPosValid(scan, pos); + + /* + * XXX Too tied to btree (through MaxTIDsPerBTreePage), we should make + * this AM-agnostic. We could maybe even replace this with a Bitmapset. It + * might be more expensive if we only kill items at the end of the page + * (in which case we still have to walk the first part to find the bits at + * the end). But given the lower memory usage it still seems like a good + * tradeoff overall.
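As for the Bitmapset idea mentioned in the XXX above, a sketch of what that could look like follows. This is hypothetical - ToyBatch and its field are invented, and the patch keeps the plain int array for now:

    #include "postgres.h"
    #include "nodes/bitmapset.h"

    typedef struct ToyBatch
    {
        Bitmapset  *killedItems;    /* would replace the int array + numKilled */
    } ToyBatch;

    static void
    toy_kill_item(ToyBatch *batch, int itemIndex)
    {
        /* grows as needed, no MaxTIDsPerBTreePage-sized allocation up front */
        batch->killedItems = bms_add_member(batch->killedItems, itemIndex);
    }

    static void
    toy_flush_killed(ToyBatch *batch)
    {
        int         idx = -1;

        /* visit killed items in increasing item order */
        while ((idx = bms_next_member(batch->killedItems, idx)) >= 0)
        {
            /* mark items[idx] as dead on the index page (AM-specific) */
        }
    }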
+ */ + if (batch->killedItems == NULL) + batch->killedItems = (int *) + palloc(MaxTIDsPerBTreePage * sizeof(int)); + if (batch->numKilled < MaxTIDsPerBTreePage) + batch->killedItems[batch->numKilled++] = pos->index; + + /* elog(WARNING, "index_batch_kill_item (%d,%d)", pos->batch, pos->index); */ + /* FIXME index_batch_kill_item not implemented */ +} + +static void +index_batch_free(IndexScanDesc scan, IndexScanBatch batch) +{ + SCAN_CHECKS; + CHECK_SCAN_PROCEDURE(amfreebatch); + + AssertCheckBatch(scan, batch); + + /* don't free the batch that is marked */ + if (batch == scan->xs_batches->markBatch) + return; + + scan->indexRelation->rd_indam->amfreebatch(scan, batch); +} + +/* */ +static void +index_batch_end(IndexScanDesc scan) +{ + index_batch_reset(scan, true); +} + +IndexScanBatch +index_batch_alloc(int maxitems, bool want_itup) +{ + IndexScanBatch batch = palloc(sizeof(IndexScanBatchData)); + + batch->firstItem = -1; + batch->lastItem = -1; + batch->itemIndex = -1; + + batch->killedItems = NULL; /* FIXME allocate an array, actually */ + batch->numKilled = 0; /* nothing killed yet */ + + /* + * If we are doing an index-only scan, these are the tuple storage + * workspaces for the currPos and markPos respectively. Each is of size + * BLCKSZ, so it can hold as much as a full page's worth of tuples. + * + * XXX allocate + */ + batch->currTuples = NULL; /* tuple storage for currPos */ + if (want_itup) + batch->currTuples = palloc(BLCKSZ); + + /* + * XXX Maybe don't size to MaxTIDsPerBTreePage? We don't reuse batches + * (unlike currPos), so we can size it for just what we need. + */ + batch->items = palloc0(sizeof(IndexScanBatchPosItem) * maxitems); + + /* + * batch contents (TIDs, index tuples, kill bitmap, ...) + * + * XXX allocate as needed? + */ + batch->itups = NULL; /* IndexTuples, if requested */ + batch->htups = NULL; /* HeapTuples, if requested */ + batch->recheck = NULL; /* recheck flags */ + batch->privateData = NULL; /* private data for batch */ + + /* xs_orderbyvals / xs_orderbynulls */ + batch->orderbyvals = NULL; + batch->orderbynulls = NULL; + + /* AM-specific per-batch state */ + batch->opaque = NULL; + + return batch; +} diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index a56c5eceb14..be8e02a9c45 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -217,7 +217,7 @@ table_index_fetch_tuple_check(Relation rel, bool found; slot = table_slot_create(rel, NULL); - scan = table_index_fetch_begin(rel); + scan = table_index_fetch_begin(rel, NULL); found = table_index_fetch_tuple(scan, tid, snapshot, slot, &call_again, all_dead); table_index_fetch_end(scan); diff --git a/src/backend/commands/constraint.c b/src/backend/commands/constraint.c index 3497a8221f2..8a5d79a27a6 100644 --- a/src/backend/commands/constraint.c +++ b/src/backend/commands/constraint.c @@ -106,7 +106,8 @@ unique_key_recheck(PG_FUNCTION_ARGS) */ tmptid = checktid; { - IndexFetchTableData *scan = table_index_fetch_begin(trigdata->tg_relation); + IndexFetchTableData *scan = table_index_fetch_begin(trigdata->tg_relation, + NULL); bool call_again = false; if (!table_index_fetch_tuple(scan, &tmptid, SnapshotSelf, slot, diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c index bdf862b2406..1ec046adeff 100644 --- a/src/backend/executor/execIndexing.c +++ b/src/backend/executor/execIndexing.c @@ -815,7 +815,17 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index, retry: conflict = false; 
found_self = false; - index_scan = index_beginscan(heap, index, &DirtySnapshot, NULL, indnkeyatts, 0); + + /* + * It doesn't seem very useful to allow batching/prefetching when checking + * exclusion/uniqueness constraints. We should find either no rows or just + * one row, I think. + * + * XXX Maybe there are cases where we could find multiple "candidate" + * rows, e.g. with exclusion constraints? Not sure. + */ + index_scan = index_beginscan(heap, index, &DirtySnapshot, NULL, indnkeyatts, 0, + false); index_rescan(index_scan, scankeys, indnkeyatts, NULL, 0); while (index_getnext_slot(index_scan, ForwardScanDirection, existing_slot)) diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 53ddd25c42d..9c7df9b9ccb 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -201,8 +201,13 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid, /* Build scan key. */ skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot); - /* Start an index scan. */ - scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0); + /* + * Start an index scan. + * + * XXX No prefetching for replication identity. We expect to find just one + * row, so prefetching would be pointless. + */ + scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0, false); retry: found = false; diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c index f464cca9507..1a14f5faa68 100644 --- a/src/backend/executor/nodeIndexonlyscan.c +++ b/src/backend/executor/nodeIndexonlyscan.c @@ -49,7 +49,13 @@ static TupleTableSlot *IndexOnlyNext(IndexOnlyScanState *node); static void StoreIndexTuple(IndexOnlyScanState *node, TupleTableSlot *slot, IndexTuple itup, TupleDesc itupdesc); +static bool ios_prefetch_block(IndexScanDesc scan, void *data, + IndexScanBatchPos *pos); +/* values stored by ios_prefetch_block in the batch cache */ +#define IOS_UNKNOWN_VISIBILITY 0 /* default value */ +#define IOS_ALL_VISIBLE 1 +#define IOS_NOT_ALL_VISIBLE 2 /* ---------------------------------------------------------------- * IndexOnlyNext @@ -94,15 +100,26 @@ IndexOnlyNext(IndexOnlyScanState *node) estate->es_snapshot, &node->ioss_Instrument, node->ioss_NumScanKeys, - node->ioss_NumOrderByKeys); + node->ioss_NumOrderByKeys, + node->ioss_CanBatch); node->ioss_ScanDesc = scandesc; - /* Set it up for index-only scan */ node->ioss_ScanDesc->xs_want_itup = true; node->ioss_VMBuffer = InvalidBuffer; + /* + * Set the prefetch callback info, if the scan has batching enabled + * (we only know that after index_beginscan, which also checks which + * callbacks are defined for the AM). + */ + if (scandesc->xs_batches != NULL) + { + scandesc->xs_batches->prefetchCallback = ios_prefetch_block; + scandesc->xs_batches->prefetchArgument = (void *) node; + } + /* * If no run-time keys to calculate or they are ready, go ahead and * pass the scankeys to the index AM. @@ -120,10 +137,42 @@ IndexOnlyNext(IndexOnlyScanState *node) */ while ((tid = index_getnext_tid(scandesc, direction)) != NULL) { + bool all_visible; bool tuple_from_heap = false; CHECK_FOR_INTERRUPTS(); + /* + * Without batching, inspect the VM directly. With batching, we need + * to retrieve the visibility information seen by the read_stream + * callback (or rather by ios_prefetch_block), otherwise the + * read_stream might get out of sync (if the VM got updated since + * then).
+ */ + if (scandesc->xs_batches == NULL) + { + all_visible = VM_ALL_VISIBLE(scandesc->heapRelation, + ItemPointerGetBlockNumber(tid), + &node->ioss_VMBuffer); + } + else + { + /* + * Reuse the previously determined page visibility info, or + * calculate it now. If we decided not to prefetch the block, the + * page had to be all-visible at that point. The VM bit might have + * changed since then, but the tuple visibility could not have. + * + * XXX It's a bit weird we use the visibility to decide if we + * should skip prefetching the block, and then deduce the + * visibility from that (even if it matches pretty clearly). But + * maybe we could/should have a more direct way to read the + * private state? + */ + all_visible = !ios_prefetch_block(scandesc, node, + &scandesc->xs_batches->readPos); + } + /* * We can skip the heap fetch if the TID references a heap page on * which all tuples are known visible to everybody. In any case, @@ -158,9 +207,7 @@ IndexOnlyNext(IndexOnlyScanState *node) * It's worth going through this complexity to avoid needing to lock * the VM buffer, which could cause significant contention. */ - if (!VM_ALL_VISIBLE(scandesc->heapRelation, - ItemPointerGetBlockNumber(tid), - &node->ioss_VMBuffer)) + if (!all_visible) { /* * Rats, we have to visit the heap to check visibility. @@ -596,6 +643,20 @@ ExecInitIndexOnlyScan(IndexOnlyScan *node, EState *estate, int eflags) indexstate->recheckqual = ExecInitQual(node->recheckqual, (PlanState *) indexstate); + /* + * All index scans can do batching. + * + * XXX Maybe this should check if the index AM supports batching, or even + * call something like "amcanbatch" (does not exist yet). Or check the + * enable_indexscan_batching GUC? + * + * XXX For now we only know if the scan gets to use batching after the + * index_beginscan() returns, so maybe this name is a bit misleading. It's + * more about "allow batching". But maybe this field is unnecessary - we + * check all the interesting stuff in index_beginscan() anyway. + */ + indexstate->ioss_CanBatch = true; + /* * If we are just doing EXPLAIN (ie, aren't going to run the plan), stop * here. This allows an index-advisor plugin to EXPLAIN a plan containing @@ -783,13 +844,21 @@ ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node, return; } + /* + * XXX Do we actually want prefetching for parallel index scans? Maybe + * not, but then we need to be careful not to call index_batch_getnext_tid + * (which now can happen, because we'll call IndexOnlyNext even for + * parallel plans). Although, that should not happen, because we only call + * that with (xs_batches != NULL). + */ node->ioss_ScanDesc = index_beginscan_parallel(node->ss.ss_currentRelation, node->ioss_RelationDesc, &node->ioss_Instrument, node->ioss_NumScanKeys, node->ioss_NumOrderByKeys, - piscan); + piscan, + node->ioss_CanBatch); node->ioss_ScanDesc->xs_want_itup = true; node->ioss_VMBuffer = InvalidBuffer; @@ -849,13 +918,15 @@ ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, return; } + /* XXX Do we actually want prefetching for parallel index scans? 
*/ node->ioss_ScanDesc = index_beginscan_parallel(node->ss.ss_currentRelation, node->ioss_RelationDesc, &node->ioss_Instrument, node->ioss_NumScanKeys, node->ioss_NumOrderByKeys, - piscan); + piscan, + node->ioss_CanBatch); node->ioss_ScanDesc->xs_want_itup = true; /* @@ -889,3 +960,51 @@ ExecIndexOnlyScanRetrieveInstrumentation(IndexOnlyScanState *node) node->ioss_SharedInfo = palloc(size); memcpy(node->ioss_SharedInfo, SharedInfo, size); } + +/* FIXME duplicate from indexam.c */ +#define INDEX_SCAN_BATCH(scan, idx) \ + ((scan)->xs_batches->batches[(idx) % (scan)->xs_batches->maxBatches]) + +/* + * ios_prefetch_block + * Callback to only prefetch blocks that are not all-visible. + * + * We don't want to inspect the visibility map repeatedly, so the result of + * VM_ALL_VISIBLE is stored in the batch private data. The values are set + * to 0 by default, so we use two constants to remember if all-visible or + * not all-visible. + * + * However, this is not merely a question of performance. The VM may get + * modified during the scan, and we need to make sure the two places (the + * read_next callback and the index_fetch_heap here) make the same decision, + * otherwise we might get out of sync with the stream. For example, the + * callback might find a page is all-visible (and skips reading the block), + * and then someone might update the page, resetting the VM bit. If this + * place attempts to read the page from the stream, it'll fail because it + * will probably receive an entirely different page. + */ +static bool +ios_prefetch_block(IndexScanDesc scan, void *arg, IndexScanBatchPos *pos) +{ + IndexOnlyScanState *node = (IndexOnlyScanState *) arg; + IndexScanBatch batch = INDEX_SCAN_BATCH(scan, pos->batch); + + if (batch->privateData == NULL) + batch->privateData = palloc0(sizeof(Datum) * (batch->lastItem + 1)); + + if (batch->privateData[pos->index] == IOS_UNKNOWN_VISIBILITY) + { + bool all_visible; + ItemPointer tid = &batch->items[pos->index].heapTid; + + all_visible = VM_ALL_VISIBLE(scan->heapRelation, + ItemPointerGetBlockNumber(tid), + &node->ioss_VMBuffer); + + batch->privateData[pos->index] + = all_visible ? IOS_ALL_VISIBLE : IOS_NOT_ALL_VISIBLE; + } + + /* prefetch only blocks that are not all-visible */ + return (batch->privateData[pos->index] == IOS_NOT_ALL_VISIBLE); +} diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c index 7fcaa37fe62..177d74c2c27 100644 --- a/src/backend/executor/nodeIndexscan.c +++ b/src/backend/executor/nodeIndexscan.c @@ -111,7 +111,8 @@ IndexNext(IndexScanState *node) estate->es_snapshot, &node->iss_Instrument, node->iss_NumScanKeys, - node->iss_NumOrderByKeys); + node->iss_NumOrderByKeys, + node->iss_CanBatch); node->iss_ScanDesc = scandesc; @@ -201,13 +202,16 @@ IndexNextWithReorder(IndexScanState *node) /* * We reach here if the index scan is not parallel, or if we're * serially executing an index scan that was planned to be parallel. + * + * XXX Should we use batching here? Does it even work for reordering? */ scandesc = index_beginscan(node->ss.ss_currentRelation, node->iss_RelationDesc, estate->es_snapshot, &node->iss_Instrument, node->iss_NumScanKeys, - node->iss_NumOrderByKeys); + node->iss_NumOrderByKeys, + false); node->iss_ScanDesc = scandesc; @@ -965,6 +969,18 @@ ExecInitIndexScan(IndexScan *node, EState *estate, int eflags) indexstate->indexorderbyorig = ExecInitExprList(node->indexorderbyorig, (PlanState *) indexstate); + /* + * All index scans can do batching. 
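For context, the read_stream side is expected to consume a callback like ios_prefetch_block from its block-number callback, roughly as sketched below. This is a simplification of what indexam.c does (it leaves out re-initializing streamPos from readPos after a reset, and loading additional batches); the function name is invented, the other identifiers are the ones used above:

    static BlockNumber
    index_scan_stream_read_next(ReadStream *stream,
                                void *callback_private_data,
                                void *per_buffer_data)
    {
        IndexScanDesc scan = (IndexScanDesc) callback_private_data;
        IndexScanBatchPos *pos = &scan->xs_batches->streamPos;

        /* advance the stream position until we find a block worth reading */
        while (index_batch_pos_advance(scan, pos))
        {
            IndexScanBatch batch = INDEX_SCAN_BATCH(scan, pos->batch);
            ItemPointer tid = &batch->items[pos->index].heapTid;

            /* IOS may tell us the block is all-visible, i.e. not needed */
            if (scan->xs_batches->prefetchCallback &&
                !scan->xs_batches->prefetchCallback(scan,
                                                    scan->xs_batches->prefetchArgument,
                                                    pos))
                continue;

            return ItemPointerGetBlockNumber(tid);
        }

        /* nothing else to read for now - the stream stops until a reset */
        return InvalidBlockNumber;
    }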
+ * + * XXX Maybe this should check if the index AM supports batching, or even + * call something like "amcanbatch" (does not exist yet). Or check the + * enable_indexscan_batching GUC? + * + * XXX Well, we disable batching for reordering, so maybe we should check + * that here instead? But maybe it's unnecessary limitation? + */ + indexstate->iss_CanBatch = true; + /* * If we are just doing EXPLAIN (ie, aren't going to run the plan), stop * here. This allows an index-advisor plugin to EXPLAIN a plan containing @@ -1719,13 +1735,17 @@ ExecIndexScanInitializeDSM(IndexScanState *node, return; } + /* + * XXX Do we actually want prefetching for parallel index scans? + */ node->iss_ScanDesc = index_beginscan_parallel(node->ss.ss_currentRelation, node->iss_RelationDesc, &node->iss_Instrument, node->iss_NumScanKeys, node->iss_NumOrderByKeys, - piscan); + piscan, + node->iss_CanBatch); /* * If no run-time keys to calculate or they are ready, go ahead and pass @@ -1783,13 +1803,17 @@ ExecIndexScanInitializeWorker(IndexScanState *node, return; } + /* + * XXX Do we actually want prefetching for parallel index scans? + */ node->iss_ScanDesc = index_beginscan_parallel(node->ss.ss_currentRelation, node->iss_RelationDesc, &node->iss_Instrument, node->iss_NumScanKeys, node->iss_NumOrderByKeys, - piscan); + piscan, + node->iss_CanBatch); /* * If no run-time keys to calculate or they are ready, go ahead and pass diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c index 987f2154459..30947649bc7 100644 --- a/src/backend/utils/adt/selfuncs.c +++ b/src/backend/utils/adt/selfuncs.c @@ -6665,9 +6665,14 @@ get_actual_variable_endpoint(Relation heapRel, InitNonVacuumableSnapshot(SnapshotNonVacuumable, GlobalVisTestFor(heapRel)); + /* + * XXX I'm not sure about batching/prefetching here. In most cases we + * expect to find the endpoints immediately, but sometimes we have a lot + * of dead tuples - and then prefetching might help. 
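Call sites now opt in or out of batching explicitly. Passing true only allows it - whether batching is actually used still depends on what index_beginscan() finds (the AM has to provide the batch callbacks). A sketch of an opted-in call site, with invented function name and scan key, includes omitted:

    static void
    toy_scan_with_batching(Relation heapRel, Relation indexRel)
    {
        IndexScanDesc scan;
        ScanKeyData skey;
        TupleTableSlot *slot = table_slot_create(heapRel, NULL);

        /* first index column >= 42 (invented example key) */
        ScanKeyInit(&skey, 1, BTGreaterEqualStrategyNumber, F_INT4GE,
                    Int32GetDatum(42));

        /* the new last argument allows batching/prefetching for this scan */
        scan = index_beginscan(heapRel, indexRel, GetTransactionSnapshot(), NULL,
                               1, 0, true);
        index_rescan(scan, &skey, 1, NULL, 0);

        while (index_getnext_slot(scan, ForwardScanDirection, slot))
        {
            /* process the tuple; heap pages get prefetched via the read stream */
        }

        index_endscan(scan);
        ExecDropSingleTupleTableSlot(slot);
    }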
+ */ index_scan = index_beginscan(heapRel, indexRel, &SnapshotNonVacuumable, NULL, - 1, 0); + 1, 0, false); /* Set it up for index-only scan */ index_scan->xs_want_itup = true; index_rescan(index_scan, scankeys, 1, NULL, 0); diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 60b12446a1c..2d6438f3259 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -809,6 +809,16 @@ struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"enable_indexscan_batching", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of index-scan batching."), + NULL, + GUC_EXPLAIN + }, + &enable_indexscan_batching, + true, + NULL, NULL, NULL + }, { {"enable_indexonlyscan", PGC_USERSET, QUERY_TUNING_METHOD, gettext_noop("Enables the planner's use of index-only-scan plans."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 34826d01380..649df2b06a0 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -415,6 +415,7 @@ #enable_hashjoin = on #enable_incremental_sort = on #enable_indexscan = on +#enable_indexscan_batching = on #enable_indexonlyscan = on #enable_material = on #enable_memoize = on diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h index 52916bab7a3..0028bb55843 100644 --- a/src/include/access/amapi.h +++ b/src/include/access/amapi.h @@ -196,6 +196,14 @@ typedef void (*amrescan_function) (IndexScanDesc scan, typedef bool (*amgettuple_function) (IndexScanDesc scan, ScanDirection direction); +/* next batch of valid tuples */ +typedef IndexScanBatch(*amgetbatch_function) (IndexScanDesc scan, + ScanDirection direction); + +/* release batch of valid tuples */ +typedef void (*amfreebatch_function) (IndexScanDesc scan, + IndexScanBatch batch); + /* fetch all valid tuples */ typedef int64 (*amgetbitmap_function) (IndexScanDesc scan, TIDBitmap *tbm); @@ -307,6 +315,8 @@ typedef struct IndexAmRoutine ambeginscan_function ambeginscan; amrescan_function amrescan; amgettuple_function amgettuple; /* can be NULL */ + amgetbatch_function amgetbatch; /* can be NULL */ + amfreebatch_function amfreebatch; /* can be NULL */ amgetbitmap_function amgetbitmap; /* can be NULL */ amendscan_function amendscan; ammarkpos_function ammarkpos; /* can be NULL */ diff --git a/src/include/access/genam.h b/src/include/access/genam.h index 5b2ab181b5f..8bef942b11d 100644 --- a/src/include/access/genam.h +++ b/src/include/access/genam.h @@ -15,6 +15,7 @@ #define GENAM_H #include "access/htup.h" +#include "access/itup.h" #include "access/sdir.h" #include "access/skey.h" #include "nodes/tidbitmap.h" @@ -111,6 +112,7 @@ typedef bool (*IndexBulkDeleteCallback) (ItemPointer itemptr, void *state); /* struct definitions appear in relscan.h */ typedef struct IndexScanDescData *IndexScanDesc; +typedef struct IndexScanBatchData *IndexScanBatch; typedef struct SysScanDescData *SysScanDesc; typedef struct ParallelIndexScanDescData *ParallelIndexScanDesc; @@ -155,6 +157,8 @@ typedef struct IndexOrderByDistance * generalized index_ interface routines (in indexam.c) */ +extern PGDLLIMPORT bool enable_indexscan_batching; + /* * IndexScanIsValid * True iff the index scan is valid. 
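To make the amgetbatch / amfreebatch contract more concrete, here is a rough sketch of how a hypothetical AM could implement them, returning one batch per leaf page and using the index_batch_alloc() helper. ToyLeafPage and the toy_* helpers are invented stand-ins for the AM's own leaf-page handling:

    typedef struct ToyLeafPage          /* hypothetical AM-side leaf state */
    {
        int             nitems;
        ItemPointerData *tids;
        OffsetNumber   *offsets;
    } ToyLeafPage;

    static ToyLeafPage *toy_read_next_leaf(IndexScanDesc scan, ScanDirection dir);
    static void toy_release_leaf(IndexScanDesc scan, ToyLeafPage *page);

    static IndexScanBatch
    toyam_getbatch(IndexScanDesc scan, ScanDirection direction)
    {
        ToyLeafPage *page;
        IndexScanBatch batch;

        /* read the next leaf page in the given direction; NULL means done */
        page = toy_read_next_leaf(scan, direction);
        if (page == NULL)
            return NULL;

        batch = index_batch_alloc(page->nitems, scan->xs_want_itup);

        for (int i = 0; i < page->nitems; i++)
        {
            batch->items[i].heapTid = page->tids[i];
            batch->items[i].indexOffset = page->offsets[i];
        }

        batch->firstItem = 0;
        batch->lastItem = page->nitems - 1;

        /* AM-specific state, needed later to kill items / release the page */
        batch->opaque = page;

        return batch;
    }

    static void
    toyam_freebatch(IndexScanDesc scan, IndexScanBatch batch)
    {
        /* flush killed items and release the leaf, then the batch memory */
        toy_release_leaf(scan, (ToyLeafPage *) batch->opaque);
        if (batch->currTuples)
            pfree(batch->currTuples);
        pfree(batch->items);
        pfree(batch);
    }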
@@ -179,7 +183,8 @@ extern IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, IndexScanInstrumentation *instrument, - int nkeys, int norderbys); + int nkeys, int norderbys, + bool enable_batching); extern IndexScanDesc index_beginscan_bitmap(Relation indexRelation, Snapshot snapshot, IndexScanInstrumentation *instrument, @@ -205,7 +210,8 @@ extern IndexScanDesc index_beginscan_parallel(Relation heaprel, Relation indexrel, IndexScanInstrumentation *instrument, int nkeys, int norderbys, - ParallelIndexScanDesc pscan); + ParallelIndexScanDesc pscan, + bool enable_batching); extern ItemPointer index_getnext_tid(IndexScanDesc scan, ScanDirection direction); struct TupleTableSlot; @@ -213,7 +219,6 @@ extern bool index_fetch_heap(IndexScanDesc scan, struct TupleTableSlot *slot); extern bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, struct TupleTableSlot *slot); extern int64 index_getbitmap(IndexScanDesc scan, TIDBitmap *bitmap); - extern IndexBulkDeleteResult *index_bulk_delete(IndexVacuumInfo *info, IndexBulkDeleteResult *istat, IndexBulkDeleteCallback callback, @@ -231,7 +236,7 @@ extern void index_store_float8_orderby_distances(IndexScanDesc scan, bool recheckOrderBy); extern bytea *index_opclass_options(Relation indrel, AttrNumber attnum, Datum attoptions, bool validate); - +extern IndexScanBatch index_batch_alloc(int maxitems, bool want_itup); /* * index access method support routines (in genam.c) diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index b5e0fb386c0..b63af845ca6 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -16,9 +16,11 @@ #include "access/htup_details.h" #include "access/itup.h" +#include "access/sdir.h" #include "nodes/tidbitmap.h" #include "port/atomics.h" #include "storage/buf.h" +#include "storage/read_stream.h" #include "storage/relfilelocator.h" #include "storage/spin.h" #include "utils/relcache.h" @@ -121,10 +123,162 @@ typedef struct ParallelBlockTableScanWorkerData *ParallelBlockTableScanWorker; typedef struct IndexFetchTableData { Relation rel; + ReadStream *rs; } IndexFetchTableData; struct IndexScanInstrumentation; +/* Forward declaration, the prefetch callback needs IndexScanDescData. */ +typedef struct IndexScanBatchData IndexScanBatchData; + +/* + * XXX parts of BTScanOpaqueData, BTScanPosItem and BTScanPosData relevant + * for one batch. + */ +typedef struct IndexScanBatchPosItem /* what we remember about each match */ +{ + ItemPointerData heapTid; /* TID of referenced heap item */ + OffsetNumber indexOffset; /* index item's location within page */ + LocationIndex tupleOffset; /* IndexTuple's offset in workspace, if any */ +} IndexScanBatchPosItem; + +/* + * Data about one batch of items returned by the index AM. This is similar + * to the AM-specific "opaque" structs, used by each AM to track items + * loaded from one leaf page, but generalized for all AMs. + * + * XXX Not sure which of there fields are 100% needed for all index AMs, + * most of this comes from nbtree. + * + * XXX Mostly a copy of BTScanPosData, but other AMs may need different (or + * only some of those) fields. + */ +typedef struct IndexScanBatchData +{ + /* + * AM-specific concept of position within the index, and other stuff the + * AM might need to store for each batch. + * + * XXX maybe "position" is not the best name, it can have other stuff the + * AM needs to keep per-batch (even only for reading the leaf items, like + * nextTupleOffset). 
+ */ + void *opaque; + + /* + * The items array is always ordered in index order (ie, increasing + * indexoffset). When scanning backwards it is convenient to fill the + * array back-to-front, so we start at the last slot and fill downwards. + * Hence we need both a first-valid-entry and a last-valid-entry counter. + * itemIndex is a cursor showing which entry was last returned to caller. + * + * XXX Do we need all these indexes, or would it be enough to have just + * 0-indexed array with only itemIndex? + */ + int firstItem; /* first valid index in items[] */ + int lastItem; /* last valid index in items[] */ + int itemIndex; /* current index in items[] */ + + /* info about killed items if any (killedItems is NULL if never used) */ + int *killedItems; /* indexes of killed items */ + int numKilled; /* number of currently stored items */ + + /* + * If we are doing an index-only scan, these are the tuple storage + * workspaces for the currPos and markPos respectively. Each is of size + * BLCKSZ, so it can hold as much as a full page's worth of tuples. + * + * XXX maybe currTuples should be part of the am-specific per-batch state + * stored in "position" field? + */ + char *currTuples; /* tuple storage for currPos */ + IndexScanBatchPosItem *items; /* XXX don't size to MaxTIDsPerBTreePage */ + + /* + * batch contents (TIDs, index tuples, kill bitmap, ...) + * + * XXX Shouldn't this be part of the "IndexScanBatchPosItem" struct? To + * keep everything in one place? Or why should we have separate arrays? + * One advantage is that we don't need to allocate memory for arrays that + * we don't need ... e.g. if we don't need heap tuples, we don't allocate + * that. We couldn't do that with everything in one struct. + */ + IndexTuple *itups; /* IndexTuples, if requested */ + HeapTuple *htups; /* HeapTuples, if requested */ + bool *recheck; /* recheck flags */ + + /* XXX why do we need this on top of "opaque" pointer? */ + Datum *privateData; /* private data for batch */ + + /* xs_orderbyvals / xs_orderbynulls */ + Datum *orderbyvals; + bool *orderbynulls; + +} IndexScanBatchData; + +/* + * Position in the queue of batches - index of a batch, index of item in a batch. + */ +typedef struct IndexScanBatchPos +{ + int batch; + int index; +} IndexScanBatchPos; + +typedef struct IndexScanDescData IndexScanDescData; +typedef bool (*IndexPrefetchCallback) (IndexScanDescData * scan, void *arg, IndexScanBatchPos *pos); + +/* + * Queue + */ +typedef struct IndexScanBatches +{ + /* + * Did we read the last batch? The batches may be loaded from multiple + * places, and we need to remember when we fail to load the next batch in + * a given scan (which means "no more batches"). amgetbatch may restart + * the scan on the get call, so we need to remember it's over. + */ + bool finished; + bool reset; + + /* + * Current scan direction, for the currently loaded batches. This is used + * to load data in the read stream API callback, etc. + * + * XXX May need some work to use already loaded batches after change of + * direction, instead of just throwing everything away. May need to reset + * the stream but keep the batches? + */ + ScanDirection direction; + + /* positions in the queue of batches (batch + item) */ + IndexScanBatchPos readPos; /* read position */ + IndexScanBatchPos streamPos; /* prefetch position (for read stream API) */ + IndexScanBatchPos markPos; /* mark/restore position */ + + IndexScanBatchData *markBatch; + // IndexScanBatchData *currentBatch; + + /* + * Array of batches returned by the AM. 
The array has a capacity (but can + * be resized if needed). The firstBatch is an index of the first batch, + * but needs to be translated by (modulo maxBatches) into index in the + * batches array. + * + * FIXME Maybe these fields should be uint32, or something like that? + */ + int maxBatches; /* size of the batches array */ + int firstBatch; /* first used batch slot */ + int nextBatch; /* next empty batch slot */ + + IndexScanBatchData **batches; + + /* callback to skip prefetching in IOS etc. */ + IndexPrefetchCallback prefetchCallback; + void *prefetchArgument; +} IndexScanBatches; + /* * We use the same IndexScanDescData structure for both amgettuple-based * and amgetbitmap-based index scans. Some fields are only relevant in @@ -176,6 +330,12 @@ typedef struct IndexScanDescData bool xs_recheck; /* T means scan keys must be rechecked */ + /* + * Batches index scan keep a list of batches loaded from the index in a + * circular buffer. + */ + IndexScanBatches *xs_batches; + /* * When fetching with an ordering operator, the values of the ORDER BY * expressions of the last returned tuple, according to the index. If diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 8713e12cbfb..5bed359cf13 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -413,8 +413,14 @@ typedef struct TableAmRoutine * structure with additional information. * * Tuples for an index scan can then be fetched via index_fetch_tuple. + * + * The ReadStream pointer is optional - NULL means the regular buffer + * reads are used. If a valid ReadStream is provided, the callback + * (generating the blocks to read) and index_fetch_tuple (consuming the + * buffers) need to agree on the exact order. */ - struct IndexFetchTableData *(*index_fetch_begin) (Relation rel); + struct IndexFetchTableData *(*index_fetch_begin) (Relation rel, + ReadStream *rs); /* * Reset index fetch. Typically this will release cross index fetch @@ -1149,9 +1155,9 @@ table_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan) * Tuples for an index scan can then be fetched via table_index_fetch_tuple(). */ static inline IndexFetchTableData * -table_index_fetch_begin(Relation rel) +table_index_fetch_begin(Relation rel, ReadStream *rs) { - return rel->rd_tableam->index_fetch_begin(rel); + return rel->rd_tableam->index_fetch_begin(rel, rs); } /* diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 5b6cadb5a6c..ef672e203d0 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1697,6 +1697,7 @@ typedef struct * OrderByTypByVals is the datatype of order by expression pass-by-value? * OrderByTypLens typlens of the datatypes of order by expressions * PscanLen size of parallel index scan descriptor + * CanBatch batching (and prefetching) enabled * ---------------- */ typedef struct IndexScanState @@ -1726,6 +1727,10 @@ typedef struct IndexScanState bool *iss_OrderByTypByVals; int16 *iss_OrderByTypLens; Size iss_PscanLen; + + /* batching/prefetching enabled? 
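To connect the two pieces, indexam.c is expected to create the stream and hand it to the table AM roughly as sketched below, so the block-number callback (the producer, sketched earlier) and index_fetch_tuple (the consumer) see the blocks in exactly the same order. The function name is invented and details (batching checks, strategy) are omitted:

    /* the block-number callback sketched earlier, driven by the batch queue */
    static BlockNumber index_scan_stream_read_next(ReadStream *stream,
                                                   void *callback_private_data,
                                                   void *per_buffer_data);

    static void
    index_scan_setup_stream(IndexScanDesc scan, Relation heapRel)
    {
        ReadStream *rs;

        /* stream of heap blocks to prefetch, fed from the batch queue */
        rs = read_stream_begin_relation(READ_STREAM_DEFAULT,
                                        NULL,           /* default strategy */
                                        heapRel,
                                        MAIN_FORKNUM,
                                        index_scan_stream_read_next,
                                        scan,           /* callback argument */
                                        0);             /* no per-buffer data */

        /* the fetch side will consume buffers in exactly the queued order */
        scan->xs_heapfetch = table_index_fetch_begin(heapRel, rs);
    }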
*/ + bool iss_CanBatch; + } IndexScanState; /* ---------------- @@ -1749,6 +1754,7 @@ typedef struct IndexScanState * PscanLen size of parallel index-only scan descriptor * NameCStringAttNums attnums of name typed columns to pad to NAMEDATALEN * NameCStringCount number of elements in the NameCStringAttNums array + * CanBatch batching (and prefetching) enabled * ---------------- */ typedef struct IndexOnlyScanState @@ -1772,6 +1778,7 @@ typedef struct IndexOnlyScanState Size ioss_PscanLen; AttrNumber *ioss_NameCStringAttNums; int ioss_NameCStringCount; + bool ioss_CanBatch; } IndexOnlyScanState; /* ---------------- diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index ae17d028ed3..220b61fad2d 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -158,6 +158,7 @@ select name, setting from pg_settings where name like 'enable%'; enable_incremental_sort | on enable_indexonlyscan | on enable_indexscan | on + enable_indexscan_batching | on enable_material | on enable_memoize | on enable_mergejoin | on @@ -172,7 +173,7 @@ select name, setting from pg_settings where name like 'enable%'; enable_seqscan | on enable_sort | on enable_tidscan | on -(24 rows) +(25 rows) -- There are always wait event descriptions for various types. InjectionPoint -- may be present or absent, depending on history since last postmaster start. diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e5879e00dff..060d964e399 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1260,6 +1260,10 @@ IndexOrderByDistance IndexPath IndexRuntimeKeyInfo IndexScan +IndexScanBatchData +IndexScanBatchPos +IndexScanBatchPosItem +IndexScanBatches IndexScanDesc IndexScanInstrumentation IndexScanState @@ -3396,6 +3400,7 @@ amendscan_function amestimateparallelscan_function amgetbitmap_function amgettuple_function +amgetbatch_function aminitparallelscan_function aminsert_function aminsertcleanup_function -- 2.49.0