contrib/postgres_fdw/expected/postgres_fdw.out | 51 ++++++++------------- contrib/postgres_fdw/postgres_fdw.c | 56 ++++++++++++++++++++++- src/backend/executor/nodeLimit.c | 62 ++------------------------ src/backend/executor/nodeMergeAppend.c | 6 ++- src/backend/executor/nodeResult.c | 25 +++++++++++ src/backend/executor/nodeSort.c | 10 +++++ src/include/nodes/execnodes.h | 3 ++ 7 files changed, 118 insertions(+), 95 deletions(-) diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 0b9e3e4..08ead31 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -1003,18 +1003,15 @@ SELECT t1.c1, t2.c1 FROM ft1 t1 JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c3, t -- join three tables EXPLAIN (VERBOSE, COSTS OFF) SELECT t1.c1, t2.c2, t3.c3 FROM ft1 t1 JOIN ft2 t2 ON (t1.c1 = t2.c1) JOIN ft4 t3 ON (t3.c1 = t1.c1) ORDER BY t1.c3, t1.c1 OFFSET 10 LIMIT 10; - QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit Output: t1.c1, t2.c2, t3.c3, t1.c3 - -> Sort + -> Foreign Scan Output: t1.c1, t2.c2, t3.c3, t1.c3 - Sort Key: t1.c3, t1.c1 - -> Foreign Scan - Output: t1.c1, t2.c2, t3.c3, t1.c3 - Relations: ((public.ft1 t1) INNER JOIN (public.ft2 t2)) INNER JOIN (public.ft4 t3) - Remote SQL: SELECT r1."C 1", r1.c3, r2.c2, r4.c3 FROM (("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (((r1."C 1" = r2."C 1")))) INNER JOIN "S 1"."T 3" r4 ON (((r1."C 1" = r4.c1)))) -(9 rows) + Relations: ((public.ft1 t1) INNER JOIN (public.ft2 t2)) INNER JOIN (public.ft4 t3) + 
Remote SQL: SELECT r1."C 1", r1.c3, r2.c2, r4.c3 FROM (("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (((r1."C 1" = r2."C 1")))) INNER JOIN "S 1"."T 3" r4 ON (((r1."C 1" = r4.c1)))) ORDER BY r1.c3 ASC NULLS LAST, r1."C 1" ASC NULLS LAST +(6 rows) SELECT t1.c1, t2.c2, t3.c3 FROM ft1 t1 JOIN ft2 t2 ON (t1.c1 = t2.c1) JOIN ft4 t3 ON (t3.c1 = t1.c1) ORDER BY t1.c3, t1.c1 OFFSET 10 LIMIT 10; c1 | c2 | c3 @@ -1477,18 +1474,15 @@ SELECT t1.c1, t2.c2, t3.c3 FROM ft2 t1 LEFT JOIN ft2 t2 ON (t1.c1 = t2.c1) RIGHT -- full outer join + WHERE clause, only matched rows EXPLAIN (VERBOSE, COSTS OFF) SELECT t1.c1, t2.c1 FROM ft4 t1 FULL JOIN ft5 t2 ON (t1.c1 = t2.c1) WHERE (t1.c1 = t2.c1 OR t1.c1 IS NULL) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10; - QUERY PLAN ------------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit Output: t1.c1, t2.c1 - -> Sort + -> Foreign Scan Output: t1.c1, t2.c1 - Sort Key: t1.c1, t2.c1 - -> Foreign Scan - Output: t1.c1, t2.c1 - Relations: (public.ft4 t1) FULL JOIN (public.ft5 t2) - Remote SQL: SELECT r1.c1, r2.c1 FROM ("S 1"."T 3" r1 FULL JOIN "S 1"."T 4" r2 ON (((r1.c1 = r2.c1)))) WHERE (((r1.c1 = r2.c1) OR (r1.c1 IS NULL))) -(9 rows) + Relations: (public.ft4 t1) FULL JOIN (public.ft5 t2) + Remote SQL: SELECT r1.c1, r2.c1 FROM ("S 1"."T 3" r1 FULL JOIN "S 1"."T 4" r2 ON (((r1.c1 = r2.c1)))) WHERE (((r1.c1 = r2.c1) OR (r1.c1 IS NULL))) ORDER BY r1.c1 ASC NULLS LAST, r2.c1 ASC NULLS LAST +(6 rows) SELECT t1.c1, t2.c1 FROM ft4 t1 FULL JOIN ft5 t2 ON (t1.c1 = t2.c1) WHERE (t1.c1 = t2.c1 OR t1.c1 IS NULL) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10; c1 | c1 @@ -1804,24 +1798,15 @@ SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM 
ft2 t2 WHERE t1.c1 = t2 -- CROSS JOIN, not pushed down EXPLAIN (VERBOSE, COSTS OFF) SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10; - QUERY PLAN ---------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit Output: t1.c1, t2.c1 - -> Sort + -> Foreign Scan Output: t1.c1, t2.c1 - Sort Key: t1.c1, t2.c1 - -> Nested Loop - Output: t1.c1, t2.c1 - -> Foreign Scan on public.ft1 t1 - Output: t1.c1 - Remote SQL: SELECT "C 1" FROM "S 1"."T 1" - -> Materialize - Output: t2.c1 - -> Foreign Scan on public.ft2 t2 - Output: t2.c1 - Remote SQL: SELECT "C 1" FROM "S 1"."T 1" -(15 rows) + Relations: (public.ft1 t1) INNER JOIN (public.ft2 t2) + Remote SQL: SELECT r1."C 1", r2."C 1" FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) ORDER BY r1."C 1" ASC NULLS LAST, r2."C 1" ASC NULLS LAST +(6 rows) SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10; c1 | c1 diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 5d270b9..e4cdbe5 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -2426,6 +2426,17 @@ postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) if (es->verbose) { sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); + + /* LIMIT clause is attached on execution time */ + if (node->ss.ps.ps_numTuples > 0) + { + StringInfoData buf; + + initStringInfo(&buf); + appendStringInfo(&buf, "%s LIMIT %lu", + sql, node->ss.ps.ps_numTuples); + sql = buf.data; + } ExplainPropertyText("Remote SQL", sql, es); } } @@ -2491,6 +2502,7 @@ estimate_path_cost_size(PlannerInfo *root, Cost *p_startup_cost, Cost *p_total_cost) { PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; + double 
limit_tuples = -1.0; double rows; double retrieved_rows; int width; @@ -2499,6 +2511,18 @@ estimate_path_cost_size(PlannerInfo *root, Cost cpu_per_tuple; /* + * The LIMIT clause can be pushed down only when this foreign path does + * not need to be joined with any other base relation and the supplied + * pathkeys exactly match the final output order. + * Otherwise, pushing down LIMIT would produce incorrect results, so the + * estimated cost does not assume remote LIMIT execution. + */ + if (root->limit_tuples >= 0.0 && + equal(root->query_pathkeys, pathkeys) && + bms_equal(root->all_baserels, foreignrel->relids)) + limit_tuples = root->limit_tuples; + + /* * If the table or the server is configured to use remote estimates, * connect to the foreign server and execute EXPLAIN to estimate the * number of rows selected by the restriction+join clauses. Otherwise, @@ -2551,6 +2575,12 @@ estimate_path_cost_size(PlannerInfo *root, deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, remote_conds, pathkeys, &retrieved_attrs, NULL); + /* + * Attach LIMIT if this path is top-level and a constant value is + * specified. + */ + if (limit_tuples >= 0.0) + appendStringInfo(&sql, " LIMIT %.0f", limit_tuples); /* Get the remote estimate */ conn = GetConnection(fpinfo->user, false); @@ -2625,6 +2655,12 @@ estimate_path_cost_size(PlannerInfo *root, /* Estimate of number of rows in cross product */ nrows = fpinfo_i->rows * fpinfo_o->rows; + if (limit_tuples >= 0.0) + { + nrows = Min(nrows, limit_tuples); + rows = Min(rows, limit_tuples); + } + /* Clamp retrieved rows estimate to at most size of cross product */ retrieved_rows = Min(retrieved_rows, nrows); @@ -2722,6 +2758,10 @@ estimate_path_cost_size(PlannerInfo *root, /* * Number of rows expected from foreign server will be same as * that of number of groups. + * + * NOTE: root->limit_tuples is not set when the query contains a + * grouping clause or aggregate functions, so we don't adjust + * rows even if LIMIT is supplied.
*/ rows = retrieved_rows = numGroups; @@ -2752,8 +2792,16 @@ estimate_path_cost_size(PlannerInfo *root, } else { + double nrows = foreignrel->tuples; + + if (limit_tuples >= 0.0) + { + nrows = Min(nrows, limit_tuples); + rows = Min(rows, limit_tuples); + } + /* Clamp retrieved rows estimates to at most foreignrel->tuples. */ - retrieved_rows = Min(retrieved_rows, foreignrel->tuples); + retrieved_rows = Min(retrieved_rows, nrows); /* * Cost as though this were a seqscan, which is pessimistic. We @@ -2766,7 +2814,7 @@ estimate_path_cost_size(PlannerInfo *root, startup_cost += foreignrel->baserestrictcost.startup; cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple; - run_cost += cpu_per_tuple * foreignrel->tuples; + run_cost += cpu_per_tuple * nrows; } /* @@ -2940,6 +2988,10 @@ create_cursor(ForeignScanState *node) appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", fsstate->cursor_number, fsstate->query); + /* Append LIMIT if numTuples were passed-down */ + if (node->ss.ps.ps_numTuples > 0) + appendStringInfo(&buf, " LIMIT %ld", node->ss.ps.ps_numTuples); + /* * Notice that we pass NULL for paramTypes, thus forcing the remote server * to infer types for all parameters. 
Since we explicitly cast every diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c index aaec132..3cbc97a 100644 --- a/src/backend/executor/nodeLimit.c +++ b/src/backend/executor/nodeLimit.c @@ -26,7 +26,6 @@ #include "nodes/nodeFuncs.h" static void recompute_limits(LimitState *node); -static void pass_down_bound(LimitState *node, PlanState *child_node); /* ---------------------------------------------------------------- @@ -232,6 +231,7 @@ static void recompute_limits(LimitState *node) { ExprContext *econtext = node->ps.ps_ExprContext; + uint64 tuples_needed; Datum val; bool isNull; @@ -293,64 +293,8 @@ recompute_limits(LimitState *node) /* Set state-machine state */ node->lstate = LIMIT_RESCAN; - /* Notify child node about limit, if useful */ - pass_down_bound(node, outerPlanState(node)); -} - -/* - * If we have a COUNT, and our input is a Sort node, notify it that it can - * use bounded sort. Also, if our input is a MergeAppend, we can apply the - * same bound to any Sorts that are direct children of the MergeAppend, - * since the MergeAppend surely need read no more than that many tuples from - * any one input. We also have to be prepared to look through a Result, - * since the planner might stick one atop MergeAppend for projection purposes. - * - * This is a bit of a kluge, but we don't have any more-abstract way of - * communicating between the two nodes; and it doesn't seem worth trying - * to invent one without some more examples of special communication needs. - * - * Note: it is the responsibility of nodeSort.c to react properly to - * changes of these parameters. If we ever do redesign this, it'd be a - * good idea to integrate this signaling with the parameter-change mechanism. 
- */ -static void -pass_down_bound(LimitState *node, PlanState *child_node) -{ - if (IsA(child_node, SortState)) - { - SortState *sortState = (SortState *) child_node; - int64 tuples_needed = node->count + node->offset; - - /* negative test checks for overflow in sum */ - if (node->noCount || tuples_needed < 0) - { - /* make sure flag gets reset if needed upon rescan */ - sortState->bounded = false; - } - else - { - sortState->bounded = true; - sortState->bound = tuples_needed; - } - } - else if (IsA(child_node, MergeAppendState)) - { - MergeAppendState *maState = (MergeAppendState *) child_node; - int i; - - for (i = 0; i < maState->ms_nplans; i++) - pass_down_bound(node, maState->mergeplans[i]); - } - else if (IsA(child_node, ResultState)) - { - /* - * If Result supported qual checking, we'd have to punt on seeing a - * qual. Note that having a resconstantqual is not a showstopper: if - * that fails we're not getting any rows at all. - */ - if (outerPlanState(child_node)) - pass_down_bound(node, outerPlanState(child_node)); - } + tuples_needed = (node->noCount ? 0 : node->count + node->offset); + outerPlanState(node)->ps_numTuples = tuples_needed; } /* ---------------------------------------------------------------- diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c index 7a20bf0..6e76e31 100644 --- a/src/backend/executor/nodeMergeAppend.c +++ b/src/backend/executor/nodeMergeAppend.c @@ -174,10 +174,14 @@ ExecMergeAppend(MergeAppendState *node) /* * First time through: pull the first tuple from each subplan, and set * up the heap. 
+ * Also pass down the required number of tuples to each subplan. */ for (i = 0; i < node->ms_nplans; i++) { - node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); + PlanState *pstate = node->mergeplans[i]; + + pstate->ps_numTuples = node->ps.ps_numTuples; + node->ms_slots[i] = ExecProcNode(pstate); if (!TupIsNull(node->ms_slots[i])) binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i)); } diff --git a/src/backend/executor/nodeResult.c b/src/backend/executor/nodeResult.c index b5b50b2..be94b42 100644 --- a/src/backend/executor/nodeResult.c +++ b/src/backend/executor/nodeResult.c @@ -47,6 +47,7 @@ #include "executor/executor.h" #include "executor/nodeResult.h" +#include "nodes/nodeFuncs.h" #include "utils/memutils.h" @@ -73,6 +74,28 @@ ExecResult(ResultState *node) econtext = node->ps.ps_ExprContext; /* + * Pass down the number of tuples required by the upper node. + * + * An extra consideration here is that if the Result is projecting a + * targetlist that contains any SRFs, we can't assume that every input + * tuple generates an output tuple, so a Sort underneath might need to + * return more than N tuples to satisfy LIMIT N. So we cannot use + * bounded sort. + * + * If Result supported qual checking, we'd have to punt on seeing a + * qual, too. Note that having a resconstantqual is not a + * showstopper: if that fails we're not getting any rows at all. + */ + if (!node->rs_started) + { + if (outerPlanState(node) && + !expression_returns_set((Node *) node->ps.plan->targetlist)) + outerPlanState(node)->ps_numTuples = node->ps.ps_numTuples; + + node->rs_started = true; + } + + /* * check constant qualifications like (2 > 1), if not already done */ if (node->rs_checkqual) @@ -191,6 +214,7 @@ ExecInitResult(Result *node, EState *estate, int eflags) resstate->ps.plan = (Plan *) node; resstate->ps.state = estate; + resstate->rs_started = false; resstate->rs_done = false; resstate->rs_checkqual = (node->resconstantqual == NULL) ?
false : true; @@ -265,6 +289,7 @@ ExecEndResult(ResultState *node) void ExecReScanResult(ResultState *node) { + node->rs_started = false; node->rs_done = false; node->rs_checkqual = (node->resconstantqual == NULL) ? false : true; diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c index 591a31a..b6b72f1 100644 --- a/src/backend/executor/nodeSort.c +++ b/src/backend/executor/nodeSort.c @@ -66,6 +66,16 @@ ExecSort(SortState *node) SO1_printf("ExecSort: %s\n", "sorting subplan"); + /* + * Set up bounded sort from the required number of tuples (0 = unbounded) + */ + if (node->ss.ps.ps_numTuples == 0) + node->bounded = false; + else + { + node->bounded = true; + node->bound = node->ss.ps.ps_numTuples; + } /* * Want to scan subplan in the forward direction while creating the diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 6332ea0..2e3c37a 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1077,6 +1077,8 @@ typedef struct PlanState TupleTableSlot *ps_ResultTupleSlot; /* slot for my result tuples */ ExprContext *ps_ExprContext; /* node's expression-evaluation context */ ProjectionInfo *ps_ProjInfo; /* info for doing tuple projection */ + uint64 ps_numTuples; /* number of tuples required by the upper + * node, if any; 0 means all tuples */ } PlanState; /* ---------------- @@ -1125,6 +1127,7 @@ typedef struct ResultState { PlanState ps; /* its first field is NodeTag */ ExprState *resconstantqual; + bool rs_started; /* have we been called already? */ bool rs_done; /* are we done? */ bool rs_checkqual; /* do we need to check the qual? */ } ResultState;