5.1. Examining Plans #

Tuning query execution is better on a subset of production data that represents actual data distribution. Let's look at some sample plans.

EXPLAIN VERBOSE
SELECT bid,avg(abalance) FROM pgbench_accounts
WHERE bid IN (10,20,30,40)
GROUP BY bid;
                                                                QUERY PLAN                                                                 
-------------------------------------------------------------------------------------------------------------------------------------------
 Append  (cost=0.29..21.98 rows=4 width=36)
   ->  GroupAggregate  (cost=0.29..18.98 rows=1 width=36)
         Output: pgbench_accounts.bid, avg(pgbench_accounts.abalance)
         Group Key: pgbench_accounts.bid
         ->  Index Scan using pgbench_accounts_15_pkey on public.pgbench_accounts_15 pgbench_accounts  (cost=0.29..18.96 rows=1 width=8)
               Output: pgbench_accounts.bid, pgbench_accounts.abalance
               Index Cond: (pgbench_accounts.bid = ANY ('{10,20,30,40}'::integer[]))
   ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=36)
         Output: pgbench_accounts_1.bid, (avg(pgbench_accounts_1.abalance))
         Relations: Aggregate on (public.pgbench_accounts_16_fdw pgbench_accounts_1)
         Remote SQL: SELECT bid, avg(abalance) FROM public.pgbench_accounts_16 WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1
         Transport: Silk
   ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=36)
         Output: pgbench_accounts_2.bid, (avg(pgbench_accounts_2.abalance))
         Relations: Aggregate on (public.pgbench_accounts_17_fdw pgbench_accounts_2)
         Remote SQL: SELECT bid, avg(abalance) FROM public.pgbench_accounts_17 WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1
         Transport: Silk
   ->  Async Foreign Scan  (cost=1.00..1.00 rows=1 width=36)
         Output: pgbench_accounts_3.bid, (avg(pgbench_accounts_3.abalance))
         Relations: Aggregate on (public.pgbench_accounts_19_fdw pgbench_accounts_3)
         Remote SQL: SELECT bid, avg(abalance) FROM public.pgbench_accounts_19 WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1
         Transport: Silk
 Query Identifier: -1714706980364121548

We see here that queries scanning three partitions are going to be sent to other nodes, coordinator data is also going to be scanned using Index Scan. We do not know what plan will be used on the remote side, but we see which queries will be sent (marked with Remote SQL). Note that Transport: Silk section is present in the foreign scan description. This indicates that Silk transport will be used to transfer results. We see that Async foreign scan is going to be used, which is fine. To discover which servers are used in the query, we should look at foreign tables definitions. For example, we can find out that public.pgbench_accounts_19_fdw is located on the shardman_rg_2 server listening on 127.0.0.2:65432:

SELECT srvname,srvoptions FROM pg_foreign_server s JOIN pg_foreign_table ON ftserver = s.oid
WHERE ftrelid = 'public.pgbench_accounts_19_fdw'::regclass;
-[ RECORD 1 ]-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
srvname    | shardman_rg_2
srvoptions | {async_capable=on,batch_size=100,binary_format=on,connect_timeout=5,dbname=postgres,extended_features=on,fdw_tuple_cost=0.2,fetch_size=50000,host=127.0.0.2,port=65432,silk_port=8000,tcp_user_timeout=10000}

Now we can connect to shardman_rg_2 server and find out which plan is used for the local query which was shown by the above EXPLAIN:

EXPLAIN SELECT bid, avg(abalance)
FROM public.pgbench_accounts_19
WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1;

                                   QUERY PLAN                                    
---------------------------------------------------------------------------------
 HashAggregate  (cost=3641.00..3641.01 rows=1 width=36)
   Group Key: bid
   ->  Seq Scan on pgbench_accounts_19  (cost=0.00..3141.00 rows=100000 width=8)
         Filter: (bid = ANY ('{10,20,30,40}'::integer[]))

While looking at distributed query plans, we can see that sometimes aggregates are not pushed down:

