From 13df98b6939852b8dd18be7adc5702c6ba38e1fb Mon Sep 17 00:00:00 2001 From: James Hunter Date: Fri, 21 Feb 2025 00:41:31 +0000 Subject: [PATCH 4/4] Add "workmem_hook" to allow extensions to override per-node work_mem --- contrib/Makefile | 3 +- contrib/workmem/Makefile | 20 + contrib/workmem/expected/workmem.out | 676 +++++++++++++++++++++++++++ contrib/workmem/meson.build | 28 ++ contrib/workmem/sql/workmem.sql | 304 ++++++++++++ contrib/workmem/workmem.c | 654 ++++++++++++++++++++++++++ src/backend/executor/execWorkmem.c | 37 +- src/include/executor/executor.h | 4 + 8 files changed, 1716 insertions(+), 10 deletions(-) create mode 100644 contrib/workmem/Makefile create mode 100644 contrib/workmem/expected/workmem.out create mode 100644 contrib/workmem/meson.build create mode 100644 contrib/workmem/sql/workmem.sql create mode 100644 contrib/workmem/workmem.c diff --git a/contrib/Makefile b/contrib/Makefile index 952855d9b61..b4880ab7067 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -50,7 +50,8 @@ SUBDIRS = \ tsm_system_rows \ tsm_system_time \ unaccent \ - vacuumlo + vacuumlo \ + workmem ifeq ($(with_ssl),openssl) SUBDIRS += pgcrypto sslinfo diff --git a/contrib/workmem/Makefile b/contrib/workmem/Makefile new file mode 100644 index 00000000000..f920cdb9964 --- /dev/null +++ b/contrib/workmem/Makefile @@ -0,0 +1,20 @@ +# contrib/workmem/Makefile + +MODULE_big = workmem +OBJS = \ + $(WIN32RES) \ + workmem.o +PGFILEDESC = "workmem - extension that adjusts PostgreSQL work_mem per node" + +REGRESS = workmem + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/workmem +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/workmem/expected/workmem.out b/contrib/workmem/expected/workmem.out new file mode 100644 index 00000000000..a2c6d3be4d2 --- /dev/null +++ b/contrib/workmem/expected/workmem.out @@ -0,0 +1,676 @@ +load 'workmem'; +-- Note: Function derived from file explain.sql. We can't use that other +-- function, since we're run in parallel with explain.sql. +create or replace function workmem_filter(text) returns setof text +language plpgsql as +$$ +declare + ln text; +begin + for ln in execute $1 + loop + -- Mask out work_mem estimate, since it might be brittle + ln := regexp_replace(ln, '\mwork_mem=\d+\M', 'work_mem=N', 'g'); + ln := regexp_replace(ln, '\mMemory: \d+\M', 'Memory: N', 'g'); + return next ln; + end loop; +end; +$$; +--==== +-- Test suite 1: default workmem.query_work_mem (= 100 MB) +--==== +---- +-- Some tests from src/test/regress/sql/workmem.sql that don't require +-- test_setup.sql, etc., to be run first. 
+---- +-- Grouping Sets (Hash) +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); +'); + workmem_filter +---------------------------------------------------------------------- + WindowAgg (work_mem=N kB limit=25600 kB) + -> Sort (work_mem=N kB limit=25600 kB) + Sort Key: "*VALUES*".column1, "*VALUES*".column2 NULLS FIRST + -> HashAggregate (work_mem=N kB limit=51200 kB) + Hash Key: "*VALUES*".column1, "*VALUES*".column2 + Hash Key: "*VALUES*".column1 + -> Values Scan on "*VALUES*" + Filter: (column1 = column2) + Total Working Memory: N kB + Total Working Memory Limit: 102400 kB +(10 rows) + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); + a | b | row_number +---+---+------------ + 1 | | 1 + 1 | 1 | 2 + 2 | | 3 + 2 | 2 | 4 +(4 rows) + +-- Grouping Sets (Sort) +set enable_hashagg = off; +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); +'); + workmem_filter +---------------------------------------------------------------------- + WindowAgg (work_mem=N kB limit=20480 kB) + -> Sort (work_mem=N kB limit=20480 kB) + Sort Key: "*VALUES*".column1, "*VALUES*".column2 NULLS FIRST + -> GroupAggregate (work_mem=N kB limit=40960 kB) + Group Key: "*VALUES*".column1, "*VALUES*".column2 + Group Key: "*VALUES*".column1 + Sort Key: "*VALUES*".column2 + Group Key: "*VALUES*".column2 + Sort Key: "*VALUES*".column3 + Group Key: "*VALUES*".column3 + Sort Key: "*VALUES*".column4 + Group Key: "*VALUES*".column4 + -> Sort (work_mem=N kB limit=20480 kB) + Sort Key: "*VALUES*".column1 + -> Values Scan on "*VALUES*" + Filter: (column1 = column2) + Total Working Memory: N kB + Total Working Memory Limit: 102400 kB +(18 rows) + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); + a | b | row_number +---+---+------------ + 1 | | 1 + 1 | 1 | 2 + 2 | | 3 + 2 | 2 | 4 + | | 5 + | | 6 + | | 7 + | | 8 + | 1 | 9 + | 2 | 10 +(10 rows) + +reset enable_hashagg; +-- Function Scan +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; +'); + workmem_filter +--------------------------------------------------------------------------- + Aggregate + -> Function Scan on generate_series a (work_mem=N kB limit=102400 kB) + Total Working Memory: N kB + Total Working Memory Limit: 102400 kB +(4 rows) + +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; + count +------- + 2000 +(1 row) + +-- Three Function Scans +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); +'); + workmem_filter +------------------------------------------------------------------------- + Aggregate + -> Function Scan on generate_series (work_mem=N kB limit=102399 kB) + Total Working Memory: N kB + Total Working Memory Limit: 102399 kB +(4 rows) + +select count(*) 
+from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); + count +------- + 12 +(1 row) + +-- WindowAgg +select workmem_filter(' +explain (costs off, work_mem on) +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; +'); + workmem_filter +-------------------------------------------------------------------------------------- + Limit + -> WindowAgg (work_mem=N kB limit=34133 kB) + -> Sort (work_mem=N kB limit=34133 kB) + Sort Key: ((a.n < 3)) + -> Function Scan on generate_series a (work_mem=N kB limit=34134 kB) + Total Working Memory: N kB + Total Working Memory Limit: 102400 kB +(7 rows) + +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; + sum +--------- + 2000997 + 2000997 + 2000997 + 2000997 + 2000997 +(5 rows) + +-- InitPlan with hash table ("IN SELECT") +select workmem_filter(' +explain (costs off, work_mem on) +select ''foo''::text in (select ''bar''::name union all select ''bar''::name); +'); + workmem_filter +----------------------------------------- + Result (work_mem=N kB limit=102400 kB) + SubPlan 1 + -> Append + -> Result + -> Result + Total Working Memory: N kB + Total Working Memory Limit: 102400 kB +(7 rows) + +select 'foo'::text in (select 'bar'::name union all select 'bar'::name); + ?column? +---------- + f +(1 row) + +-- SubPlan with hash table +select workmem_filter(' +explain (costs off, work_mem on) +select 1 = any (select (select 1) where 1 = any (select 1)); +'); + workmem_filter +---------------------------------------------------------------- + Result (work_mem=N kB limit=68267 kB) + SubPlan 3 + -> Result (work_mem=N kB limit=34133 kB) + One-Time Filter: (ANY (1 = (hashed SubPlan 2).col1)) + InitPlan 1 + -> Result + SubPlan 2 + -> Result + Total Working Memory: N kB + Total Working Memory Limit: 102400 kB +(10 rows) + +select 1 = any (select (select 1) where 1 = any (select 1)); + ?column? +---------- + t +(1 row) + +--==== +-- Test suite 2: set workmem.query_work_mem to 4 MB +--==== +set workmem.query_work_mem = 4096; +---- +-- Some tests from src/test/regress/sql/workmem.sql that don't require +-- test_setup.sql, etc., to be run first. 
+---- +-- Grouping Sets (Hash) +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); +'); + workmem_filter +---------------------------------------------------------------------- + WindowAgg (work_mem=N kB limit=1024 kB) + -> Sort (work_mem=N kB limit=1024 kB) + Sort Key: "*VALUES*".column1, "*VALUES*".column2 NULLS FIRST + -> HashAggregate (work_mem=N kB limit=2048 kB) + Hash Key: "*VALUES*".column1, "*VALUES*".column2 + Hash Key: "*VALUES*".column1 + -> Values Scan on "*VALUES*" + Filter: (column1 = column2) + Total Working Memory: N kB + Total Working Memory Limit: 4096 kB +(10 rows) + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); + a | b | row_number +---+---+------------ + 1 | | 1 + 1 | 1 | 2 + 2 | | 3 + 2 | 2 | 4 +(4 rows) + +-- Grouping Sets (Sort) +set enable_hashagg = off; +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); +'); + workmem_filter +---------------------------------------------------------------------- + WindowAgg (work_mem=N kB limit=819 kB) + -> Sort (work_mem=N kB limit=819 kB) + Sort Key: "*VALUES*".column1, "*VALUES*".column2 NULLS FIRST + -> GroupAggregate (work_mem=N kB limit=1638 kB) + Group Key: "*VALUES*".column1, "*VALUES*".column2 + Group Key: "*VALUES*".column1 + Sort Key: "*VALUES*".column2 + Group Key: "*VALUES*".column2 + Sort Key: "*VALUES*".column3 + Group Key: "*VALUES*".column3 + Sort Key: "*VALUES*".column4 + Group Key: "*VALUES*".column4 + -> Sort (work_mem=N kB limit=820 kB) + Sort Key: "*VALUES*".column1 + -> Values Scan on "*VALUES*" + Filter: (column1 = column2) + Total Working Memory: N kB + Total Working Memory Limit: 4096 kB +(18 rows) + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); + a | b | row_number +---+---+------------ + 1 | | 1 + 1 | 1 | 2 + 2 | | 3 + 2 | 2 | 4 + | | 5 + | | 6 + | | 7 + | | 8 + | 1 | 9 + | 2 | 10 +(10 rows) + +reset enable_hashagg; +-- Function Scan +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; +'); + workmem_filter +------------------------------------------------------------------------- + Aggregate + -> Function Scan on generate_series a (work_mem=N kB limit=4096 kB) + Total Working Memory: N kB + Total Working Memory Limit: 4096 kB +(4 rows) + +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; + count +------- + 2000 +(1 row) + +-- Three Function Scans +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); +'); + workmem_filter +----------------------------------------------------------------------- + Aggregate + -> Function Scan on generate_series (work_mem=N kB limit=4095 kB) + Total Working Memory: N kB + Total Working Memory Limit: 4095 kB +(4 rows) + +select count(*) +from rows 
from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); + count +------- + 12 +(1 row) + +-- WindowAgg +select workmem_filter(' +explain (costs off, work_mem on) +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; +'); + workmem_filter +------------------------------------------------------------------------------------- + Limit + -> WindowAgg (work_mem=N kB limit=1365 kB) + -> Sort (work_mem=N kB limit=1365 kB) + Sort Key: ((a.n < 3)) + -> Function Scan on generate_series a (work_mem=N kB limit=1366 kB) + Total Working Memory: N kB + Total Working Memory Limit: 4096 kB +(7 rows) + +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; + sum +--------- + 2000997 + 2000997 + 2000997 + 2000997 + 2000997 +(5 rows) + +-- InitPlan with hash table ("IN SELECT") +select workmem_filter(' +explain (costs off, work_mem on) +select ''foo''::text in (select ''bar''::name union all select ''bar''::name); +'); + workmem_filter +--------------------------------------- + Result (work_mem=N kB limit=4096 kB) + SubPlan 1 + -> Append + -> Result + -> Result + Total Working Memory: N kB + Total Working Memory Limit: 4096 kB +(7 rows) + +select 'foo'::text in (select 'bar'::name union all select 'bar'::name); + ?column? +---------- + f +(1 row) + +-- SubPlan with hash table +select workmem_filter(' +explain (costs off, work_mem on) +select 1 = any (select (select 1) where 1 = any (select 1)); +'); + workmem_filter +---------------------------------------------------------------- + Result (work_mem=N kB limit=2731 kB) + SubPlan 3 + -> Result (work_mem=N kB limit=1365 kB) + One-Time Filter: (ANY (1 = (hashed SubPlan 2).col1)) + InitPlan 1 + -> Result + SubPlan 2 + -> Result + Total Working Memory: N kB + Total Working Memory Limit: 4096 kB +(10 rows) + +select 1 = any (select (select 1) where 1 = any (select 1)); + ?column? +---------- + t +(1 row) + +reset workmem.query_work_mem; +--==== +-- Test suite 3: set workmem.query_work_mem to 80 KB +--==== +set workmem.query_work_mem = 80; +---- +-- Some tests from src/test/regress/sql/workmem.sql that don't require +-- test_setup.sql, etc., to be run first. 
+---- +-- Grouping Sets (Hash) +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); +'); +WARNING: not enough working memory for query: increase workmem.query_work_mem + workmem_filter +---------------------------------------------------------------------- + WindowAgg (work_mem=N kB limit=20 kB) + -> Sort (work_mem=N kB limit=20 kB) + Sort Key: "*VALUES*".column1, "*VALUES*".column2 NULLS FIRST + -> HashAggregate (work_mem=N kB limit=40 kB) + Hash Key: "*VALUES*".column1, "*VALUES*".column2 + Hash Key: "*VALUES*".column1 + -> Values Scan on "*VALUES*" + Filter: (column1 = column2) + Total Working Memory: N kB + Total Working Memory Limit: 80 kB +(10 rows) + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); +WARNING: not enough working memory for query: increase workmem.query_work_mem + a | b | row_number +---+---+------------ + 1 | | 1 + 1 | 1 | 2 + 2 | | 3 + 2 | 2 | 4 +(4 rows) + +-- Grouping Sets (Sort) +set enable_hashagg = off; +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); +'); +WARNING: not enough working memory for query: increase workmem.query_work_mem + workmem_filter +---------------------------------------------------------------------- + WindowAgg (work_mem=N kB limit=16 kB) + -> Sort (work_mem=N kB limit=16 kB) + Sort Key: "*VALUES*".column1, "*VALUES*".column2 NULLS FIRST + -> GroupAggregate (work_mem=N kB limit=32 kB) + Group Key: "*VALUES*".column1, "*VALUES*".column2 + Group Key: "*VALUES*".column1 + Sort Key: "*VALUES*".column2 + Group Key: "*VALUES*".column2 + Sort Key: "*VALUES*".column3 + Group Key: "*VALUES*".column3 + Sort Key: "*VALUES*".column4 + Group Key: "*VALUES*".column4 + -> Sort (work_mem=N kB limit=16 kB) + Sort Key: "*VALUES*".column1 + -> Values Scan on "*VALUES*" + Filter: (column1 = column2) + Total Working Memory: N kB + Total Working Memory Limit: 80 kB +(18 rows) + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); +WARNING: not enough working memory for query: increase workmem.query_work_mem + a | b | row_number +---+---+------------ + 1 | | 1 + 1 | 1 | 2 + 2 | | 3 + 2 | 2 | 4 + | | 5 + | | 6 + | | 7 + | | 8 + | 1 | 9 + | 2 | 10 +(10 rows) + +reset enable_hashagg; +-- Function Scan +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; +'); + workmem_filter +----------------------------------------------------------------------- + Aggregate + -> Function Scan on generate_series a (work_mem=N kB limit=80 kB) + Total Working Memory: N kB + Total Working Memory Limit: 80 kB +(4 rows) + +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; + count +------- + 2000 +(1 row) + +-- Three Function Scans +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); +'); 
+WARNING: not enough working memory for query: increase workmem.query_work_mem + workmem_filter +--------------------------------------------------------------------- + Aggregate + -> Function Scan on generate_series (work_mem=N kB limit=78 kB) + Total Working Memory: N kB + Total Working Memory Limit: 78 kB +(4 rows) + +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); +WARNING: not enough working memory for query: increase workmem.query_work_mem + count +------- + 12 +(1 row) + +-- WindowAgg +select workmem_filter(' +explain (costs off, work_mem on) +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; +'); +WARNING: not enough working memory for query: increase workmem.query_work_mem + workmem_filter +----------------------------------------------------------------------------------- + Limit + -> WindowAgg (work_mem=N kB limit=26 kB) + -> Sort (work_mem=N kB limit=27 kB) + Sort Key: ((a.n < 3)) + -> Function Scan on generate_series a (work_mem=N kB limit=27 kB) + Total Working Memory: N kB + Total Working Memory Limit: 80 kB +(7 rows) + +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; +WARNING: not enough working memory for query: increase workmem.query_work_mem + sum +--------- + 2000997 + 2000997 + 2000997 + 2000997 + 2000997 +(5 rows) + +-- InitPlan with hash table ("IN SELECT") +select workmem_filter(' +explain (costs off, work_mem on) +select ''foo''::text in (select ''bar''::name union all select ''bar''::name); +'); +WARNING: not enough working memory for query: increase workmem.query_work_mem + workmem_filter +------------------------------------- + Result (work_mem=N kB limit=80 kB) + SubPlan 1 + -> Append + -> Result + -> Result + Total Working Memory: N kB + Total Working Memory Limit: 80 kB +(7 rows) + +select 'foo'::text in (select 'bar'::name union all select 'bar'::name); +WARNING: not enough working memory for query: increase workmem.query_work_mem + ?column? +---------- + f +(1 row) + +-- SubPlan with hash table +select workmem_filter(' +explain (costs off, work_mem on) +select 1 = any (select (select 1) where 1 = any (select 1)); +'); +WARNING: not enough working memory for query: increase workmem.query_work_mem + workmem_filter +---------------------------------------------------------------- + Result (work_mem=N kB limit=54 kB) + SubPlan 3 + -> Result (work_mem=N kB limit=26 kB) + One-Time Filter: (ANY (1 = (hashed SubPlan 2).col1)) + InitPlan 1 + -> Result + SubPlan 2 + -> Result + Total Working Memory: N kB + Total Working Memory Limit: 80 kB +(10 rows) + +select 1 = any (select (select 1) where 1 = any (select 1)); +WARNING: not enough working memory for query: increase workmem.query_work_mem + ?column? 
+---------- + t +(1 row) + +reset workmem.query_work_mem; diff --git a/contrib/workmem/meson.build b/contrib/workmem/meson.build new file mode 100644 index 00000000000..fce8030ba45 --- /dev/null +++ b/contrib/workmem/meson.build @@ -0,0 +1,28 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +workmem_sources = files( + 'workmem.c', +) + +if host_system == 'windows' + workmem_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'workmem', + '--FILEDESC', 'workmem - extension that adjusts PostgreSQL work_mem per node',]) +endif + +workmem = shared_module('workmem', + workmem_sources, + kwargs: contrib_mod_args, +) +contrib_targets += workmem + +tests += { + 'name': 'workmem', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'workmem', + ], + }, +} diff --git a/contrib/workmem/sql/workmem.sql b/contrib/workmem/sql/workmem.sql new file mode 100644 index 00000000000..e6dbc35bf10 --- /dev/null +++ b/contrib/workmem/sql/workmem.sql @@ -0,0 +1,304 @@ +load 'workmem'; + +-- Note: Function derived from file explain.sql. We can't use that other +-- function, since we're run in parallel with explain.sql. +create or replace function workmem_filter(text) returns setof text +language plpgsql as +$$ +declare + ln text; +begin + for ln in execute $1 + loop + -- Mask out work_mem estimate, since it might be brittle + ln := regexp_replace(ln, '\mwork_mem=\d+\M', 'work_mem=N', 'g'); + ln := regexp_replace(ln, '\mMemory: \d+\M', 'Memory: N', 'g'); + return next ln; + end loop; +end; +$$; + +--==== +-- Test suite 1: default workmem.query_work_mem (= 100 MB) +--==== + +---- +-- Some tests from src/test/regress/sql/workmem.sql that don't require +-- test_setup.sql, etc., to be run first. +---- + +-- Grouping Sets (Hash) +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); +'); + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); + +-- Grouping Sets (Sort) +set enable_hashagg = off; + +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); +'); + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); + +reset enable_hashagg; + +-- Function Scan +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; +'); + +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; + +-- Three Function Scans +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); +'); + +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); + +-- WindowAgg +select workmem_filter(' +explain (costs off, work_mem on) +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; +'); + +select sum(n) 
over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; + +-- InitPlan with hash table ("IN SELECT") +select workmem_filter(' +explain (costs off, work_mem on) +select ''foo''::text in (select ''bar''::name union all select ''bar''::name); +'); + +select 'foo'::text in (select 'bar'::name union all select 'bar'::name); + +-- SubPlan with hash table +select workmem_filter(' +explain (costs off, work_mem on) +select 1 = any (select (select 1) where 1 = any (select 1)); +'); + +select 1 = any (select (select 1) where 1 = any (select 1)); + +--==== +-- Test suite 2: set workmem.query_work_mem to 4 MB +--==== +set workmem.query_work_mem = 4096; + +---- +-- Some tests from src/test/regress/sql/workmem.sql that don't require +-- test_setup.sql, etc., to be run first. +---- + +-- Grouping Sets (Hash) +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); +'); + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); + +-- Grouping Sets (Sort) +set enable_hashagg = off; + +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); +'); + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); + +reset enable_hashagg; + +-- Function Scan +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; +'); + +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; + +-- Three Function Scans +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); +'); + +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); + +-- WindowAgg +select workmem_filter(' +explain (costs off, work_mem on) +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; +'); + +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; + +-- InitPlan with hash table ("IN SELECT") +select workmem_filter(' +explain (costs off, work_mem on) +select ''foo''::text in (select ''bar''::name union all select ''bar''::name); +'); + +select 'foo'::text in (select 'bar'::name union all select 'bar'::name); + +-- SubPlan with hash table +select workmem_filter(' +explain (costs off, work_mem on) +select 1 = any (select (select 1) where 1 = any (select 1)); +'); + +select 1 = any (select (select 1) where 1 = any (select 1)); + +reset workmem.query_work_mem; + +--==== +-- Test suite 3: set workmem.query_work_mem to 80 KB +--==== +set workmem.query_work_mem = 80; + +---- +-- Some tests from src/test/regress/sql/workmem.sql that don't require +-- test_setup.sql, etc., to be run first. 
+---- + +-- Grouping Sets (Hash) +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); +'); + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); + +-- Grouping Sets (Sort) +set enable_hashagg = off; + +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); +'); + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); + +reset enable_hashagg; + +-- Function Scan +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; +'); + +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; + +-- Three Function Scans +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); +'); + +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); + +-- WindowAgg +select workmem_filter(' +explain (costs off, work_mem on) +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; +'); + +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; + +-- InitPlan with hash table ("IN SELECT") +select workmem_filter(' +explain (costs off, work_mem on) +select ''foo''::text in (select ''bar''::name union all select ''bar''::name); +'); + +select 'foo'::text in (select 'bar'::name union all select 'bar'::name); + +-- SubPlan with hash table +select workmem_filter(' +explain (costs off, work_mem on) +select 1 = any (select (select 1) where 1 = any (select 1)); +'); + +select 1 = any (select (select 1) where 1 = any (select 1)); + +reset workmem.query_work_mem; diff --git a/contrib/workmem/workmem.c b/contrib/workmem/workmem.c new file mode 100644 index 00000000000..c758e49c162 --- /dev/null +++ b/contrib/workmem/workmem.c @@ -0,0 +1,654 @@ +/*------------------------------------------------------------------------- + * + * workmem.c + * + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/workmem/workmem.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "executor/executor.h" +#include "miscadmin.h" +#include "utils/guc.h" + +PG_MODULE_MAGIC; + +/* Local variables */ + +/* + * A Target represents a collection of data structures, belonging to an + * execution node, that all share the same memory limit. + * + * For example, in parallel query, every parallel worker (plus the leader) + * gets a copy of the execution node, and therefore a copy of all of that + * node's work_mem limits. In this case, we'll track a single Target, but its + * count will include (1 + num_workers), because this Target gets "applied" + * to (1 + num_workers) execution nodes. 
+ */ +typedef struct Target +{ + /* # of data structures to which target applies: */ + int count; + /* workmem estimate for each of these data structures: */ + int workmem; + /* (original) workmem limit for each of these data structures: */ + int limit; + /* workmem estimate, but capped at (original) workmem limit: */ + int priority; + /* ratio of (priority / limit); measure's Target's "greediness": */ + double ratio; + /* link to target's actual limit, so we can set it: */ + int *target_limit; +} Target; + +typedef struct WorkMemStats +{ + /* total # of data structures that get working memory: */ + uint64 count; + /* total working memory estimated for this query: */ + uint64 workmem; + /* total working memory (currently) reserved for this query: */ + uint64 limit; + /* total "capped" working memory estimate: */ + uint64 priority; + /* list of Targets, used to update actual workmem limits: */ + List *targets; +} WorkMemStats; + +/* GUC variables */ +static int workmem_query_work_mem = 100 * 1024; /* kB */ + +/* internal functions */ +static void workmem_fn(PlannedStmt *plannedstmt); + +static int clamp_priority(int workmem, int limit); +static Target * make_target(int workmem, int *target_limit, int count); +static void add_target(WorkMemStats * workmem_stats, Target * target); + +/* Sort comparators: sort by ratio, ascending or descending. */ +static int target_compare_asc(const ListCell *a, const ListCell *b); +static int target_compare_desc(const ListCell *a, const ListCell *b); + +/* + * Module load callback + */ +void +_PG_init(void) +{ + /* Define custom GUC variable. */ + DefineCustomIntVariable("workmem.query_work_mem", + "Amount of working-memory (in kB) to provide each " + "query.", + NULL, + &workmem_query_work_mem, + 100 * 1024, /* default to 100 MB */ + 64, + INT_MAX, + PGC_USERSET, + GUC_UNIT_KB, + NULL, + NULL, + NULL); + + MarkGUCPrefixReserved("workmem"); + + /* Install hooks. */ + ExecAssignWorkMem_hook = workmem_fn; +} + +/* Compute an Agg's working memory estimate and limit. */ +typedef struct AggWorkMem +{ + uint64 hash_workmem; + int *hash_limit; + + int num_sorts; + int max_sort_workmem; + int *sort_limit; +} AggWorkMem; + +static void +workmem_analyze_agg_node(Agg *agg, AggWorkMem * mem, + WorkMemStats * workmem_stats) +{ + if (agg->sortWorkMem > 0 || agg->sortWorkMemLimit > 0) + { + /* Record memory used for input sort buffers. */ + Target *target = make_target(agg->sortWorkMem, + &agg->sortWorkMemLimit, + agg->numSorts); + + add_target(workmem_stats, target); + } + + switch (agg->aggstrategy) + { + case AGG_HASHED: + case AGG_MIXED: + + mem->hash_workmem += agg->plan.workmem; + + /* Read hash limit from the first AGG_HASHED node. */ + if (mem->hash_limit == NULL) + mem->hash_limit = &agg->plan.workmem_limit; + + break; + case AGG_SORTED: + + ++mem->num_sorts; + + mem->max_sort_workmem = Max(mem->max_sort_workmem, agg->plan.workmem); + + /* Read sort limit from the first AGG_SORTED node. */ + if (mem->sort_limit == NULL) + mem->sort_limit = &agg->plan.workmem_limit; + + break; + default: + break; + } +} + +static void +workmem_analyze_agg(Agg *agg, int num_workers, WorkMemStats * workmem_stats) +{ + AggWorkMem mem; + + memset(&mem, 0, sizeof(mem)); + + /* Analyze main Agg node. */ + workmem_analyze_agg_node(agg, &mem, workmem_stats); + + /* Also include the chain of GROUPING SETS aggs. */ + foreach_node(Agg, aggnode, agg->chain) + workmem_analyze_agg_node(aggnode, &mem, workmem_stats); + + /* + * Working memory for hash tables, if needed. 
All hash tables share the + * same limit: + */ + if (mem.hash_workmem > 0 || mem.hash_limit != NULL) + { + Target *target = + make_target(mem.hash_workmem, mem.hash_limit, + 1 + num_workers); + + add_target(workmem_stats, target); + } + + /* + * Workimg memory for (output) sort buffers, if needed. We'll need at most + * 2 sort buffers: + */ + if (mem.max_sort_workmem > 0 || mem.sort_limit != NULL) + { + Target *target = + make_target(mem.max_sort_workmem, mem.sort_limit, + Min(mem.num_sorts, 2) * (1 + num_workers)); + + add_target(workmem_stats, target); + } +} + +static void +workmem_analyze_subplan(SubPlan *subplan, int num_workers, + WorkMemStats * workmem_stats) +{ + if (subplan->hashtab_workmem > 0 || subplan->hashtab_workmem_limit > 0) + { + /* working memory for SubPlan's hash table */ + Target *target = make_target(subplan->hashtab_workmem, + &subplan->hashtab_workmem_limit, + 1 + num_workers); + + add_target(workmem_stats, target); + } + + if (subplan->hashnul_workmem > 0 || subplan->hashnul_workmem_limit > 0) + { + /* working memory for SubPlan's hash-NULL table */ + Target *target = make_target(subplan->hashnul_workmem, + &subplan->hashnul_workmem_limit, + 1 + num_workers); + + add_target(workmem_stats, target); + } +} + +static void +workmem_analyze_plan(Plan *plan, int num_workers, WorkMemStats * workmem_stats) +{ + /* Make sure there's enough stack available. */ + check_stack_depth(); + + /* Analyze this node's SubPlans. */ + foreach_node(SubPlan, subplan, plan->initPlan) + workmem_analyze_subplan(subplan, num_workers, workmem_stats); + + if (IsA(plan, Gather) || IsA(plan, GatherMerge)) + { + /* + * Parallel query apparently does not run InitPlans in parallel. Well, + * currently, Gather and GatherMerge Plan nodes don't contain any + * quals, so they can't contain SubPlans at all; so maybe we should + * move this below the SubPlan-analysis loop, as well? For now, to + * maintain consistency with explain.c, we'll just leave this here. + */ + Assert(num_workers == 0); + + if (IsA(plan, Gather)) + num_workers = ((Gather *) plan)->num_workers; + else + num_workers = ((GatherMerge *) plan)->num_workers; + } + + foreach_node(SubPlan, subplan, plan->subPlan) + workmem_analyze_subplan(subplan, num_workers, workmem_stats); + + /* Analyze this node's working memory. 
*/ + switch (nodeTag(plan)) + { + case T_BitmapIndexScan: + case T_CteScan: + case T_Material: + case T_Sort: + case T_TableFuncScan: + case T_WindowAgg: + case T_Hash: + case T_Memoize: + case T_SetOp: + if (plan->workmem > 0 || plan->workmem_limit > 0) + { + Target *target = make_target(plan->workmem, + &plan->workmem_limit, + 1 + num_workers); + + add_target(workmem_stats, target); + } + break; + case T_Agg: + workmem_analyze_agg((Agg *) plan, num_workers, workmem_stats); + break; + case T_FunctionScan: + if (plan->workmem > 0 || plan->workmem_limit > 0) + { + int nfuncs = + list_length(((FunctionScan *) plan)->functions); + Target *target = make_target(plan->workmem, + &plan->workmem_limit, + nfuncs * (1 + num_workers)); + + add_target(workmem_stats, target); + } + break; + case T_IncrementalSort: + if (plan->workmem > 0 || plan->workmem_limit > 0) + { + Target *target = make_target(plan->workmem, + &plan->workmem_limit, + 2 * (1 + num_workers)); + + add_target(workmem_stats, target); + } + break; + case T_RecursiveUnion: + { + RecursiveUnion *runion = (RecursiveUnion *) plan; + Target *target; + + /* working memory for two tuplestores */ + target = make_target(plan->workmem, &plan->workmem_limit, + 2 * (1 + num_workers)); + add_target(workmem_stats, target); + + /* working memory for a hash table, if needed */ + if (runion->hashWorkMem > 0 || runion->hashWorkMemLimit > 0) + { + target = make_target(runion->hashWorkMem, + &runion->hashWorkMem, + 1 + num_workers); + add_target(workmem_stats, target); + } + } + break; + default: + Assert(plan->workmem == 0); + Assert(plan->workmem_limit == 0); + break; + } + + /* Now analyze this Plan's children. */ + if (outerPlan(plan)) + workmem_analyze_plan(outerPlan(plan), num_workers, workmem_stats); + + if (innerPlan(plan)) + workmem_analyze_plan(innerPlan(plan), num_workers, workmem_stats); + + switch (nodeTag(plan)) + { + case T_Append: + foreach_ptr(Plan, child, ((Append *) plan)->appendplans) + workmem_analyze_plan(child, num_workers, workmem_stats); + break; + case T_MergeAppend: + foreach_ptr(Plan, child, ((MergeAppend *) plan)->mergeplans) + workmem_analyze_plan(child, num_workers, workmem_stats); + break; + case T_BitmapAnd: + foreach_ptr(Plan, child, ((BitmapAnd *) plan)->bitmapplans) + workmem_analyze_plan(child, num_workers, workmem_stats); + break; + case T_BitmapOr: + foreach_ptr(Plan, child, ((BitmapOr *) plan)->bitmapplans) + workmem_analyze_plan(child, num_workers, workmem_stats); + break; + case T_SubqueryScan: + workmem_analyze_plan(((SubqueryScan *) plan)->subplan, + num_workers, workmem_stats); + break; + case T_CustomScan: + foreach_ptr(Plan, child, ((CustomScan *) plan)->custom_plans) + workmem_analyze_plan(child, num_workers, workmem_stats); + break; + default: + break; + } +} + +static void +workmem_analyze(PlannedStmt *plannedstmt, WorkMemStats * workmem_stats) +{ + /* Analyze the Plans referred to by SubPlan objects. */ + foreach_ptr(Plan, plan, plannedstmt->subplans) + { + if (plan) + workmem_analyze_plan(plan, 0 /* num_workers */ , workmem_stats); + } + + /* Analyze the main Plan tree itself. */ + workmem_analyze_plan(plannedstmt->planTree, 0 /* num_workers */ , + workmem_stats); +} + +static void +workmem_set(PlannedStmt *plannedstmt, WorkMemStats * workmem_stats) +{ + int remaining = workmem_query_work_mem; + + if (workmem_stats->limit <= remaining) + { + /* + * "High memory" case: we have more than enough query_work_mem; now + * hand out the excess. + */ + + /* This is memory that exceeds workmem limits. 
*/ + remaining -= workmem_stats->limit; + + /* + * Sort targets from highest ratio to lowest. When we assign memory to + * a Target, we'll truncate fractional KB; so by going through the + * list from highest to lowest ratio, we ensure that the lowest ratios + * get the leftover fractional KBs. + */ + list_sort(workmem_stats->targets, target_compare_desc); + + foreach_ptr(Target, target, workmem_stats->targets) + { + double fraction; + int extra_workmem; + + /* How much extra work mem should we assign to this target? */ + fraction = (double) target->workmem / workmem_stats->workmem; + + /* NOTE: This is extra workmem *per data structure*. */ + extra_workmem = (int) (fraction * remaining); + + *target->target_limit += extra_workmem; + + /* OK, we've handled this target. */ + workmem_stats->workmem -= (target->workmem * target->count); + remaining -= (extra_workmem * target->count); + } + } + else if (workmem_stats->priority <= remaining) + { + /* + * "Medium memory" case: we don't have enough query_work_mem to give + * every target its full allotment, but we do have enough to give it + * as much as (we estimate) it needs. + * + * So, just take some memory away from nodes that (we estimate) won't + * need it. + */ + + /* This is memory that exceeds workmem estimates. */ + remaining -= workmem_stats->priority; + + /* + * Sort targets from highest ratio to lowest. We'll skip any Target + * with ratio > 1.0, because (we estimate) they already need their + * full allotment. Also, once a target reaches its workmem limit, + * we'll stop giving it more workmem, leaving the surplus memory to be + * assigned to targets with smaller ratios. + */ + list_sort(workmem_stats->targets, target_compare_desc); + + foreach_ptr(Target, target, workmem_stats->targets) + { + double fraction; + int extra_workmem; + + /* How much extra work mem should we assign to this target? */ + fraction = (double) target->workmem / workmem_stats->workmem; + + /* + * Don't give the target more than its (original) limit. + * + * NOTE: This is extra workmem *per data structure*. + */ + extra_workmem = Min((int) (fraction * remaining), + target->limit - target->priority); + + *target->target_limit = target->priority + extra_workmem; + + /* OK, we've handled this target. */ + workmem_stats->workmem -= (target->workmem * target->count); + remaining -= (extra_workmem * target->count); + } + } + else + { + uint64 limit = workmem_stats->limit; + + /* + * "Low memory" case: we are severely memory constrained, and need to + * take "priority" memory away from targets that (we estimate) + * actually need it. We'll do this by (effectively) reducing the + * global "work_mem" limit, uniformly, for all targets, until we're + * under the query_work_mem limit. + */ + elog(WARNING, + "not enough working memory for query: increase " + "workmem.query_work_mem"); + + /* + * Sort targets from lowest ratio to highest. For any target whose + * ratio is < the target_ratio, we'll just assign it its priority (= + * workmem) as limit, and return the excess workmem to our "limit", + * for use by subsequent, greedier, targets. + */ + list_sort(workmem_stats->targets, target_compare_asc); + + foreach_ptr(Target, target, workmem_stats->targets) + { + double target_ratio; + int target_limit; + + /* + * If we restrict our targets to this ratio, we'll stay within the + * query_work_mem limit. + */ + target_ratio = (double) remaining / limit; + + /* + * Don't give this target more than its priority request (but we + * might give it less). 
+ */ + target_limit = Min(target->priority, + target_ratio * target->limit); + *target->target_limit = target_limit; + + /* "Remaining" decreases by memory we actually assigned. */ + remaining -= (target_limit * target->count); + + /* + * "Limit" decreases by target's original memory limit. + * + * If target_limit <= target->priority, so we restricted this + * target to less memory than (we estimate) it needs, then the + * target_ratio will stay the same, since, letting A = remaining, + * B = limit, and R = ratio, we'll have: + * + * R=A/B <=> A=R*B <=> A-R*X = R*B - R*X <=> A-R*X = R * (B-X) <=> + * R = (A-R*X) / (B-X) + * + * -- which is what we wanted to prove. + * + * And if target_limit > target->priority, so we didn't need to + * restrict this target beyond its priority estimate, then the + * target_ratio will increase. This means more memory for the + * remaining, greedier, targets. + */ + limit -= (target->limit * target->count); + + target_ratio = (double) remaining / limit; + } + } +} + +/* + * workmem_fn: updates the query plan's work_mem based on query_work_mem + */ +static void +workmem_fn(PlannedStmt *plannedstmt) +{ + WorkMemStats workmem_stats; + MemoryContext context, + oldcontext; + + /* + * We already assigned working-memory limits on the leader, and those + * limits were sent to the workers inside the serialized Plan. + */ + if (IsParallelWorker()) + return; + + if (workmem_query_work_mem == -1) + return; /* disabled */ + + /* + * Start by assigning default working memory to all of this query's Plan + * nodes. + */ + standard_ExecAssignWorkMem(plannedstmt); + + memset(&workmem_stats, 0, sizeof(workmem_stats)); + + /* + * Set up our own memory context, so we can drop the metadata we generate, + * all at once. + */ + context = AllocSetContextCreate(CurrentMemoryContext, + "workmem_fn context", + ALLOCSET_DEFAULT_SIZES); + + oldcontext = MemoryContextSwitchTo(context); + + /* Figure out how much total working memory this query wants/needs. */ + workmem_analyze(plannedstmt, &workmem_stats); + + /* Now restrict the query to workmem.query_work_mem. */ + workmem_set(plannedstmt, &workmem_stats); + + MemoryContextSwitchTo(oldcontext); + + /* Drop all our metadata. */ + MemoryContextDelete(context); +} + +static int +clamp_priority(int workmem, int limit) +{ + return Min(workmem, limit); +} + +static Target * +make_target(int workmem, int *target_limit, int count) +{ + Target *result = palloc_object(Target); + + result->count = count; + result->workmem = workmem; + result->limit = *target_limit; + result->priority = clamp_priority(result->workmem, result->limit); + result->ratio = (double) result->priority / result->limit; + result->target_limit = target_limit; + + return result; +} + +static void +add_target(WorkMemStats * workmem_stats, Target * target) +{ + workmem_stats->count += target->count; + workmem_stats->workmem += target->count * target->workmem; + workmem_stats->limit += target->count * target->limit; + workmem_stats->priority += target->count * target->priority; + workmem_stats->targets = lappend(workmem_stats->targets, target); +} + +/* This "ascending" comparator sorts least-greedy Targets first. */ +static int +target_compare_asc(const ListCell *a, const ListCell *b) +{ + double a_val = ((Target *) a->ptr_value)->ratio; + double b_val = ((Target *) b->ptr_value)->ratio; + + /* + * Sort in ascending order: smallest ratio first, then (if ratios equal) + * smallest workmem. 
+ */ + if (a_val == b_val) + { + return ((Target *) a->ptr_value)->workmem - + ((Target *) b->ptr_value)->workmem; + } + else + return a_val > b_val ? 1 : -1; +} + +/* This "descending" comparator sorts most-greedy Targets first. */ +static int +target_compare_desc(const ListCell *a, const ListCell *b) +{ + double a_val = ((Target *) a->ptr_value)->ratio; + double b_val = ((Target *) b->ptr_value)->ratio; + + /* + * Sort in descending order: largest ratio first, then (if ratios equal) + * largest workmem. + */ + if (a_val == b_val) + { + return ((Target *) b->ptr_value)->workmem - + ((Target *) a->ptr_value)->workmem; + } + else + return b_val > a_val ? 1 : -1; +} diff --git a/src/backend/executor/execWorkmem.c b/src/backend/executor/execWorkmem.c index c513b90fc77..8a3e52c8968 100644 --- a/src/backend/executor/execWorkmem.c +++ b/src/backend/executor/execWorkmem.c @@ -57,6 +57,9 @@ #include "optimizer/cost.h" +/* Hook for plugins to get control in ExecAssignWorkMem */ +ExecAssignWorkMem_hook_type ExecAssignWorkMem_hook = NULL; + /* decls for local routines only used within this module */ static void assign_workmem_subplan(SubPlan *subplan); static void assign_workmem_plan(Plan *plan); @@ -81,16 +84,32 @@ static void assign_workmem_agg_node(Agg *agg, bool is_first, bool is_last, void ExecAssignWorkMem(PlannedStmt *plannedstmt) { - /* - * No need to re-assign working memory on parallel workers, since workers - * have the same work_mem and hash_mem_multiplier GUCs as the leader. - * - * We already assigned working-memory limits on the leader, and those - * limits were sent to the workers inside the serialized Plan. - */ - if (IsParallelWorker()) - return; + if (ExecAssignWorkMem_hook) + (*ExecAssignWorkMem_hook) (plannedstmt); + else + { + /* + * No need to re-assign working memory on parallel workers, since + * workers have the same work_mem and hash_mem_multiplier GUCs as the + * leader. + * + * We already assigned working-memory limits on the leader, and those + * limits were sent to the workers inside the serialized Plan. + * + * We bail out here, in case the hook wants to re-assign memory on + * parallel workers, and maybe wants to call + * standard_ExecAssignWorkMem() first, as well. + */ + if (IsParallelWorker()) + return; + standard_ExecAssignWorkMem(plannedstmt); + } +} + +void +standard_ExecAssignWorkMem(PlannedStmt *plannedstmt) +{ /* Assign working memory to the Plans referred to by SubPlan objects. */ foreach_ptr(Plan, plan, plannedstmt->subplans) { diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index c4147876d55..c12625d2061 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -96,6 +96,9 @@ typedef bool (*ExecutorCheckPerms_hook_type) (List *rangeTable, bool ereport_on_violation); extern PGDLLIMPORT ExecutorCheckPerms_hook_type ExecutorCheckPerms_hook; +/* Hook for plugins to get control in ExecAssignWorkMem() */ +typedef void (*ExecAssignWorkMem_hook_type) (PlannedStmt *plannedstmt); +extern PGDLLIMPORT ExecAssignWorkMem_hook_type ExecAssignWorkMem_hook; /* * prototypes from functions in execAmi.c @@ -730,5 +733,6 @@ extern ResultRelInfo *ExecLookupResultRelByOid(ModifyTableState *node, * prototypes from functions in execWorkmem.c */ extern void ExecAssignWorkMem(PlannedStmt *plannedstmt); +extern void standard_ExecAssignWorkMem(PlannedStmt *plannedstmt); #endif /* EXECUTOR_H */ -- 2.47.1