From 35654b8c4bbf19a877089353af277bfe9a9c8d5c Mon Sep 17 00:00:00 2001 From: James Hunter Date: Wed, 5 Mar 2025 01:21:20 +0000 Subject: [PATCH 4/4] Add "workmem_hook" to allow extensions to override per-node work_mem --- contrib/Makefile | 3 +- contrib/meson.build | 1 + contrib/workmem/Makefile | 20 + contrib/workmem/expected/workmem.out | 676 +++++++++++++++++++++++++++ contrib/workmem/meson.build | 28 ++ contrib/workmem/sql/workmem.sql | 304 ++++++++++++ contrib/workmem/workmem.c | 409 ++++++++++++++++ src/backend/executor/execWorkmem.c | 40 +- src/include/executor/executor.h | 4 + 9 files changed, 1474 insertions(+), 11 deletions(-) create mode 100644 contrib/workmem/Makefile create mode 100644 contrib/workmem/expected/workmem.out create mode 100644 contrib/workmem/meson.build create mode 100644 contrib/workmem/sql/workmem.sql create mode 100644 contrib/workmem/workmem.c diff --git a/contrib/Makefile b/contrib/Makefile index 952855d9b61..b4880ab7067 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -50,7 +50,8 @@ SUBDIRS = \ tsm_system_rows \ tsm_system_time \ unaccent \ - vacuumlo + vacuumlo \ + workmem ifeq ($(with_ssl),openssl) SUBDIRS += pgcrypto sslinfo diff --git a/contrib/meson.build b/contrib/meson.build index 1ba73ebd67a..fa596ef426f 100644 --- a/contrib/meson.build +++ b/contrib/meson.build @@ -69,4 +69,5 @@ subdir('tsm_system_time') subdir('unaccent') subdir('uuid-ossp') subdir('vacuumlo') +subdir('workmem') subdir('xml2') diff --git a/contrib/workmem/Makefile b/contrib/workmem/Makefile new file mode 100644 index 00000000000..f920cdb9964 --- /dev/null +++ b/contrib/workmem/Makefile @@ -0,0 +1,20 @@ +# contrib/workmem/Makefile + +MODULE_big = workmem +OBJS = \ + $(WIN32RES) \ + workmem.o +PGFILEDESC = "workmem - extension that adjusts PostgreSQL work_mem per node" + +REGRESS = workmem + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/workmem +top_builddir = ../.. 
+include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/workmem/expected/workmem.out b/contrib/workmem/expected/workmem.out new file mode 100644 index 00000000000..f69883b0005 --- /dev/null +++ b/contrib/workmem/expected/workmem.out @@ -0,0 +1,676 @@ +load 'workmem'; +-- Note: Function derived from file explain.sql. We can't use that other +-- function, since we're run in parallel with explain.sql. +create or replace function workmem_filter(text) returns setof text +language plpgsql as +$$ +declare + ln text; +begin + for ln in execute $1 + loop + -- Mask out work_mem estimate, since it might be brittle + ln := regexp_replace(ln, '\mwork_mem=\d+\M', 'work_mem=N', 'g'); + ln := regexp_replace(ln, '\mMemory Estimate: \d+\M', 'Memory Estimate: N', 'g'); + return next ln; + end loop; +end; +$$; +--==== +-- Test suite 1: default workmem.query_work_mem (= 100 MB) +--==== +---- +-- Some tests from src/test/regress/sql/workmem.sql that don't require +-- test_setup.sql, etc., to be run first. 
+---- +-- Grouping Sets (Hash) +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); +'); + workmem_filter +---------------------------------------------------------------------- + WindowAgg (work_mem=N kB) (limit=25600 kB) + -> Sort (work_mem=N kB) (limit=25600 kB) + Sort Key: "*VALUES*".column1, "*VALUES*".column2 NULLS FIRST + -> HashAggregate (work_mem=N kB) (limit=51200 kB) + Hash Key: "*VALUES*".column1, "*VALUES*".column2 + Hash Key: "*VALUES*".column1 + -> Values Scan on "*VALUES*" + Filter: (column1 = column2) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 102400 kB +(10 rows) + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); + a | b | row_number +---+---+------------ + 1 | | 1 + 1 | 1 | 2 + 2 | | 3 + 2 | 2 | 4 +(4 rows) + +-- Grouping Sets (Sort) +set enable_hashagg = off; +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); +'); + workmem_filter +---------------------------------------------------------------------- + WindowAgg (work_mem=N kB) (limit=20480 kB) + -> Sort (work_mem=N kB) (limit=20480 kB) + Sort Key: "*VALUES*".column1, "*VALUES*".column2 NULLS FIRST + -> GroupAggregate (work_mem=N kB) (limit=40960 kB) + Group Key: "*VALUES*".column1, "*VALUES*".column2 + Group Key: "*VALUES*".column1 + Sort Key: "*VALUES*".column2 + Group Key: "*VALUES*".column2 + Sort Key: "*VALUES*".column3 + Group Key: "*VALUES*".column3 + Sort Key: "*VALUES*".column4 + Group Key: "*VALUES*".column4 + -> Sort (work_mem=N kB) (limit=20480 kB) + Sort Key: "*VALUES*".column1 + -> Values Scan on "*VALUES*" + 
Filter: (column1 = column2) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 102400 kB +(18 rows) + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); + a | b | row_number +---+---+------------ + 1 | | 1 + 1 | 1 | 2 + 2 | | 3 + 2 | 2 | 4 + | | 5 + | | 6 + | | 7 + | | 8 + | 1 | 9 + | 2 | 10 +(10 rows) + +reset enable_hashagg; +-- Function Scan +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; +'); + workmem_filter +----------------------------------------------------------------------------- + Aggregate + -> Function Scan on generate_series a (work_mem=N kB) (limit=102400 kB) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 102400 kB +(4 rows) + +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; + count +------- + 2000 +(1 row) + +-- Three Function Scans +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); +'); + workmem_filter +--------------------------------------------------------------------------- + Aggregate + -> Function Scan on generate_series (work_mem=N kB) (limit=102399 kB) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 102399 kB +(4 rows) + +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); + count +------- + 12 +(1 row) + +-- WindowAgg +select workmem_filter(' +explain (costs off, work_mem on) +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; +'); + workmem_filter 
+---------------------------------------------------------------------------------------- + Limit + -> WindowAgg (work_mem=N kB) (limit=34134 kB) + -> Sort (work_mem=N kB) (limit=34133 kB) + Sort Key: ((a.n < 3)) + -> Function Scan on generate_series a (work_mem=N kB) (limit=34133 kB) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 102400 kB +(7 rows) + +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; + sum +--------- + 2000997 + 2000997 + 2000997 + 2000997 + 2000997 +(5 rows) + +-- InitPlan with hash table ("IN SELECT") +select workmem_filter(' +explain (costs off, work_mem on) +select ''foo''::text in (select ''bar''::name union all select ''bar''::name); +'); + workmem_filter +------------------------------------------- + Result (work_mem=N kB) (limit=102400 kB) + SubPlan 1 + -> Append + -> Result + -> Result + Total Working Memory Estimate: N kB + Total Working Memory Limit: 102400 kB +(7 rows) + +select 'foo'::text in (select 'bar'::name union all select 'bar'::name); + ?column? +---------- + f +(1 row) + +-- SubPlan with hash table +select workmem_filter(' +explain (costs off, work_mem on) +select 1 = any (select (select 1) where 1 = any (select 1)); +'); + workmem_filter +---------------------------------------------------------------- + Result (work_mem=N kB) (limit=68267 kB) + SubPlan 3 + -> Result (work_mem=N kB) (limit=34133 kB) + One-Time Filter: (ANY (1 = (hashed SubPlan 2).col1)) + InitPlan 1 + -> Result + SubPlan 2 + -> Result + Total Working Memory Estimate: N kB + Total Working Memory Limit: 102400 kB +(10 rows) + +select 1 = any (select (select 1) where 1 = any (select 1)); + ?column? +---------- + t +(1 row) + +--==== +-- Test suite 2: set workmem.query_work_mem to 4 MB +--==== +set workmem.query_work_mem = 4096; +---- +-- Some tests from src/test/regress/sql/workmem.sql that don't require +-- test_setup.sql, etc., to be run first. 
+---- +-- Grouping Sets (Hash) +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); +'); + workmem_filter +---------------------------------------------------------------------- + WindowAgg (work_mem=N kB) (limit=1024 kB) + -> Sort (work_mem=N kB) (limit=1024 kB) + Sort Key: "*VALUES*".column1, "*VALUES*".column2 NULLS FIRST + -> HashAggregate (work_mem=N kB) (limit=2048 kB) + Hash Key: "*VALUES*".column1, "*VALUES*".column2 + Hash Key: "*VALUES*".column1 + -> Values Scan on "*VALUES*" + Filter: (column1 = column2) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 4096 kB +(10 rows) + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); + a | b | row_number +---+---+------------ + 1 | | 1 + 1 | 1 | 2 + 2 | | 3 + 2 | 2 | 4 +(4 rows) + +-- Grouping Sets (Sort) +set enable_hashagg = off; +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); +'); + workmem_filter +---------------------------------------------------------------------- + WindowAgg (work_mem=N kB) (limit=820 kB) + -> Sort (work_mem=N kB) (limit=819 kB) + Sort Key: "*VALUES*".column1, "*VALUES*".column2 NULLS FIRST + -> GroupAggregate (work_mem=N kB) (limit=1638 kB) + Group Key: "*VALUES*".column1, "*VALUES*".column2 + Group Key: "*VALUES*".column1 + Sort Key: "*VALUES*".column2 + Group Key: "*VALUES*".column2 + Sort Key: "*VALUES*".column3 + Group Key: "*VALUES*".column3 + Sort Key: "*VALUES*".column4 + Group Key: "*VALUES*".column4 + -> Sort (work_mem=N kB) (limit=819 kB) + Sort Key: "*VALUES*".column1 + -> Values Scan on "*VALUES*" + Filter: 
(column1 = column2) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 4096 kB +(18 rows) + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); + a | b | row_number +---+---+------------ + 1 | | 1 + 1 | 1 | 2 + 2 | | 3 + 2 | 2 | 4 + | | 5 + | | 6 + | | 7 + | | 8 + | 1 | 9 + | 2 | 10 +(10 rows) + +reset enable_hashagg; +-- Function Scan +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; +'); + workmem_filter +--------------------------------------------------------------------------- + Aggregate + -> Function Scan on generate_series a (work_mem=N kB) (limit=4096 kB) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 4096 kB +(4 rows) + +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; + count +------- + 2000 +(1 row) + +-- Three Function Scans +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); +'); + workmem_filter +------------------------------------------------------------------------- + Aggregate + -> Function Scan on generate_series (work_mem=N kB) (limit=4095 kB) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 4095 kB +(4 rows) + +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); + count +------- + 12 +(1 row) + +-- WindowAgg +select workmem_filter(' +explain (costs off, work_mem on) +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; +'); + workmem_filter 
+--------------------------------------------------------------------------------------- + Limit + -> WindowAgg (work_mem=N kB) (limit=1366 kB) + -> Sort (work_mem=N kB) (limit=1365 kB) + Sort Key: ((a.n < 3)) + -> Function Scan on generate_series a (work_mem=N kB) (limit=1365 kB) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 4096 kB +(7 rows) + +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; + sum +--------- + 2000997 + 2000997 + 2000997 + 2000997 + 2000997 +(5 rows) + +-- InitPlan with hash table ("IN SELECT") +select workmem_filter(' +explain (costs off, work_mem on) +select ''foo''::text in (select ''bar''::name union all select ''bar''::name); +'); + workmem_filter +----------------------------------------- + Result (work_mem=N kB) (limit=4096 kB) + SubPlan 1 + -> Append + -> Result + -> Result + Total Working Memory Estimate: N kB + Total Working Memory Limit: 4096 kB +(7 rows) + +select 'foo'::text in (select 'bar'::name union all select 'bar'::name); + ?column? +---------- + f +(1 row) + +-- SubPlan with hash table +select workmem_filter(' +explain (costs off, work_mem on) +select 1 = any (select (select 1) where 1 = any (select 1)); +'); + workmem_filter +---------------------------------------------------------------- + Result (work_mem=N kB) (limit=2731 kB) + SubPlan 3 + -> Result (work_mem=N kB) (limit=1365 kB) + One-Time Filter: (ANY (1 = (hashed SubPlan 2).col1)) + InitPlan 1 + -> Result + SubPlan 2 + -> Result + Total Working Memory Estimate: N kB + Total Working Memory Limit: 4096 kB +(10 rows) + +select 1 = any (select (select 1) where 1 = any (select 1)); + ?column? +---------- + t +(1 row) + +reset workmem.query_work_mem; +--==== +-- Test suite 3: set workmem.query_work_mem to 80 KB +--==== +set workmem.query_work_mem = 80; +---- +-- Some tests from src/test/regress/sql/workmem.sql that don't require +-- test_setup.sql, etc., to be run first. 
+---- +-- Grouping Sets (Hash) +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); +'); +WARNING: not enough working memory for query: increase workmem.query_work_mem + workmem_filter +---------------------------------------------------------------------- + WindowAgg (work_mem=N kB) (limit=20 kB) + -> Sort (work_mem=N kB) (limit=20 kB) + Sort Key: "*VALUES*".column1, "*VALUES*".column2 NULLS FIRST + -> HashAggregate (work_mem=N kB) (limit=40 kB) + Hash Key: "*VALUES*".column1, "*VALUES*".column2 + Hash Key: "*VALUES*".column1 + -> Values Scan on "*VALUES*" + Filter: (column1 = column2) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 80 kB +(10 rows) + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); +WARNING: not enough working memory for query: increase workmem.query_work_mem + a | b | row_number +---+---+------------ + 1 | | 1 + 1 | 1 | 2 + 2 | | 3 + 2 | 2 | 4 +(4 rows) + +-- Grouping Sets (Sort) +set enable_hashagg = off; +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); +'); +WARNING: not enough working memory for query: increase workmem.query_work_mem + workmem_filter +---------------------------------------------------------------------- + WindowAgg (work_mem=N kB) (limit=16 kB) + -> Sort (work_mem=N kB) (limit=16 kB) + Sort Key: "*VALUES*".column1, "*VALUES*".column2 NULLS FIRST + -> GroupAggregate (work_mem=N kB) (limit=32 kB) + Group Key: "*VALUES*".column1, "*VALUES*".column2 + Group Key: "*VALUES*".column1 + Sort Key: "*VALUES*".column2 + Group Key: "*VALUES*".column2 + Sort Key: 
"*VALUES*".column3 + Group Key: "*VALUES*".column3 + Sort Key: "*VALUES*".column4 + Group Key: "*VALUES*".column4 + -> Sort (work_mem=N kB) (limit=16 kB) + Sort Key: "*VALUES*".column1 + -> Values Scan on "*VALUES*" + Filter: (column1 = column2) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 80 kB +(18 rows) + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); +WARNING: not enough working memory for query: increase workmem.query_work_mem + a | b | row_number +---+---+------------ + 1 | | 1 + 1 | 1 | 2 + 2 | | 3 + 2 | 2 | 4 + | | 5 + | | 6 + | | 7 + | | 8 + | 1 | 9 + | 2 | 10 +(10 rows) + +reset enable_hashagg; +-- Function Scan +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; +'); + workmem_filter +------------------------------------------------------------------------- + Aggregate + -> Function Scan on generate_series a (work_mem=N kB) (limit=80 kB) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 80 kB +(4 rows) + +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; + count +------- + 2000 +(1 row) + +-- Three Function Scans +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); +'); +WARNING: not enough working memory for query: increase workmem.query_work_mem + workmem_filter +----------------------------------------------------------------------- + Aggregate + -> Function Scan on generate_series (work_mem=N kB) (limit=78 kB) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 78 kB +(4 rows) + +select count(*) +from rows from(generate_series(1, 5), + 
generate_series(2, 10), + generate_series(4, 15)); +WARNING: not enough working memory for query: increase workmem.query_work_mem + count +------- + 12 +(1 row) + +-- WindowAgg +select workmem_filter(' +explain (costs off, work_mem on) +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; +'); +WARNING: not enough working memory for query: increase workmem.query_work_mem + workmem_filter +------------------------------------------------------------------------------------- + Limit + -> WindowAgg (work_mem=N kB) (limit=27 kB) + -> Sort (work_mem=N kB) (limit=27 kB) + Sort Key: ((a.n < 3)) + -> Function Scan on generate_series a (work_mem=N kB) (limit=26 kB) + Total Working Memory Estimate: N kB + Total Working Memory Limit: 80 kB +(7 rows) + +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; +WARNING: not enough working memory for query: increase workmem.query_work_mem + sum +--------- + 2000997 + 2000997 + 2000997 + 2000997 + 2000997 +(5 rows) + +-- InitPlan with hash table ("IN SELECT") +select workmem_filter(' +explain (costs off, work_mem on) +select ''foo''::text in (select ''bar''::name union all select ''bar''::name); +'); +WARNING: not enough working memory for query: increase workmem.query_work_mem + workmem_filter +--------------------------------------- + Result (work_mem=N kB) (limit=80 kB) + SubPlan 1 + -> Append + -> Result + -> Result + Total Working Memory Estimate: N kB + Total Working Memory Limit: 80 kB +(7 rows) + +select 'foo'::text in (select 'bar'::name union all select 'bar'::name); +WARNING: not enough working memory for query: increase workmem.query_work_mem + ?column? 
+---------- + f +(1 row) + +-- SubPlan with hash table +select workmem_filter(' +explain (costs off, work_mem on) +select 1 = any (select (select 1) where 1 = any (select 1)); +'); +WARNING: not enough working memory for query: increase workmem.query_work_mem + workmem_filter +---------------------------------------------------------------- + Result (work_mem=N kB) (limit=54 kB) + SubPlan 3 + -> Result (work_mem=N kB) (limit=26 kB) + One-Time Filter: (ANY (1 = (hashed SubPlan 2).col1)) + InitPlan 1 + -> Result + SubPlan 2 + -> Result + Total Working Memory Estimate: N kB + Total Working Memory Limit: 80 kB +(10 rows) + +select 1 = any (select (select 1) where 1 = any (select 1)); +WARNING: not enough working memory for query: increase workmem.query_work_mem + ?column? +---------- + t +(1 row) + +reset workmem.query_work_mem; diff --git a/contrib/workmem/meson.build b/contrib/workmem/meson.build new file mode 100644 index 00000000000..fce8030ba45 --- /dev/null +++ b/contrib/workmem/meson.build @@ -0,0 +1,28 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +workmem_sources = files( + 'workmem.c', +) + +if host_system == 'windows' + workmem_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'workmem', + '--FILEDESC', 'workmem - extension that adjusts PostgreSQL work_mem per node',]) +endif + +workmem = shared_module('workmem', + workmem_sources, + kwargs: contrib_mod_args, +) +contrib_targets += workmem + +tests += { + 'name': 'workmem', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'workmem', + ], + }, +} diff --git a/contrib/workmem/sql/workmem.sql b/contrib/workmem/sql/workmem.sql new file mode 100644 index 00000000000..4e1ec056b80 --- /dev/null +++ b/contrib/workmem/sql/workmem.sql @@ -0,0 +1,304 @@ +load 'workmem'; + +-- Note: Function derived from file explain.sql. We can't use that other +-- function, since we're run in parallel with explain.sql. 
+create or replace function workmem_filter(text) returns setof text +language plpgsql as +$$ +declare + ln text; +begin + for ln in execute $1 + loop + -- Mask out work_mem estimate, since it might be brittle + ln := regexp_replace(ln, '\mwork_mem=\d+\M', 'work_mem=N', 'g'); + ln := regexp_replace(ln, '\mMemory Estimate: \d+\M', 'Memory Estimate: N', 'g'); + return next ln; + end loop; +end; +$$; + +--==== +-- Test suite 1: default workmem.query_work_mem (= 100 MB) +--==== + +---- +-- Some tests from src/test/regress/sql/workmem.sql that don't require +-- test_setup.sql, etc., to be run first. +---- + +-- Grouping Sets (Hash) +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); +'); + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); + +-- Grouping Sets (Sort) +set enable_hashagg = off; + +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); +'); + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); + +reset enable_hashagg; + +-- Function Scan +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; +'); + +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; + +-- Three Function Scans +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) +from rows 
from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); +'); + +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); + +-- WindowAgg +select workmem_filter(' +explain (costs off, work_mem on) +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; +'); + +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; + +-- InitPlan with hash table ("IN SELECT") +select workmem_filter(' +explain (costs off, work_mem on) +select ''foo''::text in (select ''bar''::name union all select ''bar''::name); +'); + +select 'foo'::text in (select 'bar'::name union all select 'bar'::name); + +-- SubPlan with hash table +select workmem_filter(' +explain (costs off, work_mem on) +select 1 = any (select (select 1) where 1 = any (select 1)); +'); + +select 1 = any (select (select 1) where 1 = any (select 1)); + +--==== +-- Test suite 2: set workmem.query_work_mem to 4 MB +--==== +set workmem.query_work_mem = 4096; + +---- +-- Some tests from src/test/regress/sql/workmem.sql that don't require +-- test_setup.sql, etc., to be run first. 
+---- + +-- Grouping Sets (Hash) +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); +'); + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); + +-- Grouping Sets (Sort) +set enable_hashagg = off; + +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); +'); + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); + +reset enable_hashagg; + +-- Function Scan +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; +'); + +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; + +-- Three Function Scans +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); +'); + +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); + +-- WindowAgg +select workmem_filter(' +explain (costs off, work_mem on) +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; +'); + +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; + +-- InitPlan with hash table ("IN SELECT") +select workmem_filter(' +explain (costs off, work_mem on) +select 
''foo''::text in (select ''bar''::name union all select ''bar''::name); +'); + +select 'foo'::text in (select 'bar'::name union all select 'bar'::name); + +-- SubPlan with hash table +select workmem_filter(' +explain (costs off, work_mem on) +select 1 = any (select (select 1) where 1 = any (select 1)); +'); + +select 1 = any (select (select 1) where 1 = any (select 1)); + +reset workmem.query_work_mem; + +--==== +-- Test suite 3: set workmem.query_work_mem to 80 KB +--==== +set workmem.query_work_mem = 80; + +---- +-- Some tests from src/test/regress/sql/workmem.sql that don't require +-- test_setup.sql, etc., to be run first. +---- + +-- Grouping Sets (Hash) +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); +'); + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1), (2, 2)) as t (a, b) where a = b +group by grouping sets((a, b), (a)); + +-- Grouping Sets (Sort) +set enable_hashagg = off; + +select workmem_filter(' +explain (costs off, work_mem on) +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); +'); + +select a, b, row_number() over (order by a, b nulls first) +from (values (1, 1, 1, 1), (2, 2, 2, 2)) as t (a, b, c, d) where a = b +group by grouping sets((a, b), (a), (b), (c), (d)); + +reset enable_hashagg; + +-- Function Scan +select workmem_filter(' +explain (work_mem on, costs off) +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; +'); + +select count(*) from ( +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +) t; + +-- Three Function Scans +select workmem_filter(' +explain (work_mem on, costs off) +select 
count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); +'); + +select count(*) +from rows from(generate_series(1, 5), + generate_series(2, 10), + generate_series(4, 15)); + +-- WindowAgg +select workmem_filter(' +explain (costs off, work_mem on) +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; +'); + +select sum(n) over(partition by m) +from (SELECT n < 3 as m, n from generate_series(1,2000) a(n)) +limit 5; + +-- InitPlan with hash table ("IN SELECT") +select workmem_filter(' +explain (costs off, work_mem on) +select ''foo''::text in (select ''bar''::name union all select ''bar''::name); +'); + +select 'foo'::text in (select 'bar'::name union all select 'bar'::name); + +-- SubPlan with hash table +select workmem_filter(' +explain (costs off, work_mem on) +select 1 = any (select (select 1) where 1 = any (select 1)); +'); + +select 1 = any (select (select 1) where 1 = any (select 1)); + +reset workmem.query_work_mem; diff --git a/contrib/workmem/workmem.c b/contrib/workmem/workmem.c new file mode 100644 index 00000000000..d78f60c7d8d --- /dev/null +++ b/contrib/workmem/workmem.c @@ -0,0 +1,409 @@ +/*------------------------------------------------------------------------- + * + * workmem.c + * + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/workmem/workmem.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/parallel.h" +#include "common/int.h" +#include "executor/executor.h" +#include "miscadmin.h" +#include "utils/guc.h" + +PG_MODULE_MAGIC; + +/* Local variables */ + +/* + * A Target represents a collection of data structures, belonging to an + * execution node, that all share the same memory limit. 
+ * + * For example, in parallel query, every parallel worker (plus the leader) + * gets a copy of the execution node, and therefore a copy of all of that + * node's work_mem limits. In this case, we'll track a single Target, but its + * count will include (1 + num_workers), because this Target gets "applied" + * to (1 + num_workers) execution nodes. + */ +typedef struct Target +{ + /* # of data structures to which target applies: */ + int count; + /* workmem estimate for each of these data structures: */ + int workmem; + /* (original) workmem limit for each of these data structures: */ + int limit; + /* workmem estimate, but capped at (original) workmem limit: */ + int priority; + /* ratio of (priority / limit); measures the Target's "greediness": */ + double ratio; + /* link to target's actual limit, so we can set it: */ + int *target_limit; +} Target; + +typedef struct WorkMemStats +{ + /* total # of data structures that get working memory: */ + uint64 count; + /* total working memory estimated for this query: */ + uint64 workmem; + /* total working memory (currently) reserved for this query: */ + uint64 limit; + /* total "capped" working memory estimate: */ + uint64 priority; + /* list of Targets, used to update actual workmem limits: */ + List *targets; +} WorkMemStats; + +/* GUC variables */ +static int workmem_query_work_mem = 100 * 1024; /* kB */ + +/* internal functions */ +static void workmem_fn(PlannedStmt *plannedstmt); + +static int clamp_priority(int workmem, int limit); +static Target * make_target(int workmem, int *target_limit, int count); +static void add_target(WorkMemStats * workmem_stats, Target * target); + +/* Sort comparators: sort by ratio, ascending or descending. */ +static int target_compare_asc(const ListCell *a, const ListCell *b); +static int target_compare_desc(const ListCell *a, const ListCell *b); + +/* + * Module load callback + */ +void +_PG_init(void) +{ + /* Define custom GUC variable. 
*/ + DefineCustomIntVariable("workmem.query_work_mem", + "Amount of working-memory (in kB) to provide each " + "query.", + NULL, + &workmem_query_work_mem, + 100 * 1024, /* default to 100 MB */ + 64, + INT_MAX, + PGC_USERSET, + GUC_UNIT_KB, + NULL, + NULL, + NULL); + + MarkGUCPrefixReserved("workmem"); + + /* Install hooks. */ + ExecAssignWorkMem_hook = workmem_fn; +} + +static void +workmem_analyze(PlannedStmt *plannedstmt, WorkMemStats * workmem_stats) +{ + int idx; + + for (idx = 0; idx < list_length(plannedstmt->workMemCategories); ++idx) + { + WorkMemCategory category; + int count; + int estimate; + ListCell *limit_cell; + int limit; + Target *target; + + category = + (WorkMemCategory) list_nth_int(plannedstmt->workMemCategories, idx); + count = list_nth_int(plannedstmt->workMemCounts, idx); + estimate = list_nth_int(plannedstmt->workMemEstimates, idx); + + limit = category == WORKMEM_HASH ? + get_hash_memory_limit() / 1024 : work_mem; + limit_cell = list_nth_cell(plannedstmt->workMemLimits, idx); + lfirst_int(limit_cell) = limit; + + target = make_target(estimate, &lfirst_int(limit_cell), count); + add_target(workmem_stats, target); + } +} + +static void +workmem_set(PlannedStmt *plannedstmt, WorkMemStats * workmem_stats) +{ + int remaining = workmem_query_work_mem; + + if (workmem_stats->limit <= remaining) + { + /* + * "High memory" case: we have more than enough query_work_mem; now + * hand out the excess. + */ + + /* This is memory that exceeds workmem limits. */ + remaining -= workmem_stats->limit; + + /* + * Sort targets from highest ratio to lowest. When we assign memory to + * a Target, we'll truncate fractional KB; so by going through the + * list from highest to lowest ratio, we ensure that the lowest ratios + * get the leftover fractional KBs. 
+ */ + list_sort(workmem_stats->targets, target_compare_desc); + + foreach_ptr(Target, target, workmem_stats->targets) + { + double fraction; + int extra_workmem; + + /* How much extra work mem should we assign to this target? */ + fraction = (double) target->workmem / workmem_stats->workmem; + + /* NOTE: This is extra workmem *per data structure*. */ + extra_workmem = (int) (fraction * remaining); + + *target->target_limit += extra_workmem; + + /* OK, we've handled this target. */ + workmem_stats->workmem -= (target->workmem * target->count); + remaining -= (extra_workmem * target->count); + } + } + else if (workmem_stats->priority <= remaining) + { + /* + * "Medium memory" case: we don't have enough query_work_mem to give + * every target its full allotment, but we do have enough to give it + * as much as (we estimate) it needs. + * + * So, just take some memory away from nodes that (we estimate) won't + * need it. + */ + + /* This is memory that exceeds workmem estimates. */ + remaining -= workmem_stats->priority; + + /* + * Sort targets from highest ratio to lowest. We'll skip any Target + * with ratio > 1.0, because (we estimate) they already need their + * full allotment. Also, once a target reaches its workmem limit, + * we'll stop giving it more workmem, leaving the surplus memory to be + * assigned to targets with smaller ratios. + */ + list_sort(workmem_stats->targets, target_compare_desc); + + foreach_ptr(Target, target, workmem_stats->targets) + { + double fraction; + int extra_workmem; + + /* How much extra work mem should we assign to this target? */ + fraction = (double) target->workmem / workmem_stats->workmem; + + /* + * Don't give the target more than its (original) limit. + * + * NOTE: This is extra workmem *per data structure*. + */ + extra_workmem = Min((int) (fraction * remaining), + target->limit - target->priority); + + *target->target_limit = target->priority + extra_workmem; + + /* OK, we've handled this target. 
*/ + workmem_stats->workmem -= (target->workmem * target->count); + remaining -= (extra_workmem * target->count); + } + } + else + { + uint64 limit = workmem_stats->limit; + + /* + * "Low memory" case: we are severely memory constrained, and need to + * take "priority" memory away from targets that (we estimate) + * actually need it. We'll do this by (effectively) reducing the + * global "work_mem" limit, uniformly, for all targets, until we're + * under the query_work_mem limit. + */ + elog(WARNING, + "not enough working memory for query: increase " + "workmem.query_work_mem"); + + /* + * Sort targets from lowest ratio to highest. For any target whose + * ratio is < the target_ratio, we'll just assign it its priority (= + * workmem) as limit, and return the excess workmem to our "limit", + * for use by subsequent, greedier, targets. + */ + list_sort(workmem_stats->targets, target_compare_asc); + + foreach_ptr(Target, target, workmem_stats->targets) + { + double target_ratio; + int target_limit; + + /* + * If we restrict our targets to this ratio, we'll stay within the + * query_work_mem limit. + */ + target_ratio = (double) remaining / limit; + + /* + * Don't give this target more than its priority request (but we + * might give it less). + */ + target_limit = Min(target->priority, + target_ratio * target->limit); + *target->target_limit = target_limit; + + /* "Remaining" decreases by memory we actually assigned. */ + remaining -= (target_limit * target->count); + + /* + * "Limit" decreases by target's original memory limit. + * + * If target_limit <= target->priority, so we restricted this + * target to less memory than (we estimate) it needs, then the + * target_ratio will stay the same, since, letting A = remaining, + * B = limit, and R = ratio, we'll have: + * + * R=A/B <=> A=R*B <=> A-R*X = R*B - R*X <=> A-R*X = R * (B-X) <=> + * R = (A-R*X) / (B-X) + * + * -- which is what we wanted to prove. 
+ * + * And if target_limit > target->priority, so we didn't need to + * restrict this target beyond its priority estimate, then the + * target_ratio will increase. This means more memory for the + * remaining, greedier, targets. + */ + limit -= (target->limit * target->count); + + target_ratio = (double) remaining / limit; + } + } +} + +/* + * workmem_fn: updates the query plan's work_mem based on query_work_mem + */ +static void +workmem_fn(PlannedStmt *plannedstmt) +{ + WorkMemStats workmem_stats; + MemoryContext context, + oldcontext; + + /* + * We already assigned working-memory limits on the leader, and those + * limits were sent to the workers inside the serialized Plan. + * + * We could re-assign working-memory limits on the parallel worker, to + * only those Plan nodes that got sent to the worker, but for now we don't + * bother. + */ + if (IsParallelWorker()) + return; + + if (workmem_query_work_mem == -1) + return; /* disabled */ + + memset(&workmem_stats, 0, sizeof(workmem_stats)); + + /* + * Set up our own memory context, so we can drop the metadata we generate, + * all at once. + */ + context = AllocSetContextCreate(CurrentMemoryContext, + "workmem_fn context", + ALLOCSET_DEFAULT_SIZES); + + oldcontext = MemoryContextSwitchTo(context); + + /* Figure out how much total working memory this query wants/needs. */ + workmem_analyze(plannedstmt, &workmem_stats); + + /* Now restrict the query to workmem.query_work_mem. */ + workmem_set(plannedstmt, &workmem_stats); + + MemoryContextSwitchTo(oldcontext); + + /* Drop all our metadata. 
*/ + MemoryContextDelete(context); +} + +static int +clamp_priority(int workmem, int limit) +{ + return Min(workmem, limit); +} + +static Target * +make_target(int workmem, int *target_limit, int count) +{ + Target *result = palloc_object(Target); + + result->count = count; + result->workmem = workmem; + result->limit = *target_limit; + result->priority = clamp_priority(result->workmem, result->limit); + result->ratio = (double) result->priority / result->limit; + result->target_limit = target_limit; + + return result; +} + +static void +add_target(WorkMemStats * workmem_stats, Target * target) +{ + workmem_stats->count += target->count; + workmem_stats->workmem += target->count * target->workmem; + workmem_stats->limit += target->count * target->limit; + workmem_stats->priority += target->count * target->priority; + workmem_stats->targets = lappend(workmem_stats->targets, target); +} + +/* This "ascending" comparator sorts least-greedy Targets first. */ +static int +target_compare_asc(const ListCell *a, const ListCell *b) +{ + double a_val = ((Target *) a->ptr_value)->ratio; + double b_val = ((Target *) b->ptr_value)->ratio; + + /* + * Sort in ascending order: smallest ratio first, then (if ratios equal) + * smallest workmem. + */ + if (a_val == b_val) + { + return pg_cmp_s32(((Target *) a->ptr_value)->workmem, + ((Target *) b->ptr_value)->workmem); + } + else + return a_val > b_val ? 1 : -1; +} + +/* This "descending" comparator sorts most-greedy Targets first. */ +static int +target_compare_desc(const ListCell *a, const ListCell *b) +{ + double a_val = ((Target *) a->ptr_value)->ratio; + double b_val = ((Target *) b->ptr_value)->ratio; + + /* + * Sort in descending order: largest ratio first, then (if ratios equal) + * largest workmem. + */ + if (a_val == b_val) + { + return pg_cmp_s32(((Target *) b->ptr_value)->workmem, + ((Target *) a->ptr_value)->workmem); + } + else + return b_val > a_val ? 
1 : -1; +} diff --git a/src/backend/executor/execWorkmem.c b/src/backend/executor/execWorkmem.c index d8a19a58ebe..37420666065 100644 --- a/src/backend/executor/execWorkmem.c +++ b/src/backend/executor/execWorkmem.c @@ -52,6 +52,10 @@ #include "nodes/plannodes.h" +/* Hook for plugins to get control in ExecAssignWorkMem */ +ExecAssignWorkMem_hook_type ExecAssignWorkMem_hook = NULL; + + /* ------------------------------------------------------------------------ * ExecAssignWorkMem * @@ -64,20 +68,36 @@ */ void ExecAssignWorkMem(PlannedStmt *plannedstmt) +{ + if (ExecAssignWorkMem_hook) + (*ExecAssignWorkMem_hook) (plannedstmt); + else + { + /* + * No need to re-assign working memory on parallel workers, since + * workers have the same work_mem and hash_mem_multiplier GUCs as the + * leader. + * + * We already assigned working-memory limits on the leader, and those + * limits were sent to the workers inside the serialized Plan. + * + * We bail out here, in case the hook wants to re-assign memory on + * parallel workers, and maybe wants to call + * standard_ExecAssignWorkMem() first, as well. + */ + if (IsParallelWorker()) + return; + + standard_ExecAssignWorkMem(plannedstmt); + } +} + +void +standard_ExecAssignWorkMem(PlannedStmt *plannedstmt) { ListCell *lc_category; ListCell *lc_limit; - /* - * No need to re-assign working memory on parallel workers, since workers - * have the same work_mem and hash_mem_multiplier GUCs as the leader. - * - * We already assigned working-memory limits on the leader, and those - * limits were sent to the workers inside the serialized Plan. 
- */ - if (IsParallelWorker()) - return; - forboth(lc_category, plannedstmt->workMemCategories, lc_limit, plannedstmt->workMemLimits) { diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index c4147876d55..c12625d2061 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -96,6 +96,9 @@ typedef bool (*ExecutorCheckPerms_hook_type) (List *rangeTable, bool ereport_on_violation); extern PGDLLIMPORT ExecutorCheckPerms_hook_type ExecutorCheckPerms_hook; +/* Hook for plugins to get control in ExecAssignWorkMem() */ +typedef void (*ExecAssignWorkMem_hook_type) (PlannedStmt *plannedstmt); +extern PGDLLIMPORT ExecAssignWorkMem_hook_type ExecAssignWorkMem_hook; /* * prototypes from functions in execAmi.c @@ -730,5 +733,6 @@ extern ResultRelInfo *ExecLookupResultRelByOid(ModifyTableState *node, * prototypes from functions in execWorkmem.c */ extern void ExecAssignWorkMem(PlannedStmt *plannedstmt); +extern void standard_ExecAssignWorkMem(PlannedStmt *plannedstmt); #endif /* EXECUTOR_H */ -- 2.47.1