5.1. Examining Plans
Query execution is best tuned on a subset of production data that represents the actual data distribution. Let's look at some sample plans.
EXPLAIN VERBOSE SELECT bid, avg(abalance) FROM pgbench_accounts
    WHERE bid IN (10,20,30,40) GROUP BY bid;
                                                                 QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------
 Append  (cost=0.29..21.98 rows=4 width=36)
   ->  GroupAggregate  (cost=0.29..18.98 rows=1 width=36)
         Output: pgbench_accounts.bid, avg(pgbench_accounts.abalance)
         Group Key: pgbench_accounts.bid
         ->  Index Scan using pgbench_accounts_15_pkey on public.pgbench_accounts_15 pgbench_accounts  (cost=0.29..18.96 rows=1 width=8)
               Output: pgbench_accounts.bid, pgbench_accounts.abalance
               Index Cond: (pgbench_accounts.bid = ANY ('{10,20,30,40}'::integer[]))
   ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=36)
         Output: pgbench_accounts_1.bid, (avg(pgbench_accounts_1.abalance))
         Relations: Aggregate on (public.pgbench_accounts_16_fdw pgbench_accounts_1)
         Remote SQL: SELECT bid, avg(abalance) FROM public.pgbench_accounts_16 WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1
         Transport: Silk
   ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=36)
         Output: pgbench_accounts_2.bid, (avg(pgbench_accounts_2.abalance))
         Relations: Aggregate on (public.pgbench_accounts_17_fdw pgbench_accounts_2)
         Remote SQL: SELECT bid, avg(abalance) FROM public.pgbench_accounts_17 WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1
         Transport: Silk
   ->  Async Foreign Scan  (cost=1.00..1.00 rows=1 width=36)
         Output: pgbench_accounts_3.bid, (avg(pgbench_accounts_3.abalance))
         Relations: Aggregate on (public.pgbench_accounts_19_fdw pgbench_accounts_3)
         Remote SQL: SELECT bid, avg(abalance) FROM public.pgbench_accounts_19 WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1
         Transport: Silk
 Query Identifier: -1714706980364121548
We see here that the queries scanning three of the partitions will be sent to other nodes, while the coordinator's own data is scanned using an Index Scan. We do not know what plan will be used on the remote side, but we do see which queries will be sent (marked with Remote SQL). Note the Transport: Silk line in each foreign scan description: it indicates that the Silk transport will be used to transfer the results. We also see that Async foreign scans are used, which is fine. To discover which servers take part in the query, we should look at the foreign table definitions. For example, we can find out that public.pgbench_accounts_19_fdw is located on the shardman_rg_2 server listening on 127.0.0.2:65432:
SELECT srvname, srvoptions FROM pg_foreign_server s
    JOIN pg_foreign_table ON ftserver = s.oid
    WHERE ftrelid = 'public.pgbench_accounts_19_fdw'::regclass;
-[ RECORD 1 ]----------------------------------------------------------------------------------------------------------------------------------------------------------------------
srvname    | shardman_rg_2
srvoptions | {async_capable=on,batch_size=100,binary_format=on,connect_timeout=5,dbname=postgres,extended_features=on,fdw_tuple_cost=0.2,fetch_size=50000,host=127.0.0.2,port=65432,silk_port=8000,tcp_user_timeout=10000}
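The same catalogs can be joined without the filter to map every foreign partition to its server at once; a minimal sketch using the standard PostgreSQL catalogs:

```sql
-- List each foreign table together with the foreign server it points to.
SELECT ftrelid::regclass AS foreign_table, srvname
FROM pg_foreign_table
JOIN pg_foreign_server s ON ftserver = s.oid
ORDER BY 1;
```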
Now we can connect to the shardman_rg_2 server and find out which plan is used for the local query shown by the above EXPLAIN output:
EXPLAIN SELECT bid, avg(abalance) FROM public.pgbench_accounts_19
    WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1;
                                   QUERY PLAN
---------------------------------------------------------------------------------
 HashAggregate  (cost=3641.00..3641.01 rows=1 width=36)
   Group Key: bid
   ->  Seq Scan on pgbench_accounts_19  (cost=0.00..3141.00 rows=100000 width=8)
         Filter: (bid = ANY ('{10,20,30,40}'::integer[]))
While examining distributed query plans, we can see that aggregates are sometimes not pushed down:
EXPLAIN (ANALYZE, VERBOSE) SELECT avg(abalance) FROM pgbench_accounts;
                                                                              QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=156209.38..156209.39 rows=1 width=32) (actual time=590.359..590.371 rows=1 loops=1)
   Output: avg(pgbench_accounts.abalance)
   ->  Append  (cost=2891.00..156209.33 rows=20 width=32) (actual time=56.815..590.341 rows=20 loops=1)
         ->  Partial Aggregate  (cost=2891.00..2891.01 rows=1 width=32) (actual time=56.812..56.813 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts.abalance)
               ->  Seq Scan on public.pgbench_accounts_0 pgbench_accounts  (cost=0.00..2641.00 rows=100000 width=4) (actual time=0.018..38.478 rows=100000 loops=1)
                     Output: pgbench_accounts.abalance
         ->  Partial Aggregate  (cost=23991.00..23991.01 rows=1 width=32) (actual time=75.133..75.134 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts_1.abalance)
               ->  Foreign Scan on public.pgbench_accounts_1_fdw pgbench_accounts_1  (cost=100.00..23741.00 rows=100000 width=4) (actual time=41.281..67.293 rows=100000 loops=1)
                     Output: pgbench_accounts_1.abalance
                     Remote SQL: SELECT abalance FROM public.pgbench_accounts_1
                     Transport: Silk
 .....
Here avg() is calculated on the coordinator side, which can lead to a significant growth of data transfer between nodes. The actual data transfer can be monitored with the NETWORK parameter of EXPLAIN ANALYZE (look at the Network received field of the topmost plan node):
EXPLAIN (ANALYZE, VERBOSE, NETWORK) SELECT avg(abalance) FROM pgbench_accounts;
                                                                              QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=156209.38..156209.39 rows=1 width=32) (actual time=589.014..589.027 rows=1 loops=1)
   Output: avg(pgbench_accounts.abalance)
   Network: FDW bytes sent=3218 received=14402396
   ->  Append  (cost=2891.00..156209.33 rows=20 width=32) (actual time=52.111..588.999 rows=20 loops=1)
         Network: FDW bytes sent=3218 received=14402396
         ->  Partial Aggregate  (cost=2891.00..2891.01 rows=1 width=32) (actual time=52.109..52.109 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts.abalance)
               ->  Seq Scan on public.pgbench_accounts_0 pgbench_accounts  (cost=0.00..2641.00 rows=100000 width=4) (actual time=0.020..34.472 rows=100000 loops=1)
                     Output: pgbench_accounts.abalance
         ->  Partial Aggregate  (cost=23991.00..23991.01 rows=1 width=32) (actual time=78.616..78.617 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts_1.abalance)
               Network: FDW bytes sent=247 received=2400360
               ->  Foreign Scan on public.pgbench_accounts_1_fdw pgbench_accounts_1  (cost=100.00..23741.00 rows=100000 width=4) (actual time=42.359..69.984 rows=100000 loops=1)
                     Output: pgbench_accounts_1.abalance
                     Remote SQL: SELECT abalance FROM public.pgbench_accounts_1
                     Transport: Silk
                     Network: FDW bytes sent=247 received=2400360
 .....
In such cases, we can sometimes rewrite the query:
EXPLAIN (ANALYZE, NETWORK, VERBOSE) SELECT sum(abalance)::float/count(abalance)
    FROM pgbench_accounts WHERE abalance IS NOT NULL;
                                                                      QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=12577.20..12577.22 rows=1 width=8) (actual time=151.632..151.639 rows=1 loops=1)
   Output: ((sum(pgbench_accounts.abalance))::double precision / (count(pgbench_accounts.abalance))::double precision)
   Network: FDW bytes sent=3907 received=872
   ->  Append  (cost=3141.00..12577.10 rows=20 width=16) (actual time=55.589..151.621 rows=20 loops=1)
         Network: FDW bytes sent=3907 received=872
         ->  Partial Aggregate  (cost=3141.00..3141.01 rows=1 width=16) (actual time=55.423..55.424 rows=1 loops=1)
               Output: PARTIAL sum(pgbench_accounts.abalance), PARTIAL count(pgbench_accounts.abalance)
               ->  Seq Scan on public.pgbench_accounts_0 pgbench_accounts  (cost=0.00..2641.00 rows=100000 width=4) (actual time=0.023..37.212 rows=100000 loops=1)
                     Output: pgbench_accounts.abalance
                     Filter: (pgbench_accounts.abalance IS NOT NULL)
         ->  Async Foreign Scan  (cost=1.00..1.00 rows=1 width=16) (actual time=0.055..0.089 rows=1 loops=1)
               Output: (PARTIAL sum(pgbench_accounts_1.abalance)), (PARTIAL count(pgbench_accounts_1.abalance))
               Relations: Aggregate on (public.pgbench_accounts_1_fdw pgbench_accounts_1)
               Remote SQL: SELECT sum(abalance), count(abalance) FROM public.pgbench_accounts_1 WHERE ((abalance IS NOT NULL))
               Transport: Silk
               Network: FDW bytes sent=300 received=800
 ....
By rewriting the query, we decreased the incoming network traffic it generates from about 13 MB to 872 bytes.
Now let's look at two nearly identical joins.
EXPLAIN ANALYZE SELECT count(*) FROM pgbench_branches b JOIN pgbench_history h ON b.bid = h.bid
    WHERE mtime > '2023-03-14 10:00:00'::timestamptz AND b.bbalance > 0;
                                                                  QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=8125.68..8125.69 rows=1 width=8) (actual time=27.464..27.543 rows=1 loops=1)
   ->  Append  (cost=3.85..8125.63 rows=20 width=8) (actual time=0.036..27.475 rows=20 loops=1)
         ->  Partial Aggregate  (cost=3.85..3.86 rows=1 width=8) (actual time=0.033..0.036 rows=1 loops=1)
               ->  Nested Loop  (cost=0.00..3.69 rows=67 width=0) (actual time=0.025..0.027 rows=0 loops=1)
                     Join Filter: (b.bid = h.bid)
                     ->  Seq Scan on pgbench_branches_0 b  (cost=0.00..1.01 rows=1 width=4) (actual time=0.023..0.024 rows=0 loops=1)
                           Filter: (bbalance > 0)
                           Rows Removed by Filter: 1
                     ->  Seq Scan on pgbench_history_0 h  (cost=0.00..1.84 rows=67 width=4) (never executed)
                           Filter: (mtime > '2023-03-14 10:00:00+03'::timestamp with time zone)
         ->  Partial Aggregate  (cost=222.65..222.66 rows=1 width=8) (actual time=3.969..3.973 rows=1 loops=1)
               ->  Nested Loop  (cost=200.00..222.43 rows=86 width=0) (actual time=3.736..3.920 rows=86 loops=1)
                     Join Filter: (b_1.bid = h_1.bid)
                     ->  Foreign Scan on pgbench_branches_1_fdw b_1  (cost=100.00..101.22 rows=1 width=4) (actual time=1.929..1.932 rows=1 loops=1)
                     ->  Foreign Scan on pgbench_history_1_fdw h_1  (cost=100.00..120.14 rows=86 width=4) (actual time=1.795..1.916 rows=86 loops=1)
                           Filter: (mtime > '2023-03-14 10:00:00+03'::timestamp with time zone)
         ->  Partial Aggregate  (cost=864.54..864.55 rows=1 width=8) (actual time=1.780..1.786 rows=1 loops=1)
               ->  Hash Join  (cost=200.01..864.53 rows=5 width=0) (actual time=1.769..1.773 rows=0 loops=1)
                     Hash Cond: (h_2.bid = b_2.bid)
                     ->  Foreign Scan on pgbench_history_2_fdw h_2  (cost=100.00..760.81 rows=975 width=4) (never executed)
                           Filter: (mtime > '2023-03-14 10:00:00+03'::timestamp with time zone)
                     ->  Hash  (cost=100.00..100.00 rows=1 width=4) (actual time=1.740..1.742 rows=0 loops=1)
                           Buckets: 1024  Batches: 1  Memory Usage: 8kB
                           ->  Foreign Scan on pgbench_branches_2_fdw b_2  (cost=100.00..100.00 rows=1 width=4) (actual time=1.738..1.738 rows=0 loops=1)
 ....
 Planning Time: 6.066 ms
 Execution Time: 33.851 ms
An interesting thing to note is that the pgbench_branches and pgbench_history partitions are joined locally. This is a fetch-all plan: you can recognize it by the joins sitting above the foreign scans, meaning matching rows are fetched to the coordinator before being joined. It is not always evident why join pushdown does not happen. In this case, if we look at the pgbench_history definition, we can see that mtime has the timestamp without time zone type.
\d pgbench_history
            Partitioned table "public.pgbench_history"
 Column |            Type             | Collation | Nullable | Default
--------+-----------------------------+-----------+----------+---------
 tid    | integer                     |           |          |
 bid    | integer                     |           |          |
 aid    | integer                     |           |          |
 delta  | integer                     |           |          |
 mtime  | timestamp without time zone |           |          |
 filler | character(22)               |           |          |
Partition key: HASH (bid)
Number of partitions: 20 (Use \d+ to list them.)
In the above query, the string describing the time is converted to timestamp with time zone. This requires comparing the mtime column (of timestamp type) with a timestamptz value. The comparison is implicitly performed by the stable function timestamp_gt_timestamptz, and a filter containing a non-immutable function cannot be pushed down to a foreign server, so the join is executed locally. If we rewrite the query, converting the string to timestamp, we can see not only that the joins are pushed down, but also that the remote queries can be executed asynchronously, because the foreign scans in the plan tree sit immediately below the Append node:
EXPLAIN ANALYZE SELECT count(*) FROM pgbench_branches b JOIN pgbench_history h ON b.bid = h.bid
    WHERE mtime > '2023-03-14 10:00:00'::timestamp AND b.bbalance > 0;
                                                              QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=84.30..84.31 rows=1 width=8) (actual time=22.962..22.990 rows=1 loops=1)
   ->  Append  (cost=3.85..84.25 rows=20 width=8) (actual time=0.196..22.927 rows=20 loops=1)
         ->  Partial Aggregate  (cost=3.85..3.86 rows=1 width=8) (actual time=0.032..0.034 rows=1 loops=1)
               ->  Nested Loop  (cost=0.00..3.69 rows=67 width=0) (actual time=0.024..0.026 rows=0 loops=1)
                     Join Filter: (b.bid = h.bid)
                     ->  Seq Scan on pgbench_branches_0 b  (cost=0.00..1.01 rows=1 width=4) (actual time=0.023..0.023 rows=0 loops=1)
                           Filter: (bbalance > 0)
                           Rows Removed by Filter: 1
                     ->  Seq Scan on pgbench_history_0 h  (cost=0.00..1.84 rows=67 width=4) (never executed)
                           Filter: (mtime > '2023-03-14 10:00:00'::timestamp without time zone)
         ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=8) (actual time=10.870..10.871 rows=1 loops=1)
               Relations: Aggregate on ((pgbench_branches_1_fdw b_1) INNER JOIN (pgbench_history_1_fdw h_1))
         ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=8) (actual time=0.016..0.017 rows=1 loops=1)
               Relations: Aggregate on ((pgbench_branches_2_fdw b_2) INNER JOIN (pgbench_history_2_fdw h_2))
 ...
 Planning Time: 7.729 ms
 Execution Time: 14.603 ms
Note that the foreign scans here include the list of joined relations. The expected cost of a foreign join is below 1.0. This is due to an optimistic foreign join cost estimation technique, enabled by the postgres_fdw.enforce_foreign_join setting. Comparing the total time (planning time plus execution time) of the original and the modified query, we decreased it from about 40 ms to 22 ms.
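To confirm that the implicit comparison function really is non-immutable, its volatility can be checked in the standard pg_proc catalog ('i' means immutable, 's' stable, 'v' volatile):

```sql
-- timestamp_gt_timestamptz depends on the session's TimeZone setting,
-- so PostgreSQL marks it stable ('s') rather than immutable ('i').
SELECT proname, provolatile
FROM pg_proc
WHERE proname = 'timestamp_gt_timestamptz';
```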
Overall, while examining query plans, pay attention to which operations are actually pushed down. Common reasons why joins cannot be pushed down are the absence of an equi-join condition on the sharding key and filters containing (possibly implicitly) non-immutable functions. If data is fetched from multiple replication groups, check that execution is mostly asynchronous.
5.1.1. EXPLAIN Parameters

This section lists Shardman-specific EXPLAIN parameters.
- NETWORK (boolean)

  Include the actual data transfer between nodes in the EXPLAIN ANALYZE output. If this parameter is not specified, off is assumed. If the parameter is specified without a value, on is assumed.
- REMOTE (boolean)

  Include plans for queries executed on foreign servers. If this parameter or its value is not specified, on is assumed.
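As with other boolean EXPLAIN options in PostgreSQL, a value can be given explicitly. A sketch combining both parameters (assuming the remote plan output is not needed):

```sql
-- Collect per-node network statistics but omit the plans of remote queries.
EXPLAIN (ANALYZE, NETWORK, REMOTE off)
    SELECT avg(abalance) FROM pgbench_accounts;
```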