From dc2cfc32410792b3f00422c07623f989901ee34b Mon Sep 17 00:00:00 2001 From: Amit Langote Date: Tue, 11 Nov 2025 22:17:47 +0900 Subject: [PATCH v6 6/6] Reuse partition pruning results in parallel workers Pass the leader's initial partition pruning results and unpruned relids to parallel workers and reuse them via ExecutorPrep(). This avoids repeating pruning logic in workers, which is not only redundant but also risks divergence due to nondeterminism in pruning steps or parameter evaluation timing. Introduce CheckInitialPruningResultsInWorker() (debug-builds only) to verify that the results match what the worker would compute. This check helps catch inconsistencies across leader and worker pruning logic. --- src/backend/executor/execParallel.c | 108 +++++++++++++++++++++++++++- src/backend/utils/cache/plancache.c | 95 +++++++----------------- 2 files changed, 133 insertions(+), 70 deletions(-) diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 024780d3516..d337bf8c081 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -24,6 +24,7 @@ #include "postgres.h" #include "executor/execParallel.h" +#include "executor/execPartition.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "executor/nodeAppend.h" @@ -67,6 +68,8 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_PARTITION_PRUNE_RESULTS UINT64CONST(0xE00000000000000B) +#define PARALLEL_KEY_UNPRUNED_RELIDS UINT64CONST(0xE00000000000000C) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -141,6 +144,8 @@ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, /* Helper function that runs in the parallel worker. */ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); +static void CheckInitialPruningResultsInWorker(EState *estate); + /* * Create a serialized representation of the plan to be sent to each worker. */ @@ -620,12 +625,18 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, FixedParallelExecutorState *fpes; char *pstmt_data; char *pstmt_space; + char *part_prune_results_data; + char *part_prune_results_space; + char *unpruned_relids_data; + char *unpruned_relids_space; char *paramlistinfo_space; BufferUsage *bufusage_space; WalUsage *walusage_space; SharedExecutorInstrumentation *instrumentation = NULL; SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; + int part_prune_results_len; + int unpruned_relids_len; int paramlistinfo_len; int instrumentation_len = 0; int jit_instrumentation_len = 0; @@ -654,6 +665,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, /* Fix up and serialize plan to be sent to workers. */ pstmt_data = ExecSerializePlan(planstate->plan, estate); + part_prune_results_data = nodeToString(estate->es_part_prune_results); + unpruned_relids_data = nodeToString(estate->es_unpruned_relids); /* Create a parallel context. */ pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers); @@ -680,6 +693,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for serialized part_prune_results. 
*/ + part_prune_results_len = strlen(part_prune_results_data) + 1; + shm_toc_estimate_chunk(&pcxt->estimator, part_prune_results_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate space for serialized unpruned_relids. */ + unpruned_relids_len = strlen(unpruned_relids_data) + 1; + shm_toc_estimate_chunk(&pcxt->estimator, unpruned_relids_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for serialized ParamListInfo. */ paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info); shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len); @@ -781,6 +804,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, memcpy(pstmt_space, pstmt_data, pstmt_len); shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space); + /* Store serialized part_prune_results */ + part_prune_results_space = shm_toc_allocate(pcxt->toc, part_prune_results_len); + memcpy(part_prune_results_space, part_prune_results_data, part_prune_results_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARTITION_PRUNE_RESULTS, part_prune_results_space); + + /* Store serialized unpruned_relids */ + unpruned_relids_space = shm_toc_allocate(pcxt->toc, unpruned_relids_len); + memcpy(unpruned_relids_space, unpruned_relids_data, unpruned_relids_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNPRUNED_RELIDS, unpruned_relids_space); + /* Store serialized ParamListInfo. */ paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len); shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); @@ -1280,10 +1313,15 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, int instrument_options) { char *pstmtspace; + char *part_prune_results_space; + char *unpruned_relids_space; char *paramspace; PlannedStmt *pstmt; + List *part_prune_results; + Bitmapset *unpruned_relids; ParamListInfo paramLI; char *queryString; + EState *prep_estate = NULL; /* Get the query string from shared memory */ queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false); @@ -1296,12 +1334,80 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false); paramLI = RestoreParamList(¶mspace); + /* Reconstruct leader-supplied part_prune_results and unpruned_relids. */ + part_prune_results_space = + shm_toc_lookup(toc, PARALLEL_KEY_PARTITION_PRUNE_RESULTS, false); + part_prune_results = (List *) stringToNode(part_prune_results_space); + unpruned_relids_space = + shm_toc_lookup(toc, PARALLEL_KEY_UNPRUNED_RELIDS, false); + unpruned_relids = (Bitmapset *) stringToNode(unpruned_relids_space); + + /* + * If pruning was done in the leader, build a prep estate in the worker + * and inject the leader's pruning results into it for reuse. + */ + if (pstmt->partPruneInfos) + { + prep_estate = ExecutorPrep(pstmt, paramLI, CurrentResourceOwner, false, 0); + Assert(prep_estate); + + prep_estate->es_part_prune_results = part_prune_results; + prep_estate->es_unpruned_relids = + bms_add_members(prep_estate->es_unpruned_relids, + unpruned_relids); + + /* + * A debug-build-only check that the pruning results passed from the + * leader match what the worker would independently compute. + */ + CheckInitialPruningResultsInWorker(prep_estate); + } + /* Create a QueryDesc for the query. 
*/ return CreateQueryDesc(pstmt, queryString, GetActiveSnapshot(), InvalidSnapshot, receiver, paramLI, NULL, instrument_options, - NULL); + prep_estate); +} + +/* + * CheckInitialPruningResultsInWorker + * Verify partition pruning results passed from the leader process. + * + * This is intended to be called during parallel worker query setup. + * It recomputes initial pruning results locally and compares them with + * those received from the leader. Any mismatch may indicate a divergence + * between leader and worker logic or environment. + * + * Only performed in debug builds. + */ +static void +CheckInitialPruningResultsInWorker(EState *estate) +{ +#ifdef USE_ASSERT_CHECKING + ListCell *lc; + int i; + + Assert(estate->es_part_prune_results != NULL); + i = 0; + foreach(lc, estate->es_part_prune_states) + { + PartitionPruneState *prunestate = (PartitionPruneState *) lfirst(lc); + Bitmapset *reuse_validsubplans = + list_nth_node(Bitmapset, estate->es_part_prune_results, i++); + Bitmapset *validsubplans = NULL; + Bitmapset *validsubplan_rtis = NULL; + + if (prunestate->do_initial_prune) + validsubplans = ExecFindMatchingSubPlans(prunestate, true, + &validsubplan_rtis); + if (!bms_equal(validsubplans, reuse_validsubplans)) + elog(ERROR, "different validsubplans in parallel worker"); + if (bms_nonempty_difference(validsubplan_rtis, estate->es_unpruned_relids)) + elog(ERROR, "different unprunable_relids in parallel worker"); + } +#endif } /* diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c index be2a961a918..1d3244307da 100644 --- a/src/backend/utils/cache/plancache.c +++ b/src/backend/utils/cache/plancache.c @@ -93,14 +93,14 @@ static bool StmtPlanRequiresRevalidation(CachedPlanSource *plansource); static bool BuildingPlanRequiresSnapshot(CachedPlanSource *plansource); static List *RevalidateCachedQuery(CachedPlanSource *plansource, QueryEnvironment *queryEnv); -static bool PrepAndCheckCachedPlan(CachedPlanSource *plansource, CachedPlanPrepData *cprep); +static bool CheckCachedPlan(CachedPlanSource *plansource, CachedPlanPrepData *cprep); static CachedPlan *BuildCachedPlan(CachedPlanSource *plansource, List *qlist, ParamListInfo boundParams, QueryEnvironment *queryEnv); static bool choose_custom_plan(CachedPlanSource *plansource, ParamListInfo boundParams); static double cached_plan_cost(CachedPlan *plan, bool include_planner); static Query *QueryListGetPrimaryStmt(List *stmts); -static void AcquireExecutorLocks(List *stmt_list, bool acquire); +static void AcquireExecutorLocksAll(List *stmt_list, bool acquire); static void AcquireExecutorLocksUnpruned(List *stmt_list, bool acquire, CachedPlanPrepData *cprep); static void CachedPlanPrepCleanup(CachedPlanPrepData *cprep); @@ -142,26 +142,6 @@ ResourceOwnerForgetPlanCacheRef(ResourceOwner owner, CachedPlan *plan) /* GUC parameter */ int plan_cache_mode = PLAN_CACHE_MODE_AUTO; -/* - * Lock acquisition policy for execution locks. - * - * LOCK_ALL acquires locks on all relations mentioned in the plan, - * reproducing the behavior of AcquireExecutorLocks(). - * - * LOCK_UNPRUNED restricts locking to only the unpruned relations. That - * includes those mentioned in PlannedStmt.unprunableRelids and the leaf - * partitions remaining after performing initial pruning. 
- */ -typedef enum LockPolicy -{ - LOCK_ALL, - LOCK_UNPRUNED, -} LockPolicy; - -static void AcquireExecutorLocksWithPolicy(List *stmt_list, - LockPolicy policy, bool acquire, - CachedPlanPrepData *cprep); - /* * InitPlanCache: initialize module during InitPostgres. * @@ -963,7 +943,7 @@ RevalidateCachedQuery(CachedPlanSource *plansource, } /* - * PrepAndCheckCachedPlan: see if the CachedPlanSource's generic plan is valid. + * CheckCachedPlan: see if the CachedPlanSource's generic plan is valid. * * If 'cprep' is not NULL, ExecutorPrep() is applied to each PlannedStmt to * compute the set of partitions that survive initial runtime pruning in order @@ -977,7 +957,7 @@ RevalidateCachedQuery(CachedPlanSource *plansource, * (We must do this for the "true" result to be race-condition-free.) */ static bool -PrepAndCheckCachedPlan(CachedPlanSource *plansource, CachedPlanPrepData *cprep) +CheckCachedPlan(CachedPlanSource *plansource, CachedPlanPrepData *cprep) { CachedPlan *plan = plansource->gplan; @@ -1005,15 +985,16 @@ PrepAndCheckCachedPlan(CachedPlanSource *plansource, CachedPlanPrepData *cprep) */ if (plan->is_valid) { - LockPolicy policy = !cprep ? LOCK_ALL : LOCK_UNPRUNED; - /* * Plan must have positive refcount because it is referenced by * plansource; so no need to fear it disappears under us here. */ Assert(plan->refcount > 0); - AcquireExecutorLocksWithPolicy(plan->stmt_list, policy, true, cprep); + if (cprep) + AcquireExecutorLocksUnpruned(plan->stmt_list, true, cprep); + else + AcquireExecutorLocksAll(plan->stmt_list, true); /* * If plan was transient, check to see if TransactionXmin has @@ -1035,7 +1016,10 @@ PrepAndCheckCachedPlan(CachedPlanSource *plansource, CachedPlanPrepData *cprep) } /* Oops, the race case happened. Release useless locks. */ - AcquireExecutorLocksWithPolicy(plan->stmt_list, policy, false, cprep); + if (cprep) + AcquireExecutorLocksUnpruned(plan->stmt_list, false, cprep); + else + AcquireExecutorLocksAll(plan->stmt_list, false); /* Also clean up ExecutorPrep() state, if necessary. */ CachedPlanPrepCleanup(cprep); @@ -1358,7 +1342,7 @@ GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams, { if (cprep) cprep->params = boundParams; - if (PrepAndCheckCachedPlan(plansource, cprep)) + if (CheckCachedPlan(plansource, cprep)) { /* We want a generic plan, and we already have a valid one */ plan = plansource->gplan; @@ -1945,43 +1929,13 @@ QueryListGetPrimaryStmt(List *stmts) } /* - * AcquireExecutorLocksWithPolicy - * Acquire or release execution locks for a cached plan according to - * the specified policy. - * - * LOCK_ALL reproduces AcquireExecutorLocks(), locking every relation in - * each PlannedStmt's rtable. LOCK_UNPRUNED restricts locking to the - * unprunable rels and partitions that survive initial runtime pruning. - * - * When LOCK_UNPRUNED is used on acquire, ExecutorPrep() is invoked for - * each PlannedStmt and the resulting EStates are appended to - * cprep->prep_estates in cprep->context. On release, the same EState - * list is consulted to determine which relations to unlock and each - * EState is released. 
- */ -static void -AcquireExecutorLocksWithPolicy(List *stmt_list, LockPolicy policy, bool acquire, - CachedPlanPrepData *cprep) -{ - switch (policy) - { - case LOCK_ALL: - AcquireExecutorLocks(stmt_list, acquire); - break; - case LOCK_UNPRUNED: - AcquireExecutorLocksUnpruned(stmt_list, acquire, cprep); - break; - default: - elog(ERROR, "invalid LockPolicy"); - } -} - -/* - * AcquireExecutorLocks: acquire locks needed for execution of a cached plan; - * or release them if acquire is false. + * AcquireExecutorLocksAll: acquire locks needed for execution of a cached + * plan; or release them if acquire is false. + * + * This locks all relations in a given PlannedStmt's range table. */ static void -AcquireExecutorLocks(List *stmt_list, bool acquire) +AcquireExecutorLocksAll(List *stmt_list, bool acquire) { ListCell *lc1; @@ -2044,10 +1998,8 @@ LockRelids(List *rtable, Bitmapset *relids, bool acquire) { RangeTblEntry *rte = list_nth_node(RangeTblEntry, rtable, rtindex - 1); - if (!(rte->rtekind == RTE_RELATION || - (rte->rtekind == RTE_SUBQUERY && OidIsValid(rte->relid)))) - elog(ERROR, "LockRelids(): cannot lock relation at RT index %d", - rtindex); + Assert(rte->rtekind == RTE_RELATION || + (rte->rtekind == RTE_SUBQUERY && OidIsValid(rte->relid))); /* * Acquire the appropriate type of lock on each relation OID. Note @@ -2204,7 +2156,7 @@ AcquireExecutorLocksUnpruned(List *stmt_list, bool acquire, * CachedPlanPrepCleanup * Clean up EState built for a generic plan. * - * This is used in the corner case where PrepAndCheckCachedPlan() discovers + * This is used in the corner case where CheckCachedPlan() discovers * that a CachedPlan has become invalid after AcquireExecutorLocksUnpruned() * has already run. In that case we must both release the execution locks * and dispose of the ExecPrep list stored in CachedPlanPrepData, since the @@ -2214,10 +2166,14 @@ static void CachedPlanPrepCleanup(CachedPlanPrepData *cprep) { ListCell *lc; + ResourceOwner oldowner; if (cprep == NULL) return; + /* Switch to owner that ExecutorPrep() would have used. */ + oldowner = CurrentResourceOwner; + CurrentResourceOwner = cprep->owner; foreach(lc, cprep->prep_estates) { EState *prep_estate = (EState *) lfirst(lc); @@ -2228,6 +2184,7 @@ CachedPlanPrepCleanup(CachedPlanPrepData *cprep) ExecCloseRangeTableRelations(prep_estate); FreeExecutorState(prep_estate); } + CurrentResourceOwner = oldowner; list_free(cprep->prep_estates); cprep->prep_estates = NIL; -- 2.47.3