diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index ac339fb..09f67a3 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1432,6 +1432,27 @@ include_dir 'conf.d'
+
+ max_prepared_foreign_transactions (integer)
+
+ max_prepared_foreign_transactions> configuration parameter
+
+
+
+
+ Sets the maximum number of foreign transactions that can be prepared
+ simultaneously.
+ This parameter can only be set at server start.
+
+
+
+ When running a standby server, you must set this parameter to the
+ same or higher value than on the master server. Otherwise, queries
+ will not be allowed in the standby server.
+
+
+
+
work_mem (integer)
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index dbeaab5..639e38b 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -1714,5 +1714,92 @@ GetForeignServerByName(const char *name, bool missing_ok);
+
+ Transaction manager for Foreign Data Wrappers
+
+
+ PostgreSQL> transaction manager allows FDWs to read and write
+ data on foreign server within a transaction while maintaining atomicity
+ (and hence consistency) of the foreign data. Every Foreign Data Wrapper is
+ required to register the foreign server along with the PostgreSQL>
+ user whose user mapping is used to connect to the foreign server while starting a
+ transaction on the foreign server as part of the transaction on
+ PostgreSQL> using RegisterXactForeignServer>.
+
+void
+RegisterXactForeignServer(Oid serverid,
+ Oid userid,
+ bool two_phase_compliant)
+
+ two_phase_compliant> should be true if the foreign server supports
+ two-phase commit protocol, false otherwise.
+
+
+
+ An example of such transaction is as follows
+
+BEGIN;
+UPDATE ft1 SET col = 'a';
+UPDATE ft2 SET col = 'b';
+COMMIT;
+
+ ft1 and ft2 are foreign tables on different foreign servers may be using different
+ Foreign Data Wrappers.
+
+
+
+ When max_prepared_foreign_transactions> is more than zero
+ PostgreSQL> employs Two-phase commit protocol to achieve
+ atomic distributed transaction. All the foreign servers registered should
+ support two-phase commit protocol. The two-phase commit protocol is used for
+ achieving atomic distributed transaction when more than two foreign servers
+ that support two-phase commit protocol are involved with transaction, or
+ transaction involves with one foreign server that support two-phase commit
+ protocol and changes on local data. In other case, for example where only one
+ foreign server that support two-phase commit is involved with transaction,
+ the two-phase commit protocol is not used. In Two-phase commit protocol is
+ processed in two phases: prepare phase and commit phase. In prepare phase,
+ PostgreSQL> prepares the transactions on all the foreign
+ servers registered using RegisterXactForeignServer>. If any of
+ the foreign server fails to prepare transaction, prepare phase fails. In commit
+ phase, all the prepared transactions are committed if prepare phase has succeeded
+ or rolled back if prepare phase fails to prepare transactions on all the foreign
+ servers.
+
+
+
+ During prepare phase the distributed transaction manager calls
+ GetPrepareId> to get the prepared transaction identifier for
+ each foreign server involved. It stores this identifier along with the
+ serverid and userid for later use. It then calls
+ ResolvePreparedForeignTranscation> with the same identifier
+ with action FDW_XACT_RESOLVED.
+
+
+
+ During commit phase the distributed transaction manager calls
+ ResolveForeignTransaction> with the same identifier with action
+ FDW_XACT_COMMITTING_PREPARED to commit the prepared transaction or
+ FDW_XACT_ABORTING_PREPARED to rollback the prepared transaction. In case the
+ distributed transaction manager fails to commit or rollback a prepared
+ transaction because of connection failure, the operation can be tried again
+ through built-in pg_fdw_xact>. One may set up a background worker
+ process to retry the operation by installing extension fdw_transaction_resolver
+ and including $libdir/fdw_transaction_resolver.so in
+ shared_preload_libraries>.
+
+
+
+ When max_prepared_foreign_transaction> is zero, atomicity commit can
+ not be guaranteed across foreign servers. If transaction on PostgreSQL>
+ is committed, Distributed transaction manager commit the transaction on all the
+ foreign servers registered using RegisterXactForeignServer>,
+ independent of the outcome of the same operation on other foreign servers.
+ Thus transactions on some foreign servers may be committed, while the same
+ on other foreign servers would be rolled back. If the transaction on
+ PostgreSQL> aborts transactions on all the foreign servers
+ are aborted too.
+
+
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index 5514db1..6e23ec1 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -8,9 +8,10 @@ subdir = src/backend/access/rmgrdesc
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \
- gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \
- mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \
- smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
+OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o fdw_xactdesc.o \
+ genericdesc.o gindesc.o gistdesc.o hashdesc.o heapdesc.o \
+ logicalmsgdesc.o mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o \
+ seqdesc.o smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o xactdesc.o \
+ xlogdesc.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/fdw_xactdesc.c b/src/backend/access/rmgrdesc/fdw_xactdesc.c
new file mode 100644
index 0000000..869faf7
--- /dev/null
+++ b/src/backend/access/rmgrdesc/fdw_xactdesc.c
@@ -0,0 +1,68 @@
+/*-------------------------------------------------------------------------
+ *
+ * fdw_xactdesc.c
+ * PostgreSQL distributed transaction manager for foreign server.
+ *
+ * This module describes the WAL records for foreign transaction manager.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/fdw_xactdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/fdw_xact.h"
+#include "access/xloginsert.h"
+#include "lib/stringinfo.h"
+
+extern void
+fdw_xact_desc(StringInfo buf, XLogReaderState *record)
+{
+ char *rec = XLogRecGetData(record);
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ if (info == XLOG_FDW_XACT_INSERT)
+ {
+ FDWXactOnDiskData *fdw_insert_xlog = (FDWXactOnDiskData *) rec;
+
+ appendStringInfo(buf, "Foreign server oid: %u", fdw_insert_xlog->serverid);
+ appendStringInfo(buf, " user oid: %u", fdw_insert_xlog->userid);
+ appendStringInfo(buf, " database id: %u", fdw_insert_xlog->dboid);
+ appendStringInfo(buf, " local xid: %u", fdw_insert_xlog->local_xid);
+ /* TODO: This should be really interpreted by each FDW */
+
+ /*
+ * TODO: we also need to assess whether we want to add this
+ * information
+ */
+ appendStringInfo(buf, " foreign transaction info: %s",
+ fdw_insert_xlog->fdw_xact_id);
+ }
+ else
+ {
+ FdwRemoveXlogRec *fdw_remove_xlog = (FdwRemoveXlogRec *) rec;
+
+ appendStringInfo(buf, "Foreign server oid: %u", fdw_remove_xlog->serverid);
+ appendStringInfo(buf, " user oid: %u", fdw_remove_xlog->userid);
+ appendStringInfo(buf, " database id: %u", fdw_remove_xlog->dbid);
+ appendStringInfo(buf, " local xid: %u", fdw_remove_xlog->xid);
+ }
+
+}
+
+extern const char *
+fdw_xact_identify(uint8 info)
+{
+ switch (info & ~XLR_INFO_MASK)
+ {
+ case XLOG_FDW_XACT_INSERT:
+ return "NEW FOREIGN TRANSACTION";
+ case XLOG_FDW_XACT_REMOVE:
+ return "REMOVE FOREIGN TRANSACTION";
+ }
+ /* Keep compiler happy */
+ return NULL;
+}
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 5f07eb1..ff3064e 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -112,14 +112,15 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
appendStringInfo(buf, "max_connections=%d max_worker_processes=%d "
"max_prepared_xacts=%d max_locks_per_xact=%d "
"wal_level=%s wal_log_hints=%s "
- "track_commit_timestamp=%s",
+ "track_commit_timestamp=%s max_fdw_xacts=%d",
xlrec.MaxConnections,
xlrec.max_worker_processes,
xlrec.max_prepared_xacts,
xlrec.max_locks_per_xact,
wal_level_str,
xlrec.wal_log_hints ? "on" : "off",
- xlrec.track_commit_timestamp ? "on" : "off");
+ xlrec.track_commit_timestamp ? "on" : "off",
+ xlrec.max_prepared_foreign_xacts);
}
else if (info == XLOG_FPW_CHANGE)
{
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 16fbe47..dd7ee32 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = clog.o commit_ts.o generic_xlog.o multixact.o parallel.o rmgr.o slru.o \
subtrans.o timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
xact.o xlog.o xlogarchive.o xlogfuncs.o \
- xloginsert.o xlogreader.o xlogutils.o
+ xloginsert.o xlogreader.o xlogutils.o fdw_xact.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/transam/fdw_xact.c b/src/backend/access/transam/fdw_xact.c
new file mode 100644
index 0000000..90d11df
--- /dev/null
+++ b/src/backend/access/transam/fdw_xact.c
@@ -0,0 +1,2182 @@
+/*-------------------------------------------------------------------------
+ *
+ * fdw_xact.c
+ * PostgreSQL distributed transaction manager for foreign server.
+ *
+ * This module manages the transactions involving foreign servers.
+ *
+ * Copyright (c) 2017, PostgreSQL Global Development Group
+ *
+ * src/backend/access/transam/fdw_xact.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include
+#include
+#include
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "funcapi.h"
+
+#include "access/fdw_xact.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xloginsert.h"
+#include "access/xlogutils.h"
+#include "catalog/pg_type.h"
+#include "foreign/foreign.h"
+#include "foreign/fdwapi.h"
+#include "libpq/pqsignal.h"
+#include "pg_trace.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/lock.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+
+/*
+ * This comment summarises how the transaction manager handles transactions
+ * involving one or more foreign server/s.
+ *
+ * When an foreign data wrapper starts transaction on a foreign server, it is
+ * required to register the foreign server and user who initiated the
+ * transaction using function RegisterXactForeignServer(). A foreign server
+ * connection is identified by oid of foreign server and user.
+ *
+ * The commit is executed in two phases:
+ * First phase (executed during pre-commit processing)
+ * -----------
+ * Transactions are prepared on all the foreign servers, which can participate
+ * in two-phase commit protocol. Transaction on other foreign servers are
+ * committed in the same phase.
+ *
+ * Second phase (executed during post-commit/abort processing)
+ * ------------
+ * If first phase succeeds, foreign servers are requested to commit respective
+ * prepared transactions. If the first phase does not succeed because of any
+ * failure, the foreign servers are asked to rollback respective prepared
+ * transactions or abort the transactions if they are not prepared.
+ *
+ * Any network failure, server crash after preparing foreign transaction leaves
+ * that prepared transaction unresolved. During the first phase, before actually
+ * preparing the transactions, enough information is persisted to the disk and
+ * logs in order to resolve such transactions.
+ *
+ * During replay and replication FDWXactGlobal also holds information about
+ * active prepared foreign transaction that haven't been moved to disk yet.
+ *
+ * Replay of fdw_xact records happens by the following rules:
+ *
+ * * On PREPARE redo we add the foreign transaction to
+ * FDWXactGlobal->fdw_xacts. We set fdw_xact->inredo to true for
+ * such entries.
+ *
+ * * On Checkpoint redo we iterate through FDWXactGlobal->fdw_xacts.
+ * entries that have fdw_xact->inredo set and are behind the redo_horizon.
+ * We save them to disk and also set fdw_xact->ondisk to true.
+ *
+ * * On COMMIT/ABORT we delete the entry from FDWXactGlobal->fdw_xacts.
+ * If fdw_xact->ondisk is true, we delete the corresponding entry from
+ * the disk as well.
+ *
+ * * RecoverPreparedTransactions(), StnadbyReoverPreparedTransactions() and
+ * PrescanPreparedTransactions() have been modified to go through
+ * fdw_xact->inredo entries that have not made to disk yet.
+ */
+
+/* Shared memory entry for a prepared or being prepared foreign transaction */
+typedef struct FDWXactData *FDWXact;
+
+/* Structure to bundle the foreign connection participating in transaction */
+typedef struct
+{
+ Oid serverid;
+ Oid userid;
+ Oid umid;
+ char *servername;
+ FDWXact fdw_xact; /* foreign prepared transaction entry in case
+ * prepared */
+ bool two_phase_commit; /* Should use two phase commit
+ * protocol while committing
+ * transaction on this server,
+ * whenever necessary. */
+ EndForeignTransaction_function end_foreign_xact;
+ PrepareForeignTransaction_function prepare_foreign_xact;
+ ResolvePreparedForeignTransaction_function resolve_prepared_foreign_xact;
+} FDWConnection;
+
+/* List of foreign connections participating in the transaction */
+List *MyFDWConnections = NIL;
+
+/*
+ * By default we assume that all the foreign connections participating in this
+ * transaction can use two phase commit protocol.
+ */
+bool TwoPhaseReady = true;
+
+/* Record the server, userid participating in the transaction. */
+void
+RegisterXactForeignServer(Oid serverid, Oid userid, bool two_phase_commit)
+{
+ FDWConnection *fdw_conn;
+ ListCell *lcell;
+ ForeignServer *foreign_server;
+ ForeignDataWrapper *fdw;
+ UserMapping *user_mapping;
+ FdwRoutine *fdw_routine;
+ MemoryContext old_context;
+
+ TwoPhaseReady = TwoPhaseReady && two_phase_commit;
+
+ /* Check if the entry already exists, if so, raise an error */
+ foreach(lcell, MyFDWConnections)
+ {
+ fdw_conn = lfirst(lcell);
+
+ if (fdw_conn->serverid == serverid &&
+ fdw_conn->userid == userid)
+ ereport(ERROR,
+ (errmsg("attempt to start transction again on server %u user %u",
+ serverid, userid)));
+ }
+
+ /*
+ * This list and its contents needs to be saved in the transaction context
+ * memory
+ */
+ old_context = MemoryContextSwitchTo(TopTransactionContext);
+ /* Add this foreign connection to the list for transaction management */
+ fdw_conn = (FDWConnection *) palloc(sizeof(FDWConnection));
+
+ /* Make sure that the FDW has at least a transaction handler */
+ foreign_server = GetForeignServer(serverid);
+ fdw = GetForeignDataWrapper(foreign_server->fdwid);
+ fdw_routine = GetFdwRoutine(fdw->fdwhandler);
+ user_mapping = GetUserMapping(userid, serverid);
+
+ if (!fdw_routine->EndForeignTransaction)
+ ereport(ERROR,
+ (errmsg("no function to end a foreign transaction provided for FDW %s",
+ fdw->fdwname)));
+
+ if (two_phase_commit)
+ {
+ if (max_prepared_foreign_xacts == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("prepread foreign transactions are disabled"),
+ errhint("Set max_prepared_foreign_transactions to a nonzero value.")));
+
+ if (!fdw_routine->PrepareForeignTransaction)
+ ereport(ERROR,
+ (errmsg("no function provided for preparing foreign transaction for FDW %s",
+ fdw->fdwname)));
+
+ if (!fdw_routine->ResolvePreparedForeignTransaction)
+ ereport(ERROR,
+ (errmsg("no function provided for resolving prepared foreign transaction for FDW %s",
+ fdw->fdwname)));
+ }
+
+ fdw_conn->serverid = serverid;
+ fdw_conn->userid = userid;
+ fdw_conn->umid = user_mapping->umid;
+
+ /*
+ * We may need following information at the end of a transaction, when the
+ * system caches are not available. So save it before hand.
+ */
+ fdw_conn->servername = foreign_server->servername;
+ fdw_conn->prepare_foreign_xact = fdw_routine->PrepareForeignTransaction;
+ fdw_conn->resolve_prepared_foreign_xact = fdw_routine->ResolvePreparedForeignTransaction;
+ fdw_conn->end_foreign_xact = fdw_routine->EndForeignTransaction;
+ fdw_conn->fdw_xact = NULL;
+ fdw_conn->two_phase_commit = two_phase_commit;
+ MyFDWConnections = lappend(MyFDWConnections, fdw_conn);
+ /* Revert back the context */
+ MemoryContextSwitchTo(old_context);
+
+ return;
+}
+
+/* Enum to track the status of prepared foreign transaction */
+typedef enum
+{
+ FDW_XACT_PREPARING, /* foreign transaction is (being) prepared */
+ FDW_XACT_COMMITTING_PREPARED, /* foreign prepared transaction is to
+ * be committed */
+ FDW_XACT_ABORTING_PREPARED, /* foreign prepared transaction is to be
+ * aborted */
+ FDW_XACT_RESOLVED /* Status used only by pg_fdw_xact_resolve().
+ * It doesn't appear in the in-memory entry. */
+} FDWXactStatus;
+
+typedef struct FDWXactData
+{
+ FDWXact fx_next; /* Next free FDWXact entry */
+ Oid dboid; /* database oid where to find foreign server
+ * and user mapping */
+ TransactionId local_xid; /* XID of local transaction */
+ Oid serverid; /* foreign server where transaction takes
+ * place */
+ Oid userid; /* user who initiated the foreign transaction */
+ Oid umid; /* user mapping id for connection key */
+ FDWXactStatus status; /* The state of the foreign
+ * transaction. This doubles as the
+ * action to be taken on this entry. */
+
+ /*
+ * Note that we need to keep track of two LSNs for each FDWXact. We keep
+ * track of the start LSN because this is the address we must use to read
+ * state data back from WAL when committing a FDWXact. We keep track of
+ * the end LSN because that is the LSN we need to wait for prior to
+ * commit.
+ */
+ XLogRecPtr fdw_xact_start_lsn; /* XLOG offset of inserting this entry
+ * start */
+ XLogRecPtr fdw_xact_end_lsn; /* XLOG offset of inserting this entry
+ * end */
+
+ bool valid; /* Has the entry been complete and written to file? */
+ BackendId locking_backend; /* Backend working on this entry */
+ bool ondisk; /* TRUE if prepare state file is on disk */
+ bool inredo; /* TRUE if entry was added via xlog_redo */
+ char fdw_xact_id[FDW_XACT_ID_LEN]; /* prepared transaction
+ * identifier */
+} FDWXactData;
+
+/* Directory where the foreign prepared transaction files will reside */
+#define FDW_XACTS_DIR "pg_fdw_xact"
+
+/*
+ * Name of foreign prepared transaction file is 8 bytes xid, 8 bytes foreign
+ * server oid and 8 bytes user oid separated by '_'.
+ */
+#define FDW_XACT_FILE_NAME_LEN (8 + 1 + 8 + 1 + 8)
+#define FDWXactFilePath(path, xid, serverid, userid) \
+ snprintf(path, MAXPGPATH, FDW_XACTS_DIR "/%08X_%08X_%08X", xid, \
+ serverid, userid)
+
+/* Shared memory layout for maintaining foreign prepared transaction entries. */
+typedef struct
+{
+ /* Head of linked list of free FDWXactData structs */
+ FDWXact freeFDWXacts;
+
+ /* Number of valid FDW transaction entries */
+ int numFDWXacts;
+
+ /* Upto max_prepared_foreign_xacts entries in the array */
+ FDWXact fdw_xacts[FLEXIBLE_ARRAY_MEMBER]; /* Variable length array */
+} FDWXactGlobalData;
+
+static void AtProcExit_FDWXact(int code, Datum arg);
+static bool resolve_fdw_xact(FDWXact fdw_xact,
+ ResolvePreparedForeignTransaction_function prepared_foreign_xact_resolver);
+static FDWXact insert_fdw_xact(Oid dboid, TransactionId xid, Oid serverid, Oid userid,
+ Oid umid, char *fdw_xact_id);
+static void unlock_fdw_xact(FDWXact fdw_xact);
+static void unlock_fdw_xact_entries();
+static void remove_fdw_xact(FDWXact fdw_xact);
+static FDWXact register_fdw_xact(Oid dbid, TransactionId xid, Oid serverid, Oid userid,
+ Oid umid, char *fdw_xact_info);
+static int GetFDWXactList(FDWXact * fdw_xacts);
+static ResolvePreparedForeignTransaction_function get_prepared_foreign_xact_resolver(FDWXact fdw_xact);
+static FDWXactOnDiskData *ReadFDWXactFile(TransactionId xid, Oid serverid,
+ Oid userid);
+static void RemoveFDWXactFile(TransactionId xid, Oid serverid, Oid userid,
+ bool giveWarning);
+static void RecreateFDWXactFile(TransactionId xid, Oid serverid, Oid userid,
+ void *content, int len);
+static void XlogReadFDWXactData(XLogRecPtr lsn, char **buf, int *len);
+static void prepare_foreign_transactions(void);
+static FDWXact get_fdw_xact(TransactionId xid, Oid serverid, Oid userid);
+bool search_fdw_xact(TransactionId xid, Oid dbid, Oid serverid, Oid userid,
+ List **qualifying_xacts);
+
+/*
+ * Maximum number of foreign prepared transaction entries at any given time
+ * GUC variable, change requires restart.
+ */
+int max_prepared_foreign_xacts = 0;
+
+/* Keep track of registering process exit call back. */
+static bool fdwXactExitRegistered = false;
+
+/* Pointer to the shared memory holding the foreign transactions data */
+static FDWXactGlobalData *FDWXactGlobal;
+
+/* foreign transaction entries locked by this backend */
+List *MyLockedFDWXacts = NIL;
+
+/*
+ * FDWXactShmemSize
+ * Calculates the size of shared memory allocated for maintaining foreign
+ * prepared transaction entries.
+ */
+extern Size
+FDWXactShmemSize(void)
+{
+ Size size;
+
+ /* Need the fixed struct, foreign transaction information array */
+ size = offsetof(FDWXactGlobalData, fdw_xacts);
+ size = add_size(size, mul_size(max_prepared_foreign_xacts,
+ sizeof(FDWXact)));
+ size = MAXALIGN(size);
+ size = add_size(size, mul_size(max_prepared_foreign_xacts,
+ sizeof(FDWXactData)));
+
+ return size;
+}
+
+/*
+ * FDWXactShmemInit
+ * Initialization of shared memory for maintaining foreign prepared transaction
+ * entries. The shared memory layout is defined in definition of
+ * FDWXactGlobalData structure.
+ */
+extern void
+FDWXactShmemInit(void)
+{
+ bool found;
+
+ FDWXactGlobal = ShmemInitStruct("Foreign transactions table",
+ FDWXactShmemSize(),
+ &found);
+ if (!IsUnderPostmaster)
+ {
+ FDWXact fdw_xacts;
+ int cnt;
+
+ Assert(!found);
+ FDWXactGlobal->freeFDWXacts = NULL;
+ FDWXactGlobal->numFDWXacts = 0;
+
+ /* Initialise the linked list of free FDW transactions */
+ fdw_xacts = (FDWXact)
+ ((char *) FDWXactGlobal +
+ MAXALIGN(offsetof(FDWXactGlobalData, fdw_xacts) +
+ sizeof(FDWXact) * max_prepared_foreign_xacts));
+ for (cnt = 0; cnt < max_prepared_foreign_xacts; cnt++)
+ {
+ fdw_xacts[cnt].fx_next = FDWXactGlobal->freeFDWXacts;
+ FDWXactGlobal->freeFDWXacts = &fdw_xacts[cnt];
+ }
+ }
+ else
+ {
+ Assert(FDWXactGlobal);
+ Assert(found);
+ }
+}
+
+/*
+ * PreCommit_FDWXacts
+ *
+ * The function is responsible for pre-commit processing on foreign connections.
+ * Basically the foreign transactions are prepared on the foreign servers which
+ * can execute two-phase-commit protocol. But in case of where only one server
+ * that can execute two-phase-commit protocol is involved with transaction and
+ * no changes is made on local data then we don't need to two-phase-commit protocol,
+ * so try to commit transaction on the server. Those will be aborted or committed
+ * after the current transaction has been aborted or committed resp. We try to
+ * commit transactions on rest of the foreign servers now. For these foreign
+ * servers it is possible that some transactions commit even if the local
+ * transaction aborts.
+ */
+void
+PreCommit_FDWXacts(void)
+{
+ ListCell *cur;
+ ListCell *prev;
+ ListCell *next;
+
+ /* If there are no foreign servers involved, we have no business here */
+ if (list_length(MyFDWConnections) < 1)
+ return;
+
+ /*
+ * Try committing transactions on the foreign servers, which can not
+ * execute two-phase-commit protocol.
+ */
+ for (cur = list_head(MyFDWConnections), prev = NULL; cur; cur = next)
+ {
+ FDWConnection *fdw_conn = lfirst(cur);
+
+ next = lnext(cur);
+
+ if (!fdw_conn->two_phase_commit)
+ {
+ /*
+ * The FDW has to make sure that the connection opened to the
+ * foreign server is out of transaction. Even if the handler
+ * function returns failure statue, there's hardly anything to do.
+ */
+ if (!fdw_conn->end_foreign_xact(fdw_conn->serverid, fdw_conn->userid,
+ fdw_conn->umid, true))
+ elog(WARNING, "could not commit transaction on server %s",
+ fdw_conn->servername);
+
+ /* The connection is no more part of this transaction, forget it */
+ MyFDWConnections = list_delete_cell(MyFDWConnections, cur, prev);
+ }
+ else
+ prev = cur;
+ }
+
+ /*
+ * Here foreign servers that can not execute two-phase-commit protocol
+ * already commit the transaction and MyFDWConnections has only foreign
+ * servers that can execute two-phase-commit protocol. We don't need to
+ * use two-phase-commit protocol if there is only one foreign server that
+ * that can execute two-phase-commit and didn't write no local node.
+ */
+ if ((list_length(MyFDWConnections) > 1) ||
+ (list_length(MyFDWConnections) == 1 && XactWriteLocalNode))
+ {
+ /*
+ * Prepare the transactions on the all foreign servers, which can
+ * execute two-phase-commit protocol.
+ */
+ prepare_foreign_transactions();
+ }
+ else if (list_length(MyFDWConnections) == 1)
+ {
+ FDWConnection *fdw_conn = lfirst(list_head(MyFDWConnections));
+
+ /*
+ * We don't need to use two-phase commit protocol only one server
+ * remaining even if this server can execute two-phase-commit
+ * protocol.
+ */
+ if (!fdw_conn->end_foreign_xact(fdw_conn->serverid, fdw_conn->userid,
+ fdw_conn->umid, true))
+ elog(WARNING, "could not commit transaction on server %s",
+ fdw_conn->servername);
+
+ /* MyFDWConnections should be cleared here */
+ MyFDWConnections = list_delete_cell(MyFDWConnections, cur, prev);
+ }
+}
+
+/*
+ * prepare_foreign_transactions
+ *
+ * Prepare transactions on the foreign servers which can execute two phase
+ * commit protocol. Rest of the foreign servers are ignored.
+ */
+static void
+prepare_foreign_transactions(void)
+{
+ ListCell *lcell;
+
+ /*
+ * Loop over the foreign connections
+ */
+ foreach(lcell, MyFDWConnections)
+ {
+ FDWConnection *fdw_conn = (FDWConnection *) lfirst(lcell);
+ char fdw_xact_id[FDW_XACT_ID_LEN];
+ FDWXact fdw_xact;
+
+ if (!fdw_conn->two_phase_commit)
+ continue;
+
+ /* Generate prepare transaction id for foreign server */
+ FDWXactId(fdw_xact_id, "px", GetTopTransactionId(),
+ fdw_conn->serverid, fdw_conn->userid);
+
+ /*
+ * Register the foreign transaction with the identifier used to
+ * prepare it on the foreign server. Registration persists this
+ * information to the disk and logs (that way relaying it on standby).
+ * Thus in case we loose connectivity to the foreign server or crash
+ * ourselves, we will remember that we have prepared transaction on
+ * the foreign server and try to resolve it when connectivity is
+ * restored or after crash recovery.
+ *
+ * If we crash after persisting the information but before preparing
+ * the transaction on the foreign server, we will try to resolve a
+ * never-prepared transaction, and get an error. This is fine as long
+ * as the FDW provides us unique prepared transaction identifiers.
+ *
+ * If we prepare the transaction on the foreign server before
+ * persisting the information to the disk and crash in-between these
+ * two steps, we will forget that we prepared the transaction on the
+ * foreign server and will not be able to resolve it after the crash.
+ * Hence persist first then prepare.
+ */
+ fdw_xact = register_fdw_xact(MyDatabaseId, GetTopTransactionId(),
+ fdw_conn->serverid, fdw_conn->userid,
+ fdw_conn->umid, fdw_xact_id);
+
+ /*
+ * Between register_fdw_xact call above till this backend hears back
+ * from foreign server, the backend may abort the local transaction
+ * (say, because of a signal). During abort processing, it will send
+ * an ABORT message to the foreign server. If the foreign server has
+ * not prepared the transaction, the message will succeed. If the
+ * foreign server has prepared transaction, it will throw an error,
+ * which we will ignore and the prepared foreign transaction will be
+ * resolved by the foreign transaction resolver.
+ */
+ if (!fdw_conn->prepare_foreign_xact(fdw_conn->serverid, fdw_conn->userid,
+ fdw_conn->umid, fdw_xact_id))
+ {
+ /*
+ * An error occurred, and we didn't prepare the transaction.
+ * Delete the entry from foreign transaction table. Raise an
+ * error, so that the local server knows that one of the foreign
+ * server has failed to prepare the transaction.
+ *
+ * XXX : FDW is expected to print the error as a warning and then
+ * we raise actual error here. But instead, we should pull the
+ * error text from FDW and add it here in the message or as a
+ * context or a hint.
+ */
+ remove_fdw_xact(fdw_xact);
+
+ /*
+ * Delete the connection, since it doesn't require any further
+ * processing. This deletion will invalidate current cell pointer,
+ * but that is fine since we will not use that pointer because the
+ * subsequent ereport will get us out of this loop.
+ */
+ MyFDWConnections = list_delete_ptr(MyFDWConnections, fdw_conn);
+ ereport(ERROR,
+ (errmsg("can not prepare transaction on foreign server %s",
+ fdw_conn->servername)));
+ }
+
+ /* Prepare succeeded, remember it in the connection */
+ fdw_conn->fdw_xact = fdw_xact;
+ }
+ return;
+}
+
+/*
+ * register_fdw_xact
+ *
+ * This function is used to create new foreign transaction entry before an FDW
+ * executes the first phase of two-phase commit. The function adds the entry to
+ * WAL and will be persisted to the disk under pg_fdw_xact directory when checkpoint.
+ */
+static FDWXact
+register_fdw_xact(Oid dbid, TransactionId xid, Oid serverid, Oid userid,
+ Oid umid, char *fdw_xact_id)
+{
+ FDWXact fdw_xact;
+ FDWXactOnDiskData *fdw_xact_file_data;
+ int data_len;
+
+ /* Enter the foreign transaction in the shared memory structure */
+ LWLockAcquire(FDWXactLock, LW_EXCLUSIVE);
+ fdw_xact = insert_fdw_xact(dbid, xid, serverid, userid, umid,
+ fdw_xact_id);
+ fdw_xact->status = FDW_XACT_PREPARING;
+ fdw_xact->locking_backend = MyBackendId;
+ LWLockRelease(FDWXactLock);
+
+ /* Remember that we have locked this entry. */
+ MyLockedFDWXacts = lappend(MyLockedFDWXacts, fdw_xact);
+
+ /*
+ * Prepare to write the entry to a file. Also add xlog entry. The contents
+ * of the xlog record are same as what is written to the file.
+ */
+ data_len = offsetof(FDWXactOnDiskData, fdw_xact_id);
+ data_len = data_len + FDW_XACT_ID_LEN;
+ data_len = MAXALIGN(data_len);
+ fdw_xact_file_data = (FDWXactOnDiskData *) palloc0(data_len);
+ fdw_xact_file_data->dboid = fdw_xact->dboid;
+ fdw_xact_file_data->local_xid = fdw_xact->local_xid;
+ fdw_xact_file_data->serverid = fdw_xact->serverid;
+ fdw_xact_file_data->userid = fdw_xact->userid;
+ fdw_xact_file_data->umid = fdw_xact->umid;
+ memcpy(fdw_xact_file_data->fdw_xact_id, fdw_xact->fdw_xact_id,
+ FDW_XACT_ID_LEN);
+
+ START_CRIT_SECTION();
+
+ /* Add the entry in the xlog and save LSN for checkpointer */
+ XLogBeginInsert();
+ XLogRegisterData((char *) fdw_xact_file_data, data_len);
+ fdw_xact->fdw_xact_end_lsn = XLogInsert(RM_FDW_XACT_ID, XLOG_FDW_XACT_INSERT);
+ XLogFlush(fdw_xact->fdw_xact_end_lsn);
+
+ /* Store record's start location to read that later on CheckPoint */
+ fdw_xact->fdw_xact_start_lsn = ProcLastRecPtr;
+
+ /* File is written completely, checkpoint can proceed with syncing */
+ fdw_xact->valid = true;
+
+ END_CRIT_SECTION();
+
+ pfree(fdw_xact_file_data);
+ return fdw_xact;
+}
+
+/*
+ * insert_fdw_xact
+ *
+ * Insert a new entry for a given foreign transaction identified by transaction
+ * id, foreign server and user mapping, in the shared memory. Caller must hold
+ * FDWXactLock in exclusive mode.
+ *
+ * If the entry already exists, the function raises an error.
+ */
+static FDWXact
+insert_fdw_xact(Oid dboid, TransactionId xid, Oid serverid, Oid userid, Oid umid,
+ char *fdw_xact_id)
+{
+ int i;
+ FDWXact fdw_xact;
+
+ if (!fdwXactExitRegistered)
+ {
+ before_shmem_exit(AtProcExit_FDWXact, 0);
+ fdwXactExitRegistered = true;
+ }
+
+ /* Check for duplicating foreign transaction entry */
+ for (i = 0; i < FDWXactGlobal->numFDWXacts; i++)
+ {
+ fdw_xact = FDWXactGlobal->fdw_xacts[i];
+ if (fdw_xact->local_xid == xid &&
+ fdw_xact->serverid == serverid &&
+ fdw_xact->userid == userid)
+ elog(ERROR, "duplicate entry for foreign transaction with transaction id %u, serverid %u, userid %u found",
+ xid, serverid, userid);
+ }
+
+ /*
+ * Get the next free foreign transaction entry. Raise error if there are
+ * none left.
+ */
+ if (!FDWXactGlobal->freeFDWXacts)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("maximum number of foreign transactions reached"),
+ errhint("Increase max_prepared_foreign_transactions (currently %d).",
+ max_prepared_foreign_xacts)));
+ }
+
+ fdw_xact = FDWXactGlobal->freeFDWXacts;
+ FDWXactGlobal->freeFDWXacts = fdw_xact->fx_next;
+
+ /* Insert the entry to active array */
+ Assert(FDWXactGlobal->numFDWXacts < max_prepared_foreign_xacts);
+ FDWXactGlobal->fdw_xacts[FDWXactGlobal->numFDWXacts++] = fdw_xact;
+
+ /* Stamp the entry with backend id before releasing the LWLock */
+ fdw_xact->locking_backend = InvalidBackendId;
+ fdw_xact->dboid = dboid;
+ fdw_xact->local_xid = xid;
+ fdw_xact->serverid = serverid;
+ fdw_xact->userid = userid;
+ fdw_xact->umid = umid;
+ fdw_xact->fdw_xact_start_lsn = InvalidXLogRecPtr;
+ fdw_xact->fdw_xact_end_lsn = InvalidXLogRecPtr;
+ fdw_xact->valid = false;
+ fdw_xact->ondisk = false;
+ fdw_xact->inredo = false;
+ memcpy(fdw_xact->fdw_xact_id, fdw_xact_id, FDW_XACT_ID_LEN);
+
+ return fdw_xact;
+}
+
+/*
+ * remove_fdw_xact
+ *
+ * Removes the foreign prepared transaction entry from shared memory, disk and
+ * logs about the removal in WAL.
+ */
+static void
+remove_fdw_xact(FDWXact fdw_xact)
+{
+ int cnt;
+
+ LWLockAcquire(FDWXactLock, LW_EXCLUSIVE);
+ /* Search the slot where this entry resided */
+ for (cnt = 0; cnt < FDWXactGlobal->numFDWXacts; cnt++)
+ {
+ if (FDWXactGlobal->fdw_xacts[cnt] == fdw_xact)
+ {
+ /* Remove the entry from active array */
+ FDWXactGlobal->numFDWXacts--;
+ FDWXactGlobal->fdw_xacts[cnt] = FDWXactGlobal->fdw_xacts[FDWXactGlobal->numFDWXacts];
+
+ /* Put it back into free list */
+ fdw_xact->fx_next = FDWXactGlobal->freeFDWXacts;
+ FDWXactGlobal->freeFDWXacts = fdw_xact;
+
+ /* Unlock the entry */
+ fdw_xact->locking_backend = InvalidBackendId;
+ MyLockedFDWXacts = list_delete_ptr(MyLockedFDWXacts, fdw_xact);
+
+ LWLockRelease(FDWXactLock);
+
+ if (!RecoveryInProgress())
+ {
+ FdwRemoveXlogRec fdw_remove_xlog;
+ XLogRecPtr recptr;
+
+ /* Fill up the log record before releasing the entry */
+ fdw_remove_xlog.serverid = fdw_xact->serverid;
+ fdw_remove_xlog.dbid = fdw_xact->dboid;
+ fdw_remove_xlog.xid = fdw_xact->local_xid;
+ fdw_remove_xlog.userid = fdw_xact->userid;
+
+ START_CRIT_SECTION();
+
+ /*
+ * Log that we are removing the foreign transaction entry and
+ * remove the file from the disk as well.
+ */
+ XLogBeginInsert();
+ XLogRegisterData((char *) &fdw_remove_xlog, sizeof(fdw_remove_xlog));
+ recptr = XLogInsert(RM_FDW_XACT_ID, XLOG_FDW_XACT_REMOVE);
+ XLogFlush(recptr);
+
+ END_CRIT_SECTION();
+ }
+
+ /* Remove the file from the disk if exists. */
+ if (fdw_xact->ondisk)
+ RemoveFDWXactFile(fdw_xact->local_xid, fdw_xact->serverid,
+ fdw_xact->userid, true);
+ return;
+ }
+ }
+ LWLockRelease(FDWXactLock);
+
+ /* We did not find the given entry in global array */
+ elog(ERROR, "failed to find %p in FDWXactGlobal array", fdw_xact);
+}
+
+/*
+ * unlock_fdw_xact
+ *
+ * Unlock the foreign transaction entry by wiping out the locking_backend and
+ * removing it from the backend's list of foreign transaction.
+ */
+static void
+unlock_fdw_xact(FDWXact fdw_xact)
+{
+ /* Only the backend holding the lock is allowed to unlock */
+ Assert(fdw_xact->locking_backend == MyBackendId);
+
+ /*
+ * First set the locking backend as invalid, and then remove it from the
+ * list of locked foreign transactions, under the LW lock. If we reverse
+ * the order and process exits in-between those two, we will be left an
+ * entry locked by this backend, which gets unlocked only at the server
+ * restart.
+ */
+
+ LWLockAcquire(FDWXactLock, LW_EXCLUSIVE);
+ fdw_xact->locking_backend = InvalidBackendId;
+ MyLockedFDWXacts = list_delete_ptr(MyLockedFDWXacts, fdw_xact);
+ LWLockRelease(FDWXactLock);
+}
+
+/*
+ * unlock_fdw_xact_entries
+ *
+ * Unlock the foreign transaction entries locked by this backend.
+ */
+static void
+unlock_fdw_xact_entries()
+{
+ while (MyLockedFDWXacts)
+ {
+ FDWXact fdw_xact = (FDWXact) linitial(MyLockedFDWXacts);
+
+ unlock_fdw_xact(fdw_xact);
+ }
+}
+
+/*
+ * AtProcExit_FDWXact
+ *
+ * When the process exits, unlock the entries it held.
+ */
+static void
+AtProcExit_FDWXact(int code, Datum arg)
+{
+ unlock_fdw_xact_entries();
+}
+
+/*
+ * AtEOXact_FDWXacts
+ *
+ * The function executes phase 2 of two-phase commit protocol.
+ * At the end of transaction perform following actions
+ * 1. Mark the entries locked by this backend as ABORTING or COMMITTING
+ * according the result of transaction.
+ * 2. Try to commit or abort the transactions on foreign servers. If that
+ * succeeds, remove them from foreign transaction entries, otherwise unlock
+ * them.
+ */
+extern void
+AtEOXact_FDWXacts(bool is_commit)
+{
+ ListCell *lcell;
+
+ foreach(lcell, MyFDWConnections)
+ {
+ FDWConnection *fdw_conn = lfirst(lcell);
+
+ /* Commit/abort prepared foreign transactions */
+ if (fdw_conn->fdw_xact)
+ {
+ FDWXact fdw_xact = fdw_conn->fdw_xact;
+
+ fdw_xact->status = (is_commit ?
+ FDW_XACT_COMMITTING_PREPARED :
+ FDW_XACT_ABORTING_PREPARED);
+
+ /*
+ * Try aborting or committing the transaction on the foreign
+ * server
+ */
+ if (!resolve_fdw_xact(fdw_xact, fdw_conn->resolve_prepared_foreign_xact))
+ {
+ /*
+ * The transaction was not resolved on the foreign server,
+ * unlock it, so that someone else can take care of it.
+ */
+ unlock_fdw_xact(fdw_xact);
+ }
+ }
+ else
+ {
+ /*
+ * On servers where two phase commit protocol could not be
+ * executed we have tried to commit the transactions during
+ * pre-commit phase. Any remaining transactions need to be
+ * aborted.
+ */
+ Assert(!is_commit);
+
+ /*
+ * The FDW has to make sure that the connection opened to the
+ * foreign server is out of transaction. Even if the handler
+ * function returns failure statue, there's hardly anything to do.
+ */
+ if (!fdw_conn->end_foreign_xact(fdw_conn->serverid, fdw_conn->userid,
+ fdw_conn->umid, is_commit))
+ elog(WARNING, "could not %s transaction on server %s",
+ is_commit ? "commit" : "abort",
+ fdw_conn->servername);
+
+ }
+ }
+
+ /*
+ * Unlock any locked foreign transactions. Resolver might lock the
+ * entries, and may not be able to unlock them if aborted in-between. In
+ * any case, there is no reason for a foreign transaction entry to be
+ * locked after the transaction which locked it has ended.
+ */
+ unlock_fdw_xact_entries();
+
+ /*
+ * Reset the list of registered connections. Since the memory for the list
+ * and its nodes comes from transaction memory context, it will be freed
+ * after this call.
+ */
+ MyFDWConnections = NIL;
+ /* Set TwoPhaseReady to its default value */
+ TwoPhaseReady = true;
+}
+
+/*
+ * AtPrepare_FDWXacts
+ *
+ * The function is called while preparing a transaction. If there are foreign
+ * servers involved in the transaction, this function prepares transactions
+ * on those servers.
+ */
+extern void
+AtPrepare_FDWXacts(void)
+{
+ /* If there are no foreign servers involved, we have no business here */
+ if (list_length(MyFDWConnections) < 1)
+ return;
+
+ /*
+ * All foreign servers participating in a transaction to be prepared
+ * should be two phase compliant.
+ */
+ if (!TwoPhaseReady)
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_INTEGRITY_CONSTRAINT_VIOLATION),
+ errmsg("can not prepare the transaction because some foreign servers involved in transaction can not prepare the transaction")));
+
+ /* Prepare transactions on participating foreign servers. */
+ prepare_foreign_transactions();
+
+ /*
+ * Unlock the foreign transaction entries so COMMIT/ROLLBACK PREPARED from
+ * some other backend will be able to lock those if required.
+ */
+ unlock_fdw_xact_entries();
+
+ /*
+ * Reset the list of registered connections. Since the memory for the list
+ * and its nodes comes from transaction memory context, it will be freed
+ * after this call.
+ */
+ MyFDWConnections = NIL;
+
+ /* Set TwoPhaseReady to its default value */
+ TwoPhaseReady = true;
+}
+
+/*
+ * FDWXactTwoPhaseFinish
+ *
+ * This function is called as part of the COMMIT/ROLLBACK PREPARED command to
+ * commit/rollback the foreign transactions prepared as part of the local
+ * prepared transaction. The function looks for the foreign transaction entries
+ * with local_xid equal to xid of the prepared transaction and tries to resolve them.
+ */
+extern void
+FDWXactTwoPhaseFinish(bool isCommit, TransactionId xid)
+{
+ List *entries_to_resolve;
+
+ FDWXactStatus status = isCommit ? FDW_XACT_COMMITTING_PREPARED :
+ FDW_XACT_ABORTING_PREPARED;
+
+ /*
+ * Get all the entries belonging to the given transaction id locked. If
+ * foreign transaction resolver is running, it might lock entries to check
+ * whether they can be resolved. The search function will skip such
+ * entries. The resolver will resolve them at a later point of time.
+ */
+ search_fdw_xact(xid, InvalidOid, InvalidOid, InvalidOid, &entries_to_resolve);
+
+ /* Try resolving the foreign transactions */
+ while (entries_to_resolve)
+ {
+ FDWXact fdw_xact = linitial(entries_to_resolve);
+
+ entries_to_resolve = list_delete_first(entries_to_resolve);
+ fdw_xact->status = status;
+
+ /*
+ * Resolve the foreign transaction. If resolution is not successful,
+ * unlock the entry so that someone else can pick it up.
+ */
+ if (!resolve_fdw_xact(fdw_xact,
+ get_prepared_foreign_xact_resolver(fdw_xact)))
+ unlock_fdw_xact(fdw_xact);
+ }
+}
+
+/*
+ * get_prepared_foreign_xact_resolver
+ */
+static ResolvePreparedForeignTransaction_function
+get_prepared_foreign_xact_resolver(FDWXact fdw_xact)
+{
+ ForeignServer *foreign_server;
+ ForeignDataWrapper *fdw;
+ FdwRoutine *fdw_routine;
+
+ foreign_server = GetForeignServer(fdw_xact->serverid);
+ fdw = GetForeignDataWrapper(foreign_server->fdwid);
+ fdw_routine = GetFdwRoutine(fdw->fdwhandler);
+ if (!fdw_routine->ResolvePreparedForeignTransaction)
+ elog(ERROR, "no foreign transaction resolver routine provided for FDW %s",
+ fdw->fdwname);
+
+ return fdw_routine->ResolvePreparedForeignTransaction;
+}
+
+/*
+ * resolve_fdw_xact
+ *
+ * Resolve the foreign transaction using the foreign data wrapper's transaction
+ * handler routine.
+ * If the resolution is successful, remove the foreign transaction entry from
+ * the shared memory and also remove the corresponding on-disk file.
+ */
+static bool
+resolve_fdw_xact(FDWXact fdw_xact,
+ ResolvePreparedForeignTransaction_function fdw_xact_handler)
+{
+ bool resolved;
+ bool is_commit;
+
+ Assert(fdw_xact->status == FDW_XACT_COMMITTING_PREPARED ||
+ fdw_xact->status == FDW_XACT_ABORTING_PREPARED);
+
+ is_commit = (fdw_xact->status == FDW_XACT_COMMITTING_PREPARED) ?
+ true : false;
+
+ resolved = fdw_xact_handler(fdw_xact->serverid, fdw_xact->userid,
+ fdw_xact->umid, is_commit,
+ fdw_xact->fdw_xact_id);
+
+ /* If we succeeded in resolving the transaction, remove the entry */
+ if (resolved)
+ remove_fdw_xact(fdw_xact);
+
+ return resolved;
+}
+
+/*
+ * Get foreign transaction entry from FDWXactGlobal->fdw_xacts. Return NULL
+ * if foreign transacgiven does not exist.
+ */
+static FDWXact
+get_fdw_xact(TransactionId xid, Oid serverid, Oid userid)
+{
+ int i;
+ FDWXact fdw_xact;
+
+ LWLockAcquire(FDWXactLock, LW_SHARED);
+
+ for (i = 0; i < FDWXactGlobal->numFDWXacts; i++)
+ {
+ fdw_xact = FDWXactGlobal->fdw_xacts[i];
+
+ if (fdw_xact->local_xid == xid &&
+ fdw_xact->serverid == serverid &&
+ fdw_xact->userid == userid)
+ {
+ LWLockRelease(FDWXactLock);
+ return fdw_xact;
+ }
+ }
+
+ LWLockRelease(FDWXactLock);
+ return NULL;
+}
+
+/*
+ * fdw_xact_exists
+ * Returns true if there exists at least one prepared foreign transaction which
+ * matches criteria. This function is wrapper around search_fdw_xact. Check that
+ * function's prologue for details.
+ */
+bool
+fdw_xact_exists(TransactionId xid, Oid dbid, Oid serverid, Oid userid)
+{
+ return search_fdw_xact(xid, dbid, serverid, userid, NULL);
+}
+
+/*
+ * search_fdw_xact
+ * Return true if there exists at least one prepared foreign transaction
+ * entry with given criteria. The criteria is defined by arguments with
+ * valid values for respective datatypes.
+ *
+ * The table below explains the same
+ * xid | dbid | serverid | userid | search for entry with
+ * invalid | invalid | invalid | invalid | nothing
+ * invalid | invalid | invalid | valid | given userid
+ * invalid | invalid | valid | invalid | given serverid
+ * invalid | invalid | valid | valid | given serverid and userid
+ * invalid | valid | invalid | invalid | given dbid
+ * invalid | valid | invalid | valid | given dbid and userid
+ * invalid | valid | valid | invalid | given dbid and serverid
+ * invalid | valid | valid | valid | given dbid, serveroid and userid
+ * valid | invalid | invalid | invalid | given xid
+ * valid | invalid | invalid | valid | given xid and userid
+ * valid | invalid | valid | invalid | given xid, serverid
+ * valid | invalid | valid | valid | given xid, serverid, userid
+ * valid | valid | invalid | invalid | given xid and dbid
+ * valid | valid | invalid | valid | given xid, dbid and userid
+ * valid | valid | valid | invalid | given xid, dbid, serverid
+ * valid | valid | valid | valid | given xid, dbid, serverid, userid
+ *
+ * When the criteria is void (all arguments invalid) the
+ * function returns true, since any entry would match the criteria.
+ *
+ * If qualifying_fdw_xacts is not NULL, the qualifying entries are locked and
+ * returned in a linked list. Any entry which is already locked is ignored. If
+ * all the qualifying entries are locked, nothing will be returned in the list
+ * but returned value will be true.
+ */
+bool
+search_fdw_xact(TransactionId xid, Oid dbid, Oid serverid, Oid userid,
+ List **qualifying_xacts)
+{
+ int cnt;
+ LWLockMode lock_mode;
+
+ /* Return value if a qualifying entry exists */
+ bool entry_exists = false;
+
+ if (qualifying_xacts)
+ {
+ *qualifying_xacts = NIL;
+ /* The caller expects us to lock entries */
+ lock_mode = LW_EXCLUSIVE;
+ }
+ else
+ lock_mode = LW_SHARED;
+
+ LWLockAcquire(FDWXactLock, lock_mode);
+ for (cnt = 0; cnt < FDWXactGlobal->numFDWXacts; cnt++)
+ {
+ FDWXact fdw_xact = FDWXactGlobal->fdw_xacts[cnt];
+ bool entry_matches = true;
+
+ /* xid */
+ if (xid != InvalidTransactionId && xid != fdw_xact->local_xid)
+ entry_matches = false;
+
+ /* dbid */
+ if (OidIsValid(dbid) && fdw_xact->dboid != dbid)
+ entry_matches = false;
+
+ /* serverid */
+ if (OidIsValid(serverid) && serverid != fdw_xact->serverid)
+ entry_matches = false;
+
+ /* userid */
+ if (OidIsValid(userid) && fdw_xact->userid != userid)
+ entry_matches = false;
+
+ if (entry_matches)
+ {
+ entry_exists = true;
+ if (qualifying_xacts)
+ {
+ /*
+ * User has requested list of qualifying entries. If the
+ * matching entry is not locked, lock it and add to the list.
+ * If the entry is locked by some other backend, ignore it.
+ */
+ if (fdw_xact->locking_backend == InvalidBackendId)
+ {
+ MemoryContext oldcontext;
+
+ fdw_xact->locking_backend = MyBackendId;
+
+ /*
+ * The list and its members may be required at the end of
+ * the transaction
+ */
+ oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+ MyLockedFDWXacts = lappend(MyLockedFDWXacts, fdw_xact);
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else if (fdw_xact->locking_backend != MyBackendId)
+ continue;
+
+ *qualifying_xacts = lappend(*qualifying_xacts, fdw_xact);
+ }
+ else
+ {
+ /*
+ * User wants to check the existence, and we have found one
+ * matching entry. No need to check other entries.
+ */
+ break;
+ }
+ }
+ }
+
+ LWLockRelease(FDWXactLock);
+
+ return entry_exists;
+}
+
+/*
+ * fdw_xact_redo
+ * Apply the redo log for a foreign transaction.
+ */
+extern void
+fdw_xact_redo(XLogReaderState *record)
+{
+ char *rec = XLogRecGetData(record);
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ if (info == XLOG_FDW_XACT_INSERT)
+ FDWXactRedoAdd(record);
+ else if (info == XLOG_FDW_XACT_REMOVE)
+ {
+ FdwRemoveXlogRec *fdw_remove_xlog = (FdwRemoveXlogRec *) rec;
+
+ /* Delete FDWXact entry and file if exists */
+ FDWXactRedoRemove(fdw_remove_xlog->xid, fdw_remove_xlog->serverid,
+ fdw_remove_xlog->userid);
+ }
+ else
+ elog(ERROR, "invalid log type %d in foreign transction log record", info);
+
+ return;
+}
+
+/*
+ * CheckPointFDWXact
+ *
+ * Function syncs the foreign transaction files created between the two
+ * checkpoints. The foreign transaction entries and hence the corresponding
+ * files are expected to be very short-lived. By executing this function at the
+ * end, we might have lesser files to fsync, thus reducing some I/O. This is
+ * similar to CheckPointTwoPhase().
+ *
+ * In order to avoid disk I/O while holding a light weight lock, the function
+ * first collects the files which need to be synced under FDWXactLock and then
+ * syncs them after releasing the lock. This approach creates a race condition:
+ * after releasing the lock, and before syncing a file, the corresponding
+ * foreign transaction entry and hence the file might get removed. The function
+ * checks whether that's true and ignores the error if so.
+ */
+void
+CheckPointFDWXact(XLogRecPtr redo_horizon)
+{
+ int cnt;
+ int serialized_fdw_xacts = 0;
+
+ /* Quick get-away, before taking lock */
+ if (max_prepared_foreign_xacts <= 0)
+ return;
+
+ TRACE_POSTGRESQL_FDWXACT_CHECKPOINT_START();
+
+ LWLockAcquire(FDWXactLock, LW_SHARED);
+
+ /* Another quick, before we allocate memory */
+ if (FDWXactGlobal->numFDWXacts <= 0)
+ {
+ LWLockRelease(FDWXactLock);
+ return;
+ }
+
+ /*
+ * We are expecting there to be zero FDWXact that need to be copied to
+ * disk, so we perform all I/O while holding FDWXactLock for simplicity.
+ * This presents any new foreign xacts from preparing while this occurs,
+ * which shouldn't be a problem since the presence fo long-lived prepared
+ * foreign xacts indicated the transaction manager isn't active.
+ *
+ * it's also possible to move I/O out of the lock, but on every error we
+ * should check whether somebody committed our transaction in different
+ * backend. Let's leave this optimisation for future, if somebody will
+ * spot that this place cause bottleneck.
+ *
+ * Note that it isn't possible for there to be a FDWXact with a
+ * fdw_xact_end_lsn set prior to the last checkpoint yet is marked
+ * invalid, because of the efforts with delayChkpt.
+ */
+ for (cnt = 0; cnt < FDWXactGlobal->numFDWXacts; cnt++)
+ {
+ FDWXact fdw_xact = FDWXactGlobal->fdw_xacts[cnt];
+
+ if ((fdw_xact->valid || fdw_xact->inredo) &&
+ !fdw_xact->ondisk &&
+ fdw_xact->fdw_xact_end_lsn <= redo_horizon)
+ {
+ char *buf;
+ int len;
+
+ XlogReadFDWXactData(fdw_xact->fdw_xact_start_lsn, &buf, &len);
+ RecreateFDWXactFile(fdw_xact->local_xid, fdw_xact->serverid,
+ fdw_xact->userid, buf, len);
+ fdw_xact->ondisk = true;
+ serialized_fdw_xacts++;
+ pfree(buf);
+ }
+ }
+
+ LWLockRelease(FDWXactLock);
+
+ TRACE_POSTGRESQL_FDWXACT_CHECKPOINT_DONE();
+
+ if (log_checkpoints && serialized_fdw_xacts > 0)
+ ereport(LOG,
+ (errmsg_plural("%u foreign transaction state file was written "
+ "for long-running prepared transactions",
+ "%u foreign transaction state files were written "
+ "for long-running prepared transactions",
+ serialized_fdw_xacts,
+ serialized_fdw_xacts)));
+}
+
+/*
+ * Reads foreign trasasction data from xlog. During checkpoint this data will
+ * be moved to fdwxact files and ReadFDWXactFile should be used instead.
+ *
+ * Note clearly that this function accesses WAL during normal operation, similarly
+ * to the way WALSender or Logical Decoding would do. It does not run during
+ * crash recovery or standby processing.
+ */
+static void
+XlogReadFDWXactData(XLogRecPtr lsn, char **buf, int *len)
+{
+ XLogRecord *record;
+ XLogReaderState *xlogreader;
+ char *errormsg;
+
+ xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL);
+ if (!xlogreader)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory"),
+ errdetail("Failed while allocating an XLog reading processor.")));
+
+ record = XLogReadRecord(xlogreader, lsn, &errormsg);
+
+ if (record == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read foreign transaction state from xlog at %X/%X",
+ (uint32) (lsn >> 32),
+ (uint32) lsn)));
+
+ if (XLogRecGetRmid(xlogreader) != RM_FDW_XACT_ID ||
+ (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_FDW_XACT_INSERT)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("expected foreign transaction state data is not present in xlog at %X/%X",
+ (uint32) (lsn >> 32),
+ (uint32) lsn)));
+
+ if (len != NULL)
+ *len = XLogRecGetDataLen(xlogreader);
+
+ *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
+ memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
+
+ XLogReaderFree(xlogreader);
+}
+
+/*
+ * Recreates a foreign transaction state file. This is used in WAL replay and
+ * during checkpoint creation.
+ *
+ * Note: content and len don't include CRC.
+ */
+void
+RecreateFDWXactFile(TransactionId xid, Oid serverid, Oid userid,
+ void *content, int len)
+{
+ char path[MAXPGPATH];
+ pg_crc32c fdw_xact_crc;
+ pg_crc32c bogus_crc;
+ int fd;
+
+ /* Recompute CRC */
+ INIT_CRC32C(fdw_xact_crc);
+ COMP_CRC32C(fdw_xact_crc, content, len);
+
+ FDWXactFilePath(path, xid, serverid, userid);
+
+ fd = OpenTransientFile(path, O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
+ S_IRUSR | S_IWUSR);
+
+ if (fd < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not recreate foreign transaction state file \"%s\": %m",
+ path)));
+
+ if (write(fd, content, len) != len)
+ {
+ CloseTransientFile(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write foreign transcation state file: %m")));
+ }
+ FIN_CRC32C(fdw_xact_crc);
+
+ /*
+ * Write a deliberately bogus CRC to the state file; this is just paranoia
+ * to catch the case where four more bytes will run us out of disk space.
+ */
+ bogus_crc = ~fdw_xact_crc;
+ if ((write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
+ {
+ CloseTransientFile(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write foreing transaction state file: %m")));
+ }
+ /* Back up to prepare for rewriting the CRC */
+ if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0)
+ {
+ CloseTransientFile(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek in foreign transaction state file: %m")));
+ }
+
+ /* write correct CRC and close file */
+ if ((write(fd, &fdw_xact_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
+ {
+ CloseTransientFile(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write foreign transaction state file: %m")));
+ }
+
+ /*
+ * We must fsync the file because the end-of-replay checkpoint will not do
+ * so, there being no GXACT in shared memory yet to tell it to.
+ */
+ if (pg_fsync(fd) != 0)
+ {
+ CloseTransientFile(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not fsync foreign transaction state file: %m")));
+ }
+
+ if (CloseTransientFile(fd) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not close foreign transaction file: %m")));
+}
+
+/* Built in functions */
+/*
+ * Structure to hold and iterate over the foreign transactions to be displayed
+ * by the built-in functions.
+ */
+typedef struct
+{
+ FDWXact fdw_xacts;
+ int num_xacts;
+ int cur_xact;
+} WorkingStatus;
+
+/*
+ * pg_fdw_xact
+ * Produce a view with one row per prepared transaction on foreign server.
+ *
+ * This function is here so we don't have to export the
+ * FDWXactGlobalData struct definition.
+ *
+ */
+Datum
+pg_fdw_xacts(PG_FUNCTION_ARGS)
+{
+ FuncCallContext *funcctx;
+ WorkingStatus *status;
+ char *xact_status;
+
+ if (SRF_IS_FIRSTCALL())
+ {
+ TupleDesc tupdesc;
+ MemoryContext oldcontext;
+
+ /* create a function context for cross-call persistence */
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /*
+ * Switch to memory context appropriate for multiple function calls
+ */
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ /* build tupdesc for result tuples */
+ /* this had better match pg_fdw_xacts view in system_views.sql */
+ tupdesc = CreateTemplateTupleDesc(6, false);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "dbid",
+ OIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 2, "transaction",
+ XIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 3, "serverid",
+ OIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 4, "userid",
+ OIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 5, "status",
+ TEXTOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 6, "identifier",
+ TEXTOID, -1, 0);
+
+ funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+ /*
+ * Collect status information that we will format and send out as a
+ * result set.
+ */
+ status = (WorkingStatus *) palloc(sizeof(WorkingStatus));
+ funcctx->user_fctx = (void *) status;
+
+ status->num_xacts = GetFDWXactList(&status->fdw_xacts);
+ status->cur_xact = 0;
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ funcctx = SRF_PERCALL_SETUP();
+ status = funcctx->user_fctx;
+
+ while (status->cur_xact < status->num_xacts)
+ {
+ FDWXact fdw_xact = &status->fdw_xacts[status->cur_xact++];
+ Datum values[6];
+ bool nulls[6];
+ HeapTuple tuple;
+ Datum result;
+
+ if (!fdw_xact->valid)
+ continue;
+
+ /*
+ * Form tuple with appropriate data.
+ */
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, 0, sizeof(nulls));
+
+ values[0] = ObjectIdGetDatum(fdw_xact->dboid);
+ values[1] = TransactionIdGetDatum(fdw_xact->local_xid);
+ values[2] = ObjectIdGetDatum(fdw_xact->serverid);
+ values[3] = ObjectIdGetDatum(fdw_xact->userid);
+ switch (fdw_xact->status)
+ {
+ case FDW_XACT_PREPARING:
+ xact_status = "prepared";
+ break;
+ case FDW_XACT_COMMITTING_PREPARED:
+ xact_status = "committing";
+ break;
+ case FDW_XACT_ABORTING_PREPARED:
+ xact_status = "aborting";
+ break;
+ default:
+ xact_status = "unknown";
+ break;
+ }
+ values[4] = CStringGetTextDatum(xact_status);
+ /* should this be really interpreted by FDW */
+ values[5] = PointerGetDatum(cstring_to_text_with_len(fdw_xact->fdw_xact_id,
+ FDW_XACT_ID_LEN));
+
+ tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+ result = HeapTupleGetDatum(tuple);
+ SRF_RETURN_NEXT(funcctx, result);
+ }
+
+ SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * Returns an array of all foreign prepared transactions for the user-level
+ * function pg_fdw_xact.
+ *
+ * The returned array and all its elements are copies of internal data
+ * structures, to minimize the time we need to hold the FDWXactLock.
+ *
+ * WARNING -- we return even those transactions whose information is not
+ * completely filled yet. The caller should filter them out if he doesn't want them.
+ *
+ * The returned array is palloc'd.
+ */
+static int
+GetFDWXactList(FDWXact * fdw_xacts)
+{
+ int num_xacts;
+ int cnt_xacts;
+
+ LWLockAcquire(FDWXactLock, LW_SHARED);
+
+ if (FDWXactGlobal->numFDWXacts == 0)
+ {
+ LWLockRelease(FDWXactLock);
+ *fdw_xacts = NULL;
+ return 0;
+ }
+
+ num_xacts = FDWXactGlobal->numFDWXacts;
+ *fdw_xacts = (FDWXact) palloc(sizeof(FDWXactData) * num_xacts);
+ for (cnt_xacts = 0; cnt_xacts < num_xacts; cnt_xacts++)
+ memcpy((*fdw_xacts) + cnt_xacts, FDWXactGlobal->fdw_xacts[cnt_xacts],
+ sizeof(FDWXactData));
+
+ LWLockRelease(FDWXactLock);
+
+ return num_xacts;
+}
+
+/*
+ * pg_fdw_xact_resolve
+ * a user interface to initiate foreign transaction resolution. The function
+ * tries to resolve the prepared transactions on foreign servers in the database
+ * from where it is run.
+ * The function prints the status of all the foreign transactions it
+ * encountered, whether resolved or not.
+ */
+Datum
+pg_fdw_xact_resolve(PG_FUNCTION_ARGS)
+{
+ MemoryContext oldcontext;
+ FuncCallContext *funcctx;
+ WorkingStatus *status;
+ char *xact_status;
+ List *entries_to_resolve;
+
+ if (SRF_IS_FIRSTCALL())
+ {
+ TupleDesc tupdesc;
+
+ /* We will be modifying the shared memory. Prepare to clean up on exit */
+ if (!fdwXactExitRegistered)
+ {
+ before_shmem_exit(AtProcExit_FDWXact, 0);
+ fdwXactExitRegistered = true;
+ }
+
+ /* Allocate space for and prepare the returning set */
+ /* create a function context for cross-call persistence */
+ funcctx = SRF_FIRSTCALL_INIT();
+ /* Switch to memory context appropriate for multiple function calls */
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ /* build tupdesc for result tuples */
+ tupdesc = CreateTemplateTupleDesc(6, false);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "dbid",
+ OIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 2, "transaction",
+ XIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 3, "serverid",
+ OIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 4, "userid",
+ OIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 5, "status",
+ TEXTOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 6, "identifier",
+ TEXTOID, -1, 0);
+
+ funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+ /*
+ * Collect status information that we will format and send out as a
+ * result set.
+ */
+ status = (WorkingStatus *) palloc(sizeof(WorkingStatus));
+ funcctx->user_fctx = (void *) status;
+ status->fdw_xacts = (FDWXact) palloc(sizeof(FDWXactData) * FDWXactGlobal->numFDWXacts);
+ status->num_xacts = 0;
+ status->cur_xact = 0;
+
+ /* Done preparation for the result. */
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * Get entries whose foreign servers are part of the database where
+ * this function was called. We can get information about only such
+ * foreign servers. The function will lock the entries. The entries
+ * which are locked by other backends and whose foreign servers belong
+ * to this database are left out, since we can not work on those.
+ */
+ search_fdw_xact(InvalidTransactionId, MyDatabaseId, InvalidOid, InvalidOid,
+ &entries_to_resolve);
+
+ /* Work to resolve the resolvable entries */
+ while (entries_to_resolve)
+ {
+ FDWXact fdw_xact = linitial(entries_to_resolve);
+
+ /* Remove the entry as we will not use it again */
+ entries_to_resolve = list_delete_first(entries_to_resolve);
+
+ /* Copy the data for the sake of result. */
+ memcpy(status->fdw_xacts + status->num_xacts++,
+ fdw_xact, sizeof(FDWXactData));
+
+ if (fdw_xact->status == FDW_XACT_COMMITTING_PREPARED ||
+ fdw_xact->status == FDW_XACT_ABORTING_PREPARED)
+ {
+ /*
+ * We have already decided what to do with the foreign
+ * transaction nothing to be done.
+ */
+ }
+ else if (TransactionIdDidCommit(fdw_xact->local_xid))
+ fdw_xact->status = FDW_XACT_COMMITTING_PREPARED;
+ else if (TransactionIdDidAbort(fdw_xact->local_xid))
+ fdw_xact->status = FDW_XACT_ABORTING_PREPARED;
+ else if (!TransactionIdIsInProgress(fdw_xact->local_xid))
+ {
+ /*
+ * The transaction is in progress but not on any of the
+ * backends. So probably, it crashed before actual abort or
+ * commit. So assume it to be aborted.
+ */
+ fdw_xact->status = FDW_XACT_ABORTING_PREPARED;
+ }
+ else
+ {
+ /*
+ * Local transaction is in progress, should not resolve the
+ * foreign transaction. This can happen when the foreign
+ * transaction is prepared as part of a local prepared
+ * transaction. Just continue with the next one.
+ */
+ unlock_fdw_xact(fdw_xact);
+ continue;
+ }
+
+ /*
+ * Resolve the foreign transaction. If resolution was not
+ * successful, unlock the entry so that someone else can pick it
+ * up
+ */
+ if (!resolve_fdw_xact(fdw_xact, get_prepared_foreign_xact_resolver(fdw_xact)))
+ unlock_fdw_xact(fdw_xact);
+ else
+ /* Update the status in the result set */
+ status->fdw_xacts[status->num_xacts - 1].status = FDW_XACT_RESOLVED;
+ }
+ }
+
+ /* Print the result set */
+ funcctx = SRF_PERCALL_SETUP();
+ status = funcctx->user_fctx;
+
+ while (status->cur_xact < status->num_xacts)
+ {
+ FDWXact fdw_xact = &status->fdw_xacts[status->cur_xact++];
+ Datum values[6];
+ bool nulls[6];
+ HeapTuple tuple;
+ Datum result;
+
+ if (!fdw_xact->valid)
+ continue;
+
+ /*
+ * Form tuple with appropriate data.
+ */
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, 0, sizeof(nulls));
+
+ values[0] = ObjectIdGetDatum(fdw_xact->dboid);
+ values[1] = TransactionIdGetDatum(fdw_xact->local_xid);
+ values[2] = ObjectIdGetDatum(fdw_xact->serverid);
+ values[3] = ObjectIdGetDatum(fdw_xact->userid);
+ switch (fdw_xact->status)
+ {
+ case FDW_XACT_PREPARING:
+ xact_status = "preparing";
+ break;
+ case FDW_XACT_COMMITTING_PREPARED:
+ xact_status = "committing";
+ break;
+ case FDW_XACT_ABORTING_PREPARED:
+ xact_status = "aborting";
+ break;
+ case FDW_XACT_RESOLVED:
+ xact_status = "resolved";
+ break;
+ default:
+ xact_status = "unknown";
+ break;
+ }
+ values[4] = CStringGetTextDatum(xact_status);
+ /* should this be really interpreted by FDW? */
+ values[5] = PointerGetDatum(cstring_to_text_with_len(fdw_xact->fdw_xact_id,
+ FDW_XACT_ID_LEN));
+
+ tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+ result = HeapTupleGetDatum(tuple);
+ SRF_RETURN_NEXT(funcctx, result);
+ }
+
+ SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * Built-in function to remove prepared foreign transaction entry/s without
+ * resolving. The function gives a way to forget about such prepared
+ * transaction in case
+ * 1. The foreign server where it is prepared is no longer available
+ * 2. The user which prepared this transaction needs to be dropped
+ * 3. PITR is recovering before a transaction id, which created the prepared
+ * foreign transaction
+ * 4. The database containing the entries needs to be dropped
+ *
+ * Or any such conditions in which resolution is no longer possible.
+ *
+ * The function accepts 4 arguments transaction id, dbid, serverid and userid,
+ * which define the criteria in the same way as search_fdw_xact(). The entries
+ * matching the criteria are removed. The function does not remove an entry
+ * which is locked by some other backend.
+ */
+Datum
+pg_fdw_xact_remove(PG_FUNCTION_ARGS)
+{
+/* Some #defines only for this function to deal with the arguments */
+#define XID_ARGNUM 0
+#define DBID_ARGNUM 1
+#define SRVID_ARGNUM 2
+#define USRID_ARGNUM 3
+
+ TransactionId xid;
+ Oid dbid;
+ Oid serverid;
+ Oid userid;
+ List *entries_to_remove;
+
+ xid = PG_ARGISNULL(XID_ARGNUM) ? InvalidTransactionId :
+ DatumGetTransactionId(PG_GETARG_DATUM(XID_ARGNUM));
+ dbid = PG_ARGISNULL(DBID_ARGNUM) ? InvalidOid :
+ PG_GETARG_OID(DBID_ARGNUM);
+ serverid = PG_ARGISNULL(SRVID_ARGNUM) ? InvalidOid :
+ PG_GETARG_OID(SRVID_ARGNUM);
+ userid = PG_ARGISNULL(USRID_ARGNUM) ? InvalidOid :
+ PG_GETARG_OID(USRID_ARGNUM);
+
+ search_fdw_xact(xid, dbid, serverid, userid, &entries_to_remove);
+
+ while (entries_to_remove)
+ {
+ FDWXact fdw_xact = linitial(entries_to_remove);
+
+ entries_to_remove = list_delete_first(entries_to_remove);
+
+ remove_fdw_xact(fdw_xact);
+ }
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Code dealing with the on disk files used to store foreign transaction
+ * information.
+ */
+
+/*
+ * ReadFDWXactFile
+ * Read the foreign transction state file and return the contents in a
+ * structure allocated in-memory. The structure can be later freed by the
+ * caller.
+ */
+static FDWXactOnDiskData *
+ReadFDWXactFile(TransactionId xid, Oid serverid, Oid userid)
+{
+ char path[MAXPGPATH];
+ int fd;
+ FDWXactOnDiskData *fdw_xact_file_data;
+ struct stat stat;
+ uint32 crc_offset;
+ pg_crc32c calc_crc;
+ pg_crc32c file_crc;
+ char *buf;
+
+ FDWXactFilePath(path, xid, serverid, userid);
+
+ fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+ if (fd < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open FDW transaction state file \"%s\": %m",
+ path)));
+
+ /*
+ * Check file length. We can determine a lower bound pretty easily. We
+ * set an upper bound to avoid palloc() failure on a corrupt file, though
+ * we can't guarantee that we won't get an out of memory error anyway,
+ * even on a valid file.
+ */
+ if (fstat(fd, &stat))
+ {
+ CloseTransientFile(fd);
+
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not stat FDW transaction state file \"%s\": %m",
+ path)));
+ return NULL;
+ }
+
+ if (stat.st_size < offsetof(FDWXactOnDiskData, fdw_xact_id) ||
+ stat.st_size > MaxAllocSize)
+ {
+ CloseTransientFile(fd);
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("Too large FDW transaction state file \"%s\": %m",
+ path)));
+ return NULL;
+ }
+
+ buf = (char *) palloc(stat.st_size);
+ fdw_xact_file_data = (FDWXactOnDiskData *) buf;
+ crc_offset = stat.st_size - sizeof(pg_crc32c);
+ /* Slurp the file */
+ if (read(fd, fdw_xact_file_data, stat.st_size) != stat.st_size)
+ {
+ CloseTransientFile(fd);
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not read FDW transaction state file \"%s\": %m",
+ path)));
+ pfree(fdw_xact_file_data);
+ return NULL;
+ }
+
+ CloseTransientFile(fd);
+
+ /*
+ * Check the CRC.
+ */
+ INIT_CRC32C(calc_crc);
+ COMP_CRC32C(calc_crc, buf, crc_offset);
+ FIN_CRC32C(calc_crc);
+
+ file_crc = *((pg_crc32c *) (buf + crc_offset));
+
+ if (!EQ_CRC32C(calc_crc, file_crc))
+ {
+ pfree(buf);
+ return NULL;
+ }
+
+ if (fdw_xact_file_data->serverid != serverid ||
+ fdw_xact_file_data->userid != userid ||
+ fdw_xact_file_data->local_xid != xid)
+ {
+ ereport(WARNING,
+ (errmsg("removing corrupt foreign transaction state file \"%s\"",
+ path)));
+ CloseTransientFile(fd);
+ pfree(buf);
+ return NULL;
+ }
+
+ return fdw_xact_file_data;
+}
+
+/*
+ * PrescanFDWXacts
+ *
+ * Read the foreign prepared transactions directory for oldest active
+ * transaction. The transactions corresponding to the xids in this directory
+ * are not necessarily active per say locally. But we still need those XIDs to
+ * be alive so that
+ * 1. we can determine whether they are committed or aborted
+ * 2. the file name contains xid which shouldn't get used again to avoid
+ * conflicting file names.
+ *
+ * The function accepts the oldest active xid determined by other functions
+ * (e.g. PrescanPreparedTransactions()). It then compares every xid it comes
+ * across while scanning foreign prepared transactions directory with the oldest
+ * active xid. It returns the oldest of those xids or oldest active xid
+ * whichever is older.
+ *
+ * If any foreign prepared transaction is part of a future transaction (PITR),
+ * the function removes the corresponding file as
+ * 1. We can not know the status of the local transaction which prepared this
+ * foreign transaction
+ * 2. The foreign server or the user may not be available as per new timeline
+ *
+ * Anyway, the local transaction which prepared the foreign prepared transaction
+ * does not exist as per the new timeline, so it's better to forget the foreign
+ * prepared transaction as well.
+ */
+TransactionId
+PrescanFDWXacts(TransactionId oldestActiveXid)
+{
+ TransactionId nextXid = ShmemVariableCache->nextXid;
+ DIR *cldir;
+ struct dirent *clde;
+
+ cldir = AllocateDir(FDW_XACTS_DIR);
+ while ((clde = ReadDir(cldir, FDW_XACTS_DIR)) != NULL)
+ {
+ if (strlen(clde->d_name) == FDW_XACT_FILE_NAME_LEN &&
+ strspn(clde->d_name, "0123456789ABCDEF_") == FDW_XACT_FILE_NAME_LEN)
+ {
+ Oid serverid;
+ Oid userid;
+ TransactionId local_xid;
+
+ sscanf(clde->d_name, "%08x_%08x_%08x", &local_xid, &serverid,
+ &userid);
+
+ /*
+ * Remove a foreign prepared transaction file corresponding to an
+ * XID, which is too new.
+ */
+ if (TransactionIdFollowsOrEquals(local_xid, nextXid))
+ {
+ ereport(WARNING,
+ (errmsg("removing future foreign prepared transaction file \"%s\"",
+ clde->d_name)));
+ RemoveFDWXactFile(local_xid, serverid, userid, true);
+ continue;
+ }
+
+ if (TransactionIdPrecedesOrEquals(local_xid, oldestActiveXid))
+ oldestActiveXid = local_xid;
+ }
+ }
+
+ FreeDir(cldir);
+ return oldestActiveXid;
+}
+
+/*
+ * RecoverFDWXacts
+ * Read the foreign prepared transaction information and set it up for further
+ * usage.
+ */
+void
+RecoverFDWXacts(void)
+{
+ DIR *cldir;
+ struct dirent *clde;
+
+ cldir = AllocateDir(FDW_XACTS_DIR);
+ while ((clde = ReadDir(cldir, FDW_XACTS_DIR)) != NULL)
+ {
+ if (strlen(clde->d_name) == FDW_XACT_FILE_NAME_LEN &&
+ strspn(clde->d_name, "0123456789ABCDEF_") == FDW_XACT_FILE_NAME_LEN)
+ {
+ Oid serverid;
+ Oid userid;
+ TransactionId local_xid;
+ FDWXactOnDiskData *fdw_xact_file_data;
+ FDWXact fdw_xact;
+
+ sscanf(clde->d_name, "%08x_%08x_%08x", &local_xid, &serverid,
+ &userid);
+
+ fdw_xact_file_data = ReadFDWXactFile(local_xid, serverid, userid);
+
+ if (!fdw_xact_file_data)
+ {
+ ereport(WARNING,
+ (errmsg("Removing corrupt foreign transaction file \"%s\"",
+ clde->d_name)));
+ RemoveFDWXactFile(local_xid, serverid, userid, false);
+ continue;
+ }
+
+ ereport(LOG,
+ (errmsg("recovering foreign transaction entry for xid %u, foreign server %u and user %u",
+ local_xid, serverid, userid)));
+
+ fdw_xact = get_fdw_xact(local_xid, serverid, userid);
+
+ LWLockAcquire(FDWXactLock, LW_EXCLUSIVE);
+ if (!fdw_xact)
+ {
+ /*
+ * Add this entry into the table of foreign transactions. The
+ * status of the transaction is set as preparing, since we do not
+ * know the exact status right now. Resolver will set it later
+ * based on the status of local transaction which prepared this
+ * foreign transaction.
+ */
+ fdw_xact = insert_fdw_xact(fdw_xact_file_data->dboid, local_xid,
+ serverid, userid,
+ fdw_xact_file_data->umid,
+ fdw_xact_file_data->fdw_xact_id);
+ fdw_xact->locking_backend = MyBackendId;
+ fdw_xact->status = FDW_XACT_PREPARING;
+ }
+ else
+ {
+ Assert(fdw_xact->inredo);
+ fdw_xact->inredo = false;
+ }
+
+ /* Mark the entry as ready */
+ fdw_xact->valid = true;
+ /* Already synced to disk */
+ fdw_xact->ondisk = true;
+ pfree(fdw_xact_file_data);
+ LWLockRelease(FDWXactLock);
+ }
+ }
+
+ FreeDir(cldir);
+}
+
+/*
+ * Remove the foreign transaction file for given entry.
+ *
+ * If giveWarning is false, do not complain about file-not-present;
+ * this is an expected case during WAL replay.
+ */
+static void
+RemoveFDWXactFile(TransactionId xid, Oid serverid, Oid userid, bool giveWarning)
+{
+ char path[MAXPGPATH];
+
+ FDWXactFilePath(path, xid, serverid, userid);
+ if (unlink(path))
+ if (errno != ENOENT || giveWarning)
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not remove foreign transaction state file \"%s\": %m",
+ path)));
+}
+
+/*
+ * FDWXactRedoAdd
+ *
+ * Store pointer to the start/end of the WAL record along with the xid in
+ * a fdw_xact entry in shared memory FDWXactData structure.
+ */
+void
+FDWXactRedoAdd(XLogReaderState *record)
+{
+ FDWXactOnDiskData *fdw_xact_data = (FDWXactOnDiskData *) XLogRecGetData(record);
+ FDWXact fdw_xact;
+
+ Assert(RecoveryInProgress());
+
+ LWLockAcquire(FDWXactLock, LW_EXCLUSIVE);
+ fdw_xact = insert_fdw_xact(fdw_xact_data->dboid, fdw_xact_data->local_xid,
+ fdw_xact_data->serverid, fdw_xact_data->userid,
+ fdw_xact_data->umid, fdw_xact_data->fdw_xact_id);
+ fdw_xact->status = FDW_XACT_PREPARING;
+ fdw_xact->fdw_xact_start_lsn = record->ReadRecPtr;
+ fdw_xact->fdw_xact_end_lsn = record->EndRecPtr;
+ fdw_xact->inredo = true;
+ LWLockRelease(FDWXactLock);
+}
+/*
+ * FDWXactRedoRemove
+ *
+ * Remove the corresponding fdw_xact entry from FDWXactGlobal.
+ * Also remove fdw_xact file if a foreign transaction was saved
+ * via an earlier checkpoint.
+ */
+void
+FDWXactRedoRemove(TransactionId xid, Oid serverid, Oid userid)
+{
+ FDWXact fdw_xact;
+
+ Assert(RecoveryInProgress());
+
+ fdw_xact = get_fdw_xact(xid, serverid, userid);
+
+ if (fdw_xact)
+ {
+ /* Now we can clean up any files we already left */
+ Assert(fdw_xact->inredo);
+ remove_fdw_xact(fdw_xact);
+ }
+ else
+ {
+ /*
+ * Entry could be on disk. Call with giveWarning = false
+ * since it can be expected during replay.
+ */
+ RemoveFDWXactFile(xid, serverid, userid, false);
+ }
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 9368b56..c10a027 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -9,6 +9,7 @@
#include "access/clog.h"
#include "access/commit_ts.h"
+#include "access/fdw_xact.h"
#include "access/ginxlog.h"
#include "access/gistxlog.h"
#include "access/generic_xlog.h"
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 83169cc..98f847b 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -58,6 +58,7 @@
#include
#include "access/commit_ts.h"
+#include "access/fdw_xact.h"
#include "access/htup_details.h"
#include "access/subtrans.h"
#include "access/transam.h"
@@ -1455,6 +1456,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
PredicateLockTwoPhaseFinish(xid, isCommit);
+ /*
+ * Commit/Rollback the foreign transactions prepared as part of this
+ * prepared transaction.
+ */
+ FDWXactTwoPhaseFinish(isCommit, xid);
+
/* Count the prepared xact as committed or aborted */
AtEOXact_PgStat(isCommit);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 5ca7375..d62a9b2 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -21,6 +21,7 @@
#include
#include "access/commit_ts.h"
+#include "access/fdw_xact.h"
#include "access/multixact.h"
#include "access/parallel.h"
#include "access/subtrans.h"
@@ -1985,6 +1986,9 @@ CommitTransaction(void)
break;
}
+ /* Pre-commit step for foreign transcations */
+ PreCommit_FDWXacts();
+
CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT
: XACT_EVENT_PRE_COMMIT);
@@ -2143,6 +2147,7 @@ CommitTransaction(void)
AtEOXact_PgStat(true);
AtEOXact_Snapshot(true);
AtCommit_ApplyLauncher();
+ AtEOXact_FDWXacts(true);
pgstat_report_xact_timestamp(0);
CurrentResourceOwner = NULL;
@@ -2232,6 +2237,9 @@ PrepareTransaction(void)
* the transaction-abort path.
*/
+ /* Prepare step for foreign transactions */
+ AtPrepare_FDWXacts();
+
/* Shut down the deferred-trigger manager */
AfterTriggerEndXact(true);
@@ -2620,6 +2628,7 @@ AbortTransaction(void)
AtEOXact_ComboCid();
AtEOXact_HashTables(false);
AtEOXact_PgStat(false);
+ AtEOXact_FDWXacts(false);
pgstat_report_xact_timestamp(0);
}
@@ -4313,6 +4322,10 @@ AbortOutOfAnyTransaction(void)
void
RegisterTransactionLocalNode(void)
{
+ /* Quick exits if no need to remember */
+ if (max_prepared_foreign_xacts == 0)
+ return;
+
XactWriteLocalNode = true;
}
@@ -4322,6 +4335,10 @@ RegisterTransactionLocalNode(void)
void
UnregisterTransactionLocalNode(void)
{
+ /* Quick exits if no need to remember */
+ if (max_prepared_foreign_xacts == 0)
+ return;
+
XactWriteLocalNode = false;
}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 5d58f09..f862369 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -24,6 +24,7 @@
#include "access/clog.h"
#include "access/commit_ts.h"
+#include "access/fdw_xact.h"
#include "access/multixact.h"
#include "access/rewriteheap.h"
#include "access/subtrans.h"
@@ -5104,6 +5105,7 @@ BootStrapXLOG(void)
ControlFile->MaxConnections = MaxConnections;
ControlFile->max_worker_processes = max_worker_processes;
ControlFile->max_prepared_xacts = max_prepared_xacts;
+ ControlFile->max_prepared_foreign_xacts = max_prepared_foreign_xacts;
ControlFile->max_locks_per_xact = max_locks_per_xact;
ControlFile->wal_level = wal_level;
ControlFile->wal_log_hints = wal_log_hints;
@@ -6176,6 +6178,9 @@ CheckRequiredParameterValues(void)
RecoveryRequiresIntParameter("max_locks_per_transaction",
max_locks_per_xact,
ControlFile->max_locks_per_xact);
+ RecoveryRequiresIntParameter("max_prepared_foreign_transactions",
+ max_prepared_foreign_xacts,
+ ControlFile->max_prepared_foreign_xacts);
}
}
@@ -6870,7 +6875,10 @@ StartupXLOG(void)
InitRecoveryTransactionEnvironment();
if (wasShutdown)
+ {
oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
+ oldestActiveXID = PrescanFDWXacts(oldestActiveXID);
+ }
else
oldestActiveXID = checkPoint.oldestActiveXid;
Assert(TransactionIdIsValid(oldestActiveXID));
@@ -7495,6 +7503,7 @@ StartupXLOG(void)
/* Pre-scan prepared transactions to find out the range of XIDs present */
oldestActiveXID = PrescanPreparedTransactions(NULL, NULL);
+ oldestActiveXID = PrescanFDWXacts(oldestActiveXID);
/*
* Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
@@ -7681,6 +7690,9 @@ StartupXLOG(void)
/* Reload shared-memory state for prepared transactions */
RecoverPreparedTransactions();
+ /* Recover foreign transaction state and insert into shared-memory. */
+ RecoverFDWXacts();
+
/*
* Shutdown the recovery environment. This must occur after
* RecoverPreparedTransactions(), see notes for lock_twophase_recover()
@@ -8993,6 +9005,11 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
CheckPointReplicationOrigin();
/* We deliberately delay 2PC checkpointing as long as possible */
CheckPointTwoPhase(checkPointRedo);
+ /*
+ * We deliberately delay foreign transaction checkpointing as long as
+ * possible.
+ */
+ CheckPointFDWXact(checkPointRedo);
}
/*
@@ -9430,7 +9447,8 @@ XLogReportParameters(void)
max_worker_processes != ControlFile->max_worker_processes ||
max_prepared_xacts != ControlFile->max_prepared_xacts ||
max_locks_per_xact != ControlFile->max_locks_per_xact ||
- track_commit_timestamp != ControlFile->track_commit_timestamp)
+ track_commit_timestamp != ControlFile->track_commit_timestamp ||
+ max_prepared_foreign_xacts != ControlFile->max_prepared_foreign_xacts)
{
/*
* The change in number of backend slots doesn't need to be WAL-logged
@@ -9447,6 +9465,7 @@ XLogReportParameters(void)
xlrec.MaxConnections = MaxConnections;
xlrec.max_worker_processes = max_worker_processes;
xlrec.max_prepared_xacts = max_prepared_xacts;
+ xlrec.max_prepared_foreign_xacts = max_prepared_foreign_xacts;
xlrec.max_locks_per_xact = max_locks_per_xact;
xlrec.wal_level = wal_level;
xlrec.wal_log_hints = wal_log_hints;
@@ -9462,6 +9481,7 @@ XLogReportParameters(void)
ControlFile->MaxConnections = MaxConnections;
ControlFile->max_worker_processes = max_worker_processes;
ControlFile->max_prepared_xacts = max_prepared_xacts;
+ ControlFile->max_prepared_foreign_xacts = max_prepared_foreign_xacts;
ControlFile->max_locks_per_xact = max_locks_per_xact;
ControlFile->wal_level = wal_level;
ControlFile->wal_log_hints = wal_log_hints;
@@ -9658,6 +9678,7 @@ xlog_redo(XLogReaderState *record)
RunningTransactionsData running;
oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
+ oldestActiveXID = PrescanFDWXacts(oldestActiveXID);
/*
* Construct a RunningTransactions snapshot representing a shut
@@ -9847,6 +9868,7 @@ xlog_redo(XLogReaderState *record)
ControlFile->MaxConnections = xlrec.MaxConnections;
ControlFile->max_worker_processes = xlrec.max_worker_processes;
ControlFile->max_prepared_xacts = xlrec.max_prepared_xacts;
+ ControlFile->max_prepared_foreign_xacts = xlrec.max_prepared_foreign_xacts;
ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact;
ControlFile->wal_level = xlrec.wal_level;
ControlFile->wal_log_hints = xlrec.wal_log_hints;
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 46c207c..2da7369 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -17,6 +17,7 @@
#include
#include
+#include "access/fdw_xact.h"
#include "access/htup_details.h"
#include "bootstrap/bootstrap.h"
#include "catalog/index.h"
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index d357c8b..bf5fbc1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -301,6 +301,9 @@ CREATE VIEW pg_prepared_xacts AS
CREATE VIEW pg_prepared_statements AS
SELECT * FROM pg_prepared_statement() AS P;
+CREATE VIEW pg_fdw_xacts AS
+ SELECT * FROM pg_fdw_xacts() AS F;
+
CREATE VIEW pg_seclabels AS
SELECT
l.objoid, l.classoid, l.objsubid,
diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c
index 68100df..3c05676 100644
--- a/src/backend/commands/foreigncmds.c
+++ b/src/backend/commands/foreigncmds.c
@@ -13,6 +13,7 @@
*/
#include "postgres.h"
+#include "access/fdw_xact.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/reloptions.h"
@@ -1093,6 +1094,20 @@ RemoveForeignServerById(Oid srvId)
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for foreign server %u", srvId);
+ /*
+ * Check if the foreign server has any foreign transaction prepared on it.
+ * If there is one, and it gets dropped, we will not have any chance to
+ * resolve that transaction.
+ */
+ if (fdw_xact_exists(InvalidTransactionId, MyDatabaseId, srvId, InvalidOid))
+ {
+ Form_pg_foreign_server srvForm;
+ srvForm = (Form_pg_foreign_server) GETSTRUCT(tp);
+ ereport(ERROR,
+ (errmsg("server \"%s\" has unresolved prepared transactions on it",
+ NameStr(srvForm->srvname))));
+ }
+
CatalogTupleDelete(rel, &tp->t_self);
ReleaseSysCache(tp);
@@ -1403,6 +1418,17 @@ RemoveUserMapping(DropUserMappingStmt *stmt)
user_mapping_ddl_aclcheck(useId, srv->serverid, srv->servername);
/*
+ * If there is a foreign prepared transaction with this user mapping,
+ * dropping the user mapping might result in dangling prepared
+ * transaction.
+ */
+ if (fdw_xact_exists(InvalidTransactionId, MyDatabaseId, srv->serverid,
+ useId))
+ ereport(ERROR,
+ (errmsg("server \"%s\" has unresolved prepared transaction for user \"%s\"",
+ srv->servername, MappingUserName(useId))));
+
+ /*
* Do the deletion
*/
object.classId = UserMappingRelationId;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5c13d26..5b09f1d 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -148,6 +148,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
case RM_SPGIST_ID:
case RM_BRIN_ID:
case RM_COMMIT_TS_ID:
+ case RM_FDW_XACT_ID:
case RM_REPLORIGIN_ID:
case RM_GENERIC_ID:
/* just deal with xid, and done */
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2d1ed14..f32db3a 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -16,6 +16,7 @@
#include "access/clog.h"
#include "access/commit_ts.h"
+#include "access/fdw_xact.h"
#include "access/heapam.h"
#include "access/multixact.h"
#include "access/nbtree.h"
@@ -150,6 +151,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
size = add_size(size, BackendRandomShmemSize());
+ size = add_size(size, FDWXactShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -270,6 +272,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
SyncScanShmemInit();
AsyncShmemInit();
BackendRandomShmemInit();
+ FDWXactShmemInit();
#ifdef EXEC_BACKEND
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 3e13394..cdf2d8d 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -494,7 +494,7 @@ RegisterLWLockTranches(void)
if (LWLockTrancheArray == NULL)
{
- LWLockTranchesAllocated = 64;
+ LWLockTranchesAllocated = 65;
LWLockTrancheArray = (char **)
MemoryContextAllocZero(TopMemoryContext,
LWLockTranchesAllocated * sizeof(char *));
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index e6025ec..8e7028a 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,3 +50,4 @@ OldSnapshotTimeMapLock 42
BackendRandomLock 43
LogicalRepWorkerLock 44
CLogTruncationLock 45
+FDWXactLock 46
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e9d561b..bab9a23 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -27,6 +27,7 @@
#endif
#include "access/commit_ts.h"
+#include "access/fdw_xact.h"
#include "access/gin.h"
#include "access/rmgr.h"
#include "access/transam.h"
@@ -2065,6 +2066,19 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ /*
+ * See also CheckRequiredParameterValues() if this parameter changes
+ */
+ {
+ {"max_prepared_foreign_transactions", PGC_POSTMASTER, RESOURCES_MEM,
+ gettext_noop("Sets the maximum number of simultaneously prepared transactions on foreign servers."),
+ NULL
+ },
+ &max_prepared_foreign_xacts,
+ 0, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
#ifdef LOCK_DEBUG
{
{"trace_lock_oidmin", PGC_SUSET, DEVELOPER_OPTIONS,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 8a93bdc..1be8858 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -118,6 +118,12 @@
# (change requires restart)
# Caution: it is not advisable to set max_prepared_transactions nonzero unless
# you actively intend to use prepared transactions.
+#max_prepared_foreign_transactions = 0 # zero disables the feature
+ # (change requires restart)
+# Note: Increasing max_prepared_foreign_transactions costs ~600(?) bytes of shared memory
+# per foreign transaction slot.
+# It is not advisable to set max_prepared_foreign_transactions nonzero unless you
+# actively intend to use atomic foreign transactions feature.
#work_mem = 4MB # min 64kB
#maintenance_work_mem = 64MB # min 1MB
#replacement_sort_tuples = 150000 # limits use of replacement selection sort
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index 214dc71..af2c627 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -81,6 +81,8 @@ provider postgresql {
probe multixact__checkpoint__done(bool);
probe twophase__checkpoint__start();
probe twophase__checkpoint__done();
+ probe fdwxact__checkpoint__start();
+ probe fdwxact__checkpoint__done();
probe smgr__md__read__start(ForkNumber, BlockNumber, Oid, Oid, Oid, int);
probe smgr__md__read__done(ForkNumber, BlockNumber, Oid, Oid, Oid, int, int, int);
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 8dde1e8..f0fa78a 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -205,6 +205,7 @@ static const char *const subdirs[] = {
"pg_snapshots",
"pg_subtrans",
"pg_twophase",
+ "pg_fdw_xact",
"pg_multixact",
"pg_multixact/members",
"pg_multixact/offsets",
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 2ea8931..f703e60 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -276,6 +276,8 @@ main(int argc, char *argv[])
ControlFile->max_worker_processes);
printf(_("max_prepared_xacts setting: %d\n"),
ControlFile->max_prepared_xacts);
+ printf(_("max_prepared_foreign_xacts setting: %d\n"),
+ ControlFile->max_prepared_foreign_xacts);
printf(_("max_locks_per_xact setting: %d\n"),
ControlFile->max_locks_per_xact);
printf(_("track_commit_timestamp setting: %s\n"),
diff --git a/src/bin/pg_resetwal/pg_resetwal.c b/src/bin/pg_resetwal/pg_resetwal.c
index bcb9ed9..739a475 100644
--- a/src/bin/pg_resetwal/pg_resetwal.c
+++ b/src/bin/pg_resetwal/pg_resetwal.c
@@ -585,6 +585,7 @@ GuessControlValues(void)
ControlFile.MaxConnections = 100;
ControlFile.max_worker_processes = 8;
ControlFile.max_prepared_xacts = 0;
+ ControlFile.max_prepared_foreign_xacts = 0;
ControlFile.max_locks_per_xact = 64;
ControlFile.maxAlign = MAXIMUM_ALIGNOF;
@@ -797,6 +798,7 @@ RewriteControlFile(void)
ControlFile.MaxConnections = 100;
ControlFile.max_worker_processes = 8;
ControlFile.max_prepared_xacts = 0;
+ ControlFile.max_prepared_foreign_xacts = 0;
ControlFile.max_locks_per_xact = 64;
/* Now we can force the recorded xlog seg size to the right thing. */
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 852d8ca..41eed51 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -8,6 +8,7 @@
#define FRONTEND 1
#include "postgres.h"
+#include "access/fdw_xact.h"
#include "access/brin_xlog.h"
#include "access/clog.h"
#include "access/commit_ts.h"
diff --git a/src/include/access/fdw_xact.h b/src/include/access/fdw_xact.h
new file mode 100644
index 0000000..0b470b4
--- /dev/null
+++ b/src/include/access/fdw_xact.h
@@ -0,0 +1,75 @@
+/*
+ * fdw_xact.h
+ *
+ * PostgreSQL distributed transaction manager
+ *
+ * Portions Copyright (c) 2017, PostgreSQL Global Development Group
+ *
+ * src/include/access/fdw_xact.h
+ */
+#ifndef FDW_XACT_H
+#define FDW_XACT_H
+
+#include "storage/backendid.h"
+#include "foreign/foreign.h"
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
+#include "nodes/pg_list.h"
+
+#define FDW_XACT_ID_LEN (2 + 1 + 8 + 1 + 8 + 1 + 8)
+#define FDWXactId(path, prefix, xid, serverid, userid) \
+ snprintf((path), FDW_XACT_ID_LEN + 1, "%s_%08X_%08X_%08X", (prefix), \
+ (xid), (serverid), (userid))
+
+/*
+ * On disk file structure
+ */
+typedef struct
+{
+ Oid dboid; /* database oid where to find foreign server
+ * and user mapping */
+ TransactionId local_xid;
+ Oid serverid; /* foreign server where transaction takes
+ * place */
+ Oid userid; /* user who initiated the foreign transaction */
+ Oid umid;
+ char fdw_xact_id[FDW_XACT_ID_LEN]; /* foreign txn prepare id */
+} FDWXactOnDiskData;
+
+typedef struct
+{
+ TransactionId xid;
+ Oid serverid;
+ Oid userid;
+ Oid dbid;
+} FdwRemoveXlogRec;
+
+extern int max_prepared_foreign_xacts;
+
+/* Info types for logs related to FDW transactions */
+#define XLOG_FDW_XACT_INSERT 0x00
+#define XLOG_FDW_XACT_REMOVE 0x10
+
+extern Size FDWXactShmemSize(void);
+extern void FDWXactShmemInit(void);
+extern void RecoverFDWXacts(void);
+extern TransactionId PrescanFDWXacts(TransactionId oldestActiveXid);
+extern bool fdw_xact_has_usermapping(Oid serverid, Oid userid);
+extern bool fdw_xact_has_server(Oid serverid);
+extern void fdw_xact_redo(XLogReaderState *record);
+extern void fdw_xact_desc(StringInfo buf, XLogReaderState *record);
+extern const char *fdw_xact_identify(uint8 info);
+extern void AtEOXact_FDWXacts(bool is_commit);
+extern void AtPrepare_FDWXacts(void);
+extern void FDWXactTwoPhaseFinish(bool isCommit, TransactionId xid);
+extern bool fdw_xact_exists(TransactionId xid, Oid dboid, Oid serverid,
+ Oid userid);
+extern void CheckPointFDWXact(XLogRecPtr redo_horizon);
+extern void RegisterXactForeignServer(Oid serverid, Oid userid, bool can_prepare);
+extern bool FdwTwoPhaseNeeded(void);
+extern void PreCommit_FDWXacts(void);
+extern void FDWXactRedoAdd(XLogReaderState *record);
+extern void FDWXactRedoRemove(TransactionId xid, Oid serverid, Oid userid);
+extern void KnownFDWXactRecreateFiles(XLogRecPtr redo_horizon);
+
+#endif /* FDW_XACT_H */
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 2f43c19..62702de 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
+PG_RMGR(RM_FDW_XACT_ID, "Foreign Transactions", fdw_xact_redo, fdw_xact_desc, fdw_xact_identify, NULL, NULL, NULL)
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index aee1a07..f30374d 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -93,6 +93,9 @@ extern int MyXactFlags;
#define XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK (1U << 1)
+/* Foreign transaction support */
+extern bool XactWriteLocalNode;
+
/*
* start- and end-of-transaction callbacks for dynamically loaded modules
*/
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index c09c0f8..be6a412 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -213,6 +213,7 @@ typedef struct xl_parameter_change
int MaxConnections;
int max_worker_processes;
int max_prepared_xacts;
+ int max_prepared_foreign_xacts;
int max_locks_per_xact;
int wal_level;
bool wal_log_hints;
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 3a25cc8..c57a66f 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -182,6 +182,7 @@ typedef struct ControlFileData
int MaxConnections;
int max_worker_processes;
int max_prepared_xacts;
+ int max_prepared_foreign_xacts;
int max_locks_per_xact;
bool track_commit_timestamp;
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 79f9b90..27f0adb 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5324,6 +5324,12 @@ DATA(insert OID = 3992 ( dense_rank PGNSP PGUID 12 1 0 2276 0 t f f f f f i s
DESCR("rank of hypothetical row without gaps");
DATA(insert OID = 3993 ( dense_rank_final PGNSP PGUID 12 1 0 2276 0 f f f f f f i s 2 0 20 "2281 2276" "{2281,2276}" "{i,v}" _null_ _null_ _null_ hypothetical_dense_rank_final _null_ _null_ _null_ ));
DESCR("aggregate final function");
+DATA(insert OID = 4130 ( pg_fdw_xacts PGNSP PGUID 12 1 1000 0 0 f f f f t t v u 0 0 2249 "" "{26,28,26,26,25,25}" "{o,o,o,o,o,o}" "{dbid, transaction,serverid,userid,status,identifier}" _null_ _null_ pg_fdw_xacts _null_ _null_ _null_ ));
+DESCR("view foreign transactions");
+DATA(insert OID = 4131 ( pg_fdw_xact_resolve PGNSP PGUID 12 1 1000 0 0 f f f f t t v u 0 0 2249 "" "{26, 28,26,26,25,25}" "{o,o,o,o,o,o}" "{dbid, transaction,serverid,userid,status,identifier}" _null_ _null_ pg_fdw_xact_resolve _null_ _null_ _null_ ));
+DESCR("resolve foreign prepared transactions");
+DATA(insert OID = 4132 ( pg_fdw_xact_remove PGNSP PGUID 12 1 0 0 0 f f f f f f v u 4 0 2278 "28 26 26 26" _null_ _null_ "{transaction,dbid,serverid,userid}" _null_ _null_ pg_fdw_xact_remove _null_ _null_ _null_ ));
+DESCR("remove foreign transactions");
/* pg_upgrade support */
DATA(insert OID = 3582 ( binary_upgrade_set_next_pg_type_oid PGNSP PGUID 12 1 0 0 0 f f f f t f v r 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ binary_upgrade_set_next_pg_type_oid _null_ _null_ _null_ ));
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 6ca44f7..7b95f77 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -12,6 +12,7 @@
#ifndef FDWAPI_H
#define FDWAPI_H
+#include "access/fdw_xact.h"
#include "access/parallel.h"
#include "nodes/execnodes.h"
#include "nodes/relation.h"
@@ -143,6 +144,18 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation,
typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt,
Oid serverOid);
+typedef bool (*EndForeignTransaction_function) (Oid serverid, Oid userid,
+ Oid umid, bool is_commit);
+
+typedef bool (*PrepareForeignTransaction_function) (Oid serverid, Oid userid,
+ Oid umid, char *prep_info);
+
+typedef bool (*ResolvePreparedForeignTransaction_function) (Oid serverid,
+ Oid userid,
+ Oid umid,
+ bool is_commit,
+ char *prep_info);
+
typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node,
ParallelContext *pcxt);
typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
@@ -220,6 +233,11 @@ typedef struct FdwRoutine
/* Support functions for IMPORT FOREIGN SCHEMA */
ImportForeignSchema_function ImportForeignSchema;
+ /* Support functions for foreign transactions */
+ EndForeignTransaction_function EndForeignTransaction;
+ PrepareForeignTransaction_function PrepareForeignTransaction;
+ ResolvePreparedForeignTransaction_function ResolvePreparedForeignTransaction;
+
/* Support functions for parallelism under Gather node */
IsForeignScanParallelSafe_function IsForeignScanParallelSafe;
EstimateDSMForeignScan_function EstimateDSMForeignScan;
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 1a125d8..f59ecbb 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -266,11 +266,12 @@ extern PGPROC *PreparedXactProcs;
* We set aside some extra PGPROC structures for auxiliary processes,
* ie things that aren't full-fledged backends but need shmem access.
*
- * Background writer, checkpointer and WAL writer run during normal operation.
+ * Background writer, checkpointer, WAL writer and foreign transction resolver
+ * run during normal operation.
* Startup process and WAL receiver also consume 2 slots, but WAL writer is
* launched only after startup has exited, so we only need 4 slots.
*/
-#define NUM_AUXILIARY_PROCS 4
+#define NUM_AUXILIARY_PROCS 5
/* configurable options */
extern int DeadlockTimeout;
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 1435a7b..843c629 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -121,4 +121,8 @@ extern int32 type_maximum_size(Oid type_oid, int32 typemod);
/* quote.c */
extern char *quote_literal_cstr(const char *rawstr);
+/* access/transam/fdw_xact.c */
+extern Datum pg_fdw_xacts(PG_FUNCTION_ARGS);
+extern Datum pg_fdw_xact_resolve(PG_FUNCTION_ARGS);
+extern Datum pg_fdw_xact_remove(PG_FUNCTION_ARGS);
#endif /* BUILTINS_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index d706f42..06102ff 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1323,6 +1323,13 @@ pg_cursors| SELECT c.name,
c.is_scrollable,
c.creation_time
FROM pg_cursor() c(name, statement, is_holdable, is_binary, is_scrollable, creation_time);
+pg_fdw_xacts| SELECT f.dbid,
+ f.transaction,
+ f.serverid,
+ f.userid,
+ f.status,
+ f.identifier
+ FROM pg_fdw_xacts() f(dbid, transaction, serverid, userid, status, identifier);
pg_file_settings| SELECT a.sourcefile,
a.sourceline,
a.seqno,
diff --git a/src/test/regress/pg_regress.c b/src/test/regress/pg_regress.c
index b685aeb..478260b 100644
--- a/src/test/regress/pg_regress.c
+++ b/src/test/regress/pg_regress.c
@@ -2263,9 +2263,11 @@ regression_main(int argc, char *argv[], init_function ifunc, test_function tfunc
* Adjust the default postgresql.conf for regression testing. The user
* can specify a file to be appended; in any case we expand logging
* and set max_prepared_transactions to enable testing of prepared
- * xacts. (Note: to reduce the probability of unexpected shmmax
- * failures, don't set max_prepared_transactions any higher than
- * actually needed by the prepared_xacts regression test.)
+ * xacts. We also set max_fdw_transctions to enable testing of atomic
+ * foreign transactions. (Note: to reduce the probability of unexpected
+ * shmmax failures, don't set max_prepared_transactions or
+ * max_prepared_foreign_transactions any higher than actually needed by the
+ * corresponding regression tests.).
*/
snprintf(buf, sizeof(buf), "%s/data/postgresql.conf", temp_instance);
pg_conf = fopen(buf, "a");
@@ -2280,7 +2282,8 @@ regression_main(int argc, char *argv[], init_function ifunc, test_function tfunc
fputs("log_line_prefix = '%m [%p] %q%a '\n", pg_conf);
fputs("log_lock_waits = on\n", pg_conf);
fputs("log_temp_files = 128kB\n", pg_conf);
- fputs("max_prepared_transactions = 2\n", pg_conf);
+ fputs("max_prepared_transactions = 3\n", pg_conf);
+ fputs("max_prepared_foreign_transactions = 2\n", pg_conf);
for (sl = temp_configs; sl != NULL; sl = sl->next)
{