Thread: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

[HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From
Kyotaro HORIGUCHI
Date:
Hello, moved to pgsql-hackers.

This is the rebased and revised version of the previous patch.

At Thu, 13 Jul 2017 13:42:49 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20170713.134249.97825982.horiguchi.kyotaro@lab.ntt.co.jp>
> At Tue, 11 Jul 2017 15:39:14 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in <6234.1499801954@sss.pgh.pa.us>
> > Amit Langote <Langote_Amit_f8@lab.ntt.co.jp> writes:
> > > Horiguchi-san,
> > > On 2017/07/11 10:28, Kyotaro HORIGUCHI wrote:
> > >> I faintly recall such discussion was held aroud that time and
> > >> maybe we concluded that we don't do that but I haven't find such
> > >> a thread in pgsql-hackers..
> > 
> > > I mentioned it in my reply.  Here again:
> > > https://www.postgresql.org/message-id/20160405.184408.166437663.horiguchi.kyotaro%40lab.ntt.co.jp
> > 
> > The followup discussion noted that that approach was no good because it
> > would only close connections in the same session that had done the ALTER
> > SERVER.  I think the basic idea of marking postgres_fdw connections as
> > needing to be remade when next possible is OK, but we have to drive it
> > off catcache invalidation events, the same as we did in c52d37c8b.  An
> > advantage of that way is we don't need any new hooks in the core code.
> > 
> > Kyotaro-san, are you planning to update your old patch?
> 
> I'm pleased to do that. I will reconsider the way shown in a mail
> in the thread soon.

done.

(As a recap) Changing some options, such as the host of a foreign
server or the password of a user mapping, makes the existing
connections stale, but postgres_fdw continues using them.

The attached patch does the following things.

- postgres_fdw registers two invalidation callbacks on loading.

- On any change to a foreign server or a user mapping, the callbacks mark the affected connection as 'invalid'.

- An invalidated connection is disconnected once, before its next use, provided no transaction is in progress on it.

As a consequence, option changes properly affect subsequent
queries, starting with the next transaction in any session. This
happens regardless of which option was modified.

======
create server sv1 foreign data wrapper postgres_fdw options (host '/tmp', port '5432', dbname 'postgres');
create user mapping for public server sv1;
create table t (a int);
create foreign table ft1 (a int) server sv1 options (table_name 't');
begin;
select * from ft1; -- no error
alter server sv1 options (set host '/tmpe');
select * from ft1; -- no error because the transaction is still in progress.
commit;
select * from ft1;
ERROR:  could not connect to server "sv1"  - NEW
======
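
The mark-then-reconnect behaviour shown in the session above can be modelled in a few lines of self-contained C. This is only an illustrative sketch, not code from the patch: DemoConnEntry, demo_invalidate and demo_get_connection are hypothetical stand-ins for ConnCacheEntry, the syscache callback and GetConnection(), and an integer handle stands in for a PGconn.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for ConnCacheEntry; an int handle replaces PGconn. */
typedef struct DemoConnEntry
{
    int  conn;         /* 0 means "not connected" */
    int  xact_depth;   /* 0 means no remote transaction is open */
    bool invalidated;  /* true if a reconnect is required */
} DemoConnEntry;

static int demo_next_handle = 1;

/* Invalidation callback: only mark the entry, never close it here. */
static void
demo_invalidate(DemoConnEntry *entry)
{
    if (entry->conn != 0)
        entry->invalidated = true;
}

/* Lookup path: drop a stale connection only when no transaction uses it. */
static int
demo_get_connection(DemoConnEntry *entry)
{
    if (entry->conn != 0 && entry->invalidated && entry->xact_depth == 0)
    {
        entry->conn = 0;            /* the patch calls PQfinish() at this point */
        entry->invalidated = false;
    }

    if (entry->conn == 0)
        entry->conn = demo_next_handle++;   /* "reconnect" with current options */

    return entry->conn;
}
```

In this model, calling demo_invalidate() while xact_depth is nonzero leaves the handle unchanged; the first demo_get_connection() call after xact_depth returns to zero hands back a fresh handle.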

This patch is private to postgres_fdw, but it is annoying that
syscache hash values are handled in connection.c. If we are allowed
to add something to the core for this feature, I could add a new
member to FdwRoutine that notifies invalidation of a mapping or
server by OID. (Of course, that would not be back-patchable.)

Does anyone have opinions or suggestions?

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 53,58 **** typedef struct ConnCacheEntry
--- 53,61 ----
      bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
      bool        have_error;       /* have any subxacts aborted in this xact? */
      bool        changing_xact_state;    /* xact state change in process */
+     bool        invalidated;    /* true if reconnect is requried */
+     uint32        server_hashvalue;    /* hash value of foreign server oid */
+     uint32        mapping_hashvalue;  /* hash value of user mapping oid */
  } ConnCacheEntry;

  /*
***************
*** 150,160 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 153,180 ----
          entry->have_prep_stmt = false;
          entry->have_error = false;
          entry->changing_xact_state = false;
+         entry->invalidated = false;
+         entry->server_hashvalue = 0;
+         entry->mapping_hashvalue = 0;
      }

      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);

+
+     /*
+      * This connection is no longer valid. Disconnect such connections if no
+      * transaction is running. We could avoid suporious disconnection by
+      * examining individual option values but it would be too-much for the
+      * gain.
+      */
+     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+     {
+         PQfinish(entry->conn);
+         entry->conn = NULL;
+         entry->invalidated = false;
+     }
+
      /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
***************
*** 173,178 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 193,202 ----
          entry->xact_depth = 0;    /* just to be sure */
          entry->have_prep_stmt = false;
          entry->have_error = false;
+         entry->server_hashvalue =
+             GetSysCacheHashValue1(FOREIGNSERVEROID, server->serverid);
+         entry->mapping_hashvalue =
+             GetSysCacheHashValue1(USERMAPPINGOID, user->umid);
          entry->conn = connect_pg_server(server, user);

          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
***************
*** 429,434 **** ReleaseConnection(PGconn *conn)
--- 453,514 ----
  }

  /*
+  * Connection invalidation functions.
+  *
+  * Changes of some options of foreign server or user mapping that a connection
+  * depends on requires the connection to be disconnected at an oppotunity. The
+  * parameter is the hash value of target syscache entry given through syscache
+  * invalidation.
+  */
+ 
+ /* Connection invalidation by modifying foreign server options. */
+ void
+ InvalidateConnectionForServer(uint32 server_hashvalue)
+ {
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry;
+ 
+     if (!ConnectionHash)
+         return;
+ 
+     hash_seq_init(&scan, ConnectionHash);
+     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+     {
+         if (entry->conn != NULL &&
+             entry->server_hashvalue == server_hashvalue)
+         {
+             entry->invalidated = true;
+             hash_seq_term(&scan);
+             break;
+         }
+     }
+ }
+ 
+ /* Connection invalidation by modifying user mapping options. */
+ void
+ InvalidateConnectionForMapping(uint32 mapping_hashvalue)
+ {
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry;
+ 
+     if (!ConnectionHash)
+         return;
+ 
+     hash_seq_init(&scan, ConnectionHash);
+     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+     {
+         if (entry->conn != NULL &&
+             entry->mapping_hashvalue == mapping_hashvalue)
+         {
+             entry->invalidated = true;
+             hash_seq_term(&scan);
+             break;
+         }
+     }
+ }
+ 
+ 
+ /*
   * Assign a "unique" number for a cursor.
   *
   * These really only need to be unique per connection within a transaction.
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 36,41 ****
--- 36,43 ----
  #include "parser/parsetree.h"
  #include "utils/builtins.h"
  #include "utils/guc.h"
+ #include "utils/inval.h"
+ #include "utils/syscache.h"
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
  #include "utils/rel.h"
***************
*** 420,425 **** static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
--- 422,429 ----
                    const PgFdwRelationInfo *fpinfo_o,
                    const PgFdwRelationInfo *fpinfo_i);

+ void        _PG_init(void);
+ void        _PG_fini(void);

  /*
   * Foreign-data wrapper handler function: return a struct with pointers
***************
*** 476,481 **** postgres_fdw_handler(PG_FUNCTION_ARGS)
--- 480,514 ----
  }

  /*
+  *  Callback functions for connection invalidation by syscache invalidation
+  */
+ static void
+ postgresServerSysCallback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+     InvalidateConnectionForServer(hashvalue);
+ }
+ 
+ static void
+ postgresMappingSysCallback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+     InvalidateConnectionForMapping(hashvalue);
+ }
+ 
+ void
+ _PG_init(void)
+ {
+     CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+                                   postgresServerSysCallback, (Datum)0);
+     CacheRegisterSyscacheCallback(USERMAPPINGOID,
+                                   postgresMappingSysCallback, (Datum)0);
+ }
+ 
+ void
+ _PG_fini(void)
+ {
+ }
+ 
+ /*
   * postgresGetForeignRelSize
   *        Estimate # of rows and width of the result of the scan
   *
*** a/contrib/postgres_fdw/postgres_fdw.h
--- b/contrib/postgres_fdw/postgres_fdw.h
***************
*** 117,122 **** extern void reset_transmission_modes(int nestlevel);
--- 117,124 ----
  /* in connection.c */
  extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
  extern void ReleaseConnection(PGconn *conn);
+ extern void InvalidateConnectionForServer(uint32 server_hashvalue);
+ extern void InvalidateConnectionForMapping(uint32 mapping_hashvalue);
  extern unsigned int GetCursorNumber(PGconn *conn);
  extern unsigned int GetPrepStmtNumber(PGconn *conn);
  extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);

Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From
Ashutosh Bapat
Date:
On Thu, Jul 13, 2017 at 2:53 PM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> Hello, moved to pgsql-hackers.
>
> This is the revased and revised version of the previous patch.
>
> At Thu, 13 Jul 2017 13:42:49 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
> <20170713.134249.97825982.horiguchi.kyotaro@lab.ntt.co.jp>
>> At Tue, 11 Jul 2017 15:39:14 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in <6234.1499801954@sss.pgh.pa.us>
>> > Amit Langote <Langote_Amit_f8@lab.ntt.co.jp> writes:
>> > > Horiguchi-san,
>> > > On 2017/07/11 10:28, Kyotaro HORIGUCHI wrote:
>> > >> I faintly recall such discussion was held aroud that time and
>> > >> maybe we concluded that we don't do that but I haven't find such
>> > >> a thread in pgsql-hackers..
>> >
>> > > I mentioned it in my reply.  Here again:
>> > > https://www.postgresql.org/message-id/20160405.184408.166437663.horiguchi.kyotaro%40lab.ntt.co.jp
>> >
>> > The followup discussion noted that that approach was no good because it
>> > would only close connections in the same session that had done the ALTER
>> > SERVER.  I think the basic idea of marking postgres_fdw connections as
>> > needing to be remade when next possible is OK, but we have to drive it
>> > off catcache invalidation events, the same as we did in c52d37c8b.  An
>> > advantage of that way is we don't need any new hooks in the core code.
>> >
>> > Kyotaro-san, are you planning to update your old patch?
>>
>> I'm pleased to do that. I will reconsider the way shown in a mail
>> in the thread soon.
>
> done.
>
> (As a recap) Changing of some options such as host of a foreign
> server or password of a user mapping make the existing
> connections stale, but postgres_fdw continue using them.
>
> The attached patch does the following things.
>
> - postgres_fdw registers two invalidation callbacks on loading.
>
> - On any change on a foreign server or a user mapping, the
>   callbacks mark the affected connection as 'invalid'
>
> - The invalidated connections will be once disconnected before
>   the next execution if no transaction exists.
>
> As the consequence, changes of options properly affects
> subsequent queries in the next transaction on any session. It
> occurs after whatever option has been modifed.
>
> ======
> create server sv1 foreign data wrapper postgres_fdw options (host '/tmp', port '5432', dbname 'postgres');
> create user mapping for public server sv1;
> create table t (a int);
> create foreign table ft1 (a int) server sv1 options (table_name 't1');
> begin;
> select * from ft1; -- no error
> alter server sv1 options (set host '/tmpe');
> select * from ft1; -- no error because transaction is living.
> commit;
> select * from ft1;
> ERROR:  could not connect to server "sv1"  - NEW
> ======
>
> This patch is postgres_fdw-private but it's annoying that hash
> value of syscache is handled in connection.c. If we allow to add
> something to the core for this feature, I could add a new member
> in FdwRoutine to notify invalidation of mapping and server by
> oid. (Of course it is not back-patcheable, though)
>
> Does anyone have opinitons or suggestions?
>

The patch and the idea look good to me. I haven't reviewed it
thoroughly though.

I noticed a typo, "suporious"; I think you meant "spurious"? That
comment should probably be part of the function which marks the
connection as invalid, e.g. InvalidateConnectionForMapping().

pgfdw_xact_callback() reports the reason for disconnection while
closing a connection. Maybe we want to report the reason for
disconnection here as well. Also, maybe we want to create a function
to discard the connection from an entry, so that we consistently do
the same things when discarding a connection.

-- 
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company



Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From
Kyotaro HORIGUCHI
Date:
Thank you for the comments.

At Thu, 13 Jul 2017 16:54:42 +0530, Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote in
<CAFjFpRd0yz3v0rL2yxmr95e_iDntkeQia9709KXaHLyVcZ=_mQ@mail.gmail.com>
> On Thu, Jul 13, 2017 at 2:53 PM, Kyotaro HORIGUCHI
> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> > Hello, moved to pgsql-hackers.
> >
> > This is the revased and revised version of the previous patch.
> >
> > At Thu, 13 Jul 2017 13:42:49 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
> > <20170713.134249.97825982.horiguchi.kyotaro@lab.ntt.co.jp>
> > This patch is postgres_fdw-private but it's annoying that hash
> > value of syscache is handled in connection.c. If we allow to add
> > something to the core for this feature, I could add a new member
> > in FdwRoutine to notify invalidation of mapping and server by
> > oid. (Of course it is not back-patcheable, though)
> >
> > Does anyone have opinitons or suggestions?
> >
> 
> The patch and the idea looks good to me. I haven't reviewed it
> thoroughly though.
> 
> I noticed a type "suporious", I think you meant "spurious"? Probably

Right, that is a bad typo. I changed the word to "unnecessary", which
is more appropriate here.

> that comment should be part of the function which marks the connection
> as invalid e.g. InvalidateConnectionForMapping().

Agreed. The comment used to be there, but somehow I had moved it. I
have moved it back to where it used to be.

> pgfdw_xact_callback() reports the reason for disconnection while
> closing a connection. May be we want to report the reason for
> disconnection here as well. Also, may be we want to create a function

Agreed. Also, I had placed a LOG message there but removed it. Now it
emits a DEBUG3 message as shown below.

| DEBUG: closing connection 0xxxx for option changes to take effect
| DEBUG: new postgres_fdw connection 0xxxx for server ".." (user mapping oid 

> to discard connection from an entry so that we consistently do the
> same things while discarding a connection.

Sure. Now there are two places where a connection is closed intentionally.

I'm a bit uneasy that many members of the entry are reset in
so many places. Since the validity of an entry is determined only
by conn, it is enough to clear the flags of ConnCacheEntry at
the time of connection creation. In exchange,
pgfdw_reject_incomplete_xact_state_change no longer complains
about an inactive (conn == NULL) entry. I think this is safe, but
I am not fully confident.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 53,58 **** typedef struct ConnCacheEntry
--- 53,61 ----
      bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
      bool        have_error;       /* have any subxacts aborted in this xact? */
      bool        changing_xact_state;    /* xact state change in process */
+     bool        invalidated;    /* true if reconnect is requried */
+     uint32        server_hashvalue;    /* hash value of foreign server oid */
+     uint32        mapping_hashvalue;  /* hash value of user mapping oid */
  } ConnCacheEntry;

  /*
***************
*** 69,74 **** static bool xact_got_connection = false;
--- 72,78 ----

  /* prototypes of private functions */
  static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+ static void disconnect_pg_server(ConnCacheEntry *entry);
  static void check_conn_params(const char **keywords, const char **values);
  static void configure_remote_session(PGconn *conn);
  static void do_sql_command(PGconn *conn, const char *sql);
***************
*** 144,160 **** GetConnection(UserMapping *user, bool will_prep_stmt)
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /* initialize new hashtable entry (key is already filled in) */
          entry->conn = NULL;
-         entry->xact_depth = 0;
-         entry->have_prep_stmt = false;
-         entry->have_error = false;
-         entry->changing_xact_state = false;
      }

      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);

      /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
--- 148,176 ----
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /*
!          * key is already filled in, flags well be initialized at the time of
!          * making a new connection, so just clear conn here.
!          */
          entry->conn = NULL;
      }

      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);

+ 
+     /*
+      * This connection is no longer valid. Disconnect such connections if no
+      * transaction is running.
+      */
+     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+     {
+         /* reconneced immediately, so the messages is "reconnecting"  */
+         elog(DEBUG3, "closing connection %p for option changes to take effect",
+              entry->conn);
+         disconnect_pg_server(entry);
+     }
+
      /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
***************
*** 173,178 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 189,200 ----
          entry->xact_depth = 0;    /* just to be sure */
          entry->have_prep_stmt = false;
          entry->have_error = false;
+         entry->invalidated = false;
+         entry->changing_xact_state = false;
+         entry->server_hashvalue =
+             GetSysCacheHashValue1(FOREIGNSERVEROID, server->serverid);
+         entry->mapping_hashvalue =
+             GetSysCacheHashValue1(USERMAPPINGOID, user->umid);
          entry->conn = connect_pg_server(server, user);

          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
***************
*** 276,281 **** connect_pg_server(ForeignServer *server, UserMapping *user)
--- 298,315 ----
      return conn;
  }

+ /* disconnect the connection for a connection cache entry */
+ static void
+ disconnect_pg_server(ConnCacheEntry *entry)
+ {
+     if (entry->conn != NULL)
+     {
+         PQfinish(entry->conn);
+         entry->conn = NULL;
+     }
+ }
+ 
+
  /*
   * For non-superusers, insist that the connstr specify a password.  This
   * prevents a password from being picked up from .pgpass, a service file,
***************
*** 429,434 **** ReleaseConnection(PGconn *conn)
--- 463,527 ----
  }

  /*
+  * Connection invalidation functions.
+  *
+  * Changes of some options of foreign server or user mapping that a connection
+  * depends on requires the connection to be disconnected at an oppotunity. The
+  * parameter is the hash value of target syscache entry given through syscache
+  * invalidation.
+  *
+  * NB: We could avoid unnecessary disconnection by examining individual option
+  * values but it would be too-much for the gain.
+  */
+ 
+ /* Connection invalidation by modifying foreign server options. */
+ void
+ InvalidateConnectionForServer(uint32 server_hashvalue)
+ {
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry;
+ 
+     if (!ConnectionHash)
+         return;
+ 
+     hash_seq_init(&scan, ConnectionHash);
+     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+     {
+         if (entry->conn != NULL &&
+             entry->server_hashvalue == server_hashvalue)
+         {
+             entry->invalidated = true;
+             hash_seq_term(&scan);
+             break;
+         }
+     }
+ }
+ 
+ /* Connection invalidation by modifying user mapping options. */
+ void
+ InvalidateConnectionForMapping(uint32 mapping_hashvalue)
+ {
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry;
+ 
+     if (!ConnectionHash)
+         return;
+ 
+     hash_seq_init(&scan, ConnectionHash);
+     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+     {
+         if (entry->conn != NULL &&
+             entry->mapping_hashvalue == mapping_hashvalue)
+         {
+             entry->invalidated = true;
+             hash_seq_term(&scan);
+             break;
+         }
+     }
+ }
+ 
+ 
+ /*
   * Assign a "unique" number for a cursor.
   *
   * These really only need to be unique per connection within a transaction.
***************
*** 777,785 **** pgfdw_xact_callback(XactEvent event, void *arg)
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             PQfinish(entry->conn);
!             entry->conn = NULL;
!             entry->changing_xact_state = false;
          }
      }

--- 870,876 ----
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             disconnect_pg_server(entry);
          }
      }

***************
*** 913,921 **** pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
      Form_pg_user_mapping umform;
      ForeignServer *server;

!     if (!entry->changing_xact_state)
          return;

      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
--- 1004,1017 ----
      Form_pg_user_mapping umform;
      ForeignServer *server;

!     /* nothing to do for inactive entries and entries of sane state */
!     if (entry->conn ==NULL || !entry->changing_xact_state)
          return;

+     /* make sure this entry is inactive */
+     disconnect_pg_server(entry);
+
+     /* find server name to be shown in the message below */
      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 36,41 ****
--- 36,43 ----
  #include "parser/parsetree.h"
  #include "utils/builtins.h"
  #include "utils/guc.h"
+ #include "utils/inval.h"
+ #include "utils/syscache.h"
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
  #include "utils/rel.h"
***************
*** 420,425 **** static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
--- 422,429 ----
                    const PgFdwRelationInfo *fpinfo_o,
                    const PgFdwRelationInfo *fpinfo_i);

+ void        _PG_init(void);
+ void        _PG_fini(void);

  /*
   * Foreign-data wrapper handler function: return a struct with pointers
***************
*** 476,481 **** postgres_fdw_handler(PG_FUNCTION_ARGS)
--- 480,514 ----
  }

  /*
+  *  Callback functions for connection invalidation by syscache invalidation
+  */
+ static void
+ postgresServerSysCallback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+     InvalidateConnectionForServer(hashvalue);
+ }
+ 
+ static void
+ postgresMappingSysCallback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+     InvalidateConnectionForMapping(hashvalue);
+ }
+ 
+ void
+ _PG_init(void)
+ {
+     CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+                                   postgresServerSysCallback, (Datum)0);
+     CacheRegisterSyscacheCallback(USERMAPPINGOID,
+                                   postgresMappingSysCallback, (Datum)0);
+ }
+ 
+ void
+ _PG_fini(void)
+ {
+ }
+ 
+ /*
   * postgresGetForeignRelSize
   *        Estimate # of rows and width of the result of the scan
   *
*** a/contrib/postgres_fdw/postgres_fdw.h
--- b/contrib/postgres_fdw/postgres_fdw.h
***************
*** 117,122 **** extern void reset_transmission_modes(int nestlevel);
--- 117,124 ----
  /* in connection.c */
  extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
  extern void ReleaseConnection(PGconn *conn);
+ extern void InvalidateConnectionForServer(uint32 server_hashvalue);
+ extern void InvalidateConnectionForMapping(uint32 mapping_hashvalue);
  extern unsigned int GetCursorNumber(PGconn *conn);
  extern unsigned int GetPrepStmtNumber(PGconn *conn);
  extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);

Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From
Ashutosh Bapat
Date:
On Fri, Jul 14, 2017 at 2:04 PM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> Thank you for the comments.
>
> At Thu, 13 Jul 2017 16:54:42 +0530, Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote in
<CAFjFpRd0yz3v0rL2yxmr95e_iDntkeQia9709KXaHLyVcZ=_mQ@mail.gmail.com>
>> On Thu, Jul 13, 2017 at 2:53 PM, Kyotaro HORIGUCHI
>> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
>> > Hello, moved to pgsql-hackers.
>> >
>> > This is the revased and revised version of the previous patch.
>> >
>> > At Thu, 13 Jul 2017 13:42:49 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
>> > <20170713.134249.97825982.horiguchi.kyotaro@lab.ntt.co.jp>
>> > This patch is postgres_fdw-private but it's annoying that hash
>> > value of syscache is handled in connection.c. If we allow to add
>> > something to the core for this feature, I could add a new member
>> > in FdwRoutine to notify invalidation of mapping and server by
>> > oid. (Of course it is not back-patcheable, though)
>> >
>> > Does anyone have opinitons or suggestions?
>> >
>>
>> The patch and the idea looks good to me. I haven't reviewed it
>> thoroughly though.
>>
>> I noticed a type "suporious", I think you meant "spurious"? Probably
>
> Right, it is too bad typo, but fixed it as "unnecessary", which
> would more appropriate here.
>
>> that comment should be part of the function which marks the connection
>> as invalid e.g. InvalidateConnectionForMapping().
>
> Agreed. It'd been there but somehow I moved it to there. I have
> moved it back to the place it used to be.
>
>> pgfdw_xact_callback() reports the reason for disconnection while
>> closing a connection. May be we want to report the reason for
>> disconnection here as well. Also, may be we want to create a function
>
> Agreed. Also, I had placed LOG message there but removedxs. Now it
> emits a DEBUG3 message as shown below.
>
> | DEBUG: closing connection 0xxxx for option changes to take effect
> | DEBUG: new postgres_fdw connection 0xxxx for server ".." (user mapping oid
>
>> to discard connection from an entry so that we consistently do the
>> same things while discarding a connection.
>
> Sure. Now there's two places a connection is closed intentionally.

Thanks. Can you please add this to the next CF? I don't think we will
be able to accept this change in v10.

-- 
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company



Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> writes:
> This is the revased and revised version of the previous patch.

A few more comments:

* Per the spec for CacheRegisterSyscacheCallback, a zero hash value means
to flush all associated state.  This isn't handling that case correctly.
Even when you are given a specific hash value, I think exiting after
finding one match is incorrect: there could be multiple connections
to the same server with different user mappings, or vice versa.

* I'm not really sure that it's worth the trouble to pay attention
to the hash value at all.  Very few other syscache callbacks do,
and the pg_server/pg_user_mapping catalogs don't seem likely to
have higher than average traffic.

* Personally I'd be inclined to register the callbacks at the same
time the hashtables are created, which is a pattern we use elsewhere
... in fact, postgres_fdw itself does it that way for the transaction
related callbacks, so it seems a bit weird that you are going in a
different direction for these callbacks.  That way avoids the need to
depend on a _PG_init function and means that the callbacks don't have to
worry about the hashtables not being valid.  Also, it seems a bit
pointless to have separate layers of postgresMappingSysCallback and
InvalidateConnectionForMapping functions.

* How about some regression test cases?  You couldn't really exercise
cross-session invalidation easily, but I don't think you need to.
        regards, tom lane
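
The first review point above (treat a zero hash value as "flush everything", and do not stop at the first match) can be illustrated with a small self-contained model. This is a sketch under stated assumptions, not the code of any patch version: DemoEntry, DemoCacheId and demo_inval_callback are hypothetical names, and a plain array stands in for the connection hash table.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical miniature of a connection cache entry. */
typedef struct DemoEntry
{
    bool     connected;
    bool     invalidated;
    uint32_t server_hashvalue;
    uint32_t mapping_hashvalue;
} DemoEntry;

/* Hypothetical cache identifiers standing in for the two syscaches. */
enum DemoCacheId { DEMO_SERVER_CACHE, DEMO_MAPPING_CACHE };

/*
 * Mark every matching live entry and return how many were marked.
 * hashvalue == 0 means "invalidate everything"; otherwise all entries
 * with a matching hash are marked, with no early exit after the first.
 */
static int
demo_inval_callback(DemoEntry *entries, size_t n,
                    enum DemoCacheId cacheid, uint32_t hashvalue)
{
    int marked = 0;

    for (size_t i = 0; i < n; i++)
    {
        DemoEntry *e = &entries[i];

        if (!e->connected)
            continue;           /* nothing to reconnect */

        if (hashvalue == 0 ||
            (cacheid == DEMO_SERVER_CACHE &&
             e->server_hashvalue == hashvalue) ||
            (cacheid == DEMO_MAPPING_CACHE &&
             e->mapping_hashvalue == hashvalue))
        {
            e->invalidated = true;
            marked++;
        }
    }
    return marked;
}
```

With two live entries sharing one server hash but different mapping hashes, a server-level invalidation marks both, which is the case the early "break" in the posted patch would miss.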



Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From
Kyotaro HORIGUCHI
Date:
Thank you for the comments.

At Mon, 17 Jul 2017 16:09:04 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in <9897.1500322144@sss.pgh.pa.us>
> Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> writes:
> > This is the revased and revised version of the previous patch.
> 
> A few more comments:
> 
> * Per the spec for CacheRegisterSyscacheCallback, a zero hash value means
> to flush all associated state.  This isn't handling that case correctly.

Right, fixed.

> Even when you are given a specific hash value, I think exiting after
> finding one match is incorrect: there could be multiple connections
> to the same server with different user mappings, or vice versa.

Sure. I had mistakenly assumed that a key hash value identifies a
single entry in the connection cache. Thank you for pointing that out.

> * I'm not really sure that it's worth the trouble to pay attention
> to the hash value at all.  Very few other syscache callbacks do,
> and the pg_server/pg_user_mapping catalogs don't seem likely to
> have higher than average traffic.

I agree with those points. But there is another consideration:
reconnection is far more expensive than looking up a system catalog
again, or maybe even than replanning. For now I chose to avoid the
possibility of causing a massive number of simultaneous reconnections.

> * Personally I'd be inclined to register the callbacks at the same
> time the hashtables are created, which is a pattern we use elsewhere
> ... in fact, postgres_fdw itself does it that way for the transaction
> related callbacks, so it seems a bit weird that you are going in a
> different direction for these callbacks.  That way avoids the need to
> depend on a _PG_init function and means that the callbacks don't have to
> worry about the hashtables not being valid.

Sure, that seems more reasonable than what it does now. I changed the
way the callback is registered in the attached patch.

>  Also, it seems a bit
> pointless to have separate layers of postgresMappingSysCallback and
> InvalidateConnectionForMapping functions.

It used to be one function, but it seemed a bit weird that the
function is called from two places (two catalogs) and then branches
according to the caller. I don't have a firm opinion on this, so I
changed it.

As a result, the changes in postgres_fdw.c have disappeared.
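
The registration pattern agreed on here (register the invalidation callbacks at the moment the connection cache is first created, rather than in _PG_init) can be sketched in isolation. Everything below is a hypothetical miniature for illustration, not the PostgreSQL API: demo_register_callback stands in for CacheRegisterSyscacheCallback, and a boolean stands in for the ConnectionHash pointer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical miniature of a callback registry (not the PostgreSQL API). */
typedef void (*DemoInvalCallback)(int cacheid);

#define DEMO_MAX_CALLBACKS 4
static DemoInvalCallback demo_callbacks[DEMO_MAX_CALLBACKS];
static int demo_ncallbacks = 0;

static void
demo_register_callback(DemoInvalCallback cb)
{
    assert(demo_ncallbacks < DEMO_MAX_CALLBACKS);
    demo_callbacks[demo_ncallbacks++] = cb;
}

static bool demo_table_created = false; /* stands in for ConnectionHash != NULL */
static int  demo_inval_events = 0;

static void
demo_inval_callback(int cacheid)
{
    /* No "table missing" check needed: registered only after creation. */
    assert(demo_table_created);
    (void) cacheid;
    demo_inval_events++;
}

/* First use creates the cache and registers the callbacks exactly once. */
static void
demo_ensure_cache(void)
{
    if (!demo_table_created)
    {
        demo_table_created = true;
        demo_register_callback(demo_inval_callback);    /* server changes */
        demo_register_callback(demo_inval_callback);    /* mapping changes */
    }
}

/* Deliver one invalidation event to every registered callback. */
static void
demo_fire_all(int cacheid)
{
    for (int i = 0; i < demo_ncallbacks; i++)
        demo_callbacks[i](cacheid);
}
```

Because nothing is registered until the cache exists, an invalidation that arrives earlier is simply not delivered, and the callback never has to guard against a missing table.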

> * How about some regression test cases?  You couldn't really exercise
> cross-session invalidation easily, but I don't think you need to.

Ha Ha. You got me. I will add some test cases for this in the
next version. Thanks.


Ashutosh, I'll register this in the next CF after providing a
regression test, thanks.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "pgstat.h"
  #include "storage/latch.h"
  #include "utils/hsearch.h"
+ #include "utils/inval.h"
  #include "utils/memutils.h"
  #include "utils/syscache.h"

***************
*** 53,58 **** typedef struct ConnCacheEntry
--- 54,62 ----
      bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
      bool        have_error;       /* have any subxacts aborted in this xact? */
      bool        changing_xact_state;    /* xact state change in process */
+     bool        invalidated;    /* true if reconnect is required */
+     uint32        server_hashvalue;    /* hash value of foreign server oid */
+     uint32        mapping_hashvalue;  /* hash value of user mapping oid */
  } ConnCacheEntry;

  /*
***************
*** 69,74 **** static bool xact_got_connection = false;
--- 73,79 ----

  /* prototypes of private functions */
  static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+ static void disconnect_pg_server(ConnCacheEntry *entry);
  static void check_conn_params(const char **keywords, const char **values);
  static void configure_remote_session(PGconn *conn);
  static void do_sql_command(PGconn *conn, const char *sql);
***************
*** 78,83 **** static void pgfdw_subxact_callback(SubXactEvent event,
--- 83,89 ----
                         SubTransactionId mySubid,
                         SubTransactionId parentSubid,
                         void *arg);
+ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
  static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
  static bool pgfdw_cancel_query(PGconn *conn);
  static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
***************
*** 130,135 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 136,145 ----
           */
          RegisterXactCallback(pgfdw_xact_callback, NULL);
          RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
+         CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+                                       pgfdw_inval_callback, (Datum)0);
+         CacheRegisterSyscacheCallback(USERMAPPINGOID,
+                                       pgfdw_inval_callback, (Datum)0);
      }

      /* Set flag that we did GetConnection during the current transaction */
***************
*** 144,160 **** GetConnection(UserMapping *user, bool will_prep_stmt)
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /* initialize new hashtable entry (key is already filled in) */
          entry->conn = NULL;
-         entry->xact_depth = 0;
-         entry->have_prep_stmt = false;
-         entry->have_error = false;
-         entry->changing_xact_state = false;
      }

      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);

      /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
--- 154,182 ----
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /*
!          * key is already filled in, flags will be initialized at the time of
!          * making a new connection, so just clear conn here.
!          */
          entry->conn = NULL;
      }

      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);

+
+     /*
+      * This connection is no longer valid. Disconnect such connections if no
+      * transaction is running.
+      */
+     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+     {
+         /* reconnected immediately, so the message is "reconnecting"  */
+         elog(DEBUG3, "closing connection %p for option changes to take effect",
+              entry->conn);
+         disconnect_pg_server(entry);
+     }
+
      /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
***************
*** 173,178 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 195,206 ----
          entry->xact_depth = 0;    /* just to be sure */
          entry->have_prep_stmt = false;
          entry->have_error = false;
+         entry->invalidated = false;
+         entry->changing_xact_state = false;
+         entry->server_hashvalue =
+             GetSysCacheHashValue1(FOREIGNSERVEROID, server->serverid);
+         entry->mapping_hashvalue =
+             GetSysCacheHashValue1(USERMAPPINGOID, user->umid);
          entry->conn = connect_pg_server(server, user);

          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
***************
*** 276,281 **** connect_pg_server(ForeignServer *server, UserMapping *user)
--- 304,321 ----
      return conn;
  }

+ /* disconnect the connection for a connection cache entry */
+ static void
+ disconnect_pg_server(ConnCacheEntry *entry)
+ {
+     if (entry->conn != NULL)
+     {
+         PQfinish(entry->conn);
+         entry->conn = NULL;
+     }
+ }
+
+
  /*
   * For non-superusers, insist that the connstr specify a password.  This
   * prevents a password from being picked up from .pgpass, a service file,
***************
*** 429,434 **** ReleaseConnection(PGconn *conn)
--- 469,519 ----
  }

  /*
+  * Connection invalidation callback function
+  *
+  * Mark the connections to be disconnected if it depends on a foreign server
+  * or user mapping any options on which have been modified.
+  *
+  * Although most invalidate callbacks blow away all the related stuff
+  * regardless of the given hashvalue, if we blow away all existing connection
+  * from this server, it can cause a massive number of simultaneous
+  * reconnection to all the remotes. We restrict the connection to break to
+  * avoid such a mess.
+  *
+  * NB: We could avoid unnecessary disconnection more strictly by examining
+  * individual option values but it would be too-much for the gain.
+  */
+ static void
+ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry;
+ 
+     if (!ConnectionHash)
+         return;
+ 
+     hash_seq_init(&scan, ConnectionHash);
+     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+     {
+         if (entry->conn == NULL)
+             continue;
+ 
+         if (hashvalue == 0)
+             entry->invalidated = true;
+         else
+         {
+             Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
+ 
+             if ((cacheid == FOREIGNSERVEROID &&
+                  entry->server_hashvalue == hashvalue) ||
+                 (cacheid == USERMAPPINGOID &&
+                  entry->mapping_hashvalue == hashvalue))
+                 entry->invalidated = true;
+         }
+     }
+ }
+ 
+ /*
   * Assign a "unique" number for a cursor.
   *
   * These really only need to be unique per connection within a transaction.
***************
*** 777,785 **** pgfdw_xact_callback(XactEvent event, void *arg)
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             PQfinish(entry->conn);
!             entry->conn = NULL;
!             entry->changing_xact_state = false;
          }
      }

--- 862,868 ----
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             disconnect_pg_server(entry);
          }
      }

***************
*** 913,921 **** pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
      Form_pg_user_mapping umform;
      ForeignServer *server;

!     if (!entry->changing_xact_state)
          return;

      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
--- 996,1009 ----
      Form_pg_user_mapping umform;
      ForeignServer *server;

!     /* nothing to do for inactive entries and entries of sane state */
!     if (entry->conn == NULL || !entry->changing_xact_state)
          return;

+     /* make sure this entry is inactive */
+     disconnect_pg_server(entry);
+
+     /* find server name to be shown in the message below */
      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))

Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From
Kyotaro HORIGUCHI
Date:
Finally, I added a new TAP test library, PsqlSession.

At Tue, 18 Jul 2017 18:12:13 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20170718.181213.206979369.horiguchi.kyotaro@lab.ntt.co.jp>
> > * How about some regression test cases?  You couldn't really exercise
> > cross-session invalidation easily, but I don't think you need to.
> 
> Ha Ha. You got me. I will add some test cases for this in the
> next version. Thanks.

Here it is. First I tried the ordinary regression framework, but
the effect of this patch is visible only in the log, which
contains variable parts, so I gave up on that before trying
anything more complex.

Next I tried the existing TAP test framework, but this test needs
persistent sessions in order to alternate operations between two
sessions, and PostgresNode::psql doesn't offer such functionality.

Finally, I added a new TAP test library, PsqlSession. It offers
interactive psql sessions. Then I added a simple test to
postgres_fdw using it.

The first patch is PsqlSession.pm and the second is the new
test for postgres_fdw.

- The current PsqlSession is quite fragile, but it seems to work well enough for this usage at present.

- I'm afraid this might not work on Windows according to the IPC::Run manual page, but I haven't confirmed that yet.
 http://search.cpan.org/~toddr/IPC-Run-0.96/lib/IPC/Run.pm#Win32_LIMITATIONS


Any comments or suggestions are welcome.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> writes:
> Here it is. First I tried the ordinary regression framework, but
> the effect of this patch is visible only in the log, which
> contains variable parts, so I gave up on that before trying
> anything more complex.

> Next I tried the existing TAP test framework, but this test needs
> persistent sessions in order to alternate operations between two
> sessions, and PostgresNode::psql doesn't offer such functionality.

> Finally, I added a new TAP test library, PsqlSession. It offers
> interactive psql sessions. Then I added a simple test to
> postgres_fdw using it.

This seems like overkill.  We can test it reasonably easily within the
existing framework, as shown in the attached patch.  I'm also fairly
concerned that what you're showing here would be unstable in the buildfarm
as a result of race conditions between the multiple sessions.

I made some cosmetic updates to the code patch, as well.

I think this is actually a bug fix, and should not wait for the next
commitfest.

            regards, tom lane

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 8c33dea..8eb477b 100644
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "pgstat.h"
  #include "storage/latch.h"
  #include "utils/hsearch.h"
+ #include "utils/inval.h"
  #include "utils/memutils.h"
  #include "utils/syscache.h"

*************** typedef struct ConnCacheEntry
*** 48,58 ****
--- 49,63 ----
  {
      ConnCacheKey key;            /* hash key (must be first) */
      PGconn       *conn;            /* connection to foreign server, or NULL */
+     /* Remaining fields are invalid when conn is NULL: */
      int            xact_depth;        /* 0 = no xact open, 1 = main xact open, 2 =
                                   * one level of subxact open, etc */
      bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
      bool        have_error;        /* have any subxacts aborted in this xact? */
      bool        changing_xact_state;    /* xact state change in process */
+     bool        invalidated;    /* true if reconnect is pending */
+     uint32        server_hashvalue;    /* hash value of foreign server OID */
+     uint32        mapping_hashvalue;    /* hash value of user mapping OID */
  } ConnCacheEntry;

  /*
*************** static bool xact_got_connection = false;
*** 69,74 ****
--- 74,80 ----

  /* prototypes of private functions */
  static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+ static void disconnect_pg_server(ConnCacheEntry *entry);
  static void check_conn_params(const char **keywords, const char **values);
  static void configure_remote_session(PGconn *conn);
  static void do_sql_command(PGconn *conn, const char *sql);
*************** static void pgfdw_subxact_callback(SubXa
*** 78,83 ****
--- 84,90 ----
                         SubTransactionId mySubid,
                         SubTransactionId parentSubid,
                         void *arg);
+ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
  static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
  static bool pgfdw_cancel_query(PGconn *conn);
  static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
*************** GetConnection(UserMapping *user, bool wi
*** 130,135 ****
--- 137,146 ----
           */
          RegisterXactCallback(pgfdw_xact_callback, NULL);
          RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
+         CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+                                       pgfdw_inval_callback, (Datum) 0);
+         CacheRegisterSyscacheCallback(USERMAPPINGOID,
+                                       pgfdw_inval_callback, (Datum) 0);
      }

      /* Set flag that we did GetConnection during the current transaction */
*************** GetConnection(UserMapping *user, bool wi
*** 144,161 ****
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /* initialize new hashtable entry (key is already filled in) */
          entry->conn = NULL;
-         entry->xact_depth = 0;
-         entry->have_prep_stmt = false;
-         entry->have_error = false;
-         entry->changing_xact_state = false;
      }

      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);

      /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
       * connection is actually used.
--- 155,182 ----
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /*
!          * We need only clear "conn" here; remaining fields will be filled
!          * later when "conn" is set.
!          */
          entry->conn = NULL;
      }

      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);

      /*
+      * If the connection needs to be remade due to invalidation, disconnect as
+      * soon as we're out of all transactions.
+      */
+     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+     {
+         elog(DEBUG3, "closing connection %p for option changes to take effect",
+              entry->conn);
+         disconnect_pg_server(entry);
+     }
+
+     /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
       * connection is actually used.
*************** GetConnection(UserMapping *user, bool wi
*** 164,178 ****
      /*
       * If cache entry doesn't have a connection, we have to establish a new
       * connection.  (If connect_pg_server throws an error, the cache entry
!      * will be left in a valid empty state.)
       */
      if (entry->conn == NULL)
      {
          ForeignServer *server = GetForeignServer(user->serverid);

!         entry->xact_depth = 0;    /* just to be sure */
          entry->have_prep_stmt = false;
          entry->have_error = false;
          entry->conn = connect_pg_server(server, user);

          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
--- 185,208 ----
      /*
       * If cache entry doesn't have a connection, we have to establish a new
       * connection.  (If connect_pg_server throws an error, the cache entry
!      * will remain in a valid empty state, ie conn == NULL.)
       */
      if (entry->conn == NULL)
      {
          ForeignServer *server = GetForeignServer(user->serverid);

!         /* Reset all transient state fields, to be sure all are clean */
!         entry->xact_depth = 0;
          entry->have_prep_stmt = false;
          entry->have_error = false;
+         entry->changing_xact_state = false;
+         entry->invalidated = false;
+         entry->server_hashvalue =
+             GetSysCacheHashValue1(FOREIGNSERVEROID, server->serverid);
+         entry->mapping_hashvalue =
+             GetSysCacheHashValue1(USERMAPPINGOID, user->umid);
+
+         /* Now try to make the connection */
          entry->conn = connect_pg_server(server, user);

          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
*************** connect_pg_server(ForeignServer *server,
*** 277,282 ****
--- 307,325 ----
  }

  /*
+  * Disconnect any open connection for a connection cache entry.
+  */
+ static void
+ disconnect_pg_server(ConnCacheEntry *entry)
+ {
+     if (entry->conn != NULL)
+     {
+         PQfinish(entry->conn);
+         entry->conn = NULL;
+     }
+ }
+
+ /*
   * For non-superusers, insist that the connstr specify a password.  This
   * prevents a password from being picked up from .pgpass, a service file,
   * the environment, etc.  We don't want the postgres user's passwords
*************** pgfdw_xact_callback(XactEvent event, voi
*** 777,785 ****
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             PQfinish(entry->conn);
!             entry->conn = NULL;
!             entry->changing_xact_state = false;
          }
      }

--- 820,826 ----
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             disconnect_pg_server(entry);
          }
      }

*************** pgfdw_subxact_callback(SubXactEvent even
*** 897,902 ****
--- 938,985 ----
  }

  /*
+  * Connection invalidation callback function
+  *
+  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
+  * mark connections depending on that entry as needing to be remade.
+  * We can't immediately destroy them, since they might be in the midst of
+  * a transaction, but we'll remake them at the next opportunity.
+  *
+  * Although most cache invalidation callbacks blow away all the related stuff
+  * regardless of the given hashvalue, connections are expensive enough that
+  * it's worth trying to avoid that.
+  *
+  * NB: We could avoid unnecessary disconnection more strictly by examining
+  * individual option values, but it seems too much effort for the gain.
+  */
+ static void
+ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry;
+
+     Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
+
+     /* ConnectionHash must exist already, if we're registered */
+     hash_seq_init(&scan, ConnectionHash);
+     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+     {
+         /* Ignore invalid entries */
+         if (entry->conn == NULL)
+             continue;
+
+         /* hashvalue == 0 means a cache reset, must clear all state */
+         if (hashvalue == 0)
+             entry->invalidated = true;
+         else if ((cacheid == FOREIGNSERVEROID &&
+                   entry->server_hashvalue == hashvalue) ||
+                  (cacheid == USERMAPPINGOID &&
+                   entry->mapping_hashvalue == hashvalue))
+             entry->invalidated = true;
+     }
+ }
+
+ /*
   * Raise an error if the given connection cache entry is marked as being
   * in the middle of an xact state change.  This should be called at which no
   * such change is expected to be in progress; if one is found to be in
*************** pgfdw_reject_incomplete_xact_state_chang
*** 913,921 ****
      Form_pg_user_mapping umform;
      ForeignServer *server;

!     if (!entry->changing_xact_state)
          return;

      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
--- 996,1009 ----
      Form_pg_user_mapping umform;
      ForeignServer *server;

!     /* nothing to do for inactive entries and entries of sane state */
!     if (entry->conn == NULL || !entry->changing_xact_state)
          return;

+     /* make sure this entry is inactive */
+     disconnect_pg_server(entry);
+
+     /* find server name to be shown in the message below */
      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index b112c19..ee75c4b 100644
*** a/contrib/postgres_fdw/expected/postgres_fdw.out
--- b/contrib/postgres_fdw/expected/postgres_fdw.out
*************** ALTER FOREIGN TABLE ft2 ALTER COLUMN c1
*** 191,196 ****
--- 191,233 ----
   public | ft_pg_type | loopback  | (schema_name 'pg_catalog', table_name 'pg_type') |
  (6 rows)

+ -- Test that alteration of server options causes reconnection
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work
+   c3   |              c4
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+
+ ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ ERROR:  could not connect to server "loopback"
+ DETAIL:  FATAL:  database "no such database" does not exist
+ DO $d$
+     BEGIN
+         EXECUTE $$ALTER SERVER loopback
+             OPTIONS (SET dbname '$$||current_database()||$$')$$;
+     END;
+ $d$;
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+   c3   |              c4
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+
+ -- Test that alteration of user mapping options causes reconnection
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (ADD user 'no such user');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ ERROR:  could not connect to server "loopback"
+ DETAIL:  FATAL:  role "no such user" does not exist
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (DROP user);
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+   c3   |              c4
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+
  -- Now we should be able to run ANALYZE.
  -- To exercise multiple code paths, we use local stats on ft1
  -- and remote-estimate mode on ft2.
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 509bb54..2eca879 100644
*** a/contrib/postgres_fdw/sql/postgres_fdw.sql
--- b/contrib/postgres_fdw/sql/postgres_fdw.sql
*************** ALTER FOREIGN TABLE ft1 ALTER COLUMN c1
*** 195,200 ****
--- 195,220 ----
  ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
  \det+

+ -- Test that alteration of server options causes reconnection
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work
+ ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ DO $d$
+     BEGIN
+         EXECUTE $$ALTER SERVER loopback
+             OPTIONS (SET dbname '$$||current_database()||$$')$$;
+     END;
+ $d$;
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+
+ -- Test that alteration of user mapping options causes reconnection
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (ADD user 'no such user');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (DROP user);
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+
  -- Now we should be able to run ANALYZE.
  -- To exercise multiple code paths, we use local stats on ft1
  -- and remote-estimate mode on ft2.


Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From
Alvaro Herrera
Date:
Kyotaro HORIGUCHI wrote:

> Finally, I added a new TAP test library PsqlSession. It offers
> interactive psql sessions. Then added a simple test to
> postgres_fdw using it.

Hmm, I think this can be very useful for other things.  Let's keep this
in mind to use in the future, even if we find another way to fix the
issue at hand.  In fact, I had a problem a couple of weeks ago in which
I needed two concurrent sessions and one of them disconnected in the
middle of the test.  Can't do that with isolationtester ...

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From
Kyotaro HORIGUCHI
Date:
At Thu, 20 Jul 2017 18:15:42 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in <18927.1500588942@sss.pgh.pa.us>
> This seems like overkill.  We can test it reasonably easily within the
> existing framework, as shown in the attached patch.  I'm also fairly

It checks for a disconnection caused within a single session. I
thought that the inter-process behavior was important (since I
had forgotten it in the previous version), but the test is
reasonable enough if we can rely on the fact that it works
through the invalidation mechanism.

In short, I agree with the test in your patch.

> concerned that what you're showing here would be unstable in the buildfarm
> as a result of race conditions between the multiple sessions.

Sure. It is what I meant by 'fragile'.

> I made some cosmetic updates to the code patch, as well.

Thank you for keeping the hashvalue stuff and revising the comment.

By the way, I had mistakenly left the following code in the
previous patch.

+     /* hashvalue == 0 means a cache reset, must clear all state */
+     if (hashvalue == 0)
+       entry->invalidated = true;
+     else if ((cacheid == FOREIGNSERVEROID &&
+           entry->server_hashvalue == hashvalue) ||
+          (cacheid == USERMAPPINGOID &&
+           entry->mapping_hashvalue == hashvalue))
+       entry->invalidated = true;

The reason for the redundancy was that an earlier version used a
switch-case in the else block. However, the redundancy is no
longer justified. I'd like to change it as follows.

+     /* hashvalue == 0 means a cache reset, must clear all state */
+     if ((hashvalue == 0) ||
+         ((cacheid == FOREIGNSERVEROID &&
+           entry->server_hashvalue == hashvalue) ||
+          (cacheid == USERMAPPINGOID &&
+           entry->mapping_hashvalue == hashvalue)))
+       entry->invalidated = true;

The attached patch differs only in this point.
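
As a sanity check that the merged condition behaves the same as the if / else-if form, here is a small stand-alone sketch, not part of the patch. The demo_* functions and DEMO_* cache IDs are hypothetical stand-ins for the real macros; the check compares both forms exhaustively over a small range of hash values.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for FOREIGNSERVEROID and USERMAPPINGOID. */
enum { DEMO_SERVER_CACHE = 1, DEMO_MAPPING_CACHE = 2 };

/* Original form: separate branch for the hashvalue == 0 reset case. */
bool
demo_match_branched(int cacheid, uint32_t hashvalue,
                    uint32_t server_hash, uint32_t mapping_hash)
{
    if (hashvalue == 0)
        return true;
    else if ((cacheid == DEMO_SERVER_CACHE && server_hash == hashvalue) ||
             (cacheid == DEMO_MAPPING_CACHE && mapping_hash == hashvalue))
        return true;
    return false;
}

/* Proposed form: one merged condition. */
bool
demo_match_merged(int cacheid, uint32_t hashvalue,
                  uint32_t server_hash, uint32_t mapping_hash)
{
    return (hashvalue == 0) ||
        ((cacheid == DEMO_SERVER_CACHE && server_hash == hashvalue) ||
         (cacheid == DEMO_MAPPING_CACHE && mapping_hash == hashvalue));
}

/*
 * Compare both forms over every combination of the two cache IDs and
 * hash values 0..3.  Returns the number of combinations checked.
 */
int
demo_check_equivalence(void)
{
    int         checked = 0;
    int         cacheid;
    uint32_t    h, s, m;

    for (cacheid = DEMO_SERVER_CACHE; cacheid <= DEMO_MAPPING_CACHE; cacheid++)
        for (h = 0; h < 4; h++)
            for (s = 0; s < 4; s++)
                for (m = 0; m < 4; m++)
                {
                    assert(demo_match_branched(cacheid, h, s, m) ==
                           demo_match_merged(cacheid, h, s, m));
                    checked++;
                }
    return checked;
}
```

Both forms set the flag in exactly the same cases, so the change is purely about readability.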

> I think this is actually a bug fix, and should not wait for the next
> commitfest.

Agreed.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "pgstat.h"
  #include "storage/latch.h"
  #include "utils/hsearch.h"
+ #include "utils/inval.h"
  #include "utils/memutils.h"
  #include "utils/syscache.h"

***************
*** 48,58 **** typedef struct ConnCacheEntry
--- 49,63 ----
  {
      ConnCacheKey key;            /* hash key (must be first) */
      PGconn       *conn;            /* connection to foreign server, or NULL */
+     /* Remaining fields are invalid when conn is NULL: */
      int            xact_depth;        /* 0 = no xact open, 1 = main xact open, 2 =
                                   * one level of subxact open, etc */
      bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
      bool        have_error;        /* have any subxacts aborted in this xact? */
      bool        changing_xact_state;    /* xact state change in process */
+     bool        invalidated;    /* true if reconnect is pending */
+     uint32        server_hashvalue;    /* hash value of foreign server OID */
+     uint32        mapping_hashvalue;    /* hash value of user mapping OID */
  } ConnCacheEntry;

  /*
***************
*** 69,74 **** static bool xact_got_connection = false;
--- 74,80 ----

  /* prototypes of private functions */
  static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+ static void disconnect_pg_server(ConnCacheEntry *entry);
  static void check_conn_params(const char **keywords, const char **values);
  static void configure_remote_session(PGconn *conn);
  static void do_sql_command(PGconn *conn, const char *sql);
***************
*** 78,83 **** static void pgfdw_subxact_callback(SubXactEvent event,
--- 84,90 ----
                         SubTransactionId mySubid,
                         SubTransactionId parentSubid,
                         void *arg);
+ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
  static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
  static bool pgfdw_cancel_query(PGconn *conn);
  static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
***************
*** 130,135 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 137,146 ----
           */
          RegisterXactCallback(pgfdw_xact_callback, NULL);
          RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
+         CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+                                       pgfdw_inval_callback, (Datum) 0);
+         CacheRegisterSyscacheCallback(USERMAPPINGOID,
+                                       pgfdw_inval_callback, (Datum) 0);
      }

      /* Set flag that we did GetConnection during the current transaction */
***************
*** 144,161 **** GetConnection(UserMapping *user, bool will_prep_stmt)
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /* initialize new hashtable entry (key is already filled in) */
          entry->conn = NULL;
-         entry->xact_depth = 0;
-         entry->have_prep_stmt = false;
-         entry->have_error = false;
-         entry->changing_xact_state = false;
      }

      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);

      /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
       * connection is actually used.
--- 155,182 ----
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /*
!          * We need only clear "conn" here; remaining fields will be filled
!          * later when "conn" is set.
!          */
          entry->conn = NULL;
      }

      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);

      /*
+      * If the connection needs to be remade due to invalidation, disconnect as
+      * soon as we're out of all transactions.
+      */
+     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+     {
+         elog(DEBUG3, "closing connection %p for option changes to take effect",
+              entry->conn);
+         disconnect_pg_server(entry);
+     }
+
+     /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
       * connection is actually used.
***************
*** 164,178 **** GetConnection(UserMapping *user, bool will_prep_stmt)
      /*
       * If cache entry doesn't have a connection, we have to establish a new
       * connection.  (If connect_pg_server throws an error, the cache entry
!      * will be left in a valid empty state.)
       */
      if (entry->conn == NULL)
      {
          ForeignServer *server = GetForeignServer(user->serverid);
  
!         entry->xact_depth = 0;    /* just to be sure */
          entry->have_prep_stmt = false;
          entry->have_error = false;
          entry->conn = connect_pg_server(server, user);
  
          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
--- 185,208 ----
      /*
       * If cache entry doesn't have a connection, we have to establish a new
       * connection.  (If connect_pg_server throws an error, the cache entry
!      * will remain in a valid empty state, ie conn == NULL.)
       */
      if (entry->conn == NULL)
      {
          ForeignServer *server = GetForeignServer(user->serverid);
  
!         /* Reset all transient state fields, to be sure all are clean */
!         entry->xact_depth = 0;
          entry->have_prep_stmt = false;
          entry->have_error = false;
+         entry->changing_xact_state = false;
+         entry->invalidated = false;
+         entry->server_hashvalue =
+             GetSysCacheHashValue1(FOREIGNSERVEROID, server->serverid);
+         entry->mapping_hashvalue =
+             GetSysCacheHashValue1(USERMAPPINGOID, user->umid);
+ 
+         /* Now try to make the connection */
          entry->conn = connect_pg_server(server, user);
  
          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
 
***************
*** 277,282 **** connect_pg_server(ForeignServer *server, UserMapping *user)
--- 307,325 ----
  }
  
  /*
+  * Disconnect any open connection for a connection cache entry.
+  */
+ static void
+ disconnect_pg_server(ConnCacheEntry *entry)
+ {
+     if (entry->conn != NULL)
+     {
+         PQfinish(entry->conn);
+         entry->conn = NULL;
+     }
+ }
+ 
+ /*
   * For non-superusers, insist that the connstr specify a password.  This
   * prevents a password from being picked up from .pgpass, a service file,
   * the environment, etc.  We don't want the postgres user's passwords
 
***************
*** 777,785 **** pgfdw_xact_callback(XactEvent event, void *arg)
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             PQfinish(entry->conn);
!             entry->conn = NULL;
!             entry->changing_xact_state = false;
          }
      }
  
--- 820,826 ----
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             disconnect_pg_server(entry);
          }
      }
  
***************
*** 897,902 **** pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
--- 938,984 ----
  }
  
  /*
+  * Connection invalidation callback function
+  *
+  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
+  * mark connections depending on that entry as needing to be remade.
+  * We can't immediately destroy them, since they might be in the midst of
+  * a transaction, but we'll remake them at the next opportunity.
+  *
+  * Although most cache invalidation callbacks blow away all the related stuff
+  * regardless of the given hashvalue, connections are expensive enough that
+  * it's worth trying to avoid that.
+  *
+  * NB: We could avoid unnecessary disconnection more strictly by examining
+  * individual option values, but it seems too much effort for the gain.
+  */
+ static void
+ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry;
+ 
+     Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
+ 
+     /* ConnectionHash must exist already, if we're registered */
+     hash_seq_init(&scan, ConnectionHash);
+     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+     {
+         /* Ignore invalid entries */
+         if (entry->conn == NULL)
+             continue;
+ 
+         /* hashvalue == 0 means a cache reset, must clear all state */
+         if (hashvalue == 0 ||
+             ((cacheid == FOREIGNSERVEROID &&
+               entry->server_hashvalue == hashvalue) ||
+              (cacheid == USERMAPPINGOID &&
+               entry->mapping_hashvalue == hashvalue)))
+             entry->invalidated = true;
+     }
+ }
+ 
+ /*
   * Raise an error if the given connection cache entry is marked as being
   * in the middle of an xact state change.  This should be called at which no
   * such change is expected to be in progress; if one is found to be in
 
***************
*** 913,921 **** pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
      Form_pg_user_mapping umform;
      ForeignServer *server;
  
!     if (!entry->changing_xact_state)
          return;
  
      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
--- 995,1008 ----
      Form_pg_user_mapping umform;
      ForeignServer *server;
  
!     /* nothing to do for inactive entries and entries of sane state */
!     if (entry->conn == NULL || !entry->changing_xact_state)
          return;
  
+     /* make sure this entry is inactive */
+     disconnect_pg_server(entry);
+ 
+     /* find server name to be shown in the message below */
      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
 
*** a/contrib/postgres_fdw/expected/postgres_fdw.out
--- b/contrib/postgres_fdw/expected/postgres_fdw.out
***************
*** 191,196 **** ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
--- 191,233 ----
   public | ft_pg_type | loopback  | (schema_name 'pg_catalog', table_name 'pg_type') | 
  (6 rows)
  
+ -- Test that alteration of server options causes reconnection
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work
+   c3   |              c4              
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+ 
+ ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ ERROR:  could not connect to server "loopback"
+ DETAIL:  FATAL:  database "no such database" does not exist
+ DO $d$
+     BEGIN
+         EXECUTE $$ALTER SERVER loopback
+             OPTIONS (SET dbname '$$||current_database()||$$')$$;
+     END;
+ $d$;
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+   c3   |              c4              
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+ 
+ -- Test that alteration of user mapping options causes reconnection
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (ADD user 'no such user');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ ERROR:  could not connect to server "loopback"
+ DETAIL:  FATAL:  role "no such user" does not exist
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (DROP user);
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+   c3   |              c4              
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+ 
  -- Now we should be able to run ANALYZE.
  -- To exercise multiple code paths, we use local stats on ft1
  -- and remote-estimate mode on ft2.
 
*** a/contrib/postgres_fdw/sql/postgres_fdw.sql
--- b/contrib/postgres_fdw/sql/postgres_fdw.sql
***************
*** 195,200 **** ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
--- 195,220 ----
  ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
  \det+
  
+ -- Test that alteration of server options causes reconnection
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work
+ ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ DO $d$
+     BEGIN
+         EXECUTE $$ALTER SERVER loopback
+             OPTIONS (SET dbname '$$||current_database()||$$')$$;
+     END;
+ $d$;
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+ 
+ -- Test that alteration of user mapping options causes reconnection
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (ADD user 'no such user');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (DROP user);
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+ 
  -- Now we should be able to run ANALYZE.
  -- To exercise multiple code paths, we use local stats on ft1
  -- and remote-estimate mode on ft2.
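
The mechanism in the patch above (mark on invalidation, disconnect only once the entry is outside all transactions, then reconnect) can be modeled without any PostgreSQL headers. The following is a minimal sketch, not the postgres_fdw code: ModelEntry, model_inval_callback, model_get_connection, the integer "conn" serial number, and the enum values are all hypothetical stand-ins chosen for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for the two syscache identifiers. */
enum { FOREIGNSERVEROID = 1, USERMAPPINGOID = 2 };

/* Simplified model of ConnCacheEntry; "conn" is only a serial number. */
typedef struct
{
    int      conn;              /* 0 means no connection */
    int      xact_depth;        /* 0 means outside all transactions */
    bool     invalidated;       /* true if the connection must be remade */
    uint32_t server_hashvalue;  /* hash of the foreign server entry */
    uint32_t mapping_hashvalue; /* hash of the user mapping entry */
} ModelEntry;

static int next_conn_serial = 1;

/* Mark matching live entries; hashvalue 0 models a full cache reset. */
static void
model_inval_callback(ModelEntry *entries, size_t n,
                     int cacheid, uint32_t hashvalue)
{
    assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);

    for (size_t i = 0; i < n; i++)
    {
        ModelEntry *e = &entries[i];

        if (e->conn == 0)
            continue;           /* nothing to invalidate */

        if (hashvalue == 0 ||
            (cacheid == FOREIGNSERVEROID && e->server_hashvalue == hashvalue) ||
            (cacheid == USERMAPPINGOID && e->mapping_hashvalue == hashvalue))
            e->invalidated = true;
    }
}

/* Return a usable connection, remaking it if it is invalidated and idle. */
static int
model_get_connection(ModelEntry *e)
{
    /* Drop a stale connection only when no transaction is open on it. */
    if (e->conn != 0 && e->invalidated && e->xact_depth == 0)
        e->conn = 0;            /* models disconnect_pg_server() */

    if (e->conn == 0)
    {
        /* Reset transient state, then "connect". */
        e->xact_depth = 0;
        e->invalidated = false;
        e->conn = next_conn_serial++;   /* models connect_pg_server() */
    }
    return e->conn;
}
```

In this model, an entry that is invalidated while xact_depth is nonzero keeps returning the same connection until the depth drops back to zero, which mirrors the "at the next opportunity" wording in the callback's header comment.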

Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From
Kyotaro HORIGUCHI
Date:
At Thu, 20 Jul 2017 18:23:05 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in
<20170720222305.ij3pk7qw5im3wozr@alvherre.pgsql>
> Kyotaro HORIGUCHI wrote:
> 
> > Finally, I added a new TAP test library PsqlSession. It offers
> > interactive psql sessions. Then added a simple test to
> > postgres_fdw using it.
> 
> Hmm, I think this can be very useful for other things.  Let's keep this
> in mind to use in the future, even if we find another way to fix the
> issue at hand.  In fact, I had a problem a couple of weeks ago in which
> I needed two concurrent sessions and one of them disconnected in the
> middle of the test.  Can't do that with isolationtester ...

Thanks. I agree that it is still useful for writing more complex
tests. The most significant issue with this (PsqlSession.pm) comes
from the fact that I didn't find a way to detect the end of a
query execution without attaching a bogus query. And this kind
of thing tends to be unstable in a high-load environment.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center




Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From
Michael Paquier
Date:
On Fri, Jul 21, 2017 at 12:23 AM, Alvaro Herrera
<alvherre@2ndquadrant.com> wrote:
> Kyotaro HORIGUCHI wrote:
>> Finally, I added a new TAP test library PsqlSession. It offers
>> interactive psql sessions. Then added a simple test to
>> postgres_fdw using it.
>
> Hmm, I think this can be very useful for other things.  Let's keep this
> in mind to use in the future, even if we find another way to fix the
> issue at hand.  In fact, I had a problem a couple of weeks ago in which
> I needed two concurrent sessions and one of them disconnected in the
> middle of the test.

Agreed, I wanted the ability to hold a session at hand a couple of
times already for tests. And I agree with the point of having a
separate discussion for such things out of the scope of a bug fix.
Thinking larger, I think that it would be more helpful to hold
processes and run commands in parallel, say for pg_receivewal.

> Can't do that with isolationtester ...

In the pglogical fork of Postgres, you guys improved isolationtester
to handle multiple hosts, right? That sounds harder to integrate than
a perl module though, as isolation tester starts only one server.
-- 
Michael



Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From
Ashutosh Bapat
Date:
On Fri, Jul 21, 2017 at 10:55 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> At Thu, 20 Jul 2017 18:15:42 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in <18927.1500588942@sss.pgh.pa.us>
>> This seems like overkill.  We can test it reasonably easily within the
>> existing framework, as shown in the attached patch.  I'm also fairly
>
> It checks for a disconnection caused in a single session. I
> thought that its inter-process characteristics were important
> (since I had forgotten that in the previous version), but it is
> reasonable enough if we can rely on the fact that it surely works
> through the invalidation mechanism.
>
> In short, I agree to the test in your patch.
>
>> concerned that what you're showing here would be unstable in the buildfarm
>> as a result of race conditions between the multiple sessions.
>
> Sure. It is what I meant by 'fragile'.
>
>> I made some cosmetic updates to the code patch, as well.
>
> Thank you for leaving the hashvalue stuff and revising the comment.
>
> By the way I mistakenly had left the following code in the
> previous patch.
>
> +     /* hashvalue == 0 means a cache reset, must clear all state */
> +     if (hashvalue == 0)
> +       entry->invalidated = true;
> +     else if ((cacheid == FOREIGNSERVEROID &&
> +           entry->server_hashvalue == hashvalue) ||
> +          (cacheid == USERMAPPINGOID &&
> +           entry->mapping_hashvalue == hashvalue))
> +       entry->invalidated = true;
>
> The reason for the redundancy was that it had used switch-case in
> the else block just before. However, it is no longer
> reasonable. I'd like to change it as follows.
>
> +     /* hashvalue == 0 means a cache reset, must clear all state */
> +     if ((hashvalue == 0) ||
> +         ((cacheid == FOREIGNSERVEROID &&
> +           entry->server_hashvalue == hashvalue) ||
> +          (cacheid == USERMAPPINGOID &&
> +           entry->mapping_hashvalue == hashvalue)))
> +       entry->invalidated = true;
>
> The attached patch differs only in this point.
>

+1. The patch looks good to me.
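
The two forms of the condition quoted above should behave identically, which a small standalone check can confirm. This is only a sketch under assumptions: the enum values and the function names should_invalidate_two_branch, should_invalidate_merged, and count_disagreements are hypothetical, and the hash values are reduced to a tiny range so that every combination can be enumerated.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for the two syscache identifiers. */
enum { FOREIGNSERVEROID = 1, USERMAPPINGOID = 2 };

/* Earlier form: a separate branch for the cache-reset case. */
static bool
should_invalidate_two_branch(int cacheid, uint32_t hashvalue,
                             uint32_t server_hv, uint32_t mapping_hv)
{
    bool invalidated = false;

    if (hashvalue == 0)
        invalidated = true;
    else if ((cacheid == FOREIGNSERVEROID && server_hv == hashvalue) ||
             (cacheid == USERMAPPINGOID && mapping_hv == hashvalue))
        invalidated = true;
    return invalidated;
}

/* Merged form: one condition covering reset and hash match. */
static bool
should_invalidate_merged(int cacheid, uint32_t hashvalue,
                         uint32_t server_hv, uint32_t mapping_hv)
{
    return (hashvalue == 0) ||
        ((cacheid == FOREIGNSERVEROID && server_hv == hashvalue) ||
         (cacheid == USERMAPPINGOID && mapping_hv == hashvalue));
}

/* Enumerate a small domain and count inputs where the forms differ. */
static int
count_disagreements(void)
{
    static const int cacheids[] = {FOREIGNSERVEROID, USERMAPPINGOID};
    int mismatches = 0;

    for (int c = 0; c < 2; c++)
        for (uint32_t hv = 0; hv < 4; hv++)
            for (uint32_t s = 0; s < 4; s++)
                for (uint32_t m = 0; m < 4; m++)
                    if (should_invalidate_two_branch(cacheids[c], hv, s, m) !=
                        should_invalidate_merged(cacheids[c], hv, s, m))
                        mismatches++;
    return mismatches;
}
```

Calling count_disagreements() exercises both cache identifiers against every hash combination in the reduced range, including the hashvalue == 0 reset case.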


-- 
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company



Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> writes:
> On Fri, Jul 21, 2017 at 10:55 AM, Kyotaro HORIGUCHI
> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
>> The attached patch differs only in this point.

> +1. The patch looks good to me.

Pushed with a couple additional changes: we'd all missed that the header
comment for GetConnection was obsoleted by this change, and the arguments
for GetSysCacheHashValue really need to be coerced to Datum.  (I think
OID to Datum is the same as what the compiler would do anyway, but best
not to assume that.)

Back-patching was more exciting than I could wish.  It seems that
before 9.6, we did not have struct UserMapping storing the OID of the
pg_user_mapping row it had been made from.  I changed GetConnection to
re-look-up that row and get the OID.  But that's ugly, and there's a
race condition: if user mappings are being added or deleted meanwhile,
we might locate a per-user mapping when we're really using a PUBLIC
mapping or vice versa, causing the ConnCacheEntry to be labeled with
the wrong hash value so that it might not get invalidated properly later.
Still, it's significantly better than it was, and that corner case seems
unlikely to get hit in practice --- for one thing, you'd have to then
revert the mapping addition/deletion before the ConnCacheEntry would be
found and used again.  I don't want to take the risk of modifying struct
UserMapping in stable branches, so it's hard to see a way to make that
completely bulletproof before 9.6.
        regards, tom lane



Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From
Ashutosh Bapat
Date:
On Fri, Jul 21, 2017 at 10:39 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> writes:
>> On Fri, Jul 21, 2017 at 10:55 AM, Kyotaro HORIGUCHI
>> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
>>> The attached patch differs only in this point.
>
>> +1. The patch looks good to me.
>
> Pushed with a couple additional changes: we'd all missed that the header
> comment for GetConnection was obsoleted by this change, and the arguments
> for GetSysCacheHashValue really need to be coerced to Datum.  (I think
> OID to Datum is the same as what the compiler would do anyway, but best
> not to assume that.)

Thanks and sorry for not noticing the prologue.

>
> Back-patching was more exciting than I could wish.  It seems that
> before 9.6, we did not have struct UserMapping storing the OID of the
> pg_user_mapping row it had been made from.  I changed GetConnection to
> re-look-up that row and get the OID.  But that's ugly, and there's a
> race condition: if user mappings are being added or deleted meanwhile,
> we might locate a per-user mapping when we're really using a PUBLIC
> mapping or vice versa, causing the ConnCacheEntry to be labeled with
> the wrong hash value so that it might not get invalidated properly later.
> Still, it's significantly better than it was, and that corner case seems
> unlikely to get hit in practice --- for one thing, you'd have to then
> revert the mapping addition/deletion before the ConnCacheEntry would be
> found and used again.  I don't want to take the risk of modifying struct
> UserMapping in stable branches, so it's hard to see a way to make that
> completely bulletproof before 9.6.

+1.

-- 
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company



Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From
Kyotaro HORIGUCHI
Date:
At Mon, 24 Jul 2017 10:23:07 +0530, Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote in
<CAFjFpRc_q8wNOe-RDTfRSpC6Pey3AjADAJ4noEiujAthW60K7A@mail.gmail.com>
> On Fri, Jul 21, 2017 at 10:39 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> > Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> writes:
> >> On Fri, Jul 21, 2017 at 10:55 AM, Kyotaro HORIGUCHI
> >> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> >>> The attached patch differs only in this point.
> >
> >> +1. The patch looks good to me.
> >
> > Pushed with a couple additional changes: we'd all missed that the header
> > comment for GetConnection was obsoleted by this change, and the arguments
> > for GetSysCacheHashValue really need to be coerced to Datum.  (I think
> > OID to Datum is the same as what the compiler would do anyway, but best
> > not to assume that.)
> 
> Thanks and sorry for not noticing the prologue.

Ditto.

> >
> > Back-patching was more exciting than I could wish.  It seems that
> > before 9.6, we did not have struct UserMapping storing the OID of the
> > pg_user_mapping row it had been made from.  I changed GetConnection to
> > re-look-up that row and get the OID.  But that's ugly, and there's a
> > race condition: if user mappings are being added or deleted meanwhile,
> > we might locate a per-user mapping when we're really using a PUBLIC
> > mapping or vice versa, causing the ConnCacheEntry to be labeled with
> > the wrong hash value so that it might not get invalidated properly later.
> > Still, it's significantly better than it was, and that corner case seems
> > unlikely to get hit in practice --- for one thing, you'd have to then
> > revert the mapping addition/deletion before the ConnCacheEntry would be
> > found and used again.  I don't want to take the risk of modifying struct
> > UserMapping in stable branches, so it's hard to see a way to make that
> > completely bulletproof before 9.6.
> 
> +1.

Agreed.

> -- 
> Best Wishes,
> Ashutosh Bapat
> EnterpriseDB Corporation
> The Postgres Database Company

-- 
Kyotaro Horiguchi
NTT Open Source Software Center