From 10d0df2676462f1931b2ef5072eed7129d936328 Mon Sep 17 00:00:00 2001 From: Amit Langote Date: Mon, 1 Sep 2025 22:18:30 +0900 Subject: [PATCH v2 3/8] Executor: add ExecProcNodeBatch() and integrate SeqScan with batch API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce a batch-capable executor interface alongside the existing slot-at-a-time path: * ExecProcNodeBatch() is added to return a TupleBatch instead of a TupleTableSlot. PlanState gains ExecProcNodeBatch as a function pointer. Integrate SeqScan with this interface: * Add ExecSeqScanBatch* routines that drive heap via the batch table AM API and return a TupleBatch. * At init, set ps.ExecProcNodeBatch to these routines when ScanCanUseBatching() allows. * Retain ExecSeqScanBatchSlot* variants for slot-at-a-time consumers. This builds on 0002, which introduced TupleBatch and made SeqScan consume the AM’s batch API internally but still surface slots. With this patch, SeqScan can surface batches directly to batch-aware upper nodes. Plan shape and EXPLAIN output remain unchanged; only internal tuple flow differs when batching is enabled and allowed. --- src/backend/executor/execProcnode.c | 52 +++++++++++++++++++++++++++++ src/backend/executor/nodeSeqscan.c | 35 +++++++++++++++++++ src/include/executor/execScan.h | 51 ++++++++++++++++++++++++++++ src/include/executor/executor.h | 10 ++++++ src/include/nodes/execnodes.h | 5 +++ 5 files changed, 153 insertions(+) diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index f5f9cfbeead..a8c0315e874 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -121,6 +121,8 @@ static TupleTableSlot *ExecProcNodeFirst(PlanState *node); static TupleTableSlot *ExecProcNodeInstr(PlanState *node); +static TupleBatch *ExecProcNodeBatchFirst(PlanState *node); +static TupleBatch *ExecProcNodeBatchInstr(PlanState *node); static bool ExecShutdownNode_walker(PlanState *node, void *context); @@ -389,6 +391,8 @@ ExecInitNode(Plan *node, EState *estate, int eflags) } ExecSetExecProcNode(result, result->ExecProcNode); + if (result->ExecProcNodeBatch) + ExecSetExecProcNodeBatch(result, result->ExecProcNodeBatch); /* * Initialize any initPlans present in this node. The planner put them in @@ -489,6 +493,54 @@ ExecProcNodeInstr(PlanState *node) return result; } +/* + * ExecSetExecProcNodeBatch + * Install ExecProcNodeBatch with first-call wrapper, mirroring row path. + */ +void +ExecSetExecProcNodeBatch(PlanState *node, ExecProcNodeBatchMtd function) +{ + node->ExecProcNodeBatchReal = function; + node->ExecProcNodeBatch = ExecProcNodeBatchFirst; +} + +/* + * ExecProcNodeBatchFirst + * One-time stack-depth check; then pick instrument/no-instrument wrapper. + */ +static TupleBatch * +ExecProcNodeBatchFirst(PlanState *node) +{ + check_stack_depth(); + + if (node->instrument) + node->ExecProcNodeBatch = ExecProcNodeBatchInstr; + else + node->ExecProcNodeBatch = node->ExecProcNodeBatchReal; + + return node->ExecProcNodeBatch(node); +} + +/* + * ExecProcNodeBatchInstr + * Instrumentation wrapper for batch calls. + * + * Note: we can record nrows as the "tuple" count for this call. That keeps + * instrumentation meaningful without changing Instr API. + */ +static TupleBatch * +ExecProcNodeBatchInstr(PlanState *node) +{ + TupleBatch *b; + + InstrStartNode(node->instrument); + + b = node->ExecProcNodeBatchReal(node); + + InstrStopNode(node->instrument, b ? (double) b->nvalid : 0.0); + + return b; +} /* ---------------------------------------------------------------- * MultiExecProcNode diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 2552d420f1c..a4cf1e51af0 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -334,6 +334,37 @@ ExecSeqScanBatchSlotWithQualProject(PlanState *pstate) pstate->qual, pstate->ps_ProjInfo); } +static TupleBatch * +ExecSeqScanBatch(PlanState *pstate) +{ + SeqScanState *node = castNode(SeqScanState, pstate); + + Assert(pstate->state->es_epq_active == NULL); + Assert(pstate->qual == NULL); + Assert(pstate->ps_ProjInfo == NULL); + + return ExecScanExtendedBatch(&node->ss, + (ExecScanAccessBatchMtd) SeqNextBatch, + NULL, NULL); +} + +/* + * Variant of ExecSeqScan() but when qual evaluation is required. + */ +static TupleBatch * +ExecSeqScanBatchWithQual(PlanState *pstate) +{ + SeqScanState *node = castNode(SeqScanState, pstate); + + Assert(pstate->state->es_epq_active == NULL); + pg_assume(pstate->qual != NULL); + Assert(pstate->ps_ProjInfo == NULL); + + return ExecScanExtendedBatch(&node->ss, + (ExecScanAccessBatchMtd) SeqNextBatchMaterialize, + pstate->qual, NULL); +} + /* Batch SeqScan enablement and dispatch */ static void SeqScanInitBatching(SeqScanState *scanstate, int eflags) @@ -348,10 +379,12 @@ SeqScanInitBatching(SeqScanState *scanstate, int eflags) { if (scanstate->ss.ps.ps_ProjInfo == NULL) { + scanstate->ss.ps.ExecProcNodeBatch = ExecSeqScanBatch; scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlot; } else { + scanstate->ss.ps.ExecProcNodeBatch = NULL; scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithProject; } } @@ -359,10 +392,12 @@ SeqScanInitBatching(SeqScanState *scanstate, int eflags) { if (scanstate->ss.ps.ps_ProjInfo == NULL) { + scanstate->ss.ps.ExecProcNodeBatch = ExecSeqScanBatchWithQual; scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithQual; } else { + scanstate->ss.ps.ExecProcNodeBatch = NULL; scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithQualProject; } } diff --git a/src/include/executor/execScan.h b/src/include/executor/execScan.h index fec606471c8..fb4b57a831c 100644 --- a/src/include/executor/execScan.h +++ b/src/include/executor/execScan.h @@ -297,4 +297,55 @@ ExecScanExtendedBatchSlot(ScanState *node, } } +static inline TupleBatch * +ExecScanExtendedBatch(ScanState *node, + ExecScanAccessBatchMtd accessBatchMtd, + ExprState *qual, ProjectionInfo *projInfo) +{ + ExprContext *econtext = node->ps.ps_ExprContext; + TupleBatch *b = node->ps.ps_Batch; + int qualified; + + /* Batch path does not support EPQ */ + Assert(node->ps.state->es_epq_active == NULL); + Assert(TupleBatchIsValid(b)); + + for (;;) + { + CHECK_FOR_INTERRUPTS(); + + /* Get next batch from the AM */ + if (!accessBatchMtd(node)) + return NULL; + + if (qual != NULL) + { + qualified = 0; + while (TupleBatchHasMore(b)) + { + TupleTableSlot *in = TupleBatchGetNextSlot(b); + + Assert(in); + ResetExprContext(econtext); + econtext->ecxt_scantuple = in; + + if (ExecQual(qual, econtext)) + { + TupleBatchStoreInOut(b, qualified, in); + qualified++; + } + else + InstrCountFiltered1(node, 1); + } + TupleBatchUseOutput(b, qualified); + } + else + qualified = b->nvalid; + + if (qualified > 0) + return b; + /* else get the next batch from the AM */ + } +} + #endif /* EXECSCAN_H */ diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 17258f7ae2d..cf5b0c7e05c 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -294,6 +294,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate); */ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags); extern void ExecSetExecProcNode(PlanState *node, ExecProcNodeMtd function); +extern void ExecSetExecProcNodeBatch(PlanState *node, ExecProcNodeBatchMtd function); extern Node *MultiExecProcNode(PlanState *node); extern void ExecEndNode(PlanState *node); extern void ExecShutdownNode(PlanState *node); @@ -315,6 +316,15 @@ ExecProcNode(PlanState *node) return node->ExecProcNode(node); } + +static inline TupleBatch * +ExecProcNodeBatch(PlanState *node) +{ + if (node->chgParam != NULL) /* something changed? */ + ExecReScan(node); /* let ReScan handle this */ + + return node->ExecProcNodeBatch(node); +} #endif /* diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index f4bb8f7dd7f..a104591ac20 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1147,6 +1147,7 @@ typedef TupleTableSlot *(*ExecProcNodeMtd) (PlanState *pstate); /* Return a batch; may reuse caller-provided envelope. NULL => end of scan. */ struct TupleBatch; typedef struct TupleBatch TupleBatch; +typedef TupleBatch *(*ExecProcNodeBatchMtd)(struct PlanState *ps); /* ---------------- * PlanState node @@ -1171,6 +1172,10 @@ typedef struct PlanState ExecProcNodeMtd ExecProcNodeReal; /* actual function, if above is a * wrapper */ + /* Optional batch-producing entry point (NULL => no batching). */ + ExecProcNodeBatchMtd ExecProcNodeBatch; + ExecProcNodeBatchMtd ExecProcNodeBatchReal; + Instrumentation *instrument; /* Optional runtime stats for this node */ WorkerInstrumentation *worker_instrument; /* per-worker instrumentation */ -- 2.43.0