7.4. Silk

7.4.1. Concept

Silk (Shardman InterLinK) is an experimental transport feature. It is injected at the point where postgres_fdw would transmit a deparsed piece of a query through a libpq connection to the remote node, replacing the libpq connection with itself. It is designed to decrease the number of idle postgres_fdw connections during transaction execution, minimize latency, and boost overall throughput.

The Silk implementation uses several background processes: the main routing/multiplexing process (one per PostgreSQL instance), called silkroad, and a pool of background workers, called silkworms. When postgres_fdw works over libpq, each backend opens multiple libpq connections to the remote node, where a corresponding number of backend processes is spawned. When Silk replaces libpq, every silkroad process is connected to only one remote silkroad. In this scheme, remote silkworms play the role of the remote backends that postgres_fdw would otherwise spawn.

Silkroad wires a local backend to the remote node's workers as follows:

  1. The backend process uses the regular postgres_fdw API to access remote data as usual, but with Silk enabled, postgres_fdw writes the query into a shared memory queue instead of a libpq connection.

  2. The silkroad process reads the incoming shared memory queue of that backend and routes the message to the appropriate network connection with the remote silkroad process.

  3. The remote silkroad process grabs the incoming message from the network and (if it is a new one) redirects it to an available worker's shared memory queue (or to a special "unassigned jobs" queue if all of the workers are busy).

  4. Finally, the remote worker gets the message through its shared memory queue, executes it, and sends back the result tuples (or an error) the same way.

Silkroad acts here like an ordinary network switch, tossing packets between a backend's shared memory and the appropriate network socket. It knows nothing about the content of a message and relies only on the message header.
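
The sketch below shows the kind of header such a switch-like router needs; the field names and layout are illustrative only and do not reproduce Silk's actual wire format.

    /* A minimal sketch of a routable message header (illustrative, not the
     * actual Silk wire format). */
    #include <stdint.h>

    typedef enum SilkMsgKind
    {
        SILK_MSG_QUERY,             /* deparsed query from a backend */
        SILK_MSG_TUPLES,            /* batch of result tuples from a worker */
        SILK_MSG_ERROR,             /* error raised by a worker or by silkroad */
        SILK_MSG_CLOSE              /* closing packet of a stream */
    } SilkMsgKind;

    typedef struct SilkMsgHeader
    {
        uint64_t    stream_id;      /* identifies the stream (see Section 7.4.3) */
        uint16_t    kind;           /* one of SilkMsgKind */
        uint32_t    payload_len;    /* number of payload bytes that follow */
    } SilkMsgHeader;

With a header like this, routing a packet is a pure table lookup on the stream identifier; the payload is forwarded verbatim to either a network socket or a shared memory queue.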

7.4.2. Event Loop

The silkroad process runs an event loop powered by the libev library. Each backend's shared memory queue is exposed to the event loop through an eventfd descriptor, and each network connection through a socket descriptor.

During startup, the backend registers itself (its eventfd descriptors) with the local silkroad process. Silkroad responds by specifying which memory segments to use for the backend's message queue. From this moment on, silkroad responds to events from the queue associated with this backend. A network connection between the local and a remote silkroad is established on the first request from a backend to that remote node and stays alive as long as both participating silkroad processes exist.
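
A condensed sketch of this wiring in terms of the libev and eventfd APIs mentioned above follows; apart from those two APIs, the function names and callback bodies are invented for illustration.

    /* Sketch of the silkroad event loop: one watcher per registered backend
     * queue (eventfd) and one per network connection to a remote silkroad. */
    #include <ev.h>
    #include <sys/eventfd.h>

    static void
    backend_queue_cb(struct ev_loop *loop, ev_io *w, int revents)
    {
        eventfd_t counter;

        (void) loop;
        (void) revents;
        eventfd_read(w->fd, &counter);  /* consume the wakeup signal */
        /* drain this backend's shared memory queue and route its messages */
    }

    static void
    remote_socket_cb(struct ev_loop *loop, ev_io *w, int revents)
    {
        (void) loop;
        (void) w;
        (void) revents;
        /* read packets from the remote silkroad and dispatch them by header */
    }

    void
    silkroad_loop_sketch(int backend_eventfd, int remote_socket)
    {
        struct ev_loop *loop = EV_DEFAULT;
        ev_io           queue_watcher;
        ev_io           socket_watcher;

        ev_io_init(&queue_watcher, backend_queue_cb, backend_eventfd, EV_READ);
        ev_io_start(loop, &queue_watcher);

        ev_io_init(&socket_watcher, remote_socket_cb, remote_socket, EV_READ);
        ev_io_start(loop, &socket_watcher);

        ev_run(loop, 0);                /* dispatch events until shutdown */
    }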

7.4.3. Routing and Multiplexing

For each subquery, a subset of tuples is expected in return, so the interaction within the subquery is represented as a bidirectional data stream. Silkroad uses an internal routing table to register these streams. A stream ID, unique within the Shardman cluster, is formed from the pair "origin node address, target node address" and a number that is unique within the node. Each particular subquery sent from a backend to remote nodes is registered by silkroad as such a stream, so any backend can be associated with many streams at a time.

When a local silkroad process receives a message with a new stream ID from a backend, it registers the stream in its local routing table and then redirects the message to the appropriate socket. If the connection with the remote silkroad does not exist yet, it is established using a handshake procedure. The original message that initiated the handshake is kept in a special internal buffer until the handshake succeeds. The remote silkroad process receiving a packet with a new ID registers it in its own table, assigns a silkworm worker from the pool of available workers, and places the message into the worker's shared memory queue. If all of the silkworm workers are busy at the moment, the message is postponed, i.e., placed into a special "unassigned jobs" queue (its size is set by the shardman.silk_unassigned_job_queue_size configuration parameter, 1024 by default). If there is no free space in that queue, an error message is generated and sent back to the source backend. A job from this queue is assigned later to the first worker that finishes its previous job.
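
The data structures below show an approximate shape of a stream identifier and a routing table entry consistent with this description; the real internal structures of Silk may differ.

    /* Illustrative shapes of a stream ID and a routing table entry. */
    #include <stdint.h>
    #include <stdbool.h>

    typedef struct SilkStreamId
    {
        uint32_t    origin_node;    /* node that initiated the subquery */
        uint32_t    target_node;    /* node that executes it */
        uint64_t    local_number;   /* unique within the origin node */
    } SilkStreamId;

    typedef struct SilkRoute
    {
        SilkStreamId stream;        /* key: stream this route belongs to */
        int          backend_queue; /* local endpoint: backend or worker queue */
        int          remote_socket; /* network endpoint: socket to the peer silkroad */
        bool         suspended;     /* set while flow control pauses the stream */
    } SilkRoute;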

When a worker gets a new job, it executes it through the SPI subsystem, organizes the result tuples into batches, and sends them back through shared memory to its local silkroad process. The rest is straightforward because the whole route is already known. The last packet with result tuples in a stream is marked as closing; it instructs the silkroad processes to remove this route from their tables.
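
As a rough sketch, a silkworm could run a received read-only query through SPI along the following lines; error handling, transaction-state setup, and the actual batching protocol are omitted, and the function name is hypothetical.

    /* Sketch of query execution in a silkworm worker via SPI. */
    #include "postgres.h"
    #include "executor/spi.h"

    static void
    silkworm_execute_sketch(const char *query)
    {
        SPI_connect();

        if (SPI_execute(query, true /* read-only */, 0) == SPI_OK_SELECT)
        {
            for (uint64 i = 0; i < SPI_processed; i++)
            {
                HeapTuple tuple = SPI_tuptable->vals[i];

                /* serialize the tuple and append it to the current batch;
                 * flush the batch to the shared memory queue when it fills up */
                (void) tuple;
            }
        }

        SPI_finish();
    }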

Note that the backend and remote workers stay subscribed to their streams until the streams are explicitly closed. This gives the backend the opportunity to send an abort message or to notify the remote worker that the transaction should be closed prematurely, and it makes it possible to discard obsolete data packets, for example those left over from previously aborted transactions.

To observe the current state of the silkroad multiplexer process, Silk diagnostics views are available, as explained in Section 6.4.8.

7.4.4. Error Handling and Route Integrity

Besides the routing table, silkroad tracks the endpoints (backends and network connections) involved in each particular stream. When a connection is closed, all the involved backends (and/or workers) are notified of that event with a special error message, and all routes/streams related to this connection are dismissed. In the same way, if a backend crashes, its shared memory queue becomes detached, and silkroad reacts by sending error messages to the remote participants of every stream related to the crashed backend. Thus remote workers are not left doing useless work when the requester has already died.
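
Reusing the SilkRoute sketch from Section 7.4.3, the cleanup could look roughly as follows; the flat array representation and the helper functions mentioned in the comments are invented for illustration.

    /* Sketch of endpoint cleanup when a connection or backend disappears. */
    static void
    silkroad_endpoint_closed_sketch(SilkRoute *routes, int nroutes, int dead_fd)
    {
        for (int i = 0; i < nroutes; i++)
        {
            SilkRoute *r = &routes[i];

            if (r->backend_queue != dead_fd && r->remote_socket != dead_fd)
                continue;       /* stream not related to the dead endpoint */

            /* notify the surviving side with a special error message, e.g.
             * send_stream_error(r, dead_fd);   -- hypothetical helper */

            /* then dismiss the route, e.g.
             * drop_route(r);                   -- hypothetical helper */
        }
    }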

7.4.5. Data Transmission, Batching, and Splitting Oversized Tuples

The result tuples are transmitted by silkworm in native binary mode. Tuples with externally stored attributes are deTOASTed, but attributes that were compressed stay compressed.

Small tuples are organized into batches (about 256 kB each). Big tuples are cut into pieces by the sender and reassembled into a whole tuple by the receiving backend.
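
A sender-side sketch of this batching and splitting logic is shown below; the 256 kB figure comes from the text above, while the function shape and names are invented.

    /* Sketch of batching small tuples and splitting oversized ones. */
    #include <string.h>

    #define SILK_BATCH_BYTES   (256 * 1024)     /* approximate batch size */

    static void
    send_tuple_sketch(const char *tuple, size_t len,
                      char *batch, size_t *batch_used,
                      void (*flush)(const char *, size_t))
    {
        if (len > SILK_BATCH_BYTES)
        {
            /* oversized tuple: ship it piece by piece; the receiving backend
             * reassembles the pieces into a whole tuple */
            for (size_t off = 0; off < len; off += SILK_BATCH_BYTES)
            {
                size_t piece = len - off < SILK_BATCH_BYTES
                               ? len - off
                               : SILK_BATCH_BYTES;

                flush(tuple + off, piece);
            }
            return;
        }

        if (*batch_used + len > SILK_BATCH_BYTES)
        {
            flush(batch, *batch_used);          /* batch is full, send it */
            *batch_used = 0;
        }

        memcpy(batch + *batch_used, tuple, len);
        *batch_used += len;
    }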

7.4.6. Streams Flow Control

It may happen that the next message received from a backend does not fit into the target network buffer, or that a message received from the network does not fit into the target shared memory queue. In such a case, the stream that caused this situation is suspended: silkroad stops reacting to events from the source endpoint (connection or backend) until the target endpoint drains its messages. The other backends and connections not related to this route keep working. The receiving modules of backends are designed to minimize these situations: a backend periodically checks and drains its incoming queue even when the plan executor is busy processing other plan nodes. Received tuples are stored in the backend's tuplestores associated with the corresponding plan nodes until the executor requests the next tuple for a particular plan node.

When enough space is freed in the target queue, the suspended stream is resumed, the endpoint's events are unblocked, and the process of receiving and sorting packets continues.
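
In libev terms, suspending and resuming a stream boils down to stopping and restarting the watcher of the endpoint that produces the data, roughly as sketched below; the surrounding bookkeeping is omitted.

    /* Sketch of flow control: pause the producer's watcher, resume it later. */
    #include <ev.h>

    static void
    suspend_source_sketch(struct ev_loop *loop, ev_io *source_watcher)
    {
        /* stop reacting to the producer until the consumer drains its queue */
        ev_io_stop(loop, source_watcher);
    }

    static void
    resume_source_sketch(struct ev_loop *loop, ev_io *source_watcher)
    {
        /* enough space was freed in the target queue: unblock the producer */
        ev_io_start(loop, source_watcher);
    }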

7.4.7. Implementation Details

7.4.7.1. State Transferring and CSNs

When postgres_fdw works over the Silk transport, only one connection between silkroad routing daemons is used to transfer user requests to silkworm workers and get their responses. Each request contains a transaction state, the replication group ID of the node where the request was formed (the coordinator), the query itself, and query parameters (if present). A response is either an error message with a specific error text and error code, or a bunch of tuples followed by an end-of-tuples message. This means that silkworm has to switch to the transaction state that comes with the request before executing the request.

For now, the Silk transport is used only for read-only SELECT queries. All modifying requests are processed via a usual libpq connection and handled mostly like any other DML request in PostgreSQL postgres_fdw. The only distinction is that when a DML request is processed by postgres_fdw, it resets the saved transaction state for the connection cache entry corresponding to the connection where this request is sent, and the read-only flag of this connection cache entry is set to false. When a request is sent over the Silk transport, the Shardman extension asks postgres_fdw for the transaction state for a pair of serverid and userid. If such a connection cache entry is found in the postgres_fdw connection cache, it is not a read-only entry, and a transaction state is present in it, this state is returned. If it is not present, postgres_fdw retrieves a full transaction state from the remote server, saves it in the connection cache entry, and returns it to the Shardman extension.

The full transaction state is similar to the parallel worker transaction state and contains:

  • information related to the current user (uid, username)

  • pid of the current backend

  • transaction start timestamp

  • current snapshot CSN

  • flags indicating that invalidation messages are present

  • backend private state:

    • array of ComboCIDs

    • internal transaction state (full transaction ID, isolation level, current command ID, etc.)

    • information about reindexed indexes

If the connection is not found in the postgres_fdw connection cache (i.e., it is a new connection) or the entry in the connection cache is marked as read-only, only these characteristics form the transaction state:

  • information related to the current user (username)

  • transaction start timestamp

  • current snapshot CSN

  • flags indicating that invalidation messages are present

Using such transaction states, silkworm can attach to a running transaction or start a new read-only transaction with the provided snapshot CSN and retrieve the result.
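
The structure below approximates the transferred state as described by the two lists above; the actual field names and types used by Shardman are internal and may differ.

    /* Illustrative shape of the transferred transaction state. */
    #include <stdbool.h>
    #include <stdint.h>

    typedef struct SilkTxStateSketch
    {
        /* always present */
        const char *username;           /* current user */
        int64_t     tx_start_timestamp; /* transaction start timestamp */
        uint64_t    snapshot_csn;       /* current snapshot CSN */
        bool        has_inval_messages; /* invalidation messages are present */

        /* only in the full state (existing, non-read-only connection entry) */
        bool        is_full;
        uint32_t    uid;                /* current user ID */
        int         backend_pid;        /* pid of the exporting backend */
        /* plus the backend private state: array of ComboCIDs, internal
         * transaction state (full transaction ID, isolation level, current
         * command ID, etc.), and information about reindexed indexes */
    } SilkTxStateSketch;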

Note that the full transaction state can be imported only on the server that exported it. Also note that due to this method of transferring the transaction state, you cannot use the Silk transport without enabling CSN snapshots.

7.4.7.2. Integration with Asynchronous FDW Engine

In Section 7.2.2, asynchronous ForeignScan plan nodes were presented as a way to optimize data retrieval from multiple hosts when these plan nodes are located under a single Append node. In the standard PostgreSQL architecture, the execution of ForeignScan plan nodes is implemented using the network protocol based on libpq. To improve system performance during data transfer and reduce resource consumption, Shardman employs a different method for exchanging data with remote hosts: the mechanism for executing ForeignScan nodes is implemented using the Silk protocol.

To incorporate the Silk transport into the asynchronous executor, modifications were made to the postgres_fdw extension: a pluggable transport was implemented as a set of interface functions included in the Shardman extension. These functions are called by postgres_fdw while it executes the callbacks that interact with remote hosts. The pluggable Silk transport is activated if the Shardman extension is preloaded and the foreign server has the extended_features attribute (which applies to any FDW server in the Shardman cluster). In all other cases, postgres_fdw uses the standard exchange protocol based on libpq.

To disable the pluggable Silk transport in the Shardman cluster, set the query_engine_mode configuration parameter to ENGINE_NONE.

In the current implementation, the pluggable Silk transport is only used for read-only queries, specifically during the execution of the ForeignScan node. The standard exchange protocol based on libpq is used for modifying queries.

When query result rows are received over the Silk transport, they are stored in a TupleStoreState as a complete result set of the same size as the one returned by the remote host. The TupleStoreState is a data structure that can spill data to disk in case of memory shortage, so a large result set returned by the remote host does not lead to an out-of-memory (OOM) condition. Once the result set has been received into the TupleStoreState, the data is copied into the ForeignScan executor's in-memory buffer. The size of this buffer is defined by the fetch_size attribute of the foreign server; the default value of 50000 rows can be adjusted to balance performance (the number of ForeignScan node calls) against memory consumption.
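
This receive path can be sketched with the PostgreSQL tuplestore API as follows; the function itself and the Silk receive call in the comment are hypothetical, and the real implementation in postgres_fdw/Shardman is more involved.

    /* Sketch: accumulate the result set in a tuplestore (may spill to disk),
     * then hand it to the executor in fetch_size-sized portions. */
    #include "postgres.h"
    #include "executor/tuptable.h"
    #include "miscadmin.h"              /* work_mem */
    #include "utils/tuplestore.h"

    static void
    receive_result_sketch(TupleTableSlot *incoming_slot, TupleTableSlot *out_slot,
                          int fetch_size)
    {
        Tuplestorestate *store;

        store = tuplestore_begin_heap(false, false, work_mem);

        /* 1. store every tuple received from the Silk stream, e.g.
         *    while (silk_receive_next(incoming_slot))    -- hypothetical */
        tuplestore_puttupleslot(store, incoming_slot);

        /* 2. later, refill the ForeignScan buffer fetch_size rows at a time */
        for (int i = 0; i < fetch_size; i++)
        {
            if (!tuplestore_gettupleslot(store, true, false, out_slot))
                break;
            /* append out_slot's tuple to the in-memory buffer */
        }

        tuplestore_end(store);
    }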

Utilizing the pluggable Silk transport for the asynchronous FDW engine increases network exchange performance and reduces system resource consumption, in particular the number of network connections.
