From f784809cd81ed18ecbd1b3c7e87818a00b671fa0 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Tue, 5 Jan 2021 12:36:16 +0530 Subject: [PATCH v8 1/3] postgres_fdw function to discard cached connections This patch introduces new function postgres_fdw_disconnect() when called with a foreign server name discards the associated connections with the server name. When called without any argument, discards all the existing cached connections. This patch also adds another function postgres_fdw_get_connections() to get the list of all cached connections by corresponding foreign server names. --- contrib/postgres_fdw/Makefile | 2 +- contrib/postgres_fdw/connection.c | 313 ++++++++++++- .../postgres_fdw/expected/postgres_fdw.out | 424 +++++++++++++++++- ...dw--1.0.sql => postgres_fdw--1.0--1.1.sql} | 6 +- contrib/postgres_fdw/postgres_fdw--1.1.sql | 33 ++ contrib/postgres_fdw/postgres_fdw.control | 2 +- contrib/postgres_fdw/sql/postgres_fdw.sql | 171 +++++++ doc/src/sgml/postgres-fdw.sgml | 54 +++ 8 files changed, 982 insertions(+), 23 deletions(-) rename contrib/postgres_fdw/{postgres_fdw--1.0.sql => postgres_fdw--1.0--1.1.sql} (60%) create mode 100644 contrib/postgres_fdw/postgres_fdw--1.1.sql diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile index ee8a80a392..52d3dac0bd 100644 --- a/contrib/postgres_fdw/Makefile +++ b/contrib/postgres_fdw/Makefile @@ -14,7 +14,7 @@ PG_CPPFLAGS = -I$(libpq_srcdir) SHLIB_LINK_INTERNAL = $(libpq) EXTENSION = postgres_fdw -DATA = postgres_fdw--1.0.sql +DATA = postgres_fdw--1.1.sql postgres_fdw--1.0--1.1.sql REGRESS = postgres_fdw diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 266f66cc62..64e1d71514 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -22,6 +22,7 @@ #include "postgres_fdw.h" #include "storage/fd.h" #include "storage/latch.h" +#include "utils/builtins.h" #include "utils/datetime.h" #include "utils/hsearch.h" #include "utils/inval.h" @@ -57,6 +58,8 @@ typedef struct ConnCacheEntry bool have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state change in process */ bool invalidated; /* true if reconnect is pending */ + /* Server oid to get the associated foreign server name. */ + Oid serverid; uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ } ConnCacheEntry; @@ -73,6 +76,12 @@ static unsigned int prep_stmt_number = 0; /* tracks whether any work is needed in callback functions */ static bool xact_got_connection = false; +/* + * SQL functions + */ +PG_FUNCTION_INFO_V1(postgres_fdw_get_connections); +PG_FUNCTION_INFO_V1(postgres_fdw_disconnect); + /* prototypes of private functions */ static void make_new_connection(ConnCacheEntry *entry, UserMapping *user); static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); @@ -94,6 +103,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result); static bool UserMappingPasswordRequired(UserMapping *user); +static bool disconnect_cached_connections(uint32 hashvalue, bool all, + bool *is_in_use); /* * Get a PGconn which can be used to execute queries on the remote PostgreSQL @@ -273,6 +284,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) entry->have_error = false; entry->changing_xact_state = false; entry->invalidated = false; + entry->serverid = server->serverid; entry->server_hashvalue = GetSysCacheHashValue1(FOREIGNSERVEROID, ObjectIdGetDatum(server->serverid)); @@ -789,6 +801,13 @@ pgfdw_xact_callback(XactEvent event, void *arg) if (!xact_got_connection) return; + /* + * Quick exit if the cache has been destroyed in + * disconnect_cached_connections. + */ + if (!ConnectionHash) + return; + /* * Scan all connection cache entries to find open remote transactions, and * close them. @@ -985,6 +1004,13 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, if (!xact_got_connection) return; + /* + * Quick exit if the cache has been destroyed in + * disconnect_cached_connections. + */ + if (!ConnectionHash) + return; + /* * Scan all connection cache entries to find open remote subtransactions * of the current level, and close them. @@ -1093,6 +1119,13 @@ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue) Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID); + /* + * Quick exit if the cache has been destroyed in + * disconnect_cached_connections. + */ + if (!ConnectionHash) + return; + /* ConnectionHash must exist already, if we're registered */ hash_seq_init(&scan, ConnectionHash); while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) @@ -1138,8 +1171,6 @@ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue) static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) { - HeapTuple tup; - Form_pg_user_mapping umform; ForeignServer *server; /* nothing to do for inactive entries and entries of sane state */ @@ -1150,13 +1181,7 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) disconnect_pg_server(entry); /* find server name to be shown in the message below */ - tup = SearchSysCache1(USERMAPPINGOID, - ObjectIdGetDatum(entry->key)); - if (!HeapTupleIsValid(tup)) - elog(ERROR, "cache lookup failed for user mapping %u", entry->key); - umform = (Form_pg_user_mapping) GETSTRUCT(tup); - server = GetForeignServer(umform->umserver); - ReleaseSysCache(tup); + server = GetForeignServer(entry->serverid); ereport(ERROR, (errcode(ERRCODE_CONNECTION_EXCEPTION), @@ -1341,3 +1366,273 @@ exit: ; *result = last_res; return timed_out; } + +/* + * List the foreign server connections. + * + * This function takes no input parameter and returns an array with elements + * as pairs of foreign server name and true/false to show whether or not the + * connection is valid. The array elements look like (srv1, true), + * (srv2, false), (srv3, true) ... (srvn, true). True if the connection is + * valid. False if the connection is invalidated in pgfdw_inval_callback. NULL + * is returned when there are no cached connections at all. + * + * This function issues a warning in case for any connection the associated + * foreign server has been dropped which means that the server name can not be + * read from the system catalogues. + */ +Datum +postgres_fdw_get_connections(PG_FUNCTION_ARGS) +{ + ArrayBuildState *astate = NULL; + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + StringInfoData buf; + uint16 no_server_conn_cnt = 0; + + if (!ConnectionHash) + PG_RETURN_NULL(); + + initStringInfo(&buf); + + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + ForeignServer *server; + + /* We only look for open remote connections. */ + if (!entry->conn) + continue; + + server = GetForeignServerExtended(entry->serverid, true); + + /* + * The foreign server may have been dropped in the current explicit + * transaction. It's not possible to drop the server from another + * session when the connection associated with it is in use in the + * current transaction, if tried so the drop query in another session + * blocks until the current explicit transaction finishes. + * + * Even though the server is dropped in the current explicit + * transaction, the cache can have the associated active connection + * entry. Say we call such connections as dangling. Since we can not + * fetch the server name from system catalogues for dangling + * connections, instead we issue a warning. + * + * We could have done better by storing the server name in the cache + * entry instead of server oid so that it could be used in the output. + * But storing the server name in each cache entry requires 64bytes of + * memory, which is huge, considering the fact that there can exists + * many cached connections and the use case i.e. dropping the foreign + * server within the explicit current transaction seems rare. So, we + * chose to issue a warning instead. + * + * Such dangling connections get closed either in the next use or at + * the end of current explicit transaction in pgfdw_xact_callback. + */ + if (!server) + { + if (entry->conn) + { + /* + * If the server has been dropped in the current explicit + * transaction, then this entry would have been invalidated in + * pgfdw_inval_callback at the end of drop sever command. Note + * that this entry would not have been closed in + * pgfdw_inval_callback because it is still being used in the + * current explicit transaction. So, assert that here. + */ + Assert(entry->xact_depth > 0 && entry->invalidated); + + no_server_conn_cnt++; + } + + continue; + } + + appendStringInfo(&buf, "(%s, %s)", server->servername, + entry->invalidated ? "false" : "true"); + + /* stash away the prepared string into array value */ + astate = accumArrayResult(astate, CStringGetTextDatum(buf.data), + false, TEXTOID, CurrentMemoryContext); + resetStringInfo(&buf); + } + + if (no_server_conn_cnt > 0) + { + ereport(WARNING, + (errmsg_plural("found an active connection for which the foreign server would have been dropped", + "found some active connections for which the foreign servers would have been dropped", + no_server_conn_cnt), + no_server_conn_cnt > 1 ? + errdetail("Such connections are discarded at the end of remote transaction.") + : errdetail("Such connection is discarded at the end of remote transaction."))); + } + + if (astate) + PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate, + CurrentMemoryContext)); + else + PG_RETURN_NULL(); +} + +/* + * Disconnect the cached connections. + * + * If server name is provided as input, this function disconnects the + * associated cached connection. Otherwise all the cached connections are + * disconnected. The cache can be destroyed when there are no active + * connections left. + * + * This function returns false if the cache doesn't exist. + * When the cache exists: + * 1) If the server name is provided, it first checks whether the foreign + * server exists, if not, an error is emitted. Otherwise it disconnects the + * associated connection when it's not being used in current transaction + * and returns true. If it's in use, then issues a warning and returns + * false. + * 2) If no input argument is provided, then it tries to disconnect all the + * connections. If all the connections are not being used, then it + * disconnects them and returns true. If all the connections are being + * used, then it issues a warning and returns false. If at least one + * connection is closed and others are in use, then issues a warning and + * returns true. + */ +Datum +postgres_fdw_disconnect(PG_FUNCTION_ARGS) +{ + bool result = false; + bool is_in_use = false; + + if (!ConnectionHash) + PG_RETURN_BOOL(result); + + if (PG_NARGS() == 1) + { + char *servername = NULL; + ForeignServer *server = NULL; + uint32 hashvalue; + + servername = text_to_cstring(PG_GETARG_TEXT_PP(0)); + server = GetForeignServerByName(servername, true); + + if (!server) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), + errmsg("foreign server \"%s\" does not exist", servername))); + + hashvalue = GetSysCacheHashValue1(FOREIGNSERVEROID, + ObjectIdGetDatum(server->serverid)); + result = disconnect_cached_connections(hashvalue, false, &is_in_use); + + /* + * Check if the connection associated with the given foreign server is + * in use i.e. entry->xact_depth > 0. Since we can not close it, so + * error out. + */ + if (is_in_use) + ereport(WARNING, + (errmsg("cannot close the connection because it is still in use"))); + } + else + { + result = disconnect_cached_connections(0, true, &is_in_use); + + /* + * Check if some or all of the connections are in use i.e. + * entry->xact_depth > 0. Since we can not close them, so inform the + * user. + */ + if (is_in_use) + { + if (result) + { + /* We closed at least one connection. Others are in use. */ + ereport(WARNING, + (errmsg("cannot close all connections because some of them are still in use"))); + } + else + { + /* We did not closed any connection, all are in use. */ + ereport(WARNING, + (errmsg("cannot close any connection because they are still in use"))); + } + } + } + + PG_RETURN_BOOL(result); +} + +/* + * Workhorse to disconnect the cached connections. + * + * This function tries to disconnect the connections only when they are not in + * use in the current transaction. + * + * If all is true, all the cached connections that are not being used in the + * current transaction are disconnected. Otherwise, the unused entries with the + * given hashvalue are disconnected. + * + * This function destroys the cache when there are no active connections. And + * set is_in_use flag to true when there exists at least one connection that's + * being used in the current transaction. + * + * This function returns true in the following cases if at least one connection + * is disconnected. Otherwise it returns false. + */ +static bool +disconnect_cached_connections(uint32 hashvalue, bool all, bool *is_in_use) +{ + bool result = false; + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + bool active_conn_exists = false; + + /* We are here only when ConnectionHash exists. */ + Assert(ConnectionHash); + + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + /* + * Either disconnect given or all the active and not in use cached + * connections. + */ + if ((all || entry->server_hashvalue == hashvalue) && + entry->conn) + { + if (entry->xact_depth > 0) + *is_in_use = true; + else + { + elog(DEBUG3, "discarding connection %p", entry->conn); + disconnect_pg_server(entry); + result = true; + } + } + + /* + * If at least one active connection exists in the cache, mark it so + * that we don't destroy the cache. + */ + if (entry->conn && !active_conn_exists) + active_conn_exists = true; + } + + /* + * Destroy the cache if we discarded all the active connections i.e. if + * there is no single active connection, which we can know while scanning + * the cached entries in the above loop. Destroying the cache is better + * than to keep it in the memory with all inactive entries in it to save + * some memory. Cache can get initialized on the subsequent queries to + * foreign server. + */ + if (!active_conn_exists) + { + hash_destroy(ConnectionHash); + ConnectionHash = NULL; + } + + return result; +} diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index c11092f8cc..ff7b6afa63 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -13,12 +13,22 @@ DO $d$ OPTIONS (dbname '$$||current_database()||$$', port '$$||current_setting('port')||$$' )$$; + EXECUTE $$CREATE SERVER temploopback1 FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (dbname '$$||current_database()||$$', + port '$$||current_setting('port')||$$' + )$$; + EXECUTE $$CREATE SERVER temploopback2 FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (dbname '$$||current_database()||$$', + port '$$||current_setting('port')||$$' + )$$; END; $d$; CREATE USER MAPPING FOR public SERVER testserver1 OPTIONS (user 'value', password 'value'); CREATE USER MAPPING FOR CURRENT_USER SERVER loopback; CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2; +CREATE USER MAPPING FOR public SERVER temploopback1; +CREATE USER MAPPING FOR public SERVER temploopback2; -- =================================================================== -- create objects used through FDW loopback server -- =================================================================== @@ -129,6 +139,16 @@ CREATE FOREIGN TABLE ft6 ( c2 int NOT NULL, c3 text ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4'); +CREATE FOREIGN TABLE templbtbl1 ( + c1 int NOT NULL, + c2 int NOT NULL, + c3 text +) SERVER temploopback1 OPTIONS (schema_name 'S 1', table_name 'T 4'); +CREATE FOREIGN TABLE templbtbl2 ( + c1 int NOT NULL, + c2 int NOT NULL, + c3 text +) SERVER temploopback2 OPTIONS (schema_name 'S 1', table_name 'T 4'); -- =================================================================== -- tests for validator -- =================================================================== @@ -191,15 +211,17 @@ ALTER FOREIGN TABLE ft2 OPTIONS (schema_name 'S 1', table_name 'T 1'); ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); \det+ - List of foreign tables - Schema | Table | Server | FDW options | Description ---------+-------+-----------+---------------------------------------+------------- - public | ft1 | loopback | (schema_name 'S 1', table_name 'T 1') | - public | ft2 | loopback | (schema_name 'S 1', table_name 'T 1') | - public | ft4 | loopback | (schema_name 'S 1', table_name 'T 3') | - public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') | - public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') | -(5 rows) + List of foreign tables + Schema | Table | Server | FDW options | Description +--------+------------+---------------+---------------------------------------+------------- + public | ft1 | loopback | (schema_name 'S 1', table_name 'T 1') | + public | ft2 | loopback | (schema_name 'S 1', table_name 'T 1') | + public | ft4 | loopback | (schema_name 'S 1', table_name 'T 3') | + public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') | + public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') | + public | templbtbl1 | temploopback1 | (schema_name 'S 1', table_name 'T 4') | + public | templbtbl2 | temploopback2 | (schema_name 'S 1', table_name 'T 4') | +(7 rows) -- Test that alteration of server options causes reconnection -- Remote's errors might be non-English, so hide them to ensure stable results @@ -9053,3 +9075,387 @@ SELECT 1 FROM ft1 LIMIT 1; ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off'); -- The invalid connection gets closed in pgfdw_xact_callback during commit. COMMIT; +-- ======================================================================== +-- Test postgres_fdw_get_connections and postgres_fdw_disconnect functions +-- ======================================================================== +-- postgres_fdw_get_connections returns an array with elements in a +-- machine-dependent ordering, so we must resort to unnesting and sorting for a +-- stable result. +CREATE FUNCTION fdw_unnest(anyarray) RETURNS SETOF anyelement +LANGUAGE SQL STRICT IMMUTABLE AS $$ +SELECT $1[i] FROM generate_series(array_lower($1,1), array_upper($1,1)) AS i +$$; +-- Discard all existing connections, before starting the tests. +SELECT postgres_fdw_disconnect(); /* TRUE */ + postgres_fdw_disconnect +------------------------- + t +(1 row) + +-- Ensure to have temploopback1 and temploopback2 connections cached. +SELECT 1 FROM templbtbl1 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +SELECT 1 FROM templbtbl2 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +-- List all the existing cached connections. Should return temploopback1, +-- temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +----------------------- + (temploopback1, true) + (temploopback2, true) +(2 rows) + +-- temploopback1 connection is disconnected. temploopback2 connection still +-- exists. +SELECT postgres_fdw_disconnect('temploopback1'); /* TRUE */ + postgres_fdw_disconnect +------------------------- + t +(1 row) + +-- List all the existing cached connections. Should return temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +----------------------- + (temploopback2, true) +(1 row) + +-- Cache exists, but the temploopback1 connection is not present in it. +SELECT postgres_fdw_disconnect('temploopback1'); /* FALSE */ + postgres_fdw_disconnect +------------------------- + f +(1 row) + +-- Cache exists, but the server name provided doesn't exist. +SELECT postgres_fdw_disconnect('unknownserver'); /* ERROR */ +ERROR: foreign server "unknownserver" does not exist +-- Cache and temploopback2 connection exist, so discard it. +SELECT postgres_fdw_disconnect(); /* TRUE */ + postgres_fdw_disconnect +------------------------- + t +(1 row) + +-- Cache does not exist. +SELECT postgres_fdw_disconnect(); /* FALSE */ + postgres_fdw_disconnect +------------------------- + f +(1 row) + +-- Test the functions inside explicit xact. +-- Connections are being used in the xact, so they cannot be disconnected. +BEGIN; +SELECT 1 FROM templbtbl1 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +SELECT 1 FROM templbtbl2 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +-- Should output temploopback1, temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +----------------------- + (temploopback1, true) + (temploopback2, true) +(2 rows) + +SELECT * FROM postgres_fdw_disconnect('temploopback1'); /* WARNING and FALSE */ +WARNING: cannot close the connection because it is still in use + postgres_fdw_disconnect +------------------------- + f +(1 row) + +SELECT * FROM postgres_fdw_disconnect(); /* WARNING and FALSE */ +WARNING: cannot close any connection because they are still in use + postgres_fdw_disconnect +------------------------- + f +(1 row) + +-- Should output temploopback1, temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +----------------------- + (temploopback1, true) + (temploopback2, true) +(2 rows) + +COMMIT; +-- Should disconnect temploopback1, temploopback2. +SELECT * FROM postgres_fdw_disconnect(); /* TRUE */ + postgres_fdw_disconnect +------------------------- + t +(1 row) + +-- Connections can be closed in the xact because they are not in use. +SELECT 1 FROM templbtbl1 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +SELECT 1 FROM templbtbl2 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +BEGIN; +-- Should output temploopback1, temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +----------------------- + (temploopback1, true) + (temploopback2, true) +(2 rows) + +SELECT * FROM postgres_fdw_disconnect(); /* TRUE */ + postgres_fdw_disconnect +------------------------- + t +(1 row) + +-- Should return nothing. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +------------ +(0 rows) + +COMMIT; +-- temploopback1 connection is closed and temploopback2 is not, because it's +-- being used in the xact. +SELECT 1 FROM templbtbl1 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +BEGIN; +SELECT 1 FROM templbtbl2 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +-- Should output temploopback1, temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +----------------------- + (temploopback1, true) + (temploopback2, true) +(2 rows) + +SELECT * FROM postgres_fdw_disconnect('temploopback1'); /* TRUE */ + postgres_fdw_disconnect +------------------------- + t +(1 row) + +-- Should output temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +----------------------- + (temploopback2, true) +(1 row) + +COMMIT; +-- Should disconnect temploopback2. +SELECT * FROM postgres_fdw_disconnect(); /* TRUE */ + postgres_fdw_disconnect +------------------------- + t +(1 row) + +-- temploopback1 connection is closed and temploopback2 is not, because it's +-- being used in the xact. +SELECT 1 FROM templbtbl1 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +BEGIN; +SELECT 1 FROM templbtbl2 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +-- Should output temploopback1, temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +----------------------- + (temploopback1, true) + (temploopback2, true) +(2 rows) + +SELECT * FROM postgres_fdw_disconnect(); /* WARNING and TRUE */ +WARNING: cannot close all connections because some of them are still in use + postgres_fdw_disconnect +------------------------- + t +(1 row) + +-- Should output temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +----------------------- + (temploopback2, true) +(1 row) + +COMMIT; +-- Should disconnect temploopback2. +SELECT * FROM postgres_fdw_disconnect(); /* TRUE */ + postgres_fdw_disconnect +------------------------- + t +(1 row) + +-- temploopback1 connection is invalidated and temploopback2 is not. +BEGIN; +SELECT 1 FROM templbtbl1 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +SELECT 1 FROM templbtbl2 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +ALTER SERVER temploopback1 OPTIONS (ADD use_remote_estimate 'off'); +-- Should output temploopback1 as invalid, temploopback2 as valid. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +------------------------ + (temploopback1, false) + (temploopback2, true) +(2 rows) + +COMMIT; +-- Invalidated connection i.e. temploopback1 was closed at the end of the xact. +-- Should output temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +----------------------- + (temploopback2, true) +(1 row) + +-- Should disconnect temploopback2. +SELECT * FROM postgres_fdw_disconnect(); /* TRUE */ + postgres_fdw_disconnect +------------------------- + t +(1 row) + +-- Both temploopback1 and temploopback2 connections are invalidated. +BEGIN; +SELECT 1 FROM templbtbl1 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +SELECT 1 FROM templbtbl2 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +ALTER SERVER temploopback1 OPTIONS (SET use_remote_estimate 'off'); +ALTER SERVER temploopback2 OPTIONS (ADD use_remote_estimate 'off'); +-- Should output both temploopback1 and temploopback2 as invalid. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +------------------------ + (temploopback1, false) + (temploopback2, false) +(2 rows) + +COMMIT; +-- Invalidated connections i.e. temploopback1 and temploopback2 were closed at +-- the end of the xact. Should output nothing. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +------------ +(0 rows) + +-- No active connections. +SELECT * FROM postgres_fdw_disconnect(); /* FALSE */ + postgres_fdw_disconnect +------------------------- + f +(1 row) + +-- Both the servers were dropped inside the xact block, so a warning is +-- emitted. +BEGIN; +SELECT 1 FROM templbtbl1 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +SELECT 1 FROM templbtbl2 LIMIT 1; + ?column? +---------- + 1 +(1 row) + +-- Should output temploopback1, temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + fdw_unnest +----------------------- + (temploopback1, true) + (temploopback2, true) +(2 rows) + +DROP SERVER temploopback1 CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to user mapping for public on server temploopback1 +drop cascades to foreign table templbtbl1 +-- Should output temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; /* WARNING */ +WARNING: found an active connection for which the foreign server would have been dropped +DETAIL: Such connection is discarded at the end of remote transaction. + fdw_unnest +----------------------- + (temploopback2, true) +(1 row) + +DROP SERVER temploopback2 CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to user mapping for public on server temploopback2 +drop cascades to foreign table templbtbl2 +-- Should output nothing. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; /* WARNING */ +WARNING: found some active connections for which the foreign servers would have been dropped +DETAIL: Such connections are discarded at the end of remote transaction. + fdw_unnest +------------ +(0 rows) + +COMMIT; +-- Clean up. +DROP FUNCTION fdw_unnest; diff --git a/contrib/postgres_fdw/postgres_fdw--1.0.sql b/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql similarity index 60% rename from contrib/postgres_fdw/postgres_fdw--1.0.sql rename to contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql index a0f0fc1bf4..fa8d12d3e3 100644 --- a/contrib/postgres_fdw/postgres_fdw--1.0.sql +++ b/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql @@ -1,7 +1,7 @@ -/* contrib/postgres_fdw/postgres_fdw--1.0.sql */ +/* contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql */ --- complain if script is sourced in psql, rather than via CREATE EXTENSION -\echo Use "CREATE EXTENSION postgres_fdw" to load this file. \quit +-- complain if script is sourced in psql, rather than via ALTER EXTENSION +\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.1'" to load this file. \quit CREATE FUNCTION postgres_fdw_handler() RETURNS fdw_handler diff --git a/contrib/postgres_fdw/postgres_fdw--1.1.sql b/contrib/postgres_fdw/postgres_fdw--1.1.sql new file mode 100644 index 0000000000..8a1453a495 --- /dev/null +++ b/contrib/postgres_fdw/postgres_fdw--1.1.sql @@ -0,0 +1,33 @@ +/* contrib/postgres_fdw/postgres_fdw--1.1.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION postgres_fdw" to load this file. \quit + +CREATE FUNCTION postgres_fdw_handler() +RETURNS fdw_handler +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +CREATE FUNCTION postgres_fdw_validator(text[], oid) +RETURNS void +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +CREATE FOREIGN DATA WRAPPER postgres_fdw + HANDLER postgres_fdw_handler + VALIDATOR postgres_fdw_validator; + +CREATE FUNCTION postgres_fdw_get_connections () +RETURNS text[] +AS 'MODULE_PATHNAME','postgres_fdw_get_connections' +LANGUAGE C STRICT PARALLEL RESTRICTED; + +CREATE FUNCTION postgres_fdw_disconnect () +RETURNS bool +AS 'MODULE_PATHNAME','postgres_fdw_disconnect' +LANGUAGE C STRICT PARALLEL RESTRICTED; + +CREATE FUNCTION postgres_fdw_disconnect (text) +RETURNS bool +AS 'MODULE_PATHNAME','postgres_fdw_disconnect' +LANGUAGE C STRICT PARALLEL RESTRICTED; diff --git a/contrib/postgres_fdw/postgres_fdw.control b/contrib/postgres_fdw/postgres_fdw.control index f9ed490752..d489382064 100644 --- a/contrib/postgres_fdw/postgres_fdw.control +++ b/contrib/postgres_fdw/postgres_fdw.control @@ -1,5 +1,5 @@ # postgres_fdw extension comment = 'foreign-data wrapper for remote PostgreSQL servers' -default_version = '1.0' +default_version = '1.1' module_pathname = '$libdir/postgres_fdw' relocatable = true diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 25dbc08b98..c2c5c10739 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -15,6 +15,14 @@ DO $d$ OPTIONS (dbname '$$||current_database()||$$', port '$$||current_setting('port')||$$' )$$; + EXECUTE $$CREATE SERVER temploopback1 FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (dbname '$$||current_database()||$$', + port '$$||current_setting('port')||$$' + )$$; + EXECUTE $$CREATE SERVER temploopback2 FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (dbname '$$||current_database()||$$', + port '$$||current_setting('port')||$$' + )$$; END; $d$; @@ -22,6 +30,8 @@ CREATE USER MAPPING FOR public SERVER testserver1 OPTIONS (user 'value', password 'value'); CREATE USER MAPPING FOR CURRENT_USER SERVER loopback; CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2; +CREATE USER MAPPING FOR public SERVER temploopback1; +CREATE USER MAPPING FOR public SERVER temploopback2; -- =================================================================== -- create objects used through FDW loopback server @@ -142,6 +152,17 @@ CREATE FOREIGN TABLE ft6 ( c3 text ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4'); +CREATE FOREIGN TABLE templbtbl1 ( + c1 int NOT NULL, + c2 int NOT NULL, + c3 text +) SERVER temploopback1 OPTIONS (schema_name 'S 1', table_name 'T 4'); + +CREATE FOREIGN TABLE templbtbl2 ( + c1 int NOT NULL, + c2 int NOT NULL, + c3 text +) SERVER temploopback2 OPTIONS (schema_name 'S 1', table_name 'T 4'); -- =================================================================== -- tests for validator -- =================================================================== @@ -2711,3 +2732,153 @@ SELECT 1 FROM ft1 LIMIT 1; ALTER SERVER loopback OPTIONS (ADD use_remote_estimate 'off'); -- The invalid connection gets closed in pgfdw_xact_callback during commit. COMMIT; + +-- ======================================================================== +-- Test postgres_fdw_get_connections and postgres_fdw_disconnect functions +-- ======================================================================== + +-- postgres_fdw_get_connections returns an array with elements in a +-- machine-dependent ordering, so we must resort to unnesting and sorting for a +-- stable result. +CREATE FUNCTION fdw_unnest(anyarray) RETURNS SETOF anyelement +LANGUAGE SQL STRICT IMMUTABLE AS $$ +SELECT $1[i] FROM generate_series(array_lower($1,1), array_upper($1,1)) AS i +$$; + +-- Discard all existing connections, before starting the tests. +SELECT postgres_fdw_disconnect(); /* TRUE */ + +-- Ensure to have temploopback1 and temploopback2 connections cached. +SELECT 1 FROM templbtbl1 LIMIT 1; +SELECT 1 FROM templbtbl2 LIMIT 1; + +-- List all the existing cached connections. Should return temploopback1, +-- temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + +-- temploopback1 connection is disconnected. temploopback2 connection still +-- exists. +SELECT postgres_fdw_disconnect('temploopback1'); /* TRUE */ + +-- List all the existing cached connections. Should return temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + +-- Cache exists, but the temploopback1 connection is not present in it. +SELECT postgres_fdw_disconnect('temploopback1'); /* FALSE */ + +-- Cache exists, but the server name provided doesn't exist. +SELECT postgres_fdw_disconnect('unknownserver'); /* ERROR */ + +-- Cache and temploopback2 connection exist, so discard it. +SELECT postgres_fdw_disconnect(); /* TRUE */ + +-- Cache does not exist. +SELECT postgres_fdw_disconnect(); /* FALSE */ + +-- Test the functions inside explicit xact. +-- Connections are being used in the xact, so they cannot be disconnected. +BEGIN; +SELECT 1 FROM templbtbl1 LIMIT 1; +SELECT 1 FROM templbtbl2 LIMIT 1; +-- Should output temploopback1, temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; +SELECT * FROM postgres_fdw_disconnect('temploopback1'); /* WARNING and FALSE */ +SELECT * FROM postgres_fdw_disconnect(); /* WARNING and FALSE */ +-- Should output temploopback1, temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; +COMMIT; + +-- Should disconnect temploopback1, temploopback2. +SELECT * FROM postgres_fdw_disconnect(); /* TRUE */ + +-- Connections can be closed in the xact because they are not in use. +SELECT 1 FROM templbtbl1 LIMIT 1; +SELECT 1 FROM templbtbl2 LIMIT 1; +BEGIN; +-- Should output temploopback1, temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; +SELECT * FROM postgres_fdw_disconnect(); /* TRUE */ +-- Should return nothing. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; +COMMIT; + +-- temploopback1 connection is closed and temploopback2 is not, because it's +-- being used in the xact. +SELECT 1 FROM templbtbl1 LIMIT 1; +BEGIN; +SELECT 1 FROM templbtbl2 LIMIT 1; +-- Should output temploopback1, temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; +SELECT * FROM postgres_fdw_disconnect('temploopback1'); /* TRUE */ +-- Should output temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; +COMMIT; + +-- Should disconnect temploopback2. +SELECT * FROM postgres_fdw_disconnect(); /* TRUE */ + +-- temploopback1 connection is closed and temploopback2 is not, because it's +-- being used in the xact. +SELECT 1 FROM templbtbl1 LIMIT 1; +BEGIN; +SELECT 1 FROM templbtbl2 LIMIT 1; +-- Should output temploopback1, temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; +SELECT * FROM postgres_fdw_disconnect(); /* WARNING and TRUE */ +-- Should output temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; +COMMIT; + +-- Should disconnect temploopback2. +SELECT * FROM postgres_fdw_disconnect(); /* TRUE */ + +-- temploopback1 connection is invalidated and temploopback2 is not. +BEGIN; +SELECT 1 FROM templbtbl1 LIMIT 1; +SELECT 1 FROM templbtbl2 LIMIT 1; +ALTER SERVER temploopback1 OPTIONS (ADD use_remote_estimate 'off'); +-- Should output temploopback1 as invalid, temploopback2 as valid. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; +COMMIT; + +-- Invalidated connection i.e. temploopback1 was closed at the end of the xact. +-- Should output temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + +-- Should disconnect temploopback2. +SELECT * FROM postgres_fdw_disconnect(); /* TRUE */ + +-- Both temploopback1 and temploopback2 connections are invalidated. +BEGIN; +SELECT 1 FROM templbtbl1 LIMIT 1; +SELECT 1 FROM templbtbl2 LIMIT 1; +ALTER SERVER temploopback1 OPTIONS (SET use_remote_estimate 'off'); +ALTER SERVER temploopback2 OPTIONS (ADD use_remote_estimate 'off'); +-- Should output both temploopback1 and temploopback2 as invalid. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; +COMMIT; + +-- Invalidated connections i.e. temploopback1 and temploopback2 were closed at +-- the end of the xact. Should output nothing. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; + +-- No active connections. +SELECT * FROM postgres_fdw_disconnect(); /* FALSE */ + +-- Both the servers were dropped inside the xact block, so a warning is +-- emitted. +BEGIN; +SELECT 1 FROM templbtbl1 LIMIT 1; +SELECT 1 FROM templbtbl2 LIMIT 1; +-- Should output temploopback1, temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; +DROP SERVER temploopback1 CASCADE; +-- Should output temploopback2. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; /* WARNING */ +DROP SERVER temploopback2 CASCADE; +-- Should output nothing. +SELECT * FROM fdw_unnest(postgres_fdw_get_connections()) ORDER BY 1; /* WARNING */ +COMMIT; + +-- Clean up. +DROP FUNCTION fdw_unnest; diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml index e6fd2143c1..914d5a2ee6 100644 --- a/doc/src/sgml/postgres-fdw.sgml +++ b/doc/src/sgml/postgres-fdw.sgml @@ -479,6 +479,46 @@ OPTIONS (ADD password_required 'false'); + + Functions + + + postgres_fdw_get_connections ( ) which takes no input. + When called in the local session, it returns an array with each element as a + pair of the foreign server names of all the open connections that are + previously made to the foreign servers and true or + false to show whether or not the connection is valid. + The foreign server connections can get invalidated due to alter statements + on foreign server or user mapping. If there are no open connections, then + NULL is returned. This function issues a warning in case + for any connection, the associated foreign server has been dropped and the + server name can not be fetched from the system catalogues. + + + + postgres_fdw_disconnect ( servername text ) + which takes foreign server name as input. When called in the local session, + it discards the unused open connection previously made to the foreign server + and returns true. If the open connection is still being + used in the current transaction, it is not discarded, instead a warning is + issued and false is returned. false is + returned when there are no open connections. When there are some open + connections, but there is no connection for the given foreign server, + then false is returned. When no foreign server exists + with the given name, an error is emitted. + + + + postgres_fdw_disconnect ( ) which takes no input. + When called in the local session, it discards all the unused open + connections that are previously made to the foreign servers and returns + true. If there is any open connection that is still being + used in the current transaction, then a warning is issued. false + is returned when no open connection is discarded or there are no open + connections at all. + + + Connection Management @@ -490,6 +530,20 @@ OPTIONS (ADD password_required 'false'); multiple user identities (user mappings) are used to access the foreign server, a connection is established for each user mapping. + + + Since the postgres_fdw keeps the connections to remote + servers in the local session, the corresponding sessions that are opened on + the remote servers are kept idle until they are re-used by the local session. + This may waste resources if those connections are not frequently used by the + local session. To address this, the postgres_fdw + provides following way to remove the connections to the remote servers and + so the remote sessions: + + postgres_fdw_disconnect() to discard all the + connections or postgres_fdw_disconnect(text) + to discard the connection associated with the given foreign server. + -- 2.25.1