EXPLAIN VERBOSE
SELECT avg(abalance) FROM pgbench_accounts;
                                                                                    QUERY PLAN                                                                                     
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=156209.38..156209.39 rows=1 width=32) (actual time=590.359..590.371 rows=1 loops=1)
   Output: avg(pgbench_accounts.abalance)
   ->  Append  (cost=2891.00..156209.33 rows=20 width=32) (actual time=56.815..590.341 rows=20 loops=1)
         ->  Partial Aggregate  (cost=2891.00..2891.01 rows=1 width=32) (actual time=56.812..56.813 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts.abalance)
               ->  Seq Scan on public.pgbench_accounts_0 pgbench_accounts  (cost=0.00..2641.00 rows=100000 width=4) (actual time=0.018..38.478 rows=100000 loops=1)
                     Output: pgbench_accounts.abalance
         ->  Partial Aggregate  (cost=23991.00..23991.01 rows=1 width=32) (actual time=75.133..75.134 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts_1.abalance)
               ->  Foreign Scan on public.pgbench_accounts_1_fdw pgbench_accounts_1  (cost=100.00..23741.00 rows=100000 width=4) (actual time=41.281..67.293 rows=100000 loops=1)
                     Output: pgbench_accounts_1.abalance
                     Remote SQL: SELECT abalance FROM public.pgbench_accounts_1
                     Transport: Silk
.....

Here avg() is calculated on the coordinator side. This can lead to a significant growth of data transfer between nodes. The actual data transfer can be monitored with the NETWORK parameter of EXPLAIN ANALYZE (look at the Network received field of the topmost plan node):

EXPLAIN (ANALYZE, VERBOSE, NETWORK)
SELECT avg(abalance) FROM pgbench_accounts
                                                                                     QUERY PLAN                                                                                     
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=156209.38..156209.39 rows=1 width=32) (actual time=589.014..589.027 rows=1 loops=1)
   Output: avg(pgbench_accounts.abalance)
   Network: FDW bytes sent=3218 received=14402396
   ->  Append  (cost=2891.00..156209.33 rows=20 width=32) (actual time=52.111..588.999 rows=20 loops=1)
         Network: FDW bytes sent=3218 received=14402396
         ->  Partial Aggregate  (cost=2891.00..2891.01 rows=1 width=32) (actual time=52.109..52.109 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts.abalance)
               ->  Seq Scan on public.pgbench_accounts_0 pgbench_accounts  (cost=0.00..2641.00 rows=100000 width=4) (actual time=0.020..34.472 rows=100000 loops=1)
                     Output: pgbench_accounts.abalance
         ->  Partial Aggregate  (cost=23991.00..23991.01 rows=1 width=32) (actual time=78.616..78.617 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts_1.abalance)
               Network: FDW bytes sent=247 received=2400360
               ->  Foreign Scan on public.pgbench_accounts_1_fdw pgbench_accounts_1  (cost=100.00..23741.00 rows=100000 width=4) (actual time=42.359..69.984 rows=100000 loops=1)
                     Output: pgbench_accounts_1.abalance
                     Remote SQL: SELECT abalance FROM public.pgbench_accounts_1
                     Transport: Silk
                     Network: FDW bytes sent=247 received=2400360
.....

In such cases, we sometimes can rewrite the query:

EXPLAIN  (ANALYZE, NETWORK, VERBOSE)
SELECT sum(abalance)::float/count(abalance) FROM pgbench_accounts where abalance is not null;

                                                                             QUERY PLAN                                                                             
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=12577.20..12577.22 rows=1 width=8) (actual time=151.632..151.639 rows=1 loops=1)
   Output: ((sum(pgbench_accounts.abalance))::double precision / (count(pgbench_accounts.abalance))::double precision)
   Network: FDW bytes sent=3907 received=872
   ->  Append  (cost=3141.00..12577.10 rows=20 width=16) (actual time=55.589..151.621 rows=20 loops=1)
         Network: FDW bytes sent=3907 received=872
         ->  Partial Aggregate  (cost=3141.00..3141.01 rows=1 width=16) (actual time=55.423..55.424 rows=1 loops=1)
               Output: PARTIAL sum(pgbench_accounts.abalance), PARTIAL count(pgbench_accounts.abalance)
               ->  Seq Scan on public.pgbench_accounts_0 pgbench_accounts  (cost=0.00..2641.00 rows=100000 width=4) (actual time=0.023..37.212 rows=100000 loops=1)
                     Output: pgbench_accounts.abalance
                     Filter: (pgbench_accounts.abalance IS NOT NULL)
         ->  Async Foreign Scan  (cost=1.00..1.00 rows=1 width=16) (actual time=0.055..0.089 rows=1 loops=1)
               Output: (PARTIAL sum(pgbench_accounts_1.abalance)), (PARTIAL count(pgbench_accounts_1.abalance))
               Relations: Aggregate on (public.pgbench_accounts_1_fdw pgbench_accounts_1)
               Remote SQL: SELECT sum(abalance), count(abalance) FROM public.pgbench_accounts_1 WHERE ((abalance IS NOT NULL))
               Transport: Silk
               Network: FDW bytes sent=300 received=800
