70.2. Silk #
70.2.1. Concept #
Silk (Postgres Pro Shardman InterLinK) is a transport feature used for inter-cluster communications. It is injected at the point where postgres_fdw decides to transmit a deparsed piece of a query through a libpq connection to the remote node, replacing the libpq connection with itself. It is designed to decrease the number of idle postgres_fdw connections during transaction execution, minimize latency, and boost overall throughput.
The Silk implementation uses several background processes: the main routing/multiplexing process (one per PostgreSQL instance), called silkroad, and a number of background workers, called silkworms. When postgres_fdw uses libpq, it spawns multiple libpq connections from each backend to the remote node (where multiple backend processes are spawned accordingly). But when Silk replaces libpq, a local silkroad process maintains a single connection to each remote silkroad. In this scheme, remote silkworms play the role of the remote backends that postgres_fdw would otherwise spawn.
Silkroad wires a local backend to the remote node's workers as follows:
1. The backend process uses the regular postgres_fdw API to access remote data as usual. But when Silk is enabled, postgres_fdw writes the query into a shared memory queue instead of a libpq connection.
2. The silkroad process parses the incoming shared memory queue from that backend and routes the message to the appropriate network connection with the remote silkroad process.
3. The remote silkroad process grabs the incoming message from the network and (if it is a new one) redirects it to an available worker's shared memory queue (or to a special "unassigned jobs" queue if all of the workers are busy).
4. Finally, the remote worker gets the message through its shared memory queue, executes it, and sends back the result tuples (or an error) the same way.
Silkroad acts here like a common network switch, tossing packets between a backend's shared memory and the appropriate network socket. It knows nothing about the content of a message and relies only on the message header.
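From the application's point of view, nothing changes: queries are written as usual, and postgres_fdw decides how the deparsed pieces travel. A minimal illustration, assuming a hypothetical distributed table orders:

```sql
-- Hypothetical sharded table. The per-shard subqueries produced by
-- postgres_fdw are shipped through the silkroad multiplexer instead of
-- dedicated libpq connections when Silk is enabled.
SELECT count(*)
FROM orders
WHERE order_date >= now() - interval '1 day';
```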
70.2.2. Event Loop #
The silkroad process runs an event loop powered by the libev library. Each backend's shared memory queue is exposed to the event loop with an eventfd descriptor, and each network connection with a socket descriptor.
During startup, the backend registers itself (its eventfd descriptors) with the local silkroad process. Silkroad responds by specifying which memory segments to use for the backend's message queue. From this moment, silkroad responds to events from the queue associated with this backend. Network connections between the local and remote silkroads are established on the first request from a backend to the remote node and stay alive as long as both participants (the silkroad processes) exist.
70.2.3. Routing and Multiplexing #
For each subquery, we expect a subset of tuples, and therefore represent the interaction within the subquery as a bidirectional data stream. Silkroad uses an internal routing table to register these streams. A stream ID, unique within the Postgres Pro Shardman cluster, is formed as the pair "origin node address, target node address" plus a number unique within the node. Each particular subquery from a backend to a remote node is registered by silkroad as such a stream, so any backend can be associated with many streams at a time.
When a local silkroad process gets a message with a new stream ID from a backend, it registers the stream in its local routing table and then redirects the message to the appropriate socket. If the connection with the remote silkroad does not exist yet, it is established using a handshake procedure; the original message that initiated the handshake is kept in a special internal buffer until the handshake succeeds. The remote silkroad process that receives a packet with a new stream ID registers it in its own table, assigns a silkworm worker from the pool of available workers, and places the message into the worker's shared memory queue. If all silkworm workers are busy at the moment, the message is postponed, i.e., placed into a special "unassigned jobs" queue, whose size is limited by the shardman.silk_unassigned_job_queue_size configuration parameter (1024 by default). If there is no free space in this queue, an error message is generated and sent back to the source backend. A job from this queue is assigned later to the first worker that finishes its previous job.
When a worker gets a new “job”, it executes it through the SPI subsystem, organizes the result tuples into batches, and sends them back through shared memory to the local silkroad process. The rest is trivial because the whole route is known. The last packet with result tuples in a stream is marked as “closing”: it orders the silkroads to wipe this route out of their tables.
Note that the backend and remote workers stay “subscribed” to their streams until the streams are explicitly closed. So the backend has the opportunity to send an abort message notifying the remote worker to close the transaction prematurely, and it is possible to discard obsolete data packets, possibly left over from previous aborted transactions.
To observe the current state of the silkroad multiplexer process, the Silk diagnostics views are available.
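For example, a quick health check might read the multiplexer state and the routes currently registered on the node (both views are described in Section 70.2.8):

```sql
-- Current silkroad state and the registered routes on this node.
SELECT state FROM shardman.silk_state;

SELECT hashvalue, origin_ip, origin_port, channel_id, backend_id
FROM shardman.silk_routes;
```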
70.2.4. Error Handling and Route Integrity #
Besides the routing table, silkroad tracks the endpoints (backends and network connections) involved in each particular stream. So when some connection is closed, all the involved backends (and/or workers) are notified of that event with a special error message, and all routes/streams related to this connection are dismissed. In the same way, if a backend crashes, its shared memory queue becomes detached, and silkroad reacts by sending error messages to the remote participants of every stream related to the crashed backend. So remote workers are not left doing useless work when the requester has already died.
70.2.5. Data Transmitting/Batching/Splitting Oversized Tuples #
The resulting tuples are transmitted by silkworm in native binary mode. Tuples with the external storage attribute are deTOASTed, but those that were compressed stay compressed. Small tuples are organized into batches (about 256K). Big tuples are cut into pieces by the sender and assembled into a whole by the receiving backend.
70.2.6. Streams Flow Control #
It may happen that the next message received from a backend does not fit into the target network buffer, or that a message received from the network does not fit into the target shared memory queue. In such a case, the stream that caused this situation is “suspended”. This means that silkroad pauses its reaction to events from the source endpoint (connection or backend) until the target endpoint drains its messages. The other backends and connections not affected by this route keep working. The receiving modules of backends are designed to minimize these situations: the backend periodically checks and drains the incoming queue even when the plan executor is busy processing other plan nodes. Received tuples are stored in the backend's tuplestores according to the plan nodes until the executor requests the next tuple for a particular plan node.
When enough space is freed in the target queue, the suspended stream is resumed, the endpoint's events are unblocked, and the process of receiving and sorting packets continues.
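These blocking relationships are visible in the diagnostics views. A sketch of a monitoring query that lists processes whose stream is suspended, together with the connect that blocks them (see Section 70.2.8 for the view definitions):

```sql
-- Backends/workers blocked by a connect, with the backlog on that connect.
SELECT b.backend_id, b.pid, b.is_worker,
       c.cn_index, c.pending_queue_bytes, c.pending_queue_messages
FROM shardman.silk_backends b
JOIN shardman.silk_connects c ON c.cn_index = b.blocked_by_connect;
```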
70.2.7. Implementation Details #
70.2.7.1. State Transferring and CSNs #
When postgres_fdw works over the Silk transport, only one connection between silkroad routing daemons is used to transfer user requests to silkworm workers and get their responses. Each request contains a transaction state, the replication group ID of the node where the request was formed (the coordinator), the query itself, and query parameters (if present). A response is either an error message with a specific error text and error code, or a bunch of tuples followed by an “end of tuples” message. This means that silkworm has to switch to the transaction state coming with the request prior to executing the request.
For now, the Silk transport is used only for read-only SELECT queries. All modifying requests are processed via a usual libpq connection and handled mostly like all other DML requests in PostgreSQL postgres_fdw. The only distinction is that when a DML request is processed by postgres_fdw, it resets the saved transaction state for the connection cache entry corresponding to the connection where this request is sent. Also, the read-only flag is set to false for such a connection cache entry. When a request is sent over the Silk transport, the Postgres Pro Shardman extension asks postgres_fdw for the transaction state for a pair of serverid and userid. If such a connection cache entry is found in the postgres_fdw connection cache, it is not a read-only cache entry, and the transaction state is present in this entry, the state is returned. If the state is not present, postgres_fdw retrieves the full transaction state from the remote server, saves it in the connection cache entry, and returns it to the Postgres Pro Shardman extension.
The full transaction state is similar to the parallel worker transaction state and contains:
- information related to the current user (uid, username)
- pid of the current backend
- transaction start timestamp
- current snapshot CSN
- flags indicating that invalidation messages are present
- set of configuration settings, set in the current session (including the application name)
- backend private state:
  - array of ComboCIDs
  - internal transaction state (full transaction ID, isolation level, current command ID, etc.)
  - information about reindexed indexes
If the connection is not found in the postgres_fdw connection cache (i.e., it is a new connection) or the entry in the connection cache is marked as read-only, only these characteristics form the transaction state:
- information related to the current user (username)
- transaction start timestamp
- current snapshot CSN
- flags indicating that invalidation messages are present
- set of configuration settings, set in the current session (including the application name)
Using such transaction states, silkworm can attach to a running transaction or start a new read-only transaction with the provided snapshot CSN and retrieve the result.
Note that the full transaction state can be imported only on the server that exported it. Also note that, due to this method of transferring the transaction state, you cannot use the Silk transport without enabling CSN snapshots.
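For instance, CSN snapshots can be switched on cluster-wide before relying on the Silk transport. A hedged sketch, assuming the setting is exposed as the enable_csn_snapshot configuration parameter (check your server version for the exact name and whether a restart is required):

```sql
-- Assumption: the CSN snapshot switch is named enable_csn_snapshot.
ALTER SYSTEM SET enable_csn_snapshot = on;
-- The parameter may require a server restart rather than a reload.
SELECT pg_reload_conf();
```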
70.2.7.2. Integration with Asynchronous FDW Engine #
In Section 53.2.2, asynchronous ForeignScan plan nodes were presented as a way to optimize data retrieval from multiple hosts while these plan nodes are located under a single Append node. In the standard PostgreSQL architecture, the execution of ForeignScan plan nodes is implemented using the network protocol based on libpq. To improve system performance during data transfer and reduce resource consumption, Postgres Pro Shardman employs a different method for exchanging data with remote hosts: the mechanism for executing ForeignScan nodes is implemented using the Silk protocol.
To incorporate the Silk transport into the asynchronous executor, modifications were made to the postgres_fdw extension. A pluggable transport was implemented as a set of interface functions included as part of the Postgres Pro Shardman extension. These functions are called by the postgres_fdw extension during the execution of callbacks that interact with remote hosts. The pluggable Silk transport is activated if the Postgres Pro Shardman extension is preloaded and the foreign server has the extended_features attribute (applicable to any FDW server in the Postgres Pro Shardman cluster). In all other cases, the postgres_fdw extension uses the standard exchange protocol based on libpq.
To disable the pluggable Silk transport for query execution, set the query_engine_mode configuration parameter to ENGINE_NONE.
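A hedged sketch of doing this for the current session, assuming the parameter is settable with SET and accepts the value as a string literal:

```sql
-- Fall back to the plain libpq-based postgres_fdw exchange protocol.
SET query_engine_mode = 'ENGINE_NONE';
```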
In the current implementation, the pluggable Silk transport is only used for read-only queries, specifically during the execution of the ForeignScan node. The standard exchange protocol based on libpq is used for modifying queries.
When query execution result rows are received using the Silk transport, the data is stored in a TupleStoreState storage as a complete result set, of the same size as that returned by the remote host. The TupleStoreState is implemented as a data structure that can spill data to disk in case of memory shortage, so even if the remote host returns a large result set, it does not lead to an out-of-memory (OOM) condition. Once the result set is received into the TupleStoreState, the data is copied into the ForeignScan executor's in-memory buffer. The size of this buffer is defined by the fetch_size attribute of the foreign server. The default value of 50000 rows can be adjusted to find a balance between performance (the number of ForeignScan node calls) and memory consumption.
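For example, the buffer can be shrunk on a particular foreign server to trade some speed for memory. A sketch, assuming a foreign server named shard_1 whose fetch_size option is already defined (use ADD instead of SET otherwise):

```sql
-- Smaller fetch_size: less memory per ForeignScan, more node calls.
ALTER SERVER shard_1 OPTIONS (SET fetch_size '10000');
```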
Utilizing the pluggable Silk transport for the asynchronous FDW engine increases network exchange performance and reduces system resource consumption, including the number of network connections.
70.2.8. Multiplexer Diagnostics Views #
The views in this section provide various information related to Silk multiplexing. See Section 70.2.3 for details of the silkroad multiplexing process.
70.2.8.1. shardman.silk_routes #
The shardman.silk_routes view displays the current snapshot of the multiplexer routing table. The columns of the view are shown in Table 70.1.
Table 70.1. shardman.silk_routes Columns
Name | Type | Description |
---|---|---|
hashvalue | integer | Internal unique route identifier. Can be used to join with other Silk diagnostics views. |
origin_ip | inet | IP address of the source node, which generated this route |
origin_port | int2 | External TCP connection port of the source node, which generated this route |
channel_id | integer | Route sequential number within the node that generated this route. channel_id is unique for the pair origin_ip + origin_port. This pair is a unique node identifier within the Postgres Pro Shardman cluster, and hence the origin_ip + origin_port + channel_id tuple is a unique route identifier within the Postgres Pro Shardman cluster. |
from_cn | integer | Connect index in the shardman.silk_connects view for incoming routes (that is, routes not generated by this node) and -1 for routes generated by this node. |
backend_id | integer | ID of the local process that is currently using this route: either the ID of the backend that generated this route or the ID of the silkworm worker assigned to this route. Equals -1 for queued incoming routes that have not been assigned a worker yet. |
pending_queue_bytes | bigint | Size of the queue of delayed messages (awaiting a free worker) for this route, in bytes. This value is only meaningful for incoming routes of each node that are not assigned to a worker yet. |
pending_queue_messages | bigint | Number of messages in the queue of delayed messages (awaiting a free worker) for this route. This value is only meaningful for incoming routes of each node that are not assigned to a worker yet. |
connects | integer[] | List of indexes of connects that are currently using this route. |
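For example, each route can be matched with the connects that currently carry it through the connects array; a minimal sketch:

```sql
-- Each route together with the state of every connect using it.
SELECT r.hashvalue, r.origin_ip, r.origin_port, r.channel_id, c.state
FROM shardman.silk_routes r
JOIN shardman.silk_connects c ON c.cn_index = ANY (r.connects);
```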
70.2.8.2. shardman.silk_connects #
The shardman.silk_connects view displays the current list of multiplexer connects. The columns of the view are shown in Table 70.2.
Table 70.2. shardman.silk_connects Columns
Name | Type | Description |
---|---|---|
cn_index | integer | Unique connect index |
reg_ip | inet | “Registration” IP address of the node with which the connection is established. See Notes for details. |
reg_port | int2 | “Registration” TCP port of the node with which the connection is established. See Notes for details. |
read_ev_active | boolean | true if the multiplexer is ready to receive data to the incoming queue. See Notes for details. |
write_ev_active | boolean | true if the multiplexer filled the queue of non-sent messages and is waiting for it to get free. See Notes for details. |
is_outgoing | boolean | true if the connection is outgoing, that is, created by connect, and false for incoming connects, that is, created by accept. Only used during the handshaking. |
state | text | Current state of the connect: connected — if the connection is established, in progress — if the client has already connected, but handshaking has not happened yet, free — if the client has already disconnected, but the connect structure for the disconnected client has not been destroyed yet. |
pending_queue_bytes | bigint | Size of the queue of non-sent messages for this connect, in bytes |
pending_queue_messages | bigint | Number of messages in the queue of non-sent messages for this connect |
blocked_by_backend | integer | ID of the backend that blocked this connect |
blocks_backends | integer[] | List of IDs of backends that are blocked by this connect |
routes | integer[] | List of unique IDs of routes that use this connect |
elapsed_time_write | bigint | Time since the last write event of this connect |
elapsed_time_read | bigint | Time since the last read event of this connect |
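A sketch of a query that surfaces connects with a backlog of unsent messages:

```sql
-- Connects whose outgoing queue is non-empty, largest backlog first.
SELECT cn_index, reg_ip, reg_port, state,
       pending_queue_bytes, pending_queue_messages
FROM shardman.silk_connects
WHERE pending_queue_messages > 0
ORDER BY pending_queue_bytes DESC;
```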
70.2.8.3. shardman.silk_backends #
The shardman.silk_backends view displays the current list of processes of two kinds: backends that serve client connections and silkworm multiplexer workers, which interact with the multiplexer. The columns of the view are shown in Table 70.3.
Table 70.3. shardman.silk_backends Columns
Name | Type | Description |
---|---|---|
backend_id | integer | Unique backend/worker identifier |
pid | integer | OS process ID |
attached | boolean | Value is true if the backend is attached to the multiplexer, false otherwise |
read_ev_active | boolean | true if the backend/worker is ready to receive data to the incoming queue. See Notes for details. |
write_ev_active | boolean | true if the backend/worker filled the queue of non-sent messages and is waiting for it to get free. See Notes for details. |
is_worker | boolean | true if this process is a silkworm multiplexer worker and false otherwise |
pending_queue_bytes | bigint | Size of the queue of messages being sent to this backend/worker, in bytes |
pending_queue_messages | bigint | Number of messages in the queue of messages being sent to this backend/worker |
blocked_by_connect | integer | Index of the connect that blocks this backend/worker |
blocks_connects | integer[] | List of indexes of connects that are blocked by this backend/worker |
routes | integer[] | List of unique IDs of routes that are used by this backend/worker |
in_queue_used | bigint | Number of queued data bytes in the incoming queue in the shared memory between the backend and multiplexer |
out_queue_used | bigint | Number of queued data bytes in the outgoing queue in the shared memory between the backend and multiplexer |
elapsed_time_write | bigint | Time since the last write event of this backend |
elapsed_time_read | bigint | Time since the last read event of this backend |
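For instance, shared memory queue usage can be aggregated separately for client backends and silkworm workers:

```sql
-- Queue usage in shared memory, split by process kind.
SELECT is_worker, count(*) AS processes,
       sum(in_queue_used)  AS in_queue_bytes,
       sum(out_queue_used) AS out_queue_bytes
FROM shardman.silk_backends
GROUP BY is_worker;
```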
70.2.8.4. shardman.silk_routing #
The shardman.silk_routing view displays the results of the shardman.silk_routing function. The columns of the view are shown in Table 70.4.
Table 70.4. shardman.silk_routing Columns
Name | Type | Description |
---|---|---|
hashvalue | integer | Internal unique route identifier |
origin_ip | inet | IP address of the node that generated this route |
origin_port | int2 | External TCP connection port of the source node that generated this route |
channel_id | integer | Route sequential number within the node that generated this route |
is_outgoing | boolean | true if this route was produced by the outgoing network connection, false if it was produced by the incoming network connection. |
pending_queue_bytes | bigint | Pending queue size, in bytes |
pending_queue_messages | bigint | Number of pending queue messages |
backend_id | integer | ID of the local process that is currently using this route: either the ID of the backend that generated this route or the ID of the silkworm worker assigned to this route. Equals -1 for queued incoming routes that have not been assigned a worker yet. |
backend_pid | integer | Process ID of the server process attached to the current session |
attached | boolean | Value is true if the backend is attached to the multiplexer, false otherwise |
backend_rd_active | boolean | true if the backend/worker is ready to receive data to the incoming queue. See Notes for details. |
backend_wr_active | boolean | true if the backend/worker filled the queue of non-sent messages and is waiting for it to get free. See Notes for details. |
is_worker | boolean | true if this process is a silkworm multiplexer worker and false otherwise |
backend_blocked_by_cn | integer | Index of the connect that blocks this backend/worker |
blocks_connects | integer[] | List of indexes of connects that are blocked by this backend/worker |
in_queue_used | bigint | Number of queued data bytes in the incoming queue in the shared memory between the backend and multiplexer |
out_queue_used | bigint | Number of queued data bytes in the outgoing queue in the shared memory between the backend and multiplexer |
connect_id | integer | Unique connect index |
reg_ip | inet | “Registration” IP address of the node with which the connection is established |
reg_port | int2 | “Registration” TCP port of the node with which the connection is established |
connect_rd_active | boolean | true if the multiplexer is ready to receive data to the incoming queue |
connect_wr_active | boolean | true if the multiplexer filled the queue of non-sent messages and is waiting for it to get free |
connect_is_outgoing | boolean | true if the connection is outgoing, that is, created by connect, and false for incoming connects, that is, created by accept. Only used during the handshaking. |
connect_state | text | Current state of the connect: connected — if the connection is established, in progress — if the client has already connected, but handshaking has not happened yet, free — if the client has already disconnected, but the connect structure for the disconnected client has not been destroyed yet |
connect_outgoing_queue_bytes | bigint | Size of the queue of non-sent messages for this connect, in bytes |
connect_outgoing_queue_messages | bigint | Number of messages in the queue of non-sent messages for this connect |
connect_blocked_by_bk | integer | ID of the backend that blocked this connect |
blocks_backends | integer[] | List of IDs of backends that are blocked by this connect |
connect_elapsed_time_write | bigint | Time since the last write event of this connect |
connect_elapsed_time_read | bigint | Time since the last read event of this connect |
backend_elapsed_time_write | bigint | Time since the last write event of this backend |
backend_elapsed_time_read | bigint | Time since the last read event of this backend |
70.2.8.5. shardman.silk_pending_jobs #
The shardman.silk_pending_jobs view displays the current list of routes in the queue of delayed multiplexer jobs, that is, jobs that are not assigned to workers yet. The columns of the view are shown in Table 70.5.
Table 70.5. shardman.silk_pending_jobs Columns
Name | Type | Description |
---|---|---|
hashvalue | integer | Internal unique route identifier |
origin_ip | inet | IP address of the node that generated this route |
origin_port | int2 | TCP connection port of the node that generated this route |
channel_id | integer | Route sequential number within the node that generated this route |
query | text | The first queued message |
pending_queue_bytes | bigint | Pending queue size, in bytes |
pending_queue_messages | bigint | Number of pending queue messages |
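A sketch listing delayed jobs together with the beginning of the first queued message:

```sql
-- Jobs still waiting for a free silkworm worker, biggest backlog first.
SELECT origin_ip, origin_port, channel_id,
       left(query, 60) AS query_head,
       pending_queue_bytes, pending_queue_messages
FROM shardman.silk_pending_jobs
ORDER BY pending_queue_bytes DESC;
```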
70.2.8.6. shardman.silk_statinfo #
The shardman.silk_statinfo view displays the current multiplexer state information. The columns of the view are shown in Table 70.6.
Table 70.6. shardman.silk_statinfo Columns
Name | Type | Description |
---|---|---|
pid | integer | silkroad process ID |
started_at | timestamp with time zone | Time when the silkroad process was started |
transferred_bytes | json | JSON object of key-value pairs, where the key is the name of the message type, and the value is the total number of bytes sent, for the message types with at least one message sent |
transferred_pkts | json | JSON object of key-value pairs, where the key is the name of the message type, and the value is the total number of sent messages, for the message types with at least one message sent |
transferred_max | json | JSON object of key-value pairs, where the key is the name of the message type, and the value is the maximum size of a message, for the message types with at least one message sent |
memcxt_dpg_allocated | bigint | The mem_allocated value of the process in DPGMemoryContext |
memcxt_top_allocated | bigint | The mem_allocated value of the process in TopMemoryContext |
read_efd_max | bigint | Maximum reading time of the eventfd since reset |
write_efd_max | bigint | Maximum writing time of the eventfd since reset |
read_efd_total | bigint | Total reading time of the eventfd since reset |
write_efd_total | bigint | Total writing time of the eventfd since reset |
read_efd_count | bigint | Total number of reading events of the eventfd since reset |
write_efd_count | bigint | Total number of writing events of the eventfd since reset |
sort_time_max | bigint | Maximum time of sorting operations with silk_flow_control enabled (set to any value other than none) |
sort_time_total | bigint | Total time of sorting operations with silk_flow_control enabled (set to any value other than none) |
sort_time_count | bigint | Total number of sorting operations with silk_flow_control enabled (set to any value other than none) |
Note that read_efd_max, write_efd_max, read_efd_total, write_efd_total, read_efd_count, write_efd_count, sort_time_max, sort_time_total, and sort_time_count are only calculated if the shardman.silk_track_time configuration parameter is enabled.
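Since the transferred_* columns are JSON objects keyed by message type, they can be unnested for inspection; a minimal sketch:

```sql
-- Per-message-type traffic counters, one row per message type.
SELECT s.pid, t.key AS message_type, t.value::bigint AS bytes_sent
FROM shardman.silk_statinfo s,
     json_each_text(s.transferred_bytes) AS t;
```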
70.2.8.7. shardman.silk_state #
The shardman.silk_state view displays the current silkroad process state. The columns of the view are shown in Table 70.7.
Table 70.7. shardman.silk_state Columns
Name | Type | Description |
---|---|---|
state | text | State of the silkroad process |
70.2.8.8. Notes #
The reg_ip and reg_port values are not actual network addresses, but the addresses by which the multiplexer accesses the node. They are determined during a handshake between multiplexer nodes and are equal to the corresponding parameters of the appropriate server in the pg_foreign_server table.
All the read_ev_active values are true and all the write_ev_active values are false when the multiplexer is in the idle state.
70.2.8.9. Global Views #
Postgres Pro Shardman has a list of global views based on the corresponding local views. The definition of global view columns is the same as in the corresponding local view. Fetching from a global view returns a union of the rows from the corresponding local views, fetched from each of the cluster nodes. Another difference is that global views have an additional column, rgid, whose value shows the replication group ID of the cluster node from which a row is fetched.
Below is the list of the global views with links to their corresponding local views:
Table 70.8. Silk-related global and local views
Global view | Local view | Description |
---|---|---|
shardman.gv_silk_routes | shardman.silk_routes | Current snapshot of the multiplexer routing table. |
shardman.gv_silk_connects | shardman.silk_connects | Current list of multiplexer connects. |
shardman.gv_silk_backends | shardman.silk_backends | Current list of processes of two kinds: backends that serve client connections and silkworm multiplexer workers, which interact with the multiplexer. |
shardman.gv_silk_pending_jobs | shardman.silk_pending_jobs | Current list of routes in the queue of multiplexer jobs that are not assigned to workers yet. |
shardman.gv_silk_routing | shardman.silk_routing | Results of the shardman.silk_routing function. |
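For example, multiplexer load can be compared across the cluster by grouping a global view by rgid:

```sql
-- Number of multiplexer-attached processes per replication group.
SELECT rgid, count(*) AS processes
FROM shardman.gv_silk_backends
GROUP BY rgid
ORDER BY rgid;
```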
70.2.9. Functions #
- shardman.silk_statinfo_reset() #
  Resets the values of the metrics with the prefix transferred_ and the time-based metrics (with the prefixes read_efd_, write_efd_, and sort_time_) in the shardman.silk_statinfo view.
- shardman.silk_routing() #
  Retrieves the results of the multiplexer silk_connects, silk_backends, and silk_routes functions.
- shardman.silk_rbc_snap() #
  Retrieves a consistent snapshot of all the connects, backends, and routes that can be used by the silk_connects, silk_backends, and silk_routes functions.
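A short usage sketch of these functions (the exact call sequence may vary):

```sql
-- Take a consistent snapshot, dump the routing state, then reset counters.
SELECT shardman.silk_rbc_snap();
SELECT * FROM shardman.silk_routing();
SELECT shardman.silk_statinfo_reset();
```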