From a095d26e1b5a361a7d42300e5364da948496f2ba Mon Sep 17 00:00:00 2001 From: Amit Langote Date: Mon, 23 Mar 2026 18:21:47 +0900 Subject: [PATCH v6 3/5] Add batch table AM API and heapam implementation Introduce table AM callbacks for batched tuple fetching: scan_begin_batch, scan_getnextbatch, scan_reset_batch, and scan_end_batch. AMs implement all four or none; checked by table_supports_batching(). scan_reset_batch releases held resources (e.g. buffer pins) without freeing, allowing reuse across rescans. Provide the heapam implementation. HeapPageBatch (stored in RowBatch.am_payload) is a thin slice descriptor over the scan's rs_vistuples[] array, which was introduced in the previous commit. Rather than owning a copy of tuple headers, HeapPageBatch holds a pointer into scan->rs_vistuples[] for the current slice and a buffer pin for the current page. heap_getnextbatch() calls heap_prepare_pagescan() to populate rs_vistuples[] for each new page, then re-points hb->tuples to the next slice of rs_vistuples[] on each call. If the page has more tuples than the executor's max_rows, subsequent calls return the next slice without re-entering page preparation. The buffer pin is held until the page is fully consumed. scan_begin_batch creates a single TupleTableSlot with TTSOpsBufferHeapTuple ops. heap_repoint_slot() re-points this slot to each tuple in turn via ExecStoreBufferHeapTuple(). Consumers that need to retain the slot across calls rely on the normal slot materialization contract. Reviewed-by: Daniil Davydov <3danissimo@gmail.com> Reviewed-by: ChangAo Chen <2624345507@qq.com> Discussion: https://postgr.es/m/CA+HiwqFfAY_ZFqN8wcAEMw71T9hM_kA8UtyHaZZEZtuT3UyogA@mail.gmail.com --- src/backend/access/heap/heapam.c | 229 ++++++++++++++++++++++- src/backend/access/heap/heapam_handler.c | 8 +- src/include/access/heapam.h | 33 ++++ src/include/access/tableam.h | 136 ++++++++++++++ src/include/pgstat.h | 4 +- 5 files changed, 403 insertions(+), 7 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index c6d0aacc5c9..e70c0ccbe82 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -43,6 +43,7 @@ #include "catalog/pg_database.h" #include "catalog/pg_database_d.h" #include "commands/vacuum.h" +#include "executor/execRowBatch.h" #include "pgstat.h" #include "port/pg_bitutils.h" #include "storage/lmgr.h" @@ -109,6 +110,7 @@ static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate); static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup); static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required, bool *copy); +static void heap_repoint_slot(RowBatch *b, int idx); /* @@ -1213,7 +1215,7 @@ heap_beginscan(Relation relation, Snapshot snapshot, scan->rs_cbuf = InvalidBuffer; /* - * Disable page-at-a-time mode if it's not a MVCC-safe snapshot. + * Disable page-at-a-time mode if the snapshot does not allow it. */ if (!(snapshot && IsMVCCSnapshot(snapshot))) scan->rs_base.rs_flags &= ~SO_ALLOW_PAGEMODE; @@ -1463,7 +1465,7 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction) * the proper return buffer and return the tuple. */ - pgstat_count_heap_getnext(scan->rs_base.rs_rd); + pgstat_count_heap_getnext(scan->rs_base.rs_rd, 1); return &scan->rs_ctup; } @@ -1491,13 +1493,232 @@ heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *s * the proper return buffer and return the tuple. 
*/ - pgstat_count_heap_getnext(scan->rs_base.rs_rd); + pgstat_count_heap_getnext(scan->rs_base.rs_rd, 1); ExecStoreBufferHeapTuple(&scan->rs_ctup, slot, scan->rs_cbuf); return true; } +/*---------- Batching support -----------*/ + +static const RowBatchOps RowBatchHeapOps = +{ + .repoint_slot = heap_repoint_slot +}; + +/* + * heap_batch_feasible + * Batching requires a MVCC snapshot since it relies on + * page-at-a-time mode, which heap_beginscan() disables for + * non-MVCC snapshots. + */ +bool +heap_batch_feasible(Relation relation, Snapshot snapshot) +{ + return snapshot && IsMVCCSnapshot(snapshot); +} + +/* + * heap_begin_batch + * Initialize AM-side batch state for a heap scan. + * + * Allocates a HeapPageBatch, which acts as a thin slice descriptor over + * the scan's rs_vistuples[] array. Unlike the previous version there is + * no separate tuple header storage in HeapPageBatch itself; rs_vistuples[] + * in HeapScanDescData (populated by page_collect_tuples() via + * heap_prepare_pagescan()) serves as the page-level buffer. HeapPageBatch + * holds a pointer into that array for the current slice and the buffer pin + * for the current page. + * + * b->slot must be a TTSOpsBufferHeapTuple slot. + */ +void +heap_begin_batch(TableScanDesc sscan, RowBatch *b) +{ + HeapPageBatch *hb; + + /* Batch path relies on executor-level qual eval, not AM scan keys */ + Assert(sscan->rs_nkeys == 0); + Assert(TTS_IS_BUFFERTUPLE(b->slot)); + + hb = palloc(sizeof(HeapPageBatch)); + hb->tuples = NULL; + hb->ntuples = 0; + hb->nextitem = 0; + hb->buf = InvalidBuffer; + + b->am_payload = hb; + b->ops = &RowBatchHeapOps; +} + +/* + * heap_reset_batch + * Release pin and reset for rescan, keeping allocations. + */ +void +heap_reset_batch(TableScanDesc sscan, RowBatch *b) +{ + HeapPageBatch *hb = (HeapPageBatch *) b->am_payload; + + Assert(hb != NULL); + if (BufferIsValid(hb->buf)) + { + ReleaseBuffer(hb->buf); + hb->buf = InvalidBuffer; + } + hb->ntuples = 0; + hb->nextitem = 0; +} + +/* + * heap_end_batch + * Release all batch resources. + */ +void +heap_end_batch(TableScanDesc sscan, RowBatch *b) +{ + HeapPageBatch *hb = (HeapPageBatch *) b->am_payload; + + if (BufferIsValid(hb->buf)) + ReleaseBuffer(hb->buf); + + pfree(hb); + b->am_payload = NULL; +} + +/* + * heap_getnextbatch + * Fetch the next slice of visible tuples from a heap scan. + * + * Serves slices from the current page's rs_vistuples[] array. If the + * current page has remaining tuples, sets hb->tuples to point at the next + * slice without re-entering the page scan. If the page is exhausted, + * advances to the next page via heap_fetch_next_buffer(), prepares it + * with heap_prepare_pagescan(), and serves the first slice from it. + * + * hb->tuples points directly into scan->rs_vistuples[]; the entries remain + * valid as long as hb->buf (the page's buffer pin) is held. The pin is + * released at the top of the next call once the page is fully consumed. + * + * Each call returns at most b->max_rows tuples. + * + * Returns true if tuples were fetched, false at end of scan. + */ +bool +heap_getnextbatch(TableScanDesc sscan, RowBatch *b, ScanDirection dir) +{ + HeapScanDesc scan = (HeapScanDesc) sscan; + HeapPageBatch *hb = (HeapPageBatch *) b->am_payload; + int remaining; + int nserve; + + Assert(ScanDirectionIsForward(dir)); + Assert(sscan->rs_flags & SO_ALLOW_PAGEMODE); + + /* + * Try to serve from the current page first. No page advance, no buffer + * management, no re-entry into heap code. 
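+ * hb->nextitem records how far into rs_vistuples[] earlier calls already
+ * got, and the pin taken in hb->buf when the page was prepared keeps those
+ * entries valid, so this path only needs to carve off the next slice.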
+ */ + remaining = scan->rs_ntuples - hb->nextitem; + if (remaining > 0) + { + nserve = Min(remaining, b->max_rows); + + hb->tuples = &scan->rs_vistuples[hb->nextitem]; + hb->ntuples = nserve; + hb->nextitem += nserve; + + b->nrows = nserve; + b->pos = 0; + + pgstat_count_heap_getnext(sscan->rs_rd, nserve); + return true; + } + + /* + * Current page exhausted. Advance to the next page with visible tuples. + */ + for (;;) + { + /* + * Release the previous page's pin. The page is fully consumed at + * this point -- all slices have been served. + */ + if (BufferIsValid(hb->buf)) + { + ReleaseBuffer(hb->buf); + hb->buf = InvalidBuffer; + } + + heap_fetch_next_buffer(scan, dir); + + if (!BufferIsValid(scan->rs_cbuf)) + { + /* End of scan */ + scan->rs_cblock = InvalidBlockNumber; + scan->rs_prefetch_block = InvalidBlockNumber; + scan->rs_inited = false; + b->nrows = 0; + return false; + } + + Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock); + + /* + * Prepare the page: prune, run visibility checks, and populate + * scan->rs_vistuples[0..rs_ntuples-1] via page_collect_tuples(). + */ + heap_prepare_pagescan(sscan); + + if (scan->rs_ntuples > 0) + { + /* + * Pin the page so tuple data stays valid while the executor + * processes slices. Released at the top of the next call + * once the page is fully consumed. + */ + IncrBufferRefCount(scan->rs_cbuf); + hb->buf = scan->rs_cbuf; + + nserve = Min(scan->rs_ntuples, b->max_rows); + + hb->tuples = &scan->rs_vistuples[0]; + hb->ntuples = nserve; + hb->nextitem = nserve; + + b->nrows = nserve; + b->pos = 0; + + pgstat_count_heap_getnext(sscan->rs_rd, nserve); + return true; + } + + /* Empty page (all dead/invisible tuples), try next */ + } +} + +/* + * heap_repoint_slot + * Re-point the batch's single slot to the tuple at index idx. + * + * Called by RowBatchGetNextSlot() for each tuple served to the parent + * node. hb->tuples[idx] was populated by page_collect_tuples() via + * heap_prepare_pagescan() and remains valid as long as hb->buf is pinned. + */ +static void +heap_repoint_slot(RowBatch *b, int idx) +{ + HeapPageBatch *hb = (HeapPageBatch *) b->am_payload; + + Assert(idx >= 0 && idx < hb->ntuples); + Assert(TTS_IS_BUFFERTUPLE(b->slot)); + + ExecStoreBufferHeapTuple(&hb->tuples[idx], b->slot, hb->buf); +} + +/*----- End of batching support -----*/ + void heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid, ItemPointer maxtid) @@ -1639,7 +1860,7 @@ heap_getnextslot_tidrange(TableScanDesc sscan, ScanDirection direction, * if we get here it means we have a new current scan tuple, so point to * the proper return buffer and return the tuple. 
*/ - pgstat_count_heap_getnext(scan->rs_base.rs_rd); + pgstat_count_heap_getnext(scan->rs_base.rs_rd, 1); ExecStoreBufferHeapTuple(&scan->rs_ctup, slot, scan->rs_cbuf); return true; diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 2fd120028bb..8124d573ac3 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2348,7 +2348,7 @@ heapam_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate, ExecStoreBufferHeapTuple(tuple, slot, hscan->rs_cbuf); /* Count successfully-fetched tuples as heap fetches */ - pgstat_count_heap_getnext(scan->rs_rd); + pgstat_count_heap_getnext(scan->rs_rd, 1); return true; } @@ -2637,6 +2637,12 @@ static const TableAmRoutine heapam_methods = { .scan_rescan = heap_rescan, .scan_getnextslot = heap_getnextslot, + .scan_batch_feasible = heap_batch_feasible, + .scan_begin_batch = heap_begin_batch, + .scan_getnextbatch = heap_getnextbatch, + .scan_end_batch = heap_end_batch, + .scan_reset_batch = heap_reset_batch, + .scan_set_tidrange = heap_set_tidrange, .scan_getnextslot_tidrange = heap_getnextslot_tidrange, diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 09b9566d0ac..0783fa13c4c 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -107,6 +107,32 @@ typedef struct HeapScanDescData } HeapScanDescData; typedef struct HeapScanDescData *HeapScanDesc; +/* + * HeapPageBatch -- heapam-private page-level batch state. + * + * Thin slice descriptor over the scan's rs_vistuples[] array. Rather + * than owning a copy of tuple headers, HeapPageBatch holds a pointer + * into scan->rs_vistuples[] for the current slice, which was populated + * by page_collect_tuples() during heap_prepare_pagescan(). + * + * The executor consumes tuples in slices. Each heap_getnextbatch call + * re-points tuples to the next slice and advances nextitem, serving up + * to RowBatch.max_rows tuples from the current page before advancing + * to the next. + * + * buf holds the pin for the current page. tuple data referenced via + * tuples remains valid as long as buf is pinned. + * + * Stored in RowBatch.am_payload. 
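+ *
+ * For example, with RowBatch.max_rows = 64 and a page on which
+ * heap_prepare_pagescan() finds 150 visible tuples, successive
+ * heap_getnextbatch() calls serve rs_vistuples[0..63], [64..127] and
+ * [128..149] (nextitem advancing to 64, 128, 150); only the call after
+ * that releases the pin and moves on to the next page.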
+ */ +typedef struct HeapPageBatch +{ + HeapTupleData *tuples; /* points into scan->rs_vistuples[nextitem] */ + int ntuples; /* tuples in current slice */ + int nextitem; /* next unserved tuple index in rs_vistuples[] */ + Buffer buf; /* pinned buffer for current page */ +} HeapPageBatch; + typedef struct BitmapHeapScanDescData { HeapScanDescData rs_heap_base; @@ -362,6 +388,13 @@ extern void heap_endscan(TableScanDesc sscan); extern HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction); extern bool heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot); + +extern bool heap_batch_feasible(Relation relation, Snapshot snapshot); +extern void heap_begin_batch(TableScanDesc sscan, RowBatch *batch); +extern bool heap_getnextbatch(TableScanDesc sscan, RowBatch *batch, ScanDirection dir); +extern void heap_end_batch(TableScanDesc sscan, RowBatch *batch); +extern void heap_reset_batch(TableScanDesc sscan, RowBatch *batch); + extern void heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid, ItemPointer maxtid); extern bool heap_getnextslot_tidrange(TableScanDesc sscan, diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 06084752245..a72be111c26 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -275,6 +275,8 @@ typedef void (*IndexBuildCallback) (Relation index, bool tupleIsAlive, void *state); +typedef struct RowBatch RowBatch; + /* * API struct for a table AM. Note this must be allocated in a * server-lifetime manner, typically as a static const struct, which then gets @@ -351,6 +353,56 @@ typedef struct TableAmRoutine ScanDirection direction, TupleTableSlot *slot); + /* ------------------------------------------------------------------------ + * Batched scan support + * ------------------------------------------------------------------------ + */ + + /* + * Returns true if the AM can support batching for a scan with the + * given snapshot. Called at plan init time before the scan descriptor + * exists. AMs that have no snapshot-based restrictions can omit this + * callback, in which case batching is considered feasible. + */ + bool (*scan_batch_feasible)(Relation relation, Snapshot snapshot); + + /* + * Initialize AM-owned batch state for a scan. Called once before + * the first scan_getnextbatch call. The AM allocates whatever + * private state it needs and stores it in b->am_payload. b->slot + * is the scan node's ss_ScanTupleSlot, whose type was already + * determined by the AM via table_slot_callbacks(). The AM's + * repoint_slot callback re-points it to each tuple in the batch + * in turn. Future interfaces may allow the AM to expose batch + * data in other forms without going through a slot. + */ + void (*scan_begin_batch)(TableScanDesc sscan, RowBatch *b); + + /* + * Fetch the next batch of tuples from the scan into b. Sets b->nrows + * to the number of tuples available and resets b->pos to 0. Returns + * true if any tuples were fetched, false at end of scan. The caller + * advances through the batch via RowBatchGetNextSlot(), which calls + * ops->repoint_slot for each position up to b->nrows. + */ + bool (*scan_getnextbatch)(TableScanDesc sscan, RowBatch *b, + ScanDirection dir); + + /* + * Release all AM-owned batch resources, including any buffer pins + * held in am_payload. Called when the scan node is shut down. + * After this call b->am_payload must not be used. + */ + void (*scan_end_batch)(TableScanDesc sscan, RowBatch *b); + + /* + * Reset batch state for rescan. 
Release any held resources (e.g. + * buffer pins) and reset counts, but keep the allocation so the + * next getnextbatch call can reuse it without re-entering + * begin_batch. + */ + void (*scan_reset_batch)(TableScanDesc sscan, RowBatch *b); + /*----------- * Optional functions to provide scanning for ranges of ItemPointers. * Implementations must either provide both of these functions, or neither @@ -1047,6 +1099,90 @@ table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableS return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot); } +/* + * table_supports_batching + * Does the relation's AM support batching? + */ +static inline bool +table_supports_batching(Relation relation, Snapshot snapshot) +{ + const TableAmRoutine *tam = relation->rd_tableam; + + if (tam->scan_getnextbatch == NULL) + return false; + + Assert(tam->scan_begin_batch != NULL); + Assert(tam->scan_reset_batch != NULL); + Assert(tam->scan_end_batch != NULL); + + /* + * Optional: AM may restrict batching based on snapshot or other conditions. + */ + if (tam->scan_batch_feasible != NULL && + !tam->scan_batch_feasible(relation, snapshot)) + return false; + + return true; +} + +/* + * table_scan_begin_batch + * Allocate AM-owned batch payload in the RowBatch + */ +static inline void +table_scan_begin_batch(TableScanDesc sscan, RowBatch *b) +{ + const TableAmRoutine *tam = sscan->rs_rd->rd_tableam; + + Assert(tam->scan_begin_batch != NULL); + + return tam->scan_begin_batch(sscan, b); +} + +/* + * table_scan_getnextbatch + * Fetch the next batch of tuples from the AM. Returns true if tuples + * were fetched, false at end of scan. Only forward scans are supported. + */ +static inline bool +table_scan_getnextbatch(TableScanDesc sscan, RowBatch *b, ScanDirection dir) +{ + const TableAmRoutine *tam = sscan->rs_rd->rd_tableam; + + Assert(ScanDirectionIsForward(dir)); + Assert(tam->scan_getnextbatch != NULL); + + return tam->scan_getnextbatch(sscan, b, dir); +} + +/* + * table_scan_end_batch + * Release AM-owned resources for the batch payload. + */ +static inline void +table_scan_end_batch(TableScanDesc sscan, RowBatch *b) +{ + const TableAmRoutine *tam = sscan->rs_rd->rd_tableam; + + Assert(tam->scan_end_batch != NULL); + + tam->scan_end_batch(sscan, b); +} + +/* + * table_scan_reset_batch + * Reset AM-owned batch state for rescan without freeing. + */ +static inline void +table_scan_reset_batch(TableScanDesc sscan, RowBatch *b) +{ + const TableAmRoutine *tam = sscan->rs_rd->rd_tableam; + + Assert(tam->scan_reset_batch != NULL); + + tam->scan_reset_batch(sscan, b); +} + /* ---------------------------------------------------------------------------- * TID Range scanning related functions. * ---------------------------------------------------------------------------- diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 216b93492ba..0344c4e88c3 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -695,10 +695,10 @@ extern void pgstat_report_analyze(Relation rel, if (pgstat_should_count_relation(rel)) \ (rel)->pgstat_info->counts.numscans++; \ } while (0) -#define pgstat_count_heap_getnext(rel) \ +#define pgstat_count_heap_getnext(rel, n) \ do { \ if (pgstat_should_count_relation(rel)) \ - (rel)->pgstat_info->counts.tuples_returned++; \ + (rel)->pgstat_info->counts.tuples_returned += (n); \ } while (0) #define pgstat_count_heap_fetch(rel) \ do { \ -- 2.47.3
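For reference, a minimal sketch of how a consumer could drive the new API end to end. This is illustrative only and not part of the patch: the real executor integration (RowBatch allocation, slot setup, RowBatchGetNextSlot()) lives in the later patches of this series, so the batch size, the slot handling, and the assumption that RowBatchGetNextSlot() returns NULL once batch.pos reaches batch.nrows are placeholders here; only the table_* wrappers and the RowBatch fields referenced above come from this patch.

#include "postgres.h"

#include "access/tableam.h"
#include "executor/execRowBatch.h"

/*
 * Illustrative consumer loop (not part of the patch).  Scans "rel" under
 * "snapshot" in batches of up to 64 tuples; a real caller would fall back
 * to table_scan_getnextslot() when the AM cannot batch for this snapshot.
 */
static void
scan_relation_batched(Relation rel, Snapshot snapshot)
{
	TableScanDesc scan;
	RowBatch	batch = {0};

	if (!table_supports_batching(rel, snapshot))
		return;					/* use the tuple-at-a-time path instead */

	scan = table_beginscan(rel, snapshot, 0, NULL);

	batch.max_rows = 64;		/* hypothetical executor-chosen batch size */
	batch.slot = table_slot_create(rel, NULL);	/* buffer-heap slot for heapam */
	table_scan_begin_batch(scan, &batch);

	while (table_scan_getnextbatch(scan, &batch, ForwardScanDirection))
	{
		TupleTableSlot *slot;

		/*
		 * Each call re-points batch.slot via ops->repoint_slot; assumed to
		 * return NULL once batch.pos reaches batch.nrows.
		 */
		while ((slot = RowBatchGetNextSlot(&batch)) != NULL)
		{
			/* process slot; its contents are only valid until the next call */
		}
	}

	table_scan_end_batch(scan, &batch);
	ExecDropSingleTupleTableSlot(batch.slot);
	table_endscan(scan);
}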