Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING - Mailing list pgsql-hackers

From Tom Lane
Subject Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING
Date
Msg-id 18927.1500588942@sss.pgh.pa.us
Whole thread Raw
In response to Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
Responses Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING
List pgsql-hackers
Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> writes:
> Here it is. First I tried this using the ordinary regression
> framework, but the effect of this patch shows up only in the log,
> and the log output contains variable parts, so I gave up on that
> approach before trying anything more complex.

> Next I tried the existing TAP tests, but this test needs a persistent
> session so that operations can alternate between two sessions, and
> PostgresNode::psql doesn't offer such functionality.

> Finally, I added a new TAP test library, PsqlSession, which offers
> interactive psql sessions, and then added a simple test to
> postgres_fdw that uses it.

This seems like overkill.  We can test it reasonably easily within the
existing framework, as shown in the attached patch.  I'm also fairly
concerned that what you're showing here would be unstable in the buildfarm
as a result of race conditions between the multiple sessions.

I made some cosmetic updates to the code patch, as well.

I think this is actually a bug fix, and should not wait for the next
commitfest.

            regards, tom lane

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 8c33dea..8eb477b 100644
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "pgstat.h"
  #include "storage/latch.h"
  #include "utils/hsearch.h"
+ #include "utils/inval.h"
  #include "utils/memutils.h"
  #include "utils/syscache.h"

*************** typedef struct ConnCacheEntry
*** 48,58 ****
--- 49,63 ----
  {
      ConnCacheKey key;            /* hash key (must be first) */
      PGconn       *conn;            /* connection to foreign server, or NULL */
+     /* Remaining fields are invalid when conn is NULL: */
      int            xact_depth;        /* 0 = no xact open, 1 = main xact open, 2 =
                                   * one level of subxact open, etc */
      bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
      bool        have_error;        /* have any subxacts aborted in this xact? */
      bool        changing_xact_state;    /* xact state change in process */
+     bool        invalidated;    /* true if reconnect is pending */
+     uint32        server_hashvalue;    /* hash value of foreign server OID */
+     uint32        mapping_hashvalue;    /* hash value of user mapping OID */
  } ConnCacheEntry;

  /*
*************** static bool xact_got_connection = false;
*** 69,74 ****
--- 74,80 ----

  /* prototypes of private functions */
  static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+ static void disconnect_pg_server(ConnCacheEntry *entry);
  static void check_conn_params(const char **keywords, const char **values);
  static void configure_remote_session(PGconn *conn);
  static void do_sql_command(PGconn *conn, const char *sql);
*************** static void pgfdw_subxact_callback(SubXa
*** 78,83 ****
--- 84,90 ----
                         SubTransactionId mySubid,
                         SubTransactionId parentSubid,
                         void *arg);
+ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
  static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
  static bool pgfdw_cancel_query(PGconn *conn);
  static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
*************** GetConnection(UserMapping *user, bool wi
*** 130,135 ****
--- 137,146 ----
           */
          RegisterXactCallback(pgfdw_xact_callback, NULL);
          RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
+         CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+                                       pgfdw_inval_callback, (Datum) 0);
+         CacheRegisterSyscacheCallback(USERMAPPINGOID,
+                                       pgfdw_inval_callback, (Datum) 0);
      }

      /* Set flag that we did GetConnection during the current transaction */
*************** GetConnection(UserMapping *user, bool wi
*** 144,161 ****
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /* initialize new hashtable entry (key is already filled in) */
          entry->conn = NULL;
-         entry->xact_depth = 0;
-         entry->have_prep_stmt = false;
-         entry->have_error = false;
-         entry->changing_xact_state = false;
      }

      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);

      /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
       * connection is actually used.
--- 155,182 ----
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /*
!          * We need only clear "conn" here; remaining fields will be filled
!          * later when "conn" is set.
!          */
          entry->conn = NULL;
      }

      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);

      /*
+      * If the connection needs to be remade due to invalidation, disconnect as
+      * soon as we're out of all transactions.
+      */
+     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+     {
+         elog(DEBUG3, "closing connection %p for option changes to take effect",
+              entry->conn);
+         disconnect_pg_server(entry);
+     }
+
+     /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
       * connection is actually used.
