From 3318650e720a01cbd5948349b9fbcdbb8ddda7cf Mon Sep 17 00:00:00 2001 From: Amit Langote Date: Mon, 1 Sep 2025 21:56:17 +0900 Subject: [PATCH v2 1/8] Add batch table AM API and heapam implementation Introduce new table AM callbacks to fetch multiple tuples per call. This reduces per-tuple call overhead by letting executor nodes work in batches. Define a HeapBatch structure and supporting code in tableam.h. Batches are limited to tuples from a single page and at most EXEC_BATCH_ROWS (currently 64) entries. Provide initial heapam support with heapgettup_pagemode_batch(). No executor node is switched over yet; a later commit will adapt SeqScan to use this API. Other nodes may adopt it in the future. Also add pgstat_count_heap_getnext_batch() to record batched fetches in pgstat. --- src/backend/access/heap/heapam.c | 212 ++++++++++++++++++++++- src/backend/access/heap/heapam_handler.c | 4 + src/include/access/heapam.h | 21 +++ src/include/access/tableam.h | 58 +++++++ src/include/pgstat.h | 5 + 5 files changed, 299 insertions(+), 1 deletion(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index ed0c0c2dc9f..f62f7edbf5e 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -1008,7 +1008,7 @@ heapgettup_pagemode(HeapScanDesc scan, int nkeys, ScanKey key) { - HeapTuple tuple = &(scan->rs_ctup); + HeapTuple tuple = &scan->rs_ctup; Page page; uint32 lineindex; uint32 linesleft; @@ -1089,6 +1089,121 @@ continue_page: scan->rs_inited = false; } +/* + * heapgettup_pagemode_batch + * Collect up to 'maxitems' visible tuples from a single page in page mode. + * + * This function returns a *batch* of tuples from one heap page. If the + * current page (as tracked by the scan desc) has no more tuples left, + * it will advance to the next page and prepare it (via heap_prepare_pagescan). + * It will not cross a page boundary while filling the batch. 
+ * + * Return value: + * number of tuples written into 'tdata' (0 at end-of-scan). + * + * Side effects: + * - Ensures rs_cbuf pins the page from which tuples were produced. + * - Sets rs_cblock, rs_cindex, rs_ntuples consistently (same as + * heapgettup_pagemode’s inner-loop effects). + * - Does *not* change buffer pin counts except through normal page + * transitions performed by heap_fetch_next_buffer(). + */ +static int +heapgettup_pagemode_batch(HeapScanDesc scan, + ScanDirection dir, + int nkeys, ScanKey key, + HeapTupleData *tdata, + int maxitems) +{ + Page page; + uint32 lineindex; + uint32 linesleft; + int nout = 0; + + Assert(ScanDirectionIsForward(dir)); + Assert(scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE); + Assert(maxitems > 0); + + /* + * If we have no current page (or the current page is exhausted), + * advance to the next page that has any visible tuples and prepare it. + * This mirrors the outer loop of heapgettup_pagemode(), but we stop + * as soon as we have a prepared page; we never produce from two pages. + */ + for (;;) + { + if (BufferIsValid(scan->rs_cbuf)) + { + /* Are there more visible tuples left on this page? */ + lineindex = scan->rs_cindex + dir; + if (ScanDirectionIsForward(dir)) + linesleft = (lineindex <= (uint32) scan->rs_ntuples) ? + (scan->rs_ntuples - lineindex) : 0; + else + linesleft = scan->rs_cindex; + if (linesleft > 0) + break; /* continue on this page */ + } + + /* Move to next page and prepare its visible tuple list. */ + heap_fetch_next_buffer(scan, dir); + + if (!BufferIsValid(scan->rs_cbuf)) + { + /* end of scan; keep rs_cbuf invalid like heapgettup_pagemode */ + scan->rs_cblock = InvalidBlockNumber; + scan->rs_prefetch_block = InvalidBlockNumber; + scan->rs_inited = false; + return 0; + } + + Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock); + heap_prepare_pagescan((TableScanDesc) scan); + + /* After prepare, either rs_ntuples > 0 or we'll loop again. 
*/ + if (scan->rs_ntuples > 0) + { + lineindex = ScanDirectionIsForward(dir) ? 0 : scan->rs_ntuples - 1; + linesleft = scan->rs_ntuples - (ScanDirectionIsForward(dir) ? 0 : 0); + break; + } + /* else: page had no visible tuples; continue to next page */ + } + + /* From here on, we must only read tuples from this single page. */ + page = BufferGetPage(scan->rs_cbuf); + + /* + * Walk rs_vistuples[] from 'lineindex', copying headers into tdata[] + * until either the page is exhausted or the batch capacity is reached. + */ + for (; linesleft > 0 && nout < maxitems; linesleft--, lineindex += dir) + { + OffsetNumber lineoff; + ItemId lpp; + HeapTupleData *dst = &tdata[nout]; + + Assert(lineindex <= (uint32) scan->rs_ntuples); + lineoff = scan->rs_vistuples[lineindex]; + lpp = PageGetItemId(page, lineoff); + Assert(ItemIdIsNormal(lpp)); + + dst->t_data = (HeapTupleHeader) PageGetItem(page, lpp); + dst->t_len = ItemIdGetLength(lpp); + dst->t_tableOid = RelationGetRelid(scan->rs_base.rs_rd); + ItemPointerSet(&(dst->t_self), scan->rs_cblock, lineoff); + + if (key != NULL && + !HeapKeyTest(dst, RelationGetDescr(scan->rs_base.rs_rd), + nkeys, key)) + continue; + + scan->rs_cindex = lineindex; + nout++; + } + + return nout; +} /* ---------------------------------------------------------------- * heap access method interface @@ -1136,6 +1251,8 @@ heap_beginscan(Relation relation, Snapshot snapshot, scan->rs_base.rs_parallel = parallel_scan; scan->rs_strategy = NULL; /* set in initscan */ scan->rs_cbuf = InvalidBuffer; + scan->rs_batch_ctup = NULL; + scan->rs_batch_cbuf = InvalidBuffer; /* * Disable page-at-a-time mode if it's not a MVCC-safe snapshot. @@ -1315,6 +1432,8 @@ heap_endscan(TableScanDesc sscan) */ if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); + if (BufferIsValid(scan->rs_batch_cbuf)) + ReleaseBuffer(scan->rs_batch_cbuf); /* * Must free the read stream before freeing the BufferAccessStrategy. 
@@ -1421,6 +1540,97 @@ heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *s return true; } +/*---------- Batching support -----------*/ + +/* + * heap_scan_begin_batch + * + * Allocate a HeapBatch with space for 'maxitems' tuple headers. No pin is + * taken here. Memory is allocated under the scan's memory context. + */ +void * +heap_begin_batch(TableScanDesc sscan, int maxitems) +{ + HeapBatch *hb; + Oid relid; + + Assert(maxitems > 0); + + hb = palloc(sizeof(HeapBatch)); + hb->tupdata = palloc(sizeof(HeapTupleData) * maxitems); + hb->maxitems = maxitems; + hb->nitems = 0; + hb->buf = InvalidBuffer; + + /* Initialize static fields of HeapTupleData. Row bodies remain on page. */ + relid = RelationGetRelid(sscan->rs_rd); + for (int i = 0; i < maxitems; i++) + hb->tupdata[i].t_tableOid = relid; + + return hb; +} + +/* + * heap_scan_end_batch + * + * Release any outstanding pin and free the batch allocations. Caller will + * not use 'am_batch' after this point. + */ +void +heap_end_batch(TableScanDesc sscan, void *am_batch) +{ + HeapBatch *hb = (HeapBatch *) am_batch; + + if (BufferIsValid(hb->buf)) + ReleaseBuffer(hb->buf); + + pfree(hb->tupdata); + pfree(hb); +} + +int +heap_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir) +{ + HeapScanDesc scan = (HeapScanDesc) sscan; + HeapBatch *hb = (HeapBatch *) am_batch; + Buffer curbuf; + int n; + + Assert(ScanDirectionIsForward(dir)); + Assert(sscan->rs_flags & SO_ALLOW_PAGEMODE); + Assert(hb->maxitems > 0); + + /* Drop prior batch pin, if any. */ + if (BufferIsValid(hb->buf)) + { + ReleaseBuffer(hb->buf); + hb->buf = InvalidBuffer; + } + + hb->nitems = 0; + + /* One call per batch, never crosses a page. */ + n = heapgettup_pagemode_batch(scan, dir, + sscan->rs_nkeys, sscan->rs_key, + hb->tupdata, hb->maxitems); + + if (n == 0) + return 0; /* end of scan */ + + /* Hold a shared pin for the batch lifetime so t_data stays valid. 
*/ + curbuf = scan->rs_cbuf; + IncrBufferRefCount(curbuf); + hb->buf = curbuf; + + /* Per-tuple stats (can be collapsed into a future _multi() call). */ + pgstat_count_heap_getnext_batch(sscan->rs_rd, n); + + hb->nitems = n; + return n; +} + +/*----- End of batching support -----*/ + void heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid, ItemPointer maxtid) diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index bcbac844bb6..ec4eeccf19c 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2623,6 +2623,10 @@ static const TableAmRoutine heapam_methods = { .scan_rescan = heap_rescan, .scan_getnextslot = heap_getnextslot, + .scan_begin_batch = heap_begin_batch, + .scan_getnextbatch = heap_getnextbatch, + .scan_end_batch = heap_end_batch, + .scan_set_tidrange = heap_set_tidrange, .scan_getnextslot_tidrange = heap_getnextslot_tidrange, diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index e60d34dad25..02f7793fba0 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -74,6 +74,9 @@ typedef struct HeapScanDescData HeapTupleData rs_ctup; /* current tuple in scan, if any */ + HeapTupleData *rs_batch_ctup; /* NULL when not using batched mode */ + Buffer rs_batch_cbuf; /* buffer feeding the batch */ + /* For scans that stream reads */ ReadStream *rs_read_stream; @@ -101,6 +104,19 @@ typedef struct HeapScanDescData } HeapScanDescData; typedef struct HeapScanDescData *HeapScanDesc; +/* + * HeapBatch -- stateless per-batch buffer. A batch pins one page and + * exposes up to maxitems HeapTupleData headers whose t_data point into that + * page. 
+ */ +typedef struct HeapBatch +{ + HeapTupleData *tupdata; /* len = maxitems; headers only */ + int nitems; /* tuples produced in last getnextbatch() */ + int maxitems; /* fixed capacity set at begin_batch() */ + Buffer buf; /* single pinned buffer for this batch */ +} HeapBatch; + typedef struct BitmapHeapScanDescData { HeapScanDescData rs_heap_base; @@ -294,6 +310,11 @@ extern void heap_endscan(TableScanDesc sscan); extern HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction); extern bool heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot); + +extern void *heap_begin_batch(TableScanDesc sscan, int maxitems); +extern void heap_end_batch(TableScanDesc sscan, void *am_batch); +extern int heap_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir); + extern void heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid, ItemPointer maxtid); extern bool heap_getnextslot_tidrange(TableScanDesc sscan, diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index e16bf025692..953207eac50 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -351,6 +351,16 @@ typedef struct TableAmRoutine ScanDirection direction, TupleTableSlot *slot); + /* ------------------------------------------------------------------------ + * Batched scan support + * ------------------------------------------------------------------------ + */ + + void *(*scan_begin_batch)(TableScanDesc sscan, int maxitems); + int (*scan_getnextbatch)(TableScanDesc sscan, void *am_batch, + ScanDirection dir); + void (*scan_end_batch)(TableScanDesc sscan, void *am_batch); + /*----------- * Optional functions to provide scanning for ranges of ItemPointers. 
* Implementations must either provide both of these functions, or neither @@ -1036,6 +1046,54 @@ table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableS return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot); } +/* + * table_scan_begin_batch + * Allocate AM-owned batch payload with capacity 'maxitems'. + */ +static inline void * +table_scan_begin_batch(TableScanDesc sscan, int maxitems) +{ + const TableAmRoutine *tam = sscan->rs_rd->rd_tableam; + + Assert(tam->scan_begin_batch != NULL); + + return tam->scan_begin_batch(sscan, maxitems); +} + +/* + * table_scan_getnextbatch + * Fill next batch from the AM. Returns number of tuples, 0 => EOS. + * Batches are single-page in v1. Direction is forward only in v1. + */ +static inline int +table_scan_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir) +{ + const TableAmRoutine *tam = sscan->rs_rd->rd_tableam; + + /* Only forward scans are supported in the batched mode. */ + Assert(dir == ForwardScanDirection); + Assert(tam->scan_getnextbatch != NULL); + + return tam->scan_getnextbatch(sscan, am_batch, dir); +} + +/* + * table_scan_end_batch + * Release AM-owned resources for the batch payload. + */ +static inline void +table_scan_end_batch(TableScanDesc sscan, void *am_batch) +{ + const TableAmRoutine *tam = sscan->rs_rd->rd_tableam; + + if (am_batch == NULL) + return; + + Assert(tam->scan_end_batch != NULL); + + tam->scan_end_batch(sscan, am_batch); +} + /* ---------------------------------------------------------------------------- * TID Range scanning related functions. 
/*
 * pgstat_count_heap_getnext_batch
 *		Account for 'n' tuples returned by one batched heap fetch.  Bumps
 *		the same tuples_returned counter as pgstat_count_heap_getnext, n at
 *		a time.  'n' is parenthesized in the expansion per the usual macro
 *		hygiene convention.
 */
#define pgstat_count_heap_getnext_batch(rel, n) \
	do { \
		if (pgstat_should_count_relation(rel)) \
			(rel)->pgstat_info->counts.tuples_returned += (n); \
	} while (0)