From c1860e6f46643d8bae1bfeb1a45c9f3311034925 Mon Sep 17 00:00:00 2001 From: James Hunter Date: Tue, 25 Feb 2025 22:44:01 +0000 Subject: [PATCH 2/5] Store working memory limit on Plan field, rather than in GUC This commit moves the working-memory limit that an executor node checks, at runtime, from the "work_mem" and "hash_mem_multiplier" GUCs, to a new field, "workmem_limit", added to the Plan node. To preserve backward compatibility, it also copies the "work_mem", etc., values from these GUCs to the new field. This means that this commit is just a refactoring, and doesn't change any behavior. This field is on the Plan node, instead of the PlanState, because it needs to be set before we can call ExecInitNode(). Many PlanStates look at their working-memory limit when creating their data structures, during initialization. So the field is on the Plan node, but set between planning and execution phases. --- src/backend/executor/Makefile | 1 + src/backend/executor/execGrouping.c | 10 +- src/backend/executor/execMain.c | 6 + src/backend/executor/execSRF.c | 5 +- src/backend/executor/execWorkmem.c | 277 +++++++++++++++++++++ src/backend/executor/meson.build | 1 + src/backend/executor/nodeAgg.c | 69 +++-- src/backend/executor/nodeBitmapIndexscan.c | 3 +- src/backend/executor/nodeBitmapOr.c | 3 +- src/backend/executor/nodeCtescan.c | 3 +- src/backend/executor/nodeFunctionscan.c | 2 + src/backend/executor/nodeHash.c | 23 +- src/backend/executor/nodeIncrementalSort.c | 4 +- src/backend/executor/nodeMaterial.c | 3 +- src/backend/executor/nodeMemoize.c | 2 +- src/backend/executor/nodeRecursiveunion.c | 12 +- src/backend/executor/nodeSetOp.c | 1 + src/backend/executor/nodeSort.c | 4 +- src/backend/executor/nodeSubplan.c | 2 + src/backend/executor/nodeTableFuncscan.c | 3 +- src/backend/executor/nodeWindowAgg.c | 3 +- src/backend/optimizer/path/costsize.c | 4 +- src/include/executor/executor.h | 7 + src/include/executor/hashjoin.h | 3 +- src/include/executor/nodeAgg.h | 5 +- 
src/include/executor/nodeHash.h | 3 +- src/include/nodes/plannodes.h | 14 +- src/include/nodes/primnodes.h | 3 + 28 files changed, 423 insertions(+), 53 deletions(-) create mode 100644 src/backend/executor/execWorkmem.c diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index 11118d0ce02..8aa9580558f 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -30,6 +30,7 @@ OBJS = \ execScan.o \ execTuples.o \ execUtils.o \ + execWorkmem.o \ functions.o \ instrument.o \ nodeAgg.o \ diff --git a/src/backend/executor/execGrouping.c b/src/backend/executor/execGrouping.c index 33b124fbb0a..bcd1822da80 100644 --- a/src/backend/executor/execGrouping.c +++ b/src/backend/executor/execGrouping.c @@ -168,6 +168,7 @@ BuildTupleHashTable(PlanState *parent, Oid *collations, long nbuckets, Size additionalsize, + Size hash_mem_limit, MemoryContext metacxt, MemoryContext tablecxt, MemoryContext tempcxt, @@ -175,15 +176,18 @@ BuildTupleHashTable(PlanState *parent, { TupleHashTable hashtable; Size entrysize = sizeof(TupleHashEntryData) + additionalsize; - Size hash_mem_limit; MemoryContext oldcontext; bool allow_jit; uint32 hash_iv = 0; Assert(nbuckets > 0); - /* Limit initial table size request to not more than hash_mem */ - hash_mem_limit = get_hash_memory_limit() / entrysize; + /* + * Limit initial table size request to not more than hash_mem + * + * XXX - we should also limit the *maximum* table size to hash_mem. 
+ */ + hash_mem_limit = hash_mem_limit / entrysize; if (nbuckets > hash_mem_limit) nbuckets = hash_mem_limit; diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 0493b7d5365..78fd887a84d 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1050,6 +1050,12 @@ InitPlan(QueryDesc *queryDesc, int eflags) /* signal that this EState is not used for EPQ */ estate->es_epq_active = NULL; + /* + * Assign working memory to SubPlan and Plan nodes, before initializing + * their states. + */ + ExecAssignWorkMem(plannedstmt); + /* * Initialize private state information for each SubPlan. We must do this * before running ExecInitNode on the main query tree, since diff --git a/src/backend/executor/execSRF.c b/src/backend/executor/execSRF.c index a03fe780a02..4b1e7e0ad1e 100644 --- a/src/backend/executor/execSRF.c +++ b/src/backend/executor/execSRF.c @@ -102,6 +102,7 @@ ExecMakeTableFunctionResult(SetExprState *setexpr, ExprContext *econtext, MemoryContext argContext, TupleDesc expectedDesc, + int workMem, bool randomAccess) { Tuplestorestate *tupstore = NULL; @@ -261,7 +262,7 @@ ExecMakeTableFunctionResult(SetExprState *setexpr, MemoryContext oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_query_memory); - tupstore = tuplestore_begin_heap(randomAccess, false, work_mem); + tupstore = tuplestore_begin_heap(randomAccess, false, workMem); rsinfo.setResult = tupstore; if (!returnsTuple) { @@ -396,7 +397,7 @@ no_function_result: MemoryContext oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_query_memory); - tupstore = tuplestore_begin_heap(randomAccess, false, work_mem); + tupstore = tuplestore_begin_heap(randomAccess, false, workMem); rsinfo.setResult = tupstore; MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/executor/execWorkmem.c b/src/backend/executor/execWorkmem.c new file mode 100644 index 00000000000..5ec176d1355 --- /dev/null +++ b/src/backend/executor/execWorkmem.c @@ -0,0 +1,277 @@ 
+/*------------------------------------------------------------------------- + * + * execWorkmem.c + * routine to set the "workmem_limit" field(s) on Plan nodes that need + * working memory. + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/executor/execWorkmem.c + * + * INTERFACE ROUTINES + * ExecAssignWorkMem - assign working memory to Plan nodes + * + * NOTES + * Historically, every PlanState node, during initialization, looked at + * the "work_mem" (plus maybe "hash_mem_multiplier") GUC, to determine + * what working-memory limit was imposed on it. + * + * Now, to allow different PlanState nodes to be restricted to different + * amounts of memory, each PlanState node reads this limit off its + * corresponding Plan node's "workmem_limit" field. And we populate that + * field by calling ExecAssignWorkMem(), from InitPlan(), before we + * initialize the PlanState nodes. + * + * The "workmem_limit" field is a limit "per data structure," rather than + * "per PlanState". This is needed because some SQL operators (e.g., + * RecursiveUnion and Agg) require multiple data structures, and sometimes + * the data structures don't all share the same memory requirement. So we + * cannot always just divide a "per PlanState" limit among individual data + * structures. Instead, we maintain the limits on the data structures (and + * EXPLAIN, for example, sums them up into a single, human-readable + * number). + * + * Note that the *Path's* "workmem" estimate is per SQL operator, but when + * we convert that Path to a Plan we also break its "workmem" estimate + * down into per-data structure estimates. Some operators therefore + * require additional "limit" fields, which we add to the corresponding + * Plan. 
+ * + * We store the "workmem_limit" field(s) on the Plan, instead of the + * PlanState, even though it conceptually belongs to execution rather than + * to planning, because we need it to be set before initializing the + * corresponding PlanState. This is a chicken-and-egg problem. We could, + * of course, make ExecInitNode() a two-phase operation, but that seems + * like overkill. Instead, we store these "limit" fields on the Plan, but + * set them when we start execution, as part of InitPlan(). + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/parallel.h" +#include "executor/executor.h" +#include "miscadmin.h" +#include "optimizer/cost.h" + + +/* decls for local routines only used within this module */ +static void assign_workmem_subplan(SubPlan *subplan); +static void assign_workmem_plan(Plan *plan); +static void assign_workmem_agg(Agg *agg); +static void assign_workmem_agg_node(Agg *agg, bool is_first, bool is_last, + bool *is_first_sort); + +/* end of local decls */ + + +/* ------------------------------------------------------------------------ + * ExecAssignWorkMem + * + * Recursively assigns working memory to any Plans or SubPlans that need + * it. + * + * Inputs: + * 'plannedstmt' is the statement to which we assign working memory + * + * ------------------------------------------------------------------------ + */ +void +ExecAssignWorkMem(PlannedStmt *plannedstmt) +{ + /* + * No need to re-assign working memory on parallel workers, since workers + * have the same work_mem and hash_mem_multiplier GUCs as the leader. + * + * We already assigned working-memory limits on the leader, and those + * limits were sent to the workers inside the serialized Plan. + */ + if (IsParallelWorker()) + return; + + /* Assign working memory to the Plans referred to by SubPlan objects. 
*/ + foreach_ptr(Plan, plan, plannedstmt->subplans) + { + if (plan) + assign_workmem_plan(plan); + } + + /* And assign working memory to the main Plan tree. */ + assign_workmem_plan(plannedstmt->planTree); +} + +static void +assign_workmem_subplan(SubPlan *subplan) +{ + subplan->hashtab_workmem_limit = subplan->useHashTable ? + get_hash_memory_limit() / 1024 : 0; + + subplan->hashnul_workmem_limit = + subplan->useHashTable && !subplan->unknownEqFalse ? + get_hash_memory_limit() / 1024 : 0; +} + +static void +assign_workmem_plan(Plan *plan) +{ + /* Make sure there's enough stack available. */ + check_stack_depth(); + + /* Assign working memory to this node's (hashed) SubPlans. */ + foreach_node(SubPlan, subplan, plan->initPlan) + assign_workmem_subplan(subplan); + + foreach_node(SubPlan, subplan, plan->subPlan) + assign_workmem_subplan(subplan); + + /* Assign working memory to this node. */ + switch (nodeTag(plan)) + { + case T_BitmapIndexScan: + case T_CteScan: + case T_FunctionScan: + case T_IncrementalSort: + case T_Material: + case T_Sort: + case T_TableFuncScan: + case T_WindowAgg: + plan->workmem_limit = work_mem; + break; + case T_Hash: + case T_Memoize: + case T_SetOp: + plan->workmem_limit = get_hash_memory_limit() / 1024; + break; + case T_Agg: + assign_workmem_agg((Agg *) plan); + break; + case T_RecursiveUnion: + { + RecursiveUnion *runion = (RecursiveUnion *) plan; + + plan->workmem_limit = work_mem; + + if (runion->numCols > 0) + { + /* Also include memory for hash table. */ + runion->hashWorkMemLimit = get_hash_memory_limit() / 1024; + } + + break; + } + default: + Assert(plan->workmem == 0); + plan->workmem_limit = 0; + break; + } + + /* + * Assign working memory to this node's children. (Logic copied from + * ExplainNode().) 
+ */ + if (outerPlan(plan)) + assign_workmem_plan(outerPlan(plan)); + + if (innerPlan(plan)) + assign_workmem_plan(innerPlan(plan)); + + switch (nodeTag(plan)) + { + case T_Append: + foreach_ptr(Plan, child, ((Append *) plan)->appendplans) + assign_workmem_plan(child); + break; + case T_MergeAppend: + foreach_ptr(Plan, child, ((MergeAppend *) plan)->mergeplans) + assign_workmem_plan(child); + break; + case T_BitmapAnd: + foreach_ptr(Plan, child, ((BitmapAnd *) plan)->bitmapplans) + assign_workmem_plan(child); + break; + case T_BitmapOr: + foreach_ptr(Plan, child, ((BitmapOr *) plan)->bitmapplans) + assign_workmem_plan(child); + break; + case T_SubqueryScan: + assign_workmem_plan(((SubqueryScan *) plan)->subplan); + break; + case T_CustomScan: + foreach_ptr(Plan, child, ((CustomScan *) plan)->custom_plans) + assign_workmem_plan(child); + break; + default: + break; + } +} + +static void +assign_workmem_agg(Agg *agg) +{ + bool is_first_sort = true; + + /* Assign working memory to the main Agg node. */ + assign_workmem_agg_node(agg, + true /* is_first */ , + agg->chain == NULL /* is_last */ , + &is_first_sort); + + /* Assign working memory to any other grouping sets. */ + foreach_node(Agg, aggnode, agg->chain) + { + assign_workmem_agg_node(aggnode, + false /* is_first */ , + foreach_current_index(aggnode) == + list_length(agg->chain) - 1 /* is_last */ , + &is_first_sort); + } +} + +static void +assign_workmem_agg_node(Agg *agg, bool is_first, bool is_last, + bool *is_first_sort) +{ + switch (agg->aggstrategy) + { + case AGG_HASHED: + case AGG_MIXED: + + /* + * Because nodeAgg.c will combine all AGG_HASHED nodes into a + * single phase, it's easier to store the hash working-memory + * limit on the first AGG_{HASHED,MIXED} node, and set it to zero + * for all subsequent AGG_HASHED nodes. + */ + agg->plan.workmem_limit = is_first ? 
+ get_hash_memory_limit() / 1024 : 0; + break; + case AGG_SORTED: + + /* + * Also store the sort-output working-memory limit on the first + * AGG_SORTED node, and set it to zero for all subsequent + * AGG_SORTED nodes. + * + * We'll need working-memory to hold the "sort_out" only if this + * isn't the last Agg node (in which case there's no one to sort + * our output). + */ + agg->plan.workmem_limit = *is_first_sort && !is_last ? + work_mem : 0; + + *is_first_sort = false; + break; + default: + break; + } + + /* Also include memory needed to sort the input: */ + if (agg->numSorts > 0) + { + Assert(agg->sortWorkMem > 0); + + agg->sortWorkMemLimit = work_mem; + } +} diff --git a/src/backend/executor/meson.build b/src/backend/executor/meson.build index 2cea41f8771..4e65974f5f3 100644 --- a/src/backend/executor/meson.build +++ b/src/backend/executor/meson.build @@ -18,6 +18,7 @@ backend_sources += files( 'execScan.c', 'execTuples.c', 'execUtils.c', + 'execWorkmem.c', 'functions.c', 'instrument.c', 'nodeAgg.c', diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index ceb8c8a8039..9e5bcf7ada4 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -258,6 +258,7 @@ #include "executor/execExpr.h" #include "executor/executor.h" #include "executor/nodeAgg.h" +#include "executor/nodeHash.h" #include "lib/hyperloglog.h" #include "miscadmin.h" #include "nodes/nodeFuncs.h" @@ -403,7 +404,8 @@ static void find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated); static bool find_cols_walker(Node *node, FindColsContext *context); static void build_hash_tables(AggState *aggstate); -static void build_hash_table(AggState *aggstate, int setno, long nbuckets); +static void build_hash_table(AggState *aggstate, int setno, long nbuckets, + Size hash_mem_limit); static void hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck); static long hash_choose_num_buckets(double hashentrysize, @@ 
-411,6 +413,7 @@ static long hash_choose_num_buckets(double hashentrysize, static int hash_choose_num_partitions(double input_groups, double hashentrysize, int used_bits, + Size hash_mem_limit, int *log2_npartitions); static void initialize_hash_entry(AggState *aggstate, TupleHashTable hashtable, @@ -431,9 +434,10 @@ static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno, int64 input_tuples, double input_card, int used_bits); static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp); -static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, - int used_bits, double input_groups, - double hashentrysize); +static void hashagg_spill_init(HashAggSpill *spill, + LogicalTapeSet *tapeset, int used_bits, + double input_groups, double hashentrysize, + Size hash_mem_limit); static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, TupleTableSlot *inputslot, uint32 hash); static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, @@ -521,6 +525,14 @@ initialize_phase(AggState *aggstate, int newphase) Sort *sortnode = aggstate->phases[newphase + 1].sortnode; PlanState *outerNode = outerPlanState(aggstate); TupleDesc tupDesc = ExecGetResultType(outerNode); + int workmem_limit; + + /* + * Read the sort-output workmem_limit off the first AGG_SORTED node. + * Since phase 0 is always AGG_HASHED, this will always be phase 1. 
+ */ + workmem_limit = aggstate->phases[1].aggnode->plan.workmem_limit; + Assert(workmem_limit > 0); aggstate->sort_out = tuplesort_begin_heap(tupDesc, sortnode->numCols, @@ -528,7 +540,7 @@ initialize_phase(AggState *aggstate, int newphase) sortnode->sortOperators, sortnode->collations, sortnode->nullsFirst, - work_mem, + workmem_limit, NULL, TUPLESORT_NONE); } @@ -577,7 +589,7 @@ fetch_input_tuple(AggState *aggstate) */ static void initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, - AggStatePerGroup pergroupstate) + AggStatePerGroup pergroupstate, size_t workMem) { /* * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate. @@ -591,6 +603,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, if (pertrans->sortstates[aggstate->current_set]) tuplesort_end(pertrans->sortstates[aggstate->current_set]); + Assert(workMem > 0); /* * We use a plain Datum sorter when there's a single input column; @@ -606,7 +619,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, pertrans->sortOperators[0], pertrans->sortCollations[0], pertrans->sortNullsFirst[0], - work_mem, NULL, TUPLESORT_NONE); + workMem, NULL, TUPLESORT_NONE); } else pertrans->sortstates[aggstate->current_set] = @@ -616,7 +629,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, pertrans->sortOperators, pertrans->sortCollations, pertrans->sortNullsFirst, - work_mem, NULL, TUPLESORT_NONE); + workMem, NULL, TUPLESORT_NONE); } /* @@ -687,7 +700,8 @@ initialize_aggregates(AggState *aggstate, AggStatePerTrans pertrans = &transstates[transno]; AggStatePerGroup pergroupstate = &pergroup[transno]; - initialize_aggregate(aggstate, pertrans, pergroupstate); + initialize_aggregate(aggstate, pertrans, pergroupstate, + aggstate->phase->aggnode->sortWorkMemLimit); } } } @@ -1498,7 +1512,7 @@ build_hash_tables(AggState *aggstate) } #endif - build_hash_table(aggstate, setno, nbuckets); + build_hash_table(aggstate, setno, nbuckets, memory); } 
aggstate->hash_ngroups_current = 0; @@ -1508,7 +1522,8 @@ build_hash_tables(AggState *aggstate) * Build a single hashtable for this grouping set. */ static void -build_hash_table(AggState *aggstate, int setno, long nbuckets) +build_hash_table(AggState *aggstate, int setno, long nbuckets, + Size hash_mem_limit) { AggStatePerHash perhash = &aggstate->perhash[setno]; MemoryContext metacxt = aggstate->hash_metacxt; @@ -1537,6 +1552,7 @@ build_hash_table(AggState *aggstate, int setno, long nbuckets) perhash->aggnode->grpCollations, nbuckets, additionalsize, + hash_mem_limit, metacxt, hashcxt, tmpcxt, @@ -1805,12 +1821,11 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck) */ void hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits, - Size *mem_limit, uint64 *ngroups_limit, + Size hash_mem_limit, Size *mem_limit, uint64 *ngroups_limit, int *num_partitions) { int npartitions; Size partition_mem; - Size hash_mem_limit = get_hash_memory_limit(); /* if not expected to spill, use all of hash_mem */ if (input_groups * hashentrysize <= hash_mem_limit) @@ -1830,6 +1845,7 @@ hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits, npartitions = hash_choose_num_partitions(input_groups, hashentrysize, used_bits, + hash_mem_limit, NULL); if (num_partitions != NULL) *num_partitions = npartitions; @@ -1927,7 +1943,8 @@ hash_agg_enter_spill_mode(AggState *aggstate) hashagg_spill_init(spill, aggstate->hash_tapeset, 0, perhash->aggnode->numGroups, - aggstate->hashentrysize); + aggstate->hashentrysize, + (Size) aggstate->ss.ps.plan->workmem_limit * 1024); } } } @@ -2014,9 +2031,9 @@ hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory) */ static int hash_choose_num_partitions(double input_groups, double hashentrysize, - int used_bits, int *log2_npartitions) + int used_bits, Size hash_mem_limit, + int *log2_npartitions) { - Size hash_mem_limit = get_hash_memory_limit(); double partition_limit; 
double mem_wanted; double dpartitions; @@ -2095,7 +2112,8 @@ initialize_hash_entry(AggState *aggstate, TupleHashTable hashtable, AggStatePerTrans pertrans = &aggstate->pertrans[transno]; AggStatePerGroup pergroupstate = &pergroup[transno]; - initialize_aggregate(aggstate, pertrans, pergroupstate); + initialize_aggregate(aggstate, pertrans, pergroupstate, + aggstate->phase->aggnode->sortWorkMemLimit); } } @@ -2156,7 +2174,8 @@ lookup_hash_entries(AggState *aggstate) if (spill->partitions == NULL) hashagg_spill_init(spill, aggstate->hash_tapeset, 0, perhash->aggnode->numGroups, - aggstate->hashentrysize); + aggstate->hashentrysize, + (Size) aggstate->ss.ps.plan->workmem_limit * 1024); hashagg_spill_tuple(aggstate, spill, slot, hash); pergroup[setno] = NULL; @@ -2630,7 +2649,9 @@ agg_refill_hash_table(AggState *aggstate) aggstate->hash_batches = list_delete_last(aggstate->hash_batches); hash_agg_set_limits(aggstate->hashentrysize, batch->input_card, - batch->used_bits, &aggstate->hash_mem_limit, + batch->used_bits, + (Size) aggstate->ss.ps.plan->workmem_limit * 1024, + &aggstate->hash_mem_limit, &aggstate->hash_ngroups_limit, NULL); /* @@ -2718,7 +2739,8 @@ agg_refill_hash_table(AggState *aggstate) */ spill_initialized = true; hashagg_spill_init(&spill, tapeset, batch->used_bits, - batch->input_card, aggstate->hashentrysize); + batch->input_card, aggstate->hashentrysize, + (Size) aggstate->ss.ps.plan->workmem_limit * 1024); } /* no memory for a new group, spill */ hashagg_spill_tuple(aggstate, &spill, spillslot, hash); @@ -2916,13 +2938,15 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate) */ static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits, - double input_groups, double hashentrysize) + double input_groups, double hashentrysize, + Size hash_mem_limit) { int npartitions; int partition_bits; npartitions = hash_choose_num_partitions(input_groups, hashentrysize, - used_bits, &partition_bits); + used_bits, hash_mem_limit, + 
&partition_bits); #ifdef USE_INJECTION_POINTS if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-single-partition")) @@ -3649,6 +3673,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) totalGroups += aggstate->perhash[k].aggnode->numGroups; hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0, + (Size) aggstate->ss.ps.plan->workmem_limit * 1024, &aggstate->hash_mem_limit, &aggstate->hash_ngroups_limit, &aggstate->hash_planned_partitions); diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c index 0b32c3a022f..5e006baa88d 100644 --- a/src/backend/executor/nodeBitmapIndexscan.c +++ b/src/backend/executor/nodeBitmapIndexscan.c @@ -91,7 +91,8 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node) else { /* XXX should we use less than work_mem for this? */ - tbm = tbm_create(work_mem * (Size) 1024, + Assert(node->ss.ps.plan->workmem_limit > 0); + tbm = tbm_create((Size) node->ss.ps.plan->workmem_limit * 1024, ((BitmapIndexScan *) node->ss.ps.plan)->isshared ? node->ss.ps.state->es_query_dsa : NULL); } diff --git a/src/backend/executor/nodeBitmapOr.c b/src/backend/executor/nodeBitmapOr.c index 231760ec93d..4ba32639f7d 100644 --- a/src/backend/executor/nodeBitmapOr.c +++ b/src/backend/executor/nodeBitmapOr.c @@ -143,7 +143,8 @@ MultiExecBitmapOr(BitmapOrState *node) if (result == NULL) /* first subplan */ { /* XXX should we use less than work_mem for this? */ - result = tbm_create(work_mem * (Size) 1024, + Assert(subnode->plan->workmem_limit > 0); + result = tbm_create((Size) subnode->plan->workmem_limit * 1024, ((BitmapOr *) node->ps.plan)->isshared ? 
node->ps.state->es_query_dsa : NULL); } diff --git a/src/backend/executor/nodeCtescan.c b/src/backend/executor/nodeCtescan.c index e1675f66b43..2272185dce7 100644 --- a/src/backend/executor/nodeCtescan.c +++ b/src/backend/executor/nodeCtescan.c @@ -232,7 +232,8 @@ ExecInitCteScan(CteScan *node, EState *estate, int eflags) /* I am the leader */ prmdata->value = PointerGetDatum(scanstate); scanstate->leader = scanstate; - scanstate->cte_table = tuplestore_begin_heap(true, false, work_mem); + scanstate->cte_table = + tuplestore_begin_heap(true, false, node->scan.plan.workmem_limit); tuplestore_set_eflags(scanstate->cte_table, scanstate->eflags); scanstate->readptr = 0; } diff --git a/src/backend/executor/nodeFunctionscan.c b/src/backend/executor/nodeFunctionscan.c index 644363582d9..bbb93a8dd58 100644 --- a/src/backend/executor/nodeFunctionscan.c +++ b/src/backend/executor/nodeFunctionscan.c @@ -95,6 +95,7 @@ FunctionNext(FunctionScanState *node) node->ss.ps.ps_ExprContext, node->argcontext, node->funcstates[0].tupdesc, + node->ss.ps.plan->workmem_limit, node->eflags & EXEC_FLAG_BACKWARD); /* @@ -154,6 +155,7 @@ FunctionNext(FunctionScanState *node) node->ss.ps.ps_ExprContext, node->argcontext, fs->tupdesc, + node->ss.ps.plan->workmem_limit, node->eflags & EXEC_FLAG_BACKWARD); /* diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 8d2201ab67f..aee3c9ea67c 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -37,6 +37,7 @@ #include "miscadmin.h" #include "port/pg_bitutils.h" #include "utils/dynahash.h" +#include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" @@ -448,6 +449,7 @@ ExecHashTableCreate(HashState *state) Hash *node; HashJoinTable hashtable; Plan *outerNode; + size_t worker_space_allowed; size_t space_allowed; int nbuckets; int nbatch; @@ -471,11 +473,15 @@ ExecHashTableCreate(HashState *state) */ rows = node->plan.parallel_aware ? 
node->rows_total : outerNode->plan_rows; + worker_space_allowed = (size_t) node->plan.workmem_limit * 1024; + Assert(worker_space_allowed > 0); + ExecChooseHashTableSize(rows, outerNode->plan_width, OidIsValid(node->skewTable), state->parallel_state != NULL, state->parallel_state != NULL ? state->parallel_state->nparticipants - 1 : 0, + worker_space_allowed, &space_allowed, &nbuckets, &nbatch, &num_skew_mcvs); @@ -599,6 +605,7 @@ ExecHashTableCreate(HashState *state) { pstate->nbatch = nbatch; pstate->space_allowed = space_allowed; + pstate->worker_space_allowed = worker_space_allowed; pstate->growth = PHJ_GROWTH_OK; /* Set up the shared state for coordinating batches. */ @@ -658,7 +665,8 @@ void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, - size_t *space_allowed, + size_t worker_space_allowed, + size_t *total_space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs) @@ -687,9 +695,9 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, inner_rel_bytes = ntuples * tupsize; /* - * Compute in-memory hashtable size limit from GUCs. + * Caller tells us our (per-worker) in-memory hashtable size limit. 
*/ - hash_table_bytes = get_hash_memory_limit(); + hash_table_bytes = worker_space_allowed; /* * Parallel Hash tries to use the combined hash_mem of all workers to @@ -706,7 +714,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, hash_table_bytes = (size_t) newlimit; } - *space_allowed = hash_table_bytes; + *total_space_allowed = hash_table_bytes; /* * If skew optimization is possible, estimate the number of skew buckets @@ -808,7 +816,8 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, { ExecChooseHashTableSize(ntuples, tupwidth, useskew, false, parallel_workers, - space_allowed, + worker_space_allowed, + total_space_allowed, numbuckets, numbatches, num_skew_mcvs); @@ -929,7 +938,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, nbatch /= 2; nbuckets *= 2; - *space_allowed = (*space_allowed) * 2; + *total_space_allowed = (*total_space_allowed) * 2; } Assert(nbuckets > 0); @@ -1235,7 +1244,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) * to switch from one large combined memory budget to the * regular hash_mem budget. */ - pstate->space_allowed = get_hash_memory_limit(); + pstate->space_allowed = pstate->worker_space_allowed; /* * The combined hash_mem of all participants wasn't diff --git a/src/backend/executor/nodeIncrementalSort.c b/src/backend/executor/nodeIncrementalSort.c index 975b0397e7a..503d75e364b 100644 --- a/src/backend/executor/nodeIncrementalSort.c +++ b/src/backend/executor/nodeIncrementalSort.c @@ -312,7 +312,7 @@ switchToPresortedPrefixMode(PlanState *pstate) &(plannode->sort.sortOperators[nPresortedCols]), &(plannode->sort.collations[nPresortedCols]), &(plannode->sort.nullsFirst[nPresortedCols]), - work_mem, + plannode->sort.plan.workmem_limit, NULL, node->bounded ? 
TUPLESORT_ALLOWBOUNDED : TUPLESORT_NONE); node->prefixsort_state = prefixsort_state; @@ -613,7 +613,7 @@ ExecIncrementalSort(PlanState *pstate) plannode->sort.sortOperators, plannode->sort.collations, plannode->sort.nullsFirst, - work_mem, + plannode->sort.plan.workmem_limit, NULL, node->bounded ? TUPLESORT_ALLOWBOUNDED : diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c index 9798bb75365..10f764c1bd5 100644 --- a/src/backend/executor/nodeMaterial.c +++ b/src/backend/executor/nodeMaterial.c @@ -61,7 +61,8 @@ ExecMaterial(PlanState *pstate) */ if (tuplestorestate == NULL && node->eflags != 0) { - tuplestorestate = tuplestore_begin_heap(true, false, work_mem); + tuplestorestate = + tuplestore_begin_heap(true, false, node->ss.ps.plan->workmem_limit); tuplestore_set_eflags(tuplestorestate, node->eflags); if (node->eflags & EXEC_FLAG_MARK) { diff --git a/src/backend/executor/nodeMemoize.c b/src/backend/executor/nodeMemoize.c index 609deb12afb..a3fc37745ca 100644 --- a/src/backend/executor/nodeMemoize.c +++ b/src/backend/executor/nodeMemoize.c @@ -1036,7 +1036,7 @@ ExecInitMemoize(Memoize *node, EState *estate, int eflags) mstate->mem_used = 0; /* Limit the total memory consumed by the cache to this */ - mstate->mem_limit = get_hash_memory_limit(); + mstate->mem_limit = (Size) node->plan.workmem_limit * 1024; /* A memory context dedicated for the cache */ mstate->tableContext = AllocSetContextCreate(CurrentMemoryContext, diff --git a/src/backend/executor/nodeRecursiveunion.c b/src/backend/executor/nodeRecursiveunion.c index 40f66fd0680..96dc8d53db3 100644 --- a/src/backend/executor/nodeRecursiveunion.c +++ b/src/backend/executor/nodeRecursiveunion.c @@ -52,6 +52,7 @@ build_hash_table(RecursiveUnionState *rustate) node->dupCollations, node->numGroups, 0, + (Size) node->hashWorkMemLimit * 1024, rustate->ps.state->es_query_cxt, rustate->tableContext, rustate->tempContext, @@ -202,8 +203,15 @@ ExecInitRecursiveUnion(RecursiveUnion *node, 
EState *estate, int eflags) /* initialize processing state */ rustate->recursing = false; rustate->intermediate_empty = true; - rustate->working_table = tuplestore_begin_heap(false, false, work_mem); - rustate->intermediate_table = tuplestore_begin_heap(false, false, work_mem); + + /* + * NOTE: each of our working tables gets the same workmem_limit, since + * we're going to swap them repeatedly. + */ + rustate->working_table = + tuplestore_begin_heap(false, false, node->plan.workmem_limit); + rustate->intermediate_table = + tuplestore_begin_heap(false, false, node->plan.workmem_limit); /* * If hashing, we need a per-tuple memory context for comparisons, and a diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c index 5b7ff9c3748..7b71adf05dc 100644 --- a/src/backend/executor/nodeSetOp.c +++ b/src/backend/executor/nodeSetOp.c @@ -105,6 +105,7 @@ build_hash_table(SetOpState *setopstate) node->cmpCollations, node->numGroups, sizeof(SetOpStatePerGroupData), + (Size) node->plan.workmem_limit * 1024, setopstate->ps.state->es_query_cxt, setopstate->tableContext, econtext->ecxt_per_tuple_memory, diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c index f603337ecd3..1da77ab1d6a 100644 --- a/src/backend/executor/nodeSort.c +++ b/src/backend/executor/nodeSort.c @@ -107,7 +107,7 @@ ExecSort(PlanState *pstate) plannode->sortOperators[0], plannode->collations[0], plannode->nullsFirst[0], - work_mem, + plannode->plan.workmem_limit, NULL, tuplesortopts); else @@ -117,7 +117,7 @@ ExecSort(PlanState *pstate) plannode->sortOperators, plannode->collations, plannode->nullsFirst, - work_mem, + plannode->plan.workmem_limit, NULL, tuplesortopts); if (node->bounded) diff --git a/src/backend/executor/nodeSubplan.c b/src/backend/executor/nodeSubplan.c index 49767ed6a52..73214501238 100644 --- a/src/backend/executor/nodeSubplan.c +++ b/src/backend/executor/nodeSubplan.c @@ -546,6 +546,7 @@ buildSubPlanHash(SubPlanState *node, ExprContext 
*econtext) node->tab_collations, nbuckets, 0, + (Size) subplan->hashtab_workmem_limit * 1024, node->planstate->state->es_query_cxt, node->hashtablecxt, node->hashtempcxt, @@ -575,6 +576,7 @@ buildSubPlanHash(SubPlanState *node, ExprContext *econtext) node->tab_collations, nbuckets, 0, + (Size) subplan->hashnul_workmem_limit * 1024, node->planstate->state->es_query_cxt, node->hashtablecxt, node->hashtempcxt, diff --git a/src/backend/executor/nodeTableFuncscan.c b/src/backend/executor/nodeTableFuncscan.c index 83ade3f9437..8a9e534a743 100644 --- a/src/backend/executor/nodeTableFuncscan.c +++ b/src/backend/executor/nodeTableFuncscan.c @@ -276,7 +276,8 @@ tfuncFetchRows(TableFuncScanState *tstate, ExprContext *econtext) /* build tuplestore for the result */ oldcxt = MemoryContextSwitchTo(econtext->ecxt_per_query_memory); - tstate->tupstore = tuplestore_begin_heap(false, false, work_mem); + tstate->tupstore = tuplestore_begin_heap(false, false, + tstate->ss.ps.plan->workmem_limit); /* * Each call to fetch a new set of rows - of which there may be very many diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c index 9a1acce2b5d..76819d140ba 100644 --- a/src/backend/executor/nodeWindowAgg.c +++ b/src/backend/executor/nodeWindowAgg.c @@ -1092,7 +1092,8 @@ prepare_tuplestore(WindowAggState *winstate) Assert(winstate->buffer == NULL); /* Create new tuplestore */ - winstate->buffer = tuplestore_begin_heap(false, false, work_mem); + winstate->buffer = tuplestore_begin_heap(false, false, + node->plan.workmem_limit); /* * Set up read pointers for the tuplestore. 
The current pointer doesn't diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 73d78617009..04360f45760 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -2802,7 +2802,8 @@ cost_agg(Path *path, PlannerInfo *root, hashentrysize = hash_agg_entry_size(list_length(root->aggtransinfos), input_width, aggcosts->transitionSpace); - hash_agg_set_limits(hashentrysize, numGroups, 0, &mem_limit, + hash_agg_set_limits(hashentrysize, numGroups, 0, + get_hash_memory_limit(), &mem_limit, &ngroups_limit, &num_partitions); nbatches = Max((numGroups * hashentrysize) / mem_limit, @@ -4224,6 +4225,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, true, /* useskew */ parallel_hash, /* try_combined_hash_mem */ outer_path->parallel_workers, + get_hash_memory_limit(), &space_allowed, &numbuckets, &numbatches, diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index d12e3f451d2..c4147876d55 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -140,6 +140,7 @@ extern TupleHashTable BuildTupleHashTable(PlanState *parent, Oid *collations, long nbuckets, Size additionalsize, + Size hash_mem_limit, MemoryContext metacxt, MemoryContext tablecxt, MemoryContext tempcxt, @@ -499,6 +500,7 @@ extern Tuplestorestate *ExecMakeTableFunctionResult(SetExprState *setexpr, ExprContext *econtext, MemoryContext argContext, TupleDesc expectedDesc, + int workMem, bool randomAccess); extern SetExprState *ExecInitFunctionResultSet(Expr *expr, ExprContext *econtext, PlanState *parent); @@ -724,4 +726,9 @@ extern ResultRelInfo *ExecLookupResultRelByOid(ModifyTableState *node, bool missing_ok, bool update_cache); +/* + * prototypes from functions in execWorkmem.c + */ +extern void ExecAssignWorkMem(PlannedStmt *plannedstmt); + #endif /* EXECUTOR_H */ diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index 
ecff4842fd3..9b184c47322 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -253,7 +253,8 @@ typedef struct ParallelHashJoinState ParallelHashGrowth growth; /* control batch/bucket growth */ dsa_pointer chunk_work_queue; /* chunk work queue */ int nparticipants; - size_t space_allowed; + size_t space_allowed; /* -- might be shared with other workers */ + size_t worker_space_allowed; /* -- exclusive to this worker */ size_t total_tuples; /* total number of inner tuples */ LWLock lock; /* lock protecting the above */ diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h index 34b82d0f5d1..728006b3ff5 100644 --- a/src/include/executor/nodeAgg.h +++ b/src/include/executor/nodeAgg.h @@ -329,8 +329,9 @@ extern void ExecReScanAgg(AggState *node); extern Size hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace); extern void hash_agg_set_limits(double hashentrysize, double input_groups, - int used_bits, Size *mem_limit, - uint64 *ngroups_limit, int *num_partitions); + int used_bits, Size hash_mem_limit, + Size *mem_limit, uint64 *ngroups_limit, + int *num_partitions); /* parallel instrumentation support */ extern void ExecAggEstimate(AggState *node, ParallelContext *pcxt); diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 3c1a09415aa..e4e9e0d1de1 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -59,7 +59,8 @@ extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable); extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, - size_t *space_allowed, + size_t worker_space_allowed, + size_t *total_space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs); diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 39471466a9a..396f7881420 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ 
-168,6 +168,9 @@ typedef struct Plan /* total cost (assuming all tuples fetched) */ Cost total_cost; + /* (runtime) working memory limit (in KB) */ + int workmem_limit; + /* * planner's estimate of result size of this plan step */ @@ -235,7 +238,7 @@ typedef struct Plan /* ---------------- * Result node - - * If no outer plan, evaluate a variable-free targetlist. + * If no outer plan, evaluate a variable-free targetlist. * If outer plan, return tuples from outer plan (after a level of * projection as shown by targetlist). * @@ -428,6 +431,9 @@ typedef struct RecursiveUnion /* estimated number of groups in input */ long numGroups; + + /* work_mem reserved for hash table */ + int hashWorkMemLimit; } RecursiveUnion; /* ---------------- @@ -1147,6 +1153,12 @@ typedef struct Agg Oid *grpOperators pg_node_attr(array_size(numCols)); Oid *grpCollations pg_node_attr(array_size(numCols)); + /* number of inputs that require sorting */ + int numSorts; + /* work_mem limit to sort one input (in KB) */ + int sortWorkMemLimit; + /* estimated number of groups in input */ long numGroups; diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index d0576da3e25..b932168237c 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -1109,6 +1109,9 @@ typedef struct SubPlan /* Estimated execution costs: */ Cost startup_cost; /* one-time setup cost */ Cost per_call_cost; /* cost for each subplan evaluation */ + /* (Runtime) working-memory limits (in KB): */ + int hashtab_workmem_limit; /* limit for hashtable */ + int hashnul_workmem_limit; /* limit for hashnulls */ } SubPlan; /* -- 2.47.1