diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index f753c6e232..d55243fb7e 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -23,12 +23,14 @@ #include "postgres_fdw.h" #include "storage/fd.h" #include "storage/latch.h" +#include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/datetime.h" #include "utils/hsearch.h" #include "utils/inval.h" #include "utils/memutils.h" #include "utils/syscache.h" +#include "utils/timeout.h" /* * Connection cache hash table entry @@ -109,6 +111,10 @@ static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel); static bool UserMappingPasswordRequired(UserMapping *user); static bool disconnect_cached_connections(Oid serverid); +static void pgfdw_connection_check(void); +static bool pgfdw_connection_check_internal(PGconn *conn); +static TimeoutId pgfdw_health_check_timeout = MAX_TIMEOUTS; +int pgfdw_health_check_interval; /* * Get a PGconn which can be used to execute queries on the remote PostgreSQL @@ -153,6 +159,9 @@ GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state) pgfdw_inval_callback, (Datum) 0); CacheRegisterSyscacheCallback(USERMAPPINGOID, pgfdw_inval_callback, (Datum) 0); + + /* Register a timeout for checking remote servers */ + pgfdw_health_check_timeout = RegisterTimeout(USER_TIMEOUT, pgfdw_connection_check); } /* Set flag that we did GetConnection during the current transaction */ @@ -276,6 +285,12 @@ GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state) if (state) *state = &entry->state; + /* Fire timeout if needed */ + if (pgfdw_health_check_interval > 0 && + !get_timeout_active(pgfdw_health_check_timeout)) + enable_timeout_after(pgfdw_health_check_timeout, + pgfdw_health_check_interval); + return entry->conn; } @@ -1702,3 +1717,133 @@ disconnect_cached_connections(Oid serverid) return result; } + +/* + * Signal handler for checking remote servers. + * + * This function searches the hash table from the beginning + * and performs a health-check on each entry. + * + * Raise SIGINT if someone might be down, otherwise do nothing. + */ +void +pgfdw_connection_check(void) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + bool raised = false; + + Assert(ConnectionHash); + + /* + * checking will be done by waiting WL_SOCKET_CLOSED event, + * so exit immediately if it cannot be used in this system. + */ + if (!WaitEventSetCanReportClosed()) + return; + + /* Is there any cancel messages? */ + if (QeuryCancelMessage != NULL) + return; + + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)) && !raised) + { + if (entry->conn == NULL || entry->xact_depth == 0) + continue; + if (!pgfdw_connection_check_internal(entry->conn)) + { + /* + * Foreign server might be down, so raise SIGINT. + * Note that error message is passed to QeuryCancelMessage + * for reporting error in ProcessInterrupts(). + */ + char msg[31 + MAXDATELEN]; + MemoryContext old; + ForeignServer *server; + + /* + * Switch to CurTransactionContext in order to + * make sure that the lifetime of palloc'd is transaction. + */ + old = MemoryContextSwitchTo(CurTransactionContext); + server = GetForeignServer(entry->serverid); + snprintf(msg, sizeof(msg), "Foreign Server %s might be down.", server->servername); + QeuryCancelMessage = pstrdup(msg); + MemoryContextSwitchTo(old); + + disconnect_pg_server(entry); + raise(SIGINT); + raised = true; + break; + } + } + + /* re-schedule timer if needed. */ + if (!raised && pgfdw_health_check_interval > 0) + enable_timeout_after(pgfdw_health_check_timeout, + pgfdw_health_check_interval); + + return; +} + +/* + * helper function for pgfdw_connection_check + */ +static bool +pgfdw_connection_check_internal(PGconn *conn) +{ + WaitEventSet *eventset; + WaitEvent events; + + Assert(WaitEventSetCanReportClosed()); + + eventset = CreateWaitEventSet(CurrentMemoryContext, 1); + AddWaitEventToSet(eventset, WL_SOCKET_CLOSED, PQsocket(conn), NULL, NULL); + + WaitEventSetWait(eventset, 0, &events, 1, 0); + + if (events.events & WL_SOCKET_CLOSED) + { + FreeWaitEventSet(eventset); + return false; + } + FreeWaitEventSet(eventset); + + return true; +} + +bool +check_pgfdw_health_check_interval(int *newval, void **extra, GucSource source) +{ + if (!WaitEventSetCanReportClosed() && *newval != 0) + { + GUC_check_errdetail("pgfdw_health_check_interval must be set to 0 on this platform"); + return false; + } + return true; +} + +void +assign_pgfdw_health_check_interval(int newval, void *extra) +{ + /* Quick return if timeout is not registered yet. */ + if (pgfdw_health_check_timeout == MAX_TIMEOUTS) + return; + + if (get_timeout_active(pgfdw_health_check_timeout)) + { + if (newval == 0) + disable_timeout(pgfdw_health_check_timeout, false); + + /* + * we don't have to do anything because + * new value will be used in pgfdw_connection_check(). + */ + return; + } + + /* Start timeout if wants to */ + if (newval > 0) + enable_timeout_after(pgfdw_health_check_timeout, newval); +} diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 2c6b2894b9..75a910b0ff 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -538,5 +538,18 @@ _PG_init(void) NULL, NULL); + DefineCustomIntVariable("postgres_fdw.health_check_interval", + "Sets the time interval between checks of remote servers.", + NULL, + &pgfdw_health_check_interval, + 0, + 0, + INT_MAX, + PGC_USERSET, + GUC_UNIT_MS, + check_pgfdw_health_check_interval, + assign_pgfdw_health_check_interval, + NULL); + MarkGUCPrefixReserved("postgres_fdw"); } diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 8ae79e97e4..c129af5082 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -18,6 +18,7 @@ #include "libpq-fe.h" #include "nodes/execnodes.h" #include "nodes/pathnodes.h" +#include "utils/guc.h" #include "utils/relcache.h" /* @@ -151,6 +152,10 @@ extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state); extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql); +extern bool check_pgfdw_health_check_interval(int *newval, void **extra, + GucSource source); +extern void assign_pgfdw_health_check_interval(int newval, void *extra); +extern int pgfdw_health_check_interval; /* in option.c */ extern int ExtractConnectionOptions(List *defelems,