....

Rewriting the query here, we could decrease incoming network traffic generated by the query from 13 MB to 872 bytes.

Now let's look at two nearly identical joins.

EXPLAIN ANALYZE SELECT count(*) FROM pgbench_branches b
JOIN pgbench_history h ON b.bid = h.bid
WHERE mtime > '2023-03-14 10:00:00'::timestamptz AND b.bbalance > 0;

                                                                         QUERY PLAN                                                                         
------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=8125.68..8125.69 rows=1 width=8) (actual time=27.464..27.543 rows=1 loops=1)
   ->  Append  (cost=3.85..8125.63 rows=20 width=8) (actual time=0.036..27.475 rows=20 loops=1)
         ->  Partial Aggregate  (cost=3.85..3.86 rows=1 width=8) (actual time=0.033..0.036 rows=1 loops=1)
               ->  Nested Loop  (cost=0.00..3.69 rows=67 width=0) (actual time=0.025..0.027 rows=0 loops=1)
                     Join Filter: (b.bid = h.bid)
                     ->  Seq Scan on pgbench_branches_0 b  (cost=0.00..1.01 rows=1 width=4) (actual time=0.023..0.024 rows=0 loops=1)
                           Filter: (bbalance > 0)
                           Rows Removed by Filter: 1
                     ->  Seq Scan on pgbench_history_0 h  (cost=0.00..1.84 rows=67 width=4) (never executed)
                           Filter: (mtime > '2023-03-14 10:00:00+03'::timestamp with time zone)
         ->  Partial Aggregate  (cost=222.65..222.66 rows=1 width=8) (actual time=3.969..3.973 rows=1 loops=1)
               ->  Nested Loop  (cost=200.00..222.43 rows=86 width=0) (actual time=3.736..3.920 rows=86 loops=1)
                     Join Filter: (b_1.bid = h_1.bid)
                     ->  Foreign Scan on pgbench_branches_1_fdw b_1  (cost=100.00..101.22 rows=1 width=4) (actual time=1.929..1.932 rows=1 loops=1)
                     ->  Foreign Scan on pgbench_history_1_fdw h_1  (cost=100.00..120.14 rows=86 width=4) (actual time=1.795..1.916 rows=86 loops=1)
                           Filter: (mtime > '2023-03-14 10:00:00+03'::timestamp with time zone)
         ->  Partial Aggregate  (cost=864.54..864.55 rows=1 width=8) (actual time=1.780..1.786 rows=1 loops=1)
               ->  Hash Join  (cost=200.01..864.53 rows=5 width=0) (actual time=1.769..1.773 rows=0 loops=1)
                     Hash Cond: (h_2.bid = b_2.bid)
                     ->  Foreign Scan on pgbench_history_2_fdw h_2  (cost=100.00..760.81 rows=975 width=4) (never executed)
                           Filter: (mtime > '2023-03-14 10:00:00+03'::timestamp with time zone)
                     ->  Hash  (cost=100.00..100.00 rows=1 width=4) (actual time=1.740..1.742 rows=0 loops=1)
                           Buckets: 1024  Batches: 1  Memory Usage: 8kB
                           ->  Foreign Scan on pgbench_branches_2_fdw b_2  (cost=100.00..100.00 rows=1 width=4) (actual time=1.738..1.738 rows=0 loops=1)
....
 Planning Time: 6.066 ms
 Execution Time: 33.851 ms

