14.6. Postgres Pro Shardman Performance Tuning #

Below is a list of performance tuning tips for distributed systems.

14.6.1. Examining Plans #

Query execution is best tuned on a subset of production data that represents the actual data distribution. Let's look at some sample plans.

EXPLAIN VERBOSE
SELECT bid,avg(abalance) FROM pgbench_accounts
WHERE bid IN (10,20,30,40)
GROUP BY bid;
                                                                QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------
 Append  (cost=0.29..21.98 rows=4 width=36)
   ->  GroupAggregate  (cost=0.29..18.98 rows=1 width=36)
         Output: pgbench_accounts.bid, avg(pgbench_accounts.abalance)
         Group Key: pgbench_accounts.bid
         ->  Index Scan using pgbench_accounts_15_pkey on public.pgbench_accounts_15 pgbench_accounts  (cost=0.29..18.96 rows=1 width=8)
               Output: pgbench_accounts.bid, pgbench_accounts.abalance
               Index Cond: (pgbench_accounts.bid = ANY ('{10,20,30,40}'::integer[]))
   ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=36)
         Output: pgbench_accounts_1.bid, (avg(pgbench_accounts_1.abalance))
         Relations: Aggregate on (public.pgbench_accounts_16_fdw pgbench_accounts_1)
         Remote SQL: SELECT bid, avg(abalance) FROM public.pgbench_accounts_16 WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1
         Transport: Silk
   ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=36)
         Output: pgbench_accounts_2.bid, (avg(pgbench_accounts_2.abalance))
         Relations: Aggregate on (public.pgbench_accounts_17_fdw pgbench_accounts_2)
         Remote SQL: SELECT bid, avg(abalance) FROM public.pgbench_accounts_17 WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1
         Transport: Silk
   ->  Async Foreign Scan  (cost=1.00..1.00 rows=1 width=36)
         Output: pgbench_accounts_3.bid, (avg(pgbench_accounts_3.abalance))
         Relations: Aggregate on (public.pgbench_accounts_19_fdw pgbench_accounts_3)
         Remote SQL: SELECT bid, avg(abalance) FROM public.pgbench_accounts_19 WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1
         Transport: Silk
 Query Identifier: -1714706980364121548

We see here that queries scanning three partitions will be sent to other nodes, while the coordinator's local data will be scanned using an Index Scan. We do not know what plan will be used on the remote side, but we do see which queries will be sent (marked with Remote SQL). Note that the Transport: Silk line is present in the foreign scan description. It indicates that the Silk transport will be used to transfer results. We also see that Async Foreign Scan is going to be used, which is fine. To discover which servers are used in the query, we should look at the foreign table definitions. For example, we can find out that public.pgbench_accounts_19_fdw is located on the shardman_rg_2 server listening on 127.0.0.2:65432:

SELECT srvname,srvoptions FROM pg_foreign_server s JOIN pg_foreign_table ON ftserver = s.oid
WHERE ftrelid = 'public.pgbench_accounts_19_fdw'::regclass;
-[ RECORD 1 ]-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
srvname    | shardman_rg_2
srvoptions | {async_capable=on,batch_size=100,binary_format=on,connect_timeout=5,dbname=postgres,extended_features=on,fdw_tuple_cost=0.2,fetch_size=50000,host=127.0.0.2,port=65432,silk_port=8000,tcp_user_timeout=10000}

Now we can connect to the shardman_rg_2 server and find out which plan is used for the local query shown in Remote SQL above:

EXPLAIN SELECT bid, avg(abalance)
FROM public.pgbench_accounts_19
WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1;

                                   QUERY PLAN                                    
---------------------------------------------------------------------------------
 HashAggregate  (cost=3641.00..3641.01 rows=1 width=36)
   Group Key: bid
   ->  Seq Scan on pgbench_accounts_19  (cost=0.00..3141.00 rows=100000 width=8)
         Filter: (bid = ANY ('{10,20,30,40}'::integer[]))

While looking at distributed query plans, we can see that sometimes aggregates are not pushed down:

EXPLAIN (ANALYZE, VERBOSE)
SELECT avg(abalance) FROM pgbench_accounts;
                                                                                    QUERY PLAN                                                                                     
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=156209.38..156209.39 rows=1 width=32) (actual time=590.359..590.371 rows=1 loops=1)
   Output: avg(pgbench_accounts.abalance)
   ->  Append  (cost=2891.00..156209.33 rows=20 width=32) (actual time=56.815..590.341 rows=20 loops=1)
         ->  Partial Aggregate  (cost=2891.00..2891.01 rows=1 width=32) (actual time=56.812..56.813 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts.abalance)
               ->  Seq Scan on public.pgbench_accounts_0 pgbench_accounts  (cost=0.00..2641.00 rows=100000 width=4) (actual time=0.018..38.478 rows=100000 loops=1)
                     Output: pgbench_accounts.abalance
         ->  Partial Aggregate  (cost=23991.00..23991.01 rows=1 width=32) (actual time=75.133..75.134 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts_1.abalance)
               ->  Foreign Scan on public.pgbench_accounts_1_fdw pgbench_accounts_1  (cost=100.00..23741.00 rows=100000 width=4) (actual time=41.281..67.293 rows=100000 loops=1)
                     Output: pgbench_accounts_1.abalance
                     Remote SQL: SELECT abalance FROM public.pgbench_accounts_1
                     Transport: Silk
.....

Here avg() is calculated on the coordinator side. This can lead to a significant increase in data transfer between nodes. The actual data transfer can be monitored with the NETWORK parameter of EXPLAIN ANALYZE (look at the received value in the Network line of the topmost plan node):

EXPLAIN (ANALYZE, VERBOSE, NETWORK)
SELECT avg(abalance) FROM pgbench_accounts;
                                                                                     QUERY PLAN                                                                                     
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=156209.38..156209.39 rows=1 width=32) (actual time=589.014..589.027 rows=1 loops=1)
   Output: avg(pgbench_accounts.abalance)
   Network: FDW bytes sent=3218 received=14402396
   ->  Append  (cost=2891.00..156209.33 rows=20 width=32) (actual time=52.111..588.999 rows=20 loops=1)
         Network: FDW bytes sent=3218 received=14402396
         ->  Partial Aggregate  (cost=2891.00..2891.01 rows=1 width=32) (actual time=52.109..52.109 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts.abalance)
               ->  Seq Scan on public.pgbench_accounts_0 pgbench_accounts  (cost=0.00..2641.00 rows=100000 width=4) (actual time=0.020..34.472 rows=100000 loops=1)
                     Output: pgbench_accounts.abalance
         ->  Partial Aggregate  (cost=23991.00..23991.01 rows=1 width=32) (actual time=78.616..78.617 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts_1.abalance)
               Network: FDW bytes sent=247 received=2400360
               ->  Foreign Scan on public.pgbench_accounts_1_fdw pgbench_accounts_1  (cost=100.00..23741.00 rows=100000 width=4) (actual time=42.359..69.984 rows=100000 loops=1)
                     Output: pgbench_accounts_1.abalance
                     Remote SQL: SELECT abalance FROM public.pgbench_accounts_1
                     Transport: Silk
                     Network: FDW bytes sent=247 received=2400360
.....

In such cases, we can sometimes rewrite the query:

EXPLAIN (ANALYZE, NETWORK, VERBOSE)
SELECT sum(abalance)::float/count(abalance) FROM pgbench_accounts where abalance is not null;

                                                                             QUERY PLAN                                                                             
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=12577.20..12577.22 rows=1 width=8) (actual time=151.632..151.639 rows=1 loops=1)
   Output: ((sum(pgbench_accounts.abalance))::double precision / (count(pgbench_accounts.abalance))::double precision)
   Network: FDW bytes sent=3907 received=872
   ->  Append  (cost=3141.00..12577.10 rows=20 width=16) (actual time=55.589..151.621 rows=20 loops=1)
         Network: FDW bytes sent=3907 received=872
         ->  Partial Aggregate  (cost=3141.00..3141.01 rows=1 width=16) (actual time=55.423..55.424 rows=1 loops=1)
               Output: PARTIAL sum(pgbench_accounts.abalance), PARTIAL count(pgbench_accounts.abalance)
               ->  Seq Scan on public.pgbench_accounts_0 pgbench_accounts  (cost=0.00..2641.00 rows=100000 width=4) (actual time=0.023..37.212 rows=100000 loops=1)
                     Output: pgbench_accounts.abalance
                     Filter: (pgbench_accounts.abalance IS NOT NULL)
         ->  Async Foreign Scan  (cost=1.00..1.00 rows=1 width=16) (actual time=0.055..0.089 rows=1 loops=1)
               Output: (PARTIAL sum(pgbench_accounts_1.abalance)), (PARTIAL count(pgbench_accounts_1.abalance))
               Relations: Aggregate on (public.pgbench_accounts_1_fdw pgbench_accounts_1)
               Remote SQL: SELECT sum(abalance), count(abalance) FROM public.pgbench_accounts_1 WHERE ((abalance IS NOT NULL))
               Transport: Silk
               Network: FDW bytes sent=300 received=800
....

By rewriting the query, we reduced the incoming network traffic it generates from about 13 MB to 872 bytes.

Now let's look at two nearly identical joins.

EXPLAIN ANALYZE SELECT count(*) FROM pgbench_branches b
JOIN pgbench_history h ON b.bid = h.bid
WHERE mtime > '2023-03-14 10:00:00'::timestamptz AND b.bbalance > 0;

                                                                         QUERY PLAN                                                                         
------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=8125.68..8125.69 rows=1 width=8) (actual time=27.464..27.543 rows=1 loops=1)
   ->  Append  (cost=3.85..8125.63 rows=20 width=8) (actual time=0.036..27.475 rows=20 loops=1)
         ->  Partial Aggregate  (cost=3.85..3.86 rows=1 width=8) (actual time=0.033..0.036 rows=1 loops=1)
               ->  Nested Loop  (cost=0.00..3.69 rows=67 width=0) (actual time=0.025..0.027 rows=0 loops=1)
                     Join Filter: (b.bid = h.bid)
                     ->  Seq Scan on pgbench_branches_0 b  (cost=0.00..1.01 rows=1 width=4) (actual time=0.023..0.024 rows=0 loops=1)
                           Filter: (bbalance > 0)
                           Rows Removed by Filter: 1
                     ->  Seq Scan on pgbench_history_0 h  (cost=0.00..1.84 rows=67 width=4) (never executed)
                           Filter: (mtime > '2023-03-14 10:00:00+03'::timestamp with time zone)
         ->  Partial Aggregate  (cost=222.65..222.66 rows=1 width=8) (actual time=3.969..3.973 rows=1 loops=1)
               ->  Nested Loop  (cost=200.00..222.43 rows=86 width=0) (actual time=3.736..3.920 rows=86 loops=1)
                     Join Filter: (b_1.bid = h_1.bid)
                     ->  Foreign Scan on pgbench_branches_1_fdw b_1  (cost=100.00..101.22 rows=1 width=4) (actual time=1.929..1.932 rows=1 loops=1)
                     ->  Foreign Scan on pgbench_history_1_fdw h_1  (cost=100.00..120.14 rows=86 width=4) (actual time=1.795..1.916 rows=86 loops=1)
                           Filter: (mtime > '2023-03-14 10:00:00+03'::timestamp with time zone)
         ->  Partial Aggregate  (cost=864.54..864.55 rows=1 width=8) (actual time=1.780..1.786 rows=1 loops=1)
               ->  Hash Join  (cost=200.01..864.53 rows=5 width=0) (actual time=1.769..1.773 rows=0 loops=1)
                     Hash Cond: (h_2.bid = b_2.bid)
                     ->  Foreign Scan on pgbench_history_2_fdw h_2  (cost=100.00..760.81 rows=975 width=4) (never executed)
                           Filter: (mtime > '2023-03-14 10:00:00+03'::timestamp with time zone)
                     ->  Hash  (cost=100.00..100.00 rows=1 width=4) (actual time=1.740..1.742 rows=0 loops=1)
                           Buckets: 1024  Batches: 1  Memory Usage: 8kB
                           ->  Foreign Scan on pgbench_branches_2_fdw b_2  (cost=100.00..100.00 rows=1 width=4) (actual time=1.738..1.738 rows=0 loops=1)
....
 Planning Time: 6.066 ms
 Execution Time: 33.851 ms

An interesting thing to note is that the join of pgbench_branches and pgbench_history partitions happens locally. This is a fetch-all plan: you can recognize it by the join nodes being located above the foreign scans. It is not always evident why join pushdown does not happen. But if we look at the pgbench_history definition, we can see that mtime has the timestamp without time zone type.

\d pgbench_history
              Partitioned table "public.pgbench_history"
 Column |            Type             | Collation | Nullable | Default 
--------+-----------------------------+-----------+----------+---------
 tid    | integer                     |           |          | 
 bid    | integer                     |           |          | 
 aid    | integer                     |           |          | 
 delta  | integer                     |           |          | 
 mtime  | timestamp without time zone |           |          | 
 filler | character(22)               |           |          | 
Partition key: HASH (bid)
Number of partitions: 20 (Use \d+ to list them.)

In the above query, the string describing the time is converted to timestamp with time zone. This requires comparing the mtime column (of timestamp type) with a timestamptz value. The comparison is implicitly performed using the stable function timestamp_gt_timestamptz. A filter containing a non-immutable function cannot be pushed down to the foreign server, so the join is executed locally. If we rewrite the query, converting the string to a timestamp, we can see not only that the joins are pushed down, but also that remote queries can be executed asynchronously, because the foreign scans in the plan tree are located immediately below Append:

EXPLAIN ANALYZE SELECT count(*) FROM pgbench_branches b
JOIN pgbench_history h ON b.bid = h.bid
WHERE mtime > '2023-03-14 10:00:00'::timestamp AND b.bbalance > 0;
                                                                   QUERY PLAN                                                                   
------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=84.30..84.31 rows=1 width=8) (actual time=22.962..22.990 rows=1 loops=1)
   ->  Append  (cost=3.85..84.25 rows=20 width=8) (actual time=0.196..22.927 rows=20 loops=1)
         ->  Partial Aggregate  (cost=3.85..3.86 rows=1 width=8) (actual time=0.032..0.034 rows=1 loops=1)
               ->  Nested Loop  (cost=0.00..3.69 rows=67 width=0) (actual time=0.024..0.026 rows=0 loops=1)
                     Join Filter: (b.bid = h.bid)
                     ->  Seq Scan on pgbench_branches_0 b  (cost=0.00..1.01 rows=1 width=4) (actual time=0.023..0.023 rows=0 loops=1)
                           Filter: (bbalance > 0)
                           Rows Removed by Filter: 1
                     ->  Seq Scan on pgbench_history_0 h  (cost=0.00..1.84 rows=67 width=4) (never executed)
                           Filter: (mtime > '2023-03-14 10:00:00'::timestamp without time zone)
         ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=8) (actual time=10.870..10.871 rows=1 loops=1)
               Relations: Aggregate on ((pgbench_branches_1_fdw b_1) INNER JOIN (pgbench_history_1_fdw h_1))
         ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=8) (actual time=0.016..0.017 rows=1 loops=1)
               Relations: Aggregate on ((pgbench_branches_2_fdw b_2) INNER JOIN (pgbench_history_2_fdw h_2))
...
 Planning Time: 7.729 ms
 Execution Time: 14.603 ms

Note that the foreign scans here include a list of joined relations. The expected cost of a foreign join is below 1.0. This is due to an optimistic technique of foreign join cost estimation, enabled by the postgres_fdw.enforce_foreign_join setting. Comparing the total time (planning time + execution time) of the original and modified queries, we reduced it from about 40 ms to 22 ms.

Overall, while examining query plans, pay attention to which queries are actually pushed down. Common reasons why joins cannot be pushed down are the absence of equi-joins on the sharding key and filters that contain non-immutable functions (possibly implicitly). If data is fetched from multiple replication groups, check that execution is mostly asynchronous.

14.6.1.1. EXPLAIN Parameters #

This section lists EXPLAIN parameters for a distributed system.

NETWORK (boolean) #

Include the actual data transfer between nodes in the EXPLAIN ANALYZE output. If this parameter is not specified, off is assumed. If the parameter is specified without a value, on is assumed.

REMOTE (boolean) #

Include plans for queries executed on foreign servers. If this parameter or its value is not specified, on is assumed.
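
For example, a minimal usage sketch that suppresses the remote plan output for the table from the examples above:

EXPLAIN (VERBOSE, REMOTE OFF)
SELECT avg(abalance) FROM pgbench_accounts;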

14.6.2. DML Optimizations #

While evaluating performance of DML statements, it is important to understand how they are processed in Postgres Pro Shardman.

First of all, the execution of INSERT differs significantly from the execution of UPDATE and DELETE statements. The behavior of INSERT for sharded tables is controlled by the batch_size foreign server option, which can be set in the FDWOptions section of the Postgres Pro Shardman configuration file. If batch_size is greater than 0, inserting several values that fall into the same foreign partition within one statement causes the values to be grouped into batches of the specified size. Remote INSERT statements are prepared with the necessary number of parameters and then executed with the given values. If the number of values does not match the number of prepared arguments, a modified statement with the necessary number of parameters is prepared again.

The batch insert optimization can fail if a transaction inserts records one by one, or if records routed to different foreign tables are intermixed in one INSERT statement. A batch is formed for a single foreign modify operation and is sent to the remote server when the batch is filled or when the modify operation is over; the modify operation is over when tuples start being routed to another sharded table partition. So, for bulk loading, inserting multiple values in a single INSERT command or using COPY is recommended (COPY is optimized in a similar way).

Large batch_size values allow issuing fewer INSERT statements on the remote side and so significantly reduce communication costs. However, while constructing parameters for prepared INSERT statements, all inserted values must be copied to libpq-allocated memory. This can lead to unrestricted memory usage on the query coordinator side when several large text or bytea objects are loaded.
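
For illustration, here is a minimal bulk-load sketch using the pgbench_history table from the examples above (the CSV file path is hypothetical):

INSERT INTO pgbench_history (tid, bid, aid, delta, mtime)
VALUES (1, 1, 100, 10,  now()),
       (1, 1, 101, -20, now()),
       (2, 2, 205, 35,  now());  -- many rows in one statement can be grouped into batches

COPY pgbench_history (tid, bid, aid, delta, mtime)
FROM '/tmp/history.csv' WITH (FORMAT csv);  -- COPY is optimized in a similar way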

UPDATE and DELETE statements can be executed in a direct or indirect mode. The direct mode is used when a statement can be directly sent to a foreign server. In this mode, to modify a table on a remote server, a new statement is created based on the original ModifyTable plan node. Using a direct modification is not always possible. In particular, it is impossible when some conditions have to be evaluated locally. In this case, a much less efficient indirect modification is used. An indirect modification involves several statements. The first one is a SELECT FOR UPDATE that locks the remote rows. The second one is the actual UPDATE or DELETE, which is prepared once and then executed with different parameters for each row of the SELECT FOR UPDATE result that passes the local filters. Evidently, direct modifications are much more efficient.

You can easily identify whether a DML statement is going to be executed in a direct or indirect mode by looking at the query plan. A typical example of an indirect modification is:

EXPLAIN VERBOSE DELETE FROM pgbench_history
WHERE bid = 20 AND mtime > '2023-03-14 10:00:00'::timestamptz;
                                                  QUERY PLAN                                                  
--------------------------------------------------------------------------------------------------------------
 Delete on public.pgbench_history  (cost=100.00..142.66 rows=0 width=0)
   Foreign Delete on public.pgbench_history_17_fdw pgbench_history_1
     Remote SQL: DELETE FROM public.pgbench_history_17 WHERE ctid = $1
   ->  Foreign Scan on public.pgbench_history_17_fdw pgbench_history_1  (cost=100.00..142.66 rows=4 width=10)
         Output: pgbench_history_1.tableoid, pgbench_history_1.ctid
         Filter: (pgbench_history_1.mtime > '2023-03-14 10:00:00+03'::timestamp with time zone)
         Remote SQL: SELECT mtime, ctid FROM public.pgbench_history_17 WHERE ((bid = 20)) FOR UPDATE

Had we chosen another type for the string constant, this would become a direct modification:

EXPLAIN VERBOSE DELETE FROM pgbench_history
WHERE bid = 20 AND mtime > '2023-03-14 10:00:00'::timestamp;
                                                                   QUERY PLAN                                                                    
-------------------------------------------------------------------------------------------------------------------------------------------------
 Delete on public.pgbench_history  (cost=100.00..146.97 rows=0 width=0)
   Foreign Delete on public.pgbench_history_17_fdw pgbench_history_1
   ->  Foreign Delete on public.pgbench_history_17_fdw pgbench_history_1  (cost=100.00..146.97 rows=4 width=10)
         Remote SQL: DELETE FROM public.pgbench_history_17 WHERE ((mtime > '2023-03-14 10:00:00'::timestamp without time zone)) AND ((bid = 20))

We see that in the direct mode, only one statement is executed on the remote server.

14.6.2.1. DML Optimizations of Global Tables #

The shardman.gt_batch_size configuration parameter, which you can tune, defines the size of an intermediate buffer used before sending data to a remote server.

INSERT uses the binary protocol and creates batches of the shardman.gt_batch_size size. Larger buffer sizes result in fewer network requests to the remote side and thus substantially reduce communication costs. On the other hand, large values of this parameter can increase memory consumption on the query coordinator side. Therefore, when specifying the buffer size, it is important to find a compromise between communication costs and the allocated memory size.

For UPDATE, a query for each column and each row is created on the coordinator and sent to remote nodes.

For DELETE, a query for a batch of data of the shardman.gt_batch_size size is created on the coordinator and sent to remote nodes.
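
As a hedged illustration (assuming the parameter can be changed at the session level; otherwise set it in the server configuration), the buffer size can be adjusted and checked like this:

SET shardman.gt_batch_size = 1000;  -- illustrative value: larger batches mean fewer requests but more coordinator memory
SHOW shardman.gt_batch_size;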

14.6.3. Time Synchronization #

The algorithm that provides data consistency on all the cluster nodes uses the system clock of the hosts. Therefore, the transaction commit latency depends on the clock drift between hosts, as the coordinator always waits for the most lagging host to catch up. This makes it crucial that the time on all the connected nodes of a Postgres Pro Shardman cluster is synchronized, as lack of synchronization may negatively impact Postgres Pro Shardman performance by increasing query latency.

First, to ensure time synchronization on all cluster nodes, install the chrony daemon when deploying a new cluster.

  sudo apt update
  sudo apt install -y chrony
  sudo systemctl enable --now chrony

Check that chrony is working properly.

chronyc tracking

Expected output:

      Reference ID    : C0248F82 (Time100.Stupi.SE)
      Stratum         : 2
      Ref time (UTC)  : Tue Apr 18 11:50:44 2023
      System time     : 0.000019457 seconds slow of NTP time
      Last offset     : -0.000005579 seconds
      RMS offset      : 0.000089375 seconds
      Frequency       : 30.777 ppm fast
      Residual freq   : -0.000 ppm
      Skew            : 0.003 ppm
      Root delay      : 0.018349268 seconds
      Root dispersion : 0.000334640 seconds
      Update interval : 1039.1 seconds
      Leap status     : Normal

Note that clock drift should be managed using OS tools. Postgres Pro Shardman diagnostic tools cannot be considered the only and definitive measurement utility.

To see if any major drift already exists, use the shardman.pg_stat_csn view, which shows statistics on delays that occur during import of CSN snapshots. Its values are calculated when any related action is performed or when the shardman.trim_csnxid_map() or shardman.pg_oldest_csn_snapshot() functions are called. These functions are called from the csn trimmer routine worker, so disabling this worker results in these statistics not being collected.

The csn_max_shift field of the shardman.pg_stat_csn view shows the maximum registered snapshot CSN shift that caused a delay. This value reflects the clock drift between the nodes in the cluster. A continuous increase of this value means that the system clock of at least one cluster node is out of sync. If this value exceeds 1000 (microseconds), it is recommended to check the time synchronization settings.

The same can be inferred if the csn_total_import_delay value increases while csn_max_shift remains unchanged. However, a one-time increase may be due to isolated failures unrelated to time issues.
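
A quick way to watch these counters is to query the view directly (only the columns mentioned above are selected; the view may contain other fields):

SELECT csn_max_shift, csn_total_import_delay
FROM shardman.pg_stat_csn;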

Also, if the difference between CSNXidMap_head_csn and shardman.oldest_csn exceeds the csn_snapshot_defer_time parameter value and stays the same for a long time, the CSNSnapshotXidMap map is full. This can result in a global transaction failure.

There are two main reasons for this issue.

  • There is a transaction that runs for more than csn_snapshot_defer_time seconds and holds back VACUUM across the entire cluster. In this case, use the xid field of the shardman.oldest_csn view to determine the transaction ID of this transaction and the rgid field to determine the cluster node where this transaction runs.

  • The CSNSnapshotXidMap map lacks capacity. During normal operation the system may run transactions that exceed the csn_snapshot_defer_time value. To fix this, increase csn_snapshot_defer_time so that these transactions stay below this value (see the sketch below).
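
A hedged sketch of raising the parameter (the value is illustrative, and the change may require a configuration reload or server restart to take effect):

ALTER SYSTEM SET csn_snapshot_defer_time = 600;  -- seconds; choose a value above your longest expected transaction
SELECT pg_reload_conf();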

If the shardman.silk_tracepoints configuration parameter is enabled, executing the EXPLAIN command for distributed queries outputs rows showing how much time was spent on query execution in each system component and what result it ended with. The net (qry), net (1st tup), and net (last tup) metrics are calculated as the difference between timestamps on different servers. This difference includes both the time spent on message transfer and the clock drift (positive or negative) between these servers. Therefore, these metrics can also help determine whether there is any clock drift.

14.6.4. Distributed Query Diagnostics #

Postgres Pro Shardman enhances the EXPLAIN command so that it can provide additional information about a query if it is distributed. Work with distributed tables is based on plan nodes of the ForeignScan type. A query to each remote partition corresponds to a single plan node of this type, and Postgres Pro Shardman adds extra information to the EXPLAIN blocks that describe such nodes.

When a distributed query is executed, the part of the plan (a subtree) that relates to a specific remote partition is serialized into an SQL statement. This process is known as deparsing. The statement is then sent to a remote server, and the result of this query becomes the output of a ForeignScan node, which is used to gather the final results of the distributed query execution.

When the VERBOSE option of the EXPLAIN command is set to on, the Remote SQL field of the ForeignScan node block shows the statement sent to the remote server. Also, the Server field indicates the name of the server as it was specified during the cluster configuration and as it is displayed in pg_foreign_server, along with the transport method used to send this statement. The Transport field can take two values: silk for the enhanced Postgres Pro Shardman interconnect mechanism, or libpq for sending via the standard PostgreSQL protocol.

14.6.4.1. Displaying Plans from the Remote Server #

To see the execution plan that will be used on the remote server within the EXPLAIN block of the ForeignScan node, use the postgres_fdw.foreign_explain configuration parameter. The possible values are: none to exclude the EXPLAIN output from the remote servers, full to include the EXPLAIN output from the remote servers, and collapsed to include the EXPLAIN output only for the first ForeignScan node under its Append/MergeAppend.
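
For example (assuming the parameter can be set at the session level):

SET postgres_fdw.foreign_explain = 'collapsed';
EXPLAIN VERBOSE
SELECT avg(abalance) FROM pgbench_accounts;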

In production, it is recommended to disable this parameter (set it to none) or set it to collapsed, because obtaining any EXPLAIN information results in an additional implicit request to the server. Moreover, this request is executed in a synchronous mode, meaning the overall EXPLAIN output is built only once all the servers have been sequentially queried. This can be a costly operation in case of a table with a large number of partitions.

Note that in case of the internal request for obtaining the EXPLAIN blocks for a remote plan, certain parameters are forcibly disabled, regardless of the parameters specified by a user when requesting EXPLAIN from the coordinator: ANALYZE OFF, TIMING OFF, SUMMARY OFF, SETTINGS OFF, NETWORK OFF. In this case, the EXPLAIN block of a remote plan will lack the corresponding metrics. Other EXPLAIN parameters (FORMAT, VERBOSE, COSTS, BUFFERS, WAL) are inherited from the coordinator.

The internal EXPLAIN request to a remote server is made with the GENERIC_PLAN option. Therefore, when analyzing the plans from the remote side, note that the EXPLAIN blocks display the generic plan.

14.6.4.2. Network Metrics and Latency #

Setting the NETWORK option of the EXPLAIN command to on shows the network operation metrics for the plan nodes, including individual ForeignScan nodes and the enclosing Append or MergeAppend nodes.

For each plan node, the FDW bytes sent and received values are displayed for the outgoing and incoming traffic generated while the node is executed (regardless of the transport type). Note that these metrics are only output when the ANALYZE option of the EXPLAIN command is set to on.

When the track_fdw_wait_timing configuration parameter is enabled, the wait_time metric is also output. This metric summarizes all stages of the plan node execution, starting from the time the request is sent to the remote server, including the time spent on the execution itself and all the time until the complete set of results for that plan node is received.

Note that the ForeignScan node can operate in both synchronous and asynchronous modes. For asynchronous execution, the node's execution function sends a request to the remote server and completes without waiting for the result; the result is processed later, upon receipt. In this scenario, the wait_time metric may not accurately reflect the actual execution time.

14.6.4.3. Query Tracing for Silk Transport #

For the Silk transport, extended debug information can be output to trace a query as it passes from the coordinator to the remote server and back, including the delivery of results from the remote server. This information is only available if the ANALYZE option of the EXPLAIN command is set to on and the shardman.silk_tracepoints configuration parameter is enabled.

When these parameters are enabled, each message transferred through the Silk transport (sending the SQL query, delivering it to the recipient, executing the query, and returning the execution result) is accompanied by an array of the timestamps measured at certain points in the pipeline. Once the query is executed, this information is displayed in the EXPLAIN block as rows starting with the word Trace. Each metric represents the difference between the timestamps at different points, in milliseconds:

Table 14.1. Query Tracing for Silk Transport Metrics

Interval                    Description
bk shm->mp1 (qry)           The time taken to transfer an SQL query from the coordinator to its multiplexer via the shared memory.
mp1 shm->net (qry)          The time between receiving a query within the multiplexer from the shared memory and transferring it over the network.
net (qry)                   The time spent by an SQL query to transfer over the network between the multiplexers.
mp2 recv->shm (qry)         The time between receiving an SQL query from the network and placing it in the queue in the shared memory on a remote multiplexer.
wk exec (1st tup)           The time spent to execute a query in Silkworm until the first row of the result is received.
wk exec (all tups)          The time spent to execute a query in Silkworm until the complete result is received.
wk->shm (1st tup)           The time taken to place the first row of the result into the Silkworm queue.
wk->shm (last tup)          The time taken to place the last row of the result into the Silkworm queue.
mp2 shm->net (1st tup)      The time between reading the first row of the result from the queue by the remote multiplexer and transferring it over the network.
net (1st tup)               The time spent to transfer the first row of the result over the network between the multiplexers.
mp1 recv->shm (1st tup)     The time between receiving the first row of the result from the network and placing it in the queue by the local multiplexer.
mp1 shm->bk (1st tup)       The time spent to retrieve the first row of the result from the queue by the coordinator.
mp2 shm->net (last tup)     The time between reading the last row of the result from the queue by the remote multiplexer and transferring it over the network.
net (last tup)              The time spent to transfer the last row of the result over the network between the multiplexers.
mp1 recv->shm (last tup)    The time between receiving the last row of the result from the network and placing it in the queue by the local multiplexer.
mp1 shm->bk (last tup)      The time taken by the coordinator to retrieve the last row of the result from the queue.
END-TO-END                  The total time from sending the query to receiving the last row of the result. This approximately corresponds to the wait_time.

For the metrics net (qry), net (1st tup), and net (last tup), the interval value is calculated as the difference between timestamps on different servers, so negative values may appear in these lines. This difference includes both the time spent on message transfer and the clock drift (positive or negative) between these servers. Thus, even a slight drift produces negative values if its absolute value exceeds the network transfer time. Although this is not a bug, you should pay close attention to whether the cluster clocks are synchronized. For more information, see Section 14.6.3.
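
A hedged sketch of obtaining the trace rows described above (assuming shardman.silk_tracepoints can be enabled for the current session):

SET shardman.silk_tracepoints = on;
EXPLAIN (ANALYZE, VERBOSE)
SELECT count(*) FROM pgbench_accounts;
-- look for rows starting with "Trace" in the ForeignScan blocks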