F.43. pg_shardman

pg_shardman is a built-in experimental extension for Postgres Pro Enterprise that enables sharding — splitting large tables into separate partitions, or shards, distributed between different servers. This extension aims for scalability and fault tolerance with ACID transactions support, targeting mainly OLTP workloads. pg_shardman offers the following features:

  • Splitting tables into shards using hash partitioning provided by pg_pathman and moving the shards across cluster nodes to balance read/write load.

  • Running read/write queries on any node, regardless of the actual data location. Queries will be automatically redirected to the node holding the required data via postgres_fdw.

  • Configurable redundancy level — the number of replicas to create for each shard. pg_shardman leverages logical replication to keep replicas up-to-date. Synchronous and asynchronous replication types are supported.

  • Atomic modification of data stored on multiple nodes in a single transaction using two-phase commit (2PC).

  • Support for cluster-wide Repeatable Read transaction isolation level.

  • Manual failover with replica promotion.

F.43.1. Limitations

The pg_shardman extension currently has the following limitations:

  • The shardlord cannot be a worker node. See Section F.43.2 for details on different node types.

  • You cannot change the number of shards once the table is sharded.

  • pg_shardman provides only limited DDL support, as explained in Section F.43.4.2. DCL support is virtually absent, with only one user being supported.

  • All limitations of pg_pathman. For example, global secondary indexes and foreign keys to sharded tables are not supported.

  • All logical replication restrictions. For example, TRUNCATE statements on sharded tables are not replicated.

  • pg_shardman currently does not configure replica identities. It is strongly recommended to use the primary key as the sharding key to avoid issues with UPDATE and DELETE operations. Besides, a primary key is required to synchronize multiple replicas after a cluster failure.

  • Two-phase commit does not involve replicas, so a permanent node failure may result in data inconsistencies.

  • pg_shardman has not been tested on Windows systems.

F.43.2. Architecture

A sharded database cluster consists of several nodes with Postgres Pro Enterprise instances installed:

  • The shardlord — the node that manages the cluster and stores all the cluster metadata. Configured by the database administrator, this node accepts sharding commands from the user and ensures that the whole cluster changes its state as expected. The shardlord does not hold any actual cluster data.

    When the cluster is set up, the shardlord saves all metadata describing the cluster configuration in its local tables. These tables store the information about all the cluster nodes, as well as which primary shards and replicas they keep. For details on the available metadata, see Section F.43.5.3.

  • Worker nodes — these nodes store the actual sharded tables and their replicas. Each worker node can accept read and write queries, redirecting them to the appropriate neighbor node as required. For general recommendations on choosing the number of nodes and shards, see Section F.43.3.3.1.

F.43.2.1. Transactions

For transactions that affect only a single worker node, atomicity, durability, and the required isolation levels are ensured by Postgres Pro as usual. However, to handle distributed transactions, pg_shardman relies on two-phase commit (2PC) protocol for atomicity and Clock-SI algorithm for transaction isolation.

To turn on 2PC support, use the postgres_fdw.use_twophase variable. With 2PC enabled, each transaction is prepared on all nodes before the commit. A successful PREPARE on a node means that this node is ready to commit the transaction, but the transaction is only committed if all the other nodes can commit it as well. Otherwise, the transaction is aborted. This approach ensures transaction atomicity across multiple nodes.

This parameter can be changed at any time. Since the transaction behavior is determined by the postgres_fdw.use_twophase setting at commit time, some distributed transactions may use 2PC while others may not. Regardless of this setting, 2PC is never used for write transactions that affect only a single node.
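
For example, assuming the variable has not already been enabled in postgresql.conf, you could turn on 2PC for distributed transactions issued in the current session as follows:

SET postgres_fdw.use_twophase = on;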

A well-known shortcoming of 2PC is that it is a blocking protocol: if the transaction coordinator has failed, some transactions might hang in the PREPARE state. pg_shardman provides the shardman.recover_xacts() function to resolve them.

To ensure cluster-wide transaction isolation, use the following variables:

  • The track_global_snapshots variable enables a distributed transaction manager based on the Clock-SI algorithm, which provides cluster-wide transaction isolation at the Repeatable Read level. Changing this parameter requires a server restart.

  • The postgres_fdw.use_global_snapshots variable defines whether to use global snapshots for the current transaction. If you set this variable to on, you must also enable the distributed transaction manager using the track_global_snapshots variable. You can change the postgres_fdw.use_global_snapshots setting at any time; its value is consulted at transaction commit.
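
As a minimal sketch, assuming track_global_snapshots has already been enabled in postgresql.conf (it requires a server restart), global snapshots can be turned on for the current session as follows:

SET postgres_fdw.use_global_snapshots = on;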

Important

PREPARE statements are not transferred via logical replication, which can cause data inconsistencies if a node fails permanently. Part of a distributed transaction might be lost, producing a non-atomic result, if the coordinator has prepared the transaction everywhere and started committing it, but one of the nodes failed before the transaction was committed on its replica.

In some cases, distributed deadlocks may also occur. For details on how to detect and resolve them, see Section F.43.4.5.3.

F.43.2.2. Replication

To ensure fault tolerance, pg_shardman allows creating shard replicas to store data with redundancy. For each sharded table, you can define the redundancy level — the number of replicas to store for each shard. Replicas are automatically distributed between the cluster nodes. If the node holding the primary shard fails, you can promote its most advanced replica to become primary.

To avoid performance degradation when using a large number of replicas, all nodes are arranged into replication groups — subsets of nodes that can create replicas on each other. A primary shard and its replicas can only be located on different nodes that belong to the same replication group, as explained in Section F.43.3.3.2.

pg_shardman uses logical replication to synchronize data between the primary shards and their replicas. You can use synchronous or asynchronous replication types. The trade-off is well-known: synchronous replication is slower, but committed transactions are unlikely to be dropped. Asynchronous replication is faster, but allows replicas to lag behind the primary for an arbitrary time, which might lead to a loss of some recently committed transactions, or to WAL bloating if a replica fails.

The replication type is defined by the shardman.sync_replication variable. When set to off (the default), asynchronous replication is used. Otherwise, synchronous replication is used: although transactions on the primary are committed locally right after the COMMIT request, the client does not get a transaction confirmation until the transaction is committed on all replicas.

By default, the cluster redundancy level is 0, so replicas are not created. You can change the redundancy level at any time using the shardman.set_redundancy() function described in Section F.43.5.2.3.
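
For example, assuming a sharded table named films, the following call would create one replica for each of its shards:

SELECT shardman.set_redundancy('films', 1);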

F.43.3. Installation and Setup

The pg_shardman extension is included into Postgres Pro Enterprise. Once you have Postgres Pro Enterprise installed on all nodes to be added to the sharded cluster, you need to configure the nodes and create the pg_shardman extension, together with all the required dependencies.

F.43.3.1. Setting up a Sharded Cluster

To configure the nodes, you must modify the postgresql.conf file on all nodes as described below.

  1. Choose the node that will be the shardlord and configure the following shardlord-specific settings:

    • Make this node the shardlord:

      shardman.shardlord = on
      

    • Specify the replication type to use for creating shard replicas:

      shardman.sync_replication = on
      

      By default, asynchronous replication is used.

      Important

      The shardman.sync_replication configuration variable should not be changed once the cluster is set up. If you would like to change the selected replication type, you have to remove all replicas and create them from scratch using the shardman.set_redundancy(rel_name regclass, redundancy int) function.

  2. On all nodes, including the shardlord, modify the following settings:

    • Make sure the shared_preload_libraries variable includes postgres_fdw, pg_pathman, and pg_shardman values. The pg_pathman library must always be the last in the list:

      shared_preload_libraries = 'postgres_fdw, pg_shardman, pg_pathman'
      

    • Specify the options required to connect to the shardlord on behalf of a superuser. You can use the options that libpq accepts in connection strings, as described in Section 35.1.2.

      shardman.shardlord_connstring = connstring
      

      Worker nodes use this connection string to redirect commands to the shardlord. The shardlord relies on this setting when sharding tables.

  3. On worker nodes, configure the following settings:

    • Set the shardman.shardlord variable to off:

      shardman.shardlord = off
      

    • Set the wal_level to logical to enable logical replication:

      wal_level = logical
      

      Other logical replication settings depend on the number of nodes in the replication group to which this node belongs. For a group of N nodes, the minimum recommended values are:

      max_replication_slots = 2N + 1
      max_wal_senders = N + 1
      max_logical_replication_workers = N + 1
      max_worker_processes = max_logical_replication_workers + 1
      

    • Enable two-phase commit (2PC) transaction support and turn on the distributed transaction manager to ensure cluster-wide transaction isolation. Make sure to increase the max_prepared_transactions value to allow prepared transactions:

      postgres_fdw.use_twophase = on
      track_global_snapshots = on
      postgres_fdw.use_global_snapshots = on
      max_prepared_transactions = 1000
      

    • Make sure the synchronous_commit variable specifies the replication type of your choice. For synchronous replication, it must be set to on. You can change this setting at any time, so different transactions may use different replication modes.

      synchronous_commit = on
      

When all settings are configured, restart the nodes to preload the newly added shared libraries, and execute the following statement on each node:

CREATE EXTENSION pg_shardman CASCADE;

Postgres Pro Enterprise installs pg_shardman, together with the pg_pathman and postgres_fdw extensions it depends on. The pg_shardman extension is installed into the shardman schema, which is currently fixed and cannot be changed. You now need to connect all worker nodes to the shardlord as explained in Section F.43.3.2.

F.43.3.2. Adding and Removing Nodes

Before you can start sharding tables, all worker nodes must be explicitly united into a single cluster with the shardlord node. For general recommendations on the optimal number of nodes, see Section F.43.3.3.1.

To add a worker node to the cluster, run the following function on the shardlord:

shardman.add_node(super_conn_string text, conn_string text DEFAULT NULL, repl_group text DEFAULT 'default')

You can specify two types of connection strings. The super_conn_string is mandatory, as superuser privileges are required to configure logical replication between the nodes. The optional conn_string parameter is used for configuring FDWs; this allows accessing the data without superuser rights. If you omit this parameter, pg_shardman uses super_conn_string for all purposes. The repl_group parameter defines the replication group to add the node to. If you omit this parameter, the node is added to an automatically created new group. For details on replication groups, see Section F.43.3.3.2.
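
For example, the following hypothetical call adds a worker node to the group_a replication group, using the same connection string for both replication and FDW access (the host, user, and database names are illustrative):

SELECT shardman.add_node('host=worker1 user=postgres dbname=postgres', repl_group => 'group_a');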

Each cluster node is assigned a unique ID and appears in the shardman.nodes table described in Section F.43.5.3.1. Node IDs start with 1 and are incremented by one for each added node.

If the node previously belonged to another cluster managed by a different shardlord, its state will be reset. Once all nodes are added to the cluster, you can start sharding tables, as explained in Section F.43.4.1.

Tip

To learn the node ID, run the following function on this node:

SELECT shardman.get_my_id();

To remove the node from the cluster, run the following function on the shardlord:

shardman.rm_node(rm_node_id int, force bool DEFAULT false)

where rm_node_id is the ID of the node to be removed. By default, the node is not excluded from the cluster if it holds at least one primary shard. To override this behavior, set the force parameter to true. In this case, the node is excluded from the cluster, and the most advanced replicas of the removed shards are promoted on other nodes to become primary.
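
For example, assuming a node with the illustrative ID 4, the following calls remove it without and with the force option, respectively:

SELECT shardman.rm_node(4);
SELECT shardman.rm_node(4, force => true);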

The shardman.rm_node() function does not delete tables with data and foreign tables on the removed node. To delete a table with all the data, including shard replicas, use shardman.rm_table(rel_name regclass). Caution: this operation cannot be undone and does not require any confirmation.

F.43.3.3. Choosing Sharding and Replication Strategies

F.43.3.3.1. Defining the Number of Shards and Nodes

Once the table is sharded, you cannot change the number of shards, so you should properly choose the number of shards from the very beginning. Obviously, the number of shards should not be smaller than the number of nodes to effectively use the available hardware resources. Otherwise, pg_shardman cannot scatter the data across all the cluster nodes. Having one shard per node usually provides the best performance, especially if synchronous replication is used. However, in this case, you cannot rebalance the existing data if you add new nodes, or address data skew if some data is accessed much more often than the rest.

The number of worker nodes in a sharded cluster depends on the number of servers you have, the volume of data you are going to store, and the typical database workload. Increasing the number of nodes allows you to store more data and provides better performance for queries that do not affect all nodes. You can increase the number of cluster nodes at any time.

As a rule of thumb, the number of shards should be about ten times larger than the number of nodes. In this case, you can increase the number of nodes up to ten times and redistribute partitions between nodes to provide a more or less uniform load on all cluster nodes, if required.

F.43.3.3.2. Setting up Replication Groups

A replication group is a subset of cluster nodes that can create replicas on each other. Every node in the cluster belongs to a replication group, and replication groups do not intersect. Nodes cannot replicate partitions to other replication groups. Splitting cluster nodes into replication groups offers the following benefits:

  • Minimize logical replication impact on performance. Logical replication in PostgreSQL-based products is relatively slow if there are many WAL senders on the node, since each WAL sender decodes the whole WAL. If you are using synchronous replication, multiple WAL senders can cause serious performance issues. Increasing the number of synchronous standbys can result in almost linear performance degradation. Replication groups limit the number of nodes where replicas can be located. For example, in a replication group of three nodes, each node only requires two WAL senders to set up logical replication with the other two nodes, regardless of the number of shards and replicas they hold.

  • Speed up replication by effective use of hardware resources. Modern non-blocking switches provide high-speed throughput between any pair of nodes connected to this switch, while inter-switch links can still be a bottleneck if you need to send data between nodes connected to different switches.

  • Increase data reliability by including the nodes hosted in different locations into a single replication group. In this case, an incident in one data center will not cause data loss.

pg_shardman configures logical replication channels between the nodes within replication groups. Even if there are no replicas, logical replication channels can add serious overhead because WAL senders still have to decode the whole WAL and send the decoded empty transactions to subscribers. By default, each node is added to a unique replication group with a name equal to the node's unique system identifier, which means that replication groups consist of a single node and no logical replication channels are configured: you cannot create replicas, and no overhead is added.

In general, each replication group should contain a number of nodes equal to the redundancy level + 1, or a bit more to be able to increase redundancy later. In practice, this means 1 (no redundancy) to 3 (redundancy 2) nodes in a replication group. Since performance degrades as the size of a replication group grows, it makes sense to set up replication groups of equal size. Moving data between replication groups is not allowed. If you decide to add more nodes to an already sharded cluster, you can only add them to the already existing replication groups. Otherwise, resharding is required.
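
As a minimal sketch, assuming redundancy level 1 and illustrative connection strings, the following calls arrange four worker nodes into two replication groups of two nodes each:

SELECT shardman.add_node('host=node1 user=postgres dbname=postgres', repl_group => 'group_a');
SELECT shardman.add_node('host=node2 user=postgres dbname=postgres', repl_group => 'group_a');
SELECT shardman.add_node('host=node3 user=postgres dbname=postgres', repl_group => 'group_b');
SELECT shardman.add_node('host=node4 user=postgres dbname=postgres', repl_group => 'group_b');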

F.43.4. Sharded Cluster Administration

F.43.4.1. Sharding Tables

pg_shardman only allows sharding empty tables created on the shardlord, so a typical workflow for sharding a table is as follows:

  1. Create an empty table on the shardlord.

    To create a table on the shardlord, use the regular CREATE TABLE syntax. For example:

    CREATE TABLE films (id int PRIMARY KEY, title varchar(40));
    

  2. Split the created table into shards.

    Run the following command on the shardlord:

    SELECT shardman.create_hash_partitions(
        relation regclass,
        expression text, part_count int,
        redundancy int DEFAULT 0);
    

    You should properly choose the number of shards for your table because it cannot be changed later. For details, see Section F.43.3.3.1.

    pg_shardman scatters the shards among the cluster nodes using the round-robin algorithm. Replicas are created on randomly chosen nodes within the same replication group. Each node can contain only one replica for each shard.

    For example, to split the films table into 30 shards by the id column, with one replica spawn for each shard, run:

    SELECT shardman.create_hash_partitions('films', 'id', 30, redundancy => 1);
    

    On successful execution, all shards will be evenly distributed between the available worker nodes, and logical replication channels will be set up between the nodes holding primary shards and their replicas.

  3. Import the data into the sharded table.

    Once the shards are distributed across the cluster nodes, you can fill them with data. Since sharding is performed using Postgres Pro built-in hash functions, it is impossible to predict at the client application level at which node a particular record will be stored.

    The most efficient way of uploading data is to use the COPY FROM form of the COPY command. pg_shardman only supports the text and CSV formats; the binary format is not supported. You can import data in parallel from several nodes, as shown in the example after these steps.

    You can also populate the shards using regular INSERT commands, which are redirected by pg_pathman to the proper node. However, running multiple INSERT commands may be less efficient than using COPY FROM.

    Tip

    If redundancy level is non-zero, avoid large transactions. When decoded by WAL sender, large transactions are spilled to disk, which can significantly reduce speed.
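
For example, assuming the films table from the steps above and a hypothetical CSV file on the client machine, you could load the data from psql on any worker node using the client-side \copy command, which sends the rows as COPY FROM STDIN:

\copy films FROM 'films.csv' WITH (FORMAT csv)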

F.43.4.2. Working with Cluster Data

Once a table is sharded and filled with data, you can execute DML statements on this table from any pg_shardman worker node. DML statements can access more than one shard, remote or local. It is implemented using the standard Postgres Pro inheritance mechanism: all partitions are derived from the parent table. If the required partition is located on another node, it will be accessed using postgres_fdw.

Note

Inheritance and FDW have some limitations that do not allow building efficient execution plans for some queries. Although Postgres Pro can push down aggregates to FDWs, merging partial aggregate values from different nodes is not supported. Besides, it cannot execute queries on different nodes in parallel: foreign data wrappers do not support parallel scan because of using cursors. Thus, pg_shardman is oriented mainly towards OLTP workloads. Execution of OLAP queries might be inefficient.

DDL support is very limited. The shardman.alter_table(relation regclass, alter_clause text) function alters the parent table on all nodes and updates its definition in the cluster metadata; it can be used to add, remove, or rename columns, as well as create NOT NULL constraints. However, changing the column used as the sharding key is not supported. Foreign keys pointing to sharded tables are not supported either. Although you can create a UNIQUE constraint, it will only be enforced on a per-partition basis.
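
For example, the following call adds a column to the films table on all nodes (the column name is illustrative):

SELECT shardman.alter_table('films', 'ADD COLUMN release_year int');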

If you create an index on the parent table before sharding, it will be included into the table definition and created on all shards and replicas automatically. There is no easy way to define an index on an already sharded table, since the index must be created separately for each partition, as described in the pg_pathman wiki. To execute the same statement on all nodes, you can use the shardman.forall(sql text, use_2pc bool DEFAULT false, including_shardlord bool DEFAULT false) function.
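
As a simple illustration of shardman.forall(), the following hypothetical call refreshes planner statistics on every worker node (the shardlord is skipped by default):

SELECT shardman.forall('ANALYZE');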

To delete a table with all the data, including shard replicas, use shardman.rm_table(rel_name regclass). Caution: this operation cannot be undone and does not require any confirmation.

F.43.4.3. Setting up Local and Shared Tables

Apart from sharded tables, an application may need to have a local table on one of the nodes and/or shared tables that store the same data on all cluster nodes.

Local tables store data that is unique for a particular node. For example, it can be some data in temporary tables. Local tables do not need to be handled by pg_shardman. You can create and use them locally at each node, just like in regular Postgres Pro clusters.

Shared tables store the same data on all cluster nodes. Such tables can be used for dictionaries that provide rarely updated data required for all queries. For each shared table, pg_shardman chooses the master node where this table will be stored and broadcasts the data to all other nodes using logical replication. Updates of shared tables run on other nodes are automatically redirected to the corresponding master node.

To create a shared table, run the following function:

shardman.create_shared_table( rel regclass, master_node_id int DEFAULT 1)

The master_node_id parameter is a unique identifier assigned to the node when it is added to the cluster.
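
For example, assuming a hypothetical dictionary table genres with node 1 as the master, the empty table is first created on the shardlord and then shared:

CREATE TABLE genres (id int PRIMARY KEY, name text);
SELECT shardman.create_shared_table('genres', 1);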

Tip

To learn the node ID, run the following function on this node:

SELECT shardman.get_my_id();

F.43.4.4. Balancing the Load

When new nodes are added to the cluster, they are initially empty. To start using these nodes, you need to rebalance the data. Data rebalancing is performed in the background. To minimize the impact on the normal work of the cluster, shards and replicas are moved sequentially, one at a time.

pg_shardman offers the following functions for rebalancing the specified primary shards and replicas, respectively:

shardman.rebalance(table_pattern text DEFAULT '%')

shardman.rebalance_replicas(table_pattern text DEFAULT '%')

All shards/replicas with the names conforming to the given pattern are evenly distributed between all nodes within each replication group. The naming pattern for primary shards and replicas is the same:

${sharded_table_name}_${shard_number}

For example, if you have sharded the films table, the following command will rebalance all its primary shards:

SELECT shardman.rebalance('films%');

Similarly, to rebalance all replicas, run:

SELECT shardman.rebalance_replicas('films%');

If you omit this argument, shards or replicas will be rebalanced for all sharded tables in the cluster.

To achieve a more fine-grained control over data distribution, you can move the specified primary shard or its replica to the exact node within the same replication group using the following functions:

shardman.mv_partition( part_name text, dst_node_id int)

shardman.mv_replica( part_name text, src_node_id int, dst_node_id int)

To move a primary shard, it is enough to specify the shard name and destination node, as pg_shardman knows the location of each primary shard. For replicas, you must also specify the source node, as different nodes can hold replicas with the same name.
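
For example, assuming the naming pattern described above and illustrative node IDs within one replication group, the following calls move a primary shard and a replica, respectively:

SELECT shardman.mv_partition('films_3', 2);
SELECT shardman.mv_replica('films_3', 4, 5);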

Note

pg_shardman can only move the data within the same replication group. For details on replication groups, see Section F.43.3.3.2.

F.43.4.5. Failure Detection and Recovery

pg_shardman does not support automatic failure detection and recovery. If a node fails, it should be manually excluded from the cluster as described in the sections below.

To check for and resolve distributed deadlocks, you can use the shardman.monitor() function. For details, see Section F.43.4.5.3.
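
For example, the following hypothetical call polls the nodes every 10 seconds and excludes a node from the cluster if it stays unreachable for 120 seconds:

SELECT shardman.monitor(check_timeout_sec => 10, rm_node_timeout_sec => 120);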

F.43.4.5.1. Worker Failover

When a worker node is down, the primary shards stored on this node are unavailable until the node is back online or is excluded from the cluster. Moreover, if a failed node holds replicas and synchronous replication is used, queries touching the replicated partitions are blocked. If the failed node is restored, its data becomes reachable again, and the node automatically receives updates for the blocked replicas. However, you should run the recover_xacts() or monitor() function to resolve possibly hung distributed transactions.

If the shardlord has failed while executing a command, or you would like to verify that everything works as expected, run the shardman.recover() function. It checks the status of the worker nodes against the current metadata on the shardlord and tries to resolve the detected issues, if any: for example, it can reconfigure logical replication channels or repair FDWs.

If you do not plan to restore the failed node, it should be manually excluded from the cluster, as follows:

  1. Make sure the failed node is really turned off, and never bring it back online without erasing its data, or make sure no one tries to access the node. Otherwise, stale reads and inconsistent writes on it are possible.

  2. Run the following command to exclude the node from the cluster:

    SELECT shardman.rm_node(${failed_node_id}, force => true);
    

    If redundancy level is greater than zero, pg_shardman tries to replace the primary shards stored on the excluded node with their replicas. The most advanced replica is chosen, and the state of the other replicas is synchronized.

    Tip

    You can check the replication lag in the shardman.replication_lag view, which can be especially critical for asynchronous replication.

  3. To ensure that there are no hung 2PC transactions, run:

    SELECT shardman.recover_xacts();
    

Note that recent transactions or parts of distributed transactions might still be lost, as explained in Section F.43.2.1.

F.43.4.5.2. Shardlord Failover

The shardlord node only stores cluster metadata, as described in Section F.43.5.3.2. This metadata is only accessed if you change the number of cluster nodes or rebalance the data. Thus, even if the shardlord fails, you can still run read and write queries for the existing shards.

By default, the tables holding cluster metadata are not replicated. However, you can set up physical or logical replication of these tables to a node with a Postgres Pro Enterprise instance that is not included into your sharded cluster. If the current shardlord fails, you can promote this node to become the new shardlord, as follows:

  1. Configure the new shardlord following the instructions in Section F.43.3.1. Make sure to use the same replication settings as the original shardlord.

  2. Modify the shardman.shardlord_connstring setting on all worker nodes.

Important

You must ensure that only one shardlord is used at the same time.

F.43.4.5.3. Detecting Distributed Deadlocks and Failed Nodes

To continuously check the cluster for failed nodes and distributed deadlocks, you can run the following function:

shardman.monitor(check_timeout_sec int DEFAULT 5, rm_node_timeout_sec int DEFAULT 60)

This function starts an infinite loop that polls all cluster nodes, collecting local lock graphs from them. The poll interval is specified by the check_timeout_sec parameter. The default value is 5 seconds. Local lock graphs are combined into a global lock graph, which is analyzed for loops. A loop in the lock graph indicates a possible distributed deadlock.

Since local graphs collected from different nodes may not form a consistent global snapshot, false positives are possible: edges in the deadlock loop may correspond to different moments in time. To prevent false deadlock detection, pg_shardman compares the results of two consecutive iterations before reporting the deadlock. If the loops in the compared iterations are the same, the deadlock is confirmed.

pg_shardman tries to resolve reported deadlocks by canceling one or more backends involved in the deadlock loop. It invokes the pg_cancel_backend function that tries to cancel the current query, without terminating the backend. The affected backend is randomly chosen within the deadlock loop.

If a node is unreachable, pg_shardman prints the corresponding error message and keeps trying to reach this node until the rm_node_timeout_sec timeout expires. If the node cannot be reached within the specified timeframe, pg_shardman excludes this node from the cluster, as follows:

  1. The shardman.rm_node() function removes the node from the cluster. If redundancy level is non-zero, primary shards from the disabled node are replaced with replicas.

  2. pg_shardman tries to resolve distributed transactions started on the failed node by running the shardman.recover_xacts() function. If the node that initiated the transaction is still in the cluster, pg_shardman checks the transaction outcome on this node. Otherwise, pg_shardman checks the status of this transaction on all other nodes. If there is at least one commit and no aborts, the transaction is committed. If there is at least one abort and no commits, the transaction is aborted. All nodes in the cluster must be online for this function to resolve the transaction.

F.43.5. Reference

F.43.5.1. GUC Variables

shardman.shardlord (boolean)

Defines whether this Postgres Pro Enterprise instance is the shardlord. Changing this variable requires a server restart.

Default: false

This parameter can only be set in the postgresql.conf file or on the server command line.

shardman.sync_replication (boolean)

When this parameter is set to on, pg_shardman adds replicas to the list of synchronous_standby_names, enabling synchronous replication.

Default: off

This parameter can only be set in the postgresql.conf file or on the server command line.

shardman.shardlord_connstring (text)

Connection string for the shardlord. You can use all the options that libpq accepts in connection strings, as described in Section 35.1.2. You must ensure that the node is accessed on behalf of a superuser. This variable must be set on the shardlord itself. You can also optionally set it on worker nodes if you would like to execute cluster management functions on worker nodes. In this case, these commands will be automatically redirected to the shardlord.

This parameter can only be set in the postgresql.conf file or on the server command line.

F.43.5.2. Functions

To manage a sharded cluster, pg_shardman provides regular Postgres Pro functions in the shardman schema. All the functions except for shardman.get_my_id() must be executed on the shardlord. pg_shardman functions return immediately, without waiting for the operation to complete, except for shardman.ensure_redundancy(), which is specifically tailored to wait for data synchronization.

F.43.5.2.1. Administrative Functions
shardman.get_redundancy_of_partition(pname text)

Returns redundancy level for the specified shard.

Arguments:

  • pname — the name of the shard for which to count replicas.

shardman.get_min_redundancy(rel_name regclass)

Returns the minimum redundancy level for the specified table.

Arguments:

  • rel_name — the name of the sharded table.

shardman.get_node_partitions_count(node int)

Returns the number of shards at the specified node.

Arguments:

  • node — node ID on which to count shards.

shardman.get_node_replicas_count(node int)

Returns the number of replicas at the specified node.

Arguments:

  • node — node ID on which to count replicas.

shardman.get_my_id()

Returns the ID of the current node. This function can only be run on worker nodes.

F.43.5.2.2. Membership Functions
shardman.add_node(super_conn_string text, conn_string text DEFAULT NULL, repl_group text DEFAULT 'default')

Adds a node with the specified connection string to the cluster and returns the node ID. If the node previously contained the pg_shardman state from an old cluster managed by a different shardlord, this state will be lost. The newly added node does not hold any cluster data. To move any shard or replica to this node, you have to use rebalance functions. However, the node instantly becomes aware of the sharded tables in the cluster and can accept and redirect queries.

Arguments:

  • super_conn_string — connection string that provides superuser access to the node. This connection string is used to configure logical replication between cluster nodes. You can use all the options that libpq accepts in connection strings, as described in Section 35.1.2.

  • conn_string — connection string to the node to be used for DDL operations and setting up postgres_fdw, without superuser privileges. If you are going to set up pgbouncer between your cluster nodes, you should use this connection string to specify the pgbouncer address, while reserving the super_conn_string for the actual node addresses. Otherwise, logical replication between the nodes will be broken, as pgbouncer does not support replication. You can use all the options that libpq accepts in connection strings, as described in Section 35.1.2.

    If conn_string is NULL, the super_conn_string connection string is used for all purposes.

  • repl_group — replication group to which the new node will be added. If this parameter is omitted, the node is added to an automatically created new group. Once the node is added to the cluster, you cannot move it to a different replication group. You have to remove this node and add it again.

shardman.rm_node(rm_node_id int, force bool DEFAULT false)

Removes the specified node from the sharded cluster. If the force parameter is false (default), the node holding primary shards is not removed. If force is true, the node is removed from the cluster, regardless of its contents. If the removed node holds any primary shards for which redundancy level is non-zero, pg_shardman promotes the most advanced replicas on other nodes to become primary.

This function does not delete tables with data and foreign tables on the removed node. If the removed node is alive, pg_shardman executes shardman.wipe_state(force bool DEFAULT true).

Arguments:

  • rm_node_id — ID of the node to be removed.

  • force — defines how to handle the node if it holds primary shards:

    • true — remove the node and promote random replicas of the removed shards on other nodes to become primary shards.

    • false — forbid deleting the node if it contains a primary shard.

shardman.create_shared_table( rel regclass, master_node_id int DEFAULT 1)

Creates a table that will be present on all worker nodes. This function should be executed on the shardlord. The empty table should be present on the shardlord, but not on the worker nodes.

F.43.5.2.3. Shards and Replicas
shardman.create_hash_partitions( rel_name regclass, expr text, part_count int, redundancy int DEFAULT 0)

Shards the rel_name table using hash partitioning by the expr key, creating the part_count number of shards and distributing the shards evenly between all cluster nodes. You can only shard an empty table created on the shardlord with the CREATE TABLE command. For details, see Section F.43.4.1.

Arguments:

  • rel_name — an empty table to shard.

  • expr — partitioning key to shard the table by. Column(s) included into expr must be marked NOT NULL. It is strongly recommended to use the primary key as the partitioning key to avoid issues with UPDATE and DELETE operations.

  • part_count — the number of shards to create.

  • redundancy — the number of replicas to create for each shard. By default, pg_shardman does not create any replicas.

shardman.rm_table(rel_name regclass)

Drops all shards of the specified table, removing the corresponding data on all worker nodes. The original empty table on the shardlord remains unchanged.

Arguments:

  • rel_name — the table to drop.

shardman.set_redundancy(rel_name regclass, redundancy int)

Creates replicas for shards of the specified table until each shard has the number of replicas specified by the redundancy argument. Replica holders are chosen randomly within the replication group to which the corresponding primary shards belong. If the current redundancy level is greater than the one specified, this function does nothing.

This function only starts replication and does not wait for the data to be copied to all replicas. To wait for the full table synchronization, run shardman.ensure_redundancy().

Arguments:

  • rel_name — the table to replicate.

  • redundancy — the number of replicas to create for each shard.

shardman.ensure_redundancy()

Waits for the initial data synchronization to complete for all replication subscriptions. This function can be called after the set_redundancy() function call to ensure that all the data from the primary shards is copied to replicas.
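
For example, assuming the films table from Section F.43.4.1, the following calls increase its redundancy level to 1 and wait until all replicas are synchronized:

SELECT shardman.set_redundancy('films', 1);
SELECT shardman.ensure_redundancy();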

F.43.5.2.4. Rebalance Functions

pg_shardman provides several functions to redistribute shards and replicas between the cluster nodes.

shardman.rebalance(table_pattern text DEFAULT '%')

Rebalances shards between the cluster nodes. This function tries to evenly redistribute the shards of tables with names matching the LIKE 'table_pattern' expression between all nodes of the corresponding replication groups, so it should be called after node addition or removal. This function cannot move shards between replication groups. Shards are moved sequentially to minimize the impact on system performance. Since pg_shardman uses logical replication, you can continue running write queries while rebalancing the cluster.

Arguments:

  • table_pattern — the naming pattern for tables to rebalance.

shardman.rebalance_replicas(table_pattern text DEFAULT '%')

Rebalances shard replicas between cluster nodes. This function tries to evenly redistribute the replicas of tables with names matching the LIKE 'table_pattern' expression between all nodes of the corresponding replication groups, so it should be called after node addition or removal. It cannot move replicas between replication groups. Replicas are moved sequentially to minimize the impact on system performance. Since pg_shardman uses logical replication, you can continue running write queries while rebalancing the cluster.

Arguments:

  • table_pattern — the naming pattern for tables whose replicas to rebalance.

shardman.mv_partition( part_name text, dst_node_id int)

Moves the primary shard part_name to the specified node within the same replication group. To rearrange multiple shards, you can use the shardman.rebalance() function instead.

shardman.mv_replica( part_name text, src_node_id int, dst_node_id int)

Moves the replica part_name to the specified node within the same replication group. This command fails if the destination node already contains a replica of this shard. To rearrange multiple replicas, you can use the shardman.rebalance_replicas() function instead.

shardman.forall(sql text, use_2pc bool DEFAULT false, including_shardlord bool DEFAULT false)

Executes an SQL statement on all nodes.

Arguments:

  • sql — the statement to execute.

  • use_2pc — defines whether to use two-phase commit. The default value is inherited from the postgres_fdw.use_twophase setting.

  • including_shardlord — defines whether to run the SQL statement on the shardlord. By default, the statement is executed on worker nodes only.

shardman.alter_table(relation regclass, alter_clause text)

Alters a sharded or shared table.

Arguments:

  • relation — the table to alter.

  • alter_clause — the alter command to apply.

Example:

   SELECT shardman.alter_table('films', 'ADD COLUMN author text');

shardman.recover()

Checks consistency of the cluster state against the current metadata and performs recovery, if required. A recovery process can include such operations as reconfiguring logical replication channels or repairing FDWs.

shardman.monitor(check_timeout_sec int DEFAULT 5, rm_node_timeout_sec int DEFAULT 60)

Monitors the cluster state to detect distributed deadlocks and node failures. If a distributed deadlock is detected, pg_shardman tries to resolve the deadlock by canceling one or more queries on the affected backend. For details, see Section F.43.4.5.3. This function is redirected to the shardlord if launched on a worker node.

Arguments:

  • check_timeout_sec — poll interval, in seconds. pg_shardman polls each node with this interval to check for locks. The default value is 5 seconds.

  • rm_node_timeout_sec — time interval, in seconds, within which pg_shardman tries to reach the node. If the node does not respond within this time interval, the shardman.rm_node function is invoked to exclude this node from the cluster. If rm_node_timeout_sec is NULL, pg_shardman does not remove the node.

shardman.recover_xacts()

Resolves the interrupted transactions after an abnormal cluster restart. This function must be manually invoked on the shardlord by the database administrator. Since the 2PC protocol used by pg_shardman is blocking, the result of this operation depends on the cluster state:

  • If the node that initiated the transaction (coordinator) is still included into the cluster, pg_shardman checks the transaction outcome on this node and enforces it cluster-wide.

  • If the coordinator is already excluded from the cluster, pg_shardman checks the status of this transaction on all other nodes. If there is at least one commit and no aborts, the transaction is committed. If there is at least one abort and no commits, the transaction is aborted. Otherwise, you may have to manually resolve the transaction.

  • If the coordinator is down but is still included into the cluster, the transaction has to be resolved manually.

Since pg_shardman does not control WAL recycling, shardman.recover_xacts uses the clog to check the transaction status. Though unlikely, shardman.recover_xacts may fail to determine the transaction status and resolve the transaction; in this case, the transaction has to be resolved manually.

shardman.wipe_state(force bool DEFAULT true)

Removes all publications, subscriptions, replication slots, foreign servers, and user mappings created on the worker node by pg_shardman. Postgres Pro forbids dropping replication slots with active connections, so if force is true, pg_shardman tries to kill WAL senders before dropping the slots, without affecting the data stored on this node. Once this transaction commits, the synchronous_standby_names variable is set to an empty string. This is a non-transactional action, so there is a very small chance it won't be completed.

You may want to run this function before DROP EXTENSION pg_shardman.
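
A minimal sketch of cleaning up a worker node's pg_shardman state before dropping the extension on that node:

SELECT shardman.wipe_state();
DROP EXTENSION pg_shardman;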

F.43.5.3. pg_shardman Tables

pg_shardman provides several tables to store the sharded cluster metadata and monitor the state of the nodes.

F.43.5.3.1. shardman.nodes Table

Displays all nodes of the sharded cluster, together with the corresponding connection strings and replication groups they belong to.

Table F.31. shardman.nodes Table

Column Name               Column Type   Description
id                        serial        Node ID. Starting from 1 for the first node, node ID values get incremented by one for each newly added node.
system_id                 bigint        System ID.
super_connection_string   text          Connection string for this node, with superuser privileges. This connection string is used to set up logical replication between nodes.
connection_string         text          Connection string for this node, without superuser privileges. This connection string is used to perform DDL operations or set up postgres_fdw.
replication_group         text          A group of nodes within which shard replicas are allocated.

F.43.5.3.2. shardman.tables Table

Displays the list of all sharded tables.

Table F.32. shardman.tables Table

Column Name        Column Type   Description
relation           text          The name of a sharded or shared table.
sharding_key       text          Partitioning expression by which the table is sharded.
master_node        int           ID of the node that stores the shared table.
partitions_count   int           The number of partitions in this table.
create_sql         text          SQL command used to re-create the table on other nodes.
create_rules_sql   text          SQL command to create rules for the shared table.

F.43.5.3.3. shardman.partitions Table

Displays primary shards of all the sharded tables.

Table F.33. shardman.partitions Table

Column Name   Column Type   Description
part_name     text          Primary shard name.
node_id       int           ID of the node on which the primary shard is stored.
relation      text          The parent table for this shard.

F.43.5.3.4. shardman.replicas Table

Displays all shard replicas.

Table F.34. shardman.replicas Table

Column Name   Column Type   Description
part_name     text          The name of the shard replica.
node_id       int           ID of the node on which the replica is located.
relation      text          The parent table for the corresponding primary shard.

F.43.5.3.5. shardman.replication_lag View

This view provides information on replication lag, which can be critical for asynchronous replication.

F.43.6. Authors

Postgres Professional, Moscow, Russia