*************** GetConnection(UserMapping *user, bool wi
*** 164,178 ****
      /*
       * If cache entry doesn't have a connection, we have to establish a new
       * connection.  (If connect_pg_server throws an error, the cache entry
!      * will be left in a valid empty state.)
       */
      if (entry->conn == NULL)
      {
          ForeignServer *server = GetForeignServer(user->serverid);

!         entry->xact_depth = 0;    /* just to be sure */
          entry->have_prep_stmt = false;
          entry->have_error = false;
          entry->conn = connect_pg_server(server, user);

          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
--- 185,208 ----
      /*
       * If cache entry doesn't have a connection, we have to establish a new
       * connection.  (If connect_pg_server throws an error, the cache entry
!      * will remain in a valid empty state, ie conn == NULL.)
       */
      if (entry->conn == NULL)
      {
          ForeignServer *server = GetForeignServer(user->serverid);

!         /* Reset all transient state fields, to be sure all are clean */
!         entry->xact_depth = 0;
          entry->have_prep_stmt = false;
          entry->have_error = false;
+         entry->changing_xact_state = false;
+         entry->invalidated = false;
+         entry->server_hashvalue =
+             GetSysCacheHashValue1(FOREIGNSERVEROID, server->serverid);
+         entry->mapping_hashvalue =
+             GetSysCacheHashValue1(USERMAPPINGOID, user->umid);
+
+         /* Now try to make the connection */
          entry->conn = connect_pg_server(server, user);

          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
*************** connect_pg_server(ForeignServer *server,
*** 277,282 ****
--- 307,325 ----
  }

  /*
+  * Disconnect any open connection for a connection cache entry.
+  */
+ static void
+ disconnect_pg_server(ConnCacheEntry *entry)
+ {
+     if (entry->conn != NULL)
+     {
+         PQfinish(entry->conn);
+         entry->conn = NULL;
+     }
+ }
+
+ /*
   * For non-superusers, insist that the connstr specify a password.  This
   * prevents a password from being picked up from .pgpass, a service file,
   * the environment, etc.  We don't want the postgres user's passwords
*************** pgfdw_xact_callback(XactEvent event, voi
*** 777,785 ****
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             PQfinish(entry->conn);
!             entry->conn = NULL;
!             entry->changing_xact_state = false;
          }
      }

--- 820,826 ----
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             disconnect_pg_server(entry);
          }
      }

*************** pgfdw_subxact_callback(SubXactEvent even
*** 897,902 ****
--- 938,985 ----
  }

  /*
+  * Connection invalidation callback function
+  *
+  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
+  * mark connections depending on that entry as needing to be remade.
+  * We can't immediately destroy them, since they might be in the midst of
+  * a transaction, but we'll remake them at the next opportunity.
+  *
+  * Although most cache invalidation callbacks blow away all the related stuff
+  * regardless of the given hashvalue, connections are expensive enough that
+  * it's worth trying to avoid that.
+  *
+  * NB: We could avoid unnecessary disconnection more strictly by examining
+  * individual option values, but it seems too much effort for the gain.
+  */
+ static void
+ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry;
+
+     Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
+
+     /* ConnectionHash must exist already, if we're registered */
+     hash_seq_init(&scan, ConnectionHash);
+     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+     {
+         /* Ignore invalid entries */
+         if (entry->conn == NULL)
+             continue;
+
+         /* hashvalue == 0 means a cache reset, must clear all state */
+         if (hashvalue == 0)
+             entry->invalidated = true;
+         else if ((cacheid == FOREIGNSERVEROID &&
+                   entry->server_hashvalue == hashvalue) ||
+                  (cacheid == USERMAPPINGOID &&
+                   entry->mapping_hashvalue == hashvalue))
+             entry->invalidated = true;
+     }
+ }
+
+ /*
   * Raise an error if the given connection cache entry is marked as being
   * in the middle of an xact state change.  This should be called at which no
   * such change is expected to be in progress; if one is found to be in
*************** pgfdw_reject_incomplete_xact_state_chang
*** 913,921 ****
      Form_pg_user_mapping umform;
      ForeignServer *server;

!     if (!entry->changing_xact_state)
          return;

      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
--- 996,1009 ----
      Form_pg_user_mapping umform;
      ForeignServer *server;

!     /* nothing to do for inactive entries and entries of sane state */
!     if (entry->conn == NULL || !entry->changing_xact_state)
          return;

+     /* make sure this entry is inactive */
+     disconnect_pg_server(entry);
+
+     /* find server name to be shown in the message below */
      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index b112c19..ee75c4b 100644
*** a/contrib/postgres_fdw/expected/postgres_fdw.out
--- b/contrib/postgres_fdw/expected/postgres_fdw.out
*************** ALTER FOREIGN TABLE ft2 ALTER COLUMN c1
*** 191,196 ****
--- 191,233 ----
   public | ft_pg_type | loopback  | (schema_name 'pg_catalog', table_name 'pg_type') |
  (6 rows)

+ -- Test that alteration of server options causes reconnection
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work
+   c3   |              c4
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+
+ ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ ERROR:  could not connect to server "loopback"
+ DETAIL:  FATAL:  database "no such database" does not exist
+ DO $d$
+     BEGIN
+         EXECUTE $$ALTER SERVER loopback
+             OPTIONS (SET dbname '$$||current_database()||$$')$$;
+     END;
+ $d$;
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+   c3   |              c4
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+
+ -- Test that alteration of user mapping options causes reconnection
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (ADD user 'no such user');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ ERROR:  could not connect to server "loopback"
+ DETAIL:  FATAL:  role "no such user" does not exist
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (DROP user);
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+   c3   |              c4
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+
  -- Now we should be able to run ANALYZE.
  -- To exercise multiple code paths, we use local stats on ft1
  -- and remote-estimate mode on ft2.
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 509bb54..2eca879 100644
*** a/contrib/postgres_fdw/sql/postgres_fdw.sql
--- b/contrib/postgres_fdw/sql/postgres_fdw.sql
*************** ALTER FOREIGN TABLE ft1 ALTER COLUMN c1
*** 195,200 ****
--- 195,220 ----
  ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
  \det+

+ -- Test that alteration of server options causes reconnection
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work
+ ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ DO $d$
+     BEGIN
+         EXECUTE $$ALTER SERVER loopback
+             OPTIONS (SET dbname '$$||current_database()||$$')$$;
+     END;
+ $d$;
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+
+ -- Test that alteration of user mapping options causes reconnection
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (ADD user 'no such user');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (DROP user);
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+
  -- Now we should be able to run ANALYZE.
  -- To exercise multiple code paths, we use local stats on ft1
  -- and remote-estimate mode on ft2.

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

pgsql-hackers by date:

Previous
From: Tom Lane
Date:
Subject: Re: [HACKERS] <> join selectivity estimate question
Next
From: Alvaro Herrera
Date:
Subject: Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING