From 1733bad78bdf8c3cf190e262ee5d833779189c5b Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Sat, 1 May 2021 09:00:01 +0900 Subject: [PATCH v37 6/9] postgres_fdw marks foreign transaction as modified on modification. This commit enables postgres_fdw to execute two-phase commit protocol on transaction commit (without explicitly executing PREPARE TRANSACTION). Co-authored-by: Masahiko Sawada, Ashutosh Bapat --- contrib/postgres_fdw/connection.c | 19 ++++++++++++++++++- contrib/postgres_fdw/postgres_fdw.c | 2 ++ contrib/postgres_fdw/postgres_fdw.h | 1 + 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index f8db97c641..262bf71485 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -63,6 +63,7 @@ typedef struct ConnCacheEntry bool keep_connections; /* setting value of keep_connections * server option */ Oid serverid; /* foreign server OID used to get server name */ + bool modified; /* true if data on the foreign server is modified */ uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ PgFdwConnState state; /* extra per-connection state */ @@ -311,6 +312,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) entry->changing_xact_state = false; entry->invalidated = false; entry->serverid = server->serverid; + entry->modified = false; entry->server_hashvalue = GetSysCacheHashValue1(FOREIGNSERVEROID, ObjectIdGetDatum(server->serverid)); @@ -346,6 +348,20 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) entry->conn, server->servername, user->umid, user->userid); } +void +MarkConnectionModified(UserMapping *user) +{ + ConnCacheEntry *entry; + + entry = GetConnectionCacheEntry(user->umid); + + if (entry && !entry->modified) + { + FdwXactRegisterEntry(user, true); + entry->modified = true; + } +} + /* * Connect to remote server using specified server and user mapping properties. */ @@ -617,7 +633,7 @@ begin_remote_xact(ConnCacheEntry *entry, UserMapping *user) entry->conn); /* Register the foreign server to the transaction */ - FdwXactRegisterEntry(user); + FdwXactRegisterEntry(user, false); if (IsolationIsSerializable()) sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; @@ -626,6 +642,7 @@ begin_remote_xact(ConnCacheEntry *entry, UserMapping *user) entry->changing_xact_state = true; do_sql_command(entry->conn, sql); entry->xact_depth = 1; + entry->modified = false; entry->changing_xact_state = false; } diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index e1c6bd9330..2e2aee47b4 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -2654,6 +2654,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * establish new connection if necessary. */ dmstate->conn = GetConnection(user, false, &dmstate->conn_state); + MarkConnectionModified(user); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -3968,6 +3969,7 @@ create_foreign_modify(EState *estate, /* Open connection; report that we'll create a prepared statement. */ fmstate->conn = GetConnection(user, true, &fmstate->conn_state); + MarkConnectionModified(user); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. */ diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 97e4f244db..4fedbb76c4 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -144,6 +144,7 @@ extern void process_pending_request(AsyncRequest *areq); extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state); extern void ReleaseConnection(PGconn *conn); +extern void MarkConnectionModified(UserMapping *user); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); extern void do_sql_command(PGconn *conn, const char *sql); -- 2.24.3 (Apple Git-128)