7.2. Query Processing #
Shardman uses the standard PostgreSQL query execution pipeline. Other nodes in the cluster are accessed via the modified postgres_fdw extension.
The Shardman query planner takes the query abstract syntax tree (AST) and creates a query plan, which is used by the executor. While evaluating query execution methods, the planner operates with so-called paths, which specify how relations should be accessed. While processing a query join tree, the planner looks at different combinations of how relations can be joined. Each time it examines a join of two relations, one of which can itself be a join relation. After choosing the order and strategies for joining relations, the planner considers the group by, order by, and limit operations. When the cheapest path is selected, it is transformed into a query plan. A plan consists of a tree of nodes, each of which provides a method to return the next result row (or NULL when there are no more rows).
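For illustration, the plan tree chosen by the planner can be viewed with EXPLAIN. The following sketch runs against a plain, non-sharded pgbench_tellers table; the output is illustrative, as the actual plan depends on statistics and configuration:

EXPLAIN (COSTS OFF) SELECT * FROM pgbench_tellers WHERE tbalance > 0 ORDER BY tbalance;
             QUERY PLAN
------------------------------------
 Sort
   Sort Key: tbalance
   ->  Seq Scan on pgbench_tellers
         Filter: (tbalance > 0)

Here the Sort node repeatedly pulls rows from its child Seq Scan node, which in turn returns only the rows that pass the filter.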
7.2.1. Push-down Technique #
7.2.1.1. Joins #
The efficiency of query execution in a distributed DBMS is determined by how many operations can be executed on the nodes that hold the actual data. For Shardman, a lot of effort is devoted to pushing down join operations. When the planner finds a relation that is accessible via a foreign data wrapper (FDW), it creates a ForeignPath to access it. Later, when it examines a join of two relations and both of them are available via ForeignPath from the same foreign server, it can consider pushing down this join to that server and generating a so-called ForeignJoinPath. The planner may fail to do this if the join type is not supported, if filters attached to a relation have to be applied locally, or if the relation scan result contains fields that cannot be evaluated on the remote server. An example of a currently unsupported join type is anti-join. Filters attached to a relation have to be applied locally when remote execution could lead to a different result or when the postgres_fdw module cannot create SQL expressions for some of the filters. An example of fields that cannot be evaluated on a remote server is the attributes of a semi-join inner relation that are not accessible via the outer relation. If the foreign_join_fast_path configuration parameter is set to on (which is the default value), the Shardman planner stops searching for other strategies of joining two relations once it finds that a foreign join is possible for them. When the postgres_fdw.enforce_foreign_join configuration parameter is set to on (which is also the default), the cost of a foreign join is estimated so as to be always less than the cost of a local join.
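These parameters can be adjusted per session with ordinary SET commands; a minimal sketch (the parameter names come from the text above, while the actual effect on a given query depends on the schema and statistics):

-- Keep considering local join strategies even after a foreign join
-- path has been found:
SET foreign_join_fast_path = off;

-- Let foreign and local join costs compete on equal terms instead of
-- always preferring the foreign join:
SET postgres_fdw.enforce_foreign_join = off;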
When several sharded tables are joined on a sharding key, a partitionwise join may be possible. This means that instead of joining the original tables, we can join their matching partitions. Partitionwise join currently applies only when the join conditions include all the partition keys, which must be of the same data type and have exactly matching sets of child partitions. Partitionwise join is crucial for efficient query execution, as it allows pushing down joins of table partitions. Evidently, to push down a join of several partitions, these partitions must reside on the same node. This is usually the case when sharded tables are created with the same num_parts parameter. However, for the rebalance process to move the corresponding partitions to the same nodes, sharded tables should be marked as colocated when created (see Section 7.1.1). Partitionwise join is enabled with the enable_partitionwise_join configuration parameter, which is turned on by default in Shardman.
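A sketch of creating colocated sharded tables, using the DDL conventions of Section 7.1.1 (the column definitions here are hypothetical):

CREATE TABLE pgbench_branches (
    bid      int PRIMARY KEY,
    bbalance int
) WITH (distributed_by = 'bid', num_parts = 8);

-- colocate_with makes the rebalancer keep matching partitions together:
CREATE TABLE pgbench_tellers (
    tid      int,
    bid      int,
    tbalance int,
    PRIMARY KEY (bid, tid)
) WITH (distributed_by = 'bid', num_parts = 8, colocate_with = 'pgbench_branches');

-- A join on the sharding key can now be executed partitionwise, and
-- joins of colocated partitions can be pushed down:
EXPLAIN (COSTS OFF)
SELECT * FROM pgbench_branches b JOIN pgbench_tellers t ON b.bid = t.bid;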
When a sharded table is joined to a plain global table, an asymmetric partitionwise join is possible. This means that instead of joining the original tables, we can join each partition of the sharded table with the global table. This makes it possible to push down a join of a sharded table partition with a global table to the foreign server.
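A sketch of the asymmetric case, assuming a hypothetical global table branch_info created as described in Section 7.1.1:

CREATE TABLE branch_info (
    bid  int PRIMARY KEY,
    name text
) WITH (global);

-- Each partition of pgbench_tellers can be joined with the global
-- table, so the per-partition joins can be pushed down:
EXPLAIN (COSTS OFF)
SELECT * FROM pgbench_tellers t JOIN branch_info g ON t.bid = g.bid;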
7.2.1.2. Aggregations #
After planning joins, the planner considers paths for post-join operations, such as aggregation, limiting, sorting, and grouping. Not all such operations reach the FDW pushdown logic. For example, partitioning currently effectively prevents the LIMIT clause from being pushed down. There are two efficient strategies for executing aggregates on remote nodes. The first one is partitionwise aggregation — when a GROUP BY clause includes the partitioning key, the aggregate can be pushed down together with the GROUP BY clause (this behavior is controlled by the enable_partitionwise_aggregate configuration parameter, which is turned on by default in Shardman). Alternatively, the planner can decide to execute partial aggregation on each partition of a sharded table and then combine the results. In Shardman, such a partial aggregate can be pushed down if the partial aggregate matches the main aggregate. For example, a partial sum() aggregate can always be pushed down, but avg() cannot. The planner also refuses to push down partial aggregates if they contain additional clauses, such as ORDER BY or DISTINCT, or if the statement has a HAVING clause.
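For instance, a count over a sharded table can be computed by pushing down partial aggregates and combining them locally. The abbreviated plan below is an illustrative sketch of this strategy, not exact Shardman output:

EXPLAIN (COSTS OFF) SELECT count(*) FROM pgbench_accounts;
                    QUERY PLAN
---------------------------------------------------------
 Finalize Aggregate
   ->  Append
         ->  Partial Aggregate
               ->  Seq Scan on pgbench_accounts_0
         ->  Async Foreign Scan
               Relations: Aggregate on (pgbench_accounts_1_fdw)
         ...

The Finalize Aggregate node combines the partial counts received from local partitions and remote servers.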
7.2.1.3. Subqueries #
Generally, subqueries cannot be pushed down to other cluster nodes. However, Shardman uses two approaches to alleviate this limitation.
The first is subquery unnesting. In PostgreSQL, non-correlated subqueries can be transformed into semi-joins. In the following example, an ANY subquery on non-partitioned tables is transformed into a Hash Semi Join:
EXPLAIN (COSTS OFF)
SELECT * FROM pgbench_branches WHERE bid = ANY (SELECT bid FROM pgbench_tellers);
                         QUERY PLAN
-----------------------------------------------------------
 Hash Semi Join
   Hash Cond: (pgbench_branches.bid = pgbench_tellers.bid)
   ->  Seq Scan on pgbench_branches
   ->  Hash
         ->  Seq Scan on pgbench_tellers
When optimize_correlated_subqueries is on (which is the default), the Shardman planner also tries to convert correlated subqueries (i.e., subqueries that reference upper-level relations) into semi-joins. This optimization works for the IN and = operators. The transformation has some restrictions. For example, it is not considered if a subquery contains aggregates or references upper-level relations from outside of a WHERE clause. This optimization allows transforming more complex subqueries into semi-joins, as in the following example:
EXPLAIN (COSTS OFF)
SELECT * FROM pgbench_branches
WHERE bid = ANY (SELECT bid FROM pgbench_tellers WHERE tbalance = bbalance);
                                                        QUERY PLAN
------------------------------------------------------------------------------------------------------------------------
 Hash Semi Join
   Hash Cond: ((pgbench_branches.bid = pgbench_tellers.bid) AND (pgbench_branches.bbalance = pgbench_tellers.tbalance))
   ->  Seq Scan on pgbench_branches
   ->  Hash
         ->  Seq Scan on pgbench_tellers
(5 rows)
After subquery unnesting is applied, the resulting semi-join can be pushed down to a remote node for execution.
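For example, on sharded, colocated pgbench tables, the unnested semi-join from the first example above no longer has to run on the coordinator; a hedged sketch of what to look for in the plan:

EXPLAIN (COSTS OFF)
SELECT * FROM pgbench_branches WHERE bid = ANY (SELECT bid FROM pgbench_tellers);
-- Instead of a local Hash Semi Join, expect per-partition (Async)
-- Foreign Scan nodes under an Append, with the semi-join evaluated
-- inside the Remote SQL of each scan.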
The second approach is to push down the entire subquery. This is possible when the optimizer has already figured out that the subquery references only partitions from the same foreign server as the upper-level query and the corresponding foreign scans have no local conditions. The optimization is controlled by postgres_fdw.subplan_pushdown (which is off by default). When postgres_fdw decides to push down a subquery, it has to deparse it. A subquery that contains plan nodes for which deparsing is not implemented is not pushed down. An example of subquery pushdown looks as follows:
EXPLAIN (VERBOSE ON, COSTS OFF)
SELECT * FROM pgbench_accounts a
WHERE a.bid=90 AND abalance = (SELECT min(tbalance) FROM pgbench_tellers t WHERE t.bid=90 and a.bid=t.bid);
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Foreign Scan on public.pgbench_accounts_5_fdw a
   Output: a.aid, a.bid, a.abalance, a.filler
   Remote SQL: SELECT aid, bid, abalance, filler FROM public.pgbench_accounts_5 r2 WHERE ((r2.bid = 90)) AND ((r2.abalance = ((SELECT min(sp0_2.tbalance) FROM public.pgbench_tellers_5 sp0_2 WHERE ((sp0_2.bid = 90)) AND ((r2.bid = 90))))))
   Transport: Silk
   SubPlan 1
     ->  Finalize Aggregate
           Output: min(t.tbalance)
           ->  Foreign Scan
                 Output: (PARTIAL min(t.tbalance))
                 Relations: Aggregate on (public.pgbench_tellers_5_fdw t)
                 Remote SQL: SELECT min(tbalance) FROM public.pgbench_tellers_5 WHERE ((bid = 90)) AND (($1::integer = 90))
                 Transport: Silk
Note that in the plan above there are no references to SubPlan 1: the subquery has been deparsed and inlined into the Remote SQL of the topmost Foreign Scan.
7.2.2. Asynchronous Execution #
When a sharded table is queried, the Shardman planner creates Append plans to scan all partitions of the table and combine the results. When some of the partitions are foreign tables, the planner can decide to use asynchronous execution. This means that when an Append node is asked for tuples for the first time after initialization, it asks its asynchronous child nodes to start fetching results. For postgres_fdw async ForeignScan nodes, this means that a remote cursor is declared and a fetch request is sent to the remote server. If the Silk transport is used, the query is sent for execution to the remote server as an MT_SPI message.
After sending requests to the remote servers, Append returns to fetching data from synchronous child nodes — local scan nodes or synchronous ForeignScan nodes. Data from such nodes is fetched in a blocking manner. When Append finishes getting data from the synchronous nodes, it checks whether the async nodes have produced any data. If they have not, it waits for the async nodes to produce results.
Shardman can execute several types of plans asynchronously. These are asynchronous ForeignScans, projections, and trivial subquery scans (select * from subquery) over asynchronous plans.
Asynchronous execution is turned on by default at the level of a foreign server. This is controlled by the async_capable postgres_fdw option. For now, only Append plans support asynchronous execution; MergeAppend does not.
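Since async_capable is a regular postgres_fdw foreign server option, it can be changed with ALTER SERVER; a sketch, assuming a foreign server named shardman_node_2:

-- Disable asynchronous execution for this foreign server:
ALTER SERVER shardman_node_2 OPTIONS (SET async_capable 'off');

-- Re-enable it:
ALTER SERVER shardman_node_2 OPTIONS (SET async_capable 'on');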
While examining query plans, pay attention to the presence of non-asynchronous ForeignScan nodes in the plan. Synchronous execution of foreign scans can significantly increase query execution time.
Examples:
EXPLAIN (COSTS OFF) SELECT * FROM pgbench_accounts;
                                QUERY PLAN
-------------------------------------------------------------------------
 Append
   ->  Seq Scan on pgbench_accounts_0 pgbench_accounts_1
   ->  Async Foreign Scan on pgbench_accounts_1_fdw pgbench_accounts_2
   ->  Async Foreign Scan on pgbench_accounts_2_fdw pgbench_accounts_3
   ->  Seq Scan on pgbench_accounts_3 pgbench_accounts_4
   ->  Async Foreign Scan on pgbench_accounts_4_fdw pgbench_accounts_5
   ->  Async Foreign Scan on pgbench_accounts_5_fdw pgbench_accounts_6
   ->  Seq Scan on pgbench_accounts_6 pgbench_accounts_7
   ->  Async Foreign Scan on pgbench_accounts_7_fdw pgbench_accounts_8
   ->  Async Foreign Scan on pgbench_accounts_8_fdw pgbench_accounts_9
   ->  Seq Scan on pgbench_accounts_9 pgbench_accounts_10
   ->  Async Foreign Scan on pgbench_accounts_10_fdw pgbench_accounts_11
   ->  Async Foreign Scan on pgbench_accounts_11_fdw pgbench_accounts_12
   ->  Seq Scan on pgbench_accounts_12 pgbench_accounts_13
   ->  Async Foreign Scan on pgbench_accounts_13_fdw pgbench_accounts_14
   ->  Async Foreign Scan on pgbench_accounts_14_fdw pgbench_accounts_15
   ->  Seq Scan on pgbench_accounts_15 pgbench_accounts_16
   ->  Async Foreign Scan on pgbench_accounts_16_fdw pgbench_accounts_17
   ->  Async Foreign Scan on pgbench_accounts_17_fdw pgbench_accounts_18
   ->  Seq Scan on pgbench_accounts_18 pgbench_accounts_19
   ->  Async Foreign Scan on pgbench_accounts_19_fdw pgbench_accounts_20
Here we see a typical asynchronous plan. There are asynchronous foreign scans and local sequential scans, which are executed synchronously.
EXPLAIN (COSTS OFF) SELECT * FROM pgbench_accounts ORDER BY aid;
                             QUERY PLAN
-------------------------------------------------------------------
 Merge Append
   Sort Key: pgbench_accounts.aid
   ->  Sort
         Sort Key: pgbench_accounts_1.aid
         ->  Seq Scan on pgbench_accounts_0 pgbench_accounts_1
   ->  Foreign Scan on pgbench_accounts_1_fdw pgbench_accounts_2
   ->  Foreign Scan on pgbench_accounts_2_fdw pgbench_accounts_3
   ->  Sort
         Sort Key: pgbench_accounts_4.aid
         ->  Seq Scan on pgbench_accounts_3 pgbench_accounts_4
   ->  Foreign Scan on pgbench_accounts_4_fdw pgbench_accounts_5
   ->  Foreign Scan on pgbench_accounts_5_fdw pgbench_accounts_6
   ->  Sort
         Sort Key: pgbench_accounts_7.aid
         ->  Seq Scan on pgbench_accounts_6 pgbench_accounts_7
   ->  Foreign Scan on pgbench_accounts_7_fdw pgbench_accounts_8
   ->  Foreign Scan on pgbench_accounts_8_fdw pgbench_accounts_9
   ->  Sort
         Sort Key: pgbench_accounts_10.aid
         ->  Seq Scan on pgbench_accounts_9 pgbench_accounts_10
   ->  Foreign Scan on pgbench_accounts_10_fdw pgbench_accounts_11
   ->  Foreign Scan on pgbench_accounts_11_fdw pgbench_accounts_12
   ->  Sort
         Sort Key: pgbench_accounts_13.aid
         ->  Seq Scan on pgbench_accounts_12 pgbench_accounts_13
   ->  Foreign Scan on pgbench_accounts_13_fdw pgbench_accounts_14
   ->  Foreign Scan on pgbench_accounts_14_fdw pgbench_accounts_15
   ->  Sort
         Sort Key: pgbench_accounts_16.aid
         ->  Seq Scan on pgbench_accounts_15 pgbench_accounts_16
   ->  Foreign Scan on pgbench_accounts_16_fdw pgbench_accounts_17
   ->  Foreign Scan on pgbench_accounts_17_fdw pgbench_accounts_18
   ->  Sort
         Sort Key: pgbench_accounts_19.aid
         ->  Seq Scan on pgbench_accounts_18 pgbench_accounts_19
   ->  Foreign Scan on pgbench_accounts_19_fdw pgbench_accounts_20
Here Merge Append is used, so the execution cannot be asynchronous.
7.2.3. Fetch-all Fallback #
There are many cases when operations on data cannot be executed remotely (for example, when a non-immutable function is used in filters, when several sharded tables are joined on an attribute that is not a sharding key, or when pushdown of a particular join type is not supported), or when the planner considers local execution to be cheaper. In such cases, the corresponding operations (selections, joins, or aggregations) are not pushed down, but executed locally. This can lead to inefficient query execution due to large inter-cluster traffic and high processing cost on the coordinator. When this happens, check whether the optimizer has fresh statistics, consider rewriting the query to benefit from different forms of pushdown, or at least verify that the suggested query plan is reasonable. To make the DBMS analyze data for the whole cluster, you can use the shardman.global_analyze function.
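For example (assuming the function is called without arguments; see the Shardman reference for the exact signature):

-- Collect statistics for sharded and global tables across the cluster:
SELECT shardman.global_analyze();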