From 63220a37295f98a022835e121465481fc947b3ee Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Wed, 25 Nov 2020 21:02:29 +0900 Subject: [PATCH v37 5/9] Prepare foreign transactions at commit time With this commit, the foreign server modified within the transaction marked as 'modified'. On the 'modified' servers, foreign transactions are prepared automatically if foreign_twophase_commit is 'required'. Previously, users need to do PREPARE TRANSACTION and COMMIT/ROLLBACK PREPARED to use two-phase commit protocol. This commit enables users to use two-phase commit protocol transparently. Prepared foreign transactions are resolved in asynchronous manner by foreign transaction resolver process. Co-authored-by: Masahiko Sawada, Ashutosh Bapat --- src/backend/access/transam/fdwxact.c | 164 +++++++++++++++++- src/backend/utils/misc/guc.c | 28 +++ src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/access/fdwxact.h | 11 +- src/include/foreign/fdwapi.h | 2 +- 5 files changed, 197 insertions(+), 10 deletions(-) diff --git a/src/backend/access/transam/fdwxact.c b/src/backend/access/transam/fdwxact.c index 0c6e80a6de..9280d79a3a 100644 --- a/src/backend/access/transam/fdwxact.c +++ b/src/backend/access/transam/fdwxact.c @@ -20,6 +20,23 @@ * * FOREIGN TRANSACTION RESOLUTION * + * The transaction involving multiple foreign transactions uses two-phase commit + * protocol to commit the distributed transaction if enabled. The basic strategy + * is that we prepare all of the remote transactions before committing locally and + * commit them after committing locally. + * + * At pre-commit of local transaction, we prepare the transactions on all foreign + * servers after logging the information of foreign transaction. The result of + * distributed transaction is determined by the result of the corresponding local + * transaction. Once the local transaction is successfully committed, all + * transactions on foreign servers must be committed. In case where an error occurred + * before the local transaction commit all transactions must be aborted. After + * committing or rolling back locally, we leave foreign transactions as in-doubt + * transactions and then notify the resolver process. The resolver process asynchronously + * resolves these foreign transactions according to the result of the corresponding local + * transaction. Also, the user can use pg_resolve_foreign_xact() SQL function to + * resolve a foreign transaction manually. + * * At PREPARE TRANSACTION, we prepare all transactions on foreign servers by executing * PrepareForeignTransaction() API for each foreign transaction regardless of data on * the foreign server having been modified. At COMMIT PREPARED and ROLLBACK PREPARED, @@ -97,8 +114,10 @@ #include "storage/ipc.h" #include "storage/latch.h" #include "storage/lock.h" +#include "storage/pmsignal.h" #include "storage/procarray.h" #include "storage/sinvaladt.h" +#include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -119,6 +138,10 @@ #define ServerSupportTwophaseCommit(fdwent) \ (((FdwXactEntry *)(fdwent))->prepare_foreign_xact_fn != NULL) +/* Foreign twophase commit is enabled and requested by user */ +#define IsForeignTwophaseCommitRequested() \ + (foreign_twophase_commit > FOREIGN_TWOPHASE_COMMIT_DISABLED) + /* * Name of foreign prepared transaction file is 8 bytes xid and * user mapping OID separated by '_'. @@ -152,6 +175,9 @@ typedef struct FdwXactEntry */ FdwXactState fdwxact; + /* true if modified the data on the server */ + bool modified; + /* Callbacks for foreign transaction */ CommitForeignTransaction_function commit_foreign_xact_fn; RollbackForeignTransaction_function rollback_foreign_xact_fn; @@ -167,10 +193,13 @@ typedef struct DistributedXactStateData { bool local_prepared; /* will (did) we prepare the local transaction? */ + bool twophase_commit_required; + /* Statistics of participants */ int nparticipants_no_twophase; /* how many participants doesn't * support two-phase commit * protocol? */ + int nparticipants_modified; /* how many participants are modified? */ HTAB *participants; /* foreign transaction participants (FdwXactEntry) */ List *serveroids_uniq; /* list of unique server OIDs in @@ -178,7 +207,9 @@ typedef struct DistributedXactStateData } DistributedXactStateData; static DistributedXactStateData DistributedXactState = { .local_prepared = false, + .twophase_commit_required = false, .nparticipants_no_twophase = 0, + .nparticipants_modified = 0, .participants = NULL, .serveroids_uniq = NIL, }; @@ -191,18 +222,19 @@ static DistributedXactStateData DistributedXactState = { /* Keep track of registering process exit call back. */ static bool fdwXactExitRegistered = false; + /* Guc parameter */ int max_prepared_foreign_xacts = 0; int max_foreign_xact_resolvers = 0; +int foreign_twophase_commit = FOREIGN_TWOPHASE_COMMIT_DISABLED; static void RemoveFdwXactEntry(Oid umid); static void EndFdwXactEntry(FdwXactEntry *fdwent, bool isCommit, bool is_parallel_worker); static char *getFdwXactIdentifier(FdwXactEntry *fdwent, TransactionId xid); static void ForgetAllParticipants(void); -static void FdwXactLaunchResolvers(void); -static void PrepareAllFdwXacts(TransactionId xid); +static void PrepareAllFdwXacts(TransactionId xid, bool prepare_all); static XLogRecPtr FdwXactInsertEntry(TransactionId xid, FdwXactEntry *fdwent, char *identifier); static void AtProcExit_FdwXact(int code, Datum arg); @@ -216,6 +248,7 @@ static char *ProcessFdwXactBuffer(TransactionId xid, Oid umid, static char *ReadFdwXactStateFile(TransactionId xid, Oid umid); static void RemoveFdwXactStateFile(TransactionId xid, Oid umid, bool giveWarning); static void RecreateFdwXactFile(TransactionId xid, Oid umid, void *content, int len); +static bool checkForeignTwophaseCommitRequired(bool local_modified); static FdwXactState insert_fdwxact(Oid dbid, TransactionId xid, Oid umid, Oid serverid, Oid owner, char *identifier); @@ -292,7 +325,7 @@ FdwXactShmemInit(void) * given user mapping OID as a participant of the transaction. */ void -FdwXactRegisterEntry(UserMapping *usermapping) +FdwXactRegisterEntry(UserMapping *usermapping, bool modified) { FdwXactEntry *fdwent; FdwRoutine *routine; @@ -318,8 +351,21 @@ FdwXactRegisterEntry(UserMapping *usermapping) fdwent = hash_search(DistributedXactState.participants, (void *) &umid, HASH_ENTER, &found); + /* Already registered */ if (found) + { + /* Update statistics if necessary */ + if (fdwent->modified && !modified) + DistributedXactState.nparticipants_modified--; + else if (!fdwent->modified && modified) + DistributedXactState.nparticipants_modified++; + + fdwent->modified = modified; + + Assert(DistributedXactState.nparticipants_modified <= + hash_get_num_entries(DistributedXactState.participants)); return; + } /* * The participant information needs to live until the end of the @@ -341,6 +387,7 @@ FdwXactRegisterEntry(UserMapping *usermapping) (errmsg("cannot register foreign server not supporting transaction callback"))); fdwent->fdwxact = NULL; + fdwent->modified = modified; fdwent->commit_foreign_xact_fn = routine->CommitForeignTransaction; fdwent->rollback_foreign_xact_fn = routine->RollbackForeignTransaction; fdwent->prepare_foreign_xact_fn = routine->PrepareForeignTransaction; @@ -350,8 +397,12 @@ FdwXactRegisterEntry(UserMapping *usermapping) /* Update statistics */ if (!ServerSupportTwophaseCommit(fdwent)) DistributedXactState.nparticipants_no_twophase++; + if (fdwent->modified) + DistributedXactState.nparticipants_modified++; Assert(DistributedXactState.nparticipants_no_twophase <= + hash_get_num_entries(DistributedXactState.participants)); + Assert(DistributedXactState.nparticipants_modified <= hash_get_num_entries(DistributedXactState.participants)); } @@ -381,9 +432,13 @@ RemoveFdwXactEntry(Oid umid) /* Update statistics */ if (!ServerSupportTwophaseCommit(fdwent)) DistributedXactState.nparticipants_no_twophase--; + if (fdwent->modified) + DistributedXactState.nparticipants_modified--; Assert(DistributedXactState.nparticipants_no_twophase <= hash_get_num_entries(DistributedXactState.participants)); + Assert(DistributedXactState.nparticipants_modified <= + hash_get_num_entries(DistributedXactState.participants)); } } @@ -454,12 +509,15 @@ AtEOXact_FdwXact(bool isCommit, bool is_parallel_worker) * transaction after preparing the foreign transactions. In this case, we * need to rollback the prepared transaction on the foreign servers. */ - if (DistributedXactState.local_prepared && !isCommit) + if (DistributedXactState.twophase_commit_required || + (DistributedXactState.local_prepared && !isCommit)) FdwXactLaunchResolvers(); /* Reset all fields */ DistributedXactState.local_prepared = false; + DistributedXactState.twophase_commit_required = false; DistributedXactState.nparticipants_no_twophase = 0; + DistributedXactState.nparticipants_modified = 0; list_free(DistributedXactState.serveroids_uniq); DistributedXactState.serveroids_uniq = NIL; } @@ -533,7 +591,7 @@ AtPrepare_FdwXact(void) */ DistributedXactState.local_prepared = true; - PrepareAllFdwXacts(xid); + PrepareAllFdwXacts(xid, true); } /* @@ -545,6 +603,8 @@ PreCommit_FdwXact(bool is_parallel_worker) { HASH_SEQ_STATUS scan; FdwXactEntry *fdwent; + TransactionId xid; + bool local_modified; /* * If there is no foreign server involved or all foreign transactions are @@ -555,6 +615,41 @@ PreCommit_FdwXact(bool is_parallel_worker) Assert(!RecoveryInProgress()); + /* + * Check if the current transaction did writes. We need to include the + * local node to the distributed transaction participant and to regard it + * as modified, if the current transaction has performed WAL logging and + * has assigned an xid. The transaction can end up not writing any WAL, + * even if it has an xid, if it only wrote to temporary and/or unlogged + * tables. It can end up having written WAL without an xid if did HOT + * pruning. + */ + xid = GetTopTransactionIdIfAny(); + local_modified = (TransactionIdIsValid(xid) && (XactLastRecEnd != 0)); + + /* + * Perform twophase commit if required. Note that we don't support foreign + * twophase commit in single user mode. + */ + if (IsUnderPostmaster && checkForeignTwophaseCommitRequired(local_modified)) + { + /* + * Two-phase commit is required. Assign a transaction id to the + * current transaction if not yet because the local transaction is + * necessary to determine the result of the distributed transaction. + * Then we prepare foreign transactions on foreign servers that support + * two-phase commit. Note that we keep FdwXactParticipants until the + * end of the transaction. + */ + if (!TransactionIdIsValid(xid)) + xid = GetTopTransactionId(); + + DistributedXactState.twophase_commit_required = true; + PrepareAllFdwXacts(xid, false); + + return; + } + /* Commit all foreign transactions in the participant list */ hash_seq_init(&scan, DistributedXactState.participants); while ((fdwent = (FdwXactEntry *) hash_seq_search(&scan))) @@ -684,6 +779,53 @@ CheckPointFdwXacts(XLogRecPtr redo_horizon) serialized_fdwxacts))); } +/* + * Return true if the current transaction modifies data on two or more servers + * in FdwXactParticipants and local server itself. + */ +static bool +checkForeignTwophaseCommitRequired(bool local_modified) +{ + int nserverswritten; + + if (!IsForeignTwophaseCommitRequested()) + return false; + + nserverswritten = DistributedXactState.nparticipants_modified; + + /* Did we modify the local non-temporary data? */ + if (local_modified) + nserverswritten++; + + /* + * Two-phase commit is not required if the number of servers performing + * writes is less than 2. + */ + if (nserverswritten < 2) + return false; + + if (DistributedXactState.nparticipants_no_twophase > 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot process a distributed transaction that has operated on a foreign server that does not support two-phase commit protocol"), + errdetail("foreign_twophase_commit is \'required\' but the transaction has some foreign servers which are not capable of two-phase commit"))); + + /* Two-phase commit is required. Check parameters */ + if (max_prepared_foreign_xacts == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("foreign two-phase commit is required but prepared foreign transactions are disabled"), + errhint("Set max_prepared_foreign_transactions to a nonzero value."))); + + if (max_foreign_xact_resolvers == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("foreign two-phase commit is required but prepared foreign transactions are disabled"), + errhint("Set max_foreign_transaction_resolvers to a nonzero value."))); + + return true; +} + /* * Prepare all foreign transactions. * @@ -704,9 +846,12 @@ CheckPointFdwXacts(XLogRecPtr redo_horizon) * able to resolve it after the server crash. Hence persist first then prepare. * Point (b) guarantees that foreign transaction information are not lost even * if the failover happens. + * + * If prepare_all is true, we prepare all foreign transaction regardless of + * writes having happened on the server. */ static void -PrepareAllFdwXacts(TransactionId xid) +PrepareAllFdwXacts(TransactionId xid, bool prepare_all) { FdwXactEntry *fdwent; XLogRecPtr flush_lsn; @@ -725,6 +870,9 @@ PrepareAllFdwXacts(TransactionId xid) CHECK_FOR_INTERRUPTS(); + if (!prepare_all && !fdwent->modified) + continue; + /* Get prepared transaction identifier */ identifier = getFdwXactIdentifier(fdwent, xid); Assert(identifier); @@ -1094,7 +1242,7 @@ ForgetAllParticipants(void) Assert(!HasFdwXactParticipant()); } -static void +void FdwXactLaunchResolvers(void) { if (list_length(DistributedXactState.serveroids_uniq) > 0) @@ -1283,7 +1431,7 @@ FdwXactGetTransactionFate(TransactionId xid) * * Note: content and len don't include CRC. */ -void +static void RecreateFdwXactFile(TransactionId xid, Oid umid, void *content, int len) { char path[MAXPGPATH]; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 65815ec047..aad92816f5 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -557,6 +557,24 @@ static const struct config_enum_entry wal_compression_options[] = { {NULL, 0, false} }; +/* + * Although only "required" and "disabled" are documented, we accept all + * the likely variants of "on" and "off". + */ +static const struct config_enum_entry foreign_twophase_commit_options[] = { + {"required", FOREIGN_TWOPHASE_COMMIT_REQUIRED, false}, + {"disabled", FOREIGN_TWOPHASE_COMMIT_DISABLED, false}, + {"on", FOREIGN_TWOPHASE_COMMIT_REQUIRED, false}, + {"off", FOREIGN_TWOPHASE_COMMIT_DISABLED, false}, + {"true", FOREIGN_TWOPHASE_COMMIT_REQUIRED, true}, + {"false", FOREIGN_TWOPHASE_COMMIT_DISABLED, true}, + {"yes", FOREIGN_TWOPHASE_COMMIT_REQUIRED, true}, + {"no", FOREIGN_TWOPHASE_COMMIT_DISABLED, true}, + {"1", FOREIGN_TWOPHASE_COMMIT_REQUIRED, true}, + {"0", FOREIGN_TWOPHASE_COMMIT_DISABLED, true}, + {NULL, 0, false} +}; + /* * Options for enum values stored in other modules */ @@ -4824,6 +4842,16 @@ static struct config_enum ConfigureNamesEnum[] = NULL, assign_synchronous_commit, NULL }, + { + {"foreign_twophase_commit", PGC_USERSET, FOREIGN_TRANSACTION, + gettext_noop("Use of foreign twophase commit for the current transaction."), + NULL + }, + &foreign_twophase_commit, + FOREIGN_TWOPHASE_COMMIT_DISABLED, foreign_twophase_commit_options, + NULL, NULL, NULL + }, + { {"archive_mode", PGC_POSTMASTER, WAL_ARCHIVING, gettext_noop("Allows archiving of WAL files using archive_command."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 9cc35c7109..7619da024b 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -757,6 +757,8 @@ # retrying to resolve # foreign transactions # after a failed attempt +#foreign_twophase_commit = disabled # use two-phase commit for distributed transactions: + # disabled or required #------------------------------------------------------------------------------ # VERSION AND PLATFORM COMPATIBILITY diff --git a/src/include/access/fdwxact.h b/src/include/access/fdwxact.h index 85854864b9..8ac27a0b67 100644 --- a/src/include/access/fdwxact.h +++ b/src/include/access/fdwxact.h @@ -22,6 +22,14 @@ * without preparation */ #define FDWXACT_FLAG_PARALLEL_WORKER 0x02 /* is parallel worker? */ +/* Enum for foreign_twophase_commit parameter */ +typedef enum +{ + FOREIGN_TWOPHASE_COMMIT_DISABLED, /* disable foreign twophase commit */ + FOREIGN_TWOPHASE_COMMIT_REQUIRED /* all foreign servers have to support + * twophase commit */ +} ForeignTwophaseCommitLevel; + /* Enum to track the status of foreign transaction */ typedef enum { @@ -100,6 +108,7 @@ extern int max_prepared_foreign_xacts; extern int max_foreign_xact_resolvers; extern int foreign_xact_resolution_retry_interval; extern int foreign_xact_resolver_timeout; +extern int foreign_twophase_commit; /* Function declarations */ extern void PreCommit_FdwXact(bool is_parallel_worker); @@ -107,7 +116,7 @@ extern void AtEOXact_FdwXact(bool isCommit, bool is_parallel_worker); extern Size FdwXactShmemSize(void); extern void FdwXactShmemInit(void); extern void AtPrepare_FdwXact(void); -extern bool FdwXactIsForeignTwophaseCommitRequired(void); +extern void FdwXactLaunchResolvers(void); extern int CountFdwXactsForUserMapping(Oid umid); extern int CountFdwXactsForDB(Oid dbid); extern void FdwXactLaunchResolversForXid(TransactionId xid); diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 5338f4f2d9..05c758f869 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -303,7 +303,7 @@ extern bool IsImportableForeignTable(const char *tablename, extern Path *GetExistingLocalJoinPath(RelOptInfo *joinrel); /* Functions in transam/fdwxact.c */ -extern void FdwXactRegisterEntry(UserMapping *usermapping); +extern void FdwXactRegisterEntry(UserMapping *usermapping, bool modified); extern void FdwXactUnregisterEntry(UserMapping *usermapping); #endif /* FDWAPI_H */ -- 2.24.3 (Apple Git-128)