From b0033119eae5dd382095cf8cb6a63be2439636f1 Mon Sep 17 00:00:00 2001 From: Alexander Pyhalov Date: Wed, 6 May 2026 09:56:56 +0300 Subject: [PATCH] Push join with function scan to remote server The patch allows pushing joins with function RTEs to PostgreSQL data sources in general and postgres_fdw specifically. Co-authored-by: Gleb Kashkin --- contrib/postgres_fdw/deparse.c | 192 ++++-- .../postgres_fdw/expected/postgres_fdw.out | 467 ++++++++++++++ contrib/postgres_fdw/postgres_fdw.c | 580 ++++++++++++++++-- contrib/postgres_fdw/postgres_fdw.h | 6 + contrib/postgres_fdw/sql/postgres_fdw.sql | 195 ++++++ src/backend/optimizer/path/joinpath.c | 11 + src/include/foreign/fdwapi.h | 1 + 7 files changed, 1348 insertions(+), 104 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index 2dcc6c8af1b..4b31f2a1c9f 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -154,6 +154,7 @@ static void deparseConst(Const *node, deparse_expr_cxt *context, int showtype); static void deparseParam(Param *node, deparse_expr_cxt *context); static void deparseSubscriptingRef(SubscriptingRef *node, deparse_expr_cxt *context); static void deparseFuncExpr(FuncExpr *node, deparse_expr_cxt *context); +static void deparseFuncColnames(StringInfo buf, int varno, RangeTblEntry *rte, bool qualify_col); static void deparseOpExpr(OpExpr *node, deparse_expr_cxt *context); static bool isPlainForeignVar(Expr *node, deparse_expr_cxt *context); static void deparseOperatorName(StringInfo buf, Form_pg_operator opform); @@ -1794,6 +1795,7 @@ deparseFromExprForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel, List **additional_conds, List **params_list) { PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; + bool alias_required = use_alias; if (IS_JOIN_REL(foreignrel)) { @@ -2010,23 +2012,100 @@ deparseFromExprForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel, { RangeTblEntry *rte = 
planner_rt_fetch(foreignrel->relid, root); - /* - * Core code already has some lock on each rel being planned, so we - * can use NoLock here. - */ - Relation rel = table_open(rte->relid, NoLock); + Assert(rte->rtekind == RTE_RELATION || rte->rtekind == RTE_FUNCTION); + if (rte->rtekind == RTE_RELATION) + { + /* + * Core code already has some lock on each rel being planned, so + * we can use NoLock here. + */ + Relation rel = table_open(rte->relid, NoLock); - deparseRelation(buf, rel); + deparseRelation(buf, rel); + + table_close(rel, NoLock); + } + else if (rte->rtekind == RTE_FUNCTION) + { + RangeTblFunction *rtfunc; + deparse_expr_cxt context; + ListCell *lc; + bool first = true; + int n; + + n = list_length(rte->functions); + Assert(n >= 1); + + if (n > 1) + appendStringInfoString(buf, "ROWS FROM ("); + + foreach(lc, rte->functions) + { + if (!first) + appendStringInfoString(buf, ", "); + else + first = false; + + rtfunc = (RangeTblFunction *) lfirst(lc); + + context.root = root; + context.foreignrel = foreignrel; + context.scanrel = foreignrel; + context.buf = buf; + context.params_list = params_list; + + deparseExpr((Expr *) rtfunc->funcexpr, &context); + } + + if (n > 1) + appendStringInfoString(buf, ")"); + /* Function alias determines column names, and so it's required */ + alias_required = true; + } /* * Add a unique alias to avoid any conflict in relation names due to * pulled up subqueries in the query being built for a pushed down * join. 
*/ - if (use_alias) + if (alias_required) + { appendStringInfo(buf, " %s%d", REL_ALIAS_PREFIX, foreignrel->relid); + if (rte->rtekind == RTE_FUNCTION) + { + appendStringInfoString(buf, " ("); + deparseFuncColnames(buf, 0, rte, false); + appendStringInfoString(buf, ") "); + } + } + } +} - table_close(rel, NoLock); +/* + * Deparse the column names of a function RTE as a comma-separated list + */ +static void +deparseFuncColnames(StringInfo buf, int varno, RangeTblEntry *rte, bool qualify_col) +{ + bool first = true; + ListCell *lc; + + Assert(rte); + Assert(rte->rtekind == RTE_FUNCTION); + Assert(rte->eref); + + foreach(lc, rte->eref->colnames) + { + char *colname = strVal(lfirst(lc)); + + if (colname[0] == '\0') + continue; /* skip entries with an empty name */ + if (!first) + appendStringInfoChar(buf, ','); + if (qualify_col) + ADD_REL_QUALIFIER(buf, varno); + appendStringInfoString(buf, quote_identifier(colname)); + first = false; } } @@ -2750,23 +2829,6 @@ deparseColumnRef(StringInfo buf, int varno, int varattno, RangeTblEntry *rte, /* Required only to be passed down to deparseTargetList(). */ List *retrieved_attrs; - /* - * The lock on the relation will be held by upper callers, so it's - * fine to open it with no lock here. - */ - rel = table_open(rte->relid, NoLock); - - /* - * The local name of the foreign table can not be recognized by the - * foreign server and the table it references on foreign server might - * have different column ordering or different columns than those - * declared locally. Hence we have to deparse whole-row reference as - * ROW(columns referenced locally). Construct this by deparsing a - * "whole row" attribute. - */ - attrs_used = bms_add_member(NULL, - 0 - FirstLowInvalidHeapAttributeNumber); - /* * In case the whole-row reference is under an outer join then it has * to go NULL whenever the rest of the row goes NULL. 
Deparsing a join @@ -2781,16 +2843,43 @@ deparseColumnRef(StringInfo buf, int varno, int varattno, RangeTblEntry *rte, } appendStringInfoString(buf, "ROW("); - deparseTargetList(buf, rte, varno, rel, false, attrs_used, qualify_col, - &retrieved_attrs); + if (rte->rtekind == RTE_RELATION) + { + /* + * The local name of the foreign table can not be recognized by + * the foreign server and the table it references on foreign + * server might have different column ordering or different + * columns than those declared locally. Hence we have to deparse + * whole-row reference as ROW(columns referenced locally). + * Construct this by deparsing a "whole row" attribute. + */ + attrs_used = bms_add_member(NULL, + 0 - FirstLowInvalidHeapAttributeNumber); + + /* + * The lock on the relation will be held by upper callers, so it's + * fine to open it with no lock here. + */ + rel = table_open(rte->relid, NoLock); + deparseTargetList(buf, rte, varno, rel, false, attrs_used, qualify_col, + &retrieved_attrs); + table_close(rel, NoLock); + bms_free(attrs_used); + } + else if (rte->rtekind == RTE_FUNCTION) + { + /* + * Function call is translated as-is, function returns the same + * columns in the same order as on local server + */ + deparseFuncColnames(buf, varno, rte, qualify_col); + } appendStringInfoChar(buf, ')'); /* Complete the CASE WHEN statement started above. */ if (qualify_col) appendStringInfoString(buf, " END"); - table_close(rel, NoLock); - bms_free(attrs_used); } else { @@ -2805,29 +2894,40 @@ deparseColumnRef(StringInfo buf, int varno, int varattno, RangeTblEntry *rte, * If it's a column of a foreign table, and it has the column_name FDW * option, use that value. 
*/ - options = GetForeignColumnOptions(rte->relid, varattno); - foreach(lc, options) + if (rte->rtekind == RTE_RELATION) { - DefElem *def = (DefElem *) lfirst(lc); - - if (strcmp(def->defname, "column_name") == 0) + options = GetForeignColumnOptions(rte->relid, varattno); + foreach(lc, options) { - colname = defGetString(def); - break; + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "column_name") == 0) + { + colname = defGetString(def); + break; + } } - } - /* - * If it's a column of a regular table or it doesn't have column_name - * FDW option, use attribute name. - */ - if (colname == NULL) - colname = get_attname(rte->relid, varattno, false); + /* + * If it's a column of a regular table or it doesn't have + * column_name FDW option, use attribute name. + */ + if (colname == NULL) + colname = get_attname(rte->relid, varattno, false); - if (qualify_col) - ADD_REL_QUALIFIER(buf, varno); + if (qualify_col) + ADD_REL_QUALIFIER(buf, varno); - appendStringInfoString(buf, quote_identifier(colname)); + appendStringInfoString(buf, quote_identifier(colname)); + } + else if (rte->rtekind == RTE_FUNCTION) + { + colname = get_rte_attribute_name(rte, varattno); + + if (qualify_col) + ADD_REL_QUALIFIER(buf, varno); + appendStringInfoString(buf, quote_identifier(colname)); + } } } diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index aaffcf31271..643aa22e64b 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -13053,3 +13053,470 @@ RESET client_min_messages; DROP FUNCTION wait_for_backend_termination(int); DROP FOREIGN TABLE remote_backend_pid; DROP VIEW my_backend_pid; +-- =================================================================== +-- test function scan pushdown +-- =================================================================== +CREATE TABLE base_tbl (a int, b int); +CREATE FOREIGN TABLE remote_tbl (a int, b int) + 
SERVER loopback OPTIONS (table_name 'base_tbl'); +ALTER FOREIGN TABLE remote_tbl OPTIONS (use_remote_estimate 'true'); +CREATE TABLE base_tbl1 (c int, d text); +CREATE FOREIGN TABLE remote_tbl1 (c int, d text) + SERVER loopback OPTIONS (table_name 'base_tbl1'); +ALTER FOREIGN TABLE remote_tbl1 OPTIONS (use_remote_estimate 'true'); +INSERT INTO remote_tbl SELECT g, g*2 from generate_series(1,1000) g; +INSERT INTO remote_tbl1 SELECT g, 'text'|| g from generate_series(1,500) g; +ANALYZE base_tbl; +ANALYZE base_tbl1; +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM remote_tbl r, unnest(array[2,3,4]) n WHERE r.a = n; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: r.a, r.b, n.n + Relations: (public.remote_tbl r) INNER JOIN (pg_catalog.unnest() n) + Remote SQL: SELECT r1.a, r1.b, r2.n FROM (public.base_tbl r1 INNER JOIN unnest('{2,3,4}'::integer[]) r2 (n) ON (((r1.a = r2.n)))) +(4 rows) + +SELECT * FROM remote_tbl r, unnest(array[2,3,4]) n WHERE r.a = n +ORDER BY r.a; + a | b | n +---+---+--- + 2 | 4 | 2 + 3 | 6 | 3 + 4 | 8 | 4 +(3 rows) + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM unnest(array[2,3,4]) n, remote_tbl r WHERE r.a = n; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: n.n, r.a, r.b + Relations: (public.remote_tbl r) INNER JOIN (pg_catalog.unnest() n) + Remote SQL: SELECT r1.n, r2.a, r2.b FROM (public.base_tbl r2 INNER JOIN unnest('{2,3,4}'::integer[]) r1 (n) ON (((r2.a = r1.n)))) +(4 rows) + +SELECT * FROM unnest(array[2,3,4]) n, remote_tbl r WHERE r.a = n +ORDER BY r.a; + n | a | b +---+---+--- + 2 | 2 | 4 + 3 | 3 | 6 + 4 | 4 | 8 +(3 rows) + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM remote_tbl r, remote_tbl1 r1, unnest(array[3,4]) n +WHERE r.a = n AND r1.c = r.a; + QUERY PLAN 
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + Foreign Scan + Output: r.a, r.b, r1.c, r1.d, n.n + Relations: ((public.remote_tbl r) INNER JOIN (public.remote_tbl1 r1)) INNER JOIN (pg_catalog.unnest() n) + Remote SQL: SELECT r1.a, r1.b, r2.c, r2.d, r3.n FROM ((public.base_tbl r1 INNER JOIN public.base_tbl1 r2 ON (((r1.a = r2.c)))) INNER JOIN unnest('{3,4}'::integer[]) r3 (n) ON (((r1.a = r3.n)))) +(4 rows) + +SELECT * FROM remote_tbl r, remote_tbl1 r1, unnest(array[3,4]) n +WHERE r.a = n AND r1.c = r.a +ORDER BY r.a; + a | b | c | d | n +---+---+---+-------+--- + 3 | 6 | 3 | text3 | 3 + 4 | 8 | 4 | text4 | 4 +(2 rows) + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT r.*,n from remote_tbl r, remote_tbl1 r1, unnest(array[3,4]) n +WHERE r.a = n AND r1.c = r.a and n > 3; + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: r.a, r.b, n.n + Relations: ((public.remote_tbl r) INNER JOIN (public.remote_tbl1 r1)) INNER JOIN (pg_catalog.unnest() n) + Remote SQL: SELECT r1.a, r1.b, r3.n FROM ((public.base_tbl r1 INNER JOIN public.base_tbl1 r2 ON (((r1.a = r2.c)))) INNER JOIN unnest('{3,4}'::integer[]) r3 (n) ON (((r1.a = r3.n)) AND ((r3.n > 3)))) +(4 rows) + +SELECT * from remote_tbl r, remote_tbl1 r1, unnest(array[3,4]) n +WHERE r.a = n AND r1.c = r.a and n > 3; + a | b | c | d | n +---+---+---+-------+--- + 4 | 8 | 4 | text4 | 4 +(1 row) + +CREATE OR REPLACE FUNCTION get_constant_texts() +RETURNS TABLE (text_value text) AS $$ +BEGIN + RETURN QUERY VALUES + ('text1'), + ('text4'); +END; +$$ LANGUAGE plpgsql IMMUTABLE; +ALTER EXTENSION postgres_fdw ADD FUNCTION get_constant_texts(); +ALTER SERVER loopback OPTIONS (extensions 'postgres_fdw'); 
+EXPLAIN (VERBOSE, COSTS OFF) +SELECT r.*, t.n from remote_tbl1 r, ROWS FROM (unnest(array[3,4]), get_constant_texts()) t (n, txt) +WHERE r.c = t.n AND r.d = t.txt; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: r.c, r.d, t.n + Relations: (public.remote_tbl1 r) INNER JOIN (ROWS FROM(pg_catalog.unnest(), public.get_constant_texts()) t) + Remote SQL: SELECT r1.c, r1.d, r2.n FROM (public.base_tbl1 r1 INNER JOIN ROWS FROM (unnest('{3,4}'::integer[]), public.get_constant_texts()) r2 (n,txt) ON (((r1.c = r2.n)) AND ((r2.txt = r1.d)))) +(4 rows) + +SELECT r.*, t.txt from remote_tbl1 r, ROWS FROM (unnest(array[3,4]), get_constant_texts()) t (n, txt) +WHERE r.c = t.n AND r.d = t.txt; + c | d | txt +---+-------+------- + 4 | text4 | text4 +(1 row) + +-- Complex types +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM remote_tbl r JOIN UNNEST(array[box '((2,3),(-2,-3))']) as t(bx) ON a = area(bx); + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: r.a, r.b, t.bx + Relations: (public.remote_tbl r) INNER JOIN (pg_catalog.unnest() t) + Remote SQL: SELECT r1.a, r1.b, r2.bx FROM (public.base_tbl r1 INNER JOIN unnest('{(2,3),(-2,-3)}'::box[]) r2 (bx) ON (((r1.a = area(r2.bx))))) +(4 rows) + +SELECT * FROM remote_tbl r JOIN UNNEST(array[box '((2,3),(-2,-3))']) as t(bx) ON a = area(bx) +ORDER BY r.a; + a | b | bx +----+----+--------------- + 24 | 48 | (2,3),(-2,-3) +(1 row) + +-- DML +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE remote_tbl r SET b=5 FROM UNNEST(array[box '((2,3),(-2,-3))']) AS t (bx) WHERE r.a = area(t.bx) +RETURNING a,b; + QUERY PLAN 
+---------------------------------------------------------------------------------------------------------------------------------------------------------- + Update on public.remote_tbl r + Output: r.a, r.b + -> Foreign Update + Remote SQL: UPDATE public.base_tbl r1 SET b = 5 FROM unnest('{(2,3),(-2,-3)}'::box[]) r2 (bx) WHERE ((r1.a = area(r2.bx))) RETURNING r1.a, r1.b +(4 rows) + +UPDATE remote_tbl r SET b=5 FROM UNNEST(array[box '((2,3),(-2,-3))']) AS t (bx) WHERE r.a = area(t.bx) +RETURNING a,b; + a | b +----+--- + 24 | 5 +(1 row) + +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE remote_tbl r SET b=CASE WHEN random()>=0 THEN 5 ELSE 0 END FROM UNNEST(array[box '((2,3),(-2,-3))']) AS t (bx) WHERE r.a = area(t.bx) +RETURNING a,b; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Update on public.remote_tbl r + Output: r.a, r.b + Remote SQL: UPDATE public.base_tbl SET b = $2 WHERE ctid = $1 RETURNING a, b + -> Foreign Scan + Output: CASE WHEN (random() >= '0'::double precision) THEN 5 ELSE 0 END, r.ctid, r.*, t.* + Relations: (public.remote_tbl r) INNER JOIN (pg_catalog.unnest() t) + Remote SQL: SELECT r1.ctid, CASE WHEN (r1.*)::text IS NOT NULL THEN ROW(r1.a, r1.b) END, CASE WHEN (r2.*)::text IS NOT NULL THEN ROW(r2.bx) END FROM (public.base_tbl r1 INNER JOIN unnest('{(2,3),(-2,-3)}'::box[]) r2 (bx) ON (((r1.a = area(r2.bx))))) FOR UPDATE OF r1 + -> Hash Join + Output: r.ctid, r.*, t.* + Hash Cond: ((r.a)::double precision = area(t.bx)) + -> Foreign Scan on public.remote_tbl r + Output: r.ctid, r.*, r.a + Remote SQL: SELECT a, b, ctid FROM public.base_tbl FOR UPDATE + -> Hash + Output: t.*, t.bx + -> Function Scan on pg_catalog.unnest t + Output: t.*, t.bx + Function Call: unnest('{(2,3),(-2,-3)}'::box[]) +(18 rows) + +UPDATE 
remote_tbl r SET b=CASE WHEN random()>=0 THEN 5 ELSE 0 END FROM UNNEST(array[box '((2,3),(-2,-3))']) AS t (bx) WHERE r.a = area(t.bx) +RETURNING a,b; + a | b +----+--- + 24 | 5 +(1 row) + +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE remote_tbl r SET b=5 FROM UNNEST(array[10,20], array[box '((2,3),(-2,-4))', box '((1,2),(-2,-3))']) AS t (l, bx) WHERE r.a between l and area(t.bx) +RETURNING a,b; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + Update on public.remote_tbl r + Output: r.a, r.b + -> Foreign Update + Remote SQL: UPDATE public.base_tbl r1 SET b = 5 FROM ROWS FROM (unnest('{10,20}'::integer[]), unnest('{(2,3),(-2,-4);(1,2),(-2,-3)}'::box[])) r2 (l,bx) WHERE ((r1.a >= r2.l)) AND ((r1.a <= area(r2.bx))) RETURNING r1.a, r1.b +(4 rows) + +UPDATE remote_tbl r SET b=5 FROM UNNEST(array[10,20], array[box '((2,3),(-2,-4))', box '((1,2),(-2,-3))']) AS t (l, bx) WHERE r.a between l and area(t.bx) +RETURNING a,b; + a | b +----+--- + 10 | 5 + 11 | 5 + 12 | 5 + 13 | 5 + 14 | 5 + 15 | 5 + 16 | 5 + 17 | 5 + 18 | 5 + 19 | 5 + 20 | 5 + 21 | 5 + 22 | 5 + 23 | 5 + 25 | 5 + 26 | 5 + 27 | 5 + 28 | 5 + 24 | 5 +(19 rows) + +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE remote_tbl r SET b=CASE WHEN random()>=0 THEN 5 ELSE 0 END FROM UNNEST(array[10,20], array[box '((2,3),(-2,-4))', box '((1,2),(-2,-3))']) AS t (l, bx) WHERE r.a between l and area(t.bx) +RETURNING a,b; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Update on public.remote_tbl r + Output: r.a, r.b + Remote 
SQL: UPDATE public.base_tbl SET b = $2 WHERE ctid = $1 RETURNING a, b + -> Foreign Scan + Output: CASE WHEN (random() >= '0'::double precision) THEN 5 ELSE 0 END, r.ctid, r.*, t.* + Relations: (public.remote_tbl r) INNER JOIN (ROWS FROM(pg_catalog.unnest(), pg_catalog.unnest()) t) + Remote SQL: SELECT r1.ctid, CASE WHEN (r1.*)::text IS NOT NULL THEN ROW(r1.a, r1.b) END, CASE WHEN (r2.*)::text IS NOT NULL THEN ROW(r2.l,r2.bx) END FROM (public.base_tbl r1 INNER JOIN ROWS FROM (unnest('{10,20}'::integer[]), unnest('{(2,3),(-2,-4);(1,2),(-2,-3)}'::box[])) r2 (l,bx) ON (((r1.a >= r2.l)) AND ((r1.a <= area(r2.bx))))) FOR UPDATE OF r1 + -> Nested Loop + Output: r.ctid, r.*, t.* + Join Filter: ((r.a >= t.l) AND ((r.a)::double precision <= area(t.bx))) + -> Foreign Scan on public.remote_tbl r + Output: r.ctid, r.*, r.a + Remote SQL: SELECT a, b, ctid FROM public.base_tbl FOR UPDATE + -> Function Scan on t + Output: t.*, t.l, t.bx + Function Call: unnest('{10,20}'::integer[]), unnest('{(2,3),(-2,-4);(1,2),(-2,-3)}'::box[]) +(16 rows) + +UPDATE remote_tbl r SET b=CASE WHEN random()>=0 THEN 5 ELSE 0 END FROM UNNEST(array[10,20], array[box '((2,3),(-2,-4))', box '((1,2),(-2,-3))']) AS t (l, bx) WHERE r.a between l and area(t.bx) +RETURNING a,b; + a | b +----+--- + 10 | 5 + 11 | 5 + 12 | 5 + 13 | 5 + 14 | 5 + 15 | 5 + 16 | 5 + 17 | 5 + 18 | 5 + 19 | 5 + 20 | 5 + 21 | 5 + 22 | 5 + 23 | 5 + 25 | 5 + 26 | 5 + 27 | 5 + 28 | 5 + 24 | 5 +(19 rows) + +-- Test that local functions are not pushed down +CREATE OR REPLACE FUNCTION f(INTEGER) +RETURNS SETOF INTEGER +LANGUAGE sql AS 'select generate_series(1,$1);' IMMUTABLE; +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM remote_tbl r, f(10) n +WHERE r.a = n; + QUERY PLAN +------------------------------------------------------ + Hash Join + Output: r.a, r.b, (generate_series(1, 10)) + Hash Cond: (r.a = (generate_series(1, 10))) + -> Foreign Scan on public.remote_tbl r + Output: r.a, r.b + Remote SQL: SELECT a, b FROM public.base_tbl + -> Hash + 
Output: (generate_series(1, 10)) + -> ProjectSet + Output: generate_series(1, 10) + -> Result +(11 rows) + +SELECT * FROM remote_tbl r, remote_tbl1 r1, unnest(array[3,4]) n +WHERE r.a = n AND r1.c = r.a AND n > 3; + a | b | c | d | n +---+---+---+-------+--- + 4 | 8 | 4 | text4 | 4 +(1 row) + +-- Test that a function that returns a record is not pushed down +CREATE OR REPLACE FUNCTION f_ret_record() RETURNS record AS $$ SELECT (1,2)::record $$ language SQL IMMUTABLE; +ALTER EXTENSION postgres_fdw ADD function f_ret_record(); +EXPLAIN (VERBOSE, COSTS OFF) +SELECT s FROM remote_tbl rt, f_ret_record() AS s(a int, b int) +WHERE s.a = rt.a; + QUERY PLAN +----------------------------------------------------------------------------- + Nested Loop + Output: s.* + -> Function Scan on public.f_ret_record s + Output: s.*, s.a + Function Call: f_ret_record() + -> Foreign Scan on public.remote_tbl rt + Output: rt.a, rt.b + Remote SQL: SELECT a FROM public.base_tbl WHERE ((a = $1::integer)) +(8 rows) + +SELECT s FROM remote_tbl rt, f_ret_record() AS s(a int, b int) +WHERE s.a = rt.a; + s +------- + (1,2) +(1 row) + +DROP FUNCTION f(INTEGER); +ALTER EXTENSION postgres_fdw DROP FUNCTION f_ret_record(); +DROP FUNCTION f_ret_record(); +-- Test that a function that returns composite type is not pushed down +CREATE TYPE c1 AS (i int, j int); +CREATE OR REPLACE FUNCTION f_ret_c1(int) RETURNS SETOF c1 AS $$ begin return next '(3,3)'::c1 ; end $$ language plpgsql immutable; +ALTER EXTENSION postgres_fdw ADD FUNCTION f_ret_c1(int); +ALTER EXTENSION postgres_fdw ADD TYPE c1; +EXPLAIN (VERBOSE, COSTS OFF) +SELECT f FROM remote_tbl r, ROWS FROM (f_ret_c1(2), f_ret_c1(2)) AS f(i, j, k,l) WHERE f.i=r.a; + QUERY PLAN +----------------------------------------------------------------------------- + Merge Join + Output: f.* + Merge Cond: (r.a = f.i) + -> Foreign Scan on public.remote_tbl r + Output: r.a, r.b + Remote SQL: SELECT a FROM public.base_tbl ORDER BY a ASC NULLS LAST + -> Sort + Output: 
f.*, f.i + Sort Key: f.i + -> Function Scan on f + Output: f.*, f.i + Function Call: f_ret_c1(2), f_ret_c1(2) +(12 rows) + +SELECT f FROM remote_tbl r, ROWS FROM (f_ret_c1(2), f_ret_c1(2)) AS f(i, j, k,l) WHERE f.i=r.a; + f +----------- + (3,3,3,3) +(1 row) + +ALTER EXTENSION postgres_fdw DROP FUNCTION f_ret_c1(int); +ALTER EXTENSION postgres_fdw DROP TYPE c1; +DROP FUNCTION f_ret_c1(int); +DROP TYPE c1; +DELETE FROM base_tbl; +-- Test that function with parameters in arguments is not pushed down +INSERT INTO remote_tbl SELECT g, g*2 from generate_series(1,10) g; +ANALYZE base_tbl; +ANALYZE remote_tbl; +CREATE OR REPLACE FUNCTION f(int) RETURNS SETOF int +LANGUAGE plpgsql ROWS 10 AS 'BEGIN RETURN QUERY SELECT generate_series(1,$1) ; END' IMMUTABLE; +ALTER EXTENSION postgres_fdw ADD function f(INTEGER); +-- no foreign join +set enable_material = off; +-- Make local function scan not so attractive +ALTER SERVER loopback OPTIONS (ADD fdw_tuple_cost '1000'); +EXPLAIN (VERBOSE, COSTS OFF) +WITH s AS MATERIALIZED (SELECT r1.* FROM remote_tbl r1 +JOIN LATERAL +(SELECT r2.a FROM remote_tbl r2, f(r1.a) WHERE f=r2.a LIMIT 1) s +ON true) +SELECT * FROM s ORDER BY 1; + QUERY PLAN +----------------------------------------------------------------------- + Sort + Output: s.a, s.b + Sort Key: s.a + CTE s + -> Nested Loop + Output: r1.a, r1.b + -> Foreign Scan on public.remote_tbl r1 + Output: r1.a, r1.b + Remote SQL: SELECT a, b FROM public.base_tbl + -> Limit + Output: NULL::integer + -> Nested Loop + Output: NULL::integer + Join Filter: (r2.a = f.f) + -> Foreign Scan on public.remote_tbl r2 + Output: r2.a, r2.b + Remote SQL: SELECT a FROM public.base_tbl + -> Function Scan on public.f + Output: f.f + Function Call: f(r1.a) + -> CTE Scan on s + Output: s.a, s.b +(22 rows) + +WITH s AS MATERIALIZED (SELECT r1.* FROM remote_tbl r1 +JOIN LATERAL +(SELECT r2.a FROM remote_tbl r2, f(r1.a) WHERE f=r2.a LIMIT 1) s +ON true) +SELECT * FROM s ORDER BY 1; + a | b +----+---- + 1 | 2 + 2 | 4 
+ 3 | 6 + 4 | 8 + 5 | 10 + 6 | 12 + 7 | 14 + 8 | 16 + 9 | 18 + 10 | 20 +(10 rows) + +reset enable_material; +ALTER SERVER loopback OPTIONS (DROP fdw_tuple_cost); +ALTER EXTENSION postgres_fdw DROP function f(INTEGER); +ALTER EXTENSION postgres_fdw DROP FUNCTION get_constant_texts(); +DROP FUNCTION get_constant_texts(); +DROP TABLE base_tbl, base_tbl1; +DROP FOREIGN TABLE remote_tbl, remote_tbl1; +-- Test that function WITH ORDINALITY is not pushed down +CREATE TABLE base_tbl (a int, b int); +CREATE FOREIGN TABLE remote_tbl (a int, b int) SERVER loopback OPTIONS (table_name 'base_tbl'); +INSERT INTO remote_tbl VALUES (1, 2), (2, 3), (3, 4), (5, 6); +ANALYZE remote_tbl; +SET enable_mergejoin TO false; +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM remote_tbl, unnest(ARRAY[1, 2]) WITH ORDINALITY +WHERE a=unnest; + QUERY PLAN +------------------------------------------------------------------------ + Hash Join + Output: remote_tbl.a, remote_tbl.b, unnest.unnest, unnest.ordinality + Hash Cond: (remote_tbl.a = unnest.unnest) + -> Foreign Scan on public.remote_tbl + Output: remote_tbl.a, remote_tbl.b + Remote SQL: SELECT a, b FROM public.base_tbl + -> Hash + Output: unnest.unnest, unnest.ordinality + -> Function Scan on pg_catalog.unnest + Output: unnest.unnest, unnest.ordinality + Function Call: unnest('{1,2}'::integer[]) +(11 rows) + +SELECT * FROM remote_tbl, unnest(ARRAY[1, 2]) WITH ORDINALITY +WHERE a=unnest; + a | b | unnest | ordinality +---+---+--------+------------ + 1 | 2 | 1 | 1 + 2 | 3 | 2 | 2 +(2 rows) + +DROP TABLE base_tbl; +DROP FOREIGN TABLE remote_tbl; +RESET enable_mergejoin; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index c42cb690c7b..ca08536f45e 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -40,6 +40,7 @@ #include "optimizer/prep.h" #include "optimizer/restrictinfo.h" #include "optimizer/tlist.h" +#include "optimizer/clauses.h" #include "parser/parsetree.h" 
#include "postgres_fdw.h" #include "statistics/statistics.h" @@ -52,6 +53,7 @@ #include "utils/rel.h" #include "utils/sampling.h" #include "utils/selfuncs.h" +#include "utils/typcache.h" PG_MODULE_MAGIC_EXT( .name = "postgres_fdw", @@ -88,6 +90,17 @@ enum FdwScanPrivateIndex * of join, added when the scan is join */ FdwScanPrivateRelations, + + /* + * List of function oid, return type and collation for each function per + * rte + */ + FdwScanPrivateFunctions, + + /* + * Minimum RT index, used in foreign scan + */ + FdwScanPrivateMinRTIndex }; /* @@ -124,6 +137,8 @@ enum FdwModifyPrivateIndex * 2) Boolean flag showing if the remote query has a RETURNING clause * 3) Integer list of attribute numbers retrieved by RETURNING, if any * 4) Boolean flag showing if we set the command es_processed + * 5) Data about used RTE_FUNCTIONS - their oid, return type and collation + * 6) Minimum RT index, used in foreign scan */ enum FdwDirectModifyPrivateIndex { @@ -135,6 +150,14 @@ enum FdwDirectModifyPrivateIndex FdwDirectModifyPrivateRetrievedAttrs, /* set-processed flag (as a Boolean node) */ FdwDirectModifyPrivateSetProcessed, + + /* + * List of function oid, return type and collation for each function per + * rte + */ + FdwDirectModifyPrivateFunctions, + /* Minimum RT index, used in foreign scan */ + FdwDirectModifyPrivateMinRTIndex }; /* @@ -593,6 +616,14 @@ static void postgresGetForeignJoinPaths(PlannerInfo *root, RelOptInfo *innerrel, JoinType jointype, JoinPathExtraData *extra); + +static void postgresTryShippableJoinPaths(PlannerInfo *root, + RelOptInfo *joinrel, + RelOptInfo *outerrel, + RelOptInfo *innerrel, + JoinType jointype, + JoinPathExtraData *extra); + static bool postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot); static void postgresGetForeignUpperPaths(PlannerInfo *root, @@ -760,6 +791,15 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_i); static int get_batch_size_option(Relation rel); +static 
bool is_nonrel_relinfo_ok(PlannerInfo *root, RelOptInfo *foreignrel); +static void init_fpinfo(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid, + PgFdwRelationInfo *existing_fpinfo); +static bool contain_param_walker(Node *node, void *arg); +static List *get_functions_data(PlannerInfo *root, RelOptInfo *rel); +static int get_min_base_rti(PlannerInfo *root, RelOptInfo *rel); +static Relids get_base_relids(PlannerInfo *root, RelOptInfo *rel); /* * Foreign-data wrapper handler function: return a struct with pointers @@ -816,6 +856,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for join push-down */ routine->GetForeignJoinPaths = postgresGetForeignJoinPaths; + routine->TryShippableJoinPaths = postgresTryShippableJoinPaths; /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; @@ -840,10 +881,32 @@ static void postgresGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid) +{ + init_fpinfo(root, baserel, foreigntableid, NULL); +} + +/* + * init_fpinfo + * + * Either initialize fpinfo based on foreign table or generate one, based on + * existing fpinfo. + * Also estimate # of rows and width of the result of the scan. + * + * We should consider the effect of all baserestrictinfo clauses here, but + * not any join clauses. + */ +static void +init_fpinfo(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid, + PgFdwRelationInfo *existing_fpinfo) { PgFdwRelationInfo *fpinfo; ListCell *lc; + Assert(existing_fpinfo || foreigntableid != InvalidOid); + Assert(existing_fpinfo == NULL || foreigntableid == InvalidOid); + /* * We use PgFdwRelationInfo to pass various information to subsequent * functions. @@ -854,40 +917,64 @@ postgresGetForeignRelSize(PlannerInfo *root, /* Base foreign tables need to be pushed down always. */ fpinfo->pushdown_safe = true; - /* Look up foreign-table catalog info. 
*/ - fpinfo->table = GetForeignTable(foreigntableid); - fpinfo->server = GetForeignServer(fpinfo->table->serverid); - - /* - * Extract user-settable option values. Note that per-table settings of - * use_remote_estimate, fetch_size and async_capable override per-server - * settings of them, respectively. - */ - fpinfo->use_remote_estimate = false; - fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST; - fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; - fpinfo->shippable_extensions = NIL; - fpinfo->fetch_size = 100; - fpinfo->async_capable = false; - - apply_server_options(fpinfo); - apply_table_options(fpinfo); + if (existing_fpinfo) + { + /* We don't have any table, related to query */ + fpinfo->table = NULL; + fpinfo->server = existing_fpinfo->server; + } + else + { + /* Look up foreign-table catalog info. */ + fpinfo->table = GetForeignTable(foreigntableid); + fpinfo->server = GetForeignServer(fpinfo->table->serverid); + } - /* - * If the table or the server is configured to use remote estimates, - * identify which user to do remote access as during planning. This - * should match what ExecCheckPermissions() does. If we fail due to lack - * of permissions, the query would have failed at runtime anyway. - */ - if (fpinfo->use_remote_estimate) + if (existing_fpinfo) { - Oid userid; + merge_fdw_options(fpinfo, existing_fpinfo, NULL); + fpinfo->user = existing_fpinfo->user; - userid = OidIsValid(baserel->userid) ? baserel->userid : GetUserId(); - fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid); + /* + * Don't try to execute anything on remote server for + * non-relation-based query + */ + fpinfo->use_remote_estimate = false; } else - fpinfo->user = NULL; + { + /* + * Extract user-settable option values. Note that per-table settings + * of use_remote_estimate, fetch_size and async_capable override + * per-server settings of them, respectively. 
+ */ + fpinfo->use_remote_estimate = false; + fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST; + fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; + fpinfo->shippable_extensions = NIL; + fpinfo->fetch_size = 100; + fpinfo->async_capable = false; + fpinfo->is_generated = false; + + apply_server_options(fpinfo); + apply_table_options(fpinfo); + + /* + * If the table or the server is configured to use remote estimates, + * identify which user to do remote access as during planning. This + * should match what ExecCheckPermissions() does. If we fail due to + * lack of permissions, the query would have failed at runtime anyway. + */ + if (fpinfo->use_remote_estimate) + { + Oid userid; + + userid = OidIsValid(baserel->userid) ? baserel->userid : GetUserId(); + fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid); + } + else + fpinfo->user = NULL; + } /* * Identify which baserestrictinfo clauses can be sent to the remote @@ -1002,6 +1089,9 @@ postgresGetForeignRelSize(PlannerInfo *root, fpinfo->hidden_subquery_rels = NULL; /* Set the relation index. */ fpinfo->relation_index = baserel->relid; + if (existing_fpinfo) + /* Mark fpinfo generated */ + fpinfo->is_generated = true; } /* @@ -1223,6 +1313,112 @@ get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel) return useful_pathkeys_list; } +/* + * get_functions_data + * Collect info about function return types for executor + */ +static List * +get_functions_data(PlannerInfo *root, RelOptInfo *rel) +{ + List *rtfuncdata = NIL; + int i; + Relids fscan_relids; + + fscan_relids = get_base_relids(root, rel); + + /* + * Preserve function oid and return type for executor. Each element of + * rtfuncdata is a List, corresponding to RangeTblEntry functions. Element + * of the list is a list of three-elements lists. The first one is + * function oid (for explain), the second and the third - are actual + * result types and collation oids. 
+ */ + for (i = 0; i < root->simple_rel_array_size; i++) + { + RangeTblEntry *rte = root->simple_rte_array[i]; + + /* + * The position in rtfuncdata must match the rtindex, so we add an + * empty (NULL) member for every other index, including 0. Also avoid + * examining functions that are not part of the foreign scan. + */ + if (rte != NULL && i > 0 && bms_is_member(i, fscan_relids) && rte->rtekind == RTE_FUNCTION) + { + ListCell *lc; + List *funcdata = NIL; + + + foreach(lc, rte->functions) + { + RangeTblFunction *rtfunc; + Oid funccollation; + TupleDesc tupdesc; + Oid funcrettype; + Oid funcid = InvalidOid; + + rtfunc = (RangeTblFunction *) lfirst(lc); + + get_expr_result_type(rtfunc->funcexpr, &funcrettype, &tupdesc); + + /* Should never happen */ + if (!OidIsValid(funcrettype) || funcrettype == RECORDOID) + elog(ERROR, "could not determine return type for function"); + + funccollation = exprCollation(rtfunc->funcexpr); + + if (IsA(rtfunc->funcexpr, FuncExpr)) + funcid = ((FuncExpr *) rtfunc->funcexpr)->funcid; + + funcdata = lappend(funcdata, list_make3( + makeInteger(funcid), + makeInteger(funcrettype), + makeInteger(funccollation))); + } + + rtfuncdata = lappend(rtfuncdata, funcdata); + } + else + rtfuncdata = lappend(rtfuncdata, NULL); + } + + return rtfuncdata; +} + +/* + * get_base_relids + * Get base relids, used in foreign scan + */ +static Relids +get_base_relids(PlannerInfo *root, RelOptInfo *rel) +{ + Relids relids; + + if (rel->reloptkind == RELOPT_UPPER_REL) + relids = root->all_query_rels; + else + relids = rel->relids; + + relids = bms_difference(relids, root->outer_join_rels); + + return relids; +} + +/* + * get_min_base_rti + * Get minimum base RTI, used in foreign scan + * + * This duplicates logic from create_foreignscan_plan(). + * However, we need this value before create_foreignscan_plan() + * does its job.
+ */ +static int +get_min_base_rti(PlannerInfo *root, RelOptInfo *rel) +{ + Relids relids = get_base_relids(root, rel); + + return bms_next_member(relids, -1); +} + /* * postgresGetForeignPaths * Create possible scan paths for a scan on the foreign table @@ -1637,6 +1833,12 @@ postgresGetForeignPlan(PlannerInfo *root, if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel)) fdw_private = lappend(fdw_private, makeString(fpinfo->relation_name)); + else + fdw_private = lappend(fdw_private, NULL); + + fdw_private = lappend(fdw_private, get_functions_data(root, foreignrel)); + /* We can use this value to determine rtoffset. */ + fdw_private = lappend(fdw_private, makeInteger(get_min_base_rti(root, foreignrel))); /* * Create the ForeignScan node for the given relation. @@ -1659,7 +1861,7 @@ postgresGetForeignPlan(PlannerInfo *root, * Construct a tuple descriptor for the scan tuples handled by a foreign join. */ static TupleDesc -get_tupdesc_for_join_scan_tuples(ForeignScanState *node) +get_tupdesc_for_join_scan_tuples(ForeignScanState *node, List *rtfuncdata, int rtoffset) { ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; EState *estate = node->ss.ps.state; @@ -1695,13 +1897,65 @@ get_tupdesc_for_join_scan_tuples(ForeignScanState *node) if (!IsA(var, Var) || var->varattno != 0) continue; rte = list_nth(estate->es_range_table, var->varno - 1); - if (rte->rtekind != RTE_RELATION) - continue; - reltype = get_rel_type_id(rte->relid); - if (!OidIsValid(reltype)) - continue; - att->atttypid = reltype; - /* shouldn't need to change anything else */ + if (rte->rtekind == RTE_RELATION) + { + reltype = get_rel_type_id(rte->relid); + if (!OidIsValid(reltype)) + continue; + att->atttypid = reltype; + /* shouldn't need to change anything else */ + } + else if (rte->rtekind == RTE_FUNCTION) + { + /* Whole row var references function RTE. Determine its rowtype. 
*/ + TupleDesc rte_tupdesc; + int num_funcs, + attnum; + ListCell *lc, + *lcname; + List *funcdata; + + funcdata = list_nth(rtfuncdata, var->varno - rtoffset); + + num_funcs = list_length(funcdata); + Assert(num_funcs >= 0); + + /* + * funcrettype != RECORD, so we have only one return attribute per + * function + */ + Assert(list_length(rte->eref->colnames) == num_funcs); + rte_tupdesc = CreateTemplateTupleDesc(num_funcs); + + attnum = 1; + forboth(lc, funcdata, lcname, rte->eref->colnames) + { + List *fdata; + char *colname; + Oid funcrettype; + Oid funcretcollation; + + fdata = lfirst_node(List, lc); + + colname = strVal(lfirst(lcname)); + + funcrettype = lsecond_node(Integer, fdata)->ival; + funcretcollation = lthird_node(Integer, fdata)->ival; + + /* Should never happen */ + if (!OidIsValid(funcrettype) || funcrettype == RECORDOID) + elog(ERROR, "could not determine return type for function"); + + TupleDescInitEntry(rte_tupdesc, (AttrNumber) attnum, colname, + funcrettype, -1, 0); + TupleDescInitEntryCollation(rte_tupdesc, (AttrNumber) attnum, + funcretcollation); + attnum++; + } + + assign_record_type_typmod(rte_tupdesc); + att->atttypmod = rte_tupdesc->tdtypmod; + } } return tupdesc; } @@ -1716,7 +1970,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; EState *estate = node->ss.ps.state; PgFdwScanState *fsstate; - RangeTblEntry *rte; + RangeTblEntry *rte = NULL; Oid userid; ForeignTable *table; UserMapping *user; @@ -1737,14 +1991,28 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) /* * Identify which user to do the remote access as. This should match what - * ExecCheckPermissions() does. + * ExecCheckPermissions() does. In case of a join or aggregate, scan RTEs + * until RTE_RELATION is found. We would get the same result from any. */ userid = OidIsValid(fsplan->checkAsUser) ? 
fsplan->checkAsUser : GetUserId(); if (fsplan->scan.scanrelid > 0) + { rtindex = fsplan->scan.scanrelid; + rte = exec_rt_fetch(rtindex, estate); + } else - rtindex = bms_next_member(fsplan->fs_base_relids, -1); - rte = exec_rt_fetch(rtindex, estate); + { + rtindex = -1; + while ((rtindex = bms_next_member(fsplan->fs_base_relids, rtindex)) >= 0) + { + rte = exec_rt_fetch(rtindex, estate); + if (rte && rte->rtekind == RTE_RELATION) + break; + } + /* Should never happen */ + if (rte == NULL || rte->rtekind != RTE_RELATION) + elog(ERROR, "can't find relation rte for foreign scan"); + } /* Get info about foreign table. */ table = GetForeignTable(rte->relid); @@ -1787,8 +2055,16 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) } else { + List *rtfuncdata = list_nth(fsplan->fdw_private, FdwScanPrivateFunctions); + Integer *minrti = list_nth(fsplan->fdw_private, FdwScanPrivateMinRTIndex); + int rtoffset = bms_next_member(fsplan->fs_base_relids, -1) - minrti->ival; + + /* Index sanity checks */ + Assert(minrti->ival > 0); + Assert(rtoffset >= 0); + fsstate->rel = NULL; - fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node); + fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node, rtfuncdata, rtoffset); } fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc); @@ -2828,6 +3104,13 @@ postgresPlanDirectModify(PlannerInfo *root, makeBoolean((retrieved_attrs != NIL)), retrieved_attrs, makeBoolean(plan->canSetTag)); + fscan->fdw_private = lappend(fscan->fdw_private, get_functions_data(root, foreignrel)); + + /* + * We can use this value to determine rtoffset. It was already calculated + * while building foreign scan plan + */ + fscan->fdw_private = lappend(fscan->fdw_private, makeInteger(bms_next_member(fscan->fs_base_relids, -1))); /* * Update the foreign-join-related fields. 
@@ -2941,7 +3224,17 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) TupleDesc tupdesc; if (fsplan->scan.scanrelid == 0) - tupdesc = get_tupdesc_for_join_scan_tuples(node); + { + List *rtfuncdata = list_nth(fsplan->fdw_private, FdwDirectModifyPrivateFunctions); + Integer *minrti = list_nth(fsplan->fdw_private, FdwDirectModifyPrivateMinRTIndex); + int rtoffset = bms_next_member(fsplan->fs_base_relids, -1) - minrti->ival; + + /* Index sanity checks */ + Assert(minrti->ival > 0); + Assert(rtoffset >= 0); + + tupdesc = get_tupdesc_for_join_scan_tuples(node, rtfuncdata, rtoffset); + } else tupdesc = RelationGetDescr(dmstate->rel); @@ -3054,7 +3347,7 @@ postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) * We do that here, not when the plan is created, because we can't know * what aliases ruleutils.c will assign at plan creation time. */ - if (list_length(fdw_private) > FdwScanPrivateRelations) + if (list_nth(fdw_private, FdwScanPrivateRelations)) { StringInfoData relations; char *rawrelations; @@ -3098,31 +3391,89 @@ postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) { int rti = strtol(ptr, &ptr, 10); RangeTblEntry *rte; - char *relname; + char *relname = NULL; char *refname; rti += rtoffset; Assert(bms_is_member(rti, plan->fs_base_relids)); rte = rt_fetch(rti, es->rtable); - Assert(rte->rtekind == RTE_RELATION); + /* This logic should agree with explain.c's ExplainTargetRel */ - relname = get_rel_name(rte->relid); - if (es->verbose) + if (rte->rtekind == RTE_RELATION) { - char *namespace; - - namespace = get_namespace_name_or_temp(get_rel_namespace(rte->relid)); - appendStringInfo(&relations, "%s.%s", - quote_identifier(namespace), - quote_identifier(relname)); + /* Note: relname may be uninitialized. 
*/ + relname = get_rel_name(rte->relid); + if (es->verbose) + { + char *namespace; + + namespace = get_namespace_name_or_temp(get_rel_namespace(rte->relid)); + appendStringInfo(&relations, "%s.%s", + quote_identifier(namespace), + quote_identifier(relname)); + } + else + appendStringInfoString(&relations, + quote_identifier(relname)); } - else - appendStringInfoString(&relations, - quote_identifier(relname)); + else if (rte->rtekind == RTE_FUNCTION) + { + ListCell *lc; + int n; + bool first = true; + List *rtfuncdata = list_nth(fdw_private, FdwScanPrivateFunctions); + List *funcdata; +#ifdef USE_ASSERT_CHECKING + Integer *fdw_private_min_rti = list_nth(fdw_private, FdwScanPrivateMinRTIndex); + + /* Check that fdw_private_min_rti calculation is correct. */ + Assert(fdw_private_min_rti->ival == minrti); +#endif + + funcdata = list_nth(rtfuncdata, rti - rtoffset); + + n = list_length(funcdata); + + if (n > 1) + appendStringInfo(&relations, "ROWS FROM("); + foreach(lc, funcdata) + { + List *funcinfo; + Oid funcid; + + funcinfo = (List *) lfirst(lc); + + if (!first) + appendStringInfoString(&relations, ", "); + else + first = false; + + funcid = linitial_node(Integer, funcinfo)->ival; + + if (OidIsValid(funcid)) + { + relname = get_func_name(funcid); + if (es->verbose) + { + char *namespace; + + namespace = get_namespace_name(get_func_namespace(funcid)); + appendStringInfo(&relations, "%s.%s()", + quote_identifier(namespace), + quote_identifier(relname)); + } + else + appendStringInfo(&relations, "%s()", quote_identifier(relname)); + } + } + if (n > 1) + appendStringInfo(&relations, ")"); + } + refname = (char *) list_nth(es->rtable_names, rti - 1); if (refname == NULL) refname = rte->eref->aliasname; - if (strcmp(refname, relname) != 0) + if (relname == NULL || strcmp(refname, relname) != 0) appendStringInfo(&relations, " %s", quote_identifier(refname)); } @@ -6580,6 +6931,69 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) return commands; 
} +/* + * Determine if foreignrel, not backed by foreign + * table, is fine to push down. + */ +static bool +is_nonrel_relinfo_ok(PlannerInfo *root, RelOptInfo *foreignrel) +{ + RangeTblEntry *rte; + RangeTblFunction *rtfunc; + TupleDesc tupdesc; + Oid funcrettype; + + rte = planner_rt_fetch(foreignrel->relid, root); + + if (!rte) + return false; + + Assert(foreignrel->fdw_private); + + if (rte->rtekind == RTE_FUNCTION) + { + ListCell *lc; + TypeFuncClass functypclass; + + /* + * WITH ORDINALITY pushdown is not implemented yet. + */ + if (rte->funcordinality) + return false; + + Assert(list_length(rte->functions) >= 1); + foreach(lc, rte->functions) + { + rtfunc = (RangeTblFunction *) lfirst(lc); + + functypclass = get_expr_result_type(rtfunc->funcexpr, &funcrettype, &tupdesc); + + /* + * Remote server requires a well defined return type for a + * function pushdown. Also get_tupdesc_for_join_scan_tuples() + * doesn't expect RTE_FUNCTION to return several attributes. + */ + if (functypclass != TYPEFUNC_SCALAR) + return false; + + if (!OidIsValid(funcrettype) || funcrettype == RECORDOID || funcrettype == VOIDOID) + return false; + + if (contain_var_clause(rtfunc->funcexpr) || + contain_mutable_functions(rtfunc->funcexpr) || + contain_subplans(rtfunc->funcexpr) || + contain_param_walker(rtfunc->funcexpr, NULL)) + return false; + if (!is_foreign_expr(root, foreignrel, (Expr *) rtfunc->funcexpr)) + return false; + } + + return true; + } + + return false; +} + /* * Check if reltarget is safe enough to push down semi-join. Reltarget is not * safe, if it contains references to inner rel relids, which do not belong to @@ -7328,6 +7742,43 @@ postgresGetForeignJoinPaths(PlannerInfo *root, /* XXX Consider parameterized paths for the join relation */ } +/* + * postgresTryShippableJoinPaths + * + * Try to add foreign join of foreign relation with shippable RTE. 
+ */ +static void +postgresTryShippableJoinPaths(PlannerInfo *root, + RelOptInfo *joinrel, + RelOptInfo *outerrel, + RelOptInfo *innerrel, + JoinType jointype, + JoinPathExtraData *extra) +{ + PgFdwRelationInfo *fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private; + PgFdwRelationInfo *fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private; + + if (fpinfo_o == NULL) + /* Outer path is not foreign relation or foreign JOIN. */ + return; + + if (joinrel->fdwroutine != NULL || innerrel->reloptkind != RELOPT_BASEREL) + return; + + if (fpinfo_i == NULL || fpinfo_i->is_generated) + init_fpinfo(root, innerrel, InvalidOid, fpinfo_o); + + if (!is_nonrel_relinfo_ok(root, innerrel)) + return; + + joinrel->serverid = outerrel->serverid; + joinrel->userid = outerrel->userid; + joinrel->useridiscurrent = outerrel->useridiscurrent; + joinrel->fdwroutine = outerrel->fdwroutine; + + postgresGetForeignJoinPaths(root, joinrel, outerrel, innerrel, jointype, extra); +} + /* * Assess whether the aggregation, grouping and having operations can be pushed * down to the foreign server. As a side effect, save information we obtain in @@ -8815,3 +9266,16 @@ get_batch_size_option(Relation rel) return batch_size; } + +/* + * Checks if expression contains parameters + */ +static bool +contain_param_walker(Node *node, void *arg) +{ + if (node == NULL) + return false; + if (IsA(node, Param)) + return true; + return expression_tree_walker(node, contain_param_walker, NULL); +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index a2bb1ff352c..b4d3e2bfe10 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -52,6 +52,12 @@ typedef struct PgFdwRelationInfo /* True means that the query_pathkeys is safe to push down */ bool qp_is_pushdown_safe; + /* + * True means that PgFdwRelationInfo is not extracted from catalogs, but + * generated + */ + bool is_generated; + /* Cost and selectivity of local_conds. 
*/ QualCost local_conds_cost; Selectivity local_conds_sel; diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 267d3c1a7e7..5140f2027ca 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -4671,3 +4671,198 @@ RESET client_min_messages; DROP FUNCTION wait_for_backend_termination(int); DROP FOREIGN TABLE remote_backend_pid; DROP VIEW my_backend_pid; + +-- =================================================================== +-- test function scan pushdown +-- =================================================================== +CREATE TABLE base_tbl (a int, b int); +CREATE FOREIGN TABLE remote_tbl (a int, b int) + SERVER loopback OPTIONS (table_name 'base_tbl'); +ALTER FOREIGN TABLE remote_tbl OPTIONS (use_remote_estimate 'true'); +CREATE TABLE base_tbl1 (c int, d text); +CREATE FOREIGN TABLE remote_tbl1 (c int, d text) + SERVER loopback OPTIONS (table_name 'base_tbl1'); +ALTER FOREIGN TABLE remote_tbl1 OPTIONS (use_remote_estimate 'true'); + +INSERT INTO remote_tbl SELECT g, g*2 from generate_series(1,1000) g; +INSERT INTO remote_tbl1 SELECT g, 'text'|| g from generate_series(1,500) g; +ANALYZE base_tbl; +ANALYZE base_tbl1; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM remote_tbl r, unnest(array[2,3,4]) n WHERE r.a = n; + +SELECT * FROM remote_tbl r, unnest(array[2,3,4]) n WHERE r.a = n +ORDER BY r.a; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM unnest(array[2,3,4]) n, remote_tbl r WHERE r.a = n; + +SELECT * FROM unnest(array[2,3,4]) n, remote_tbl r WHERE r.a = n +ORDER BY r.a; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM remote_tbl r, remote_tbl1 r1, unnest(array[3,4]) n +WHERE r.a = n AND r1.c = r.a; + +SELECT * FROM remote_tbl r, remote_tbl1 r1, unnest(array[3,4]) n +WHERE r.a = n AND r1.c = r.a +ORDER BY r.a; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT r.*,n from remote_tbl r, remote_tbl1 r1, unnest(array[3,4]) n +WHERE r.a = n AND r1.c = r.a and n > 3; + 
+SELECT * from remote_tbl r, remote_tbl1 r1, unnest(array[3,4]) n +WHERE r.a = n AND r1.c = r.a and n > 3; + +CREATE OR REPLACE FUNCTION get_constant_texts() +RETURNS TABLE (text_value text) AS $$ +BEGIN + RETURN QUERY VALUES + ('text1'), + ('text4'); +END; +$$ LANGUAGE plpgsql IMMUTABLE; + +ALTER EXTENSION postgres_fdw ADD FUNCTION get_constant_texts(); +ALTER SERVER loopback OPTIONS (extensions 'postgres_fdw'); + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT r.*, t.n from remote_tbl1 r, ROWS FROM (unnest(array[3,4]), get_constant_texts()) t (n, txt) +WHERE r.c = t.n AND r.d = t.txt; + +SELECT r.*, t.txt from remote_tbl1 r, ROWS FROM (unnest(array[3,4]), get_constant_texts()) t (n, txt) +WHERE r.c = t.n AND r.d = t.txt; + +-- Complex types +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM remote_tbl r JOIN UNNEST(array[box '((2,3),(-2,-3))']) as t(bx) ON a = area(bx); + +SELECT * FROM remote_tbl r JOIN UNNEST(array[box '((2,3),(-2,-3))']) as t(bx) ON a = area(bx) +ORDER BY r.a; + +-- DML +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE remote_tbl r SET b=5 FROM UNNEST(array[box '((2,3),(-2,-3))']) AS t (bx) WHERE r.a = area(t.bx) +RETURNING a,b; + +UPDATE remote_tbl r SET b=5 FROM UNNEST(array[box '((2,3),(-2,-3))']) AS t (bx) WHERE r.a = area(t.bx) +RETURNING a,b; + +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE remote_tbl r SET b=CASE WHEN random()>=0 THEN 5 ELSE 0 END FROM UNNEST(array[box '((2,3),(-2,-3))']) AS t (bx) WHERE r.a = area(t.bx) +RETURNING a,b; + +UPDATE remote_tbl r SET b=CASE WHEN random()>=0 THEN 5 ELSE 0 END FROM UNNEST(array[box '((2,3),(-2,-3))']) AS t (bx) WHERE r.a = area(t.bx) +RETURNING a,b; + +EXPLAIN (VERBOSE, COSTS OFF) +UPDATE remote_tbl r SET b=5 FROM UNNEST(array[10,20], array[box '((2,3),(-2,-4))', box '((1,2),(-2,-3))']) AS t (l, bx) WHERE r.a between l and area(t.bx) +RETURNING a,b; + +UPDATE remote_tbl r SET b=5 FROM UNNEST(array[10,20], array[box '((2,3),(-2,-4))', box '((1,2),(-2,-3))']) AS t (l, bx) WHERE r.a between l and area(t.bx) +RETURNING a,b; + 
+EXPLAIN (VERBOSE, COSTS OFF) +UPDATE remote_tbl r SET b=CASE WHEN random()>=0 THEN 5 ELSE 0 END FROM UNNEST(array[10,20], array[box '((2,3),(-2,-4))', box '((1,2),(-2,-3))']) AS t (l, bx) WHERE r.a between l and area(t.bx) +RETURNING a,b; + +UPDATE remote_tbl r SET b=CASE WHEN random()>=0 THEN 5 ELSE 0 END FROM UNNEST(array[10,20], array[box '((2,3),(-2,-4))', box '((1,2),(-2,-3))']) AS t (l, bx) WHERE r.a between l and area(t.bx) +RETURNING a,b; + +-- Test that local functions are not pushed down +CREATE OR REPLACE FUNCTION f(INTEGER) +RETURNS SETOF INTEGER +LANGUAGE sql AS 'select generate_series(1,$1);' IMMUTABLE; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM remote_tbl r, f(10) n +WHERE r.a = n; + +SELECT * FROM remote_tbl r, remote_tbl1 r1, unnest(array[3,4]) n +WHERE r.a = n AND r1.c = r.a AND n > 3; + +-- Test that a function that returns a record is not pushed down +CREATE OR REPLACE FUNCTION f_ret_record() RETURNS record AS $$ SELECT (1,2)::record $$ language SQL IMMUTABLE; +ALTER EXTENSION postgres_fdw ADD function f_ret_record(); + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT s FROM remote_tbl rt, f_ret_record() AS s(a int, b int) +WHERE s.a = rt.a; + +SELECT s FROM remote_tbl rt, f_ret_record() AS s(a int, b int) +WHERE s.a = rt.a; + +DROP FUNCTION f(INTEGER); +ALTER EXTENSION postgres_fdw DROP FUNCTION f_ret_record(); +DROP FUNCTION f_ret_record(); + +-- Test that a function that returns composite type is not pushed down +CREATE TYPE c1 AS (i int, j int); +CREATE OR REPLACE FUNCTION f_ret_c1(int) RETURNS SETOF c1 AS $$ begin return next '(3,3)'::c1 ; end $$ language plpgsql immutable; +ALTER EXTENSION postgres_fdw ADD FUNCTION f_ret_c1(int); +ALTER EXTENSION postgres_fdw ADD TYPE c1; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT f FROM remote_tbl r, ROWS FROM (f_ret_c1(2), f_ret_c1(2)) AS f(i, j, k,l) WHERE f.i=r.a; +SELECT f FROM remote_tbl r, ROWS FROM (f_ret_c1(2), f_ret_c1(2)) AS f(i, j, k,l) WHERE f.i=r.a; + +ALTER EXTENSION postgres_fdw DROP FUNCTION 
f_ret_c1(int); +ALTER EXTENSION postgres_fdw DROP TYPE c1; +DROP FUNCTION f_ret_c1(int); +DROP TYPE c1; + +DELETE FROM base_tbl; + +-- Test that function with parameters in arguments is not pushed down +INSERT INTO remote_tbl SELECT g, g*2 from generate_series(1,10) g; +ANALYZE base_tbl; +ANALYZE remote_tbl; + +CREATE OR REPLACE FUNCTION f(int) RETURNS SETOF int +LANGUAGE plpgsql ROWS 10 AS 'BEGIN RETURN QUERY SELECT generate_series(1,$1) ; END' IMMUTABLE; +ALTER EXTENSION postgres_fdw ADD function f(INTEGER); + +-- no foreign join +set enable_material = off; +-- Make local function scan not so attractive +ALTER SERVER loopback OPTIONS (ADD fdw_tuple_cost '1000'); +EXPLAIN (VERBOSE, COSTS OFF) +WITH s AS MATERIALIZED (SELECT r1.* FROM remote_tbl r1 +JOIN LATERAL +(SELECT r2.a FROM remote_tbl r2, f(r1.a) WHERE f=r2.a LIMIT 1) s +ON true) +SELECT * FROM s ORDER BY 1; +WITH s AS MATERIALIZED (SELECT r1.* FROM remote_tbl r1 +JOIN LATERAL +(SELECT r2.a FROM remote_tbl r2, f(r1.a) WHERE f=r2.a LIMIT 1) s +ON true) +SELECT * FROM s ORDER BY 1; +reset enable_material; +ALTER SERVER loopback OPTIONS (DROP fdw_tuple_cost); + +ALTER EXTENSION postgres_fdw DROP function f(INTEGER); + +ALTER EXTENSION postgres_fdw DROP FUNCTION get_constant_texts(); +DROP FUNCTION get_constant_texts(); +DROP TABLE base_tbl, base_tbl1; +DROP FOREIGN TABLE remote_tbl, remote_tbl1; + +-- Test that function WITH ORDINALITY is not pushed down +CREATE TABLE base_tbl (a int, b int); +CREATE FOREIGN TABLE remote_tbl (a int, b int) SERVER loopback OPTIONS (table_name 'base_tbl'); +INSERT INTO remote_tbl VALUES (1, 2), (2, 3), (3, 4), (5, 6); +ANALYZE remote_tbl; +SET enable_mergejoin TO false; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM remote_tbl, unnest(ARRAY[1, 2]) WITH ORDINALITY +WHERE a=unnest; + +SELECT * FROM remote_tbl, unnest(ARRAY[1, 2]) WITH ORDINALITY +WHERE a=unnest; + +DROP TABLE base_tbl; +DROP FOREIGN TABLE remote_tbl; +RESET enable_mergejoin; diff --git 
a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index 713283a73aa..5e5282dfda8 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -365,6 +365,17 @@ add_paths_to_joinrel(PlannerInfo *root, outerrel, innerrel, save_jointype, &extra); + /* + * If pushing down the join is not possible, we can try to join the + * foreign relation with a shippable RTE. In this case we still have a + * chance to push down this join. + */ + else if ((extra.pgs_mask & PGS_FOREIGNJOIN) != 0 && outerrel->fdwroutine && + outerrel->fdwroutine->TryShippableJoinPaths) + outerrel->fdwroutine->TryShippableJoinPaths(root, joinrel, + outerrel, innerrel, + save_jointype, &extra); + /* * 6. Finally, give extensions a chance to manipulate the path list. They * could add new paths (such as CustomPaths) by calling add_path(), or diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index abf59a0d8ad..81b74eaadd8 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -225,6 +225,7 @@ typedef struct FdwRoutine /* Functions for remote-join planning */ GetForeignJoinPaths_function GetForeignJoinPaths; + GetForeignJoinPaths_function TryShippableJoinPaths; /* Functions for remote upper-relation (post scan/join) planning */ GetForeignUpperPaths_function GetForeignUpperPaths; -- 2.43.0