•   PostgreSQL   •   By Dmitriy Ursegov

A New Approach to Sharding for Distributed PostgreSQL

Meeting the scalability challenge for large data sets
Source: Postgres Pro Team

There are several ways to build a sharded database on top of distributed Postgres instances. One of the most interesting and general approaches is built-in support for sharding. Historically, Postgres has offered FDW and partitioning features that can be used together to build a sharded database, but there have been concerns about adopting them as a complete solution. We will review the current state of postgres_fdw along with patches that fix some significant bottlenecks in the planner, and demonstrate the latest TPC-C results in comparison with existing sharding solutions.

Despite the promising results, our experiments with postgres_fdw revealed fundamental issues that still exist and make it hard to build an efficient system for most workloads. We'll discuss these issues and show a general approach that solves them for a cluster of homogeneous Postgres instances. At the same time, the approach is still based on FDW and partitioning, and most of the changes are implemented as an extension.

The approach consists of two components. The first is a transport that needs only a single connection between each pair of nodes. This leads to M+N connections in the cluster in total instead of M*N, where M is the number of client connections and N is the number of nodes. We'll show an implementation of such a multiplexing transport that achieves 1 million pings/s between nodes with a single background worker process. The second is the integration of postgres_fdw, the execution model, and transaction support with the new transport. The implementation provides more than 2 times lower latency for short transactional queries. For simple queries, we achieved single-instance performance on a two-node cluster, with near-linear scalability. In this test, the unmodified postgres_fdw setup reaches single-instance performance only on an 8-node cluster. Our approach also allows efficient transfer of data in binary format, which doesn't require conversion into an intermediate representation. The scalability test results for these transport-heavy cases are shown in the article.

For businesses that constantly need to collect and analyze ever-increasing volumes of information, a database that doesn’t easily scale stifles innovation and slows time-to-value for new projects. The key to enabling scalability is overcoming inherent database limitations related to the amount of data, the volume of requests or the size of requests the database can handle.

Strategies for overcoming the scalability challenge include:

  • Replication – Additional servers with a read-only copy of the database are added
  • Partitioning – Tables on single servers are divided into independent parts by some criteria
  • Sharding – Each of the partitions is located on a separate server.

Of these, sharding is usually the most effective. Replication can work well in some cases – and there's no need to change the data model. However, replication actually has limited scalability, especially for large tables, analytics and write requests. Partitioning can help with larger tables but only when a small part of the data is hot. By contrast, sharding offers unlimited scalability. However, sharding requires a high level of cooperation between an application and the database.

For sharding, the data model should ensure that data and queries are distributed evenly across the shards. For example, a database containing documents with some information about country regions should be distributed by document ID, not by region ID, because some regions are bigger than others. Queries related to a single document should include a filter on that document ID. Multi-document queries should mostly work as is, but the performance will depend on the type of the query. Analytical and search queries will scale well, but short transactional queries will typically show higher latencies and scale less well.
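The effect of the distribution key can be illustrated with a small sketch. The data set below is hypothetical (10,000 documents, 90% of them in one region), and a plain modulo stands in for the hash function a real system would use.

```python
from collections import Counter

def shard_of(key, shards):
    """Map an integer key to a shard; modulo stands in for a real hash function."""
    return key % shards

def load_per_shard(keys, shards):
    """Count how many rows land on each shard."""
    counts = Counter(shard_of(k, shards) for k in keys)
    return [counts.get(s, 0) for s in range(shards)]

# Hypothetical data: (doc_id, region_id); region 1 holds 90% of the documents.
documents = [(doc_id, 1 if doc_id % 10 else doc_id % 7 + 2)
             for doc_id in range(10_000)]

by_doc = load_per_shard([doc_id for doc_id, _ in documents], shards=4)
by_region = load_per_shard([region for _, region in documents], shards=4)

print(by_doc)     # [2500, 2500, 2500, 2500]
print(by_region)  # one shard receives at least 9000 of the 10000 rows
```

Sharding by region sends every row of the large region to the same shard, so that shard becomes the bottleneck no matter how many nodes are added.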

The necessary support from the distributed database lies mostly in two aspects:

  • Build a distributed plan that minimizes the amount and size of data transfers
  • Minimize the overhead caused by these data transfers

Sharding on the application level is reasonable in some cases, but it is hard to implement well and maintain.

PostgreSQL’s built-in sharding

Historically, it has been possible to build a sharded database on Postgres with partitioning and FDW (Foreign Data Wrappers) subsystems. FDW is a PostgreSQL-specific way to access foreign data sources using regular SQL queries.

FDW = a SQL interface to access foreign data sources

  • A set of callbacks in the planner and executor
  • Postgres FDW implements them to access foreign Postgres servers
  • Foreign partition on one server may point to real partition on another

For example, the postgres_fdw extension implements this interface for foreign Postgres servers. A sharded table can be built as a partitioned table where some of the partitions are declared as foreign partitions from different nodes.
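As a rough illustration, the sketch below generates the DDL one node could run to declare such a table. The table, column, and server names are hypothetical. It assumes that the foreign servers and user mappings have already been created for postgres_fdw, and that each remote node holds a regular table with the same name as the foreign partition.

```python
def sharded_table_ddl(table, columns, key, nodes, local_node):
    """Build the DDL to run on `local_node` for a hash-sharded table.

    There is one partition per node: the local one is an ordinary partition,
    the others are foreign partitions pointing at the table on their node.
    """
    ddl = [f"CREATE TABLE {table} ({columns}) PARTITION BY HASH ({key});"]
    for remainder, node in enumerate(nodes):
        part = f"{table}_p{remainder}"
        bound = f"FOR VALUES WITH (MODULUS {len(nodes)}, REMAINDER {remainder})"
        if node == local_node:
            ddl.append(f"CREATE TABLE {part} PARTITION OF {table} {bound};")
        else:
            ddl.append(
                f"CREATE FOREIGN TABLE {part} PARTITION OF {table} {bound} "
                f"SERVER {node} OPTIONS (table_name '{part}');"
            )
    return ddl

# Hypothetical three-node cluster, seen from node0.
statements = sharded_table_ddl(
    "docs", "id bigint, body text", "id", ["node0", "node1", "node2"], "node0"
)
for statement in statements:
    print(statement)
```

Running the same generator with a different `local_node` on each node yields the symmetric layout, in which every node sees the whole table.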

Symmetric topology

This provides a symmetric topology where every node can be used to access the distributed database.

Distributed queries

 

A database needs several capabilities to execute a distributed query. They are listed below together with the Postgres subsystems that provide them.

Subsystems:

  • planning and execution
    • partitioning
    • postgres_fdw
  • transport
    • libpq
    • connections model
    • transactions implementation

Capabilities:

  • deduce the partitions that participate in a query (from its filters) and prune the query plan
  • estimate which parts of the plan to push down
  • transfer these parts of the plan and the transaction context
  • execute asynchronously and in parallel
  • get the results

While partitioning and postgres_fdw provide a great foundation for distributed planning and execution, this model can be greatly improved through more effective use of the transport subsystem.

The role of transport

The transport subsystem is responsible for transferring data, but, in a broader sense, it can’t be separated from transaction implementation or the model of connections and execution.

  • Transport – less execution time, more data transfers
  • Planning and execution – more execution time, fewer data transfers

Transport is important for workloads with:

  • Simple queries with latencies on the order of a network round trip (YCSB, pgbench -S, -N)
  • Distributed transactions with many round trips (pgbench)
  • More complex queries once the push-down problem is solved (TPC-C)
  • Analytics or search queries that must transfer a large amount of data for shuffles or results (TPC-H)

Transport is also important for any workload that transfers large amounts of data or at large scale.

So when considering different workloads, a simple principle emerges: the more data that needs to be transferred, the more important the transport becomes. Transport matters most for low-latency queries, queries with many short round trips, and large data transfers. At large scale, the overhead from the transport becomes much more significant.

Transport in Postgres FDW

In short, the implementation of transport in Postgres FDW looks as follows:

SELECT * FROM foreign_tbl WHERE id = 10;

  • Open a standard libpq connection
  • Start a REPEATABLE READ (RR) transaction (required for consistency even at the statement level, for example so that a join reads all tables with the same snapshot)
  • Create a cursor (again for joins, (1) to fetch from several tables through one connection, and also (2) to fetch big results in batches of fixed size)
  • Deparse the access to the foreign table back to SQL and send it to the foreign server

At minimum, this takes 5 commands and 5 network round trips.
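The command sequence behind a single point query can be sketched as follows. The statements approximate what postgres_fdw sends to the remote server (the exact text varies by version and can be checked with statement logging on the remote side), and the 100 μs round-trip time is a hypothetical figure used only for the arithmetic.

```python
def fdw_remote_commands(remote_sql, fetch_size=100):
    """Approximate commands sent to the foreign server for one foreign scan."""
    return [
        "START TRANSACTION ISOLATION LEVEL REPEATABLE READ",
        f"DECLARE c1 CURSOR FOR {remote_sql}",
        f"FETCH {fetch_size} FROM c1",
        "CLOSE c1",
        "COMMIT TRANSACTION",
    ]

def network_time_us(commands, round_trip_us):
    """Each command is sent synchronously, so each one costs a round trip."""
    return len(commands) * round_trip_us

commands = fdw_remote_commands("SELECT * FROM tbl WHERE id = 10")
print(len(commands))                   # 5
print(network_time_us(commands, 100))  # 500 (versus 100 for a single command)
```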


Transport scalability analysis

Checking the scalability of transport in Postgres FDW reveals:

  • Point queries result in 4x more latency with the same CPU utilization
  • Queries with large results scale better (fetch_size '50000')

These are the results of running pgbench simultaneously on all nodes. Simultaneously means that in an n-node cluster every node executes 1/n of its queries locally and routes the remaining (n-1)/n to other nodes.

As noted above, the overhead of sending five commands instead of one leads to 4 times greater latency with the same CPU utilization. This means that FDW achieves single-server performance on only a seven-node cluster. Bulk transfers of results work better, but a three-node cluster is still required to achieve single-server numbers.

The libpq and connection model

There are other issues as well. Each external client connected to a random node may produce internal connections to all nodes because the required data may be located on any node. As a result, there can be M*N total number of connections in the cluster, where M is the number of external clients and N is the number of nodes. In other words, every node may end up having the number of connections equal to the number of external connections to the whole cluster. In our test, pgbench required 64 connections per node to get maximum throughput, so a hundred-node cluster would lead to 6400 connections per node!

Idle connections make this problem even worse. Network latency and application processing times often result in numerous database connections remaining idle most of the time. This happens even if the applications are issuing database requests as fast as they can. The increased latency requires even more connections to get the maximum throughput. Connections consume memory and generate context switches that become critical at scale.

The libpq and connection model has other limitations that reduce the overall efficiency:

  • The lack of bulk operations leads to more overhead from transferring headers
  • With cursors, executing commands and fetching results within a single connection is synchronous
  • The libpq protocol requires results to be serialized to text or a custom binary representation and back

A proposed new transport and connection model

Postgres Professional is proposing a new transport and connection model that has the potential to solve these issues. The architecture details are as follows (it is similar to the ideas from the C10K problem in some aspects):

  • Each node runs a multiplexing process
  • Multiplexers connect with each other via a single connection
  • A pool of workers is used to execute internal requests
  • Workers and backends connect to the multiplexer with shmem queues

Total number of connections (in the cluster): M + N

Total number of processes (in the cluster): M + N * W

where

M = number of external clients

N = number of nodes

W = number of workers on every node

The total number of connections and processes in the cluster becomes much lower.
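A short calculation makes the difference concrete. It uses the formulas given above and the 64-clients-per-node figure from the pgbench test. The worker pool size W = 16 is a hypothetical value, not taken from the article.

```python
def direct_libpq_connections(m, n):
    """Worst case without multiplexing: each of M clients reaches all N nodes."""
    return m * n

def multiplexed_connections(m, n):
    """The article's formula for the multiplexed model."""
    return m + n

def multiplexed_processes(m, n, w):
    """The article's formula: M backends plus W workers on each of N nodes."""
    return m + n * w

nodes = 100
clients = 64 * nodes  # 64 pgbench connections per node
workers = 16          # hypothetical pool size

print(direct_libpq_connections(clients, nodes))           # 640000 in the cluster
print(direct_libpq_connections(clients, nodes) // nodes)  # 6400 per node
print(multiplexed_connections(clients, nodes))            # 6500
print(multiplexed_processes(clients, nodes, workers))     # 8000
```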

Multiplexer

The multiplexer process runs the libev loop. In most cases, it just moves the data between backends and workers on local and remote nodes. Messages are read from the ready backends and workers. At the end of each iteration, they are written to the corresponding sockets in vectors. The same happens in the opposite direction. The results can come in any order.

  • Read
    • query messages from backends
    • result messages from workers
  • Write vectors to sockets
  • Read vectors from sockets
  • Write
    • results to backends
    • queries (jobs) to workers

More on workers & backends

The task of a worker is also very simple. It just executes queries in a loop. Here’s what happens in brief:

  • Workers run jobs on a substatement level
    • Parallel work on multiple partitions and tables
    • Serial work on different parts of a query plan
  • Workers are stateless
    • Transaction state is stored on a coordinator
    • State is transferred to workers and back
    • CSN is used as a global snapshot
    • No need for additional transaction commands
  • Fast transfer of results
    • Push protocol and binary representation for results
    • No need for additional fetch commands and serialization to text
  • Fast communication between processes

A worker can switch between transactional contexts quickly and execute queries from different transactions on a substatement level. Workers are stateless; transaction state is stored only on a coordinator, the node that started the transaction. The state goes back and forth between processes. In the current implementation of the PostgreSQL transport, CSN is used as a global snapshot, which makes the state very short.

Other features include push protocol and binary representation for the results, as well as fast communication between processes with shared memory queues.
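The idea of a stateless worker can be sketched as follows. This is a deliberately simplified model, not the actual implementation: the job layout and the in-memory versioned store are hypothetical, and the visibility rule (a row version is visible if its commit CSN is not greater than the snapshot CSN) ignores in-progress and aborted transactions.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Job:
    snapshot_csn: int  # transaction state shipped with the job by the coordinator
    key: int           # row to read

def run_job(store, job):
    """Execute one job; the worker keeps no transaction state between jobs.

    `store` maps key -> list of (commit_csn, value), oldest version first.
    """
    visible = [value for commit_csn, value in store.get(job.key, [])
               if commit_csn <= job.snapshot_csn]
    return visible[-1] if visible else None

store = {10: [(5, "v1"), (8, "v2")]}

# One worker interleaves jobs that belong to different transactions.
results = [run_job(store, job) for job in (Job(6, 10), Job(9, 10), Job(3, 10))]
print(results)  # ['v1', 'v2', None]
```

Because the snapshot is a single number, the state that travels with each job stays very short.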

Synthetic performance

Here are some synthetic numbers for latency and throughput. Note that the whole chain is: a backend -> the first shmem queue -> mp1 -> TCP socket -> mp2 -> the second shmem queue -> a worker, and then the same way back.

Latency, μs:

Multiplexer | Pgbench select 1 | Ping localhost
35          | 32               | 45



As can be seen above, the results approach the hardware limits.

Throughput

The test is performed as follows:

    while (seconds) {
        /* push pipeline_len messages without waiting in between */
        for (i = 0; i < pipeline_len; i++) {
            pushMessage(&channel, iov, 3);
        }
        /* then pop pipeline_len messages */
        for (i = 0; i < pipeline_len; i++) {
            popMessage(&channel, &msg);
        }
    }

Pipeline_len | Result, msg/s
32           | 432000
256          | 626000

If the throughput test is tuned to maintain queue utilization, it is quite possible to get ~1M messages per second.
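For readers who want to experiment with the shape of this test, here is a rough Python analogue. It is not the authors' benchmark: a local socket pair with an echo thread stands in for the channel, messages have a fixed 16-byte size, and the absolute numbers it produces are not comparable with the results above.

```python
import socket
import threading
import time

MSG = b"x" * 16  # fixed-size message keeps framing trivial

def echo(sock):
    """Peer side: send every received byte straight back."""
    while True:
        data = sock.recv(65536)
        if not data:
            break
        sock.sendall(data)

def run_pipeline(seconds, pipeline_len):
    """Return messages per second for the push-then-pop loop.

    Keep pipeline_len * len(MSG) well below the socket buffer size,
    otherwise both sides can block on sending.
    """
    a, b = socket.socketpair()
    peer = threading.Thread(target=echo, args=(b,), daemon=True)
    peer.start()
    batch = MSG * pipeline_len
    done = 0
    start = time.monotonic()
    while time.monotonic() - start < seconds:
        a.sendall(batch)          # push pipeline_len messages
        remaining = len(batch)
        while remaining:          # pop pipeline_len replies
            remaining -= len(a.recv(remaining))
        done += pipeline_len
    elapsed = time.monotonic() - start
    a.close()                     # lets the echo thread see EOF and exit
    peer.join(timeout=1)
    b.close()
    return done / elapsed

rate = run_pipeline(0.05, 32)
print(f"{rate:.0f} msg/s")
```

A longer pipeline amortizes the per-batch system calls over more messages, which is the effect the two rows above show.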

New transport scalability analysis

Running pgbench with the new transport shows performance and scalability results for point queries and bulk transfers. They were chosen as the corner cases for the new transport.

The horizontal axis of the diagrams shows the number of nodes, and the vertical axis shows throughput in tps.

With multiplexing and a fixed pool of workers, scalability becomes much better, and it is almost 1:1 to the number of nodes.

Notably, the performance on two nodes exceeds the performance of a single server.

 

Test        | Postgres (partitioned) | Postgres FDW | Citus 11.0-beta | Multiplexer
Select-only | 78586                  | 42907        | 65626           | 88589
Select *    | 425                    | 382          | -               | 555

What’s next: a to-do list

The implementation of the new transport model is currently incomplete. The following tasks still need to be completed.

  1. Optimize multiplexing further and scale the number of processes
    1. One process can handle hundred-node clusters with ~10 cores per node
    2. Reasons: syscalls for point queries and memcpy for bulk transfers
    3. Example: point queries, the number of nodes scaled from 2 to 48
      1. multiplexer ratio is 17.01 for 8-core nodes and 9.95 for 16-core nodes
      2. Postgres FDW ratio is ~10 in both cases
  2. Fix prepared plans for the workers (a long story)
    1. The generic plan is not used for partitions because the custom plan becomes cheaper after pruning
    2. Double planning in Postgres FDW doesn’t use remote cache
    3. Cache is bound to a backend and can’t be used for the workers
  3. Full integration with Postgres FDW
    1. Write transactions support in progress

The first item on the list is related to the limits of a single multiplexing process. Despite eventfd usage, the number of system calls sets the limit for point queries. The same is true for memcpy and bulk transfers. So there is still room for optimization. However, the next step is to look at scaling the number of multiplexing processes. Currently a cluster of 100 nodes with 10 cores each is not limited by a single multiplexing process. For nodes with 16 cores, the scalability ratio drops to the FDW numbers, but the absolute numbers are still several times higher. Also note that these results are for transport-heavy workloads. It is expected that the multiplexer will be less loaded for more balanced queries.
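One way to read the ratios from the list above is as scaling efficiency. The sketch below assumes that "ratio" means throughput at 48 nodes divided by throughput at 2 nodes. That interpretation is an assumption, since the text does not define the term.

```python
def scaling_efficiency(ratio, nodes_from, nodes_to):
    """Fraction of ideal linear scaling achieved between two cluster sizes."""
    ideal = nodes_to / nodes_from  # 48 / 2 = 24x for perfectly linear scaling
    return ratio / ideal

print(round(scaling_efficiency(17.01, 2, 48), 2))  # 0.71 (multiplexer, 8-core nodes)
print(round(scaling_efficiency(9.95, 2, 48), 2))   # 0.41 (multiplexer, 16-core nodes)
print(round(scaling_efficiency(10.0, 2, 48), 2))   # 0.42 (Postgres FDW, approximate)
```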

The second item refers to prepared statements. They were broken long before the multiplexing was implemented, and significant time will likely be needed to fix them.

The third item, integration with Postgres Foreign Data Wrappers, is worth looking into in more detail.

Integration with Postgres FDW

It’s clear that Postgres FDW infrastructure is where the new transport belongs, and it needs to be fully integrated there. The following steps are required:

  • Introduce an abstract transport interface for Postgres FDW
  • Implement it with multiplexer calls
  • Integrate the central transaction state if CSN snapshots are supported, or store state on the nodes for backward compatibility
  • Make the straightforward modification for serial execution, and do some work to match the async models
  • Support write transactions (this is the current stage)

Query planning and execution in Postgres FDW

Postgres FDW infrastructure was chosen for the following reason. Postgres FDW and partitioning provide a great infrastructure for distributed query planning and execution. However, it still lacks many capabilities, especially for push execution, so these are being worked on in parallel with the transport.

It is clear that fast transport is not enough to scale complex queries. Optimal planning and push execution become more important.

Stage I

  • Global tables and asymmetric joins
  • Fast transfer of statistics
  • Push down:
    • ForeignScan and FunctionScan Joins
    • Case expressions
    • Partial aggregates (some)
    • Expressions with time functions

Stage II

  • Subplans push down
  • Look through all possible foreign join orders
  • Estimate foreign join as min(hash, nestloop) instead of nestloop only
  • Estimate foreign scan as index scan for global tables with available index information

HammerDB testing results

To check what was gained with the FDW improvements, a HammerDB test was executed concurrently on all nodes. The following was used to ensure a fair “apples to apples” comparison:

  • Postgres with partitions
  • Citus without distributed functions (they don’t give much benefit here anyway)

Please note that the original Postgres FDW performance in the following diagram is marked as a black box on the shardman bar.

These results were achieved without the new transport, so a 30% performance gain is expected when the new transport is implemented.




Conclusions

  • The newly proposed transport ensures low latency and high throughput interconnection for distributed PostgreSQL.
  • The new transport makes execution more efficient for workloads that involve a large volume of requests or very large data transfers.
  • Postgres FDW with improved push down technique shows great results on complex queries.
  • Postgres FDW with the new transport can efficiently scale to hundred-node clusters and more.

The patches for Postgres FDW and the source code of the new multiplexer extension for PostgreSQL will become available to the community in the near future.

Together, the new transport, Postgres FDW, and the partitioning mechanism will provide a great infrastructure for distributed planning and execution.

