Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTERUSER MAPPING - Mailing list pgsql-hackers
From | Kyotaro HORIGUCHI |
---|---|
Subject | Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTERUSER MAPPING |
Date | |
Msg-id | 20170718.181213.206979369.horiguchi.kyotaro@lab.ntt.co.jp Whole thread Raw |
In response to | Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING (Tom Lane <tgl@sss.pgh.pa.us>) |
Responses |
Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTERUSER MAPPING
|
List | pgsql-hackers |
Thank you for the comments. At Mon, 17 Jul 2017 16:09:04 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in <9897.1500322144@sss.pgh.pa.us> > Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> writes: > > This is the revased and revised version of the previous patch. > > A few more comments: > > * Per the spec for CacheRegisterSyscacheCallback, a zero hash value means > to flush all associated state. This isn't handling that case correctly. Right, fixed. > Even when you are given a specific hash value, I think exiting after > finding one match is incorrect: there could be multiple connections > to the same server with different user mappings, or vice versa. Sure. I'm confused that key hash value nails an entry in "the connection cache". Thank you for pointing out that. > * I'm not really sure that it's worth the trouble to pay attention > to the hash value at all. Very few other syscache callbacks do, > and the pg_server/pg_user_mapping catalogs don't seem likely to > have higher than average traffic. Agreed to the points. But there is another point that reconection is far intensive than re-looking up of a system catalog or maybe even than replanning. For now I choosed to avoid a possibility of causing massive number of simultaneous reconnection. > * Personally I'd be inclined to register the callbacks at the same > time the hashtables are created, which is a pattern we use elsewhere > ... in fact, postgres_fdw itself does it that way for the transaction > related callbacks, so it seems a bit weird that you are going in a > different direction for these callbacks. That way avoids the need to > depend on a _PG_init function and means that the callbacks don't have to > worry about the hashtables not being valid. Sure, seems more reasonable than it is now. Changed the way of registring a callback in the attached. > Also, it seems a bit > pointless to have separate layers of postgresMappingSysCallback and > InvalidateConnectionForMapping functions. It used to be one function but it seemed a bit wierd that the function is called from two places (two catalogs) then branchs according to the caller. I don't have a firm opinion on this so changed. As the result the changes in postgres_fdw.c has been disappeard. > * How about some regression test cases? You couldn't really exercise > cross-session invalidation easily, but I don't think you need to. Ha Ha. You got me. I will add some test cases for this in the next version. Thanks. Ashutosh, I'll register this to the next CF after providing a regression, thanks. regards, -- Kyotaro Horiguchi NTT Open Source Software Center *** a/contrib/postgres_fdw/connection.c --- b/contrib/postgres_fdw/connection.c *************** *** 22,27 **** --- 22,28 ---- #include "pgstat.h" #include "storage/latch.h" #include "utils/hsearch.h" + #include "utils/inval.h" #include "utils/memutils.h" #include "utils/syscache.h" *************** *** 53,58 **** typedef struct ConnCacheEntry --- 54,62 ---- bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state changein process */ + bool invalidated; /* true if reconnect is requried */ + uint32 server_hashvalue; /* hash value of foreign server oid */ + uint32 mapping_hashvalue; /* hash value of user mapping oid */ } ConnCacheEntry; /* *************** *** 69,74 **** static bool xact_got_connection = false; --- 73,79 ---- /* prototypes of private functions */ static PGconn *connect_pg_server(ForeignServer *server, UserMapping*user); + static void disconnect_pg_server(ConnCacheEntry *entry); static void check_conn_params(const char **keywords, const char**values); static void configure_remote_session(PGconn *conn); static void do_sql_command(PGconn *conn, const char *sql); *************** *** 78,83 **** static void pgfdw_subxact_callback(SubXactEvent event, --- 83,89 ---- SubTransactionId mySubid, SubTransactionId parentSubid, void *arg); + static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue); static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry*entry); static bool pgfdw_cancel_query(PGconn *conn); static boolpgfdw_exec_cleanup_query(PGconn *conn, const char *query, *************** *** 130,135 **** GetConnection(UserMapping *user, bool will_prep_stmt) --- 136,145 ---- */ RegisterXactCallback(pgfdw_xact_callback, NULL); RegisterSubXactCallback(pgfdw_subxact_callback,NULL); + CacheRegisterSyscacheCallback(FOREIGNSERVEROID, + pgfdw_inval_callback, (Datum)0); + CacheRegisterSyscacheCallback(USERMAPPINGOID, + pgfdw_inval_callback, (Datum)0); } /* Set flag that we did GetConnectionduring the current transaction */ *************** *** 144,160 **** GetConnection(UserMapping *user, bool will_prep_stmt) entry = hash_search(ConnectionHash, &key, HASH_ENTER,&found); if (!found) { ! /* initialize new hashtable entry (key is already filled in) */ entry->conn = NULL; - entry->xact_depth = 0; - entry->have_prep_stmt = false; - entry->have_error = false; - entry->changing_xact_state = false; } /* Reject further use of connections which failed abort cleanup.*/ pgfdw_reject_incomplete_xact_state_change(entry); /* * We don't check the health of cached connectionhere, because it would * require some overhead. Broken connection will be detected when the --- 154,182 ---- entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); if (!found) { ! /* ! * key is already filled in, flags well be initialized at the time of ! * making a new connection, so just clear conn here. ! */ entry->conn = NULL; } /* Reject further use of connections which failed abort cleanup. */ pgfdw_reject_incomplete_xact_state_change(entry); + + /* + * This connection is no longer valid. Disconnect such connections if no + * transaction is running. + */ + if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0) + { + /* reconneced immediately, so the messages is "reconnecting" */ + elog(DEBUG3, "closing connection %p for option changes to take effect", + entry->conn); + disconnect_pg_server(entry); + } + /* * We don't check the health of cached connection here, because it would * require some overhead. Brokenconnection will be detected when the *************** *** 173,178 **** GetConnection(UserMapping *user, bool will_prep_stmt) --- 195,206 ---- entry->xact_depth = 0; /* just to be sure */ entry->have_prep_stmt = false; entry->have_error= false; + entry->invalidated = false; + entry->changing_xact_state = false; + entry->server_hashvalue = + GetSysCacheHashValue1(FOREIGNSERVEROID, server->serverid); + entry->mapping_hashvalue = + GetSysCacheHashValue1(USERMAPPINGOID, user->umid); entry->conn = connect_pg_server(server, user); elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", *************** *** 276,281 **** connect_pg_server(ForeignServer *server, UserMapping *user) --- 304,321 ---- return conn; } + /* disconnect the connection for a connection cache entry */ + static void + disconnect_pg_server(ConnCacheEntry *entry) + { + if (entry->conn != NULL) + { + PQfinish(entry->conn); + entry->conn = NULL; + } + } + + /* * For non-superusers, insist that the connstr specify a password. This * prevents a password from being picked upfrom .pgpass, a service file, *************** *** 429,434 **** ReleaseConnection(PGconn *conn) --- 469,519 ---- } /* + * Connection invalidation callback function + * + * Mark the connections to be disconnected if it depends on a foreign server + * or user mapping any options on which have been modified. + * + * Although most invalidate callbacks blow away all the related stuff + * regardless of the given hashvalue, if we blow away all existing connection + * from this server, it can cause a massive number of simultaneous + * reconnection to all the remotes. We restrict the connection to break to + * avoid such a mess. + * + * NB: We could avoid unnecessary disconnection more strictly by examining + * individual option values but it would be too-much for the gain. + */ + static void + pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue) + { + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + + if (!ConnectionHash) + return; + + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + if (entry->conn == NULL) + continue; + + if (hashvalue == 0) + entry->invalidated = true; + else + { + Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID); + + if ((cacheid == FOREIGNSERVEROID && + entry->server_hashvalue == hashvalue) || + (cacheid == USERMAPPINGOID && + entry->mapping_hashvalue == hashvalue)) + entry->invalidated = true; + } + } + } + + /* * Assign a "unique" number for a cursor. * * These really only need to be unique per connection within a transaction. *************** *** 777,785 **** pgfdw_xact_callback(XactEvent event, void *arg) entry->changing_xact_state) { elog(DEBUG3, "discarding connection %p", entry->conn); ! PQfinish(entry->conn); ! entry->conn = NULL; ! entry->changing_xact_state = false; } } --- 862,868 ---- entry->changing_xact_state) { elog(DEBUG3, "discarding connection %p", entry->conn); ! disconnect_pg_server(entry); } } *************** *** 913,921 **** pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) Form_pg_user_mapping umform; ForeignServer*server; ! if (!entry->changing_xact_state) return; tup = SearchSysCache1(USERMAPPINGOID, ObjectIdGetDatum(entry->key)); if (!HeapTupleIsValid(tup)) --- 996,1009 ---- Form_pg_user_mapping umform; ForeignServer *server; ! /* nothing to do for inactive entries and entries of sane state */ ! if (entry->conn ==NULL || !entry->changing_xact_state) return; + /* make sure this entry is inactive */ + disconnect_pg_server(entry); + + /* find server name to be shown in the message below */ tup = SearchSysCache1(USERMAPPINGOID, ObjectIdGetDatum(entry->key)); if (!HeapTupleIsValid(tup))
pgsql-hackers by date: