H.5. citus — distributed database and columnar storage functionality #

citus is an extension compatible with Postgres Pro that provides two major capabilities: columnar data storage and a distributed OLAP database. They can be used either together or separately.

citus offers the following benefits:

  • Columnar storage with data compression.

  • The ability to scale your Postgres Pro installation to a distributed database cluster.

  • Row-based or schema-based sharding.

  • Parallelized DML operations across cluster nodes.

  • Reference tables, which can be accessed locally on each node.

  • The ability to execute DML queries on any node, which allows utilizing the full capacity of your cluster for distributed queries.

H.5.1. Limitations #

citus is incompatible with some Postgres Pro Enterprise features. Take these limitations into account when planning your work with the extension.

H.5.2. Installation #

H.5.2.1. Installing citus on a Single Node #

To enable citus on a single node, complete the following steps:

  1. Add citus to the shared_preload_libraries variable in the postgresql.conf file:

    shared_preload_libraries = 'citus'
    

    If you want to use citus together with other extensions, citus should be the first on the list of shared_preload_libraries (see the example at the end of this section).

  2. Restart the database server for the changes to take effect. To verify that the citus library was installed correctly, you can run the following command:

    SHOW shared_preload_libraries;
    
  3. Create the citus extension using the following query:

    CREATE EXTENSION citus;
    

The CREATE EXTENSION command in the procedure above also installs the citus_columnar extension. If you want to enable only citus_columnar, complete the same steps but specify citus_columnar instead.
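
For example, if you preload another extension alongside citus, the shared_preload_libraries value might look like the line below. This is only an illustrative sketch: pg_stat_statements stands in for whatever extension you actually use, citus stays first in the list, and a server restart is still required afterwards.

shared_preload_libraries = 'citus,pg_stat_statements'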

H.5.2.2. Installing citus on Multiple Nodes #

To enable citus on multiple nodes, complete the following steps on all nodes:

  1. Add citus to the shared_preload_libraries variable in the postgresql.conf file:

    shared_preload_libraries = 'citus'
    

    If you want to use citus together with other extensions, citus should be the first on the list of shared_preload_libraries.

  2. Set up access permissions to the database server. By default, the database server listens only to clients on localhost. Set the listen_addresses configuration parameter to * to specify all available IP interfaces.

  3. Configure client authentication by editing the pg_hba.conf file.

  4. Restart the database server for the changes to take effect. To verify that the citus library was installed correctly, you can run the following command:

    SHOW shared_preload_libraries;
    
  5. Create the citus extension using the following query:

    CREATE EXTENSION citus;
    

When the above steps have been taken on all nodes, perform the actions below on the coordinator node for worker nodes to be able to connect to it:

  1. Register the hostname that worker nodes use to connect to the coordinator node:

    SELECT citus_set_coordinator_host('coordinator_name', coordinator_port);
    

  2. Add each worker node:

    SELECT * FROM citus_add_node('worker_name', worker_port);
    

  3. Verify that worker nodes are set successfully:

    SELECT * FROM citus_get_active_worker_nodes();
    
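For illustration, here is a sketch of the whole registration sequence with hypothetical host names (coord-host, worker-1, worker-2) and the default port 5432; substitute the names and ports of your own nodes:

SELECT citus_set_coordinator_host('coord-host', 5432);
SELECT * FROM citus_add_node('worker-1', 5432);
SELECT * FROM citus_add_node('worker-2', 5432);
SELECT * FROM citus_get_active_worker_nodes();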

H.5.3. When to Use citus #

H.5.3.1. Multi-Tenant SaaS Database #

Most B2B applications already have the notion of a tenant, customer, or account built into their data model. In this model, the database serves many tenants, and each tenant's data is kept separate from that of the others.

citus provides full SQL functionality for this workload and enables scaling out your relational database to more than 100,000 tenants. citus also adds new features for multi-tenancy. For example, citus supports tenant isolation to provide performance guarantees for large tenants, and has the concept of reference tables to reduce data duplication across tenants.

These capabilities allow you to scale out data of your tenants across many computers and add more CPU, memory, and disk resources. Further, sharing the same database schema across multiple tenants makes efficient use of hardware resources and simplifies database management.

citus offers the following advantages for multi-tenant applications:

  • Fast queries for all tenants.

  • Sharding logic in the database rather than the application.

  • Hold more data than is possible in single-node Postgres Pro.

  • Scale out while maintaining SQL functionality.

  • Maintain performance under high concurrency.

  • Fast metrics analysis across your customer base.

  • Scale to handle new customer sign-ups.

  • Isolate resource usage of large and small customers.

H.5.3.2. Real-Time Analytics #

citus supports real-time queries over large datasets. Commonly these queries occur in rapidly growing event systems or systems with time series data. Example use cases include:

  • Analytic dashboards with sub-second response times.

  • Exploratory queries on unfolding events.

  • Large dataset archival and reporting.

  • Analyzing sessions with funnel, segmentation, and cohort queries.

citus parallelizes query execution and scales linearly with the number of worker databases in a cluster. Some advantages of citus for real-time applications are as follows:

  • Maintain sub-second responses as the dataset grows.

  • Analyze new events and new data in real time.

  • Parallelize SQL queries.

  • Scale out while maintaining SQL functionality.

  • Maintain performance under high concurrency.

  • Fast responses to dashboard queries.

  • Use a single database rather than many databases spread across several nodes.

  • Rich Postgres Pro data types and extensions.

H.5.3.3. Microservices #

citus supports schema-based sharding, which allows distributing regular database schemas across many computers. This sharding methodology aligns well with a typical microservices architecture, where storage is fully owned by each service and therefore cannot share a schema definition with other services.

Schema-based sharding is an easier model to adopt: simply create a new schema and set the search_path in your service.

Advantages of using citus for microservices:

  • Allows distributing horizontally scalable state across services.

  • Transfer strategic business data from microservices into common distributed tables for analytics.

  • Efficiently use hardware by balancing services on multiple computers.

  • Isolate noisy services to their own nodes.

  • Easy-to-understand sharding model.

  • Quick adoption.

H.5.3.4. Considerations for Use #

citus extends Postgres Pro with distributed functionality, but it is not a drop-in replacement that scales out all workloads. A performant citus cluster involves thinking about the data model, tooling, and choice of SQL features used.

A good way to think about tools and SQL features is the following: if your workload aligns with use cases described here and you happen to run into an unsupported tool or query, then there is usually a good workaround.

H.5.3.5. When citus is Inappropriate #

Some workloads do not need a powerful distributed database, while others require a large flow of information between worker nodes. In the first case citus is unnecessary, and in the second it does not generally perform well. Below are a few examples of when you do not need citus:

  • You do not expect your workload to ever grow beyond a single Postgres Pro Enterprise node.

  • Offline analytics, without the need for real-time data ingestion or real-time queries.

  • Analytics apps that do not need to support a large number of concurrent users.

  • Queries that return data-heavy ETL results rather than summaries.

H.5.4. Quick Tutorials #

H.5.4.1. Multi-Tenant Applications #

In this tutorial a sample ad analytics dataset is used to demonstrate how you can use citus to power your multi-tenant application.

Note

This tutorial assumes that you already have citus installed and running. If not, consult Section H.5.2.1 to set up the extension locally.

H.5.4.1.1. Data Model and Sample Data #

This section shows how to create a database for an ad analytics app, which can be used by companies to view, change, analyze, and manage their ads and campaigns (see an example app). Such an application has good characteristics of a typical multi-tenant system. Data from different tenants is stored in a central database, and each tenant has an isolated view of their own data.

Three Postgres Pro tables to represent this data will be used. To get started, download sample data for these tables:

curl https://examples.citusdata.com/tutorial/companies.csv > companies.csv
curl https://examples.citusdata.com/tutorial/campaigns.csv > campaigns.csv
curl https://examples.citusdata.com/tutorial/ads.csv > ads.csv
H.5.4.1.2. Creating Tables #
  1. First connect to the citus coordinator using psql.

    If you are using citus installed as described in Section H.5.2.1, the coordinator node will be running on port 9700.

    psql -p 9700
    
  2. Create tables by using the standard Postgres Pro CREATE TABLE command:

    CREATE TABLE companies (
        id bigint NOT NULL,
        name text NOT NULL,
        image_url text,
        created_at timestamp without time zone NOT NULL,
        updated_at timestamp without time zone NOT NULL
    );
    
    CREATE TABLE campaigns (
        id bigint NOT NULL,
        company_id bigint NOT NULL,
        name text NOT NULL,
        cost_model text NOT NULL,
        state text NOT NULL,
        monthly_budget bigint,
        blacklisted_site_urls text[],
        created_at timestamp without time zone NOT NULL,
        updated_at timestamp without time zone NOT NULL
    );
    
    CREATE TABLE ads (
        id bigint NOT NULL,
        company_id bigint NOT NULL,
        campaign_id bigint NOT NULL,
        name text NOT NULL,
        image_url text,
        target_url text,
        impressions_count bigint DEFAULT 0,
        clicks_count bigint DEFAULT 0,
        created_at timestamp without time zone NOT NULL,
        updated_at timestamp without time zone NOT NULL
    );
    
  3. Create primary key indexes on each of the tables just like you would do in Postgres Pro:

    ALTER TABLE companies ADD PRIMARY KEY (id);
    ALTER TABLE campaigns ADD PRIMARY KEY (id, company_id);
    ALTER TABLE ads ADD PRIMARY KEY (id, company_id);
    
H.5.4.1.3. Distributing Tables and Loading Data #

Now you can instruct citus to distribute the tables created above across the different nodes in the cluster. To do so, run the create_distributed_table function and specify the table you want to shard and the column you want to shard on. In the example below, all the tables are sharded on the company ID: the id column of companies and the company_id column of campaigns and ads.

SELECT create_distributed_table('companies', 'id');
SELECT create_distributed_table('campaigns', 'company_id');
SELECT create_distributed_table('ads', 'company_id');

Sharding all tables on the company_id column allows citus to co-locate the tables together and allows for features like primary keys, foreign keys, and complex joins across your cluster.
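
For instance, because the tables are co-located on company_id, a foreign key from ads to campaigns can be added as long as it includes the distribution column. The constraint below is an illustrative sketch, not part of the tutorial's required schema:

-- The foreign key pairs (campaign_id, company_id) with the (id, company_id)
-- primary key of campaigns; including company_id is what makes this possible
-- on co-located distributed tables.
ALTER TABLE ads
  ADD CONSTRAINT ads_campaign_fk
  FOREIGN KEY (campaign_id, company_id)
  REFERENCES campaigns (id, company_id);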

Then you can go ahead and load the downloaded data into the tables using the standard psql \copy command. Make sure that you specify the correct file path if you downloaded the file to a different location.

\copy companies from 'companies.csv' with csv
\copy campaigns from 'campaigns.csv' with csv
\copy ads from 'ads.csv' with csv
H.5.4.1.4. Running Queries #

After the data is loaded into the tables, you can run some queries. citus supports standard INSERT, UPDATE, and DELETE commands for inserting and modifying rows in a distributed table, which is the typical way of interaction for a user-facing application.

For example, you can insert a new company by running:

INSERT INTO companies VALUES (5000, 'New Company', 'https://randomurl/image.png', now(), now());

If you want to double the budget for all campaigns of the company, run the UPDATE command:

UPDATE campaigns
SET monthly_budget = monthly_budget*2
WHERE company_id = 5;

Another example of such an operation is to run transactions, which span multiple tables. For example, you can delete a campaign and all its associated ads atomically by running:

BEGIN;
DELETE FROM campaigns WHERE id = 46 AND company_id = 5;
DELETE FROM ads WHERE campaign_id = 46 AND company_id = 5;
COMMIT;

Each statement in a transaction causes round-trips between the coordinator and workers in a multi-node citus cluster. For multi-tenant workloads, it is more efficient to run transactions in distributed functions. The efficiency gains become more apparent for larger transactions, but you can use the small transaction above as an example.

  1. First create a function that does the deletions:

    CREATE OR REPLACE FUNCTION
      delete_campaign(company_id int, campaign_id int)
    RETURNS void LANGUAGE plpgsql AS $fn$
    BEGIN
      DELETE FROM campaigns
       WHERE id = $2 AND campaigns.company_id = $1;
      DELETE FROM ads
       WHERE ads.campaign_id = $2 AND ads.company_id = $1;
    END;
    $fn$;
    
  2. Next use create_distributed_function to instruct citus to call the function directly on workers rather than on the coordinator (except on a single-node citus installation, which runs everything on the coordinator). It calls the function on whatever worker holds the shards for the ads and campaigns tables corresponding to the company_id value.

    SELECT create_distributed_function(
      'delete_campaign(int, int)', 'company_id',
      colocate_with := 'campaigns'
    );
    
    -- You can run the function as usual
    SELECT delete_campaign(5, 46);
    
  3. Besides transactional operations, you can also run analytics queries using standard SQL. One interesting query for a company to run is to see details about its campaigns with the largest monthly budgets.

    SELECT name, cost_model, state, monthly_budget
    FROM campaigns
    WHERE company_id = 5
    ORDER BY monthly_budget DESC
    LIMIT 10;
    
  4. You can also run a join query across multiple tables to see information about the running campaigns that receive the most clicks and impressions.

    SELECT campaigns.id, campaigns.name, campaigns.monthly_budget,
           sum(impressions_count) AS total_impressions, sum(clicks_count) AS total_clicks
    FROM ads, campaigns
    WHERE ads.company_id = campaigns.company_id
    AND ads.campaign_id = campaigns.id
    AND campaigns.company_id = 5
    AND campaigns.state = 'running'
    GROUP BY campaigns.id, campaigns.name, campaigns.monthly_budget
    ORDER BY total_impressions, total_clicks;
    

The tutorial above shows how to use citus to power a simple multi-tenant application. As a next step, you can look at the Multi-Tenant Apps section to see how you can model your own data for multi-tenancy.

H.5.4.2. Real-Time Analytics #

This tutorial demonstrates how to use citus to ingest events data and run analytical queries on that data in human real-time. A sample GitHub events dataset is used to this end in the example.

Note

This tutorial assumes that you already have citus installed and running. If not, consult Section H.5.2.1 to set up the extension locally.

H.5.4.2.1. Data Model and Sample Data #

This section shows how to create a database for a real-time analytics application. This application will insert large volumes of events data and enable analytical queries on that data with sub-second latencies. In this example, the GitHub events dataset is used. This dataset includes all public events on GitHub, such as commits, forks, new issues, and comments on these issues.

Two Postgres Pro tables are used to represent this data. To get started, download sample data for these tables:

curl https://examples.citusdata.com/tutorial/users.csv > users.csv
curl https://examples.citusdata.com/tutorial/events.csv > events.csv
H.5.4.2.2. Creating Tables #

To start, first connect to the citus coordinator using psql.

If you are using citus installed as described in Section H.5.2.1, the coordinator node will be running on port 9700.

psql -p 9700

Then you can create the tables by using the standard Postgres Pro CREATE TABLE command:

CREATE TABLE github_events
(
    event_id bigint,
    event_type text,
    event_public boolean,
    repo_id bigint,
    payload jsonb,
    repo jsonb,
    user_id bigint,
    org jsonb,
    created_at timestamp
);

CREATE TABLE github_users
(
    user_id bigint,
    url text,
    login text,
    avatar_url text,
    gravatar_id text,
    display_login text
);

Next you can create indexes on events data just like you do in Postgres Pro. This example also shows how to create a GIN index to make querying on JSONB fields faster.

CREATE INDEX event_type_index ON github_events (event_type);
CREATE INDEX payload_index ON github_events USING GIN (payload jsonb_path_ops);
H.5.4.2.3. Distributing Tables and Loading Data #

Now you can instruct citus to distribute the tables created above across the nodes in the cluster. To do so, you can call the create_distributed_table function and specify the table you want to shard and the column you want to shard on. In the example below, all the tables are sharded on the user_id column.

SELECT create_distributed_table('github_users', 'user_id');
SELECT create_distributed_table('github_events', 'user_id');

Sharding all tables on the user_id column allows citus to co-locate the tables together and allows for efficient joins and distributed roll-ups.

Then you can go ahead and load the downloaded data into the tables using the standard psql \copy command. Make sure that you specify the correct file path if you downloaded the file to a different location.

\copy github_users from 'users.csv' with csv
\copy github_events from 'events.csv' with csv
H.5.4.2.4. Running Queries #

After the data is loaded into the tables, you can run some queries. First check how many users are contained in the distributed database.

SELECT count(*) FROM github_users;

Now analyze GitHub push events in the data. First compute the number of commits per minute by using the number of distinct commits in each push event.

SELECT date_trunc('minute', created_at) AS minute,
       sum((payload->>'distinct_size')::int) AS num_commits
FROM github_events
WHERE event_type = 'PushEvent'
GROUP BY minute
ORDER BY minute;

The dataset also includes a users table. You can join users with events to find the top ten users who created the most repositories.

SELECT login, count(*)
FROM github_events ge
JOIN github_users gu
ON ge.user_id = gu.user_id
WHERE event_type = 'CreateEvent' AND payload @> '{"ref_type": "repository"}'
GROUP BY login
ORDER BY count(*) DESC LIMIT 10;

citus also supports standard INSERT, UPDATE, and DELETE commands for inserting and modifying data. For example, you can update the user display login by running the following command:

UPDATE github_users SET display_login = 'no1youknow' WHERE user_id = 24305673;

As a next step, you can look at the Real-Time Apps section to see how you can model your own data and power real-time analytical applications.

H.5.4.3. Microservices #

This tutorial shows how to use citus as the storage backend for multiple microservices and demonstrates a sample setup and basic operation of such a cluster.

Note

This tutorial assumes that you already have citus installed and running. If not, consult Section H.5.2.1 to set up the extension locally.

H.5.4.3.1. Distributed Schemas #

Distributed schemas are relocatable within a citus cluster. The system can rebalance them as a whole unit across the available nodes, which allows for efficient sharing of resources without manual allocation.

By design, microservices own their storage layer; we do not make any assumptions about the types of tables and data that they create and store. We do, however, provide a schema for every service and assume that each service uses a distinct role to connect to the database. When a user connects, their role name is put at the beginning of the search_path, so if the role matches the schema name, no application changes are needed to set the correct search_path.
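
For illustration, with the default search_path this behavior can be observed directly; the output below assumes an unmodified configuration:

SHOW search_path;
   search_path
-----------------
 "$user", public
(1 row)

Because the special $user entry comes first, a role named user_service automatically resolves unqualified table names against the user_service schema.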

Three services are used in the example:

  • user service

  • time service

  • ping service

To start, first connect to the citus coordinator using psql.

If you are using citus installed as described in Section H.5.2.1, the coordinator node will be running on port 9700.

psql -p 9700

You can now create the database roles for every service:

CREATE USER user_service;
CREATE USER time_service;
CREATE USER ping_service;

There are two ways to distribute a schema in citus:

  • Manually by calling the citus_schema_distribute('schema_name') function:

    CREATE SCHEMA AUTHORIZATION user_service;
    CREATE SCHEMA AUTHORIZATION time_service;
    CREATE SCHEMA AUTHORIZATION ping_service;
    
    SELECT citus_schema_distribute('user_service');
    SELECT citus_schema_distribute('time_service');
    SELECT citus_schema_distribute('ping_service');
    

    This method also allows you to convert existing regular schemas into distributed schemas.

    Note

    You can only distribute schemas that do not contain distributed or reference tables.

  • An alternative approach is to enable the citus.enable_schema_based_sharding configuration parameter:

    SET citus.enable_schema_based_sharding TO ON;
    
    CREATE SCHEMA AUTHORIZATION user_service;
    CREATE SCHEMA AUTHORIZATION time_service;
    CREATE SCHEMA AUTHORIZATION ping_service;
    

    The parameter can be changed for the current session or permanently in the postgresql.conf file. With the parameter set to on, all newly created schemas are distributed by default.

You can list the currently distributed schemas:

SELECT * FROM citus_schemas;
schema_name  | colocation_id | schema_size | schema_owner
-------------+---------------+-------------+--------------
user_service |             5 | 0 bytes     | user_service
time_service |             6 | 0 bytes     | time_service
ping_service |             7 | 0 bytes     | ping_service
(3 rows)
H.5.4.3.2. Creating Tables #

Now connect to the citus coordinator as each microservice's role to create its tables. You can use the \c command to swap the user within an existing psql instance.

\c citus user_service
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL
);
\c citus time_service
CREATE TABLE query_details (
    id SERIAL PRIMARY KEY,
    ip_address INET NOT NULL,
    query_time TIMESTAMP NOT NULL
);
\c citus ping_service
CREATE TABLE ping_results (
    id SERIAL PRIMARY KEY,
    host VARCHAR(255) NOT NULL,
    result TEXT NOT NULL
);
H.5.4.3.3. Configure Services #

For the purpose of this tutorial a very simple set of services is used. You can obtain them by cloning this public repository:

git clone https://github.com/citusdata/citus-example-microservices.git

The repository contains the ping, time, and user services. Each of them has an app.py file, which is what we will run.

$ tree
.
├── LICENSE
├── README.md
├── ping
│   ├── app.py
│   ├── ping.sql
│   └── requirements.txt
├── time
│   ├── app.py
│   ├── requirements.txt
│   └── time.sql
└── user
    ├── app.py
    ├── requirements.txt
    └── user.sql

Before you run the services, however, edit the user/app.py, ping/app.py, and time/app.py files to provide the connection configuration for your citus cluster:

# Database configuration
db_config = {
    'host': 'localhost',
    'database': 'citus',
    'user': 'ping_service',
    'port': 9700
}

After making the changes save all modified files and move on to the next step of running the services.

H.5.4.3.4. Running the Services #
  • Change into each app directory and run it in its own Python environment.

    cd user
    pipenv install
    pipenv shell
    python app.py
    

    Repeat the above for the time and ping services, after which you can use the API.

  • Create some users:

    curl -X POST -H "Content-Type: application/json" -d '[
      {"name": "John Doe", "email": "john@example.com"},
      {"name": "Jane Smith", "email": "jane@example.com"},
      {"name": "Mike Johnson", "email": "mike@example.com"},
      {"name": "Emily Davis", "email": "emily@example.com"},
      {"name": "David Wilson", "email": "david@example.com"},
      {"name": "Sarah Thompson", "email": "sarah@example.com"},
      {"name": "Alex Miller", "email": "alex@example.com"},
      {"name": "Olivia Anderson", "email": "olivia@example.com"},
      {"name": "Daniel Martin", "email": "daniel@example.com"},
      {"name": "Sophia White", "email": "sophia@example.com"}
    ]' http://localhost:5000/users
    
  • List the created users:

    curl http://localhost:5000/users
    
  • Get current time:

    curl http://localhost:5001/current_time
    
  • Run the ping against example.com:

    curl -X POST -H "Content-Type: application/json" -d '{"host": "example.com"}' http://localhost:5002/ping
    
H.5.4.3.5. Exploring the Database #

Now that we have called some API functions, data has been stored, and we can check whether the citus_schemas view reflects what we expect:

SELECT * FROM citus_schemas;
schema_name  | colocation_id | schema_size | schema_owner
--------------+---------------+-------------+--------------
user_service |             1 | 112 kB      | user_service
time_service |             2 | 32 kB       | time_service
ping_service |             3 | 32 kB       | ping_service
(3 rows)

When the schemas were created, you did not instruct citus which node to place them on; this is done automatically. Execute the following query to see where each schema resides:

SELECT nodename,nodeport, table_name, pg_size_pretty(sum(shard_size))
  FROM citus_shards
GROUP BY nodename,nodeport, table_name;
nodename  | nodeport |         table_name         | pg_size_pretty
-----------+----------+----------------------------+----------------
localhost |     9701 | time_service.query_details | 32 kB
localhost |     9702 | user_service.users         | 112 kB
localhost |     9702 | ping_service.ping_results  | 32 kB

We can see that the time service landed on node localhost:9701, while the user and ping services share space on the second worker localhost:9702. This is only an example, and the data sizes here can be ignored, but let us assume that we are annoyed by the uneven storage space utilization between the nodes. It makes more sense to have the two smaller time and ping services reside on one computer, while the large user service resides alone.

We can do this by instructing citus to rebalance the cluster by disk size:

SELECT citus_rebalance_start();
NOTICE:  Scheduled 1 moves as job 1
DETAIL:  Rebalance scheduled as background job
HINT:  To monitor progress, run: SELECT * FROM citus_rebalance_status();
 citus_rebalance_start
-----------------------
                     1
(1 row)

When done, check how the new layout looks:

SELECT nodename,nodeport, table_name, pg_size_pretty(sum(shard_size))
  FROM citus_shards
GROUP BY nodename,nodeport, table_name;
nodename  | nodeport |         table_name         | pg_size_pretty
-----------+----------+----------------------------+----------------
localhost |     9701 | time_service.query_details | 32 kB
localhost |     9701 | ping_service.ping_results  | 32 kB
localhost |     9702 | user_service.users         | 112 kB
(3 rows)

As expected, the schemas have been moved and the cluster has become more balanced. This operation is transparent to the applications: no restart is needed, and they continue serving queries.

H.5.5. Architecture Concepts #

H.5.5.1. Nodes #

citus is a Postgres Pro extension that allows commodity database servers (called nodes) to coordinate with one another in a shared-nothing architecture. The nodes form a cluster that allows Postgres Pro to hold more data and use more CPU cores than would be possible on a single computer. This architecture also allows the database to scale by simply adding more nodes to the cluster.

Every cluster has one special node called the coordinator (the others are known as workers). Applications send their queries to the coordinator node, which relays them to the relevant workers and accumulates the results.

For each query, the coordinator either routes it to a single worker node or parallelizes it across several, depending on whether the required data lives on a single node or on multiple nodes. The coordinator knows how to do this by consulting its metadata tables. These citus-specific tables track the DNS names and health of worker nodes, as well as the distribution of data across nodes. For more information, see the citus Tables and Views section.
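
For illustration, the worker membership that the coordinator consults can be inspected directly in the pg_dist_node metadata table; the query below selects only a subset of its columns:

SELECT nodeid, nodename, nodeport, noderole, isactive
FROM pg_dist_node;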

H.5.5.2. Sharding Models #

Sharding is a technique used in database systems and distributed computing to horizontally partition data across multiple servers or nodes. It involves breaking up a large database or dataset into smaller, more manageable parts called shards. Each shard contains a subset of the data, and together they form the complete dataset.

citus offers two types of data sharding: row-based and schema-based. Each option comes with its own tradeoffs, allowing you to choose the approach that best aligns with the requirements of your application.

H.5.5.2.1. Row-Based Sharding #

The traditional way in which citus shards tables is the single database, shared schema model, also known as row-based sharding: tenants co-exist as rows within the same tables. The tenant is determined by the distribution column, which allows splitting up a table horizontally.

This is the most hardware-efficient way of sharding. Tenants are densely packed and distributed among the nodes in the cluster. This approach, however, requires making sure that all tables in the schema have the distribution column and that all queries in the application filter by it. Row-based sharding shines in IoT workloads and when you want to get the most out of your hardware.

Benefits:

  • Best performance

  • Best tenant density per node

Drawbacks:

  • Requires schema modifications

  • Requires application query modifications

  • All tenants must share the same schema

H.5.5.2.2. Schema-Based Sharding #

Schema-based sharding is the shared database, separate schema model: the schema becomes the logical shard within the database. Multi-tenant apps can use a schema per tenant to easily shard along the tenant dimension. Query changes are not required, and the application usually needs only a small modification to set the proper search_path when switching tenants, as shown below. Schema-based sharding is an ideal solution for microservices and for ISVs deploying applications that cannot undergo the changes required to adopt row-based sharding.
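
A minimal sketch of such a modification, assuming a hypothetical tenant schema named tenant_42:

-- Run when the application switches to this tenant's connection or request.
SET search_path TO tenant_42;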

Benefits:

  • Tenants can have heterogeneous schemas

  • No schema modifications required

  • No application query modifications required

  • Better SQL compatibility compared to row-based sharding

Drawbacks:

  • Fewer tenants per node compared to row-based sharding

H.5.5.2.3. Sharding Tradeoffs #
The two sharding models compare as follows:

  • Multi-tenancy model: a separate schema per tenant with schema-based sharding; shared tables with tenant ID columns with row-based sharding.

  • citus version: 12.0+ for schema-based sharding; all versions for row-based sharding.

  • Additional steps compared to Postgres Pro: none for schema-based sharding, only a configuration change; for row-based sharding, use the create_distributed_table function on each table to distribute and co-locate tables by tenant_id.

  • Number of tenants: 1-10k with schema-based sharding; 1-1M+ with row-based sharding.

  • Data modelling requirement: no foreign keys across distributed schemas with schema-based sharding; with row-based sharding, the tenant_id column (a distribution column, also known as a sharding key) needs to be included in each table, as well as in primary keys and foreign keys.

  • SQL requirement for single-node queries: use a single distributed schema per query with schema-based sharding; with row-based sharding, joins and WHERE clauses should include the tenant_id column.

  • Parallel cross-tenant queries: not supported with schema-based sharding; supported with row-based sharding.

  • Custom table definitions per tenant: possible with schema-based sharding; not possible with row-based sharding.

  • Access control: schema permissions in both cases.

  • Data sharing across tenants: supported in both cases, using reference tables (in a separate schema with schema-based sharding).

  • Tenant-to-shard isolation: with schema-based sharding, every tenant has its own shard group by definition; with row-based sharding, specific tenant IDs can be given their own shard group via the isolate_tenant_to_new_shard function.

H.5.5.3. Distributed Data #

H.5.5.3.1. Table Types #

There are several types of tables in a citus cluster, each used for different purposes.

  • Type 1: Distributed Tables.

    The first type, and most common, is distributed tables. These appear to be normal tables to SQL statements, but are horizontally partitioned across worker nodes. See the figure below to learn more.

    Figure H.1. Parallel SELECT Diagram


    Here the rows of the table are stored in tables table_1001, table_1002, and so on, on the workers. These component worker tables are called shards.

    citus runs not only SQL but DDL statements throughout a cluster, so changing the schema of a distributed table cascades to update all the table shards across workers.

    To learn how to create a distributed table, see the Creating and Modifying Distributed Objects (DDL) section.

    Distribution Column.  citus uses algorithmic sharding to assign rows to shards. This means the assignment is made deterministically — in our case based on the value of a particular table column called the distribution column. The cluster administrator must designate this column when distributing a table. Making the right choice is important for performance and functionality, as described in the general topic of Section H.5.6.2.

  • Type 2: Reference Tables.

    A reference table is a type of distributed table whose entire contents are concentrated into a single shard, which is replicated on every worker. Thus queries on any worker can access the reference information locally, without the network overhead of requesting rows from another node. Reference tables have no distribution column because there is no need to distinguish separate shards per row.

    Reference tables are typically small and are used to store data that is relevant to queries running on any worker node, for example, enumerated values such as order statuses or product categories (see the sketch after this list).

    When interacting with a reference table, we automatically perform two-phase commits on transactions. This means that citus makes sure your data is always in a consistent state, regardless of whether you are writing, modifying or deleting it.

    The Reference Tables section talks more about these tables and how to create them.

  • Type 3: Local Tables.

    When you use citus, the coordinator node you connect to and interact with is a regular Postgres Pro database with the citus extension installed. Thus you can create ordinary tables and choose not to shard them. This is useful for small administrative tables that do not participate in join queries. An example would be a users table for application login and authentication.

    Creating standard Postgres Pro tables is easy because it is the default. It is what you get when you run CREATE TABLE. In almost every citus deployment we see standard Postgres Pro tables co-existing with distributed and reference tables. Indeed, citus itself uses local tables to hold cluster metadata, as mentioned earlier.

  • Type 4: Local Managed Tables.

    When the citus.enable_local_reference_table_foreign_keys configuration parameter is enabled, citus may automatically add local tables to metadata if a foreign key reference exists between a local table and a reference table. Additionally, such tables can be created manually by calling the citus_add_local_table_to_metadata function on regular local tables. Tables present in metadata are considered managed tables and can be queried from any node; citus knows to route such queries to the coordinator to obtain data from the local managed table. These tables are displayed as local in the citus_tables view.

  • Type 5: Schema Tables.

    When using schema-based sharding, distributed schemas are automatically associated with individual co-location groups such that the tables created in those schemas are automatically converted to co-located distributed tables without a shard key. Such tables are considered schema tables and are displayed as schema in the citus_tables view.
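
As an example of a reference table (Type 2 above), the following sketch creates a hypothetical lookup table and replicates it to every node:

CREATE TABLE order_statuses (
    status_id int PRIMARY KEY,
    label text NOT NULL
);

SELECT create_reference_table('order_statuses');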

H.5.5.3.2. Shards #

The previous section described a shard as containing a subset of the rows of a distributed table in a smaller table within a worker node. This section gets more into the technical details.

The pg_dist_shard metadata table on the coordinator contains a row for each shard of each distributed table in the system. The row matches a shardid with a range of integers in a hash space (shardminvalue, shardmaxvalue):

SELECT * FROM pg_dist_shard;
 logicalrelid  | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------+---------+--------------+---------------+---------------
 github_events |  102026 | t            | 268435456     | 402653183
 github_events |  102027 | t            | 402653184     | 536870911
 github_events |  102028 | t            | 536870912     | 671088639
 github_events |  102029 | t            | 671088640     | 805306367
 (4 rows)

If the coordinator node wants to determine which shard holds a row of github_events, it hashes the value of the distribution column in the row, and checks which shard's range contains the hashed value. (The ranges are defined so that the image of the hash function is their disjoint union.)
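
If you want to check this mapping yourself, citus provides a helper function; the sketch below assumes github_events is distributed on user_id and asks which shard covers the value 4:

-- Returns the shardid whose hash range contains hash(4).
SELECT get_shard_id_for_distribution_column('github_events', 4);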

H.5.5.3.2.1. Shard Placements #

Suppose that shard 102027 is associated with the row in question. This means the row should be read from or written to a table called github_events_102027 on one of the workers. Which worker? That is determined entirely by the metadata tables, and the mapping of shard to worker is known as the shard placement.

Joining some metadata tables gives us the answer. These are the types of lookups that the coordinator does to route queries. It rewrites queries into fragments that refer to the specific tables like github_events_102027, and runs those fragments on the appropriate workers.

SELECT
    shardid,
    node.nodename,
    node.nodeport
FROM pg_dist_placement placement
JOIN pg_dist_node node
  ON placement.groupid = node.groupid
 AND node.noderole = 'primary'::noderole
WHERE shardid = 102027;
┌─────────┬───────────┬──────────┐
│ shardid │ nodename  │ nodeport │
├─────────┼───────────┼──────────┤
│  102027 │ localhost │     5433 │
└─────────┴───────────┴──────────┘

In our example of github_events there were four shards. The number of shards is configurable per table at the time of its distribution across the cluster. The best choice of shard count depends on your use case.
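
For example, here is a sketch of choosing the shard count at distribution time; the value 64 is purely illustrative, and citus.shard_count controls the default for subsequently distributed tables:

SET citus.shard_count = 64;
SELECT create_distributed_table('github_events', 'user_id');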

Finally note that citus allows shards to be replicated for protection against data loss using Postgres Pro streaming replication to back up the entire database of each node to a follower database. This is transparent and does not require the involvement of citus metadata tables.

H.5.5.3.3. Co-Location #

Since shards can be placed on nodes as desired, it makes sense to place shards containing related rows of related tables together on the same nodes. That way join queries between them can avoid sending as much information over the network, and can be performed inside a single citus node.

One example is a database with stores, products, and purchases. If all three tables contain — and are distributed by — the store_id column, then all queries restricted to a single store can run efficiently on a single worker node. This is true even when the queries involve any combination of these tables.

For the full explanation and examples of this concept, see the Table Co-Location section.

H.5.5.3.4. Parallelism #

Spreading queries across multiple computers allows more queries to run at once, and allows processing speed to scale by adding new computers to the cluster. Additionally splitting a single query into fragments as described in the previous section boosts the processing power devoted to it. The latter situation achieves the greatest parallelism, meaning utilization of CPU cores.

Queries reading or affecting shards spread evenly across many nodes are able to run at real-time speed. Note that the results of the query still need to pass back through the coordinator node, so the speedup is most apparent when the final results are compact, such as aggregate functions like counting and descriptive statistics.

The Query Processing section explains more about how queries are broken into fragments and how their execution is managed.

H.5.5.4. Query Execution #

When executing multi-shard queries, citus must balance the gains from parallelism with the overhead from database connections (network latency and worker node resource usage). To configure citus query execution for best results with your database workload, it helps to understand how citus manages and conserves database connections between the coordinator node and worker nodes.

citus transforms each incoming multi-shard query into per-shard queries called tasks. It queues the tasks and runs them once it is able to obtain connections to the relevant worker nodes. For queries on distributed tables foo and bar, see the connection management diagram below.

Figure H.2. Executor Overview


The coordinator node has a connection pool for each session. Each query (such as SELECT * FROM foo in the diagram) is limited to opening at most as many simultaneous connections per worker for its tasks as set by the citus.max_adaptive_executor_pool_size configuration parameter. The parameter is configurable at the session level for priority management.

It can be faster to execute short tasks sequentially over the same connection rather than establishing new connections for them in parallel. Long running tasks, on the other hand, benefit from more immediate parallelism.

To balance the needs of short and long tasks, citus uses the citus.executor_slow_start_interval configuration parameter. It specifies a delay between connection attempts for the tasks in a multi-shard query. When a query first queues tasks, the tasks can acquire just one connection. At the end of each interval where there are pending connections, citus increases the number of simultaneous connections it will open. The slow start behavior can be disabled entirely by setting the parameter to 0.

When a task finishes using a connection, the session pool holds the connection open for later use. Caching the connection avoids the overhead of re-establishing connections between the coordinator and the worker. However, each pool holds open no more idle connections at once than set by the citus.max_cached_conns_per_worker configuration parameter, to limit idle connection resource usage on the worker.

Finally, the citus.max_shared_pool_size configuration parameter acts as a fail-safe. It limits the total connections per worker between all tasks.
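
As an illustration, the session-settable parameters mentioned above can be adjusted like this; the values are examples, not recommendations:

-- Allow up to 8 simultaneous connections per worker for this session's tasks.
SET citus.max_adaptive_executor_pool_size = 8;
-- Wait longer before opening additional connections; 0 disables slow start.
SET citus.executor_slow_start_interval = 50;
-- citus.max_cached_conns_per_worker and citus.max_shared_pool_size are
-- typically configured in postgresql.conf rather than per session.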

H.5.6. Develop #

H.5.6.1. Determining Application Type #

Running efficient queries on a citus cluster requires that data be properly distributed across computers. This varies by the type of application and its query patterns.

There are broadly two kinds of applications that work very well on citus. The first step in data modeling is to identify which of them more closely resembles your application.

H.5.6.1.1. At a Glance #
  • Multi-tenant applications: sometimes dozens or hundreds of tables in the schema; queries relating to one tenant (company/store) at a time; OLTP workloads for serving web clients; OLAP workloads that serve per-tenant analytical queries.

  • Real-time applications: a small number of tables; relatively simple analytics queries with aggregations; high ingest volume of mostly immutable data; often centering around a big table of events.
H.5.6.1.2. Examples and Characteristics #
H.5.6.1.2.1. Multi-Tenant Applications #

These are typically SaaS applications that serve other companies, accounts, or organizations. Most SaaS applications are inherently relational. They have a natural dimension on which to distribute data across nodes: just shard by tenant_id.

citus enables you to scale out your database to millions of tenants without having to re-architect your application. You can keep the relational semantics you need, like joins, foreign key constraints, transactions, ACID, and consistency.

  • Examples: Websites that host store-fronts for other businesses, such as a digital marketing solution or a sales automation tool.

  • Characteristics: Queries relating to a single tenant rather than joining information across tenants. This includes OLTP workloads for serving web clients, and OLAP workloads that serve per-tenant analytical queries. Having dozens or hundreds of tables in your database schema is also an indicator for the multi-tenant data model.

Scaling a multi-tenant app with citus also requires minimal changes to application code. We have support for popular frameworks like Ruby on Rails and Django.

H.5.6.1.2.2. Real-Time Analytics #

These are applications that need massive parallelism, coordinating hundreds of cores for fast results to numerical, statistical, or counting queries. By sharding and parallelizing SQL queries across multiple nodes, citus makes it possible to perform real-time queries across billions of records in under a second.

  • Examples: Customer-facing analytics dashboards requiring sub-second response times.

  • Characteristics: Few tables, often centering around a big table of device-, site- or user-events and requiring high ingest volume of mostly immutable data. Relatively simple (but computationally intensive) analytics queries involving several aggregations and GROUP BY operations.

If your situation resembles either case above, then the next step is to decide how to shard your data in the citus cluster. As explained in Section H.5.5, citus assigns table rows to shards according to the hashed value of the table distribution column. The database administrator's choice of distribution columns needs to match the access patterns of typical queries to ensure performance.

H.5.6.2. Choosing Distribution Column #

citus uses the distribution column in distributed tables to assign table rows to shards. Choosing the distribution column for each table is one of the most important modeling decisions because it determines how data is spread across nodes.

If the distribution columns are chosen correctly, then related data will group together on the same physical nodes, making queries fast and adding support for all SQL features. If the columns are chosen incorrectly, the system will run needlessly slowly, and will not be able to support all SQL features across nodes.

This section gives distribution column tips for the two most common citus scenarios. It concludes by going in-depth on co-location, the desirable grouping of data on nodes.

H.5.6.2.1. Multi-Tenant Apps #

The multi-tenant architecture uses a form of hierarchical database modeling to distribute queries across nodes in the distributed cluster. The top of the data hierarchy is known as the tenant_id and needs to be stored in a column on each table. citus inspects queries to see which tenant_id they involve and routes the query to a single worker node for processing, specifically the node that holds the data shard associated with the tenant_id. Placing all data relevant to a query on the same node is called co-location.

The following diagram illustrates co-location in the multi-tenant data model. It contains two tables, Accounts and Campaigns, each distributed by account_id. The shaded boxes represent shards, each of whose color represents which worker node contains it. Green shards are stored together on one worker node, and blue on another. Notice how a join query between Accounts and Campaigns would have all the necessary data together on one node when restricting both tables to the same account_id.

Figure H.3. Multi-Tenant Co-Location


To apply this design in your own schema, the first step is identifying what constitutes a tenant in your application. Common instances include company, account, organization, or customer. The column name will be something like company_id or customer_id. Examine each of your queries and ask yourself: would it work if it had additional WHERE clauses to restrict all tables involved to rows with the same tenant_id? Queries in the multi-tenant model are usually scoped to a tenant, for instance, queries on sales or inventory would be scoped within a certain store.

Best practices are as follows:

  • Partition distributed tables by the common tenant_id column. For instance, in a SaaS application where tenants are companies, the tenant_id will likely be company_id.

  • Convert small cross-tenant tables to reference tables. When multiple tenants share a small table of information, distribute it as a reference table.

  • Filter all application queries by tenant_id. Each query should request information for one tenant at a time.
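
A minimal sketch of these practices with hypothetical tables: per-tenant data distributed by company_id, plus a small shared lookup table kept as a reference table:

CREATE TABLE orders (
    company_id bigint,
    order_id bigint,
    total numeric,
    PRIMARY KEY (company_id, order_id)
);
CREATE TABLE countries (
    code text PRIMARY KEY,
    name text
);

SELECT create_distributed_table('orders', 'company_id');
SELECT create_reference_table('countries');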

H.5.6.2.2. Real-Time Apps #

While the multi-tenant architecture introduces a hierarchical structure and uses data co-location to route queries per tenant, real-time architectures depend on specific distribution properties of their data to achieve highly parallel processing.

We use entity ID as a term for distribution columns in the real-time model, as opposed to tenant IDs in the multi-tenant model. Typical entities are users, hosts, or devices.

Real-time queries typically ask for numeric aggregates grouped by date or category. citus sends these queries to each shard for partial results and assembles the final answer on the coordinator node. Queries run fastest when as many nodes contribute as possible, and when no single node must do a disproportionate amount of work.

Best practices are as follows:

  • Choose a column with high cardinality as the distribution column. For comparison, a status field on an order table with values new, paid, and shipped is a poor choice of distribution column because it can take only those few values. The number of distinct values limits the number of shards that can hold the data, and the number of nodes that can process it. Among columns with high cardinality, it is good additionally to choose those that are frequently used in group-by clauses or as join keys.

  • Choose a column with even distribution. If you distribute a table on a column skewed to certain common values, then data in the table will tend to accumulate in certain shards. The nodes holding those shards will end up doing more work than other nodes.

  • Distribute fact and dimension tables on their common columns. Your fact table can have only one distribution key. Tables that join on another key will not be co-located with the fact table. Choose one dimension to co-locate based on how frequently it is joined and the size of the joining rows.

  • Change some dimension tables into reference tables. If a dimension table cannot be co-located with the fact table, you can improve query performance by distributing copies of the dimension table to all of the nodes in the form of a reference table.
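
A minimal sketch of these practices with hypothetical tables: a fact table and its main dimension co-located on device_id, and a small dimension distributed as a reference table:

CREATE TABLE events (
    device_id bigint,
    occurred_at timestamptz,
    payload jsonb
);
CREATE TABLE devices (
    device_id bigint PRIMARY KEY,
    device_type_id int
);
CREATE TABLE device_types (
    device_type_id int PRIMARY KEY,
    name text
);

SELECT create_distributed_table('events', 'device_id');
SELECT create_distributed_table('devices', 'device_id', colocate_with => 'events');
SELECT create_reference_table('device_types');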

H.5.6.2.3. Timeseries Data #

In a time-series workload, applications query recent information while archiving old information.

The most common mistake in modeling timeseries information in citus is using the timestamp itself as a distribution column. A hash distribution based on time will distribute times seemingly at random into different shards rather than keeping ranges of time together in shards. However, queries involving time generally reference ranges of time (for example, the most recent data), so such a hash distribution would lead to network overhead.

Best practices are as follows:

  • Do not choose a timestamp as the distribution column. Choose a different distribution column: in a multi-tenant app, use the tenant_id; in a real-time app, use the entity_id.

  • Use Postgres Pro table partitioning for time instead. Use table partitioning to break a big table of time-ordered data into multiple inherited tables, each containing a different time range. Distributing a Postgres Pro partitioned table in citus creates shards for the inherited tables.
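
A minimal sketch of this combination with hypothetical names: the table is partitioned by time and distributed by the entity ID, and partitions created afterwards are sharded as well:

CREATE TABLE sensor_events (
    device_id bigint,
    recorded_at timestamptz NOT NULL,
    payload jsonb
) PARTITION BY RANGE (recorded_at);

SELECT create_distributed_table('sensor_events', 'device_id');

CREATE TABLE sensor_events_2024_01 PARTITION OF sensor_events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');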

H.5.6.2.4. Table Co-Location #

Relational databases are the first choice of data store for many applications due to their enormous flexibility and reliability. Historically the one criticism of relational databases is that they can run on only a single computer, which creates inherent limitations when data storage needs outpace server improvements. The solution to rapidly scaling databases is to distribute them, but this creates a performance problem of its own: relational operations such as joins then need to cross the network boundary. Co-location is the practice of dividing data tactically, where one keeps related information on the same computers to enable efficient relational operations, but takes advantage of the horizontal scalability for the whole dataset.

The principle of data co-location is that all tables in the database have a common distribution column and are sharded across computers in the same way, such that rows with the same distribution column value are always on the same computer, even across different tables. As long as the distribution column provides a meaningful grouping of data, relational operations can be performed within the groups.

H.5.6.2.4.1. Data Co-Location in citus for Hash-Distributed Tables #

The citus extension for Postgres Pro is unique in being able to form a distributed database of databases. Every node in a citus cluster is a fully functional Postgres Pro database and the extension adds the experience of a single homogenous database on top. While it does not provide the full functionality of Postgres Pro in a distributed way, in many cases it can take full advantage of features offered by Postgres Pro on a single computer through co-location, including full SQL support, transactions, and foreign keys.

In citus a row is stored in a shard if the hash of the value in the distribution column falls within the shard hash range. To ensure co-location, shards with the same hash range are always placed on the same node even after rebalance operations, such that equal distribution column values are always on the same node across tables. See the figure below to learn more.

Figure H.4. Co-Location Shards


A distribution column that we have found to work well in practice is tenant_id in multi-tenant applications. For example, SaaS applications typically have many tenants, but every query they make is specific to a particular tenant. While one option is providing a database or schema for every tenant, it is often costly and impractical as there can be many operations that span across users (data loading, migrations, aggregations, analytics, schema changes, backups, etc). That becomes harder to manage as the number of tenants grows.

H.5.6.2.4.2. Practical Example of Co-Location #

Consider the following tables, which might be part of a multi-tenant web analytics SaaS:

CREATE TABLE event (
  tenant_id int,
  event_id bigint,
  page_id int,
  payload jsonb,
  primary key (tenant_id, event_id)
);

CREATE TABLE page (
  tenant_id int,
  page_id int,
  path text,
  primary key (tenant_id, page_id)
);

Now we want to answer queries that may be issued by a customer-facing dashboard, such as: Return the number of visits in the past week for all pages starting with /blog in tenant six.

H.5.6.2.4.3. Using Regular Postgres Pro Tables #

If our data was in a single Postgres Pro node, we could easily express our query using the rich set of relational operations offered by SQL:

SELECT page_id, count(event_id)
FROM
  page
LEFT JOIN  (
  SELECT * FROM event
  WHERE (payload->>'time')::timestamptz >= now() - interval '1 week'
) recent
USING (tenant_id, page_id)
WHERE tenant_id = 6 AND path LIKE '/blog%'
GROUP BY page_id;

As long as the working set for this query fits in memory, this is an appropriate solution for many applications since it offers maximum flexibility. However, even if you do not need to scale yet, it can be useful to consider the implications of scaling out on your data model.

H.5.6.2.4.4. Distributing Tables by ID #

As the number of tenants and the data stored for each tenant grows, query times will typically go up as the working set no longer fits in memory or CPU becomes a bottleneck. In this case, we can shard the data across many nodes using citus. The first and the most important choice we need to make when sharding is the distribution column. Let's start with a naive choice of using event_id for the event table and page_id for the page table:

-- Naively use event_id and page_id as distribution columns

SELECT create_distributed_table('event', 'event_id');
SELECT create_distributed_table('page', 'page_id');

Given that the data is dispersed across different workers, we cannot simply perform a join as we would on a single Postgres Pro node. Instead, we will need to issue two queries:

Across all shards of the page table (Q1):

SELECT page_id FROM page WHERE path LIKE '/blog%' AND tenant_id = 6;

Across all shards of the event table (Q2):

SELECT page_id, count(*) AS count
FROM event
WHERE page_id IN (/*…page IDs from first query…*/)
  AND tenant_id = 6
  AND (payload->>'time')::timestamptz >= now() - interval '1 week'
GROUP BY page_id ORDER BY count DESC LIMIT 10;

Afterwards, the results from the two steps need to be combined by the application.

The data required to answer the query is scattered across the shards on the different nodes and each of those shards will need to be queried. See the figure below to learn more.

Figure H.5. Co-Location With Inefficient Queries


In this case the data distribution creates substantial drawbacks:

  • Overhead from querying each shard, running multiple queries.

  • Overhead of Q1 returning many rows to the client.

  • Q2 becomes very large.

  • The need to write the query in multiple steps and combine the results requires changes in the application.

A potential upside of the relevant data being dispersed is that the queries can be parallelized, which citus will do. However, this is only beneficial if the amount of work that the query does is substantially greater than the overhead of querying many shards. It is generally better to avoid doing such heavy lifting directly from the application, for example, by pre-aggregating the data.

H.5.6.2.4.5. Distributing Tables by Tenant #

Looking at our query again, we can see that all the rows that the query needs have one dimension in common: tenant_id. The dashboard will only ever query for a tenant's own data. That means that if data for the same tenant is always co-located on a single Postgres Pro node, our original query could be answered in a single step by that node by performing a join on tenant_id and page_id.

In citus, rows with the same distribution column value are guaranteed to be on the same node. Each shard in a distributed table effectively has a set of co-located shards from other distributed tables that contain the same distribution column values (data for the same tenant). Starting over, we can create our tables with tenant_id as the distribution column.

-- Co-locate tables by using a common distribution column
SELECT create_distributed_table('event', 'tenant_id');
SELECT create_distributed_table('page', 'tenant_id', colocate_with => 'event');

In this case, citus can answer the same query that you would run on a single Postgres Pro node without modification:

SELECT page_id, count(event_id)
FROM
  page
LEFT JOIN  (
  SELECT * FROM event
  WHERE (payload->>'time')::timestamptz >= now() - interval '1 week'
) recent
USING (tenant_id, page_id)
WHERE tenant_id = 6 AND path LIKE '/blog%'
GROUP BY page_id;

Because of the tenant_id filter and join on tenant_id, citus knows that the entire query can be answered using the set of co-located shards that contain the data for that particular tenant, and the Postgres Pro node can answer the query in a single step, which enables full SQL support. See the figure below to learn more.

Figure H.6. Co-Location With Better Queries


In some cases, queries and table schemas will require minor modifications to ensure that the tenant_id is always included in unique constraints and join conditions. However, this is usually a straightforward change, and the extensive rewrite that would be required without having co-location is avoided.

While the example above queries just one node because there is a specific tenant_id = 6 filter, co-location also allows us to efficiently perform distributed joins on tenant_id across all nodes, albeit with some SQL limitations.
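
For instance, a cross-tenant report can join the same co-located tables for every tenant at once. Because the join includes the distribution column and the aggregation is grouped by it, each pair of co-located shards can be joined locally on its node (a sketch using the tables above):

-- Count events per tenant across all pages
SELECT tenant_id, count(*) AS event_count
FROM page
JOIN event USING (tenant_id, page_id)
GROUP BY tenant_id;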

H.5.6.2.4.6. Co-Location Means Better Feature Support #

The citus features unlocked by co-location are:

  • Full SQL support for queries on a single set of co-located shards.

  • Multi-statement transaction support for modifications on a single set of co-located shards.

  • Aggregation through INSERT...SELECT.

  • Foreign keys.

  • Distributed outer joins.

  • Pushdown CTEs.

Data co-location is a powerful technique for providing both horizontal scale and support for relational data models. The cost of migrating or building applications using a distributed database that enables relational operations through co-location is often substantially lower than moving to a restrictive data model (e.g. NoSQL), and, unlike a single-node database, it can scale out with the size of your business. For more information about migrating an existing database, see Section H.5.6.3.

H.5.6.2.4.7. Query Performance #

citus parallelizes incoming queries by breaking them into multiple fragment queries (tasks), which run on the worker shards in parallel. This allows citus to utilize the processing power of all the nodes in the cluster, as well as the individual cores on each node, for each query. Due to this parallelization, query performance scales with the combined computing power of all the cores in the cluster, leading to a dramatic decrease in query times compared to Postgres Pro on a single server.

citus employs a two-stage optimizer when planning SQL queries. The first phase involves converting the SQL queries into their commutative and associative form so that they can be pushed down and run on the workers in parallel. As discussed in previous sections, choosing the right distribution column and distribution method allows the distributed query planner to apply several optimizations to the queries. This can have a significant impact on query performance due to reduced network I/O.

The distributed executor of the citus extension then takes these individual query fragments and sends them to worker Postgres Pro instances. There are several aspects of both the distributed planner and the executor that can be tuned to improve performance. When these individual query fragments are sent to the workers, the second phase of query optimization kicks in. The workers are simply running extended Postgres Pro servers, and they apply the standard Postgres Pro planning and execution logic to run these fragment SQL queries. Therefore, any optimization that helps Postgres Pro also helps citus. Postgres Pro by default comes with conservative resource settings, so optimizing these configuration settings can improve query times significantly.

H.5.6.3. Migrating an Existing App #

Migrating an existing application to citus sometimes requires adjusting the schema and queries for optimal performance. citus extends Postgres Pro with distributed functionality, but row-based sharding is not a drop-in replacement that scales out all workloads. A performant citus cluster involves thinking about the data model, tooling, and choice of SQL features used.

There is another mode of operation in citus called schema-based sharding. While row-based sharding gives the best performance and hardware efficiency, consider schema-based sharding if you need a more drop-in approach.

  1. The first steps are to optimize the existing database schema so that it can work efficiently across multiple computers.

  2. Next, update application code and queries to deal with the schema changes.

  3. After testing the changes in a development environment, the last step is to migrate production data to a citus cluster and switch over the production app. We have techniques to minimize downtime for this step.

H.5.6.3.1. Identify Distribution Strategy #
H.5.6.3.1.1. Pick Distribution Key #

The first step in migrating to citus is identifying suitable distribution keys and planning table distribution accordingly. In multi-tenant applications this will typically be an internal identifier for tenants, which we refer to as the tenant_id. The use cases may vary, so we advise being thorough at this step.

For guidance, consult these sections:

Review your environment to be sure that the ideal distribution key is chosen. To do so, examine schema layouts, larger tables, long-running and/or problematic queries, standard use cases, and more.

H.5.6.3.1.2. Identify Types of Tables #

Once a distribution key is identified, review the schema to identify how each table will be handled and whether any modifications to table layouts will be required.

Tables will generally fall into one of the following categories:

  • Ready for distribution. These tables already contain the distribution key, and are ready for distribution.

  • Needs backfill. These tables can be logically distributed by the chosen key but do not contain a column directly referencing it. The tables will be modified later to add the column.

  • Reference table. These tables are typically small, do not contain the distribution key, are commonly joined by distributed tables, and/or are shared across tenants. A copy of each of these tables will be maintained on all nodes. Common examples include country code lookups, product categories, and the like.

  • Local table. These are typically not joined to other tables, and do not contain the distribution key. They are maintained exclusively on the coordinator node. Common examples include admin user lookups and other utility tables.

Consider an example multi-tenant application similar to Etsy or Shopify where each tenant is a store. A simplified schema is presented in the diagram below. (Underlined items are primary keys, italicized items are foreign keys.)

Figure H.7. Simplified Schema Example


In this example, each store is a natural tenant, so the tenant_id is in this case the store_id. After distributing tables in the cluster, we want rows relating to the same store to reside together on the same nodes.

H.5.6.3.2. Prepare Source Tables for Migration #

Once the scope of needed database changes is identified, the next major step is to modify the data structure for the application's existing database. First, tables requiring backfill are modified to add a column for the distribution key.

H.5.6.3.2.1. Add Distribution Keys #

In our storefront example the stores and products tables have a store_id and are ready for distribution. Being normalized, the line_items table lacks store_id. If we want to distribute by store_id, the table needs this column.

-- Denormalize line_items by including store_id

ALTER TABLE line_items ADD COLUMN store_id uuid;

Be sure to check that the distribution column has the same type in all tables, e.g. do not mix int and bigint. The column types must match to ensure proper data co-location.
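
One quick way to verify this is to compare the declared type of the would-be distribution column across tables, for example via the standard information_schema catalog (a sketch assuming the tables live in the public schema):

-- List the declared type of store_id in every table of the public schema
SELECT table_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public'
  AND column_name = 'store_id'
ORDER BY table_name;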

H.5.6.3.2.2. Backfill Newly Created Columns #

Once the schema is updated, backfill missing values for the tenant_id column in tables where the column was added. In our example line_items requires values for store_id.

We backfill the table by obtaining the missing values from a join query with orders:

UPDATE line_items
   SET store_id = orders.store_id
  FROM orders
 WHERE line_items.order_id = orders.order_id;

Doing the whole table at once may cause too much load on the database and disrupt other queries. The backfill can be done more slowly instead. One way to do that is to make a function that backfills small batches at a time, then call the function repeatedly with pg_cron.

-- The function to backfill up to one
-- thousand rows from line_items

CREATE FUNCTION backfill_batch()
RETURNS void LANGUAGE sql AS $$
  WITH batch AS (
    SELECT line_item_id, order_id
      FROM line_items
     WHERE store_id IS NULL
     LIMIT 1000
       FOR UPDATE
      SKIP LOCKED
  )
  UPDATE line_items AS li
     SET store_id = orders.store_id
    FROM batch, orders
   WHERE batch.line_item_id = li.line_item_id
     AND batch.order_id = orders.order_id;
$$;

-- Run the function every quarter hour
SELECT cron.schedule('*/15 * * * *', 'SELECT backfill_batch()');

-- Note the return value of cron.schedule

Once the backfill is caught up, the cron job can be disabled:

-- Assuming 42 is the job id returned
-- from cron.schedule

SELECT cron.unschedule(42);
H.5.6.3.3. Prepare Application for citus #
H.5.6.3.3.1. Set Up Development citus Cluster #

When modifying the application to work with citus, you will need a database to test against. Follow the instructions in Section H.5.2.1 to set up the extension.

Next, dump a copy of the schema from your application's original database and restore it in the new development database.

# get schema from source db

pg_dump \
   --format=plain \
   --no-owner \
   --schema-only \
   --file=schema.sql \
   --schema=target_schema \
   postgres://user:pass@host:5432/db

# load schema into test db

psql postgres://user:pass@testhost:5432/db -f schema.sql

The schema should include a distribution key (tenant_id) in all tables you wish to distribute. Before running pg_dump for the schema, be sure to prepare source tables for migration.

Include Distribution Column in Keys #

citus cannot enforce uniqueness constraints unless a unique index or primary key contains the distribution column. Thus we must modify primary and foreign keys in our example to include store_id.

Some of the libraries listed in the next section can help migrate the database schema to include the distribution column in keys. However, here is an example of the underlying SQL commands that turn the simple keys into composite keys in the development database:

BEGIN;

-- Drop simple primary keys (cascades to foreign keys)

ALTER TABLE products   DROP CONSTRAINT products_pkey CASCADE;
ALTER TABLE orders     DROP CONSTRAINT orders_pkey CASCADE;
ALTER TABLE line_items DROP CONSTRAINT line_items_pkey CASCADE;

-- Recreate primary keys to include would-be distribution column

ALTER TABLE products   ADD PRIMARY KEY (store_id, product_id);
ALTER TABLE orders     ADD PRIMARY KEY (store_id, order_id);
ALTER TABLE line_items ADD PRIMARY KEY (store_id, line_item_id);

-- Recreate foreign keys to include would-be distribution column

ALTER TABLE line_items ADD CONSTRAINT line_items_store_fkey
  FOREIGN KEY (store_id) REFERENCES stores (store_id);
ALTER TABLE line_items ADD CONSTRAINT line_items_product_fkey
  FOREIGN KEY (store_id, product_id) REFERENCES products (store_id, product_id);
ALTER TABLE line_items ADD CONSTRAINT line_items_order_fkey
  FOREIGN KEY (store_id, order_id) REFERENCES orders (store_id, order_id);

COMMIT;

With these changes complete, the schema from the previous section will look like this (underlined items are primary keys, italicized items are foreign keys):

Figure H.8. Simplified Schema Example


Be sure to modify data flows to add keys to incoming data.

H.5.6.3.3.2. Add Distribution Key to Queries #

Once the distribution key is present on all appropriate tables, the application needs to include it in queries. Take the following steps using a copy of the application running in a development environment, and testing against a citus back-end. After the application is working with the extension we will see how to migrate production data from the source database into a real citus cluster.

  • Application code and any other ingestion processes that write to the tables should be updated to include the new columns.

  • Running the application test suite against the modified schema on citus is a good way to determine which areas of the code need to be modified.

  • It is a good idea to enable database logging. The logs can help uncover stray cross-shard queries in a multi-tenant app that should be converted to per-tenant queries.

Cross-shard queries are supported, but in a multi-tenant application most queries should be targeted to a single node. For simple SELECT, UPDATE, and DELETE queries this means that the WHERE clause should filter by tenant_id. citus can then run these queries efficiently on a single node.

There are helper libraries for a number of popular application frameworks that make it easy to include tenant_id in queries:

It is possible to use the libraries for database writes first (including data ingestion) and later for read queries. The activerecord-multi-tenant gem, for instance, has a write-only mode that modifies only the write queries.

Other (SQL Principles) #

If you are using a different ORM than those above or executing multi-tenant queries more directly in SQL, follow these general principles. We will use our earlier example of the e-commerce application.

Suppose we want to get the details for an order. Distributed queries that filter on the tenant_id run most efficiently in multi-tenant apps, so the change below makes the query faster (while both queries return the same results):

-- Before
SELECT *
  FROM orders
 WHERE order_id = 123;

-- After
SELECT *
  FROM orders
 WHERE order_id = 123
   AND store_id = 42; -- <== added

The tenant_id column is not just beneficial but critical for INSERT statements. Inserts must include a value for the tenant_id column or else citus will be unable to route the data to the correct shard and will raise an error.
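
For example, a sketch of such an INSERT for the storefront schema (the status column and the literal key values are illustrative only):

-- store_id is provided explicitly, so citus can route the row
-- to the shard that holds this tenant's data
INSERT INTO orders (store_id, order_id, status)
VALUES ('8c69aa0d-3f13-4440-86ca-443566c1fc75',
        '5b3fc83a-2d5f-4e11-9c25-0a1b2c3d4e5f',
        'pending');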

Finally, when joining tables make sure to filter by tenant_id too. For instance, here is how to inspect how many awesome wool pants a given store has sold:

-- One way is to include store_id in the join and also
-- filter by it in one of the queries

SELECT sum(l.quantity)
  FROM line_items l
 INNER JOIN products p
    ON l.product_id = p.product_id
   AND l.store_id = p.store_id
 WHERE p.name='Awesome Wool Pants'
   AND l.store_id='8c69aa0d-3f13-4440-86ca-443566c1fc75';

-- Equivalently, you can omit store_id from the join condition
-- but filter both tables by it. This may be useful if
-- building the query in an ORM

SELECT sum(l.quantity)
  FROM line_items l
 INNER JOIN products p ON l.product_id = p.product_id
 WHERE p.name='Awesome Wool Pants'
   AND l.store_id='8c69aa0d-3f13-4440-86ca-443566c1fc75'
   AND p.store_id='8c69aa0d-3f13-4440-86ca-443566c1fc75';
H.5.6.3.3.3. Enable Secure Connections #

Clients should connect to citus with SSL to protect information and prevent man-in-the-middle attacks.
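
For example, a libpq-style connection string can require SSL explicitly (the host name and credentials below are placeholders):

# require an SSL connection to the coordinator
psql "postgres://user:pass@coordinator.example.com:5432/db?sslmode=require"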

Check for Cross-Node Traffic #

With large and complex application code bases, certain queries generated by the application can easily be overlooked and thus lack a tenant_id filter. The citus parallel executor will still execute these queries successfully, so during testing they remain hidden since the application still works fine. However, if a query does not contain the tenant_id filter, the citus executor will hit every shard in parallel, even though only one will return any data. This consumes resources needlessly and may manifest as a problem only after moving to a higher-throughput production environment.

To prevent encountering such issues only after launching in production, one can set a configuration value to log queries that hit more than one shard. In a properly configured and migrated multi-tenant application, each query should only hit one shard at a time.

During testing, one can configure the following:

-- Adjust for your own database's name of course

ALTER DATABASE citus SET citus.multi_task_query_log_level = 'error';

citus will then error out if it encounters queries that are going to hit more than one shard. Erroring out during testing allows the application developer to find and migrate such queries.

During a production launch, one can configure the same setting to log, instead of error out:

ALTER DATABASE citus SET citus.multi_task_query_log_level = 'log';

Visit the citus.multi_task_query_log_level description to learn more about the supported values.

H.5.6.3.4. Migrate Production Data #

At this time, having updated the database schema and application queries to work with citus, you are ready for the final step. It is time to migrate data to the citus cluster and cut over the application to its new database. The data migration procedure is presented in the Database Migration section.

H.5.6.3.4.1. Database Migration #

For smaller environments that can tolerate a little downtime, use a simple pg_dump/pg_restore process. Here are the steps:

  1. Save the database structure from your development database:

    pg_dump \
       --format=plain \
       --no-owner \
       --schema-only \
       --file=schema.sql \
       --schema=target_schema \
       postgres://user:pass@host:5432/db
    
  2. Connect to the citus cluster using psql and create a schema:

    \i schema.sql
    
  3. Call the create_distributed_table and create_reference_table functions. If you get an error about foreign keys, it is generally due to the order of operations. Drop foreign keys before distributing tables and then re-add them.

  4. Put the application into maintenance mode and disable any other writes to the old database.

  5. Save the data from the original production database to disk with pg_dump:

    pg_dump \
       --format=custom \
       --no-owner \
       --data-only \
       --file=data.dump \
       --schema=target_schema \
       postgres://user:pass@host:5432/db
    
  6. Import into citus using pg_restore:

    # remember to use connection details for citus,
    # not the source database
    pg_restore  \
       --host=host \
       --dbname=dbname \
       --username=username \
       data.dump
    
    # it will prompt you for the connection password
    
  7. Test application.

H.5.6.4. SQL Reference #

H.5.6.4.1. Creating and Modifying Distributed Objects (DDL) #
H.5.6.4.1.1. Creating and Distributing Schemas #

citus supports schema-based sharding, which allows a schema to be distributed. Distributed schemas are automatically associated with individual co-location groups such that the tables created in those schemas will be automatically converted to co-located distributed tables without a shard key.

There are two ways in which a schema can be distributed in citus:

  • Manually by calling the citus_schema_distribute function:

    SELECT citus_schema_distribute('user_service');
    

    This method also allows you to convert existing regular schemas into distributed schemas.

    Note

    You can only distribute schemas that do not contain distributed or reference tables.

  • Alternatively, you can enable the citus.enable_schema_based_sharding configuration parameter:

    SET citus.enable_schema_based_sharding TO ON;
    
    CREATE SCHEMA AUTHORIZATION user_service;
    

    The parameter can be changed for the current session or permanently in postgresql.conf. With the parameter set to ON, all created schemas will be distributed by default.

    The process of distributing a schema automatically assigns it to an existing node in the cluster. The background shard rebalancer takes these schemas, and all tables within them, into account when rebalancing the cluster, performing the optimal moves and migrating the schemas between the nodes in the cluster.

To convert a schema back into a regular Postgres Pro schema, use the citus_schema_undistribute function:

SELECT citus_schema_undistribute('user_service');

The tables and data in the user_service schema will be moved from the current node back to the coordinator node in the cluster.

H.5.6.4.1.2. Creating and Distributing Tables #

To create a distributed table, you need to first define the table schema. To do so, you can define a table using the CREATE TABLE command in the same way as you would do with a regular Postgres Pro table.

CREATE TABLE github_events
(
    event_id bigint,
    event_type text,
    event_public boolean,
    repo_id bigint,
    payload jsonb,
    repo jsonb,
    actor jsonb,
    org jsonb,
    created_at timestamp
);

Next, you can use the create_distributed_table function to specify the table distribution column and create the worker shards.

SELECT create_distributed_table('github_events', 'repo_id');

This function informs citus that the github_events table should be distributed on the repo_id column (by hashing the column value). The function also creates shards on the worker nodes using the citus.shard_count configuration parameter.

This example would create a total of citus.shard_count number of shards where each shard owns a portion of a hash token space. Once the shards are created, this function saves all distributed metadata on the coordinator.

Each created shard is assigned a unique shard_id. Each shard is represented on the worker node as a regular Postgres Pro table with the tablename_shardid name where tablename is the name of the distributed table and shardid is the unique ID assigned to that shard. You can connect to the worker Postgres Pro instances to view or run commands on individual shards.
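
For example, the shard metadata kept on the coordinator can be inspected through the pg_dist_shard catalog:

-- Show the shards of github_events and their hash token ranges
SELECT shardid, shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'github_events'::regclass
ORDER BY shardid;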

You are now ready to insert data into the distributed table and run queries on it. You can also learn more about the function used in this section in the citus Utility Functions section.

Reference Tables #

The above method distributes tables into multiple horizontal shards, but another possibility is distributing tables into a single shard and replicating the shard to every worker node. Tables distributed this way are called reference tables. They are used to store data that needs to be frequently accessed by multiple nodes in a cluster.

Common candidates for reference tables include:

  • Smaller tables that need to join with larger distributed tables.

  • Tables in multi-tenant apps that lack a tenant_id column or which are not associated with a tenant. (In some cases, to reduce migration effort, users might even choose to make reference tables out of tables associated with a tenant but which currently lack a tenant ID.)

  • Tables that need unique constraints across multiple columns and are small enough.

For instance, suppose a multi-tenant eCommerce site needs to calculate sales tax for transactions in any of its stores. Tax information is not specific to any tenant. It makes sense to consolidate it in a shared table. A US-centric reference table might look like this:

-- A reference table

CREATE TABLE states (
  code char(2) PRIMARY KEY,
  full_name text NOT NULL,
  general_sales_tax numeric(4,3)
);

-- Distribute it to all workers

SELECT create_reference_table('states');

Now queries such as one calculating tax for a shopping cart can join on the states table with no network overhead and can add a foreign key to the state code for better validation.
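
A sketch of both ideas, assuming a distributed orders table with state_code and subtotal columns:

-- Join a distributed table with the replicated reference table;
-- the join runs locally on the node that holds the order's shard
SELECT o.order_id,
       o.subtotal * (1 + s.general_sales_tax) AS total_with_tax
  FROM orders o
  JOIN states s ON s.code = o.state_code
 WHERE o.store_id = '8c69aa0d-3f13-4440-86ca-443566c1fc75';

-- Validate incoming state codes against the reference table
ALTER TABLE orders ADD CONSTRAINT orders_state_fkey
  FOREIGN KEY (state_code) REFERENCES states (code);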

In addition to distributing a table as a single replicated shard, the create_reference_table function marks it as a reference table in the citus metadata tables. citus automatically performs two-phase commits for modifications to tables marked this way, which provides strong consistency guarantees.

If you have an existing distributed table, you can change it to a reference table by running:

SELECT undistribute_table('table_name');
SELECT create_reference_table('table_name');
Distributing Coordinator Data #

If an existing Postgres Pro database is converted into the coordinator node for a citus cluster, the data in its tables can be distributed efficiently and with minimal interruption to an application.

The create_distributed_table function described earlier works on both empty and non-empty tables and for the latter it automatically distributes table rows throughout the cluster. You will know if it does this by the presence of the following message: NOTICE: Copying data from local table.... For example:

CREATE TABLE series AS SELECT i FROM generate_series(1,1000000) i;
SELECT create_distributed_table('series', 'i');
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.series$$)
 create_distributed_table
 --------------------------

 (1 row)

Writes on the table are blocked while the data is migrated, and pending writes are handled as distributed queries once the function commits. (If the function fails, then the queries become local again.) Reads can continue as normal and will become distributed queries once the function commits.

When distributing tables A and B, where A has a foreign key to B, distribute the key destination table B first. Doing it in the wrong order will cause an error:

ERROR:  cannot create foreign key constraint
DETAIL:  Referenced table must be a distributed table or a reference table.

If it is not possible to distribute in the correct order, then drop the foreign keys, distribute the tables, and recreate the foreign keys.

After the tables are distributed, use the truncate_local_data_after_distributing_table function to remove local data. Leftover local data in distributed tables is inaccessible to citus queries and can cause irrelevant constraint violations on the coordinator.
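
Following the HINT from the example above:

-- Remove the local copy of the rows that now live in the shards
SELECT truncate_local_data_after_distributing_table($$public.series$$);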

H.5.6.4.1.3. Co-Locating Tables #

Co-location is the practice of dividing data tactically, keeping related information on the same computers to enable efficient relational operations, while taking advantage of the horizontal scalability for the whole dataset. For more information and examples, see the Table Co-Location section.

Tables are co-located in groups. To manually control a table's co-location group assignment use the optional colocate_with parameter of the create_distributed_table function. If you do not care about a table's co-location, then omit this parameter. It defaults to the value 'default', which groups the table with any other default co-location table having the same distribution column type and shard count. If you want to break or update this implicit co-location, you can use the update_distributed_table_colocation function.

-- These tables are implicitly co-located by using the same
-- distribution column type and shard count with the default
-- co-location group

SELECT create_distributed_table('A', 'some_int_col');
SELECT create_distributed_table('B', 'other_int_col');

When a new table is not related to others in its would-be implicit co-location group, specify colocate_with => 'none'.

-- Not co-located with other tables

SELECT create_distributed_table('A', 'foo', colocate_with => 'none');

Splitting unrelated tables into their own co-location groups will improve shard rebalancing performance, because shards in the same group have to be moved together.

When tables are indeed related (for instance when they will be joined), it can make sense to explicitly co-locate them. The gains of appropriate co-location are more important than any rebalancing overhead.

To explicitly co-locate multiple tables, distribute one and then put the others into its co-location group. For example:

-- Distribute stores
SELECT create_distributed_table('stores', 'store_id');

-- Add to the same group as stores
SELECT create_distributed_table('orders', 'store_id', colocate_with => 'stores');
SELECT create_distributed_table('products', 'store_id', colocate_with => 'stores');

Information about co-location groups is stored in the pg_dist_colocation table, while the pg_dist_partition table reveals which tables are assigned to which groups.

H.5.6.4.1.4. Dropping Tables #

You can use the standard Postgres Pro DROP TABLE command to remove your distributed tables. As with regular tables, DROP TABLE removes any indexes, rules, triggers, and constraints that exist for the target table. In addition, it also drops the shards on the worker nodes and cleans up their metadata.

DROP TABLE github_events;
H.5.6.4.1.5. Modifying Tables #

citus automatically propagates many kinds of DDL statements, which means that modifying a distributed table on the coordinator node will update shards on the workers too. Other DDL statements require manual propagation, and certain others are prohibited, such as those that would modify a distribution column. Attempting to run DDL that is ineligible for automatic propagation will raise an error and leave tables on the coordinator node unchanged.

Here is a reference of the categories of DDL statements that propagate automatically. Note that automatic propagation can be enabled or disabled with the citus.enable_ddl_propagation configuration parameter.

Adding/Modifying Columns #

citus propagates most ALTER TABLE commands automatically. Adding columns or changing their default values works as it would in a single-machine Postgres Pro database:

-- Adding a column

ALTER TABLE products ADD COLUMN description text;

-- Changing default value

ALTER TABLE products ALTER COLUMN price SET DEFAULT 7.77;

Significant changes to an existing column like renaming it or changing its data type are fine too. However, the data type of the distribution column cannot be altered. This column determines how table data distributes through the citus cluster, and modifying its data type would require moving the data.

Attempting to do so causes an error:

-- Assuming store_id is the distribution column
-- for products and that it has type integer

ALTER TABLE products
ALTER COLUMN store_id TYPE text;

/*
ERROR:  cannot execute ALTER TABLE command involving partition column
*/

As a workaround, you can consider changing the distribution column using the alter_distributed_table function, updating it, and changing it back.
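
A sketch of that workaround for the products table, assuming product_id can serve as a temporary distribution column:

-- Temporarily distribute by another column so store_id is no longer
-- the partition column, alter its type, then switch back
SELECT alter_distributed_table('products', distribution_column => 'product_id');
ALTER TABLE products ALTER COLUMN store_id TYPE text;
SELECT alter_distributed_table('products', distribution_column => 'store_id');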

Adding/Removing Constraints #

Using citus allows you to continue to enjoy the safety of a relational database, including database constraints. Due to the nature of distributed systems, citus will not cross-reference uniqueness constraints or referential integrity between worker nodes.

To set up a foreign key between co-located distributed tables, always include the distribution column in the key. This may involve making the key compound.

Foreign keys may be created in these situations:

  • between two local (non-distributed) tables,

  • between two reference tables,

  • between reference tables and local tables (by default enabled via the citus.enable_local_reference_table_foreign_keys configuration parameter),

  • between two co-located distributed tables when the key includes the distribution column, or

  • as a distributed table referencing a reference table.

Foreign keys from reference tables to distributed tables are not supported.

citus supports all referential actions on foreign keys from local to reference tables but does not support ON DELETE/ON UPDATE CASCADE in the reverse direction (reference to local).

Note

Primary keys and uniqueness constraints must include the distribution column. Adding them to a non-distribution column will generate an error.

This example shows how to create primary and foreign keys on distributed tables:

--
-- Adding a primary key
-- --------------------

-- We will distribute these tables on the account_id. The ads and clicks
-- tables must use compound keys that include account_id

ALTER TABLE accounts ADD PRIMARY KEY (id);
ALTER TABLE ads ADD PRIMARY KEY (account_id, id);
ALTER TABLE clicks ADD PRIMARY KEY (account_id, id);

-- Next distribute the tables

SELECT create_distributed_table('accounts', 'id');
SELECT create_distributed_table('ads',      'account_id');
SELECT create_distributed_table('clicks',   'account_id');

--
-- Adding foreign keys
-- -------------------

-- Note that this can happen before or after distribution, as long as
-- there exists a uniqueness constraint on the target column(s), which
-- can only be enforced before distribution

ALTER TABLE ads ADD CONSTRAINT ads_account_fk
  FOREIGN KEY (account_id) REFERENCES accounts (id);
ALTER TABLE clicks ADD CONSTRAINT clicks_ad_fk
  FOREIGN KEY (account_id, ad_id) REFERENCES ads (account_id, id);

Similarly, include the distribution column in uniqueness constraints:

-- Suppose we want every ad to use a unique image. Notice we can
-- enforce it only per account when we distribute by account_id

ALTER TABLE ads ADD CONSTRAINT ads_unique_image
  UNIQUE (account_id, image_url);

Not-null constraints can be applied to any column (distribution or not) because they require no lookups between workers.

ALTER TABLE ads ALTER COLUMN image_url SET NOT NULL;
Using NOT VALID Constraints #

In some situations it can be useful to enforce constraints for new rows, while allowing existing non-conforming rows to remain unchanged. citus supports this feature for CHECK constraints and foreign keys using the Postgres Pro NOT VALID constraint designation.

For example, consider an application that stores user profiles in a reference table.

-- We are using the "text" column type here, but a real application
-- might use "citext", which is available in the
-- Postgres Pro contrib module

CREATE TABLE users ( email text PRIMARY KEY );
SELECT create_reference_table('users');

Over time, imagine that a few non-addresses get into the table.

INSERT INTO users VALUES
   ('foo@example.com'), ('hacker12@aol.com'), ('lol');

We would like to validate the addresses, but Postgres Pro does not ordinarily allow us to add the CHECK constraint that fails for existing rows. However, it does allow a constraint marked NOT VALID:

ALTER TABLE users
ADD CONSTRAINT syntactic_email
CHECK (email ~
   '^[a-zA-Z0-9.!#$%&''*+/=?^_`{|}~-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'
) NOT VALID;

This succeeds, and new rows are protected.

INSERT INTO users VALUES ('fake');

/*
ERROR:  new row for relation "users_102010" violates
        check constraint "syntactic_email_102010"
DETAIL:  Failing row contains (fake).
*/

Later, during non-peak hours, a database administrator can attempt to fix the bad rows and re-validate the constraint.

-- Later, attempt to validate all rows
ALTER TABLE users
VALIDATE CONSTRAINT syntactic_email;

The Postgres Pro documentation has more information about NOT VALID and VALIDATE CONSTRAINT in ALTER TABLE.

Adding/Removing Indices #

citus supports adding and removing indices:

-- Adding an index

CREATE INDEX clicked_at_idx ON clicks USING BRIN (clicked_at);

-- Removing an index

DROP INDEX clicked_at_idx;

Adding an index takes a write lock, which can be undesirable in a multi-tenant system-of-record. To minimize application downtime, create the index concurrently instead. This method requires more total work than a standard index build and takes significantly longer to complete. However, since it allows normal operations to continue while the index is built, this method is useful for adding new indexes in a production environment.

-- Adding an index without locking table writes

CREATE INDEX CONCURRENTLY clicked_at_idx ON clicks USING BRIN (clicked_at);
H.5.6.4.1.6. Types and Functions #

Creating custom SQL types and user-defined functions propagates to worker nodes. However, creating such database objects in a transaction with distributed operations involves tradeoffs.

citus parallelizes operations such as create_distributed_table across shards using multiple connections per worker. In contrast, when creating a database object, citus propagates it to worker nodes using a single connection per worker. Combining the two operations in a single transaction may cause issues, because the parallel connections will not be able to see the object that was created over a single connection but not yet committed.

Consider a transaction block that creates a type, a table, loads data, and distributes the table:

BEGIN;

-- Type creation over a single connection:
CREATE TYPE coordinates AS (x int, y int);
CREATE TABLE positions (object_id text primary key, position coordinates);

-- Data loading thus goes over a single connection:
SELECT create_distributed_table('positions', 'object_id');
\COPY positions FROM 'positions.csv'

COMMIT;

The default citus behavior prioritizes schema consistency between the coordinator and worker nodes. This behavior has a downside: if object propagation happens after a parallel command in the same transaction, then the transaction can no longer be completed, as highlighted by the ERROR in the code block below:

BEGIN;
CREATE TABLE items (key text, value text);
-- Parallel data loading:
SELECT create_distributed_table('items', 'key');
\COPY items FROM 'items.csv'
CREATE TYPE coordinates AS (x int, y int);

ERROR:  cannot run type command because there was a parallel operation on a distributed table in the transaction

If you run into this issue, there is a simple workaround: set the citus.multi_shard_modify_mode parameter to sequential to disable per-node parallelism. Data loading in the same transaction might be slower.
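
A sketch of the failing transaction above with the workaround applied:

BEGIN;
-- Force sequential execution so that object propagation and data
-- loading share the same single connection per worker
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
CREATE TABLE items (key text, value text);
SELECT create_distributed_table('items', 'key');
\COPY items FROM 'items.csv'
CREATE TYPE coordinates AS (x int, y int);
COMMIT;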

H.5.6.4.1.7. Manual Modification #

Most DDL commands are auto-propagated. For any others, you can propagate the changes manually. See the Manual Query Propagation section.

H.5.6.4.2. Ingesting, Modifying Data (DML) #
H.5.6.4.2.1. Inserting Data #

To insert data into distributed tables, you can use the standard Postgres Pro INSERT command. As an example, we pick two rows randomly from the GitHub Archive dataset.

/*
CREATE TABLE github_events
(
  event_id bigint,
  event_type text,
  event_public boolean,
  repo_id bigint,
  payload jsonb,
  repo jsonb,
  actor jsonb,
  org jsonb,
  created_at timestamp
);
*/

INSERT INTO github_events VALUES (2489373118,'PublicEvent','t',24509048,'{}','{"id": 24509048, "url": "https://api.github.com/repos/SabinaS/csee6868", "name": "SabinaS/csee6868"}','{"id": 2955009, "url": "https://api.github.com/users/SabinaS", "login": "SabinaS", "avatar_url": "https://avatars.githubusercontent.com/u/2955009?", "gravatar_id": ""}',NULL,'2015-01-01 00:09:13');

INSERT INTO github_events VALUES (2489368389,'WatchEvent','t',28229924,'{"action": "started"}','{"id": 28229924, "url": "https://api.github.com/repos/inf0rmer/blanket", "name": "inf0rmer/blanket"}','{"id": 1405427, "url": "https://api.github.com/users/tategakibunko", "login": "tategakibunko", "avatar_url": "https://avatars.githubusercontent.com/u/1405427?", "gravatar_id": ""}',NULL,'2015-01-01 00:00:24');

When inserting rows into distributed tables, the distribution column of the row being inserted must be specified. Based on the distribution column, citus determines the right shard to which the insert should be routed. Then, the query is forwarded to that shard, and the remote INSERT command is executed on all the replicas of that shard.

Sometimes it is convenient to put multiple INSERT statements together into a single INSERT of multiple rows. It can also be more efficient than making repeated database queries. For instance, the example from the previous section can be loaded all at once like this:

INSERT INTO github_events VALUES
  (
    2489373118,'PublicEvent','t',24509048,'{}','{"id": 24509048, "url": "https://api.github.com/repos/SabinaS/csee6868", "name": "SabinaS/csee6868"}','{"id": 2955009, "url": "https://api.github.com/users/SabinaS", "login": "SabinaS", "avatar_url": "https://avatars.githubusercontent.com/u/2955009?", "gravatar_id": ""}',NULL,'2015-01-01 00:09:13'
  ), (
    2489368389,'WatchEvent','t',28229924,'{"action": "started"}','{"id": 28229924, "url": "https://api.github.com/repos/inf0rmer/blanket", "name": "inf0rmer/blanket"}','{"id": 1405427, "url": "https://api.github.com/users/tategakibunko", "login": "tategakibunko", "avatar_url": "https://avatars.githubusercontent.com/u/1405427?", "gravatar_id": ""}',NULL,'2015-01-01 00:00:24'
  );
Distributed Rollups #

citus also supports INSERT … SELECT statements, which insert rows based on the results of the SELECT query. This is a convenient way to fill tables, and it also allows upserts with the ON CONFLICT clause, which is the easiest way to do distributed rollups.

In citus there are three ways that inserting from the SELECT statement can happen:

  • The first is if the source tables and the destination table are co-located and the SELECT/INSERT statements both include the distribution column. In this case, citus can push the INSERT … SELECT statement down for parallel execution on all nodes.

  • The second way of executing the INSERT … SELECT statement is by repartitioning the rows of the result set into chunks and sending those chunks to the workers holding the matching destination table shards. Each worker node can then insert the values into its local destination shards.

    The repartitioning optimization can happen when the SELECT query does not require a merge step on the coordinator. It does not work with the following SQL features, which require a merge step:

    • ORDER BY

    • LIMIT

    • OFFSET

    • GROUP BY when distribution column is not part of the group key

    • Window functions when partitioning by a non-distribution column in the source table(s)

    • Joins between non-colocated tables (i.e. repartition joins)

  • When the source and destination tables are not co-located and the repartition optimization cannot be applied, then citus uses the third way of executing INSERT … SELECT. It selects the results from worker nodes and pulls the data up to the coordinator node. The coordinator redirects rows back down to the appropriate shard. Because all the data must pass through a single node, this method is not as efficient.

When in doubt about which method citus is using, use the EXPLAIN command. When the target table has a very large shard count, it may be wise to disable repartitioning, see the citus.enable_repartitioned_insert_select configuration parameter.
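
For example, a sketch (github_events_staging is a hypothetical source table):

-- Inspect which INSERT … SELECT strategy citus planned
EXPLAIN
INSERT INTO github_events
SELECT * FROM github_events_staging;

-- Disable repartitioned execution if the target shard count is very large
SET citus.enable_repartitioned_insert_select TO off;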

The \copy Command (Bulk Load) #

To bulk load data from a file, you can directly use the \copy command.

First download our example github_events dataset by running:

wget http://examples.citusdata.com/github_archive/github_events-2015-01-01-{0..5}.csv.gz
gzip -d github_events-2015-01-01-*.gz

Then, you can copy the data using psql. Note that this data requires the database to have UTF-8 encoding:

\COPY github_events FROM 'github_events-2015-01-01-0.csv' WITH (format CSV)

Note

There is no notion of snapshot isolation across shards, which means that a multi-shard SELECT that runs concurrently with the \copy command might see it committed on some shards, but not on others. If you are storing events data, you may occasionally observe small gaps in recent data. It is up to applications to deal with this if it is a problem (e.g. exclude the most recent data from queries or use some lock).

If \copy fails to open a connection for a shard placement, then it behaves in the same way as INSERT, namely to mark the placement(s) as inactive unless there are no more active placements. If any other failure occurs after connecting, the transaction is rolled back and thus no metadata changes are made.

H.5.6.4.3. Caching Aggregations with Rollups #

Applications like event data pipelines and real-time dashboards require sub-second queries on large volumes of data. One way to make these queries fast is by calculating and saving aggregates ahead of time. This is called rolling up the data and it avoids the cost of processing raw data at run-time. As an extra benefit, rolling up timeseries data into hourly or daily statistics can also save space. Old data may be deleted when its full details are no longer needed and aggregates suffice.

For example, here is a distributed table for tracking page views by URL:

CREATE TABLE page_views (
  site_id int,
  url text,
  host_ip inet,
  view_time timestamp default now(),

  PRIMARY KEY (site_id, url)
);

SELECT create_distributed_table('page_views', 'site_id');

Once the table is populated with data, we can run an aggregate query to count page views per URL per day, restricting to a given site and year.

-- How many views per url per day on site 5?
SELECT view_time::date AS day, site_id, url, count(*) AS view_count
  FROM page_views
  WHERE site_id = 5 AND
    view_time >= date '2016-01-01' AND view_time < date '2017-01-01'
  GROUP BY view_time::date, site_id, url;

The setup described above works but has two drawbacks. First, when you repeatedly execute the aggregate query, it must go over each related row and recompute the results for the entire data set. If you are using this query to render a dashboard, it is faster to save the aggregated results in a daily page views table and query that table. Second, storage costs will grow proportionally with data volumes and the length of queryable history. In practice, you may want to keep raw events for a short time period and look at historical graphs over a longer time window.

To receive those benefits, we can create the daily_page_views table to store the daily statistics.

CREATE TABLE daily_page_views (
  site_id int,
  day date,
  url text,
  view_count bigint,
  PRIMARY KEY (site_id, day, url)
);

SELECT create_distributed_table('daily_page_views', 'site_id');

In this example, we distributed both page_views and daily_page_views on the site_id column. This ensures that data corresponding to a particular site will be co-located on the same node. Keeping the rows of the two tables together on each node minimizes network traffic between nodes and enables highly parallel execution.

Once we create this new distributed table, we can then run INSERT INTO ... SELECT to roll up raw page views into the aggregated table. In the following, we aggregate page views each day. citus users often wait for a certain time period after the end of day to run a query like this, to accommodate late arriving data.

-- Roll up yesterday's data
INSERT INTO daily_page_views (day, site_id, url, view_count)
  SELECT view_time::date AS day, site_id, url, count(*) AS view_count
  FROM page_views
  WHERE view_time >= date '2017-01-01' AND view_time < date '2017-01-02'
  GROUP BY view_time::date, site_id, url;

-- Now the results are available right out of the table
SELECT day, site_id, url, view_count
  FROM daily_page_views
  WHERE site_id = 5 AND
    day >= date '2016-01-01' AND day < date '2017-01-01';

The rollup query above aggregates data from the previous day and inserts it into the daily_page_views table. Running the query once each day means that no rollup table rows need to be updated, because the new day's data does not affect previous rows.
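
If pg_cron is available, the daily run can be automated; the schedule below and the use of current_date are an illustrative sketch:

-- Roll up the previous calendar day shortly after midnight
SELECT cron.schedule('30 2 * * *', $$
  INSERT INTO daily_page_views (day, site_id, url, view_count)
    SELECT view_time::date AS day, site_id, url, count(*) AS view_count
    FROM page_views
    WHERE view_time >= current_date - 1 AND view_time < current_date
    GROUP BY view_time::date, site_id, url;
$$);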

The situation changes when dealing with late arriving data, or running the rollup query more than once per day. If any new rows match days already in the rollup table, the matching counts should increase. Postgres Pro can handle this situation with ON CONFLICT, which is its technique for doing UPSERTS. Here is an example.

-- Roll up from a given date onward,
-- updating daily page views when necessary
INSERT INTO daily_page_views (day, site_id, url, view_count)
  SELECT view_time::date AS day, site_id, url, count(*) AS view_count
  FROM page_views
  WHERE view_time >= date '2017-01-01'
  GROUP BY view_time::date, site_id, url
  ON CONFLICT (day, url, site_id) DO UPDATE SET
    view_count = daily_page_views.view_count + EXCLUDED.view_count;
H.5.6.4.3.1. Updates and Deletion #

You can update or delete rows from your distributed tables using the standard Postgres Pro UPDATE and DELETE commands.

DELETE FROM github_events
WHERE repo_id IN (24509048, 24509049);

UPDATE github_events
SET event_public = TRUE
WHERE (org->>'id')::int = 5430905;

When the UPDATE/DELETE operations affect multiple shards as in the above example, citus defaults to using a one-phase commit protocol. For greater safety you can enable two-phase commits by setting the citus.multi_shard_commit_protocol configuration parameter:

SET citus.multi_shard_commit_protocol = '2pc';

If an UPDATE or DELETE operation affects only a single shard, then it runs within a single worker node. In this case enabling 2PC is unnecessary. This often happens when updates or deletes filter by a table's distribution column:

-- Since github_events is distributed by repo_id,
-- this will execute in a single worker node

DELETE FROM github_events
WHERE repo_id = 206084;

Furthermore, when dealing with a single shard, citus supports SELECT … FOR UPDATE. This is a technique sometimes used by object-relational mappers (ORMs) to safely:

  • Load rows

  • Make a calculation in application code

  • Update the rows based on calculation

Selecting the rows for update puts a write lock on them to prevent other processes from causing the lost update anomaly.

BEGIN;

  -- Select events for a repo, but
  -- lock them for writing
  SELECT *
  FROM github_events
  WHERE repo_id = 206084
  FOR UPDATE;

  -- Calculate a desired value event_public using
  -- application logic that uses those rows

  -- Now make the update
  UPDATE github_events
  SET event_public = :our_new_value
  WHERE repo_id = 206084;

COMMIT;

This feature is supported for hash distributed and reference tables only.

H.5.6.4.3.2. Maximizing Write Performance #

Both INSERT and UPDATE/DELETE statements can be scaled up to around 50,000 queries per second on large machines. However, to achieve this rate, you will need to use many parallel, long-lived connections and consider how to deal with locking.

H.5.6.4.4. Querying Distributed Tables (SQL) #

As discussed in the previous sections, citus extends the latest Postgres Pro for distributed execution. This means that you can use standard Postgres Pro SELECT queries on the citus coordinator. The extension will then parallelize the SELECT queries involving complex selections, groupings and orderings, and JOINs to speed up the query performance. At a high level, citus partitions the SELECT query into smaller query fragments, assigns these query fragments to workers, oversees their execution, merges their results (and orders them if needed), and returns the final result to the user.

In the following sections, we discuss the different types of queries you can run using citus.

H.5.6.4.4.1. Aggregate Functions #

citus supports and parallelizes most aggregate functions supported by Postgres Pro, including custom user-defined aggregates. Aggregates execute using one of three methods, in this order of preference:

  • When the aggregate is grouped by a distribution column of a table, citus can push down execution of the entire query to each worker. All aggregates are supported in this situation and execute in parallel on the worker nodes. (Any custom aggregates being used must be installed on the workers.)

  • When the aggregate is not grouped by a distribution column, citus can still optimize on a case-by-case basis. citus has internal rules for certain aggregates like sum(), avg(), and count(distinct) that allow it to rewrite queries for partial aggregation on workers. For instance, to calculate an average, citus obtains a sum and a count from each worker, and then the coordinator node computes the final average.

    Full list of the special-case aggregates:

    avg, min, max, sum, count, array_agg, jsonb_agg, jsonb_object_agg, json_agg, json_object_agg, bit_and, bit_or, bool_and, bool_or, every, hll_add_agg, hll_union_agg, topn_add_agg, topn_union_agg, any_value, tdigest(double precision, int), tdigest_percentile(double precision, int, double precision), tdigest_percentile(double precision, int, double precision[]), tdigest_percentile(tdigest, double precision), tdigest_percentile(tdigest, double precision[]), tdigest_percentile_of(double precision, int, double precision), tdigest_percentile_of(double precision, int, double precision[]), tdigest_percentile_of(tdigest, double precision), tdigest_percentile_of(tdigest, double precision[])

  • Last resort: pull all rows from the workers and perform the aggregation on the coordinator node. When the aggregate is not grouped on a distribution column, and is not one of the predefined special cases, then citus falls back to this approach. It causes network overhead and can exhaust the coordinator resources if the data set to be aggregated is too large. (It is possible to disable this fallback, see below.)

Beware that small changes in a query can change execution modes causing potentially surprising inefficiency. For example, sum(x) grouped by a non-distribution column could use distributed execution, while sum(distinct x) has to pull up the entire set of input records to the coordinator.

All it takes is one column to hurt the execution of a whole query. In the example below, if sum(distinct value2) has to be grouped on the coordinator, then so will sum(value1) even if the latter was fine on its own.

SELECT sum(value1), sum(distinct value2) FROM distributed_table;

To avoid accidentally pulling data to the coordinator, you can set the citus.coordinator_aggregation_strategy parameter:

SET citus.coordinator_aggregation_strategy TO 'disabled';

Note that disabling the coordinator aggregation strategy will prevent aggregate queries of the third type from working at all.

The count(distinct) Aggregates #

citus supports count(distinct) aggregates in several ways. If the count(distinct) aggregate is on the distribution column, citus can directly push down the query to the workers. If not, citus runs SELECT distinct statements on each worker and returns the list to the coordinator where it obtains the final count.
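
For example, with the github_events table distributed by repo_id:

-- Distribution column: the distinct count is pushed down to the workers
SELECT count(distinct repo_id) FROM github_events;

-- Non-distribution column: each worker returns its distinct values
-- and the coordinator computes the final count
SELECT count(distinct event_type) FROM github_events;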

Note that transferring this data becomes slower when workers have a greater number of distinct items. This is especially true for queries containing multiple count(distinct) aggregates, e.g.:

-- Multiple distinct counts in one query tend to be slow
SELECT count(distinct a), count(distinct b), count(distinct c)
FROM table_abc;

For these kinds of queries, the resulting SELECT distinct statements on the workers essentially produce a cross-product of rows to be transferred to the coordinator.

For increased performance you can choose to make an approximate count instead. Follow the steps below:

  1. Download and install the hll extension on all Postgres Pro instances (the coordinator and all the workers).

    You can visit the hll GitHub repository for specifics on obtaining the extension.

  2. Create the hll extension on all the Postgres Pro instances by running the command below from the coordinator:

    CREATE EXTENSION hll;
    

  3. Enable count(distinct) approximations by setting the citus.count_distinct_error_rate configuration parameter. Lower values for this configuration setting are expected to give more accurate results but take more time for computation. We recommend setting this to 0.005.

    SET citus.count_distinct_error_rate TO 0.005;
    

    After this step, count(distinct) aggregates automatically switch to using hll with no changes necessary to your queries. You should be able to run approximate count(distinct) queries on any column of the table.
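
    For example, the sketch below assumes a hypothetical distributed table named page_hits and computes an approximate distinct count of a non-distribution column:

    -- A sketch; page_hits(page_id, host_ip) is a hypothetical distributed table
    SELECT page_id, count(distinct host_ip)
      FROM page_hits
     GROUP BY page_id;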

HyperLogLog Column.  Certain users already store their data as hll columns. In such cases, they can dynamically roll up that data by calling the hll_union_agg(hll_column) function.
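
For instance, the sketch below assumes a hypothetical daily_uniques table with an hll column named unique_users and rolls it up into monthly cardinalities:

-- A sketch; daily_uniques(day date, unique_users hll) is a hypothetical table
SELECT date_trunc('month', day) AS month,
       hll_cardinality(hll_union_agg(unique_users)) AS monthly_uniques
  FROM daily_uniques
 GROUP BY 1
 ORDER BY 1;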

Estimating Top N Items #

Calculating the first n elements in a set by applying count, sort, and limit is simple. However, as data sizes increase, this method becomes slow and resource intensive. It is more efficient to use an approximation.

The open source topn extension for Postgres Pro enables fast approximate results to top-n queries. The extension materializes the top values into a json data type. The topn extension can incrementally update these top values or merge them on-demand across different time intervals.

Before seeing a realistic example of topn, let's see how some of its primitive operations work. First topn_add updates a JSON object with counts of how many times a key has been seen:

-- Starting from nothing, record that we saw an "a"
SELECT topn_add('{}', 'a');
-- => {"a": 1}

-- Record the sighting of another "a"
SELECT topn_add(topn_add('{}', 'a'), 'a');
-- => {"a": 2}

The extension also provides aggregations to scan multiple values:

-- For normal_rand
CREATE EXTENSION tablefunc;

-- Count values from a normal distribution
SELECT topn_add_agg(floor(abs(i))::text)
  FROM normal_rand(1000, 5, 0.7) i;
-- => {"2": 1, "3": 74, "4": 420, "5": 425, "6": 77, "7": 3}

If the number of distinct values crosses a threshold, the aggregation drops information for those seen least frequently. This keeps space usage under control. The threshold can be controlled by the topn.number_of_counters configuration parameter. Its default value is 1000.
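
For example, to track more distinct keys per aggregate (at the cost of more memory), you could raise the threshold for the current session; the value below is only illustrative:

SET topn.number_of_counters TO 10000;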

Now onto a more realistic example of how topn works in practice. Let's ingest Amazon product reviews from the year 2000 and use topn to query it quickly. First download the dataset:

curl -L https://examples.citusdata.com/customer_reviews_2000.csv.gz | \
  gunzip > reviews.csv

Next, ingest it into a distributed table:

CREATE TABLE customer_reviews
(
    customer_id TEXT,
    review_date DATE,
    review_rating INTEGER,
    review_votes INTEGER,
    review_helpful_votes INTEGER,
    product_id CHAR(10),
    product_title TEXT,
    product_sales_rank BIGINT,
    product_group TEXT,
    product_category TEXT,
    product_subcategory TEXT,
    similar_product_ids CHAR(10)[]
);

SELECT create_distributed_table('customer_reviews', 'product_id');

\COPY customer_reviews FROM 'reviews.csv' WITH CSV

Next we will add the extension, create a destination table to store the JSON data generated by topn, and apply the topn_add_agg function we saw previously.

-- Run the command below from the coordinator; it will be propagated to the worker nodes as well
CREATE EXTENSION topn;

-- A table to materialize the daily aggregate
CREATE TABLE reviews_by_day
(
  review_date date unique,
  agg_data jsonb
);

SELECT create_reference_table('reviews_by_day');

-- Materialize how many reviews each product got per day per customer
INSERT INTO reviews_by_day
  SELECT review_date, topn_add_agg(product_id)
  FROM customer_reviews
  GROUP BY review_date;

Now, rather than writing a complex window function on customer_reviews, we can simply apply topn to reviews_by_day. For instance, the following query finds the most frequently reviewed product for each of the first five days:

SELECT review_date, (topn(agg_data, 1)).*
FROM reviews_by_day
ORDER BY review_date
LIMIT 5;
┌─────────────┬────────────┬───────────┐
│ review_date │    item    │ frequency │
├─────────────┼────────────┼───────────┤
│ 2000-01-01  │ 0939173344 │        12 │
│ 2000-01-02  │ B000050XY8 │        11 │
│ 2000-01-03  │ 0375404368 │        12 │
│ 2000-01-04  │ 0375408738 │        14 │
│ 2000-01-05  │ B00000J7J4 │        17 │
└─────────────┴────────────┴───────────┘

The JSON fields created by topn can be merged with topn_union and topn_union_agg. We can use the latter to merge the data for the entire first month and list the five most reviewed products during that period.

SELECT (topn(topn_union_agg(agg_data), 5)).*
FROM reviews_by_day
WHERE review_date >= '2000-01-01' AND review_date < '2000-02-01'
ORDER BY 2 DESC;
┌────────────┬───────────┐
│    item    │ frequency │
├────────────┼───────────┤
│ 0375404368 │       217 │
│ 0345417623 │       217 │
│ 0375404376 │       217 │
│ 0375408738 │       217 │
│ 043936213X │       204 │
└────────────┴───────────┘

For more details and examples, see the topn README file.

Percentile Calculations #

Finding an exact percentile over a large number of rows can be prohibitively expensive, because all rows must be transferred to the coordinator for final sorting and processing. Finding an approximation, on the other hand, can be done in parallel on worker nodes using a so-called sketch algorithm. The coordinator node then combines compressed summaries into the final result rather than reading through the full rows.

A popular sketch algorithm for percentiles uses a compressed data structure called t-digest, and is available for Postgres Pro in the tdigest extension. citus has integrated support for this extension.

Here is how to use tdigest in citus:

  1. Download and install the tdigest extension on all Postgres Pro nodes (the coordinator and all the workers). The tdigest extension GitHub repository has installation instructions.

  2. Create the tdigest extension within the database. Run the following command on the coordinator:

    CREATE EXTENSION tdigest;
    

    The coordinator will propagate the command to the workers as well.

When any of the aggregates defined in the extension are used in queries, citus will rewrite the queries to push down partial tdigest computation to the workers where applicable.

tdigest accuracy can be controlled with the compression argument passed into the aggregates. The trade-off is accuracy versus the amount of data shared between workers and the coordinator. For a full explanation of how to use the aggregates in tdigest, see the extension documentation.
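
As a sketch, assuming the setup above and a hypothetical distributed table response_times with a double precision column duration, an approximate 99th percentile with a compression of 100 could be computed like this:

-- A sketch; response_times(duration) is a hypothetical distributed table
SELECT tdigest_percentile(duration, 100, 0.99) AS p99
  FROM response_times;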

H.5.6.4.4.2. Limit Pushdown #

citus also pushes down LIMIT clauses to the shards on the workers wherever possible to minimize the amount of data transferred across the network.

However, in some cases, SELECT queries with LIMIT clauses may need to fetch all rows from each shard to generate exact results. For example, if the query requires ordering by the aggregate column, it would need results of that column from all shards to determine the final aggregate value. This reduces performance of the LIMIT clause due to high volume of network data transfer. In such cases, and where an approximation would produce meaningful results, citus provides an option for network efficient approximate LIMIT clauses.

LIMIT approximations are disabled by default and can be enabled by setting the citus.limit_clause_row_fetch_count configuration parameter. On the basis of this configuration value, citus will limit the number of rows returned by each task for aggregation on the coordinator. Due to this limit, the final results may be approximate. Increasing this limit will increase the accuracy of the final results, while still providing an upper bound on the number of rows pulled from the workers.

SET citus.limit_clause_row_fetch_count TO 10000;
H.5.6.4.4.3. Views on Distributed Tables #

citus supports all views on distributed tables. To learn more about syntax and features of views, see CREATE VIEW.

Note that some views cause a less efficient query plan than others.

citus supports materialized views as well and stores them as local tables on the coordinator node.
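
For example, the sketch below builds a materialized view on top of the customer_reviews table distributed earlier; the view contents are stored locally on the coordinator and can be refreshed on demand:

-- A sketch based on the customer_reviews table from the topn example above
CREATE MATERIALIZED VIEW reviews_per_product AS
  SELECT product_id, count(*) AS review_count
    FROM customer_reviews
   GROUP BY product_id;

-- Recompute the locally stored contents
REFRESH MATERIALIZED VIEW reviews_per_product;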

H.5.6.4.4.4. Joins #

citus supports equi-joins between any number of tables irrespective of their size and distribution method. The query planner chooses the optimal join method and join order based on how tables are distributed. It evaluates several possible join orders and creates a join plan that requires the minimum data to be transferred across the network.

Co-Located Joins #

When two tables are co-located, they can be joined efficiently on their common distribution columns. A co-located join is the most efficient way to join two large distributed tables.

Internally, the citus coordinator knows which shards of the co-located tables might match shards of the other table by looking at the distribution column metadata. This allows citus to prune away shard pairs that cannot produce matching join keys. The joins between the remaining shard pairs are executed in parallel on the workers, and then the results are returned to the coordinator.
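
As a sketch with hypothetical tables, two tables distributed by columns of the same type and co-located with each other can be joined on that common column, and the join runs shard by shard in parallel:

-- Hypothetical tables, both distributed by tenant_id and co-located
CREATE TABLE events (tenant_id int, event_id bigint, country_code char(2));
CREATE TABLE users  (tenant_id int, user_id bigint, user_name text);
SELECT create_distributed_table('events', 'tenant_id');
SELECT create_distributed_table('users', 'tenant_id', colocate_with => 'events');

-- Joining on the common distribution column is a co-located join
SELECT u.user_name, count(*)
FROM events e
JOIN users u ON u.tenant_id = e.tenant_id
GROUP BY u.user_name;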

Note

Be sure that the tables are distributed into the same number of shards and that the distribution columns of each table have exactly matching types. Attempting to join on columns of slightly different types such as int and bigint can cause problems.

Reference Table Joins #

Reference tables can be used as dimension tables to join efficiently with large fact tables. Because reference tables are replicated in full across all worker nodes, a reference join can be decomposed into local joins on each worker and performed in parallel. A reference join is like a more flexible version of a co-located join because reference tables are not distributed on any particular column and are free to join on any of their columns.

Reference tables can also join with tables local to the coordinator node.
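
Continuing the sketch above, a hypothetical reference table joins with the distributed events table through local joins on each worker, and the join column does not have to be a distribution column:

-- A hypothetical reference table, replicated to every worker
CREATE TABLE countries (country_code char(2) PRIMARY KEY, country_name text);
SELECT create_reference_table('countries');

SELECT c.country_name, count(*)
FROM events e
JOIN countries c ON c.country_code = e.country_code
GROUP BY c.country_name;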

Repartition Joins #

In some cases, you may need to join two tables on columns other than the distribution column. For such cases, citus also allows joining on non-distribution key columns by dynamically repartitioning the tables for the query.

In such cases, the table(s) to be repartitioned are determined by the query optimizer based on the distribution columns, join keys, and sizes of the tables. Repartitioning ensures that only relevant shard pairs are joined with each other, which drastically reduces the amount of data transferred across the network.

In general, co-located joins are more efficient than repartition joins as repartition joins require shuffling of data. So, you should try to distribute your tables by the common join keys whenever possible.
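
The sketch below uses hypothetical tables joined on columns that are not their distribution columns; the citus.enable_repartition_joins setting is assumed to be the switch that allows such joins and to be off by default:

-- Hypothetical tables, each distributed by a column other than order_reference
CREATE TABLE orders    (order_id bigint, order_reference text, customer_id int);
CREATE TABLE shipments (shipment_id bigint, order_reference text, shipped_at timestamptz);
SELECT create_distributed_table('orders', 'customer_id');
SELECT create_distributed_table('shipments', 'shipment_id');

-- Joining on a non-distribution column triggers a repartition join
SET citus.enable_repartition_joins TO on;

SELECT o.order_id, s.shipped_at
FROM orders o
JOIN shipments s ON s.order_reference = o.order_reference;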

H.5.6.4.5. Query Processing #

A citus cluster consists of a coordinator instance and multiple worker instances. The data is sharded on the workers while the coordinator stores metadata about these shards. All queries issued to the cluster are executed via the coordinator. The coordinator partitions the query into smaller query fragments where each query fragment can be run independently on a shard. The coordinator then assigns the query fragments to workers, oversees their execution, merges their results, and returns the final result to the user. The query processing architecture can be described in brief by the diagram below.

Figure H.9. Query Processing Architecture


The citus query processing pipeline involves two components:

  • Distributed query planner and executor

  • Postgres Pro planner and executor

We discuss them in greater detail in the subsequent sections.

H.5.6.4.5.1. Distributed Query Planner #

The citus distributed query planner takes in a SQL query and plans it for distributed execution.

For SELECT queries, the planner first creates a plan tree of the input query and transforms it into its commutative and associative form so it can be parallelized. It also applies several optimizations to ensure that the queries are executed in a scalable manner, and that network I/O is minimized.

Next, the planner breaks the query into two parts: the coordinator query, which runs on the coordinator, and the worker query fragments, which run on individual shards on the workers. The planner then assigns these query fragments to the workers such that all their resources are used efficiently. After this step, the distributed query plan is passed on to the distributed executor for execution.

The planning process for key-value lookups on the distribution column or modification queries is slightly different, as they hit exactly one shard. Once the planner receives an incoming query, it needs to decide the correct shard to which the query should be routed. To do this, it extracts the distribution column value from the incoming row and looks up the metadata to determine the right shard for the query. Then, the planner rewrites the SQL of that command to reference the shard table instead of the original table. This rewritten plan is then passed to the distributed executor.

H.5.6.4.5.2. Distributed Query Executor #

The citus distributed executor runs distributed query plans and handles failures. The executor is well suited for getting fast responses to queries involving filters, aggregations, and co-located joins, as well as running single-tenant queries with full SQL coverage. It opens one connection per shard to the workers as needed and sends all fragment queries to them. It then fetches the results from each fragment query, merges them, and gives the final results back to the user.

Subquery/CTE Push-Pull Execution #

If necessary, citus can gather results from subqueries and CTEs into the coordinator node and then push them back across workers for use by an outer query. This allows citus to support a greater variety of SQL constructs.

For example, a subquery in the WHERE clause sometimes cannot execute inline at the same time as the main query and must be run separately. Suppose a web analytics application maintains a page_views table partitioned by page_id. To query the number of visitor hosts on the top twenty most visited pages, we can use a subquery to find the list of pages, then an outer query to count the hosts.

SELECT page_id, count(distinct host_ip)
FROM page_views
WHERE page_id IN (
  SELECT page_id
  FROM page_views
  GROUP BY page_id
  ORDER BY count(*) DESC
  LIMIT 20
)
GROUP BY page_id;

The executor would like to run a fragment of this query against each shard by page_id, counting distinct host_ips, and combining the results on the coordinator. However, the LIMIT in the subquery means the subquery cannot be executed as part of the fragment. By recursively planning the query citus can run the subquery separately, push the results to all workers, run the main fragment query, and pull the results back to the coordinator. The push-pull design supports subqueries like the one above.

Let's see this in action by reviewing the EXPLAIN output for this query. It is fairly involved:

GroupAggregate  (cost=0.00..0.00 rows=0 width=0)
  Group Key: remote_scan.page_id
  ->  Sort  (cost=0.00..0.00 rows=0 width=0)
    Sort Key: remote_scan.page_id
    ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
      ->  Distributed Subplan 6_1
        ->  Limit  (cost=0.00..0.00 rows=0 width=0)
          ->  Sort  (cost=0.00..0.00 rows=0 width=0)
            Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC
            ->  HashAggregate  (cost=0.00..0.00 rows=0 width=0)
              Group Key: remote_scan.page_id
              ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
                Task Count: 32
                Tasks Shown: One of 32
                ->  Task
                  Node: host=localhost port=9701 dbname=postgres
                  ->  HashAggregate  (cost=54.70..56.70 rows=200 width=12)
                    Group Key: page_id
                    ->  Seq Scan on page_views_102008 page_views  (cost=0.00..43.47 rows=2247 width=4)
      Task Count: 32
      Tasks Shown: One of 32
      ->  Task
        Node: host=localhost port=9701 dbname=postgres
        ->  HashAggregate  (cost=84.50..86.75 rows=225 width=36)
          Group Key: page_views.page_id, page_views.host_ip
          ->  Hash Join  (cost=17.00..78.88 rows=1124 width=36)
            Hash Cond: (page_views.page_id = intermediate_result.page_id)
            ->  Seq Scan on page_views_102008 page_views  (cost=0.00..43.47 rows=2247 width=36)
            ->  Hash  (cost=14.50..14.50 rows=200 width=4)
              ->  HashAggregate  (cost=12.50..14.50 rows=200 width=4)
                Group Key: intermediate_result.page_id
                ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..10.00 rows=1000 width=4)

Let's break it apart and examine each piece.

GroupAggregate  (cost=0.00..0.00 rows=0 width=0)
  Group Key: remote_scan.page_id
  ->  Sort  (cost=0.00..0.00 rows=0 width=0)
    Sort Key: remote_scan.page_id

The root of the tree is what the coordinator node does with the results from the workers. In this case, it is grouping them, and GroupAggregate requires they be sorted first.

->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
  ->  Distributed Subplan 6_1

The custom scan has two large sub-trees, starting with a distributed subplan.

->  Limit  (cost=0.00..0.00 rows=0 width=0)
  ->  Sort  (cost=0.00..0.00 rows=0 width=0)
    Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC
    ->  HashAggregate  (cost=0.00..0.00 rows=0 width=0)
      Group Key: remote_scan.page_id
      ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
        Task Count: 32
        Tasks Shown: One of 32
        ->  Task
          Node: host=localhost port=9701 dbname=postgres
          ->  HashAggregate  (cost=54.70..56.70 rows=200 width=12)
            Group Key: page_id
            ->  Seq Scan on page_views_102008 page_views  (cost=0.00..43.47 rows=2247 width=4)

Worker nodes run the above for each of the thirty-two shards (citus is choosing one representative for display). We can recognize all the pieces of the IN (…) subquery: the sorting, grouping and limiting. When all workers have completed this query, they send their output back to the coordinator which puts it together as intermediate results.

Task Count: 32
Tasks Shown: One of 32
->  Task
  Node: host=localhost port=9701 dbname=postgres
  ->  HashAggregate  (cost=84.50..86.75 rows=225 width=36)
    Group Key: page_views.page_id, page_views.host_ip
    ->  Hash Join  (cost=17.00..78.88 rows=1124 width=36)
      Hash Cond: (page_views.page_id = intermediate_result.page_id)

The citus extension starts another executor job in this second subtree. It is going to count distinct hosts in page_views. It uses a JOIN to connect with the intermediate results. The intermediate results will help it restrict to the top twenty pages.

->  Seq Scan on page_views_102008 page_views  (cost=0.00..43.47 rows=2247 width=36)
->  Hash  (cost=14.50..14.50 rows=200 width=4)
  ->  HashAggregate  (cost=12.50..14.50 rows=200 width=4)
    Group Key: intermediate_result.page_id
    ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..10.00 rows=1000 width=4)

The worker internally retrieves intermediate results using the read_intermediate_result function, which loads data from a file that was copied in from the coordinator node.

This example showed how citus executed the query in multiple steps with a distributed subplan and how you can use EXPLAIN to learn about distributed query execution.

H.5.6.4.5.3. Postgres Pro Planner and Executor #

Once the distributed executor sends the query fragments to the workers, they are processed like regular Postgres Pro queries. The Postgres Pro planner on that worker chooses the optimal plan for executing the query locally on the corresponding shard table. The Postgres Pro executor then runs that query and returns the results back to the distributed executor. Learn more about the Postgres Pro planner and executor. Finally, the distributed executor passes the results to the coordinator for final aggregation.

H.5.6.4.6. Manual Query Propagation #

When the user issues a query, the citus coordinator partitions it into smaller query fragments where each query fragment can be run independently on a worker shard. This allows citus to distribute each query across the cluster.

However, the way queries are partitioned into fragments (and which queries are propagated at all) varies by the type of query. In some advanced situations it is useful to manually control this behavior. citus provides utility functions to propagate SQL to workers, shards, or co-located placements.

Manual query propagation bypasses coordinator logic, locking, and any other consistency checks. These functions are available as a last resort to allow statements which citus otherwise does not run natively. Use them carefully to avoid data inconsistency and deadlocks.

H.5.6.4.6.1. Running on All Workers #

The least granular level of execution is broadcasting a statement for execution on all workers. This is useful for viewing properties of entire worker databases.

-- List the work_mem setting of each worker database
SELECT run_command_on_workers($cmd$ SHOW work_mem; $cmd$);

To run on all nodes, both workers and the coordinator, use the run_command_on_all_nodes function.
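
For example, the same check can be broadcast to the coordinator and all workers; this sketch assumes run_command_on_all_nodes accepts the command text the same way as run_command_on_workers:

-- List the work_mem setting of every node, coordinator included
SELECT run_command_on_all_nodes($cmd$ SHOW work_mem; $cmd$);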

Note

This command should not be used to create database objects on the workers, as doing so will make it harder to add worker nodes in an automated fashion.

Note

The run_command_on_workers function and other manual propagation commands in this section can run only queries that return a single column and single row.

H.5.6.4.6.2. Running on All Shards #

The next level of granularity is running a command across all shards of a particular distributed table. It can be useful, for instance, in reading the properties of a table directly on workers. Queries run locally on a worker node have full access to metadata such as table statistics.

The run_command_on_shards function applies an SQL command to each shard, where the shard name is provided for interpolation in the command. Here is an example of estimating the row count for a distributed table by using the pg_class table on each worker to estimate the number of rows for each shard. Notice the %s, which will be replaced with each shard name.

-- Get the estimated row count for a distributed table by summing the
-- estimated counts of rows for each shard
SELECT sum(result::bigint) AS estimated_count
  FROM run_command_on_shards(
    'my_distributed_table',
    $cmd$
      SELECT reltuples
        FROM pg_class c
        JOIN pg_catalog.pg_namespace n on n.oid=c.relnamespace
       WHERE (n.nspname || '.' || relname)::regclass = '%s'::regclass
         AND n.nspname NOT IN ('citus', 'pg_toast', 'pg_catalog')
    $cmd$
  );

A useful companion to run_command_on_shards is the run_command_on_colocated_placements function. It interpolates the names of two placements of co-located distributed tables into a query. The placement pairs are always chosen to be local to the same worker where full SQL coverage is available. Thus we can use advanced SQL features like triggers to relate the tables:

-- Suppose we have two distributed tables
CREATE TABLE little_vals (key int, val int);
CREATE TABLE big_vals    (key int, val int);
SELECT create_distributed_table('little_vals', 'key');
SELECT create_distributed_table('big_vals',    'key');

-- We want to synchronize them so that every time a row is inserted
-- into little_vals, a corresponding row appears in big_vals with double the value
--
-- First we make a trigger function, which will
-- take the destination table placement as an argument
CREATE OR REPLACE FUNCTION embiggen() RETURNS TRIGGER AS $$
  BEGIN
    IF (TG_OP = 'INSERT') THEN
      EXECUTE format(
        'INSERT INTO %s (key, val) SELECT ($1).key, ($1).val*2;',
        TG_ARGV[0]
      ) USING NEW;
    END IF;
    RETURN NULL;
  END;
$$ LANGUAGE plpgsql;

-- Next we relate the co-located tables by the trigger function
-- on each co-located placement
SELECT run_command_on_colocated_placements(
  'little_vals',
  'big_vals',
  $cmd$
    CREATE TRIGGER after_insert AFTER INSERT ON %s
      FOR EACH ROW EXECUTE PROCEDURE embiggen(%L)
  $cmd$
);
H.5.6.4.6.3. Limitations #
  • There are no safeguards against deadlock for multi-statement transactions.

  • There are no safeguards against mid-query failures and resulting inconsistencies.

  • Query results are cached in memory; these functions cannot deal with very big result sets.

  • The functions error out early if they cannot connect to a node.

H.5.6.4.7. SQL Support and Workarounds #

As citus provides distributed functionality by extending Postgres Pro, it is compatible with Postgres Pro constructs. This means that users can use the tools and features that come with the rich and extensible Postgres Pro ecosystem for distributed tables created with citus.

citus has 100% SQL coverage for any queries it is able to execute on a single worker node. These kinds of queries are common in multi-tenant applications when accessing information about a single tenant.

Even cross-node queries (used for parallel computations) support most SQL features. However, some SQL features are not supported for queries that combine information from multiple nodes.

H.5.6.4.7.1. Limitations #
General #

These limitations apply to all models of operation:

  • The rule system is not supported.

  • Subqueries within INSERT queries are not supported.

  • Distributing multi-level partitioned tables is not supported.

  • Functions used in UPDATE queries on distributed tables must not be VOLATILE.

  • STABLE functions used in UPDATE queries cannot be called with column references.

  • Modifying views when the query contains citus tables is not supported.

citus encodes the node identifier into the sequence values generated on each node, which allows every individual node to accept inserts directly without sequence values overlapping. However, this method does not work for sequences based on types smaller than bigint, and inserts on worker nodes may then fail. In that case, drop the column and add a bigint-based one, or route the inserts through the coordinator.

Cross-Node SQL Queries #
  • SELECT … FOR UPDATE works in single-shard queries only.

  • TABLESAMPLE works in single-shard queries only.

  • Correlated subqueries are supported only when the correlation is on the distribution column.

  • Outer joins between distributed tables are only supported on the distribution column.

  • Recursive CTEs work in single-shard queries only.

  • Grouping sets work in single-shard queries only.

  • Only regular, foreign or partitioned tables can be distributed.

  • The SQL MERGE command is supported in the following combinations of table types:

    Target       Source       Support   Comments
    -----------  -----------  --------  ----------------------------------------
    Local        Local        Yes
    Local        Reference    Yes
    Local        Distributed  No        Feature in development
    Distributed  Local        Yes
    Distributed  Distributed  Yes       Including non co-located tables
    Distributed  Reference    Yes
    Reference    N/A          No        Reference table as target is not allowed

For a detailed reference of the Postgres Pro SQL command dialect (which can be used as is by citus users), you can see SQL Commands.

Schema-Based Sharding SQL Compatibility #

When using schema-based sharding the following features are not available:

  • Foreign keys across distributed schemas are not supported.

  • Joins across distributed schemas are subject to cross-node SQL queries limitations.

  • Creating a distributed schema and tables in a single SQL statement is not supported.

H.5.6.4.7.2. Workarounds #

Before attempting workarounds consider whether citus is appropriate for your situation. The citus extension works well for real-time analytics and multi-tenant use cases.

citus supports all SQL statements in the multi-tenant use case. Even in the real-time analytics use cases, with queries that span across nodes, citus supports the majority of statements. Many of the unsupported features have workarounds; below are a number of the most useful.

Work Around Limitations Using CTEs #

When a SQL query is unsupported, one way to work around it is using CTEs, which use what we call push-pull execution.

SELECT * FROM dist WHERE EXISTS (SELECT 1 FROM local WHERE local.a = dist.a);
/*
ERROR:  direct joins between distributed and local tables are not supported
HINT:  Use CTEs or subqueries to select from local tables and use them in joins
*/

To work around this limitation, you can turn the query into a router query by wrapping the distributed part in a CTE.

WITH cte AS (SELECT * FROM dist)
SELECT * FROM cte WHERE EXISTS (SELECT 1 FROM local WHERE local.a = cte.a);

Remember that the coordinator will send the results of the CTE to all workers that require it for processing. Thus it is best either to add the most specific filters and limits possible to the inner query, or else to aggregate the table. That reduces the network overhead that such a query can cause.
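
For instance, a filter pushed into the CTE shrinks the intermediate result that the coordinator must broadcast to the workers (a sketch based on the query above; the filter itself is illustrative):

WITH cte AS (SELECT * FROM dist WHERE dist.a BETWEEN 1 AND 100)
SELECT * FROM cte WHERE EXISTS (SELECT 1 FROM local WHERE local.a = cte.a);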

Temp Tables: the Workaround of Last Resort #

There are still a few queries that are unsupported even with the use of push-pull execution via subqueries. One of them is using grouping sets on a distributed table.

In our real-time analytics tutorial we created a table called github_events, distributed by the column user_id. Let's query it and find the earliest events for a preselected set of repos, grouped by combinations of event type and event publicity. A convenient way to do this is with grouping sets. However, as mentioned, this feature is not yet supported in distributed queries:

-- This will not work

  SELECT repo_id, event_type, event_public,
         grouping(event_type, event_public),
         min(created_at)
    FROM github_events
   WHERE repo_id IN (8514, 15435, 19438, 21692)
GROUP BY repo_id, ROLLUP(event_type, event_public);
ERROR:  could not run distributed query with GROUPING
HINT:  Consider using an equality filter on the distributed table's partition column.

There is a trick, though. We can pull the relevant information to the coordinator as a temporary table:

-- Grab the data, minus the aggregate, into a local table

CREATE TEMP TABLE results AS (
  SELECT repo_id, event_type, event_public, created_at
    FROM github_events
       WHERE repo_id IN (8514, 15435, 19438, 21692)
    );

-- Now run the aggregate locally

  SELECT repo_id, event_type, event_public,
         grouping(event_type, event_public),
         min(created_at)
    FROM results
GROUP BY repo_id, ROLLUP(event_type, event_public);
 repo_id |    event_type     | event_public | grouping |         min
---------+-------------------+--------------+----------+---------------------
    8514 | PullRequestEvent  | t            |        0 | 2016-12-01 05:32:54
    8514 | IssueCommentEvent | t            |        0 | 2016-12-01 05:32:57
   19438 | IssueCommentEvent | t            |        0 | 2016-12-01 05:48:56
   21692 | WatchEvent        | t            |        0 | 2016-12-01 06:01:23
   15435 | WatchEvent        | t            |        0 | 2016-12-01 05:40:24
   21692 | WatchEvent        |              |        1 | 2016-12-01 06:01:23
   15435 | WatchEvent        |              |        1 | 2016-12-01 05:40:24
    8514 | PullRequestEvent  |              |        1 | 2016-12-01 05:32:54
    8514 | IssueCommentEvent |              |        1 | 2016-12-01 05:32:57
   19438 | IssueCommentEvent |              |        1 | 2016-12-01 05:48:56
   15435 |                   |              |        3 | 2016-12-01 05:40:24
   21692 |                   |              |        3 | 2016-12-01 06:01:23
   19438 |                   |              |        3 | 2016-12-01 05:48:56
    8514 |                   |              |        3 | 2016-12-01 05:32:54

Creating a temporary table on the coordinator is a last resort. It is limited by the disk size and CPU of the node.

Subqueries Within INSERT Queries #

Try rewriting your queries with INSERT INTO ... SELECT syntax.

The following SQL:

INSERT INTO a.widgets (map_id, widget_name)
VALUES (
    (SELECT mt.map_id FROM a.map_tags mt WHERE mt.map_license = '12345'),
    'Test'
);

Would become:

INSERT INTO a.widgets (map_id, widget_name)
SELECT mt.map_id, 'Test'
  FROM a.map_tags mt
 WHERE mt.map_license = '12345';

H.5.6.5. citus API #

H.5.6.5.1. citus Utility Functions #

This section contains reference information for the user-defined functions provided by citus. These functions provide additional distributed functionality beyond the standard SQL commands.

H.5.6.5.1.1. Table and Shard DDL #
citus_schema_distribute (schemaname regnamespace) returns void #

Converts an existing regular schema into a distributed schema. Distributed schemas are automatically associated with individual co-location groups, so that the tables created in them are automatically converted into co-located distributed tables without a shard key. The process of distributing the schema automatically assigns and moves it to an existing node in the cluster.

Arguments:

  • schemaname — the name of the schema, which needs to be distributed.

The example below shows how to distribute three schemas named tenant_a, tenant_b, and tenant_c. For more examples, see Section H.5.4.3:

SELECT citus_schema_distribute('tenant_a');
SELECT citus_schema_distribute('tenant_b');
SELECT citus_schema_distribute('tenant_c');
citus_schema_undistribute (schemaname regnamespace) returns void #

Converts an existing distributed schema back into a regular schema. The process results in the tables and data being moved from the current node back to the coordinator node in the cluster.

Arguments:

  • schemaname — the name of the schema, which needs to be undistributed.

The example below shows how to convert three different distributed schemas back into regular schemas. For more examples, see Section H.5.4.3:

SELECT citus_schema_undistribute('tenant_a');
SELECT citus_schema_undistribute('tenant_b');
SELECT citus_schema_undistribute('tenant_c');
create_distributed_table (table_name regclass, distribution_column text, distribution_type citus.distribution_type, colocate_with text, shard_count int) returns void #

Defines a distributed table and creates its shards if it is a hash-distributed table. This function takes in a table name, the distribution column, and an optional distribution method and inserts appropriate metadata to mark the table as distributed. The function defaults to hash distribution if no distribution method is specified. If the table is hash-distributed, the function also creates worker shards based on the shard count configuration value. If the table contains any rows, they are automatically distributed to worker nodes.

Arguments:

  • table_name — the name of the table, which needs to be distributed.

  • distribution_column — the column on which the table is to be distributed.

  • distribution_type — an optional distribution method. The default value is hash.

  • colocate_with — include current table in the co-location group of another table. This is an optional argument. By default tables are co-located when they are distributed by columns of the same type with the same shard count. If you want to break this co-location later, you can use the update_distributed_table_colocation function. Possible values for this argument are default, which is the default value, none to start a new co-location group, or the name of another table to co-locate with the table. To learn more, see the Co-Locating Tables section.

    Keep in mind that the default value of the colocate_with argument does implicit co-location. As explained in the Table Co-Location section, this can be a great thing when tables are related or will be joined. However, when two tables are unrelated but happen to use the same datatype for their distribution columns, accidentally co-locating them can decrease performance during shard rebalancing. The table shards will be moved together unnecessarily in a cascade. If you want to break this implicit co-location, you can use the update_distributed_table_colocation function.

    If a new distributed table is not related to other tables, it is best to specify colocate_with => 'none'.

  • shard_count — the number of shards to create for the new distributed table. This is an optional argument. When specifying shard_count you cannot specify a value of colocate_with other than none. To change the shard count of an existing table or co-location group, use the alter_distributed_table function.

    Allowed values for the shard_count argument are between 1 and 64000.

This example informs the database that the github_events table should be distributed by hash on the repo_id column. For more examples, see the Creating and Modifying Distributed Objects (DDL) section:

SELECT create_distributed_table('github_events', 'repo_id');

-- Alternatively, to be more explicit:
SELECT create_distributed_table('github_events', 'repo_id',
                                colocate_with => 'github_repo');
truncate_local_data_after_distributing_table (function_name regclass) returns void #

Truncates all local rows after distributing a table and prevents constraints from failing due to outdated local records. The truncation cascades to tables having a foreign key to the designated table. If the referring tables are not themselves distributed, then truncation is forbidden until they are, to protect referential integrity:

ERROR:  cannot truncate a table referenced in a foreign key constraint by a local table

Truncating local coordinator node table data is safe for distributed tables because their rows, if they have any, are copied to worker nodes during distribution.

Arguments:

  • table_name — the name of the distributed table whose local counterpart on the coordinator node should be truncated.

The example below shows how to use the function:

-- Requires that the argument is a distributed table
SELECT truncate_local_data_after_distributing_table('public.github_events');
undistribute_table (table_name regclass, cascade_via_foreign_keys boolean) returns void #

Undoes the action of the create_distributed_table or create_reference_table functions. Undistributing moves all data from shards back into a local table on the coordinator node (assuming the data can fit), then deletes the shards.

citus will not undistribute tables that have, or are referenced by, foreign keys, unless the cascade_via_foreign_keys argument is set to true. If this argument is false (or omitted), then you must manually drop the offending foreign key constraints before undistributing.

Arguments:

  • table_name — the name of the distributed or reference table to undistribute.

  • cascade_via_foreign_keys — when this optional argument is set to true, the function also undistributes all tables that are related to table_name through foreign keys. Use caution with this argument because it can potentially affect many tables. The default value is false.

The example below shows how to distribute the github_events table and then undistribute it:

-- First distribute the table
SELECT create_distributed_table('github_events', 'repo_id');

-- Undo that and make it local again
SELECT undistribute_table('github_events');
alter_distributed_table (table_name regclass, distribution_column text, shard_count int, colocate_with text, cascade_to_colocated boolean) returns void #

Changes the distribution column, shard count or co-location properties of a distributed table.

Arguments:

  • table_name — the name of the distributed table, which will be altered.

  • distribution_column — the name of the new distribution column. The default value of this optional argument is NULL.

  • shard_count — the new shard count. The default value of this optional argument is NULL.

  • colocate_with — the table that the current distributed table will be co-located with. Possible values are default, none to start a new co-location group, or the name of another table with which to co-locate. The default value of this optional argument is default.

  • cascade_to_colocated. When this argument is set to true, shard_count and colocate_with changes will also be applied to all of the tables that were previously co-located with the table, and the co-location will be preserved. If it is false, the current co-location of this table will be broken. The default value of this optional argument is false.

The example below shows how to use the function:

-- Change distribution column
SELECT alter_distributed_table('github_events', distribution_column:='event_id');

-- Change shard count of all tables in colocation group
SELECT alter_distributed_table('github_events', shard_count:=6, cascade_to_colocated:=true);

-- Change colocation
SELECT alter_distributed_table('github_events', colocate_with:='another_table');
alter_table_set_access_method (table_name regclass, access_method text) returns void #

Changes the access method of a table (e.g., heap or columnar).

Arguments:

  • table_name — the name of the table whose access method will change.

  • access_method — the name of the new access method.

The example below shows how to use the function:

SELECT alter_table_set_access_method('github_events', 'columnar');
remove_local_tables_from_metadata () returns void #

Removes local tables from metadata of the citus extension that no longer need to be there. (See the citus.enable_local_reference_table_foreign_keys configuration parameter.)

Usually if a local table is in citus metadata, there is a reason, such as the existence of foreign keys between the table and a reference table. However, if citus.enable_local_reference_table_foreign_keys is disabled, citus will no longer manage metadata in that situation, and unnecessary metadata can persist until manually cleaned.
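
The example below shows how to call the function, which takes no arguments:

SELECT remove_local_tables_from_metadata();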

create_reference_table (table_name regclass) returns void #

Defines a small reference or dimension table. This function takes in a table name, and creates a distributed table with just one shard, replicated to every worker node.

Arguments:

  • table_name — the name of the small dimension or reference table, which needs to be distributed.

The example below informs the database that the nation table should be defined as a reference table:

SELECT create_reference_table('nation');
citus_add_local_table_to_metadata (table_name regclass, cascade_via_foreign_keys boolean) returns void #

Adds a local Postgres Pro table into citus metadata. A major use case for this function is to make local tables on the coordinator accessible from any node in the cluster. This is mostly useful when running queries from other nodes. The data associated with the local table stays on the coordinator, only its schema and metadata are sent to the workers.

Note that adding local tables to the metadata comes at a slight cost. When you add the table, citus must track it in the pg_dist_partition table. Local tables that are added to metadata inherit the same limitations as reference tables (see the Creating and Modifying Distributed Objects (DDL) and SQL Support and Workarounds sections).

If you use the undistribute_table function, citus will automatically remove the resulting local tables from metadata, which eliminates such limitations on those tables.

Arguments:

  • table_name — the name of the table on the coordinator to be added to citus metadata.

  • cascade_via_foreign_keys — when this optional argument is set to true, the function adds other tables that are in a foreign key relationship with given table into metadata automatically. Use caution with this argument, because it can potentially affect many tables. The default value is false.

The example below informs the database that the nation table should be defined as a coordinator-local table, accessible from any node:

SELECT citus_add_local_table_to_metadata('nation');
update_distributed_table_colocation (table_name regclass, colocate_with text) returns void #

Updates the co-location of a distributed table. This function can also be used to break the co-location of a distributed table. citus implicitly co-locates two tables if their distribution columns have the same type, which can be useful when the tables are related and will be joined. If tables A and B are co-located and table A gets rebalanced, table B will also be rebalanced. If table B does not have a replica identity, the rebalance will fail. Therefore, this function can be useful for breaking the implicit co-location in that case. Note that this function does not move any data around physically.

Arguments:

  • table_name — the name of the table co-location of which will be updated.

  • colocate_with — the table with which the table should be co-located.

If you want to break the co-location of a table, specify colocate_with => 'none'.

The example below co-locates table A with table B:

SELECT update_distributed_table_colocation('A', colocate_with => 'B');

Assume that table A and table B are co-located (possibly implicitly). If you want to break the co-location, do the following:

SELECT update_distributed_table_colocation('A', colocate_with => 'none');

Now, assume that tables A, B, C, and D are co-located and you want to co-locate table A with B and table C with table D:

SELECT update_distributed_table_colocation('C', colocate_with => 'none');
SELECT update_distributed_table_colocation('D', colocate_with => 'C');

If you have a hash-distributed table named none and you want to update its co-location, you can do:

SELECT update_distributed_table_colocation('"none"', colocate_with => 'some_other_hash_distributed_table');
create_distributed_function (function_name regprocedure, distribution_arg_name text, colocate_with text, force_delegation bool) returns void #

Propagates a function from the coordinator node to workers and marks it for distributed execution. When a distributed function is called on the coordinator, citus uses the value of the distribution_arg_name argument to pick a worker node to run the function. Calling the function on workers increases parallelism and can bring the code closer to data in shards for lower latency.

Note that the Postgres Pro search path is not propagated from the coordinator to workers during distributed function execution, so distributed function code should fully qualify the names of database objects. Also, notices emitted by the function are not displayed to the user.

Arguments:

  • function_name — the name of the function to be distributed. The name must include the function parameter types in parentheses because multiple functions can have the same name in Postgres Pro. For instance, 'foo(int)' is different from 'foo(int, text)'.

  • distribution_arg_name — the argument name by which to distribute. For convenience (or if the function arguments do not have names), a positional placeholder is allowed, such as '$1'. If this argument is not specified, then the function named by function_name is merely created on the workers. If worker nodes are added in the future, the function will automatically be created there too. This is an optional argument.

  • colocate_with — when the distributed function reads or writes to a distributed table (or, more generally, co-located tables), be sure to name that table using this argument. This ensures that each invocation of the function runs on the worker node containing relevant shards. This is an optional argument.

  • force_delegation. The default value is NULL.

The example below shows how to use the function:

-- An example function that updates a hypothetical
-- event_responses table, which itself is distributed by event_id
CREATE OR REPLACE FUNCTION
  register_for_event(p_event_id int, p_user_id int)
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
  INSERT INTO event_responses VALUES ($1, $2, 'yes')
  ON CONFLICT (event_id, user_id)
  DO UPDATE SET response = EXCLUDED.response;
END;
$fn$;

-- Distribute the function to workers, using the p_event_id argument
-- to determine which shard each invocation affects, and explicitly
-- colocating with event_responses which the function updates
SELECT create_distributed_function(
  'register_for_event(int, int)', 'p_event_id',
  colocate_with := 'event_responses'
);
alter_columnar_table_set (table_name regclass, chunk_group_row_limit int, stripe_row_limit int, compression name, compression_level int) returns void #

Changes settings on a columnar table. Calling this function on a non-columnar table gives an error. All arguments except the table_name are optional.

To view current options for all columnar tables, consult this table:

SELECT * FROM columnar.options;

The default values for columnar settings for newly created tables can be overridden with these configuration parameters:

  • columnar.compression

  • columnar.compression_level

  • columnar.stripe_row_count

  • columnar.chunk_row_count
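
For example, the default compression for newly created columnar tables can be changed for the current session; the value below is only illustrative:

SET columnar.compression TO 'lz4';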

Arguments:

  • table_name — the name of the columnar table.

  • chunk_group_row_limit — the maximum number of rows per chunk for newly inserted data. Existing chunks of data will not be changed and may have more rows than this maximum value. The default value is 10000.

  • stripe_row_limit — the maximum number of rows per stripe for newly inserted data. Existing stripes of data will not be changed and may have more rows than this maximum value. The default value is 150000.

  • compression — the compression type for the newly inserted data. Existing data will not be recompressed or decompressed. The default and generally suggested value is zstd (if support has been compiled in). Allowed values are none, pglz, zstd, lz4, and lz4hc.

  • compression_level. Allowed values are from 1 to 19. If the compression method does not support the level chosen, the closest level will be selected instead.

The example below shows how to use the function:

SELECT alter_columnar_table_set(
  'my_columnar_table',
  compression => 'none',
  stripe_row_limit => 10000);
create_time_partitions (table_name regclass, partition_interval interval, end_at timestamptz, start_from timestamptz) returns boolean #

Creates partitions of a given interval to cover a given range of time. Returns true if new partitions are created and false if they already exist.

Arguments:

  • table_name — the table for which to create new partitions. The table must be partitioned on one column of type date, timestamp, or timestamptz.

  • partition_interval — the interval of time, such as '2 hours', or '1 month', to use when setting ranges on new partitions.

  • end_at — create partitions up to this time. The last partition will contain the point end_at and no later partitions will be created.

  • start_from — pick the first partition so that it contains the point start_from. The default value is now().

The example below shows how to use the function:

-- Create a year's worth of monthly partitions
-- in table foo, starting from the current time

SELECT create_time_partitions(
  table_name         := 'foo',
  partition_interval := '1 month',
  end_at             := now() + '12 months'
);
drop_old_time_partitions (table_name regclass, older_than timestamptz) #

Removes all partitions whose intervals fall before a given timestamp. In addition to using this function, you might consider the alter_old_partitions_set_access_method function to compress the old partitions with columnar storage.

Arguments:

  • table_name — the table for which to remove partitions. The table must be partitioned on one column of type date, timestamp, or timestamptz.

  • older_than — drop partitions whose upper limit is less than or equal to the older_than value.

The example below shows how to use the procedure:

-- Drop partitions that are over a year old

CALL drop_old_time_partitions('foo', now() - interval '12 months');
alter_old_partitions_set_access_method (parent_table_name regclass, older_than timestamptz, new_access_method name) #

Changes the access method of all partitions of parent_table_name whose upper bound is at or before older_than. In the time-series data use case, tables are often partitioned by time, and old partitions are compressed into read-only columnar storage.

Arguments:

  • parent_table_name — the table for which to change partitions. The table must be partitioned on one column of type date, timestamp, or timestamptz.

  • older_than — change partitions whose upper limit is less than or equal to the older_than value.

  • new_access_method. Allowed values are heap for row-based storage or columnar for columnar storage.

The example below shows how to use the procedure:

CALL alter_old_partitions_set_access_method(
  'foo', now() - interval '6 months',
  'columnar'
);
H.5.6.5.1.2. Metadata / Configuration Information #
citus_add_node (nodename text, nodeport integer, groupid integer, noderole noderole, nodecluster name) returns integer #

Note

This function requires database superuser access to run.

Registers a new node in the cluster in the citus metadata table pg_dist_node. It also copies reference tables to the new node. The function returns the nodeid column from the newly inserted row in pg_dist_node.

If you call the function on a single-node cluster, be sure to call the citus_set_coordinator_host function first.

Arguments:

  • nodename — the DNS name or IP address of the new node to be added.

  • nodeport — the port on which Postgres Pro is listening on the worker node.

  • groupid — the group of one primary server and its secondary servers, relevant only for streaming replication. Be sure to set this argument to a value greater than zero, since zero is reserved for the coordinator node. The default value is -1.

  • noderole — the role of the node. Allowed values are primary and secondary. The default value is primary.

  • nodecluster — the name of the cluster. The default value is default.

The example below shows how to use the function:

SELECT * FROM citus_add_node('new-node', 12345);
 citus_add_node
-----------------
               7
(1 row)
citus_update_node (node_id int, new_node_name text, new_node_port int, force bool, lock_cooldown int) returns void #

Note

This function requires database superuser access to run.

Changes the hostname and port for a node registered in the citus metadata table pg_dist_node.

Arguments:

  • node_id — the node ID from the pg_dist_node table.

  • new_node_name — the updated DNS name or IP address for the node.

  • new_node_port — the updated port on which Postgres Pro is listening on the worker node.

  • force. The default value is false.

  • lock_cooldown. The default value is 10000.

The example below shows how to use the function:

SELECT * FROM citus_update_node(123, 'new-address', 5432);
citus_set_node_property (nodename text, nodeport integer, property text, value boolean) returns void #

Changes properties in the citus metadata table pg_dist_node. Currently it can change only the shouldhaveshards property.

Arguments:

  • nodename — the DNS name or IP address for the node.

  • nodeport — the port on which Postgres Pro is listening on the worker node.

  • property — the column to change in pg_dist_node; currently only the shouldhaveshards property is supported.

  • value — the new value for the column.

The example below shows how to use the function:

SELECT * FROM citus_set_node_property('localhost', 5433, 'shouldhaveshards', false);
citus_add_inactive_node (nodename text, nodeport integer, groupid integer, noderole noderole, nodecluster name) returns integer #

Note

This function requires database superuser access to run.

Similarly to citus_add_node, registers a new node in pg_dist_node. However, it marks the new node as inactive, meaning no shards will be placed there. Also it does not copy reference tables to the new node. The function returns the nodeid column from the newly inserted row in pg_dist_node.

Arguments:

  • nodename — the DNS name or IP address of the new node to be added.

  • nodeport — the port on which Postgres Pro is listening on the worker node.

  • groupid — the group of one primary server and zero or more secondary servers, relevant only for streaming replication. The default is -1.

  • noderole — the role of the node. Allowed values are primary and secondary. The default value is primary.

  • nodecluster — the name of the cluster. The default value is default.

The example below shows how to use the function:

SELECT * FROM citus_add_inactive_node('new-node', 12345);
 citus_add_inactive_node
--------------------------
                        7
(1 row)
citus_activate_node (nodename text, nodeport integer) returns integer #

Note

This function requires database superuser access to run.

Marks a node as active in the citus metadata table pg_dist_node and copies reference tables to the node. This is useful for nodes added via citus_add_inactive_node. The function returns the nodeid column of the activated node's row in pg_dist_node.

Arguments:

  • nodename — the DNS name or IP address of the node to be activated.

  • nodeport — the port on which Postgres Pro is listening on the worker node.

The example below shows how to use the function:

SELECT * FROM citus_activate_node('new-node', 12345);
 citus_activate_node
----------------------
                    7
(1 row)
citus_disable_node (nodename text, nodeport integer, synchronous bool) returns void #

Note

This function requires database superuser access to run.

This function is the opposite of citus_activate_node. It marks a node as inactive in the citus metadata table pg_dist_node, removing it from the cluster temporarily. The function also deletes all reference table placements from the disabled node. To reactivate the node, call citus_activate_node again.

Arguments:

  • nodename — the DNS name or IP address of the node to be disabled.

  • nodeport — the port on which Postgres Pro is listening on the worker node.

  • synchronous. The default value is false.

The example below shows how to use the function:

SELECT * FROM citus_disable_node('new-node', 12345);
citus_add_secondary_node (nodename text, nodeport integer, primaryname text, primaryport integer, nodecluster name) returns integer #

Note

This function requires database superuser access to run.

Registers a new secondary node in the cluster for an existing primary node. The function updates the citus pg_dist_node metadata table. The function returns the nodeid column for the secondary node from the inserted row in pg_dist_node.

Arguments:

  • nodename — the DNS name or IP address of the new node to be added.

  • nodeport — the port on which Postgres Pro is listening on the worker node.

  • primaryname — the DNS name or IP address of the primary node for this secondary.

  • primaryport — the port on which Postgres Pro is listening on the primary node.

  • nodecluster — the name of the cluster. The default value is default.

The example below shows how to use the function:

SELECT * FROM citus_add_secondary_node('new-node', 12345, 'primary-node', 12345);
 citus_add_secondary_node
---------------------------
                         7
(1 row)
citus_remove_node (nodename text, nodeport integer) returns void #

Note

This function requires database superuser access to run.

Removes the specified node from the pg_dist_node metadata table. The function errors out if there are existing shard placements on this node, so shards must be moved off the node before it can be removed.

Arguments:

  • nodename — the DNS name of the node to be removed.

  • nodeport — the port on which Postgres Pro is listening on the worker node.

The example below shows how to use the function:

SELECT citus_remove_node('new-node', 12345);
 citus_remove_node
--------------------

(1 row)
citus_get_active_worker_nodes () returns setof record #

Returns active worker host names and port numbers as a list of tuples where each tuple contains the following information:

  • node_name — the DNS name of the worker node.

  • node_port — the port on the worker node on which the database server is listening.

The example below shows the output of the function:

SELECT * FROM citus_get_active_worker_nodes();
 node_name | node_port
-----------+-----------
 localhost |      9700
 localhost |      9702
 localhost |      9701

(3 rows)
citus_backend_gpid () returns bigint #

Returns the global process identifier (GPID) for the Postgres Pro backend serving the current session. The GPID value encodes both a node in the citus cluster and the operating system process ID of Postgres Pro on that node. The GPID is returned in the following form: (node ID * 10,000,000,000) + process ID.

citus extends the Postgres Pro server signaling functions pg_cancel_backend and pg_terminate_backend so that they accept GPIDs. In citus, calling these functions on one node can affect a backend running on another node.

The example below shows the output of the function:

SELECT citus_backend_gpid();
citus_backend_gpid
--------------------
       10000002055
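
Applying the formula above, GPID 10000002055 decodes to node ID 1 and operating system process ID 2055. Because the signaling functions accept GPIDs, a call like the sketch below, using the GPID from the output above, can cancel that backend from any node in the cluster:

SELECT pg_cancel_backend(10000002055);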
citus_check_cluster_node_health () returns setof record #

Checks connectivity between all nodes. If there are N nodes, this function checks all N² connections between them. The function returns the list of tuples where each tuple contains the following information:

  • from_nodename — the DNS name of the source worker node.

  • from_nodeport — the port on the source worker node on which the database server is listening.

  • to_nodename — the DNS name of the destination worker node.

  • to_nodeport — the port on the destination worker node on which the database server is listening.

  • result — whether a connection could be established.

The example below shows the output of the function:

SELECT * FROM citus_check_cluster_node_health();
from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------+---------------+-------------+-------------+--------
localhost     |          1400 | localhost   |        1400 | t
localhost     |          1400 | localhost   |        1401 | t
localhost     |          1400 | localhost   |        1402 | t
localhost     |          1401 | localhost   |        1400 | t
localhost     |          1401 | localhost   |        1401 | t
localhost     |          1401 | localhost   |        1402 | t
localhost     |          1402 | localhost   |        1400 | t
localhost     |          1402 | localhost   |        1401 | t
localhost     |          1402 | localhost   |        1402 | t

(9 rows)
citus_set_coordinator_host (host text, port integer, node_role noderole, node_cluster name) returns void #

This function is required when adding worker nodes to a citus cluster that was initially created as a single-node cluster. When the coordinator registers a new worker, it records the coordinator hostname from the value of the citus.local_hostname configuration parameter, which is localhost by default. The worker would then attempt to connect to localhost to talk to the coordinator, which does not work.

Thus, the system administrator should call this function before calling the citus_add_node function in a single-node cluster.

Arguments:

  • host — the DNS name of the coordinator node.

  • port — the port on which the coordinator listens for Postgres Pro connections. The default value of this optional argument is current_setting('port').

  • node_role — the role of the node. The default value of this optional argument is primary.

  • node_cluster — the name of the cluster. The default value of this optional argument is default.

The example below shows how to use the function:

-- Assuming we are in a single-node cluster

-- First establish how workers should reach us
SELECT citus_set_coordinator_host('coord.example.com', 5432);

-- Then add a worker
SELECT * FROM citus_add_node('worker1.example.com', 5432);
get_shard_id_for_distribution_column (table_name regclass, distribution_value "any") returns bigint #

citus assigns every row of a distributed table to a shard based on the value of the row's distribution column and the table's method of distribution. In most cases the precise mapping is a low-level detail that the database administrator can ignore. However, it can be useful to determine a row's shard either for manual database maintenance tasks or just to satisfy curiosity. The get_shard_id_for_distribution_column function provides this info for hash-distributed tables as well as reference tables and returns the shard ID that citus associates with the distribution column value for the given table.

Arguments:

  • table_name — the name of the distributed table.

  • distribution_value — the value of the distribution column. The default value is NULL.

The example below shows how to use the function:

SELECT get_shard_id_for_distribution_column('my_table', 4);

 get_shard_id_for_distribution_column
--------------------------------------
                               540007
(1 row)
column_to_column_name (table_name regclass, column_var_text text) returns text #

Translates the partkey column of the pg_dist_partition table into a textual column name. This is useful to determine the distribution column of a distributed table. The function returns the distribution column name of the table_name table.

Arguments:

  • table_name — name of the distributed table.

  • column_var_text — value of partkey column in the pg_dist_partition table.

The example below shows how to use the function:

-- Get distribution column name for products table

SELECT column_to_column_name(logicalrelid, partkey) AS dist_col_name
  FROM pg_dist_partition
 WHERE logicalrelid='products'::regclass;
┌───────────────┐
│ dist_col_name │
├───────────────┤
│ company_id    │
└───────────────┘
citus_relation_size (logicalrelid regclass) returns bigint #

Returns the disk space used by all the shards of the specified distributed table. This includes the size of the main fork but excludes the visibility map and free space map for the shards.

Arguments:

  • logicalrelid — the name of the distributed table.

The example below shows how to use the function:

SELECT pg_size_pretty(citus_relation_size('github_events'));
pg_size_pretty
--------------
23 MB
citus_table_size (logicalrelid regclass) returns bigint #

Returns the disk space used by all the shards of the specified distributed table, excluding indexes (but including TOAST, free space map, and visibility map).

Arguments:

  • logicalrelid — the name of the distributed table.

The example below shows how to use the function:

SELECT pg_size_pretty(citus_table_size('github_events'));
pg_size_pretty
--------------
37 MB
citus_total_relation_size (logicalrelid regclass, fail_on_error boolean) returns bigint #

Returns the total disk space used by all the shards of the specified distributed table, including all indexes and TOAST data.

Arguments:

  • logicalrelid — the name of the distributed table.

  • fail_on_error. The default value is true.

The example below shows how to use the function:

SELECT pg_size_pretty(citus_total_relation_size('github_events'));
pg_size_pretty
--------------
73 MB
citus_stat_statements_reset () returns void #

Removes all rows from the citus_stat_statements table. Note that this works independently from the pg_stat_statements_reset function. To reset all stats, call both functions.
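
For example, to clear both citus and core statement statistics in one go, you might run the following sketch (the second call assumes the pg_stat_statements extension is installed):

SELECT citus_stat_statements_reset();
SELECT pg_stat_statements_reset();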

H.5.6.5.1.3. Cluster Management And Repair Functions #
citus_move_shard_placement (shard_id bigint, source_node_name text, source_node_port integer, target_node_name text, target_node_port integer, shard_transfer_mode citus.shard_transfer_mode) returns void #

Moves a given shard (and shards co-located with it) from one node to another. It is typically used indirectly during shard rebalancing rather than being called directly by a database administrator.

There are two ways to move the data: blocking or non-blocking. The blocking approach means that during the move all modifications to the shard are paused. The second way, which avoids blocking shard writes, relies on Postgres Pro logical replication.

After a successful move operation, shards in the source node get deleted. If the move fails at any point, this function throws an error and leaves the source and target nodes unchanged.

Arguments:

  • shard_id — the ID of the shard to be moved.

  • source_node_name — the DNS name of the node on which the healthy shard placement is present (source node).

  • source_node_port — the port on the source worker node on which the database server is listening.

  • target_node_name — the DNS name of the node to which the shard will be moved (target node).

  • target_node_port — the port on the target worker node on which the database server is listening.

  • shard_transfer_mode — specify the method of replication, whether to use Postgres Pro logical replication or a cross-worker COPY command. The allowed values of this optional argument are:

    • auto — require replica identity if logical replication is possible, otherwise use legacy behaviour. This is the default value.

    • force_logical — use logical replication even if the table does not have a replica identity. Any concurrent update/delete statements to the table will fail during replication.

    • block_writes — use COPY (blocking writes) for tables lacking primary key or replica identity.

The example below shows how to use the function:

SELECT citus_move_shard_placement(12345, 'from_host', 5432, 'to_host', 5432);
citus_rebalance_start (rebalance_strategy name, drain_only boolean, shard_transfer_mode citus.shard_transfer_mode) returns bigint #

Moves table shards to make them evenly distributed among the workers. It begins a background job to do the rebalancing and returns immediately.

The rebalancing process first calculates the list of moves it needs to make in order to ensure that the cluster is balanced within the given threshold. Then, it moves shard placements one by one from the source node to the destination node and updates the corresponding shard metadata to reflect the move.

Every shard is assigned a cost when determining whether shards are evenly distributed. By default each shard has the same cost (a value of 1), so distributing to equalize the cost across workers is the same as equalizing the number of shards on each. The constant cost strategy is called by_shard_count.

The by_shard_count strategy is appropriate under these circumstances:

  • The shards are roughly the same size.

  • The shards get roughly the same amount of traffic.

  • Worker nodes are all the same size/type.

  • Shards have not been pinned to particular workers.

If any of these assumptions do not hold, then rebalancing using the by_shard_count strategy can result in a bad plan.

The default rebalancing strategy is by_disk_size. You can always customize the strategy using the rebalance_strategy argument.

It is advisable to call the get_rebalance_table_shards_plan function before citus_rebalance_start to see and verify the actions to be performed.

Arguments:

  • rebalance_strategy — name of a strategy in the pg_dist_rebalance_strategy table. If this argument is omitted, the function chooses the default strategy, as indicated in the table. The default value of this optional argument is NULL.

  • drain_only — when true, move shards off worker nodes that have shouldhaveshards set to false in the pg_dist_node table; move no other shards. The default value of this optional argument is false.

  • shard_transfer_mode — specify the method of replication, whether to use Postgres Pro logical replication or a cross-worker COPY command. The allowed values of this optional argument are:

    • auto — require replica identity if logical replication is possible, otherwise use legacy behaviour. This is the default value.

    • force_logical — use logical replication even if the table does not have a replica identity. Any concurrent update/delete statements to the table will fail during replication.

    • block_writes — use COPY (blocking writes) for tables lacking primary key or replica identity.

The example below will attempt to rebalance shards:

SELECT citus_rebalance_start();
NOTICE:  Scheduling...
NOTICE:  Scheduled as job 1337.
DETAIL:  Rebalance scheduled as background job 1337.
HINT:  To monitor progress, run: SELECT details FROM citus_rebalance_status();
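
If the defaults are not suitable, the optional arguments can be passed by name. The call below is a sketch that chooses the by_shard_count strategy and blocks writes during transfers:

SELECT citus_rebalance_start(
    rebalance_strategy := 'by_shard_count',
    shard_transfer_mode := 'block_writes'
);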
citus_rebalance_status () returns table #

Allows you to monitor the progress of the rebalance. Returns immediately, while the rebalance continues as a background job.

To get general information about the rebalance, you can select all columns from the status. This shows the basic state of the job:

SELECT * FROM citus_rebalance_status();
 job_id |  state   | job_type  |           description           |          started_at           |          finished_at          | details
--------+----------+-----------+---------------------------------+-------------------------------+-------------------------------+-----------
      4 | running  | rebalance | Rebalance colocation group 1    | 2022-08-09 21:57:27.833055+02 | 2022-08-09 21:57:27.833055+02 | { ... }

Rebalancer specifics live in the details column, in JSON format:

SELECT details FROM citus_rebalance_status();
{
    "phase": "copy",
    "phase_index": 1,
    "phase_count": 3,
    "last_change":"2022-08-09 21:57:27",
    "colocations": {
        "1": {
            "shard_moves": 30,
            "shard_moved": 29,
            "last_move":"2022-08-09 21:57:27"
        },
        "1337": {
            "shard_moves": 130,
            "shard_moved": 0
        }
    }
}
citus_rebalance_stop () returns void #

Cancels the rebalance in progress, if any.

citus_rebalance_wait () returns void #

Blocks until a running rebalance is complete. If no rebalance is in progress when this function is called, then the function returns immediately.

The function can be useful for scripts or benchmarking.
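
For instance, a maintenance script can start a rebalance and block until it finishes, as in this minimal sketch:

SELECT citus_rebalance_start();
SELECT citus_rebalance_wait();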

get_rebalance_table_shards_plan () returns table #

Outputs the planned shard movements of citus_rebalance_start without performing them. Although unlikely, this function can output a slightly different plan than what a citus_rebalance_start call with the same arguments would execute. This could happen because they are not executed at the same time, so facts about the cluster, e.g. disk space, might differ between the calls. The function returns tuples containing the following columns:

  • table_name — the table whose shards would move.

  • shardid — the shard in question.

  • shard_size — the size, in bytes.

  • sourcename — the hostname of the source node.

  • sourceport — the port of the source node.

  • targetname — the hostname of the destination node.

  • targetport — the port of the destination node.

Arguments:

  • A superset of the arguments for the citus_rebalance_start function: relation, threshold, max_shard_moves, excluded_shard_list, and drain_only.
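
The example below is a sketch that previews the moves a plain citus_rebalance_start() call would schedule, with all arguments left at their defaults:

SELECT * FROM get_rebalance_table_shards_plan();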

get_rebalance_progress () returns table #

Once the shard rebalance begins, this function lists the progress of every shard involved. It monitors the moves planned and executed by the citus_rebalance_start function. The function returns tuples containing the following columns:

  • sessionid — the Postgres Pro PID of the rebalance monitor.

  • table_name — the table whose shards are moving.

  • shardid — the shard in question.

  • shard_size — the size of the shard, in bytes.

  • sourcename — the hostname of the source node.

  • sourceport — the port of the source node.

  • targetname — the hostname of the destination node.

  • targetport — the port of the destination node.

  • progress. The following values may be returned: 0 — waiting to be moved, 1 — moving, 2 — complete.

  • source_shard_size — the size of the shard on the source node, in bytes.

  • target_shard_size — the size of the shard on the target node, in bytes.

The example below shows how to use the function:

SELECT * FROM get_rebalance_progress();
┌───────────┬────────────┬─────────┬────────────┬───────────────┬────────────┬───────────────┬────────────┬──────────┬───────────────────┬───────────────────┐
│ sessionid │ table_name │ shardid │ shard_size │  sourcename   │ sourceport │  targetname   │ targetport │ progress │ source_shard_size │ target_shard_size │
├───────────┼────────────┼─────────┼────────────┼───────────────┼────────────┼───────────────┼────────────┼──────────┼───────────────────┼───────────────────┤
│      7083 │ foo        │  102008 │    1204224 │ n1.foobar.com │       5432 │ n4.foobar.com │       5432 │        0 │           1204224 │                 0 │
│      7083 │ foo        │  102009 │    1802240 │ n1.foobar.com │       5432 │ n4.foobar.com │       5432 │        0 │           1802240 │                 0 │
│      7083 │ foo        │  102018 │     614400 │ n2.foobar.com │       5432 │ n4.foobar.com │       5432 │        1 │            614400 │            354400 │
│      7083 │ foo        │  102019 │       8192 │ n3.foobar.com │       5432 │ n4.foobar.com │       5432 │        2 │                 0 │              8192 │
└───────────┴────────────┴─────────┴────────────┴───────────────┴────────────┴───────────────┴────────────┴──────────┴───────────────────┴───────────────────┘
citus_add_rebalance_strategy (name name, shard_cost_function regproc, node_capacity_function regproc, shard_allowed_on_node_function regproc, default_threshold float4, minimum_threshold float4, improvement_threshold float4) returns void #

Appends a row to the pg_dist_rebalance_strategy table.

Arguments:

  • name — the identifier for the new strategy.

  • shard_cost_function — identifies the function used to determine the cost of each shard.

  • node_capacity_function — identifies the function to measure node capacity.

  • shard_allowed_on_node_function — identifies the function that determines which shards can be placed on which nodes.

  • default_threshold — floating point threshold that tunes how precisely the cumulative shard cost should be balanced between nodes.

  • minimum_threshold — a safeguard that holds the minimum value allowed for the threshold argument of the citus_rebalance_start function. The default value is 0.

  • improvement_threshold. The default value is 0.
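
The call below is a minimal sketch of registering a custom strategy. It assumes a shard cost function such as the cost_of_shard_by_number_of_queries example shown later in this section, and reuses the built-in citus_node_capacity_1 and citus_shard_allowed_on_node_true functions; the strategy name by_query_count is arbitrary:

SELECT citus_add_rebalance_strategy(
    'by_query_count',
    'cost_of_shard_by_number_of_queries'::regproc,
    'citus_node_capacity_1'::regproc,
    'citus_shard_allowed_on_node_true'::regproc,
    0.1,
    0.01
);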

citus_set_default_rebalance_strategy (name text) returns void #

Updates the pg_dist_rebalance_strategy table, making the strategy named by its argument the default choice when rebalancing shards.

Arguments:

  • name — the name of the strategy in the pg_dist_rebalance_strategy table.

The example below shows how to use the function:

SELECT citus_set_default_rebalance_strategy('by_disk_size');
citus_remote_connection_stats () returns setof record #

Shows the number of active connections to each remote node.

The example below shows how to use the function:

SELECT * FROM citus_remote_connection_stats();
    hostname    | port | database_name | connection_count_to_node
----------------+------+---------------+--------------------------
 citus_worker_1 | 5432 | postgres      |                        3
(1 row)
citus_drain_node (nodename text, nodeport integer, shard_transfer_mode citus.shard_transfer_mode, rebalance_strategy name) returns void #

Moves shards off the designated node and onto other nodes that have shouldhaveshards set to true in the pg_dist_node table. This function is designed to be called prior to removing a node from the cluster, that is, before turning the node's physical server off.

Arguments:

  • nodename — the DNS name of the node to be drained.

  • nodeport — the port number of the node to be drained.

  • shard_transfer_mode — specify the method of replication, whether to use Postgres Pro logical replication or a cross-worker COPY command. The allowed values of this optional argument are:

    • auto — require replica identity if logical replication is possible, otherwise use legacy behaviour. This is the default value.

    • force_logical — use logical replication even if the table does not have a replica identity. Any concurrent update/delete statements to the table will fail during replication.

    • block_writes — use COPY (blocking writes) for tables lacking primary key or replica identity.

  • rebalance_strategy — the name of a strategy in the pg_dist_rebalance_strategy table. If this argument is omitted, the function chooses the default strategy, as indicated in the table. The default value of this optional argument is NULL.

Here are the typical steps to remove a single node (for example '10.0.0.1' on a standard Postgres Pro port):

  1. Drain the node.

    SELECT * FROM citus_drain_node('10.0.0.1', 5432);
    

  2. Wait until the command finishes.

  3. Remove the node.

When draining multiple nodes it is recommended to use the citus_rebalance_start function instead. Doing so allows citus to plan ahead and move shards the minimum number of times.

  1. Run this for each node that you want to remove:

    SELECT * FROM citus_set_node_property(node_hostname, node_port, 'shouldhaveshards', false);
    
  2. Drain them all at once with the citus_rebalance_start function:

    SELECT * FROM citus_rebalance_start(drain_only := true);
    
  3. Wait until the draining rebalance finishes.

  4. Remove the nodes.

isolate_tenant_to_new_shard (table_name regclass, tenant_id "any", cascade_option text, shard_transfer_mode citus.shard_transfer_mode) returns bigint #

Creates a new shard to hold rows with a specific single value in the distribution column. It is especially handy for the multi-tenant citus use case, where a large tenant can be placed alone on its own shard and ultimately its own physical node. The function returns the unique ID assigned to the newly created shard.

Arguments:

  • table_name — the name of the table to get a new shard.

  • tenant_id — the value of the distribution column which will be assigned to the new shard.

  • cascade_option — when set to CASCADE, also isolates the tenant from all tables in the current table's co-location group.

  • shard_transfer_mode — specify the method of replication, whether to use Postgres Pro logical replication or a cross-worker COPY command. The allowed values of this optional argument are:

    • auto — require replica identity if logical replication is possible, otherwise use legacy behaviour. This is the default value.

    • force_logical — use logical replication even if the table does not have a replica identity. Any concurrent update/delete statements to the table will fail during replication.

    • block_writes — use COPY (blocking writes) for tables lacking primary key or replica identity.

The example below shows how to create a new shard to hold the lineitems for tenant 135:

SELECT isolate_tenant_to_new_shard('lineitem', 135);
┌─────────────────────────────┐
│ isolate_tenant_to_new_shard │
├─────────────────────────────┤
│                      102240 │
└─────────────────────────────┘
citus_create_restore_point (name text) returns pg_lsn #

Temporarily blocks writes to the cluster, and creates a named restore point on all nodes. This function is similar to pg_create_restore_point, but applies to all nodes and makes sure the restore point is consistent across them. This function is well suited for point-in-time recovery and cluster forking. The function returns the coordinator_lsn value, i.e. the log sequence number of the restore point in the coordinator node WAL.

Arguments:

  • name — the name of the restore point to create.

The example below shows how to use the function:

SELECT citus_create_restore_point('foo');
┌────────────────────────────┐
│ citus_create_restore_point │
├────────────────────────────┤
│ 0/1EA2808                  │
└────────────────────────────┘
H.5.6.5.2. citus Tables and Views #
H.5.6.5.2.1. Coordinator Metadata #

citus divides each distributed table into multiple logical shards based on the distribution column. The coordinator then maintains metadata tables to track statistics and information about the health and location of these shards. In this section, we describe each of these metadata tables and their schema. You can view and query these tables using SQL after logging into the coordinator node.

The pg_dist_partition Table #

The pg_dist_partition table stores metadata about which tables in the database are distributed. For each distributed table, it also stores information about the distribution method and detailed information about the distribution column.

Name Type Description
logicalrelid regclass Distributed table to which this row corresponds. This value references the relfilenode column in the pg_class system catalog table.
partmethod char The method used for partitioning / distribution. The values of this column corresponding to different distribution methods are: hash — h, reference table — n.
partkey text Detailed information about the distribution column including column number, type, and other relevant information.
colocationid integer Co-location group to which this table belongs. Tables in the same group allow co-located joins and distributed rollups among other optimizations. This value references the colocationid column in the pg_dist_colocation table.
repmodel char The method used for data replication. The values of this column corresponding to different replication methods are: Postgres Pro streaming replication — s, two-phase commit (for reference tables) — t.
SELECT * FROM pg_dist_partition;
 logicalrelid  | partmethod |                                                        partkey                                                         | colocationid | repmodel 
---------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
 github_events | h          | {VAR :varno 1 :varattno 4 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 4 :location -1} |            2 | s
 (1 row)
The pg_dist_shard Table #

The pg_dist_shard table stores metadata about individual shards of a table. This includes information about which distributed table the shard belongs to and statistics about the distribution column for that shard. For hash-distributed tables, these statistics are the hash token ranges assigned to that shard. They are used for pruning away unrelated shards during SELECT queries.

Name Type Description
logicalrelid regclass Distributed table to which this shard belongs. This value references the relfilenode column in the pg_class system catalog table.
shardid bigint Globally unique identifier assigned to this shard.
shardstorage char Type of storage used for this shard. Different storage types are discussed in the table below.
shardminvalue text For hash distributed tables, minimum hash token value assigned to that shard (inclusive).
shardmaxvalue text For hash distributed tables, maximum hash token value assigned to that shard (inclusive).
SELECT * FROM pg_dist_shard;
 logicalrelid  | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------+---------+--------------+---------------+---------------
 github_events |  102026 | t            | 268435456     | 402653183
 github_events |  102027 | t            | 402653184     | 536870911
 github_events |  102028 | t            | 536870912     | 671088639
 github_events |  102029 | t            | 671088640     | 805306367
 (4 rows)

The shardstorage column in pg_dist_shard indicates the type of storage used for the shard. A brief overview of different shard storage types and their representation is below.

Storage Type shardstorage value Description
TABLE t Indicates that shard stores data belonging to a regular distributed table.
COLUMNAR c Indicates that shard stores columnar data. (Used by distributed cstore_fdw tables).
FOREIGN f Indicates that shard stores foreign data. (Used by distributed file_fdw tables).
The citus_shards View #

In addition to the low-level shard metadata table described above, citus provides the citus_shards view to easily check:

  • Where each shard is (node and port),

  • What kind of table it belongs to, and

  • Its size.

This view helps you inspect shards to find, among other things, any size imbalances across nodes.

SELECT * FROM citus_shards;
 table_name | shardid | shard_name   | citus_table_type | colocation_id | nodename  | nodeport | shard_size
------------+---------+--------------+------------------+---------------+-----------+----------+------------
 dist       |  102170 | dist_102170  | distributed      |            34 | localhost |     9701 |   90677248
 dist       |  102171 | dist_102171  | distributed      |            34 | localhost |     9702 |   90619904
 dist       |  102172 | dist_102172  | distributed      |            34 | localhost |     9701 |   90701824
 dist       |  102173 | dist_102173  | distributed      |            34 | localhost |     9702 |   90693632
 ref        |  102174 | ref_102174   | reference        |             2 | localhost |     9701 |       8192
 ref        |  102174 | ref_102174   | reference        |             2 | localhost |     9702 |       8192
 dist2      |  102175 | dist2_102175 | distributed      |            34 | localhost |     9701 |     933888
 dist2      |  102176 | dist2_102176 | distributed      |            34 | localhost |     9702 |     950272
 dist2      |  102177 | dist2_102177 | distributed      |            34 | localhost |     9701 |     942080
 dist2      |  102178 | dist2_102178 | distributed      |            34 | localhost |     9702 |     933888

The colocation_id refers to the colocation group. For more info about citus_table_type, see the Table Types section.

The pg_dist_placement Table #

The pg_dist_placement table tracks the location of shards on worker nodes. Each shard assigned to a specific node is called a shard placement. This table stores information about the health and location of each shard placement.

Name Type Description
placementid bigint Unique auto-generated identifier for each individual placement.
shardid bigint Shard identifier associated with this placement. This value references the shardid column in the pg_dist_shard catalog table.
shardstate int Describes the state of this placement. Different shard states are discussed in the section below.
shardlength bigint For hash distributed tables, zero.
groupid int Identifier used to denote a group of one primary server and zero or more secondary servers.
SELECT * FROM pg_dist_placement;
  placementid | shardid | shardstate | shardlength | groupid
 -------------+---------+------------+-------------+---------
            1 |  102008 |          1 |           0 |       1
            2 |  102008 |          1 |           0 |       2
            3 |  102009 |          1 |           0 |       2
            4 |  102009 |          1 |           0 |       3
            5 |  102010 |          1 |           0 |       3
            6 |  102010 |          1 |           0 |       4
            7 |  102011 |          1 |           0 |       4
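
Since pg_dist_placement records only the groupid, a common follow-up query (sketched below) joins it with pg_dist_node to see which host currently holds each placement:

SELECT p.shardid, n.nodename, n.nodeport
  FROM pg_dist_placement p
  JOIN pg_dist_node n USING (groupid)
 WHERE n.noderole = 'primary';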
The pg_dist_node Table #

The pg_dist_node table contains information about the worker nodes in the cluster.

Name Type Description
nodeid int Auto-generated identifier for an individual node.
groupid int Identifier used to denote a group of one primary server and zero or more secondary servers. By default it is the same as the nodeid.
nodename text Host name or IP Address of the Postgres Pro worker node.
nodeport int Port number on which the Postgres Pro worker node is listening.
noderack text Rack placement information for the worker node. This is an optional column.
hasmetadata boolean Reserved for internal use.
isactive boolean Whether the node is active and accepting shard placements.
noderole text Whether the node is a primary or secondary.
nodecluster text The name of the cluster containing this node.
metadatasynced boolean Reserved for internal use.
shouldhaveshards boolean If false, shards will be moved off the node (drained) when rebalancing, and shards from new distributed tables will not be placed on the node unless they are co-located with shards already there.
SELECT * FROM pg_dist_node;
 nodeid | groupid | nodename  | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------
      1 |       1 | localhost |    12345 | default  | f           | t        | primary  | default     | f              | t
      2 |       2 | localhost |    12346 | default  | f           | t        | primary  | default     | f              | t
      3 |       3 | localhost |    12347 | default  | f           | t        | primary  | default     | f              | t
(3 rows)
The citus.pg_dist_object Table #

The citus.pg_dist_object table contains a list of objects such as types and functions that have been created on the coordinator node and propagated to worker nodes. When an administrator adds new worker nodes to the cluster, citus automatically creates copies of the distributed objects on the new nodes (in the correct order to satisfy object dependencies).

Name Type Description
classid oid Class of the distributed object
objid oid Object ID of the distributed object
objsubid integer Object sub-ID of the distributed object, e.g. attnum
type text Part of the stable address used during upgrades with pg_upgrade
object_names text[] Part of the stable address used during upgrades with pg_upgrade
object_args text[] Part of the stable address used during upgrades with pg_upgrade
distribution_argument_index integer Only valid for distributed functions/procedures
colocationid integer Only valid for distributed functions/procedures

Stable addresses uniquely identify objects independently of a specific server. citus tracks objects during a Postgres Pro upgrade using stable addresses created with the pg_identify_object_as_address function.

Here is an example of how the create_distributed_function function adds entries to the citus.pg_dist_object table:

CREATE TYPE stoplight AS enum ('green', 'yellow', 'red');

CREATE OR REPLACE FUNCTION intersection()
RETURNS stoplight AS $$
DECLARE
        color stoplight;
BEGIN
        SELECT *
          FROM unnest(enum_range(NULL::stoplight)) INTO color
         ORDER BY random() LIMIT 1;
        RETURN color;
END;
$$ LANGUAGE plpgsql VOLATILE;

SELECT create_distributed_function('intersection()');

-- Will have two rows, one for the TYPE and one for the FUNCTION
TABLE citus.pg_dist_object;
-[ RECORD 1 ]---------------+------
classid                     | 1247
objid                       | 16780
objsubid                    | 0
type                        |
object_names                |
object_args                 |
distribution_argument_index |
colocationid                |
-[ RECORD 2 ]---------------+------
classid                     | 1255
objid                       | 16788
objsubid                    | 0
type                        |
object_names                |
object_args                 |
distribution_argument_index |
colocationid                |
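
To inspect the stable address of each distributed object directly, you can expand pg_identify_object_as_address over the rows of this table, as in the sketch below:

SELECT a.type, a.object_names, a.object_args
  FROM citus.pg_dist_object d,
       LATERAL pg_identify_object_as_address(d.classid, d.objid, d.objsubid) AS a;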
The citus_schemas View #

citus supports schema-based sharding and provides the citus_schemas view that shows which schemas have been distributed in the system. The view lists only distributed schemas; local schemas are not displayed.

Name Type Description
schema_name regnamespace Name of the distributed schema
colocation_id integer Co-location ID of the distributed schema
schema_size text Human-readable size summary of all objects within the schema
schema_owner name Role that owns the schema

Here is an example:

schema_name  | colocation_id | schema_size | schema_owner
--------------+---------------+-------------+--------------
user_service |             1 | 0 bytes     | user_service
time_service |             2 | 0 bytes     | time_service
ping_service |             3 | 632 kB      | ping_service
The citus_tables View #

The citus_tables view shows a summary of all tables managed by citus (distributed and reference tables). The view combines information from citus metadata tables into an easy, human-readable overview of table properties.

Here is an example:

SELECT * FROM citus_tables;
┌────────────┬──────────────────┬─────────────────────┬───────────────┬────────────┬─────────────┬─────────────┬───────────────┐
│ table_name │ citus_table_type │ distribution_column │ colocation_id │ table_size │ shard_count │ table_owner │ access_method │
├────────────┼──────────────────┼─────────────────────┼───────────────┼────────────┼─────────────┼─────────────┼───────────────┤
│ foo.test   │ distributed      │ test_column         │             1 │ 0 bytes    │          32 │ citus       │ heap          │
│ ref        │ reference        │ <none>              │             2 │ 24 GB      │           1 │ citus       │ heap          │
│ test       │ distributed      │ id                  │             1 │ 248 TB     │          32 │ citus       │ heap          │
└────────────┴──────────────────┴─────────────────────┴───────────────┴────────────┴─────────────┴─────────────┴───────────────┘
The time_partitions View #

citus provides user-defined functions to manage partitions for the time series use case. It also maintains the time_partitions view to inspect the partitions it manages.

The columns of this view are as follows:

  • parent_table — the table which is partitioned.

  • partition_column — the column on which the parent table is partitioned.

  • partition — the name of a partition table.

  • from_value — lower bound in time for rows in this partition.

  • to_value — upper bound in time for rows in this partition.

  • access_method — heap for row-based storage and columnar for columnar storage.

SELECT * FROM time_partitions;
┌────────────────────────┬──────────────────┬─────────────────────────────────────────┬─────────────────────┬─────────────────────┬───────────────┐
│      parent_table      │ partition_column │                partition                │     from_value      │      to_value       │ access_method │
├────────────────────────┼──────────────────┼─────────────────────────────────────────┼─────────────────────┼─────────────────────┼───────────────┤
│ github_columnar_events │ created_at       │ github_columnar_events_p2015_01_01_0000 │ 2015-01-01 00:00:00 │ 2015-01-01 02:00:00 │ columnar      │
│ github_columnar_events │ created_at       │ github_columnar_events_p2015_01_01_0200 │ 2015-01-01 02:00:00 │ 2015-01-01 04:00:00 │ columnar      │
│ github_columnar_events │ created_at       │ github_columnar_events_p2015_01_01_0400 │ 2015-01-01 04:00:00 │ 2015-01-01 06:00:00 │ columnar      │
│ github_columnar_events │ created_at       │ github_columnar_events_p2015_01_01_0600 │ 2015-01-01 06:00:00 │ 2015-01-01 08:00:00 │ heap          │
└────────────────────────┴──────────────────┴─────────────────────────────────────────┴─────────────────────┴─────────────────────┴───────────────┘
The pg_dist_colocation Table #

The pg_dist_colocation table contains information about which tables' shards should be placed together, or co-located. When two tables are in the same co-location group, citus ensures shards with the same partition values will be placed on the same worker nodes. This enables join optimizations, certain distributed rollups, and foreign key support. Shard co-location is inferred when the shard counts and distribution column types match between two tables; however, a custom co-location group may be specified when creating a distributed table, if so desired.

Name Type Description
colocationid int Unique identifier for the co-location group this row corresponds to
shardcount int Shard count for all tables in this co-location group
replicationfactor int Replication factor for all tables in this co-location group. (Deprecated)
distributioncolumntype oid The type of the distribution column for all tables in this co-location group
distributioncolumncollation oid The collation of the distribution column for all tables in this co-location group
SELECT * FROM pg_dist_colocation;
  colocationid | shardcount | replicationfactor | distributioncolumntype | distributioncolumncollation
 --------------+------------+-------------------+------------------------+-----------------------------
             2 |         32 |                 1 |                     20 |                           0
  (1 row)
The pg_dist_rebalance_strategy Table #

This table defines strategies that the citus_rebalance_start function can use to determine where to move shards.

Name Type Description
name name Unique name for the strategy
default_strategy boolean Whether citus_rebalance_start should choose this strategy by default. Use citus_set_default_rebalance_strategy to update this column.
shard_cost_function regproc Identifier for a cost function, which must take a shardid as bigint and return its notion of a cost, as type real.
node_capacity_function regproc Identifier for a capacity function, which must take a nodeid as int and return its notion of node capacity as type real.
shard_allowed_on_node_function regproc Identifier for a function that given shardid bigint and nodeidarg int, returns boolean for whether the shard is allowed to be stored on the node.
default_threshold float4 Threshold for deeming a node too full or too empty, which determines when the citus_rebalance_start function should try to move shards.
minimum_threshold float4 A safeguard to prevent the threshold argument of citus_rebalance_start from being set too low.
improvement_threshold float4 Determines when moving a shard is worth it during a rebalance. The rebalancer will move a shard when the ratio of the improvement with the shard move to the improvement without crosses the threshold. This is most useful with the by_disk_size strategy.

A citus installation ships with these strategies in the table:

SELECT * FROM pg_dist_rebalance_strategy;
-[ RECORD 1 ]------------------+---------------------------------
name                           | by_shard_count
default_strategy               | f
shard_cost_function            | citus_shard_cost_1
node_capacity_function         | citus_node_capacity_1
shard_allowed_on_node_function | citus_shard_allowed_on_node_true
default_threshold              | 0
minimum_threshold              | 0
improvement_threshold          | 0
-[ RECORD 2 ]------------------+---------------------------------
name                           | by_disk_size
default_strategy               | t
shard_cost_function            | citus_shard_cost_by_disk_size
node_capacity_function         | citus_node_capacity_1
shard_allowed_on_node_function | citus_shard_allowed_on_node_true
default_threshold              | 0.1
minimum_threshold              | 0.01
improvement_threshold          | 0.5

The by_shard_count strategy assigns every shard the same cost. Its effect is to equalize the shard count across nodes. The default strategy, by_disk_size, assigns a cost to each shard matching its disk size in bytes plus that of the shards that are co-located with it. The disk size is calculated using pg_total_relation_size, so it includes indices. This strategy attempts to achieve the same disk space on every node. Note the threshold of 0.1 — it prevents unnecessary shard movement caused by insignificant differences in disk space.

Here are examples of functions that can be used within new shard rebalancer strategies, and registered in the pg_dist_rebalance_strategy table with the citus_add_rebalance_strategy function.

  • Setting a node capacity exception by hostname pattern:

    -- Example of node_capacity_function
    
    CREATE FUNCTION v2_node_double_capacity(nodeidarg int)
        RETURNS real AS $$
        SELECT
            (CASE WHEN nodename LIKE '%.v2.worker.citusdata.com' THEN 2.0::float4 ELSE 1.0::float4 END)
        FROM pg_dist_node where nodeid = nodeidarg
        $$ LANGUAGE sql;
    
  • Rebalancing by number of queries that go to a shard, as measured by the citus_stat_statements table:

    -- Example of shard_cost_function
    
    CREATE FUNCTION cost_of_shard_by_number_of_queries(shardid bigint)
        RETURNS real AS $$
        SELECT coalesce(sum(calls)::real, 0.001) as shard_total_queries
        FROM citus_stat_statements
        WHERE partition_key is not null
            AND get_shard_id_for_distribution_column('tab', partition_key) = shardid;
    $$ LANGUAGE sql;
    
  • Isolating a specific shard (10000) on a node (address '10.0.0.1'):

    -- Example of shard_allowed_on_node_function
    
    CREATE FUNCTION isolate_shard_10000_on_10_0_0_1(shardid bigint, nodeidarg int)
        RETURNS boolean AS $$
        SELECT
            (CASE WHEN nodename = '10.0.0.1' THEN shardid = 10000 ELSE shardid != 10000 END)
        FROM pg_dist_node where nodeid = nodeidarg
        $$ LANGUAGE sql;
    
    -- The next two definitions are recommended in combination with the above function.
    -- This way the average utilization of nodes is not impacted by the isolated shard
    CREATE FUNCTION no_capacity_for_10_0_0_1(nodeidarg int)
        RETURNS real AS $$
        SELECT
            (CASE WHEN nodename = '10.0.0.1' THEN 0 ELSE 1 END)::real
        FROM pg_dist_node where nodeid = nodeidarg
        $$ LANGUAGE sql;
    CREATE FUNCTION no_cost_for_10000(shardid bigint)
        RETURNS real AS $$
        SELECT
            (CASE WHEN shardid = 10000 THEN 0 ELSE 1 END)::real
        $$ LANGUAGE sql;
    
The citus_stat_statements Table #

citus provides the citus_stat_statements table for stats about how queries are being executed, and for whom. It is analogous to (and can be joined with) the pg_stat_statements view in Postgres Pro, which tracks statistics about query speed.

Name Type Description
queryid bigint Identifier (good for pg_stat_statements joins)
userid oid User who ran the query
dbid oid Database instance of coordinator
query text Anonymized query string
executor text citus executor used: adaptive, or INSERT-SELECT
partition_key text Value of distribution column in router-executed queries, else NULL
calls bigint Number of times the query was run
-- Create and populate distributed table
CREATE TABLE foo ( id int );
SELECT create_distributed_table('foo', 'id');
INSERT INTO foo select generate_series(1,100);

-- Enable stats
-- pg_stat_statements must be in shared_preload_libraries
CREATE EXTENSION pg_stat_statements;

SELECT count(*) from foo;
SELECT * FROM foo where id = 42;

SELECT * FROM citus_stat_statements;

Results:

-[ RECORD 1 ]-+----------------------------------------------
queryid       | -909556869173432820
userid        | 10
dbid          | 13340
query         | insert into foo select generate_series($1,$2)
executor      | insert-select
partition_key |
calls         | 1
-[ RECORD 2 ]-+----------------------------------------------
queryid       | 3919808845681956665
userid        | 10
dbid          | 13340
query         | select count(*) from foo;
executor      | adaptive
partition_key |
calls         | 1
-[ RECORD 3 ]-+----------------------------------------------
queryid       | 5351346905785208738
userid        | 10
dbid          | 13340
query         | select * from foo where id = $1
executor      | adaptive
partition_key | 42
calls         | 1

Caveats:

  • The stats data is not replicated and will not survive database crashes or failover.

  • Tracks a limited number of queries set by the pg_stat_statements.max configuration parameter. The default value is 5000.

  • To truncate the table, use the citus_stat_statements_reset function.

The citus_stat_tenants View #

The citus_stat_tenants view augments the citus_stat_statements table with information about how many queries each tenant is running. Tracing queries to originating tenants helps, among other things, for deciding when to do tenant isolation.

This view counts recent single-tenant queries happening during a configurable time period. The tally of read-only and total queries for the period increases until the current period ends. After that, the counts are moved to the last period's statistics, which stay constant until they expire. The period length can be set in seconds using citus.stat_tenants_period, and is 60 seconds by default.

The view displays up to citus.stat_tenants_limit rows (by default 100). It counts only queries filtered to a single tenant, ignoring queries that apply to multiple tenants at once.

Name Type Description
nodeid int Node ID from the pg_dist_node
colocation_id int ID of the co-location group
tenant_attribute text Value in the distribution column identifying tenant
read_count_in_this_period int Number of read (SELECT) queries for tenant in period
read_count_in_last_period int Number of read queries one period of time ago
query_count_in_this_period int Number of read/write queries for tenant in time period
query_count_in_last_period int Number of read/write queries one period of time ago
cpu_usage_in_this_period double Seconds of CPU time spent for this tenant in period
cpu_usage_in_last_period double Seconds of CPU time spent for this tenant last period

Tracking tenant-level statistics adds overhead and is disabled by default. To enable it, set citus.stat_tenants_track to 'all'.
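
One way to enable it is sketched below; the parameter can also be set in postgresql.conf, and depending on the parameter's context a server restart may be required instead of a reload:

ALTER SYSTEM SET citus.stat_tenants_track = 'all';
SELECT pg_reload_conf();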

By way of example, suppose we have a distributed table called dist_table, with distribution column tenant_id. Then we make some queries:

INSERT INTO dist_table(tenant_id) VALUES (1);
INSERT INTO dist_table(tenant_id) VALUES (1);
INSERT INTO dist_table(tenant_id) VALUES (2);

SELECT count(*) FROM dist_table WHERE tenant_id = 1;

The tenant-level statistics will reflect the queries we just made:

SELECT tenant_attribute, read_count_in_this_period,
       query_count_in_this_period, cpu_usage_in_this_period
  FROM citus_stat_tenants;
tenant_attribute | read_count_in_this_period | query_count_in_this_period | cpu_usage_in_this_period
------------------+---------------------------+----------------------------+--------------------------
1                |                         1 |                          3 |                 0.000883
2                |                         0 |                          1 |                 0.000144
Distributed Query Activity #

In some situations, queries might get blocked on row-level locks on one of the shards on a worker node. If that happens then those queries would not show up in pg_locks on the citus coordinator node.

citus provides special views to watch queries and locks throughout the cluster, including shard-specific queries used internally to build results for distributed queries.

  • citus_stat_activity — shows the distributed queries that are executing on all nodes. A superset of pg_stat_activity usable wherever the latter is.

  • citus_dist_stat_activity — the same as citus_stat_activity but restricted to distributed queries only, excluding citus fragment queries.

  • citus_lock_waits — blocked queries throughout the cluster.

The first two views include all columns of pg_stat_activity plus the global PID of the worker that initiated the query.

For example, consider counting the rows in a distributed table:

-- Run in one session
-- (with a pg_sleep so we can see it)

SELECT count(*), pg_sleep(3) FROM users_table;

We can see the query appear in citus_dist_stat_activity:

-- Run in another session

SELECT * FROM citus_dist_stat_activity;

-[ RECORD 1 ]----+-------------------------------------------
global_pid       | 10000012199
nodeid           | 1
is_worker_query  | f
datid            | 13724
datname          | postgres
pid              | 12199
leader_pid       |
usesysid         | 10
usename          | postgres
application_name | psql
client_addr      |
client_hostname  |
client_port      | -1
backend_start    | 2022-03-23 11:30:00.533991-05
xact_start       | 2022-03-23 19:35:28.095546-05
query_start      | 2022-03-23 19:35:28.095546-05
state_change     | 2022-03-23 19:35:28.09564-05
wait_event_type  | Timeout
wait_event       | PgSleep
state            | active
backend_xid      |
backend_xmin     | 777
query_id         |
query            | SELECT count(*), pg_sleep(3) FROM users_table;
backend_type     | client backend

The citus_dist_stat_activity view hides internal citus fragment queries. To see those, we can use the more detailed citus_stat_activity view. For instance, the previous count(*) query requires information from all shards. Some of the information is in shard users_table_102039, which is visible in the query below.

SELECT * FROM citus_stat_activity;

-[ RECORD 1 ]----+-----------------------------------------------------------------------
global_pid       | 10000012199
nodeid           | 1
is_worker_query  | f
datid            | 13724
datname          | postgres
pid              | 12199
leader_pid       |
usesysid         | 10
usename          | postgres
application_name | psql
client_addr      |
client_hostname  |
client_port      | -1
backend_start    | 2022-03-23 11:30:00.533991-05
xact_start       | 2022-03-23 19:32:18.260803-05
query_start      | 2022-03-23 19:32:18.260803-05
state_change     | 2022-03-23 19:32:18.260821-05
wait_event_type  | Timeout
wait_event       | PgSleep
state            | active
backend_xid      |
backend_xmin     | 777
query_id         |
query            | SELECT count(*), pg_sleep(3) FROM users_table;
backend_type     | client backend
-[ RECORD 2 ]----+-----------------------------------------------------------------------------------------
global_pid       | 10000012199
nodeid           | 1
is_worker_query  | t
datid            | 13724
datname          | postgres
pid              | 12725
leader_pid       |
usesysid         | 10
usename          | postgres
application_name | citus_internal gpid=10000012199
client_addr      | 127.0.0.1
client_hostname  |
client_port      | 44106
backend_start    | 2022-03-23 19:29:53.377573-05
xact_start       |
query_start      | 2022-03-23 19:32:18.278121-05
state_change     | 2022-03-23 19:32:18.278281-05
wait_event_type  | Client
wait_event       | ClientRead
state            | idle
backend_xid      |
backend_xmin     |
query_id         |
query            | SELECT count(*) AS count FROM public.users_table_102039 users WHERE true
backend_type     | client backend

The query field shows rows being counted in shard 102039.

Here are examples of useful queries you can build using citus_stat_activity:

-- Active queries' wait events

SELECT query, wait_event_type, wait_event
  FROM citus_stat_activity
 WHERE state='active';

-- Active queries' top wait events

SELECT wait_event, wait_event_type, count(*)
  FROM citus_stat_activity
 WHERE state='active'
 GROUP BY wait_event, wait_event_type
 ORDER BY count(*) desc;

-- Total internal connections generated per node by citus

SELECT nodeid, count(*)
  FROM citus_stat_activity
 WHERE is_worker_query
 GROUP BY nodeid;

The next view is citus_lock_waits. To see how it works, we can generate a locking situation manually. First we will set up a test table from the coordinator:

CREATE TABLE numbers AS
  SELECT i, 0 AS j FROM generate_series(1,10) AS i;
SELECT create_distributed_table('numbers', 'i');

Then, using two sessions on the coordinator, we can run this sequence of statements:

-- Session 1                           -- Session 2
-------------------------------------  -------------------------------------
BEGIN;
UPDATE numbers SET j = 2 WHERE i = 1;
                                       BEGIN;
                                       UPDATE numbers SET j = 3 WHERE i = 1;
                                       -- (this blocks)

The citus_lock_waits view shows the situation.

SELECT * FROM citus_lock_waits;

-[ RECORD 1 ]-------------------------+--------------------------------------
waiting_gpid                          | 10000011981
blocking_gpid                         | 10000011979
blocked_statement                     | UPDATE numbers SET j = 3 WHERE i = 1;
current_statement_in_blocking_process | UPDATE numbers SET j = 2 WHERE i = 1;
waiting_nodeid                        | 1
blocking_nodeid                       | 1

In this example the queries originated on the coordinator, but the view can also list locks between queries originating on workers.

H.5.6.5.2.2. Tables on All Nodes #

citus has other informational tables and views which are accessible on all nodes, not just the coordinator.

The pg_dist_authinfo Table #

The pg_dist_authinfo table holds authentication parameters used by citus nodes to connect to one another.

Name     | Type    | Description
---------+---------+---------------------------------------------
nodeid   | integer | Node ID from pg_dist_node, or 0, or -1
rolename | name    | Postgres Pro role
authinfo | text    | Space-separated libpq connection parameters

Upon beginning a connection, a node consults the table to see whether a row with the destination nodeid and desired rolename exists. If so, the node includes the corresponding authinfo string in its libpq connection. A common example is to store a password, like 'password=abc123', but you can review the full list of possibilities in the libpq connection parameter documentation.

The parameters in authinfo are space-separated, in the form key=val. To write an empty value, or a value containing spaces, surround it with single quotes, e.g., keyword='a value'. Single quotes and backslashes within the value must be escaped with a backslash, i.e., \' and \\.

The nodeid column can also take the special values 0 and -1, which mean all nodes or loopback connections, respectively. If, for a given node, both specific and all-node rules exist, the specific rule has precedence.
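
For example, the sample row shown in the query output below could have been created with an insert like this (the node ID, role, and password are the illustrative values from that output):

INSERT INTO pg_dist_authinfo (nodeid, rolename, authinfo)
     VALUES (123, 'jdoe', 'password=abc123');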

SELECT * FROM pg_dist_authinfo;

 nodeid | rolename | authinfo
--------+----------+-----------------
    123 | jdoe     | password=abc123
(1 row)
The pg_dist_poolinfo Table #

If you want to use a connection pooler to connect to a node, you can specify the pooler options using pg_dist_poolinfo. This metadata table holds the host, port and database name for citus to use when connecting to a node through a pooler.

If pool information is present, citus will try to use these values instead of setting up a direct connection. The pg_dist_poolinfo information in this case supersedes pg_dist_node.

Name     | Type    | Description
---------+---------+----------------------------------------------------
nodeid   | integer | Node ID from pg_dist_node
poolinfo | text    | Space-separated parameters: host, port, or dbname

Note

In some situations, citus ignores the settings in pg_dist_poolinfo. For instance, shard rebalancing is not compatible with connection poolers such as pgbouncer. In these scenarios, citus will use a direct connection.

-- How to connect to node 1 (as identified in pg_dist_node)

INSERT INTO pg_dist_poolinfo (nodeid, poolinfo)
     VALUES (1, 'host=127.0.0.1 port=5433');
H.5.6.5.3. Configuration Reference #

There are various configuration parameters that affect the behavior of citus. These include both standard Postgres Pro parameters and citus-specific parameters. To learn more about Postgres Pro configuration parameters, see Chapter 19.

The rest of this reference discusses citus-specific configuration parameters. These parameters can be set just like Postgres Pro parameters, by modifying postgresql.conf or by using the SET command.

As an example you can update a setting with:

ALTER DATABASE citus SET citus.multi_task_query_log_level = 'log';
H.5.6.5.3.1. General Configuration #
citus.max_background_task_executors_per_node (integer) #

Determines how many background tasks can be executed in parallel at a given time. For instance, these tasks are for shard moves from/to a node. When increasing the value of this parameter, you will often also want to increase the value of the citus.max_background_task_executors and max_worker_processes parameters. The minimum value is 1, the maximum value is 128. The default value is 1.
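
A sketch of raising the related settings together; the values below are illustrative only, and changing max_worker_processes additionally requires a server restart:

ALTER SYSTEM SET citus.max_background_task_executors_per_node = 4;
ALTER SYSTEM SET citus.max_background_task_executors = 8;
ALTER SYSTEM SET max_worker_processes = 16;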

citus.max_worker_nodes_tracked (integer) #

citus tracks worker nodes' locations and their membership in a shared hash table on the coordinator node. This configuration parameter limits the size of the hash table and consequently the number of worker nodes that can be tracked. The default value is 2048. This parameter can only be set at server start and is effective on the coordinator node.

citus.use_secondary_nodes (enum) #

Sets the policy to use when choosing nodes for SELECT queries. If set to always, the planner will query only nodes whose noderole is marked as secondary in the pg_dist_node table (see the example below). The allowed values are:

  • never — all reads happen on primary nodes. This is the default value.

  • always — reads run against secondary nodes instead and INSERT/UPDATE statements are disabled.
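
A minimal sketch of directing reads to secondary nodes, assuming the parameter may be changed for the current session in your deployment (users_table is the distributed table used earlier in this section):

SET citus.use_secondary_nodes TO 'always';
SELECT count(*) FROM users_table;  -- planned against secondary nodes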

citus.cluster_name (text) #

Informs the coordinator node planner which cluster it coordinates. Once cluster_name is set, the planner will query worker nodes in that cluster alone.

citus.enable_version_checks (boolean) #

Upgrading the citus version requires a server restart (to pick up the new shared library), as well as running the ALTER EXTENSION UPDATE command. Failure to execute both steps could potentially cause errors or crashes. citus therefore validates that the version of the code and that of the extension match, and errors out if they do not.

The default value is true, and the parameter is effective on the coordinator. In rare cases, complex upgrade processes may require setting this parameter to false, thus disabling the check.

citus.log_distributed_deadlock_detection (boolean) #

Specifies whether to log distributed deadlock detection related processing in the server log. The default value is false.

citus.distributed_deadlock_detection_factor (floating point) #

Sets the time to wait before checking for distributed deadlocks. In particular the time to wait will be this value multiplied by the value set in the Postgres Pro deadlock_timeout parameter. The default value is 2. The value of -1 disables distributed deadlock detection.
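
For instance, with the Postgres Pro default deadlock_timeout of 1 second and this factor left at its default of 2, a distributed deadlock check starts after roughly 2 seconds of waiting. A sketch of making detection less frequent, assuming the parameter can be changed at runtime in your setup (the value 10 is illustrative):

SET citus.distributed_deadlock_detection_factor = 10;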

citus.node_connection_timeout (integer) #

Sets the maximum duration to wait for connection establishment, in milliseconds. citus raises an error if the timeout elapses before at least one worker connection is established. This configuration parameter affects connections from the coordinator to workers and workers to each other. The minimum value is 10 milliseconds, the maximum value is 1 hour. The default value is 30 seconds.

The example below shows how to set this parameter:

-- Set to 60 seconds
ALTER DATABASE foo
SET citus.node_connection_timeout = 60000;
citus.node_conninfo (text) #

Sets non-sensitive libpq connection parameters used for all inter-node connections.

The example below shows how to set this parameter:

-- key=value pairs separated by spaces.
-- For example, ssl options:

ALTER DATABASE foo
SET citus.node_conninfo =
  'sslrootcert=/path/to/citus.crt sslmode=verify-full';

citus supports only a specific subset of the allowed options, namely:

  • application_name

  • connect_timeout

  • gsslib (subject to the runtime presence of optional Postgres Pro features)

  • keepalives

  • keepalives_count

  • keepalives_idle

  • keepalives_interval

  • krbsrvname (subject to the runtime presence of optional Postgres Pro features)

  • sslcompression

  • sslcrl

  • sslmode (defaults to require)

  • sslrootcert

  • tcp_user_timeout

The citus.node_conninfo configuration parameter takes effect only on newly opened connections. To force all connections to use the new settings, make sure to reload the Postgres Pro configuration:

SELECT pg_reload_conf();
citus.local_hostname (text) #

citus nodes occasionally need to connect to themselves for system operations. By default, they use the localhost address to refer to themselves, but this can cause problems. For instance, when a host requires sslmode=verify-full for incoming connections, adding localhost as an alternative hostname on the SSL certificate is not always desirable or even feasible.

The citus.local_hostname configuration parameter selects the hostname a node uses to connect to itself. The default value is localhost.

The example below shows how to set this parameter:

ALTER SYSTEM SET citus.local_hostname TO 'mynode.example.com';
citus.show_shards_for_app_name_prefixes (text) #

By default, citus hides shards from the list of tables Postgres Pro gives to SQL clients. It does this because there are multiple shards per distributed table, and the shards can be distracting to the SQL client.

The citus.show_shards_for_app_name_prefixes configuration parameter allows shards to be displayed for selected clients that want to see them. The default value is ''.

The example below shows how to set this parameter:

-- Show shards to psql only (hide in other clients, like pgAdmin)

SET citus.show_shards_for_app_name_prefixes TO 'psql';

-- Also accepts a comma-separated list

SET citus.show_shards_for_app_name_prefixes TO 'psql,pg_dump';
citus.rebalancer_by_disk_size_base_cost (integer) #

When using the by_disk_size rebalance strategy, each shard group will get this cost in bytes added to its actual disk size. This is used to avoid creating a bad balance when there is very little data in some of the shards. The assumption is that even empty shards have some cost, because of parallelism and because empty shard groups will likely grow in the future. The default value is 100 MB.

H.5.6.5.3.2. Query Statistics #
citus.stat_statements_purge_interval (integer) #

Sets the frequency at which the maintenance daemon removes records from the citus_stat_statements table that are unmatched in the pg_stat_statements view. This configuration parameter sets the time interval between purges in seconds, with the default value of 10. The value of 0 disables the purges. This parameter is effective on the coordinator and can be changed at runtime.

The example below shows how to set this parameter:

SET citus.stat_statements_purge_interval TO 5;
citus.stat_statements_max (integer) #

The maximum number of rows to store in the citus_stat_statements table. The default value is 50000 and may be changed to any value in the range of 1000 - 10000000. Note that each row requires 140 bytes of storage, so setting citus.stat_statements_max to its maximum value of 10M would consume 1.4GB of memory.

Changing this configuration parameter will not take effect until Postgres Pro is restarted.
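
A sketch of raising the limit (the value is illustrative):

ALTER SYSTEM SET citus.stat_statements_max = 100000;
-- a server restart is required for the new value to take effect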

citus.stat_statements_track (enum) #

Recording statistics for citus_stat_statements requires extra CPU resources. When the database is experiencing load, the administrator may wish to disable statement tracking. The citus.stat_statements_track configuration parameter can turn tracking on and off. The allowed values are:

  • all — track all statements. This is the default value.

  • none — disable tracking.
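
For example, tracking can be switched off during a period of heavy load and re-enabled later:

SET citus.stat_statements_track TO 'none';
-- ... heavy load period ...
SET citus.stat_statements_track TO 'all';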

citus.stat_tenants_untracked_sample_rate (floating point) #

Sampling rate for new tenants in the citus_stat_tenants view. The rate must be in the range between 0.0 and 1.0. The default value is 1.0, meaning 100% of untracked tenant queries are sampled. Setting it to a lower value means that already tracked tenants have 100% of their queries sampled, but currently untracked tenants are sampled only at the provided rate.
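
A sketch of sampling only a tenth of the queries from untracked tenants, assuming the parameter may be changed at runtime in your setup:

SET citus.stat_tenants_untracked_sample_rate = 0.1;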

H.5.6.5.3.3. Data Loading #
citus.shard_count (integer) #

Sets the shard count for hash-partitioned tables and defaults to 32. This value is used by the create_distributed_table function when creating hash-partitioned tables. This parameter can be set at runtime and is effective on the coordinator.
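
For example, a session can choose a different shard count before creating a distributed table (the table and column names below are illustrative):

CREATE TABLE events (tenant_id bigint, payload jsonb);
SET citus.shard_count = 64;
SELECT create_distributed_table('events', 'tenant_id');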

citus.metadata_sync_mode (enum) #

Note

This configuration parameter requires superuser access to change.

This configuration parameter determines how citus synchronizes metadata across nodes. By default, citus updates all metadata in a single transaction for consistency. However, Postgres Pro has a hard memory limit related to cache invalidations, and citus metadata syncing for a large cluster can fail from memory exhaustion.

As a workaround, citus provides an optional nontransactional sync mode, which uses a series of smaller transactions. While this mode works within limited memory, there is a possibility of transactions failing and leaving metadata in an inconsistent state. To help with this potential problem, nontransactional metadata sync is designed as an idempotent operation, so you can re-run it repeatedly if needed.

The allowed values for this configuration parameter are as follows:

  • transactional — synchronize all metadata in a single transaction. This is the default value.

  • nontransactional — synchronize metadata using multiple small transactions.

The example below shows how to set this parameter:

-- To add a new node and sync nontransactionally

SET citus.metadata_sync_mode TO 'nontransactional';
SELECT citus_add_node(<ip>, <port>);

-- To manually (re)sync

SET citus.metadata_sync_mode TO 'nontransactional';
SELECT start_metadata_sync_to_all_nodes();

We advise trying transactional mode first and switching to nontransactional only if a memory failure occurs.

H.5.6.5.3.4. Planner Configuration #
citus.local_table_join_policy (enum) #

Determines how citus moves data when doing a join between local and distributed tables. Customizing the join policy can help reduce the amount of data sent between worker nodes.

citus will send either the local or the distributed tables to nodes as necessary to support the join. Copying table data is referred to as a conversion. If a local table is converted, it is sent to any workers that need its data to perform the join. If a distributed table is converted, it is collected on the coordinator to support the join. The citus planner sends only the rows necessary for the conversion.

There are four modes available to express conversion preference:

  • auto — citus will convert either all local or all distributed tables to support local and distributed table joins. citus decides which to convert using a heuristic: it will convert distributed tables if they are joined using a constant filter on a unique index (such as a primary key). This ensures less data gets moved between workers. This is the default value.

  • never — citus will not allow joins between local and distributed tables.

  • prefer-local — citus will prefer converting local tables to support local and distributed table joins.

  • prefer-distributed — citus will prefer converting distributed tables to support local and distributed table joins. If the distributed tables are huge, using this option might result in moving lots of data between workers.

For example, assume citus_table is a distributed table distributed by the column x, and that postgres_table is a local table:

CREATE TABLE citus_table(x int primary key, y int);
SELECT create_distributed_table('citus_table', 'x');

CREATE TABLE postgres_table(x int, y int);

-- Even though the join is on primary key, there isn't a constant filter
-- hence postgres_table will be sent to worker nodes to support the join
SELECT * FROM citus_table JOIN postgres_table USING (x);

-- There is a constant filter on a primary key, hence the filtered row
-- from the distributed table will be pulled to coordinator to support the join
SELECT * FROM citus_table JOIN postgres_table USING (x) WHERE citus_table.x = 10;

SET citus.local_table_join_policy to 'prefer-distributed';
-- Since we prefer distributed tables, citus_table will be pulled to coordinator
-- to support the join. Note that citus_table can be huge
SELECT * FROM citus_table JOIN postgres_table USING (x);

SET citus.local_table_join_policy to 'prefer-local';
-- Even though there is a constant filter on primary key for citus_table
-- postgres_table will be sent to necessary workers because we are using 'prefer-local'
SELECT * FROM citus_table JOIN postgres_table USING (x) WHERE citus_table.x = 10;
citus.limit_clause_row_fetch_count (integer) #

Sets the number of rows to fetch per task for limit clause optimization. In some cases, SELECT queries with LIMIT clauses may need to fetch all rows from each task to generate results. In those cases, and where an approximation would produce meaningful results, this configuration parameter sets the number of rows to fetch from each shard. Limit approximations are disabled by default and this parameter is set to -1. This value can be set at runtime and is effective on the coordinator.

citus.count_distinct_error_rate (floating point) #

citus can calculate count(distinct) approximations using the Postgres Pro hll extension. This configuration parameter sets the desired error rate when calculating count(distinct): the value of 0.0, which is the default, disables approximations for count(distinct), while 1.0 provides no guarantees about the accuracy of results. We recommend setting this parameter to 0.005 for best results. This value can be set at runtime and is effective on the coordinator.
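
A minimal sketch using the recommended error rate, assuming the hll extension is installed and reusing the dist_table example from earlier in this section:

SET citus.count_distinct_error_rate TO 0.005;
SELECT count(DISTINCT tenant_id) FROM dist_table;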

citus.task_assignment_policy (enum) #

Note

This configuration parameter is applicable for queries against reference tables.

Sets the policy to use when assigning tasks to workers. The coordinator assigns tasks to workers based on shard locations. This configuration parameter specifies the policy to use when making these assignments. Currently, three task assignment policies can be used:

  • greedy — aims at evenly distributing tasks across workers. This is the default value.

  • round-robin — assigns tasks to workers in a round-robin fashion alternating between different replicas. This enables much better cluster utilization when the shard count for a table is low compared to the number of workers.

  • first-replica — assigns tasks on the basis of the insertion order of placements (replicas) for the shards. In other words, the fragment query for a shard is simply assigned to the worker which has the first replica of that shard. This method allows you to have strong guarantees about which shards will be used on which nodes (i.e. stronger memory residency guarantees).

This configuration parameter can be set at runtime and is effective on the coordinator.
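
For example, the policy can be switched for the current session:

SET citus.task_assignment_policy TO 'round-robin';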

citus.enable_non_colocated_router_query_pushdown (boolean) #

Enables the router planner for queries that reference non-colocated distributed tables.

Normally, the router planner is only enabled for queries that reference co-located distributed tables, because the target shards are not guaranteed to always be on the same node, e.g., after rebalancing the shards. For this reason, while enabling this flag allows some degree of optimization for queries that reference non-colocated distributed tables, it is not guaranteed that the same query will keep working after rebalancing the shards or altering the shard count of one of those distributed tables. The default value is off.

H.5.6.5.3.5. Intermediate Data Transfer #
citus.max_intermediate_result_size (integer) #

The maximum size in KB of intermediate results for CTEs that cannot be pushed down to worker nodes for execution, and for complex subqueries. The default value is 1 GB, and a value of -1 means no limit. Queries exceeding the limit are canceled and produce an error message.

H.5.6.5.3.6. DDL #
citus.enable_ddl_propagation (boolean) #

Specifies whether to automatically propagate DDL changes from the coordinator to all workers. The default value is true. Because some schema changes require an access exclusive lock on tables, and because the automatic propagation applies to all workers sequentially, it can make a citus cluster temporarily less responsive. You may choose to disable this setting and propagate changes manually.
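
A minimal sketch of temporarily disabling automatic propagation for the current session; any DDL issued in between is then your responsibility to apply across the cluster:

SET citus.enable_ddl_propagation TO off;
-- run DDL here; it will not be propagated to the workers automatically
SET citus.enable_ddl_propagation TO on;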

Note

For a list of DDL propagation support, see the Modifying Tables section.

citus.enable_local_reference_table_foreign_keys (boolean) #

Allows foreign keys to be created between reference and local tables. For the feature to work, the coordinator node must be registered with itself, using the citus_add_node function. The default value is true.

Note that foreign keys between reference tables and local tables come at a slight cost. When you create the foreign key, citus must add the plain table to its metadata and track it in the pg_dist_partition table. Local tables that are added to metadata inherit the same limitations as reference tables (see the Creating and Modifying Distributed Objects (DDL) and SQL Support and Workarounds sections).

If you drop the foreign keys, citus will automatically remove such local tables from metadata, which eliminates such limitations on those tables.
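
A minimal sketch of such a foreign key, assuming the coordinator has already been registered with citus_add_node (the table names are illustrative):

CREATE TABLE ref_items (id int PRIMARY KEY);
SELECT create_reference_table('ref_items');

CREATE TABLE local_events (item_id int REFERENCES ref_items (id));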

citus.enable_change_data_capture (boolean) #

Causes citus to alter the wal2json and pgoutput logical decoders to work with distributed tables. Specifically, it rewrites the names of shards (e.g. foo_102027) in decoder output to the base names of the distributed tables (e.g. foo). It also avoids publishing duplicate events during tenant isolation and shard split/move/rebalance operations. The default value is false.

citus.enable_schema_based_sharding (boolean) #

With the parameter set to ON, all created schemas will be distributed by default. Distributed schemas are automatically associated with individual co-location groups, so that tables created in those schemas are automatically converted to co-located distributed tables without a shard key. This parameter can be modified for individual sessions.

To learn how to use this configuration parameter, see Section H.5.4.3.
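
A minimal illustration of the behavior (the schema and table names are illustrative):

SET citus.enable_schema_based_sharding TO on;
CREATE SCHEMA tenant_a;
CREATE TABLE tenant_a.orders (id bigint);  -- becomes a co-located distributed table without a shard key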

H.5.6.5.3.7. Executor Configuration #
citus.all_modifications_commutative (boolean) #

citus enforces commutativity rules and acquires appropriate locks for modify operations in order to guarantee correctness of behavior. For example, it assumes that an INSERT statement commutes with another INSERT statement, but not with an UPDATE or DELETE statement. Similarly, it assumes that an UPDATE or DELETE statement does not commute with another UPDATE or DELETE statement. This means that UPDATE and DELETE statements require citus to acquire stronger locks.

If you have UPDATE statements that are commutative with your INSERTs or other UPDATEs, then you can relax these commutativity assumptions by setting this parameter to true. When this parameter is set to true, all commands are considered commutative and claim a shared lock, which can improve overall throughput. This parameter can be set at runtime and is effective on the coordinator.

citus.multi_task_query_log_level (enum) #

Sets a log level for any query that generates more than one task (i.e., hits more than one shard). This is useful during a multi-tenant application migration, as you can choose to error out or warn for such queries, in order to find them and add the tenant_id filter to them. This parameter can be set at runtime and is effective on the coordinator. The default value for this parameter is off. The following values are supported:

  • off — turns off logging of queries that generate multiple tasks (i.e., span multiple shards).

  • debug — logs statement at the DEBUG severity level.

  • log — logs statement at the LOG severity level. The log line will include the SQL query that was run.

  • notice — logs statement at the NOTICE severity level.

  • warning — logs statement at the WARNING severity level.

  • error — logs statement at the ERROR severity level.

Note that it may be useful to use error during development testing and a lower log-level like log during actual production deployment. Choosing log will cause multi-task queries to appear in the database logs with the query itself shown after STATEMENT.

LOG:  multi-task query about to be executed
HINT:  Queries are split to multiple tasks if they have to be split into several queries on the workers.
STATEMENT:  SELECT * FROM foo;
citus.propagate_set_commands (enum) #

Determines which SET commands are propagated from the coordinator to workers. The default value is none. The following values are supported:

  • none — no SET commands are propagated.

  • local — only SET LOCAL commands are propagated.

citus.enable_repartition_joins (boolean) #

Ordinarily, attempting to perform repartition joins with the adaptive executor will fail with an error message. However, setting this configuration parameter to true allows citus to perform the join. The default value is false.

citus.enable_repartitioned_insert_select (boolean) #

By default, an INSERT INTO … SELECT statement that cannot be pushed down will attempt to repartition rows from the SELECT statement and transfer them between workers for insertion. However, if the target table has too many shards then repartitioning will probably not perform well. The overhead of processing the shard intervals when determining how to partition the results is too great. Repartitioning can be disabled manually by setting this configuration parameter to false.

citus.enable_binary_protocol (boolean) #

Setting this parameter to true instructs the coordinator node to use Postgres Pro binary serialization format (when applicable) to transfer data with workers. Some column types do not support binary serialization.

Enabling this parameter is mostly useful when the workers must return large amounts of data, for example, when many rows are requested, the rows have many columns, or they use wide types such as the hll type from the hll extension.

The default value is true. When set to false, all results are encoded and transferred in text format.

citus.max_shared_pool_size (integer) #

Specifies the maximum number of connections that the coordinator node, across all simultaneous sessions, is allowed to make per worker node. Postgres Pro must allocate fixed resources for every connection and this configuration parameter helps ease connection pressure on workers.

Without connection throttling, every multi-shard query creates connections on each worker proportional to the number of shards it accesses (in particular, up to #shards/#workers). Running dozens of multi-shard queries at once can easily hit worker nodes' max_connections limit, causing queries to fail.

By default, the value is automatically set equal to the coordinator's own max_connections, which is not guaranteed to match that of the workers (see the note below). The value -1 disables throttling.

Note

There are certain operations that do not obey this parameter, most importantly repartition joins. That is why it can be prudent to increase the max_connections on the workers a bit higher than max_connections on the coordinator. This gives extra space for connections required for repartition queries on the workers.

citus.max_adaptive_executor_pool_size (integer) #

Whereas citus.max_shared_pool_size limits worker connections across all sessions, the citus.max_adaptive_executor_pool_size limits worker connections from just the current session. This parameter is useful for:

  • Preventing a single backend from getting all the worker resources.

  • Providing priority management: designate low-priority sessions with a low citus.max_adaptive_executor_pool_size value and high-priority sessions with higher values, as in the example below.

The default value is 16.
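
For example, a session that should consume few worker resources can lower its own limit (the value 4 is illustrative):

SET citus.max_adaptive_executor_pool_size = 4;  -- low-priority session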

citus.executor_slow_start_interval (integer) #

Time to wait between opening connections to the same worker node, in milliseconds.

When the individual tasks of a multi-shard query take very little time, they can often be finished over a single (often already cached) connection. To avoid redundantly opening additional connections, the executor waits between connection attempts for the configured number of milliseconds. At the end of the interval, it increases the number of connections it is allowed to open next time.

For long queries (those taking >500 ms), slow start might add latency, but for short queries it is faster. The default value is 10 ms.

citus.max_cached_conns_per_worker (integer) #

Each backend opens connections to the workers to query the shards. At the end of the transaction, the configured number of connections is kept open to speed up subsequent commands. Increasing this value will reduce the latency of multi-shard queries but will also increase overhead on the workers.

The default value is 1. A larger value such as 2 might be helpful for clusters that use a small number of concurrent sessions, but it's not wise to go much further (e.g. 16 would be too high).

citus.force_max_query_parallelization (boolean) #

Simulates the deprecated and now nonexistent real-time executor. This is used to open as many connections as possible to maximize query parallelization.

When this configuration parameter is enabled, citus will force the adaptive executor to use as many connections as possible while executing a parallel distributed query. If not enabled, the executor might choose to use fewer connections to optimize overall query execution throughput. Internally, setting this parameter to true will end up using one connection per task. The default value is false.

One place where this is useful is in a transaction whose first query is lightweight and requires few connections, while a subsequent query would benefit from more connections. citus decides how many connections to use in a transaction based on the first statement, which can throttle other queries unless we use the configuration parameter to provide a hint.

The example below shows how to set this parameter:

BEGIN;
-- Add this hint
SET citus.force_max_query_parallelization TO ON;

-- A lightweight query that doesn't require many connections
SELECT count(*) FROM table WHERE filter = x;

-- A query that benefits from more connections, and can obtain
-- them since we forced max parallelization above
SELECT ... very .. complex .. SQL;
COMMIT;
citus.explain_all_tasks (boolean) #

By default, citus shows the output of a single arbitrary task when running the EXPLAIN command on a distributed query. In most cases, the EXPLAIN output will be similar across tasks. Occasionally, some of the tasks will be planned differently or have much higher execution times. In those cases, it can be useful to enable this parameter, after which the EXPLAIN output will include all tasks. This may cause the EXPLAIN to take longer.
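
For example, using the users_table distributed table from earlier in this section:

SET citus.explain_all_tasks TO true;
EXPLAIN SELECT count(*) FROM users_table;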

citus.explain_analyze_sort_method (enum) #

Determines the sort method of the tasks in the output of EXPLAIN ANALYZE. The following values are supported:

  • execution-time — sort by execution time.

  • taskId — sort by task ID.