An interesting thing to note is that joining of pgbench_branches and pgbench_history partitions happens locally. It is a fetch-all plan — you can discover this by joins being located above foreign scans. It is not always evident why join pushdown does not happen. But if we look at the pgbench_history definition, we can see that mtime has the timestamp without time zone type.

\d pgbench_history
              Partitioned table "public.pgbench_history"
 Column |            Type             | Collation | Nullable | Default 
--------+-----------------------------+-----------+----------+---------
 tid    | integer                     |           |          | 
 bid    | integer                     |           |          | 
 aid    | integer                     |           |          | 
 delta  | integer                     |           |          | 
 mtime  | timestamp without time zone |           |          | 
 filler | character(22)               |           |          | 
Partition key: HASH (bid)
Number of partitions: 20 (Use \d+ to list them.)

And in the above query, the string describing time is converted to timestamp with timezone. This requires comparison of mtime column (of timestamp type) and timestamptz value. The comparison is implicitly performed using the stable function timestamp_gt_timestamptz. A filter containing a non-immutable function cannot be pushed down to the foreign server, so join is executed locally. If we rewrite the query, converting the string to a timestamp, we can see not only that joins are pushed down, but also that remote queries can be executed asynchronously because foreign scans in a plan tree are located immediately below Append:

EXPLAIN ANALYZE SELECT count(*) FROM pgbench_branches b
JOIN pgbench_history h ON b.bid = h.bid
WHERE mtime > '2023-03-14 10:00:00'::timestamp AND b.bbalance > 0;
                                                                   QUERY PLAN                                                                   
------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=84.30..84.31 rows=1 width=8) (actual time=22.962..22.990 rows=1 loops=1)
   ->  Append  (cost=3.85..84.25 rows=20 width=8) (actual time=0.196..22.927 rows=20 loops=1)
         ->  Partial Aggregate  (cost=3.85..3.86 rows=1 width=8) (actual time=0.032..0.034 rows=1 loops=1)
               ->  Nested Loop  (cost=0.00..3.69 rows=67 width=0) (actual time=0.024..0.026 rows=0 loops=1)
                     Join Filter: (b.bid = h.bid)
                     ->  Seq Scan on pgbench_branches_0 b  (cost=0.00..1.01 rows=1 width=4) (actual time=0.023..0.023 rows=0 loops=1)
                           Filter: (bbalance > 0)
                           Rows Removed by Filter: 1
                     ->  Seq Scan on pgbench_history_0 h  (cost=0.00..1.84 rows=67 width=4) (never executed)
                           Filter: (mtime > '2023-03-14 10:00:00'::timestamp without time zone)
         ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=8) (actual time=10.870..10.871 rows=1 loops=1)
               Relations: Aggregate on ((pgbench_branches_1_fdw b_1) INNER JOIN (pgbench_history_1_fdw h_1))
         ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=8) (actual time=0.016..0.017 rows=1 loops=1)
               Relations: Aggregate on ((pgbench_branches_2_fdw b_2) INNER JOIN (pgbench_history_2_fdw h_2))
...
 Planning Time: 7.729 ms
 Execution Time: 14.603 ms

Note that foreign scans here include a list of joined relations. The expected cost of a foreign join is below 1.0. This is due to an optimistic technique of foreign join cost estimation, turned on by the postgres_fdw.enforce_foreign_join setting. Compare the total execution time (planning time + execution time) of the original and modified query — we could decrease it from about 40 to 22 ms.

Overall, while examining query plans, pay attention to what queries are actually pushed down. Some of the common reasons why joins cannot be pushed down is the absence of equi-joins on the sharding key and filters that contain non-immutable functions (possibly implicitly). If data is fetched from multiple replication groups, check that execution is mostly asynchronous.

5.1.1. EXPLAIN Parameters #

This section lists Shardman-specific EXPLAIN parameters.

NETWORK (boolean) #

Include the actual data transfer between nodes in the EXPLAIN ANALYZE output. If this parameter is not specified, off is assumed. If the parameter is specified without a value, on is assumed.

REMOTE (boolean) #

Include plans for queries executed on foreign servers. If this parameter or its value is not specified, on is assumed.

pdf