From 4c12c380b75b8684e9c41c80d0c77027cf592e17 Mon Sep 17 00:00:00 2001 From: Amit Langote Date: Thu, 19 Mar 2026 20:03:58 +0900 Subject: [PATCH v8 5/5] Reuse partition pruning results in parallel workers Pass the leader's initial partition pruning results and unpruned relids to parallel workers and reuse them via ExecutorPrep(). This avoids repeating pruning logic in workers, which is not only redundant but also risks divergence due to nondeterminism in pruning steps or parameter evaluation timing. Introduce CheckInitialPruningResultsInWorker() (debug-builds only) to verify that the results match what the worker would compute. This check helps catch inconsistencies across leader and worker pruning logic. --- src/backend/executor/execMain.c | 10 +-- src/backend/executor/execParallel.c | 108 +++++++++++++++++++++++++++- src/backend/utils/cache/plancache.c | 2 +- src/include/executor/executor.h | 3 +- 4 files changed, 116 insertions(+), 7 deletions(-) diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 0f95ad88497..9a3700e672f 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -207,7 +207,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) queryDesc->estate = ExecutorPrep(queryDesc->plannedstmt, queryDesc->params, CurrentResourceOwner, - eflags); + eflags, true); } #ifdef USE_ASSERT_CHECKING else @@ -330,7 +330,8 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) * ExecutorPrep: prepare executor state for a PlannedStmt outside ExecutorStart. * * Performs range table initialization, permission checks, and initial - * partition pruning if partPruneInfos are present. + * partition pruning if partPruneInfos are present and do_initial_pruning is + * true; parallel workers pass false and reuse the leader's results instead. * * Returns an EState that the caller must either pass to ExecutorStart() * for reuse or free via FreeExecutorState() if execution will not proceed. 
@@ -340,7 +341,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) */ EState * ExecutorPrep(PlannedStmt *pstmt, ParamListInfo params, ResourceOwner owner, - int eflags) + int eflags, bool do_initial_pruning) { ResourceOwner oldowner; EState *estate; @@ -386,7 +387,8 @@ ExecutorPrep(PlannedStmt *pstmt, ParamListInfo params, ResourceOwner owner, * to es_part_prune_infos. */ ExecCreatePartitionPruneStates(estate); - ExecDoInitialPruning(estate); + if (do_initial_pruning) + ExecDoInitialPruning(estate); CurrentResourceOwner = oldowner; diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 024780d3516..2de4b35a16e 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -24,6 +24,7 @@ #include "postgres.h" #include "executor/execParallel.h" +#include "executor/execPartition.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "executor/nodeAppend.h" @@ -67,6 +68,8 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_PARTITION_PRUNE_RESULTS UINT64CONST(0xE00000000000000B) +#define PARALLEL_KEY_UNPRUNED_RELIDS UINT64CONST(0xE00000000000000C) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -141,6 +144,8 @@ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, /* Helper function that runs in the parallel worker. */ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); +static void CheckInitialPruningResultsInWorker(EState *estate); + /* * Create a serialized representation of the plan to be sent to each worker. 
*/ @@ -620,12 +625,18 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, FixedParallelExecutorState *fpes; char *pstmt_data; char *pstmt_space; + char *part_prune_results_data; + char *part_prune_results_space; + char *unpruned_relids_data; + char *unpruned_relids_space; char *paramlistinfo_space; BufferUsage *bufusage_space; WalUsage *walusage_space; SharedExecutorInstrumentation *instrumentation = NULL; SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; + int part_prune_results_len; + int unpruned_relids_len; int paramlistinfo_len; int instrumentation_len = 0; int jit_instrumentation_len = 0; @@ -654,6 +665,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, /* Fix up and serialize plan to be sent to workers. */ pstmt_data = ExecSerializePlan(planstate->plan, estate); + part_prune_results_data = nodeToString(estate->es_part_prune_results); + unpruned_relids_data = nodeToString(estate->es_unpruned_relids); /* Create a parallel context. */ pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers); @@ -680,6 +693,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for serialized part_prune_results. */ + part_prune_results_len = strlen(part_prune_results_data) + 1; + shm_toc_estimate_chunk(&pcxt->estimator, part_prune_results_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate space for serialized unpruned_relids. */ + unpruned_relids_len = strlen(unpruned_relids_data) + 1; + shm_toc_estimate_chunk(&pcxt->estimator, unpruned_relids_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for serialized ParamListInfo. 
*/ paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info); shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len); @@ -781,6 +804,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, memcpy(pstmt_space, pstmt_data, pstmt_len); shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space); + /* Store serialized part_prune_results */ + part_prune_results_space = shm_toc_allocate(pcxt->toc, part_prune_results_len); + memcpy(part_prune_results_space, part_prune_results_data, part_prune_results_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARTITION_PRUNE_RESULTS, part_prune_results_space); + + /* Store serialized unpruned_relids */ + unpruned_relids_space = shm_toc_allocate(pcxt->toc, unpruned_relids_len); + memcpy(unpruned_relids_space, unpruned_relids_data, unpruned_relids_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNPRUNED_RELIDS, unpruned_relids_space); + /* Store serialized ParamListInfo. */ paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len); shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); @@ -1280,10 +1313,15 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, int instrument_options) { char *pstmtspace; + char *part_prune_results_space; + char *unpruned_relids_space; char *paramspace; PlannedStmt *pstmt; + List *part_prune_results; + Bitmapset *unpruned_relids; ParamListInfo paramLI; char *queryString; + EState *prep_estate = NULL; /* Get the query string from shared memory */ queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false); @@ -1296,12 +1334,80 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false); paramLI = RestoreParamList(¶mspace); + /* Reconstruct leader-supplied part_prune_results and unpruned_relids. 
*/ + part_prune_results_space = + shm_toc_lookup(toc, PARALLEL_KEY_PARTITION_PRUNE_RESULTS, false); + part_prune_results = (List *) stringToNode(part_prune_results_space); + unpruned_relids_space = + shm_toc_lookup(toc, PARALLEL_KEY_UNPRUNED_RELIDS, false); + unpruned_relids = (Bitmapset *) stringToNode(unpruned_relids_space); + + /* + * If pruning was done in the leader, build a prep estate in the worker + * and inject the leader's pruning results into it for reuse. + */ + if (pstmt->partPruneInfos) + { + prep_estate = ExecutorPrep(pstmt, paramLI, CurrentResourceOwner, 0, false); + Assert(prep_estate); + + prep_estate->es_part_prune_results = part_prune_results; + prep_estate->es_unpruned_relids = + bms_add_members(prep_estate->es_unpruned_relids, + unpruned_relids); + + /* + * A debug-build-only check that the pruning results passed from the + * leader match what the worker would independently compute. + */ + CheckInitialPruningResultsInWorker(prep_estate); + } + /* Create a QueryDesc for the query. */ return CreateQueryDesc(pstmt, queryString, GetActiveSnapshot(), InvalidSnapshot, receiver, paramLI, NULL, instrument_options, - NULL); + prep_estate); +} + +/* + * CheckInitialPruningResultsInWorker + * Verify partition pruning results passed from the leader process. + * + * This is intended to be called during parallel worker query setup. + * It recomputes initial pruning results locally and compares them with + * those received from the leader. Any mismatch may indicate a divergence + * between leader and worker logic or environment. + * + * Only performed in debug builds. 
+ */ +static void +CheckInitialPruningResultsInWorker(EState *estate) +{ +#ifdef USE_ASSERT_CHECKING + ListCell *lc; + int i; + + Assert(estate->es_part_prune_results != NULL); + i = 0; + foreach(lc, estate->es_part_prune_states) + { + PartitionPruneState *prunestate = (PartitionPruneState *) lfirst(lc); + Bitmapset *reuse_validsubplans = + list_nth_node(Bitmapset, estate->es_part_prune_results, i++); + Bitmapset *validsubplans = NULL; + Bitmapset *validsubplan_rtis = NULL; + + if (prunestate->do_initial_prune) + validsubplans = ExecFindMatchingSubPlans(prunestate, true, + &validsubplan_rtis); + if (!bms_equal(validsubplans, reuse_validsubplans)) + elog(ERROR, "different validsubplans in parallel worker"); + if (bms_nonempty_difference(validsubplan_rtis, estate->es_unpruned_relids)) + elog(ERROR, "different unprunable_relids in parallel worker"); + } +#endif } /* diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c index 2d4c57d3deb..0dd4f40c964 100644 --- a/src/backend/utils/cache/plancache.c +++ b/src/backend/utils/cache/plancache.c @@ -2102,7 +2102,7 @@ AcquireExecutorLocksUnpruned(List *stmt_list, bool acquire, } prep_estate = ExecutorPrep(plannedstmt, cprep->params, - cprep->owner, cprep->eflags); + cprep->owner, cprep->eflags, true); Assert(prep_estate); cprep->prep_estates = lappend(cprep->prep_estates, prep_estate); diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 24604120c27..38848ba0651 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -240,7 +240,8 @@ extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags); extern EState *ExecutorPrep(PlannedStmt *pstmt, ParamListInfo params, ResourceOwner owner, - int eflags); + int eflags, + bool do_initial_pruning); /* * Walk a prep_estates list in step with a parallel stmt_list iteration. -- 2.47.3