From ddcbd693f9aa8498c06b4f20fe4df20ff98974c5 Mon Sep 17 00:00:00 2001 From: Amit Langote Date: Wed, 25 Mar 2026 16:06:57 +0900 Subject: [PATCH v9 5/5] Reuse partition pruning results in parallel workers Pass the leader's initial partition pruning results and unpruned relids to parallel workers and reuse them via ExecutorPrep(). This avoids repeating pruning logic in workers, which is not only redundant but also risks divergence due to nondeterminism in pruning steps or parameter evaluation timing. Factor the creation of PartitionPruneState structures out of ExecDoInitialPruning() into a new ExecCreatePartitionPruneStates() function. Parallel workers need to set up pruning state without performing initial pruning, since they receive the leader's results instead. Introduce CheckInitialPruningResultsInWorker() (debug-builds only) to verify that the results match what the worker would compute. This check helps catch inconsistencies across leader and worker pruning logic. --- src/backend/executor/execMain.c | 25 +++++-- src/backend/executor/execParallel.c | 108 ++++++++++++++++++++++++++- src/backend/executor/execPartition.c | 44 ++++++++--- src/backend/utils/cache/plancache.c | 2 +- src/include/executor/execPartition.h | 1 + src/include/executor/executor.h | 3 +- 6 files changed, 161 insertions(+), 22 deletions(-) diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 336bd4d09b3..5fa312436fb 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -207,7 +207,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) queryDesc->estate = ExecutorPrep(queryDesc->plannedstmt, queryDesc->params, CurrentResourceOwner, - eflags); + eflags, true); } #ifdef USE_ASSERT_CHECKING else @@ -330,7 +330,8 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) * ExecutorPrep: prepare executor state for a PlannedStmt outside ExecutorStart. 
* * Performs range table initialization, permission checks, and initial - * partition pruning if partPruneInfos are present. + * partition pruning if partPruneInfos are present and do_initial_pruning is + * true (parallel workers pass false). * * Returns an EState that the caller must either pass to ExecutorStart() * for reuse or free via FreeExecutorState() if execution will not proceed. @@ -341,7 +342,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) */ EState * ExecutorPrep(PlannedStmt *pstmt, ParamListInfo params, ResourceOwner owner, - int eflags) + int eflags, bool do_initial_pruning) { ResourceOwner oldowner; EState *estate; @@ -377,14 +378,22 @@ ExecutorPrep(PlannedStmt *pstmt, ParamListInfo params, ResourceOwner owner, CurrentResourceOwner = owner; /* - * Set up PartitionPruneState structures and perform initial partition - * pruning to compute the subset of child subplans that will be - * executed. The results, which are bitmapsets of selected child - * indexes, are saved in es_part_prune_results, parallel to + * Set up PartitionPruneState structures needed for initial + * partition pruning. + * + * If do_initial_pruning is true, also perform initial pruning to + * compute the subset of child subplans that will be executed. + * The results, which are bitmapsets of selected child indexes, + * are saved in es_part_prune_results, parallel to * es_part_prune_infos. RT indexes of surviving partitions are * added to es_unpruned_relids. + * + * Parallel workers pass false here and instead receive the + * leader's pruning results via shared memory. 
*/ - ExecDoInitialPruning(estate); + ExecCreatePartitionPruneStates(estate); + if (do_initial_pruning) + ExecDoInitialPruning(estate); CurrentResourceOwner = oldowner; diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 024780d3516..2de4b35a16e 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -24,6 +24,7 @@ #include "postgres.h" #include "executor/execParallel.h" +#include "executor/execPartition.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "executor/nodeAppend.h" @@ -67,6 +68,8 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_PARTITION_PRUNE_RESULTS UINT64CONST(0xE00000000000000B) +#define PARALLEL_KEY_UNPRUNED_RELIDS UINT64CONST(0xE00000000000000C) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -141,6 +144,8 @@ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, /* Helper function that runs in the parallel worker. */ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); +static void CheckInitialPruningResultsInWorker(EState *estate); + /* * Create a serialized representation of the plan to be sent to each worker. 
*/ @@ -620,12 +625,18 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, FixedParallelExecutorState *fpes; char *pstmt_data; char *pstmt_space; + char *part_prune_results_data; + char *part_prune_results_space; + char *unpruned_relids_data; + char *unpruned_relids_space; char *paramlistinfo_space; BufferUsage *bufusage_space; WalUsage *walusage_space; SharedExecutorInstrumentation *instrumentation = NULL; SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; + int part_prune_results_len; + int unpruned_relids_len; int paramlistinfo_len; int instrumentation_len = 0; int jit_instrumentation_len = 0; @@ -654,6 +665,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, /* Fix up and serialize plan to be sent to workers. */ pstmt_data = ExecSerializePlan(planstate->plan, estate); + part_prune_results_data = nodeToString(estate->es_part_prune_results); + unpruned_relids_data = nodeToString(estate->es_unpruned_relids); /* Create a parallel context. */ pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers); @@ -680,6 +693,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for serialized part_prune_results. */ + part_prune_results_len = strlen(part_prune_results_data) + 1; + shm_toc_estimate_chunk(&pcxt->estimator, part_prune_results_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate space for serialized unpruned_relids. */ + unpruned_relids_len = strlen(unpruned_relids_data) + 1; + shm_toc_estimate_chunk(&pcxt->estimator, unpruned_relids_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for serialized ParamListInfo. 
*/ paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info); shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len); @@ -781,6 +804,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, memcpy(pstmt_space, pstmt_data, pstmt_len); shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space); + /* Store serialized part_prune_results */ + part_prune_results_space = shm_toc_allocate(pcxt->toc, part_prune_results_len); + memcpy(part_prune_results_space, part_prune_results_data, part_prune_results_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARTITION_PRUNE_RESULTS, part_prune_results_space); + + /* Store serialized unpruned_relids */ + unpruned_relids_space = shm_toc_allocate(pcxt->toc, unpruned_relids_len); + memcpy(unpruned_relids_space, unpruned_relids_data, unpruned_relids_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNPRUNED_RELIDS, unpruned_relids_space); + /* Store serialized ParamListInfo. */ paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len); shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); @@ -1280,10 +1313,15 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, int instrument_options) { char *pstmtspace; + char *part_prune_results_space; + char *unpruned_relids_space; char *paramspace; PlannedStmt *pstmt; + List *part_prune_results; + Bitmapset *unpruned_relids; ParamListInfo paramLI; char *queryString; + EState *prep_estate = NULL; /* Get the query string from shared memory */ queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false); @@ -1296,12 +1334,80 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false); paramLI = RestoreParamList(&paramspace); + /* Reconstruct leader-supplied part_prune_results and unpruned_relids. 
*/ + part_prune_results_space = + shm_toc_lookup(toc, PARALLEL_KEY_PARTITION_PRUNE_RESULTS, false); + part_prune_results = (List *) stringToNode(part_prune_results_space); + unpruned_relids_space = + shm_toc_lookup(toc, PARALLEL_KEY_UNPRUNED_RELIDS, false); + unpruned_relids = (Bitmapset *) stringToNode(unpruned_relids_space); + + /* + * If pruning was done in the leader, build a prep estate in the worker + * and inject the leader's pruning results into it for reuse. + */ + if (pstmt->partPruneInfos) + { + prep_estate = ExecutorPrep(pstmt, paramLI, CurrentResourceOwner, 0, false); + Assert(prep_estate); + + prep_estate->es_part_prune_results = part_prune_results; + prep_estate->es_unpruned_relids = + bms_add_members(prep_estate->es_unpruned_relids, + unpruned_relids); + + /* + * A debug-build-only check that the pruning results passed from the + * leader match what the worker would independently compute. + */ + CheckInitialPruningResultsInWorker(prep_estate); + } + /* Create a QueryDesc for the query. */ return CreateQueryDesc(pstmt, queryString, GetActiveSnapshot(), InvalidSnapshot, receiver, paramLI, NULL, instrument_options, - NULL); + prep_estate); +} + +/* + * CheckInitialPruningResultsInWorker + * Verify partition pruning results passed from the leader process. + * + * This is intended to be called during parallel worker query setup. + * It recomputes initial pruning results locally and compares them with + * those received from the leader. Any mismatch may indicate a divergence + * between leader and worker logic or environment. + * + * Only performed in debug builds. 
+ */ +static void +CheckInitialPruningResultsInWorker(EState *estate) +{ +#ifdef USE_ASSERT_CHECKING + ListCell *lc; + int i; + + Assert(estate->es_part_prune_results != NULL); + i = 0; + foreach(lc, estate->es_part_prune_states) + { + PartitionPruneState *prunestate = (PartitionPruneState *) lfirst(lc); + Bitmapset *reuse_validsubplans = + list_nth_node(Bitmapset, estate->es_part_prune_results, i++); + Bitmapset *validsubplans = NULL; + Bitmapset *validsubplan_rtis = NULL; + + if (prunestate->do_initial_prune) + validsubplans = ExecFindMatchingSubPlans(prunestate, true, + &validsubplan_rtis); + if (!bms_equal(validsubplans, reuse_validsubplans)) + elog(ERROR, "different validsubplans in parallel worker"); + if (bms_nonempty_difference(validsubplan_rtis, estate->es_unpruned_relids)) + elog(ERROR, "different unprunable_relids in parallel worker"); + } +#endif } /* diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index 2a3af006f77..47322614aad 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -1942,6 +1942,9 @@ adjust_partition_colnos_using_map(List *colnos, AttrMap *attrMap) * * Functions: * + * ExecCreatePartitionPruneStates + * Create PartitionPruneState for all PartitionPruneInfos in the EState + * * ExecDoInitialPruning: * Perform runtime "initial" pruning, if necessary, to determine the set * of child subnodes that need to be initialized during ExecInitNode() for @@ -1967,15 +1970,40 @@ adjust_partition_colnos_using_map(List *colnos, AttrMap *attrMap) */ +/* + * ExecCreatePartitionPruneStates + * + * Create a PartitionPruneState for each PartitionPruneInfo in the estate, + * and save them in estate->es_part_prune_states. This setup is required + * before any initial or runtime pruning can occur. 
+ */ +void +ExecCreatePartitionPruneStates(EState *estate) +{ + ListCell *lc; + + foreach(lc, estate->es_part_prune_infos) + { + PartitionPruneInfo *pruneinfo = lfirst_node(PartitionPruneInfo, lc); + PartitionPruneState *prunestate; + + /* Create and save the PartitionPruneState. */ + prunestate = CreatePartitionPruneState(estate, pruneinfo); + estate->es_part_prune_states = lappend(estate->es_part_prune_states, + prunestate); + } +} + /* * ExecDoInitialPruning * Perform runtime "initial" pruning, if necessary, to determine the set * of child subnodes that need to be initialized during ExecInitNode() for * plan nodes that support partition pruning. * - * This function iterates over each PartitionPruneInfo entry in - * estate->es_part_prune_infos. For each entry, it creates a PartitionPruneState - * and adds it to es_part_prune_states. ExecInitPartitionExecPruning() accesses + * + * This function iterates over each PartitionPruneState in + * estate->es_part_prune_states, which must have been populated earlier by + * ExecCreatePartitionPruneStates(). ExecInitPartitionExecPruning() accesses * these states through their corresponding indexes in es_part_prune_states and * assigns each state to the parent node's PlanState, from where it will be used * for "exec" pruning. @@ -1996,18 +2024,12 @@ ExecDoInitialPruning(EState *estate) ListCell *lc; Assert(estate->es_part_prune_results == NULL); - foreach(lc, estate->es_part_prune_infos) + foreach(lc, estate->es_part_prune_states) { - PartitionPruneInfo *pruneinfo = lfirst_node(PartitionPruneInfo, lc); - PartitionPruneState *prunestate; + PartitionPruneState *prunestate = (PartitionPruneState *) lfirst(lc); Bitmapset *validsubplans = NULL; Bitmapset *validsubplan_rtis = NULL; - /* Create and save the PartitionPruneState. 
*/ - prunestate = CreatePartitionPruneState(estate, pruneinfo); - estate->es_part_prune_states = lappend(estate->es_part_prune_states, - prunestate); - /* * Perform initial pruning steps, if any, and save the result * bitmapset or NULL as described in the header comment. RT indexes diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c index bb62c648899..879b2d012a1 100644 --- a/src/backend/utils/cache/plancache.c +++ b/src/backend/utils/cache/plancache.c @@ -2102,7 +2102,7 @@ AcquireExecutorLocksUnpruned(List *stmt_list, bool acquire, } prep_estate = ExecutorPrep(plannedstmt, cprep->params, - cprep->owner, cprep->eflags); + cprep->owner, cprep->eflags, true); Assert(prep_estate); cprep->prep_estates = lappend(cprep->prep_estates, prep_estate); diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h index 82063ec2a16..4c96808c376 100644 --- a/src/include/executor/execPartition.h +++ b/src/include/executor/execPartition.h @@ -130,6 +130,7 @@ typedef struct PartitionPruneState PartitionPruningData *partprunedata[FLEXIBLE_ARRAY_MEMBER]; } PartitionPruneState; +extern void ExecCreatePartitionPruneStates(EState *estate); extern void ExecDoInitialPruning(EState *estate); extern PartitionPruneState *ExecInitPartitionExecPruning(PlanState *planstate, int n_total_subplans, diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 4505ceaca3c..8e5fde965ed 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -240,7 +240,8 @@ extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags); extern EState *ExecutorPrep(PlannedStmt *pstmt, ParamListInfo params, ResourceOwner owner, - int eflags); + int eflags, + bool do_initial_pruning); /* * Walk a prep_estates list in step with a parallel stmt_list iteration. -- 2.47.3