7.4. Silk #
7.4.1. Concept #
Silk (Shardman InterLinK) is an experimental transport feature. It is injected at the point where postgres_fdw
decides to transmit deparsed piece of query through libpq
connection to the remote node, replacing libpq
connection with itself. It is designed to decrease the count of idle postgres_fdw
connections during transaction execution, minimize latency and boost overall throughput.
Silk implementation uses several background processes. The main routing/multiplexing process (one per PostgreSQL instance), called silkroad
, and a bunch of background workers, called silkworms
. While postgres_fdw
uses libpq
, it spawns multiple libpq
connections from each backend to the remote node (where multiple backend processes are spawned accordingly). But if silk
replaces libpq
- every silkroad
process is connected to only one remote silkroad
. In this scheme, remote silkworms
play the role of remote backends otherwise spawned by postgres_fdw
.
Silkroad
wires local backend with remote node's workers this way:
Backend process uses regular
postgres_fdw
API to access remote data as usual. Butpostgres_fdw
, when silk is enabled, writes the query into shared memory queue instead oflibpq
connection;Silkroad
process parses incoming shared memory queue from that backend and routes the message to appropriate network connection with remotesilkroad
process.Remote
silkroad
process grabs incoming message from network and (if it is a new one) redirects it to available worker's shared memory queue (or in a special "unassigned jobs" queue if all of the workers are busy).At last, remote worker gets the message through its shared memory queue, executes it and sends back the result tuples (or an error) the same way.
Silkroad
acts here like a common network switch, tossing packets between backend's shared memory and appropriate network socket. It knows nothing about content of a message relying only on the message header.
7.4.2. Event Loop #
Silkroad
process runs an event loop powered by the libev
library. Each backend's shared memory queue is exposed at the event loop with the eventfd
descriptor, and each network connection - with a socket descriptor.
During startup, the backend registers itself (its eventfd
descriptors) at a local silkroad
process. Silkroad
responds by specifying which memory segments to use for the backend's message queue. From this moment silkroad
will respond to events from the queue associated with this backend. Network connections between local and remote silkroads
will be established at once on the first request from the backend to the remote node and stay alive until both of participants (silkroad
processes) exist.
7.4.3. Routing and Multiplexing #
For each subquery, we expect a subset of tuples, and therefore represent the interaction within the subquery as a bidirectional data stream. Silkroad
uses an internal routing table to register these streams. A unique stream ID (within the Shardman cluster) is formed as a pair of "origin node address, target node address" and a locally (within the node) unique number. Each particular subquery from a backend to remote nodes will be registered by silkroad
as such a stream. So, any backend can be associated with many streams at the time.
When a local silkroad
process got a message with a new stream ID from backend, it registers it in a local routing table and then redirects this message to an appropriate socket. If the connection with the remote silkroad
does not exist, it is established using a handshake procedure. The original message that initiated a handshake is placed into a special internal buffer until the handshake succeeds. The remote silkroad
process receiving a packet with the new ID registers it in its own table, then assigns a silkworm
worker from a pool of available workers and places the message into the worker's shared memory queue. If all of the silkworm
workers are busy at the moment, the message will be postponed, i.e., placed into a special "unassigned jobs queue" (note that the configuration parameter shardman.silk_unassigned_job_queue_size
is 1024). If there is no free space in the queue, an error message will be generated and sent back to the source backend. A job from this queue will be assigned later to the first available worker when it gets rid of the previous job.
When the worker got a new “job”, it executes it through SPI
subsystem, organizing result tuples into batches and sends them back through shared memory to the local silkroad
process. The rest is trivial due to the whole route is known. The last resulting packet with tuples in a stream is marked as “closing”. It is an order to silkroads
to wipe out this route from their tables.
Note that backend and remote workers stay “subscribed” to their streams until they are explicitly closed. So the backend has the opportunity to send an abort message or notify the remote worker to prematurely close the transaction. And it makes it possible to discard obsolete data packets, possibly from previous aborted transactions.
To observe the current state of the silkroad
multiplexer process, Silk diagnostics views are available, as explained in Section 6.4.8.
7.4.4. Error Handling and Route Integrity #
Besides the routing table silkroad
tracks endpoints (backends and network connections) that were involved in some particular stream. So when some connection is closed, all the involved backends (and/or workers) will be notified of that event with a special error message, and all routes/streams related to this connection will be dismissed. The same way, if the backend crashes, its shared memory queue become detached and silkroad
reacts by sending error messages to remote participants of every stream related to the crashed backend. So remote workers are not left doing useless work when the requester has already died.
7.4.5. Data Transmitting/batching/splitting Oversized Tuples #
The resulting tuples are transmitted by silkworm
in a native binary mode. Tuples with external
storage attribute will be deTOASTed, but those that were compressed stay compressed.
Small tuples will be organized in batches (about 256k). Big tuples will be cut into pieces by the sender and assembled into a whole by the receiving backend.
7.4.6. Streams Flow Control #
It may happen that when the next message is received from a backend, it will not fit the target network buffer. Or the message received from the network does not fit into the target shared memory queue. In such a case, the stream that caused this situation will be “suspended”. This means that the silkroad
pauses the reaction to events from the source endpoint (connection or backend) until the target endpoint drains their messages. The rest backends and connections not affected by this route are kept working. Receiving modules of backends are designed to minimize these situations. The backend periodically checks and drains the incoming queue even when the plan executor is busy processing other plan nodes. Received tuples are stored in backend's tuplestores according the plan nodes until the executor requests the next tuple for a particular plan node execution.
When enough space is freed on the target queue, the suspended stream gets resumed, endpoint's events get unblocked and the process of receiving and sorting packets continues.
7.4.7. Implementation details #
7.4.7.1. State Transferring and CSNs #
When postgres_fdw works over Silk transport, only one connection between silkroad
routing daemons is used to transfer user requests to silkworm
workers and get their responses. Each request contains a transaction state, a replication group ID of the node where the request is formed (coordinator), a query itself and query parameters (if present). A response is either an error response message with a specific error message and error code or a bunch of tuples followed by “end of tuples” message. This means that silkworm
has to switch to the transaction state coming with the request prior to executing the request.
For now, Silk transport is used only for read-only SELECT
queries. All modifying requests are processed via a usual libpq connection and handled mostly as all other DML requests in PostgreSQL postgres_fdw. The only distinction is that when a DML request is processed by postgres_fdw, it resets the saved transaction state for the connection cache entry corresponding to the connection where this request is sent. Also a read-only flag is set to false for such a connection cache entry. When a request is sent over Silk transport, Shardman extension asks for the transaction state for a pair of serverid and userid from postgres_fdw. If such a connection cache entry is found in the postgres_fdw connection cache, it is not a read-only cache entry and transaction state is present in this entry, the state is returned. If it is not present, postgres_fdw retreives a full transaction state from the remote server, saves it in the connection cache entry and returns to the Shardman extension.
The full transaction state is similar to the parallel worker transaction state and contains:
information related to the current user (uid, username)
pid of the current backend
transaction start timestamp
current snapshot CSN
flags indicating that invalidation messages are present
backend private state:
array of ComboCIDs
internal transaction state (full transaction ID, isolation level, current command ID, etc.)
information about reindexed indexes
If the connection is not found in the postgres_fdw connection cache (i.e., it is a new connection) or the entry in the connection cache is marked as read-only, only these characteristics form the transaction state:
information related to the current user (username)
transaction start timestamp
current snapshot CSN
flags indicating that invalidation messages are present
Using such transaction states, silkworm
can attach to a running transaction or start a new read-only transaction with the provided snapshot CSN
and retreive the result.
Note that the full transaction state can be imported only on the server that exported it. Also note that due to this transaction state transferring method, you cannot use Silk transport without enabling CSN
snapshots.
7.4.7.2. Integration with Asynchronous FDW Engine #
In the Section 7.2.2, asynchronous ForeignScan
plan nodes were presented as a way to optimize data retrieval from multiple hosts while these plan nodes were located under a single Append
node. In the standard PostgreSQL architecture, the execution of ForeignScan
plan nodes is implemented using the network protocol based on libpq. To improve the system performance during data transfer and reduce resource consumption, Shardman employs a different method for exchanging data with remote hosts. The mechanism for executing ForeignScan
nodes is implemented using the Silk protocol.
To incorporate Silk transport into the asynchronous executor, modifications were made to the postgres_fdw extension. A pluggable transport was implemented as a set of interface functions included as part of the Shardman extension. During execution of callbacks that interact with remote hosts, these functions are called by the postgres_fdw extension. The pluggable Silk transport is activated if the Shardman extension is preloaded and if the foreign server has the attribute extended_features
(applicable for any FDW server in the Shardman cluster). For all other cases, the postgres_fdw extension uses the standard exchange protocol based on libpq.
To disable the pluggable Silk transport in the Shardman cluster, it is necessary to set the query_engine_mode
configuration parameter to the value of ENGINE_NONE
.
In the current implementation, the pluggable Silk transport is only used for read-only queries, specifically during the execution of the ForeignScan
node. The standard exchange protocol based on libpq is used for modifying queries.
When receiving query execution result rows using the Silk transport, the data is stored in a TupleStoreState
storage as a complete result set, which is the same size as that returned by the remote host. The TupleStoreState
is implemented as a data structure that can spill data to the disk in case of memory shortage. If the remote host returns a large result set, it does not lead to an out-of-memory (OOM) condition. Once the result set is received in the TupleStoreState
, the data is copied into the ForeignScan
executor's in-memory buffer. The size of this buffer is defined by the fetch_size
attribute of the foreign server. The default value of 50000
rows can be adjusted to find a balance between the performance (number of ForeignScan
node calls) and memory consumption.
Utilizing the pluggable Silk transport for the asynchronous FDW engine results in an increase of the network exchange performance and a reduction of the system resource consumption due to better utilization of system resources, including the number of network connections.