Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTERUSER MAPPING - Mailing list pgsql-hackers
| From | Kyotaro HORIGUCHI |
|---|---|
| Subject | Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTERUSER MAPPING |
| Date | |
| Msg-id | 20170718.181213.206979369.horiguchi.kyotaro@lab.ntt.co.jp Whole thread Raw |
| In response to | Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING (Tom Lane <tgl@sss.pgh.pa.us>) |
| Responses |
Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTERUSER MAPPING
|
| List | pgsql-hackers |
Thank you for the comments.
At Mon, 17 Jul 2017 16:09:04 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in <9897.1500322144@sss.pgh.pa.us>
> Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> writes:
> > This is the revased and revised version of the previous patch.
>
> A few more comments:
>
> * Per the spec for CacheRegisterSyscacheCallback, a zero hash value means
> to flush all associated state. This isn't handling that case correctly.
Right, fixed.
> Even when you are given a specific hash value, I think exiting after
> finding one match is incorrect: there could be multiple connections
> to the same server with different user mappings, or vice versa.
Sure. I'm confused that key hash value nails an entry in "the
connection cache". Thank you for pointing out that.
> * I'm not really sure that it's worth the trouble to pay attention
> to the hash value at all. Very few other syscache callbacks do,
> and the pg_server/pg_user_mapping catalogs don't seem likely to
> have higher than average traffic.
Agreed to the points. But there is another point that reconection
is far intensive than re-looking up of a system catalog or maybe
even than replanning. For now I choosed to avoid a possibility of
causing massive number of simultaneous reconnection.
> * Personally I'd be inclined to register the callbacks at the same
> time the hashtables are created, which is a pattern we use elsewhere
> ... in fact, postgres_fdw itself does it that way for the transaction
> related callbacks, so it seems a bit weird that you are going in a
> different direction for these callbacks. That way avoids the need to
> depend on a _PG_init function and means that the callbacks don't have to
> worry about the hashtables not being valid.
Sure, seems more reasonable than it is now. Changed the way of
registring a callback in the attached.
> Also, it seems a bit
> pointless to have separate layers of postgresMappingSysCallback and
> InvalidateConnectionForMapping functions.
It used to be one function but it seemed a bit wierd that the
function is called from two places (two catalogs) then branchs
according to the caller. I don't have a firm opinion on this so
changed.
As the result the changes in postgres_fdw.c has been disappeard.
> * How about some regression test cases? You couldn't really exercise
> cross-session invalidation easily, but I don't think you need to.
Ha Ha. You got me. I will add some test cases for this in the
next version. Thanks.
Ashutosh, I'll register this to the next CF after providing a
regression, thanks.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 22,27 ****
--- 22,28 ---- #include "pgstat.h" #include "storage/latch.h" #include "utils/hsearch.h"
+ #include "utils/inval.h" #include "utils/memutils.h" #include "utils/syscache.h"
***************
*** 53,58 **** typedef struct ConnCacheEntry
--- 54,62 ---- bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool
have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state
changein process */
+ bool invalidated; /* true if reconnect is requried */
+ uint32 server_hashvalue; /* hash value of foreign server oid */
+ uint32 mapping_hashvalue; /* hash value of user mapping oid */ } ConnCacheEntry; /*
***************
*** 69,74 **** static bool xact_got_connection = false;
--- 73,79 ---- /* prototypes of private functions */ static PGconn *connect_pg_server(ForeignServer *server,
UserMapping*user);
+ static void disconnect_pg_server(ConnCacheEntry *entry); static void check_conn_params(const char **keywords, const
char**values); static void configure_remote_session(PGconn *conn); static void do_sql_command(PGconn *conn, const char
*sql);
***************
*** 78,83 **** static void pgfdw_subxact_callback(SubXactEvent event,
--- 83,89 ---- SubTransactionId mySubid, SubTransactionId parentSubid,
void *arg);
+ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue); static void
pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry*entry); static bool pgfdw_cancel_query(PGconn *conn); static
boolpgfdw_exec_cleanup_query(PGconn *conn, const char *query,
***************
*** 130,135 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 136,145 ---- */ RegisterXactCallback(pgfdw_xact_callback, NULL);
RegisterSubXactCallback(pgfdw_subxact_callback,NULL);
+ CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+ pgfdw_inval_callback, (Datum)0);
+ CacheRegisterSyscacheCallback(USERMAPPINGOID,
+ pgfdw_inval_callback, (Datum)0); } /* Set flag that we did
GetConnectionduring the current transaction */
***************
*** 144,160 **** GetConnection(UserMapping *user, bool will_prep_stmt) entry = hash_search(ConnectionHash, &key,
HASH_ENTER,&found); if (!found) {
! /* initialize new hashtable entry (key is already filled in) */ entry->conn = NULL;
- entry->xact_depth = 0;
- entry->have_prep_stmt = false;
- entry->have_error = false;
- entry->changing_xact_state = false; } /* Reject further use of connections which failed abort
cleanup.*/ pgfdw_reject_incomplete_xact_state_change(entry); /* * We don't check the health of cached
connectionhere, because it would * require some overhead. Broken connection will be detected when the
--- 154,182 ---- entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); if (!found) {
! /*
! * key is already filled in, flags well be initialized at the time of
! * making a new connection, so just clear conn here.
! */ entry->conn = NULL; } /* Reject further use of connections which failed abort cleanup.
*/ pgfdw_reject_incomplete_xact_state_change(entry);
+
+ /*
+ * This connection is no longer valid. Disconnect such connections if no
+ * transaction is running.
+ */
+ if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+ {
+ /* reconneced immediately, so the messages is "reconnecting" */
+ elog(DEBUG3, "closing connection %p for option changes to take effect",
+ entry->conn);
+ disconnect_pg_server(entry);
+ }
+ /* * We don't check the health of cached connection here, because it would * require some overhead.
Brokenconnection will be detected when the
***************
*** 173,178 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 195,206 ---- entry->xact_depth = 0; /* just to be sure */ entry->have_prep_stmt = false;
entry->have_error= false;
+ entry->invalidated = false;
+ entry->changing_xact_state = false;
+ entry->server_hashvalue =
+ GetSysCacheHashValue1(FOREIGNSERVEROID, server->serverid);
+ entry->mapping_hashvalue =
+ GetSysCacheHashValue1(USERMAPPINGOID, user->umid); entry->conn = connect_pg_server(server, user);
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
***************
*** 276,281 **** connect_pg_server(ForeignServer *server, UserMapping *user)
--- 304,321 ---- return conn; }
+ /* disconnect the connection for a connection cache entry */
+ static void
+ disconnect_pg_server(ConnCacheEntry *entry)
+ {
+ if (entry->conn != NULL)
+ {
+ PQfinish(entry->conn);
+ entry->conn = NULL;
+ }
+ }
+
+ /* * For non-superusers, insist that the connstr specify a password. This * prevents a password from being picked
upfrom .pgpass, a service file,
***************
*** 429,434 **** ReleaseConnection(PGconn *conn)
--- 469,519 ---- } /*
+ * Connection invalidation callback function
+ *
+ * Mark the connections to be disconnected if it depends on a foreign server
+ * or user mapping any options on which have been modified.
+ *
+ * Although most invalidate callbacks blow away all the related stuff
+ * regardless of the given hashvalue, if we blow away all existing connection
+ * from this server, it can cause a massive number of simultaneous
+ * reconnection to all the remotes. We restrict the connection to break to
+ * avoid such a mess.
+ *
+ * NB: We could avoid unnecessary disconnection more strictly by examining
+ * individual option values but it would be too-much for the gain.
+ */
+ static void
+ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+ HASH_SEQ_STATUS scan;
+ ConnCacheEntry *entry;
+
+ if (!ConnectionHash)
+ return;
+
+ hash_seq_init(&scan, ConnectionHash);
+ while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+ {
+ if (entry->conn == NULL)
+ continue;
+
+ if (hashvalue == 0)
+ entry->invalidated = true;
+ else
+ {
+ Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
+
+ if ((cacheid == FOREIGNSERVEROID &&
+ entry->server_hashvalue == hashvalue) ||
+ (cacheid == USERMAPPINGOID &&
+ entry->mapping_hashvalue == hashvalue))
+ entry->invalidated = true;
+ }
+ }
+ }
+
+ /* * Assign a "unique" number for a cursor. * * These really only need to be unique per connection within a
transaction.
***************
*** 777,785 **** pgfdw_xact_callback(XactEvent event, void *arg) entry->changing_xact_state) {
elog(DEBUG3, "discarding connection %p", entry->conn);
! PQfinish(entry->conn);
! entry->conn = NULL;
! entry->changing_xact_state = false; } }
--- 862,868 ---- entry->changing_xact_state) { elog(DEBUG3, "discarding connection %p",
entry->conn);
! disconnect_pg_server(entry); } }
***************
*** 913,921 **** pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) Form_pg_user_mapping umform;
ForeignServer*server;
! if (!entry->changing_xact_state) return; tup = SearchSysCache1(USERMAPPINGOID,
ObjectIdGetDatum(entry->key)); if (!HeapTupleIsValid(tup))
--- 996,1009 ---- Form_pg_user_mapping umform; ForeignServer *server;
! /* nothing to do for inactive entries and entries of sane state */
! if (entry->conn ==NULL || !entry->changing_xact_state) return;
+ /* make sure this entry is inactive */
+ disconnect_pg_server(entry);
+
+ /* find server name to be shown in the message below */ tup = SearchSysCache1(USERMAPPINGOID,
ObjectIdGetDatum(entry->key)); if (!HeapTupleIsValid(tup))
pgsql-hackers by date: