70.2. Silk

70.2.1. Concept

Silk (Postgres Pro Shardman InterLinK) is a transport feature used for inter-cluster communications. It is injected at the point where postgres_fdw decides to transmit a deparsed piece of a query through a libpq connection to the remote node, and replaces that libpq connection with itself. It is designed to decrease the number of idle postgres_fdw connections during transaction execution, minimize latency, and boost overall throughput.

The Silk implementation uses several background processes: the main routing/multiplexing process (one per PostgreSQL instance), called silkroad, and a number of background workers, called silkworms. When postgres_fdw uses libpq, it spawns multiple libpq connections from each backend to the remote node (where multiple backend processes are spawned accordingly). When Silk replaces libpq, every silkroad process has only one connection to a given remote silkroad. In this scheme, remote silkworms play the role of the remote backends otherwise spawned by postgres_fdw.

Silkroad wires a local backend to the remote node's workers as follows:

  1. The backend process uses the regular postgres_fdw API to access remote data as usual. However, when Silk is enabled, postgres_fdw writes the query into a shared memory queue instead of a libpq connection.

  2. The silkroad process parses the incoming shared memory queue from that backend and routes the message to the appropriate network connection with the remote silkroad process.

  3. The remote silkroad process grabs the incoming message from the network and, if it is a new one, redirects it to an available worker's shared memory queue (or to a special "unassigned jobs" queue if all of the workers are busy).

  4. Finally, the remote worker gets the message through its shared memory queue, executes it, and sends back the result tuples (or an error) the same way.

Silkroad acts here like a common network switch, tossing packets between a backend's shared memory and the appropriate network socket. It knows nothing about the content of a message and relies only on the message header.

70.2.2. Event Loop

The silkroad process runs an event loop powered by the libev library. Each backend's shared memory queue is exposed to the event loop through an eventfd descriptor, and each network connection through a socket descriptor.

During startup, the backend registers itself (its eventfd descriptors) with the local silkroad process. Silkroad responds by specifying which memory segments to use for the backend's message queue. From this moment, silkroad responds to events from the queue associated with this backend. A network connection between the local and a remote silkroad is established on the first request from a backend to the remote node and stays alive as long as both participants (silkroad processes) exist.
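The idea of one loop watching both kinds of descriptors can be sketched as follows. This is only an illustration: Python's selectors module stands in for libev, socket pairs stand in for both the eventfd descriptors and the TCP connections, and the names make_handler and run_once are hypothetical.

```python
import selectors
import socket


def make_handler(label):
    """Build a callback that reads from a ready descriptor and tags the data."""
    def handler(sock):
        return (label, sock.recv(4096))
    return handler


def run_once(sel, timeout=1.0):
    """Dispatch every descriptor that is ready right now."""
    return [key.data(key.fileobj) for key, _mask in sel.select(timeout)]


sel = selectors.DefaultSelector()

# Assumption: a socket pair plays the role of a backend's eventfd descriptor.
backend_wake, backend_notify = socket.socketpair()
# Assumption: another socket pair plays the role of the link to a remote silkroad.
net_local, net_remote = socket.socketpair()

# "Registration": both descriptors are handed to the same event loop.
sel.register(backend_wake, selectors.EVENT_READ, make_handler("backend-queue"))
sel.register(net_local, selectors.EVENT_READ, make_handler("network"))

backend_notify.send(b"\x01")   # the backend signals that its queue has data
net_remote.send(b"tuples")     # the remote side sends a packet

events = sorted(run_once(sel))

sel.close()
for s in (backend_wake, backend_notify, net_local, net_remote):
    s.close()
```

A single pass of the loop picks up both the queue notification and the network packet, which is the property that lets one process serve many backends and connections.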

70.2.3. Routing and Multiplexing

For each subquery, we expect a subset of tuples, and therefore represent the interaction within the subquery as a bidirectional data stream. Silkroad uses an internal routing table to register these streams. A stream ID, unique within the Postgres Pro Shardman cluster, is formed from the pair "origin node address, target node address" and a number that is unique locally (within the node). Each subquery from a backend to remote nodes is registered by silkroad as such a stream, so any backend can be associated with many streams at a time.
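A minimal sketch of how such an identifier could be composed is shown below. The class names StreamId and StreamAllocator, the field names, and the addresses are illustrative assumptions and do not reflect the actual internal structures.

```python
from dataclasses import dataclass
from itertools import count


@dataclass(frozen=True)
class StreamId:
    origin: str    # address of the node that opened the stream
    target: str    # address of the node that serves the stream
    channel: int   # number that is unique only within the origin node


class StreamAllocator:
    """Hands out stream IDs on behalf of one node (hypothetical helper)."""

    def __init__(self, origin):
        self.origin = origin
        self._next = count(1)

    def open(self, target):
        return StreamId(self.origin, target, next(self._next))


node_a = StreamAllocator("10.0.0.1:5432")
node_b = StreamAllocator("10.0.0.2:5432")

s1 = node_a.open("10.0.0.2:5432")
s2 = node_a.open("10.0.0.3:5432")
s3 = node_b.open("10.0.0.1:5432")
```

Here s1 and s3 carry the same local number, yet the full identifiers differ because the origin address is part of the key. That is why a node-local counter is enough for cluster-wide uniqueness.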

When a local silkroad process receives a message with a new stream ID from a backend, it registers the stream in its local routing table and then redirects the message to the appropriate socket. If the connection with the remote silkroad does not exist yet, it is established using a handshake procedure. The original message that initiated the handshake is placed into a special internal buffer until the handshake succeeds. The remote silkroad process that receives a packet with a new ID registers it in its own table, then assigns a silkworm worker from the pool of available workers and places the message into the worker's shared memory queue. If all of the silkworm workers are busy at the moment, the message is postponed, that is, placed into a special "unassigned jobs" queue (note that the configuration parameter shardman.silk_unassigned_job_queue_size is 1024). If there is no free space in the queue, an error message is generated and sent back to the source backend. A job from this queue is assigned later to the first worker that becomes available after finishing its previous job.
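The worker assignment policy on the remote side can be sketched roughly as follows. WorkerPool, QueueFull, and the method names are hypothetical, and the queue size is tiny only to keep the example short.

```python
from collections import deque


class QueueFull(Exception):
    """Stands in for the error message sent back to the source backend."""


class WorkerPool:
    def __init__(self, workers, queue_size):
        self.idle = deque(workers)
        self.assigned = {}            # stream ID -> worker
        self.unassigned = deque()     # postponed (stream ID, message) jobs
        self.queue_size = queue_size

    def on_new_stream(self, stream_id, message):
        """Assign a free worker, or postpone the job if all workers are busy."""
        if self.idle:
            worker = self.idle.popleft()
            self.assigned[stream_id] = worker
            return worker
        if len(self.unassigned) >= self.queue_size:
            raise QueueFull(f"no room for stream {stream_id}")
        self.unassigned.append((stream_id, message))
        return None

    def on_stream_closed(self, stream_id):
        """Free the worker and give it the oldest postponed job, if any."""
        worker = self.assigned.pop(stream_id)
        if self.unassigned:
            next_id, _message = self.unassigned.popleft()
            self.assigned[next_id] = worker
            return next_id
        self.idle.append(worker)
        return None


pool = WorkerPool(["w1", "w2"], queue_size=1)
first = pool.on_new_stream(1, "query 1")     # taken by w1
second = pool.on_new_stream(2, "query 2")    # taken by w2
third = pool.on_new_stream(3, "query 3")     # postponed, both workers busy

try:
    pool.on_new_stream(4, "query 4")
    overflowed = False
except QueueFull:
    overflowed = True                        # the queue holds only one job

resumed = pool.on_stream_closed(1)           # w1 picks up the postponed job
```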

When the worker gets a new job, it executes it through the SPI subsystem, organizes the result tuples into batches, and sends them back through shared memory to the local silkroad process. The rest is trivial because the whole route is known. The last packet with tuples in a stream is marked as closing, which instructs the silkroads to remove this route from their tables.

Note that backend and remote workers stay subscribed to their streams until they are explicitly closed. So the backend has the opportunity to send an abort message or notify the remote worker to prematurely close the transaction. And it makes it possible to discard obsolete data packets, possibly from previous aborted transactions.

To observe the current state of the silkroad multiplexer process, Silk diagnostics views are available.

70.2.4. Error Handling and Route Integrity

Besides the routing table, silkroad tracks the endpoints (backends and network connections) that are involved in each particular stream. So when a connection is closed, all the involved backends (and/or workers) are notified of that event with a special error message, and all routes/streams related to this connection are dismissed. In the same way, if a backend crashes, its shared memory queue becomes detached, and silkroad reacts by sending error messages to the remote participants of every stream related to the crashed backend. So remote workers are not left doing useless work when the requester has already died.
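The bookkeeping described above amounts to indexing each route by both of its endpoints. The following sketch assumes a hypothetical RouteTracker class and returns the notifications as plain tuples instead of sending real error messages.

```python
from collections import defaultdict


class RouteTracker:
    def __init__(self):
        self.routes = {}                       # stream ID -> (backend, connection)
        self.by_connection = defaultdict(set)  # connection -> stream IDs
        self.by_backend = defaultdict(set)     # backend -> stream IDs

    def add(self, stream_id, backend_id, connection_id):
        self.routes[stream_id] = (backend_id, connection_id)
        self.by_backend[backend_id].add(stream_id)
        self.by_connection[connection_id].add(stream_id)

    def _dismiss(self, stream_id):
        backend_id, connection_id = self.routes.pop(stream_id)
        self.by_backend[backend_id].discard(stream_id)
        self.by_connection[connection_id].discard(stream_id)
        return backend_id, connection_id

    def connection_closed(self, connection_id):
        """Return (backend, stream) pairs that must receive an error message."""
        notices = []
        # sorted() copies the set, so dismissing routes while looping is safe.
        for stream_id in sorted(self.by_connection[connection_id]):
            backend_id, _ = self._dismiss(stream_id)
            notices.append((backend_id, stream_id))
        return notices

    def backend_crashed(self, backend_id):
        """Return (connection, stream) pairs whose remote side must be told."""
        notices = []
        for stream_id in sorted(self.by_backend[backend_id]):
            _, connection_id = self._dismiss(stream_id)
            notices.append((connection_id, stream_id))
        return notices


tracker = RouteTracker()
tracker.add(1, "b1", "c1")
tracker.add(2, "b2", "c1")
tracker.add(3, "b1", "c2")

lost_link = tracker.connection_closed("c1")   # streams 1 and 2 are dismissed
remaining = dict(tracker.routes)
crashed = tracker.backend_crashed("b1")       # stream 3 is dismissed
```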

70.2.5. Data Transmitting/Batching/Splitting Oversized Tuples

The resulting tuples are transmitted by silkworm in a native binary mode. Tuples with externally stored attributes will be deTOASTed, but attributes that were compressed stay compressed.

Small tuples will be organized in batches (about 256k). Big tuples will be cut into pieces by the sender and assembled into a whole by the receiving backend.
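A simplified sketch of batching and splitting is given below. The message layout ("batch" and "piece" records) and the function names pack and unpack are assumptions made for illustration, and the limit is interpreted as 256 KiB, which the text does not state precisely.

```python
BATCH_LIMIT = 256 * 1024  # assumption: "about 256k" is read as bytes


def pack(tuples, limit=BATCH_LIMIT):
    """Sender side: group small tuples into batches, cut big ones into pieces."""
    batch, size = [], 0
    for t in tuples:
        if len(t) > limit:
            if batch:                      # flush first to preserve tuple order
                yield ("batch", batch)
                batch, size = [], 0
            for start in range(0, len(t), limit):
                is_last = start + limit >= len(t)
                yield ("piece", t[start:start + limit], is_last)
            continue
        if batch and size + len(t) > limit:
            yield ("batch", batch)
            batch, size = [], 0
        batch.append(t)
        size += len(t)
    if batch:
        yield ("batch", batch)


def unpack(messages):
    """Receiver side: unfold batches and reassemble pieces into whole tuples."""
    out, partial = [], []
    for msg in messages:
        if msg[0] == "batch":
            out.extend(msg[1])
        else:
            _kind, chunk, is_last = msg
            partial.append(chunk)
            if is_last:
                out.append(b"".join(partial))
                partial = []
    return out


# A tiny limit keeps the example readable.
rows = [b"a" * 10, b"b" * 20, b"c" * 100, b"d" * 5]
messages = list(pack(rows, limit=32))
kinds = [m[0] for m in messages]
restored = unpack(messages)
```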

70.2.6. Streams Flow Control

It may happen that the next message received from a backend does not fit into the target network buffer, or that a message received from the network does not fit into the target shared memory queue. In such a case, the stream that caused this situation is suspended. This means that silkroad stops reacting to events from the source endpoint (connection or backend) until the target endpoint drains its messages. The remaining backends and connections, which are not affected by this route, keep working. The receiving modules of backends are designed to minimize these situations: the backend periodically checks and drains the incoming queue even when the plan executor is busy processing other plan nodes. Received tuples are stored in the backend's tuplestores according to their plan nodes until the executor requests the next tuple for a particular plan node.

When enough space is freed in the target queue, the suspended stream is resumed, the endpoint's events are unblocked, and the process of receiving and sorting packets continues.
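The suspend/resume cycle can be modeled with two bounded queues. The Stream class below is a toy model under stated assumptions: capacity is counted in messages rather than bytes, and pump and drain are hypothetical names for the reactions to source and target events.

```python
from collections import deque


class Stream:
    def __init__(self, capacity):
        self.source = deque()     # messages waiting at the source endpoint
        self.target = deque()     # bounded queue of the target endpoint
        self.capacity = capacity
        self.suspended = False

    def pump(self):
        """Move messages from source to target until the target is full."""
        while self.source and not self.suspended:
            if len(self.target) >= self.capacity:
                self.suspended = True      # stop reacting to source events
            else:
                self.target.append(self.source.popleft())

    def drain(self, n):
        """The target consumes up to n messages; resume if space was freed."""
        taken = [self.target.popleft() for _ in range(min(n, len(self.target)))]
        if self.suspended and len(self.target) < self.capacity:
            self.suspended = False
            self.pump()
        return taken


stream = Stream(capacity=2)
stream.source.extend([1, 2, 3, 4])

stream.pump()
after_pump = (list(stream.target), list(stream.source), stream.suspended)

first_taken = stream.drain(1)     # frees one slot, then the queue fills again
after_first = (list(stream.target), list(stream.source), stream.suspended)

second_taken = stream.drain(2)    # frees enough space for the last message
after_second = (list(stream.target), list(stream.source), stream.suspended)
```

Each Stream object keeps its own suspended flag, which mirrors the statement that endpoints not involved in the stalled route keep working.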

70.2.7. Implementation Details

70.2.7.1. State Transferring and CSNs

When postgres_fdw works over Silk transport, only one connection between silkroad routing daemons is used to transfer user requests to silkworm workers and get their responses. Each request contains a transaction state, the replication group ID of the node where the request is formed (the coordinator), the query itself, and the query parameters (if present). A response is either an error response with a specific error message and error code, or a set of tuples followed by an end-of-tuples message. This means that silkworm has to switch to the transaction state that comes with the request before executing the request.

For now, Silk transport is used only for read-only SELECT queries. All modifying requests are processed via a usual libpq connection and handled mostly like all other DML requests in PostgreSQL postgres_fdw. The only distinction is that when a DML request is processed by postgres_fdw, it resets the saved transaction state for the connection cache entry corresponding to the connection where this request is sent. Also, the read-only flag is set to false for such a connection cache entry. When a request is sent over Silk transport, the Postgres Pro Shardman extension asks postgres_fdw for the transaction state for a pair of serverid and userid. If a matching entry is found in the postgres_fdw connection cache, the entry is not read-only, and a transaction state is present in this entry, the state is returned. If the state is not present, postgres_fdw retrieves a full transaction state from the remote server, saves it in the connection cache entry, and returns it to the Postgres Pro Shardman extension.

The full transaction state is similar to the parallel worker transaction state and contains:

  • information related to the current user (uid, username)

  • pid of the current backend

  • transaction start timestamp

  • current snapshot CSN

  • flags indicating that invalidation messages are present

  • set of configuration settings, set in current session (including the application name)

  • backend private state:

    • array of ComboCIDs

    • internal transaction state (full transaction ID, isolation level, current command ID, etc.)

    • information about reindexed indexes

If the connection is not found in the postgres_fdw connection cache (i.e., it is a new connection) or the entry in the connection cache is marked as read-only, only these characteristics form the transaction state:

  • information related to the current user (username)

  • transaction start timestamp

  • current snapshot CSN

  • flags indicating that invalidation messages are present

  • set of configuration settings, set in current session (including application name)

Using such transaction states, silkworm can attach to a running transaction or start a new read-only transaction with the provided snapshot CSN and retrieve the result.

Note that the full transaction state can be imported only on the server that exported it. Also note that due to this transaction state transferring method, you cannot use Silk transport without enabling CSN snapshots.
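The choice between the full and the reduced transaction state can be summarized in a short sketch. The cache is modeled as a plain dictionary keyed by (serverid, userid); the function names and the dictionary layout are illustrative assumptions, not the postgres_fdw data structures.

```python
def on_dml(cache, key):
    """A DML request over libpq resets the saved state and clears read-only."""
    entry = cache.setdefault(key, {"read_only": True, "state": None})
    entry["state"] = None
    entry["read_only"] = False


def state_for_request(cache, key, fetch_full_state, build_reduced_state):
    """Pick the transaction state to send with a Silk request."""
    entry = cache.get(key)
    if entry is None or entry["read_only"]:
        # New connection or read-only entry: the reduced state is enough.
        return build_reduced_state()
    if entry["state"] is None:
        # The transaction has modified data remotely: fetch and cache the
        # full state from the remote server.
        entry["state"] = fetch_full_state()
    return entry["state"]


remote_calls = []


def fetch_full_state():
    remote_calls.append("fetch")
    return {"kind": "full", "csn": 100, "xid": 42}   # made-up values


def build_reduced_state():
    return {"kind": "reduced", "csn": 100}           # made-up values


cache = {}
key = (16384, 10)   # hypothetical (serverid, userid) pair

before_dml = state_for_request(cache, key, fetch_full_state, build_reduced_state)
on_dml(cache, key)
after_dml = state_for_request(cache, key, fetch_full_state, build_reduced_state)
cached = state_for_request(cache, key, fetch_full_state, build_reduced_state)
calls_before_second_dml = len(remote_calls)
on_dml(cache, key)
refetched = state_for_request(cache, key, fetch_full_state, build_reduced_state)
```

In this model the remote server is asked for the full state only once per run of read-only requests that follows a DML statement.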

70.2.7.2. Integration with Asynchronous FDW Engine

In Section 53.2.2, asynchronous ForeignScan plan nodes were presented as a way to optimize data retrieval from multiple hosts when these plan nodes are located under a single Append node. In the standard PostgreSQL architecture, the execution of ForeignScan plan nodes is implemented using the network protocol based on libpq. To improve the system performance during data transfer and reduce resource consumption, Postgres Pro Shardman employs a different method for exchanging data with remote hosts. The mechanism for executing ForeignScan nodes is implemented using the Silk protocol.

To incorporate Silk transport into the asynchronous executor, modifications were made to the postgres_fdw extension. A pluggable transport was implemented as a set of interface functions included as part of the Postgres Pro Shardman extension. During execution of callbacks that interact with remote hosts, these functions are called by the postgres_fdw extension. The pluggable Silk transport is activated if the Postgres Pro Shardman extension is preloaded and if the foreign server has the attribute extended_features (applicable for any FDW server in the Postgres Pro Shardman cluster). For all other cases, the postgres_fdw extension uses the standard exchange protocol based on libpq.

To disable the pluggable Silk transport for query execution, it is necessary to set the query_engine_mode configuration parameter to the value of ENGINE_NONE.

In the current implementation, the pluggable Silk transport is only used for read-only queries, specifically during the execution of the ForeignScan node. The standard exchange protocol based on libpq is used for modifying queries.

When receiving query execution result rows using the Silk transport, the data is stored in a TupleStoreState storage as a complete result set, which is the same size as that returned by the remote host. The TupleStoreState is implemented as a data structure that can spill data to the disk in case of memory shortage. If the remote host returns a large result set, it does not lead to an out-of-memory (OOM) condition. Once the result set is received in the TupleStoreState, the data is copied into the ForeignScan executor's in-memory buffer. The size of this buffer is defined by the fetch_size attribute of the foreign server. The default value of 50000 rows can be adjusted to find a balance between the performance (number of ForeignScan node calls) and memory consumption.
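The two-stage buffering can be illustrated with a spill-to-disk store that is read back in fixed-size portions. This is only an analogy: tempfile.SpooledTemporaryFile stands in for TupleStoreState, rows are serialized as JSON lines, and the function names and sizes are arbitrary assumptions.

```python
import json
import tempfile
from itertools import islice


def store_result(rows, max_memory_bytes):
    """Keep the whole result set; spill to disk once it outgrows memory."""
    store = tempfile.SpooledTemporaryFile(max_size=max_memory_bytes, mode="w+")
    for row in rows:
        store.write(json.dumps(row) + "\n")
    store.seek(0)
    return store


def fetch_batches(store, fetch_size):
    """Copy rows out of the store in portions of at most fetch_size rows."""
    lines = iter(store)
    while True:
        batch = [json.loads(line) for line in islice(lines, fetch_size)]
        if not batch:
            return
        yield batch


rows = [[i, f"name{i}"] for i in range(7)]
store = store_result(rows, max_memory_bytes=32)   # small enough to force a spill
batches = list(fetch_batches(store, fetch_size=3))
store.close()

batch_sizes = [len(b) for b in batches]
flattened = [row for b in batches for row in b]
```

A larger fetch_size means fewer portions and therefore fewer refills of the in-memory buffer, at the cost of holding more rows in memory at once.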

Utilizing the pluggable Silk transport for the asynchronous FDW engine increases network exchange performance and reduces system resource consumption, including the number of network connections.

70.2.8. Multiplexer Diagnostics Views

Views in this section provide various information related to Silk multiplexing. See Section 70.2.3 for details of the silkroad multiplexing process.

70.2.8.1. shardman.silk_routes

The shardman.silk_routes view displays the current snapshot of the multiplexer routing table. The columns of the view are shown in Table 70.1.

Table 70.1. shardman.silk_routes Columns

Name | Type | Description
hashvalue | integer | Internal unique route identifier. Can be used to join with other Silk diagnostics views.
origin_ip | inet | IP address of the source node, which generated this route
origin_port | int2 | External TCP connection port of the source node, which generated this route
channel_id | integer | Route sequential number within the node that generated this route. channel_id is unique for the pair origin_ip + origin_port. This pair is a unique node identifier within the Postgres Pro Shardman cluster and hence the origin_ip + origin_port + channel_id tuple is a unique route identifier within the Postgres Pro Shardman cluster.
from_cn | integer | Connect index in the shardman.silk_connects view for incoming routes, that is, not generated by this node, and -1 for routes generated by this node.
backend_id | integer | ID of the local process that is currently using this route: either the ID of the backend that generated this route or the ID of the silkworm worker assigned to this route. Equals -1 for queued incoming routes that have not been assigned a worker yet.
pending_queue_bytes | bigint | Size of the queue of delayed messages (awaiting a free worker) for this route, in bytes. This value is only meaningful for incoming routes of each node that are not assigned to a worker yet.
pending_queue_messages | bigint | Number of messages in the queue of delayed messages (awaiting a free worker) for this route. This value is only meaningful for incoming routes of each node that are not assigned to a worker yet.
connects | integer[] | List of indexes of connects that are currently using this route.

70.2.8.2. shardman.silk_connects

The shardman.silk_connects view displays the current list of multiplexer connects. The columns of the view are shown in Table 70.2.

Table 70.2. shardman.silk_connects Columns

Name | Type | Description
cn_index | integer | Unique connect index
reg_ip | inet | Registration IP address of the node with which the connection is established. See Notes for details.
reg_port | int2 | Registration TCP port of the node with which the connection is established. See Notes for details.
read_ev_active | boolean | true if the multiplexer is ready to receive data to the incoming queue. See Notes for details.
write_ev_active | boolean | true if the multiplexer filled the queue of non-sent messages and is waiting for it to get free. See Notes for details.
is_outgoing | boolean | true if the connection is outgoing, that is, created by connect, and false for incoming connects, that is, created by accept. Only used during the handshaking.
state | text | Current state of the connect: connected if the connection is established; in progress if the client has already connected, but handshaking has not happened yet; free if the client has already disconnected, but the connect structure for the disconnected client has not been destroyed yet.
pending_queue_bytes | bigint | Size of the queue of non-sent messages for this connect, in bytes
pending_queue_messages | bigint | Number of messages in the queue of non-sent messages for this connect
blocked_by_backend | integer | ID of the backend that blocked this connect
blocks_backends | integer[] | List of IDs of backends that are blocked by this connect
routes | integer[] | List of unique IDs of routes that use this connect
elapsed_time_write | bigint | Time from the last writing event of a connect
elapsed_time_read | bigint | Time from the last reading event of a connect

70.2.8.3. shardman.silk_backends

The shardman.silk_backends view displays the current list of processes of two kinds: backends that serve client connections and silkworm multiplexer workers, which interact with the multiplexer. The columns of the view are shown in Table 70.3.

Table 70.3. shardman.silk_backends Columns

Name | Type | Description
backend_id | integer | Unique backend/worker identifier
pid | integer | OS process ID
attached | boolean | true if the backend is attached to the multiplexer, false otherwise
read_ev_active | boolean | true if the backend/worker is ready to receive data to the incoming queue. See Notes for details.
write_ev_active | boolean | true if the backend/worker filled the queue of non-sent messages and is waiting for it to get free. See Notes for details.
is_worker | boolean | true if this process is a silkworm multiplexer worker and false otherwise
pending_queue_bytes | bigint | Size of the queue of messages being sent to this backend/worker, in bytes
pending_queue_messages | bigint | Number of messages in the queue of messages being sent to this backend/worker
blocked_by_connect | integer | Index of the connect that blocks this backend/worker
blocks_connects | integer[] | List of indexes of connects that are blocked by this backend/worker
routes | integer[] | List of unique IDs of routes that are used by this backend/worker
in_queue_used | bigint | Number of queued data bytes in the incoming queue in the shared memory between the backend and multiplexer
out_queue_used | bigint | Number of queued data bytes in the outgoing queue in the shared memory between the backend and multiplexer
elapsed_time_write | bigint | Time from the last writing event of a backend
elapsed_time_read | bigint | Time from the last reading event of a backend

70.2.8.4. shardman.silk_routing

The shardman.silk_routing view displays the results of the shardman.silk_routing function. The columns of the view are shown in Table 70.4.

Table 70.4. shardman.silk_routing Columns

Name | Type | Description
hashvalue | integer | Internal unique route identifier
origin_ip | inet | IP address of the node that generated this route
origin_port | int2 | External TCP connection port of the source node that generated this route
channel_id | integer | Route sequential number within the node that generated this route
is_outgoing | boolean | true if this route was produced by the outgoing network connection, false if it was produced by the incoming network connection.
pending_queue_bytes | bigint | Pending queue size, in bytes
pending_queue_messages | bigint | Number of pending queue messages
backend_id | integer | ID of the local process that is currently using this route: either the ID of the backend that generated this route or the ID of the silkworm worker assigned to this route. Equals -1 for queued incoming routes that have not been assigned a worker yet.
backend_pid | integer | Returns the process ID of the server process attached to the current session
attached | boolean | true if the backend is attached to the multiplexer, false otherwise
backend_rd_active | boolean | true if the backend/worker is ready to receive data to the incoming queue. See Notes for details.
backend_wr_active | boolean | true if the backend/worker filled the queue of non-sent messages and is waiting for it to get free. See Notes for details.
is_worker | boolean | true if this process is a silkworm multiplexer worker and false otherwise
backend_blocked_by_cn | integer | Index of the connect that blocks this backend/worker
blocks_connects | integer[] | List of indexes of connects that are blocked by this backend/worker
in_queue_used | bigint | Number of queued data bytes in the incoming queue in the shared memory between the backend and multiplexer
out_queue_used | bigint | Number of queued data bytes in the outgoing queue in the shared memory between the backend and multiplexer
connect_id | integer | Unique connect index
reg_ip | inet | Registration IP address of the node with which the connection is established
reg_port | int2 | Registration TCP port of the node with which the connection is established
connect_rd_active | boolean | true if the multiplexer is ready to receive data to the incoming queue
connect_wr_active | boolean | true if the multiplexer filled the queue of non-sent messages and is waiting for it to get free
connect_is_outgoing | boolean | true if the connection is outgoing, that is, created by connect, and false for incoming connects, that is, created by accept. Only used during the handshaking.
connect_state | text | Current state of the connect: connected if the connection is established; in progress if the client has already connected, but handshaking has not happened yet; free if the client has already disconnected, but the connect structure for the disconnected client has not been destroyed yet
connect_outgoing_queue_bytes | bigint | Size of the queue of non-sent messages for this connect, in bytes
connect_outgoing_queue_messages | bigint | Number of messages in the queue of non-sent messages for this connect
connect_blocked_by_bk | integer | ID of the backend that blocked this connect
blocks_backends | integer[] | List of IDs of backends that are blocked by this connect
connect_elapsed_time_write | bigint | Time from the last writing event of a connect
connect_elapsed_time_read | bigint | Time from the last reading event of a connect
backend_elapsed_time_write | bigint | Time from the last writing event of a backend
backend_elapsed_time_read | bigint | Time from the last reading event of a backend

70.2.8.5. shardman.silk_pending_jobs

The shardman.silk_pending_jobs view displays the current list of routes in the queue of delayed multiplexer jobs, that is, jobs that are not assigned to workers yet. The columns of the view are shown in Table 70.5.

Table 70.5. shardman.silk_pending_jobs Columns

Name | Type | Description
hashvalue | integer | Internal unique route identifier
origin_ip | inet | IP address of the node that generated this route
origin_port | int2 | TCP connection port of the node that generated this route
channel_id | integer | Route sequential number within the node that generated this route
query | text | The first queued message
pending_queue_bytes | bigint | Pending queue size, in bytes
pending_queue_messages | bigint | Number of pending queue messages

70.2.8.6. shardman.silk_statinfo

The shardman.silk_statinfo view displays the current multiplexer state information. The columns of the view are shown in Table 70.6.

Table 70.6. shardman.silk_statinfo Columns

Name | Type | Description
pid | integer | silkroad process ID
started_at | timestamp with time zone | Time when the silkroad backend was started.
transferred_bytes | json | JSON object of key-value pairs, where the key is the name of the message type, and the value is the total number of bytes sent for the message types with at least one message sent
transferred_pkts | json | JSON object of key-value pairs, where the key is the name of the message type, and the value is the total number of sent messages for the message types with at least one message sent
transferred_max | json | JSON object of key-value pairs, where the key is the name of the message type, and the value is the maximum size of a message for the message types with at least one message sent
memcxt_dpg_allocated | bigint | The mem_allocated value of the process in DPGMemoryContext
memcxt_top_allocated | bigint | The mem_allocated value of the process in TopMemoryContext
read_efd_max | bigint | Maximum reading time of the eventfd since reset
write_efd_max | bigint | Maximum writing time of the eventfd since reset
read_efd_total | bigint | Total reading time of the eventfd since reset
write_efd_total | bigint | Total writing time of the eventfd since reset
read_efd_count | bigint | Total number of reading events of the eventfd since reset
write_efd_count | bigint | Total number of writing events of the eventfd since reset
sort_time_max | bigint | Maximum time of sorting operations with the silk_flow_control enabled (any value other than none)
sort_time_total | bigint | Total time of sorting operations with the silk_flow_control enabled (any value other than none)
sort_time_count | bigint | Total number of the sorting operations with the silk_flow_control enabled (any value other than none)

Note that read_efd_max, write_efd_max, read_efd_total, write_efd_total, read_efd_count, write_efd_count, sort_time_max, sort_time_total, and sort_time_count are only calculated if the shardman.silk_track_time configuration parameter is enabled.

70.2.8.7. shardman.silk_state

The shardman.silk_state view displays the current silkroad process state. The columns of the view are shown in Table 70.7.

Table 70.7. shardman.silk_state Columns

Name | Type | Description
state | text | State of the silkroad process

70.2.8.8. Notes

reg_ip and reg_port values are not actual network addresses, but the addresses by which the multiplexer accesses the node. They are determined during a handshake between multiplexer nodes and are equal to the corresponding parameters of an appropriate server in the pg_foreign_server table.

All the read_ev_active values are true and all the write_ev_active values are false when the multiplexer is in the idle state.

70.2.8.9. Global Views

Postgres Pro Shardman has a list of global views based on the corresponding local views. The columns of a global view are defined the same way as in its corresponding local view. Fetching from a global view returns a union of the rows of the corresponding local view fetched from each cluster node. In addition, each global view has an extra column, rgid, which shows the replication group ID of the cluster node from which a row is fetched.

Below is the list of the global views with links to their corresponding local views:

Table 70.8. Silk-related global and local views

Global view | Local view | Description
shardman.gv_silk_routes | shardman.silk_routes | One row showing the current snapshot of the multiplexer routing table.
shardman.gv_silk_connects | shardman.silk_connects | One row showing the current list of multiplexer connects.
shardman.gv_silk_backends | shardman.silk_backends | One row showing the current list of processes of two kinds: backends that serve client connections and silkworm multiplexer workers, which interact with the multiplexer.
shardman.gv_silk_pending_jobs | shardman.silk_pending_jobs | One row showing the current list of routes in the queue of multiplexer jobs that are not assigned to workers yet.
shardman.gv_silk_routing | shardman.silk_routing | One row showing the results of the shardman.silk_routing function.

70.2.9. Functions

shardman.silk_statinfo_reset()

Resets the values of the metrics with the transferred_ prefix and of the time-based metrics (with the read_efd_, write_efd_, and sort_time_ prefixes) in the shardman.silk_statinfo view.

shardman.silk_routing()

Retrieves the results of the multiplexer silk_connects, silk_backends, and silk_routes functions.

shardman.silk_rbc_snap()

Retrieves a consistent snapshot of all the connects, backends and routes that can be used by silk_connects, silk_backends, and silk_routes functions.