diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index ac339fb..09f67a3 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -1432,6 +1432,27 @@ include_dir 'conf.d' + + max_prepared_foreign_transactions (integer) + + max_prepared_foreign_transactions configuration parameter + + + + + Sets the maximum number of foreign transactions that can be prepared + simultaneously. + This parameter can only be set at server start. + + + + When running a standby server, you must set this parameter to the + same or higher value than on the master server. Otherwise, queries + will not be allowed in the standby server. + + + + work_mem (integer) diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml index dbeaab5..639e38b 100644 --- a/doc/src/sgml/fdwhandler.sgml +++ b/doc/src/sgml/fdwhandler.sgml @@ -1714,5 +1714,92 @@ GetForeignServerByName(const char *name, bool missing_ok); + + Transaction manager for Foreign Data Wrappers + + + PostgreSQL transaction manager allows FDWs to read and write + data on foreign server within a transaction while maintaining atomicity + (and hence consistency) of the foreign data. Every Foreign Data Wrapper is + required to register the foreign server along with the PostgreSQL + user whose user mapping is used to connect to the foreign server while starting a + transaction on the foreign server as part of the transaction on + PostgreSQL using RegisterXactForeignServer. + +void +RegisterXactForeignServer(Oid serverid, + Oid userid, + bool two_phase_compliant) + + two_phase_compliant should be true if the foreign server supports + two-phase commit protocol, false otherwise. + + + + An example of such transaction is as follows + +BEGIN; +UPDATE ft1 SET col = 'a'; +UPDATE ft2 SET col = 'b'; +COMMIT; + + ft1 and ft2 are foreign tables on different foreign servers may be using different + Foreign Data Wrappers. + + + + When max_prepared_foreign_transactions is more than zero + PostgreSQL employs Two-phase commit protocol to achieve + atomic distributed transaction. All the foreign servers registered should + support two-phase commit protocol. The two-phase commit protocol is used for + achieving atomic distributed transaction when more than two foreign servers + that support two-phase commit protocol are involved with transaction, or + transaction involves with one foreign server that support two-phase commit + protocol and changes on local data. In other case, for example where only one + foreign server that support two-phase commit is involved with transaction, + the two-phase commit protocol is not used. In Two-phase commit protocol is + processed in two phases: prepare phase and commit phase. In prepare phase, + PostgreSQL prepares the transactions on all the foreign + servers registered using RegisterXactForeignServer. If any of + the foreign server fails to prepare transaction, prepare phase fails. In commit + phase, all the prepared transactions are committed if prepare phase has succeeded + or rolled back if prepare phase fails to prepare transactions on all the foreign + servers. + + + + During prepare phase the distributed transaction manager calls + GetPrepareId to get the prepared transaction identifier for + each foreign server involved. It stores this identifier along with the + serverid and userid for later use. It then calls + ResolvePreparedForeignTranscation with the same identifier + with action FDW_XACT_RESOLVED. + + + + During commit phase the distributed transaction manager calls + ResolveForeignTransaction with the same identifier with action + FDW_XACT_COMMITTING_PREPARED to commit the prepared transaction or + FDW_XACT_ABORTING_PREPARED to rollback the prepared transaction. In case the + distributed transaction manager fails to commit or rollback a prepared + transaction because of connection failure, the operation can be tried again + through built-in pg_fdw_xact. One may set up a background worker + process to retry the operation by installing extension fdw_transaction_resolver + and including $libdir/fdw_transaction_resolver.so in + shared_preload_libraries. + + + + When max_prepared_foreign_transaction is zero, atomicity commit can + not be guaranteed across foreign servers. If transaction on PostgreSQL + is committed, Distributed transaction manager commit the transaction on all the + foreign servers registered using RegisterXactForeignServer, + independent of the outcome of the same operation on other foreign servers. + Thus transactions on some foreign servers may be committed, while the same + on other foreign servers would be rolled back. If the transaction on + PostgreSQL aborts transactions on all the foreign servers + are aborted too. + + diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index 5514db1..6e23ec1 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -8,9 +8,10 @@ subdir = src/backend/access/rmgrdesc top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \ - gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \ - mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \ - smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o +OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o fdw_xactdesc.o \ + genericdesc.o gindesc.o gistdesc.o hashdesc.o heapdesc.o \ + logicalmsgdesc.o mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o \ + seqdesc.o smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o xactdesc.o \ + xlogdesc.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/rmgrdesc/fdw_xactdesc.c b/src/backend/access/rmgrdesc/fdw_xactdesc.c new file mode 100644 index 0000000..869faf7 --- /dev/null +++ b/src/backend/access/rmgrdesc/fdw_xactdesc.c @@ -0,0 +1,68 @@ +/*------------------------------------------------------------------------- + * + * fdw_xactdesc.c + * PostgreSQL distributed transaction manager for foreign server. + * + * This module describes the WAL records for foreign transaction manager. + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/transam/fdw_xactdesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/fdw_xact.h" +#include "access/xloginsert.h" +#include "lib/stringinfo.h" + +extern void +fdw_xact_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_FDW_XACT_INSERT) + { + FDWXactOnDiskData *fdw_insert_xlog = (FDWXactOnDiskData *) rec; + + appendStringInfo(buf, "Foreign server oid: %u", fdw_insert_xlog->serverid); + appendStringInfo(buf, " user oid: %u", fdw_insert_xlog->userid); + appendStringInfo(buf, " database id: %u", fdw_insert_xlog->dboid); + appendStringInfo(buf, " local xid: %u", fdw_insert_xlog->local_xid); + /* TODO: This should be really interpreted by each FDW */ + + /* + * TODO: we also need to assess whether we want to add this + * information + */ + appendStringInfo(buf, " foreign transaction info: %s", + fdw_insert_xlog->fdw_xact_id); + } + else + { + FdwRemoveXlogRec *fdw_remove_xlog = (FdwRemoveXlogRec *) rec; + + appendStringInfo(buf, "Foreign server oid: %u", fdw_remove_xlog->serverid); + appendStringInfo(buf, " user oid: %u", fdw_remove_xlog->userid); + appendStringInfo(buf, " database id: %u", fdw_remove_xlog->dbid); + appendStringInfo(buf, " local xid: %u", fdw_remove_xlog->xid); + } + +} + +extern const char * +fdw_xact_identify(uint8 info) +{ + switch (info & ~XLR_INFO_MASK) + { + case XLOG_FDW_XACT_INSERT: + return "NEW FOREIGN TRANSACTION"; + case XLOG_FDW_XACT_REMOVE: + return "REMOVE FOREIGN TRANSACTION"; + } + /* Keep compiler happy */ + return NULL; +} diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index 5f07eb1..ff3064e 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -112,14 +112,15 @@ xlog_desc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, "max_connections=%d max_worker_processes=%d " "max_prepared_xacts=%d max_locks_per_xact=%d " "wal_level=%s wal_log_hints=%s " - "track_commit_timestamp=%s", + "track_commit_timestamp=%s max_fdw_xacts=%d", xlrec.MaxConnections, xlrec.max_worker_processes, xlrec.max_prepared_xacts, xlrec.max_locks_per_xact, wal_level_str, xlrec.wal_log_hints ? "on" : "off", - xlrec.track_commit_timestamp ? "on" : "off"); + xlrec.track_commit_timestamp ? "on" : "off", + xlrec.max_prepared_foreign_xacts); } else if (info == XLOG_FPW_CHANGE) { diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 16fbe47..dd7ee32 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global OBJS = clog.o commit_ts.o generic_xlog.o multixact.o parallel.o rmgr.o slru.o \ subtrans.o timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \ xact.o xlog.o xlogarchive.o xlogfuncs.o \ - xloginsert.o xlogreader.o xlogutils.o + xloginsert.o xlogreader.o xlogutils.o fdw_xact.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/transam/fdw_xact.c b/src/backend/access/transam/fdw_xact.c new file mode 100644 index 0000000..90d11df --- /dev/null +++ b/src/backend/access/transam/fdw_xact.c @@ -0,0 +1,2182 @@ +/*------------------------------------------------------------------------- + * + * fdw_xact.c + * PostgreSQL distributed transaction manager for foreign server. + * + * This module manages the transactions involving foreign servers. + * + * Copyright (c) 2017, PostgreSQL Global Development Group + * + * src/backend/access/transam/fdw_xact.c + * + *------------------------------------------------------------------------- + */ +#include +#include +#include + +#include "postgres.h" + +#include "miscadmin.h" +#include "funcapi.h" + +#include "access/fdw_xact.h" +#include "access/htup_details.h" +#include "access/xact.h" +#include "access/xlog.h" +#include "access/xloginsert.h" +#include "access/xlogutils.h" +#include "catalog/pg_type.h" +#include "foreign/foreign.h" +#include "foreign/fdwapi.h" +#include "libpq/pqsignal.h" +#include "pg_trace.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/lock.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/shmem.h" +#include "utils/builtins.h" +#include "utils/memutils.h" +#include "utils/guc.h" +#include "utils/snapmgr.h" + +/* + * This comment summarises how the transaction manager handles transactions + * involving one or more foreign server/s. + * + * When an foreign data wrapper starts transaction on a foreign server, it is + * required to register the foreign server and user who initiated the + * transaction using function RegisterXactForeignServer(). A foreign server + * connection is identified by oid of foreign server and user. + * + * The commit is executed in two phases: + * First phase (executed during pre-commit processing) + * ----------- + * Transactions are prepared on all the foreign servers, which can participate + * in two-phase commit protocol. Transaction on other foreign servers are + * committed in the same phase. + * + * Second phase (executed during post-commit/abort processing) + * ------------ + * If first phase succeeds, foreign servers are requested to commit respective + * prepared transactions. If the first phase does not succeed because of any + * failure, the foreign servers are asked to rollback respective prepared + * transactions or abort the transactions if they are not prepared. + * + * Any network failure, server crash after preparing foreign transaction leaves + * that prepared transaction unresolved. During the first phase, before actually + * preparing the transactions, enough information is persisted to the disk and + * logs in order to resolve such transactions. + * + * During replay and replication FDWXactGlobal also holds information about + * active prepared foreign transaction that haven't been moved to disk yet. + * + * Replay of fdw_xact records happens by the following rules: + * + * * On PREPARE redo we add the foreign transaction to + * FDWXactGlobal->fdw_xacts. We set fdw_xact->inredo to true for + * such entries. + * + * * On Checkpoint redo we iterate through FDWXactGlobal->fdw_xacts. + * entries that have fdw_xact->inredo set and are behind the redo_horizon. + * We save them to disk and also set fdw_xact->ondisk to true. + * + * * On COMMIT/ABORT we delete the entry from FDWXactGlobal->fdw_xacts. + * If fdw_xact->ondisk is true, we delete the corresponding entry from + * the disk as well. + * + * * RecoverPreparedTransactions(), StnadbyReoverPreparedTransactions() and + * PrescanPreparedTransactions() have been modified to go through + * fdw_xact->inredo entries that have not made to disk yet. + */ + +/* Shared memory entry for a prepared or being prepared foreign transaction */ +typedef struct FDWXactData *FDWXact; + +/* Structure to bundle the foreign connection participating in transaction */ +typedef struct +{ + Oid serverid; + Oid userid; + Oid umid; + char *servername; + FDWXact fdw_xact; /* foreign prepared transaction entry in case + * prepared */ + bool two_phase_commit; /* Should use two phase commit + * protocol while committing + * transaction on this server, + * whenever necessary. */ + EndForeignTransaction_function end_foreign_xact; + PrepareForeignTransaction_function prepare_foreign_xact; + ResolvePreparedForeignTransaction_function resolve_prepared_foreign_xact; +} FDWConnection; + +/* List of foreign connections participating in the transaction */ +List *MyFDWConnections = NIL; + +/* + * By default we assume that all the foreign connections participating in this + * transaction can use two phase commit protocol. + */ +bool TwoPhaseReady = true; + +/* Record the server, userid participating in the transaction. */ +void +RegisterXactForeignServer(Oid serverid, Oid userid, bool two_phase_commit) +{ + FDWConnection *fdw_conn; + ListCell *lcell; + ForeignServer *foreign_server; + ForeignDataWrapper *fdw; + UserMapping *user_mapping; + FdwRoutine *fdw_routine; + MemoryContext old_context; + + TwoPhaseReady = TwoPhaseReady && two_phase_commit; + + /* Check if the entry already exists, if so, raise an error */ + foreach(lcell, MyFDWConnections) + { + fdw_conn = lfirst(lcell); + + if (fdw_conn->serverid == serverid && + fdw_conn->userid == userid) + ereport(ERROR, + (errmsg("attempt to start transction again on server %u user %u", + serverid, userid))); + } + + /* + * This list and its contents needs to be saved in the transaction context + * memory + */ + old_context = MemoryContextSwitchTo(TopTransactionContext); + /* Add this foreign connection to the list for transaction management */ + fdw_conn = (FDWConnection *) palloc(sizeof(FDWConnection)); + + /* Make sure that the FDW has at least a transaction handler */ + foreign_server = GetForeignServer(serverid); + fdw = GetForeignDataWrapper(foreign_server->fdwid); + fdw_routine = GetFdwRoutine(fdw->fdwhandler); + user_mapping = GetUserMapping(userid, serverid); + + if (!fdw_routine->EndForeignTransaction) + ereport(ERROR, + (errmsg("no function to end a foreign transaction provided for FDW %s", + fdw->fdwname))); + + if (two_phase_commit) + { + if (max_prepared_foreign_xacts == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("prepread foreign transactions are disabled"), + errhint("Set max_prepared_foreign_transactions to a nonzero value."))); + + if (!fdw_routine->PrepareForeignTransaction) + ereport(ERROR, + (errmsg("no function provided for preparing foreign transaction for FDW %s", + fdw->fdwname))); + + if (!fdw_routine->ResolvePreparedForeignTransaction) + ereport(ERROR, + (errmsg("no function provided for resolving prepared foreign transaction for FDW %s", + fdw->fdwname))); + } + + fdw_conn->serverid = serverid; + fdw_conn->userid = userid; + fdw_conn->umid = user_mapping->umid; + + /* + * We may need following information at the end of a transaction, when the + * system caches are not available. So save it before hand. + */ + fdw_conn->servername = foreign_server->servername; + fdw_conn->prepare_foreign_xact = fdw_routine->PrepareForeignTransaction; + fdw_conn->resolve_prepared_foreign_xact = fdw_routine->ResolvePreparedForeignTransaction; + fdw_conn->end_foreign_xact = fdw_routine->EndForeignTransaction; + fdw_conn->fdw_xact = NULL; + fdw_conn->two_phase_commit = two_phase_commit; + MyFDWConnections = lappend(MyFDWConnections, fdw_conn); + /* Revert back the context */ + MemoryContextSwitchTo(old_context); + + return; +} + +/* Enum to track the status of prepared foreign transaction */ +typedef enum +{ + FDW_XACT_PREPARING, /* foreign transaction is (being) prepared */ + FDW_XACT_COMMITTING_PREPARED, /* foreign prepared transaction is to + * be committed */ + FDW_XACT_ABORTING_PREPARED, /* foreign prepared transaction is to be + * aborted */ + FDW_XACT_RESOLVED /* Status used only by pg_fdw_xact_resolve(). + * It doesn't appear in the in-memory entry. */ +} FDWXactStatus; + +typedef struct FDWXactData +{ + FDWXact fx_next; /* Next free FDWXact entry */ + Oid dboid; /* database oid where to find foreign server + * and user mapping */ + TransactionId local_xid; /* XID of local transaction */ + Oid serverid; /* foreign server where transaction takes + * place */ + Oid userid; /* user who initiated the foreign transaction */ + Oid umid; /* user mapping id for connection key */ + FDWXactStatus status; /* The state of the foreign + * transaction. This doubles as the + * action to be taken on this entry. */ + + /* + * Note that we need to keep track of two LSNs for each FDWXact. We keep + * track of the start LSN because this is the address we must use to read + * state data back from WAL when committing a FDWXact. We keep track of + * the end LSN because that is the LSN we need to wait for prior to + * commit. + */ + XLogRecPtr fdw_xact_start_lsn; /* XLOG offset of inserting this entry + * start */ + XLogRecPtr fdw_xact_end_lsn; /* XLOG offset of inserting this entry + * end */ + + bool valid; /* Has the entry been complete and written to file? */ + BackendId locking_backend; /* Backend working on this entry */ + bool ondisk; /* TRUE if prepare state file is on disk */ + bool inredo; /* TRUE if entry was added via xlog_redo */ + char fdw_xact_id[FDW_XACT_ID_LEN]; /* prepared transaction + * identifier */ +} FDWXactData; + +/* Directory where the foreign prepared transaction files will reside */ +#define FDW_XACTS_DIR "pg_fdw_xact" + +/* + * Name of foreign prepared transaction file is 8 bytes xid, 8 bytes foreign + * server oid and 8 bytes user oid separated by '_'. + */ +#define FDW_XACT_FILE_NAME_LEN (8 + 1 + 8 + 1 + 8) +#define FDWXactFilePath(path, xid, serverid, userid) \ + snprintf(path, MAXPGPATH, FDW_XACTS_DIR "/%08X_%08X_%08X", xid, \ + serverid, userid) + +/* Shared memory layout for maintaining foreign prepared transaction entries. */ +typedef struct +{ + /* Head of linked list of free FDWXactData structs */ + FDWXact freeFDWXacts; + + /* Number of valid FDW transaction entries */ + int numFDWXacts; + + /* Upto max_prepared_foreign_xacts entries in the array */ + FDWXact fdw_xacts[FLEXIBLE_ARRAY_MEMBER]; /* Variable length array */ +} FDWXactGlobalData; + +static void AtProcExit_FDWXact(int code, Datum arg); +static bool resolve_fdw_xact(FDWXact fdw_xact, + ResolvePreparedForeignTransaction_function prepared_foreign_xact_resolver); +static FDWXact insert_fdw_xact(Oid dboid, TransactionId xid, Oid serverid, Oid userid, + Oid umid, char *fdw_xact_id); +static void unlock_fdw_xact(FDWXact fdw_xact); +static void unlock_fdw_xact_entries(); +static void remove_fdw_xact(FDWXact fdw_xact); +static FDWXact register_fdw_xact(Oid dbid, TransactionId xid, Oid serverid, Oid userid, + Oid umid, char *fdw_xact_info); +static int GetFDWXactList(FDWXact * fdw_xacts); +static ResolvePreparedForeignTransaction_function get_prepared_foreign_xact_resolver(FDWXact fdw_xact); +static FDWXactOnDiskData *ReadFDWXactFile(TransactionId xid, Oid serverid, + Oid userid); +static void RemoveFDWXactFile(TransactionId xid, Oid serverid, Oid userid, + bool giveWarning); +static void RecreateFDWXactFile(TransactionId xid, Oid serverid, Oid userid, + void *content, int len); +static void XlogReadFDWXactData(XLogRecPtr lsn, char **buf, int *len); +static void prepare_foreign_transactions(void); +static FDWXact get_fdw_xact(TransactionId xid, Oid serverid, Oid userid); +bool search_fdw_xact(TransactionId xid, Oid dbid, Oid serverid, Oid userid, + List **qualifying_xacts); + +/* + * Maximum number of foreign prepared transaction entries at any given time + * GUC variable, change requires restart. + */ +int max_prepared_foreign_xacts = 0; + +/* Keep track of registering process exit call back. */ +static bool fdwXactExitRegistered = false; + +/* Pointer to the shared memory holding the foreign transactions data */ +static FDWXactGlobalData *FDWXactGlobal; + +/* foreign transaction entries locked by this backend */ +List *MyLockedFDWXacts = NIL; + +/* + * FDWXactShmemSize + * Calculates the size of shared memory allocated for maintaining foreign + * prepared transaction entries. + */ +extern Size +FDWXactShmemSize(void) +{ + Size size; + + /* Need the fixed struct, foreign transaction information array */ + size = offsetof(FDWXactGlobalData, fdw_xacts); + size = add_size(size, mul_size(max_prepared_foreign_xacts, + sizeof(FDWXact))); + size = MAXALIGN(size); + size = add_size(size, mul_size(max_prepared_foreign_xacts, + sizeof(FDWXactData))); + + return size; +} + +/* + * FDWXactShmemInit + * Initialization of shared memory for maintaining foreign prepared transaction + * entries. The shared memory layout is defined in definition of + * FDWXactGlobalData structure. + */ +extern void +FDWXactShmemInit(void) +{ + bool found; + + FDWXactGlobal = ShmemInitStruct("Foreign transactions table", + FDWXactShmemSize(), + &found); + if (!IsUnderPostmaster) + { + FDWXact fdw_xacts; + int cnt; + + Assert(!found); + FDWXactGlobal->freeFDWXacts = NULL; + FDWXactGlobal->numFDWXacts = 0; + + /* Initialise the linked list of free FDW transactions */ + fdw_xacts = (FDWXact) + ((char *) FDWXactGlobal + + MAXALIGN(offsetof(FDWXactGlobalData, fdw_xacts) + + sizeof(FDWXact) * max_prepared_foreign_xacts)); + for (cnt = 0; cnt < max_prepared_foreign_xacts; cnt++) + { + fdw_xacts[cnt].fx_next = FDWXactGlobal->freeFDWXacts; + FDWXactGlobal->freeFDWXacts = &fdw_xacts[cnt]; + } + } + else + { + Assert(FDWXactGlobal); + Assert(found); + } +} + +/* + * PreCommit_FDWXacts + * + * The function is responsible for pre-commit processing on foreign connections. + * Basically the foreign transactions are prepared on the foreign servers which + * can execute two-phase-commit protocol. But in case of where only one server + * that can execute two-phase-commit protocol is involved with transaction and + * no changes is made on local data then we don't need to two-phase-commit protocol, + * so try to commit transaction on the server. Those will be aborted or committed + * after the current transaction has been aborted or committed resp. We try to + * commit transactions on rest of the foreign servers now. For these foreign + * servers it is possible that some transactions commit even if the local + * transaction aborts. + */ +void +PreCommit_FDWXacts(void) +{ + ListCell *cur; + ListCell *prev; + ListCell *next; + + /* If there are no foreign servers involved, we have no business here */ + if (list_length(MyFDWConnections) < 1) + return; + + /* + * Try committing transactions on the foreign servers, which can not + * execute two-phase-commit protocol. + */ + for (cur = list_head(MyFDWConnections), prev = NULL; cur; cur = next) + { + FDWConnection *fdw_conn = lfirst(cur); + + next = lnext(cur); + + if (!fdw_conn->two_phase_commit) + { + /* + * The FDW has to make sure that the connection opened to the + * foreign server is out of transaction. Even if the handler + * function returns failure statue, there's hardly anything to do. + */ + if (!fdw_conn->end_foreign_xact(fdw_conn->serverid, fdw_conn->userid, + fdw_conn->umid, true)) + elog(WARNING, "could not commit transaction on server %s", + fdw_conn->servername); + + /* The connection is no more part of this transaction, forget it */ + MyFDWConnections = list_delete_cell(MyFDWConnections, cur, prev); + } + else + prev = cur; + } + + /* + * Here foreign servers that can not execute two-phase-commit protocol + * already commit the transaction and MyFDWConnections has only foreign + * servers that can execute two-phase-commit protocol. We don't need to + * use two-phase-commit protocol if there is only one foreign server that + * that can execute two-phase-commit and didn't write no local node. + */ + if ((list_length(MyFDWConnections) > 1) || + (list_length(MyFDWConnections) == 1 && XactWriteLocalNode)) + { + /* + * Prepare the transactions on the all foreign servers, which can + * execute two-phase-commit protocol. + */ + prepare_foreign_transactions(); + } + else if (list_length(MyFDWConnections) == 1) + { + FDWConnection *fdw_conn = lfirst(list_head(MyFDWConnections)); + + /* + * We don't need to use two-phase commit protocol only one server + * remaining even if this server can execute two-phase-commit + * protocol. + */ + if (!fdw_conn->end_foreign_xact(fdw_conn->serverid, fdw_conn->userid, + fdw_conn->umid, true)) + elog(WARNING, "could not commit transaction on server %s", + fdw_conn->servername); + + /* MyFDWConnections should be cleared here */ + MyFDWConnections = list_delete_cell(MyFDWConnections, cur, prev); + } +} + +/* + * prepare_foreign_transactions + * + * Prepare transactions on the foreign servers which can execute two phase + * commit protocol. Rest of the foreign servers are ignored. + */ +static void +prepare_foreign_transactions(void) +{ + ListCell *lcell; + + /* + * Loop over the foreign connections + */ + foreach(lcell, MyFDWConnections) + { + FDWConnection *fdw_conn = (FDWConnection *) lfirst(lcell); + char fdw_xact_id[FDW_XACT_ID_LEN]; + FDWXact fdw_xact; + + if (!fdw_conn->two_phase_commit) + continue; + + /* Generate prepare transaction id for foreign server */ + FDWXactId(fdw_xact_id, "px", GetTopTransactionId(), + fdw_conn->serverid, fdw_conn->userid); + + /* + * Register the foreign transaction with the identifier used to + * prepare it on the foreign server. Registration persists this + * information to the disk and logs (that way relaying it on standby). + * Thus in case we loose connectivity to the foreign server or crash + * ourselves, we will remember that we have prepared transaction on + * the foreign server and try to resolve it when connectivity is + * restored or after crash recovery. + * + * If we crash after persisting the information but before preparing + * the transaction on the foreign server, we will try to resolve a + * never-prepared transaction, and get an error. This is fine as long + * as the FDW provides us unique prepared transaction identifiers. + * + * If we prepare the transaction on the foreign server before + * persisting the information to the disk and crash in-between these + * two steps, we will forget that we prepared the transaction on the + * foreign server and will not be able to resolve it after the crash. + * Hence persist first then prepare. + */ + fdw_xact = register_fdw_xact(MyDatabaseId, GetTopTransactionId(), + fdw_conn->serverid, fdw_conn->userid, + fdw_conn->umid, fdw_xact_id); + + /* + * Between register_fdw_xact call above till this backend hears back + * from foreign server, the backend may abort the local transaction + * (say, because of a signal). During abort processing, it will send + * an ABORT message to the foreign server. If the foreign server has + * not prepared the transaction, the message will succeed. If the + * foreign server has prepared transaction, it will throw an error, + * which we will ignore and the prepared foreign transaction will be + * resolved by the foreign transaction resolver. + */ + if (!fdw_conn->prepare_foreign_xact(fdw_conn->serverid, fdw_conn->userid, + fdw_conn->umid, fdw_xact_id)) + { + /* + * An error occurred, and we didn't prepare the transaction. + * Delete the entry from foreign transaction table. Raise an + * error, so that the local server knows that one of the foreign + * server has failed to prepare the transaction. + * + * XXX : FDW is expected to print the error as a warning and then + * we raise actual error here. But instead, we should pull the + * error text from FDW and add it here in the message or as a + * context or a hint. + */ + remove_fdw_xact(fdw_xact); + + /* + * Delete the connection, since it doesn't require any further + * processing. This deletion will invalidate current cell pointer, + * but that is fine since we will not use that pointer because the + * subsequent ereport will get us out of this loop. + */ + MyFDWConnections = list_delete_ptr(MyFDWConnections, fdw_conn); + ereport(ERROR, + (errmsg("can not prepare transaction on foreign server %s", + fdw_conn->servername))); + } + + /* Prepare succeeded, remember it in the connection */ + fdw_conn->fdw_xact = fdw_xact; + } + return; +} + +/* + * register_fdw_xact + * + * This function is used to create new foreign transaction entry before an FDW + * executes the first phase of two-phase commit. The function adds the entry to + * WAL and will be persisted to the disk under pg_fdw_xact directory when checkpoint. + */ +static FDWXact +register_fdw_xact(Oid dbid, TransactionId xid, Oid serverid, Oid userid, + Oid umid, char *fdw_xact_id) +{ + FDWXact fdw_xact; + FDWXactOnDiskData *fdw_xact_file_data; + int data_len; + + /* Enter the foreign transaction in the shared memory structure */ + LWLockAcquire(FDWXactLock, LW_EXCLUSIVE); + fdw_xact = insert_fdw_xact(dbid, xid, serverid, userid, umid, + fdw_xact_id); + fdw_xact->status = FDW_XACT_PREPARING; + fdw_xact->locking_backend = MyBackendId; + LWLockRelease(FDWXactLock); + + /* Remember that we have locked this entry. */ + MyLockedFDWXacts = lappend(MyLockedFDWXacts, fdw_xact); + + /* + * Prepare to write the entry to a file. Also add xlog entry. The contents + * of the xlog record are same as what is written to the file. + */ + data_len = offsetof(FDWXactOnDiskData, fdw_xact_id); + data_len = data_len + FDW_XACT_ID_LEN; + data_len = MAXALIGN(data_len); + fdw_xact_file_data = (FDWXactOnDiskData *) palloc0(data_len); + fdw_xact_file_data->dboid = fdw_xact->dboid; + fdw_xact_file_data->local_xid = fdw_xact->local_xid; + fdw_xact_file_data->serverid = fdw_xact->serverid; + fdw_xact_file_data->userid = fdw_xact->userid; + fdw_xact_file_data->umid = fdw_xact->umid; + memcpy(fdw_xact_file_data->fdw_xact_id, fdw_xact->fdw_xact_id, + FDW_XACT_ID_LEN); + + START_CRIT_SECTION(); + + /* Add the entry in the xlog and save LSN for checkpointer */ + XLogBeginInsert(); + XLogRegisterData((char *) fdw_xact_file_data, data_len); + fdw_xact->fdw_xact_end_lsn = XLogInsert(RM_FDW_XACT_ID, XLOG_FDW_XACT_INSERT); + XLogFlush(fdw_xact->fdw_xact_end_lsn); + + /* Store record's start location to read that later on CheckPoint */ + fdw_xact->fdw_xact_start_lsn = ProcLastRecPtr; + + /* File is written completely, checkpoint can proceed with syncing */ + fdw_xact->valid = true; + + END_CRIT_SECTION(); + + pfree(fdw_xact_file_data); + return fdw_xact; +} + +/* + * insert_fdw_xact + * + * Insert a new entry for a given foreign transaction identified by transaction + * id, foreign server and user mapping, in the shared memory. Caller must hold + * FDWXactLock in exclusive mode. + * + * If the entry already exists, the function raises an error. + */ +static FDWXact +insert_fdw_xact(Oid dboid, TransactionId xid, Oid serverid, Oid userid, Oid umid, + char *fdw_xact_id) +{ + int i; + FDWXact fdw_xact; + + if (!fdwXactExitRegistered) + { + before_shmem_exit(AtProcExit_FDWXact, 0); + fdwXactExitRegistered = true; + } + + /* Check for duplicating foreign transaction entry */ + for (i = 0; i < FDWXactGlobal->numFDWXacts; i++) + { + fdw_xact = FDWXactGlobal->fdw_xacts[i]; + if (fdw_xact->local_xid == xid && + fdw_xact->serverid == serverid && + fdw_xact->userid == userid) + elog(ERROR, "duplicate entry for foreign transaction with transaction id %u, serverid %u, userid %u found", + xid, serverid, userid); + } + + /* + * Get the next free foreign transaction entry. Raise error if there are + * none left. + */ + if (!FDWXactGlobal->freeFDWXacts) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("maximum number of foreign transactions reached"), + errhint("Increase max_prepared_foreign_transactions (currently %d).", + max_prepared_foreign_xacts))); + } + + fdw_xact = FDWXactGlobal->freeFDWXacts; + FDWXactGlobal->freeFDWXacts = fdw_xact->fx_next; + + /* Insert the entry to active array */ + Assert(FDWXactGlobal->numFDWXacts < max_prepared_foreign_xacts); + FDWXactGlobal->fdw_xacts[FDWXactGlobal->numFDWXacts++] = fdw_xact; + + /* Stamp the entry with backend id before releasing the LWLock */ + fdw_xact->locking_backend = InvalidBackendId; + fdw_xact->dboid = dboid; + fdw_xact->local_xid = xid; + fdw_xact->serverid = serverid; + fdw_xact->userid = userid; + fdw_xact->umid = umid; + fdw_xact->fdw_xact_start_lsn = InvalidXLogRecPtr; + fdw_xact->fdw_xact_end_lsn = InvalidXLogRecPtr; + fdw_xact->valid = false; + fdw_xact->ondisk = false; + fdw_xact->inredo = false; + memcpy(fdw_xact->fdw_xact_id, fdw_xact_id, FDW_XACT_ID_LEN); + + return fdw_xact; +} + +/* + * remove_fdw_xact + * + * Removes the foreign prepared transaction entry from shared memory, disk and + * logs about the removal in WAL. + */ +static void +remove_fdw_xact(FDWXact fdw_xact) +{ + int cnt; + + LWLockAcquire(FDWXactLock, LW_EXCLUSIVE); + /* Search the slot where this entry resided */ + for (cnt = 0; cnt < FDWXactGlobal->numFDWXacts; cnt++) + { + if (FDWXactGlobal->fdw_xacts[cnt] == fdw_xact) + { + /* Remove the entry from active array */ + FDWXactGlobal->numFDWXacts--; + FDWXactGlobal->fdw_xacts[cnt] = FDWXactGlobal->fdw_xacts[FDWXactGlobal->numFDWXacts]; + + /* Put it back into free list */ + fdw_xact->fx_next = FDWXactGlobal->freeFDWXacts; + FDWXactGlobal->freeFDWXacts = fdw_xact; + + /* Unlock the entry */ + fdw_xact->locking_backend = InvalidBackendId; + MyLockedFDWXacts = list_delete_ptr(MyLockedFDWXacts, fdw_xact); + + LWLockRelease(FDWXactLock); + + if (!RecoveryInProgress()) + { + FdwRemoveXlogRec fdw_remove_xlog; + XLogRecPtr recptr; + + /* Fill up the log record before releasing the entry */ + fdw_remove_xlog.serverid = fdw_xact->serverid; + fdw_remove_xlog.dbid = fdw_xact->dboid; + fdw_remove_xlog.xid = fdw_xact->local_xid; + fdw_remove_xlog.userid = fdw_xact->userid; + + START_CRIT_SECTION(); + + /* + * Log that we are removing the foreign transaction entry and + * remove the file from the disk as well. + */ + XLogBeginInsert(); + XLogRegisterData((char *) &fdw_remove_xlog, sizeof(fdw_remove_xlog)); + recptr = XLogInsert(RM_FDW_XACT_ID, XLOG_FDW_XACT_REMOVE); + XLogFlush(recptr); + + END_CRIT_SECTION(); + } + + /* Remove the file from the disk if exists. */ + if (fdw_xact->ondisk) + RemoveFDWXactFile(fdw_xact->local_xid, fdw_xact->serverid, + fdw_xact->userid, true); + return; + } + } + LWLockRelease(FDWXactLock); + + /* We did not find the given entry in global array */ + elog(ERROR, "failed to find %p in FDWXactGlobal array", fdw_xact); +} + +/* + * unlock_fdw_xact + * + * Unlock the foreign transaction entry by wiping out the locking_backend and + * removing it from the backend's list of foreign transaction. + */ +static void +unlock_fdw_xact(FDWXact fdw_xact) +{ + /* Only the backend holding the lock is allowed to unlock */ + Assert(fdw_xact->locking_backend == MyBackendId); + + /* + * First set the locking backend as invalid, and then remove it from the + * list of locked foreign transactions, under the LW lock. If we reverse + * the order and process exits in-between those two, we will be left an + * entry locked by this backend, which gets unlocked only at the server + * restart. + */ + + LWLockAcquire(FDWXactLock, LW_EXCLUSIVE); + fdw_xact->locking_backend = InvalidBackendId; + MyLockedFDWXacts = list_delete_ptr(MyLockedFDWXacts, fdw_xact); + LWLockRelease(FDWXactLock); +} + +/* + * unlock_fdw_xact_entries + * + * Unlock the foreign transaction entries locked by this backend. + */ +static void +unlock_fdw_xact_entries() +{ + while (MyLockedFDWXacts) + { + FDWXact fdw_xact = (FDWXact) linitial(MyLockedFDWXacts); + + unlock_fdw_xact(fdw_xact); + } +} + +/* + * AtProcExit_FDWXact + * + * When the process exits, unlock the entries it held. + */ +static void +AtProcExit_FDWXact(int code, Datum arg) +{ + unlock_fdw_xact_entries(); +} + +/* + * AtEOXact_FDWXacts + * + * The function executes phase 2 of two-phase commit protocol. + * At the end of transaction perform following actions + * 1. Mark the entries locked by this backend as ABORTING or COMMITTING + * according the result of transaction. + * 2. Try to commit or abort the transactions on foreign servers. If that + * succeeds, remove them from foreign transaction entries, otherwise unlock + * them. + */ +extern void +AtEOXact_FDWXacts(bool is_commit) +{ + ListCell *lcell; + + foreach(lcell, MyFDWConnections) + { + FDWConnection *fdw_conn = lfirst(lcell); + + /* Commit/abort prepared foreign transactions */ + if (fdw_conn->fdw_xact) + { + FDWXact fdw_xact = fdw_conn->fdw_xact; + + fdw_xact->status = (is_commit ? + FDW_XACT_COMMITTING_PREPARED : + FDW_XACT_ABORTING_PREPARED); + + /* + * Try aborting or committing the transaction on the foreign + * server + */ + if (!resolve_fdw_xact(fdw_xact, fdw_conn->resolve_prepared_foreign_xact)) + { + /* + * The transaction was not resolved on the foreign server, + * unlock it, so that someone else can take care of it. + */ + unlock_fdw_xact(fdw_xact); + } + } + else + { + /* + * On servers where two phase commit protocol could not be + * executed we have tried to commit the transactions during + * pre-commit phase. Any remaining transactions need to be + * aborted. + */ + Assert(!is_commit); + + /* + * The FDW has to make sure that the connection opened to the + * foreign server is out of transaction. Even if the handler + * function returns failure statue, there's hardly anything to do. + */ + if (!fdw_conn->end_foreign_xact(fdw_conn->serverid, fdw_conn->userid, + fdw_conn->umid, is_commit)) + elog(WARNING, "could not %s transaction on server %s", + is_commit ? "commit" : "abort", + fdw_conn->servername); + + } + } + + /* + * Unlock any locked foreign transactions. Resolver might lock the + * entries, and may not be able to unlock them if aborted in-between. In + * any case, there is no reason for a foreign transaction entry to be + * locked after the transaction which locked it has ended. + */ + unlock_fdw_xact_entries(); + + /* + * Reset the list of registered connections. Since the memory for the list + * and its nodes comes from transaction memory context, it will be freed + * after this call. + */ + MyFDWConnections = NIL; + /* Set TwoPhaseReady to its default value */ + TwoPhaseReady = true; +} + +/* + * AtPrepare_FDWXacts + * + * The function is called while preparing a transaction. If there are foreign + * servers involved in the transaction, this function prepares transactions + * on those servers. + */ +extern void +AtPrepare_FDWXacts(void) +{ + /* If there are no foreign servers involved, we have no business here */ + if (list_length(MyFDWConnections) < 1) + return; + + /* + * All foreign servers participating in a transaction to be prepared + * should be two phase compliant. + */ + if (!TwoPhaseReady) + ereport(ERROR, + (errcode(ERRCODE_T_R_INTEGRITY_CONSTRAINT_VIOLATION), + errmsg("can not prepare the transaction because some foreign servers involved in transaction can not prepare the transaction"))); + + /* Prepare transactions on participating foreign servers. */ + prepare_foreign_transactions(); + + /* + * Unlock the foreign transaction entries so COMMIT/ROLLBACK PREPARED from + * some other backend will be able to lock those if required. + */ + unlock_fdw_xact_entries(); + + /* + * Reset the list of registered connections. Since the memory for the list + * and its nodes comes from transaction memory context, it will be freed + * after this call. + */ + MyFDWConnections = NIL; + + /* Set TwoPhaseReady to its default value */ + TwoPhaseReady = true; +} + +/* + * FDWXactTwoPhaseFinish + * + * This function is called as part of the COMMIT/ROLLBACK PREPARED command to + * commit/rollback the foreign transactions prepared as part of the local + * prepared transaction. The function looks for the foreign transaction entries + * with local_xid equal to xid of the prepared transaction and tries to resolve them. + */ +extern void +FDWXactTwoPhaseFinish(bool isCommit, TransactionId xid) +{ + List *entries_to_resolve; + + FDWXactStatus status = isCommit ? FDW_XACT_COMMITTING_PREPARED : + FDW_XACT_ABORTING_PREPARED; + + /* + * Get all the entries belonging to the given transaction id locked. If + * foreign transaction resolver is running, it might lock entries to check + * whether they can be resolved. The search function will skip such + * entries. The resolver will resolve them at a later point of time. + */ + search_fdw_xact(xid, InvalidOid, InvalidOid, InvalidOid, &entries_to_resolve); + + /* Try resolving the foreign transactions */ + while (entries_to_resolve) + { + FDWXact fdw_xact = linitial(entries_to_resolve); + + entries_to_resolve = list_delete_first(entries_to_resolve); + fdw_xact->status = status; + + /* + * Resolve the foreign transaction. If resolution is not successful, + * unlock the entry so that someone else can pick it up. + */ + if (!resolve_fdw_xact(fdw_xact, + get_prepared_foreign_xact_resolver(fdw_xact))) + unlock_fdw_xact(fdw_xact); + } +} + +/* + * get_prepared_foreign_xact_resolver + */ +static ResolvePreparedForeignTransaction_function +get_prepared_foreign_xact_resolver(FDWXact fdw_xact) +{ + ForeignServer *foreign_server; + ForeignDataWrapper *fdw; + FdwRoutine *fdw_routine; + + foreign_server = GetForeignServer(fdw_xact->serverid); + fdw = GetForeignDataWrapper(foreign_server->fdwid); + fdw_routine = GetFdwRoutine(fdw->fdwhandler); + if (!fdw_routine->ResolvePreparedForeignTransaction) + elog(ERROR, "no foreign transaction resolver routine provided for FDW %s", + fdw->fdwname); + + return fdw_routine->ResolvePreparedForeignTransaction; +} + +/* + * resolve_fdw_xact + * + * Resolve the foreign transaction using the foreign data wrapper's transaction + * handler routine. + * If the resolution is successful, remove the foreign transaction entry from + * the shared memory and also remove the corresponding on-disk file. + */ +static bool +resolve_fdw_xact(FDWXact fdw_xact, + ResolvePreparedForeignTransaction_function fdw_xact_handler) +{ + bool resolved; + bool is_commit; + + Assert(fdw_xact->status == FDW_XACT_COMMITTING_PREPARED || + fdw_xact->status == FDW_XACT_ABORTING_PREPARED); + + is_commit = (fdw_xact->status == FDW_XACT_COMMITTING_PREPARED) ? + true : false; + + resolved = fdw_xact_handler(fdw_xact->serverid, fdw_xact->userid, + fdw_xact->umid, is_commit, + fdw_xact->fdw_xact_id); + + /* If we succeeded in resolving the transaction, remove the entry */ + if (resolved) + remove_fdw_xact(fdw_xact); + + return resolved; +} + +/* + * Get foreign transaction entry from FDWXactGlobal->fdw_xacts. Return NULL + * if foreign transacgiven does not exist. + */ +static FDWXact +get_fdw_xact(TransactionId xid, Oid serverid, Oid userid) +{ + int i; + FDWXact fdw_xact; + + LWLockAcquire(FDWXactLock, LW_SHARED); + + for (i = 0; i < FDWXactGlobal->numFDWXacts; i++) + { + fdw_xact = FDWXactGlobal->fdw_xacts[i]; + + if (fdw_xact->local_xid == xid && + fdw_xact->serverid == serverid && + fdw_xact->userid == userid) + { + LWLockRelease(FDWXactLock); + return fdw_xact; + } + } + + LWLockRelease(FDWXactLock); + return NULL; +} + +/* + * fdw_xact_exists + * Returns true if there exists at least one prepared foreign transaction which + * matches criteria. This function is wrapper around search_fdw_xact. Check that + * function's prologue for details. + */ +bool +fdw_xact_exists(TransactionId xid, Oid dbid, Oid serverid, Oid userid) +{ + return search_fdw_xact(xid, dbid, serverid, userid, NULL); +} + +/* + * search_fdw_xact + * Return true if there exists at least one prepared foreign transaction + * entry with given criteria. The criteria is defined by arguments with + * valid values for respective datatypes. + * + * The table below explains the same + * xid | dbid | serverid | userid | search for entry with + * invalid | invalid | invalid | invalid | nothing + * invalid | invalid | invalid | valid | given userid + * invalid | invalid | valid | invalid | given serverid + * invalid | invalid | valid | valid | given serverid and userid + * invalid | valid | invalid | invalid | given dbid + * invalid | valid | invalid | valid | given dbid and userid + * invalid | valid | valid | invalid | given dbid and serverid + * invalid | valid | valid | valid | given dbid, serveroid and userid + * valid | invalid | invalid | invalid | given xid + * valid | invalid | invalid | valid | given xid and userid + * valid | invalid | valid | invalid | given xid, serverid + * valid | invalid | valid | valid | given xid, serverid, userid + * valid | valid | invalid | invalid | given xid and dbid + * valid | valid | invalid | valid | given xid, dbid and userid + * valid | valid | valid | invalid | given xid, dbid, serverid + * valid | valid | valid | valid | given xid, dbid, serverid, userid + * + * When the criteria is void (all arguments invalid) the + * function returns true, since any entry would match the criteria. + * + * If qualifying_fdw_xacts is not NULL, the qualifying entries are locked and + * returned in a linked list. Any entry which is already locked is ignored. If + * all the qualifying entries are locked, nothing will be returned in the list + * but returned value will be true. + */ +bool +search_fdw_xact(TransactionId xid, Oid dbid, Oid serverid, Oid userid, + List **qualifying_xacts) +{ + int cnt; + LWLockMode lock_mode; + + /* Return value if a qualifying entry exists */ + bool entry_exists = false; + + if (qualifying_xacts) + { + *qualifying_xacts = NIL; + /* The caller expects us to lock entries */ + lock_mode = LW_EXCLUSIVE; + } + else + lock_mode = LW_SHARED; + + LWLockAcquire(FDWXactLock, lock_mode); + for (cnt = 0; cnt < FDWXactGlobal->numFDWXacts; cnt++) + { + FDWXact fdw_xact = FDWXactGlobal->fdw_xacts[cnt]; + bool entry_matches = true; + + /* xid */ + if (xid != InvalidTransactionId && xid != fdw_xact->local_xid) + entry_matches = false; + + /* dbid */ + if (OidIsValid(dbid) && fdw_xact->dboid != dbid) + entry_matches = false; + + /* serverid */ + if (OidIsValid(serverid) && serverid != fdw_xact->serverid) + entry_matches = false; + + /* userid */ + if (OidIsValid(userid) && fdw_xact->userid != userid) + entry_matches = false; + + if (entry_matches) + { + entry_exists = true; + if (qualifying_xacts) + { + /* + * User has requested list of qualifying entries. If the + * matching entry is not locked, lock it and add to the list. + * If the entry is locked by some other backend, ignore it. + */ + if (fdw_xact->locking_backend == InvalidBackendId) + { + MemoryContext oldcontext; + + fdw_xact->locking_backend = MyBackendId; + + /* + * The list and its members may be required at the end of + * the transaction + */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + MyLockedFDWXacts = lappend(MyLockedFDWXacts, fdw_xact); + MemoryContextSwitchTo(oldcontext); + } + else if (fdw_xact->locking_backend != MyBackendId) + continue; + + *qualifying_xacts = lappend(*qualifying_xacts, fdw_xact); + } + else + { + /* + * User wants to check the existence, and we have found one + * matching entry. No need to check other entries. + */ + break; + } + } + } + + LWLockRelease(FDWXactLock); + + return entry_exists; +} + +/* + * fdw_xact_redo + * Apply the redo log for a foreign transaction. + */ +extern void +fdw_xact_redo(XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_FDW_XACT_INSERT) + FDWXactRedoAdd(record); + else if (info == XLOG_FDW_XACT_REMOVE) + { + FdwRemoveXlogRec *fdw_remove_xlog = (FdwRemoveXlogRec *) rec; + + /* Delete FDWXact entry and file if exists */ + FDWXactRedoRemove(fdw_remove_xlog->xid, fdw_remove_xlog->serverid, + fdw_remove_xlog->userid); + } + else + elog(ERROR, "invalid log type %d in foreign transction log record", info); + + return; +} + +/* + * CheckPointFDWXact + * + * Function syncs the foreign transaction files created between the two + * checkpoints. The foreign transaction entries and hence the corresponding + * files are expected to be very short-lived. By executing this function at the + * end, we might have lesser files to fsync, thus reducing some I/O. This is + * similar to CheckPointTwoPhase(). + * + * In order to avoid disk I/O while holding a light weight lock, the function + * first collects the files which need to be synced under FDWXactLock and then + * syncs them after releasing the lock. This approach creates a race condition: + * after releasing the lock, and before syncing a file, the corresponding + * foreign transaction entry and hence the file might get removed. The function + * checks whether that's true and ignores the error if so. + */ +void +CheckPointFDWXact(XLogRecPtr redo_horizon) +{ + int cnt; + int serialized_fdw_xacts = 0; + + /* Quick get-away, before taking lock */ + if (max_prepared_foreign_xacts <= 0) + return; + + TRACE_POSTGRESQL_FDWXACT_CHECKPOINT_START(); + + LWLockAcquire(FDWXactLock, LW_SHARED); + + /* Another quick, before we allocate memory */ + if (FDWXactGlobal->numFDWXacts <= 0) + { + LWLockRelease(FDWXactLock); + return; + } + + /* + * We are expecting there to be zero FDWXact that need to be copied to + * disk, so we perform all I/O while holding FDWXactLock for simplicity. + * This presents any new foreign xacts from preparing while this occurs, + * which shouldn't be a problem since the presence fo long-lived prepared + * foreign xacts indicated the transaction manager isn't active. + * + * it's also possible to move I/O out of the lock, but on every error we + * should check whether somebody committed our transaction in different + * backend. Let's leave this optimisation for future, if somebody will + * spot that this place cause bottleneck. + * + * Note that it isn't possible for there to be a FDWXact with a + * fdw_xact_end_lsn set prior to the last checkpoint yet is marked + * invalid, because of the efforts with delayChkpt. + */ + for (cnt = 0; cnt < FDWXactGlobal->numFDWXacts; cnt++) + { + FDWXact fdw_xact = FDWXactGlobal->fdw_xacts[cnt]; + + if ((fdw_xact->valid || fdw_xact->inredo) && + !fdw_xact->ondisk && + fdw_xact->fdw_xact_end_lsn <= redo_horizon) + { + char *buf; + int len; + + XlogReadFDWXactData(fdw_xact->fdw_xact_start_lsn, &buf, &len); + RecreateFDWXactFile(fdw_xact->local_xid, fdw_xact->serverid, + fdw_xact->userid, buf, len); + fdw_xact->ondisk = true; + serialized_fdw_xacts++; + pfree(buf); + } + } + + LWLockRelease(FDWXactLock); + + TRACE_POSTGRESQL_FDWXACT_CHECKPOINT_DONE(); + + if (log_checkpoints && serialized_fdw_xacts > 0) + ereport(LOG, + (errmsg_plural("%u foreign transaction state file was written " + "for long-running prepared transactions", + "%u foreign transaction state files were written " + "for long-running prepared transactions", + serialized_fdw_xacts, + serialized_fdw_xacts))); +} + +/* + * Reads foreign trasasction data from xlog. During checkpoint this data will + * be moved to fdwxact files and ReadFDWXactFile should be used instead. + * + * Note clearly that this function accesses WAL during normal operation, similarly + * to the way WALSender or Logical Decoding would do. It does not run during + * crash recovery or standby processing. + */ +static void +XlogReadFDWXactData(XLogRecPtr lsn, char **buf, int *len) +{ + XLogRecord *record; + XLogReaderState *xlogreader; + char *errormsg; + + xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL); + if (!xlogreader) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed while allocating an XLog reading processor."))); + + record = XLogReadRecord(xlogreader, lsn, &errormsg); + + if (record == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read foreign transaction state from xlog at %X/%X", + (uint32) (lsn >> 32), + (uint32) lsn))); + + if (XLogRecGetRmid(xlogreader) != RM_FDW_XACT_ID || + (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_FDW_XACT_INSERT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("expected foreign transaction state data is not present in xlog at %X/%X", + (uint32) (lsn >> 32), + (uint32) lsn))); + + if (len != NULL) + *len = XLogRecGetDataLen(xlogreader); + + *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader)); + memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader)); + + XLogReaderFree(xlogreader); +} + +/* + * Recreates a foreign transaction state file. This is used in WAL replay and + * during checkpoint creation. + * + * Note: content and len don't include CRC. + */ +void +RecreateFDWXactFile(TransactionId xid, Oid serverid, Oid userid, + void *content, int len) +{ + char path[MAXPGPATH]; + pg_crc32c fdw_xact_crc; + pg_crc32c bogus_crc; + int fd; + + /* Recompute CRC */ + INIT_CRC32C(fdw_xact_crc); + COMP_CRC32C(fdw_xact_crc, content, len); + + FDWXactFilePath(path, xid, serverid, userid); + + fd = OpenTransientFile(path, O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY, + S_IRUSR | S_IWUSR); + + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not recreate foreign transaction state file \"%s\": %m", + path))); + + if (write(fd, content, len) != len) + { + CloseTransientFile(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write foreign transcation state file: %m"))); + } + FIN_CRC32C(fdw_xact_crc); + + /* + * Write a deliberately bogus CRC to the state file; this is just paranoia + * to catch the case where four more bytes will run us out of disk space. + */ + bogus_crc = ~fdw_xact_crc; + if ((write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c)) + { + CloseTransientFile(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write foreing transaction state file: %m"))); + } + /* Back up to prepare for rewriting the CRC */ + if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0) + { + CloseTransientFile(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek in foreign transaction state file: %m"))); + } + + /* write correct CRC and close file */ + if ((write(fd, &fdw_xact_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c)) + { + CloseTransientFile(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write foreign transaction state file: %m"))); + } + + /* + * We must fsync the file because the end-of-replay checkpoint will not do + * so, there being no GXACT in shared memory yet to tell it to. + */ + if (pg_fsync(fd) != 0) + { + CloseTransientFile(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync foreign transaction state file: %m"))); + } + + if (CloseTransientFile(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close foreign transaction file: %m"))); +} + +/* Built in functions */ +/* + * Structure to hold and iterate over the foreign transactions to be displayed + * by the built-in functions. + */ +typedef struct +{ + FDWXact fdw_xacts; + int num_xacts; + int cur_xact; +} WorkingStatus; + +/* + * pg_fdw_xact + * Produce a view with one row per prepared transaction on foreign server. + * + * This function is here so we don't have to export the + * FDWXactGlobalData struct definition. + * + */ +Datum +pg_fdw_xacts(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + WorkingStatus *status; + char *xact_status; + + if (SRF_IS_FIRSTCALL()) + { + TupleDesc tupdesc; + MemoryContext oldcontext; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* + * Switch to memory context appropriate for multiple function calls + */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* build tupdesc for result tuples */ + /* this had better match pg_fdw_xacts view in system_views.sql */ + tupdesc = CreateTemplateTupleDesc(6, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "dbid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "transaction", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "serverid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "userid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "status", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "identifier", + TEXTOID, -1, 0); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + + /* + * Collect status information that we will format and send out as a + * result set. + */ + status = (WorkingStatus *) palloc(sizeof(WorkingStatus)); + funcctx->user_fctx = (void *) status; + + status->num_xacts = GetFDWXactList(&status->fdw_xacts); + status->cur_xact = 0; + + MemoryContextSwitchTo(oldcontext); + } + + funcctx = SRF_PERCALL_SETUP(); + status = funcctx->user_fctx; + + while (status->cur_xact < status->num_xacts) + { + FDWXact fdw_xact = &status->fdw_xacts[status->cur_xact++]; + Datum values[6]; + bool nulls[6]; + HeapTuple tuple; + Datum result; + + if (!fdw_xact->valid) + continue; + + /* + * Form tuple with appropriate data. + */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = ObjectIdGetDatum(fdw_xact->dboid); + values[1] = TransactionIdGetDatum(fdw_xact->local_xid); + values[2] = ObjectIdGetDatum(fdw_xact->serverid); + values[3] = ObjectIdGetDatum(fdw_xact->userid); + switch (fdw_xact->status) + { + case FDW_XACT_PREPARING: + xact_status = "prepared"; + break; + case FDW_XACT_COMMITTING_PREPARED: + xact_status = "committing"; + break; + case FDW_XACT_ABORTING_PREPARED: + xact_status = "aborting"; + break; + default: + xact_status = "unknown"; + break; + } + values[4] = CStringGetTextDatum(xact_status); + /* should this be really interpreted by FDW */ + values[5] = PointerGetDatum(cstring_to_text_with_len(fdw_xact->fdw_xact_id, + FDW_XACT_ID_LEN)); + + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + result = HeapTupleGetDatum(tuple); + SRF_RETURN_NEXT(funcctx, result); + } + + SRF_RETURN_DONE(funcctx); +} + +/* + * Returns an array of all foreign prepared transactions for the user-level + * function pg_fdw_xact. + * + * The returned array and all its elements are copies of internal data + * structures, to minimize the time we need to hold the FDWXactLock. + * + * WARNING -- we return even those transactions whose information is not + * completely filled yet. The caller should filter them out if he doesn't want them. + * + * The returned array is palloc'd. + */ +static int +GetFDWXactList(FDWXact * fdw_xacts) +{ + int num_xacts; + int cnt_xacts; + + LWLockAcquire(FDWXactLock, LW_SHARED); + + if (FDWXactGlobal->numFDWXacts == 0) + { + LWLockRelease(FDWXactLock); + *fdw_xacts = NULL; + return 0; + } + + num_xacts = FDWXactGlobal->numFDWXacts; + *fdw_xacts = (FDWXact) palloc(sizeof(FDWXactData) * num_xacts); + for (cnt_xacts = 0; cnt_xacts < num_xacts; cnt_xacts++) + memcpy((*fdw_xacts) + cnt_xacts, FDWXactGlobal->fdw_xacts[cnt_xacts], + sizeof(FDWXactData)); + + LWLockRelease(FDWXactLock); + + return num_xacts; +} + +/* + * pg_fdw_xact_resolve + * a user interface to initiate foreign transaction resolution. The function + * tries to resolve the prepared transactions on foreign servers in the database + * from where it is run. + * The function prints the status of all the foreign transactions it + * encountered, whether resolved or not. + */ +Datum +pg_fdw_xact_resolve(PG_FUNCTION_ARGS) +{ + MemoryContext oldcontext; + FuncCallContext *funcctx; + WorkingStatus *status; + char *xact_status; + List *entries_to_resolve; + + if (SRF_IS_FIRSTCALL()) + { + TupleDesc tupdesc; + + /* We will be modifying the shared memory. Prepare to clean up on exit */ + if (!fdwXactExitRegistered) + { + before_shmem_exit(AtProcExit_FDWXact, 0); + fdwXactExitRegistered = true; + } + + /* Allocate space for and prepare the returning set */ + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + /* Switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* build tupdesc for result tuples */ + tupdesc = CreateTemplateTupleDesc(6, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "dbid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "transaction", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "serverid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "userid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "status", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "identifier", + TEXTOID, -1, 0); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + + /* + * Collect status information that we will format and send out as a + * result set. + */ + status = (WorkingStatus *) palloc(sizeof(WorkingStatus)); + funcctx->user_fctx = (void *) status; + status->fdw_xacts = (FDWXact) palloc(sizeof(FDWXactData) * FDWXactGlobal->numFDWXacts); + status->num_xacts = 0; + status->cur_xact = 0; + + /* Done preparation for the result. */ + MemoryContextSwitchTo(oldcontext); + + /* + * Get entries whose foreign servers are part of the database where + * this function was called. We can get information about only such + * foreign servers. The function will lock the entries. The entries + * which are locked by other backends and whose foreign servers belong + * to this database are left out, since we can not work on those. + */ + search_fdw_xact(InvalidTransactionId, MyDatabaseId, InvalidOid, InvalidOid, + &entries_to_resolve); + + /* Work to resolve the resolvable entries */ + while (entries_to_resolve) + { + FDWXact fdw_xact = linitial(entries_to_resolve); + + /* Remove the entry as we will not use it again */ + entries_to_resolve = list_delete_first(entries_to_resolve); + + /* Copy the data for the sake of result. */ + memcpy(status->fdw_xacts + status->num_xacts++, + fdw_xact, sizeof(FDWXactData)); + + if (fdw_xact->status == FDW_XACT_COMMITTING_PREPARED || + fdw_xact->status == FDW_XACT_ABORTING_PREPARED) + { + /* + * We have already decided what to do with the foreign + * transaction nothing to be done. + */ + } + else if (TransactionIdDidCommit(fdw_xact->local_xid)) + fdw_xact->status = FDW_XACT_COMMITTING_PREPARED; + else if (TransactionIdDidAbort(fdw_xact->local_xid)) + fdw_xact->status = FDW_XACT_ABORTING_PREPARED; + else if (!TransactionIdIsInProgress(fdw_xact->local_xid)) + { + /* + * The transaction is in progress but not on any of the + * backends. So probably, it crashed before actual abort or + * commit. So assume it to be aborted. + */ + fdw_xact->status = FDW_XACT_ABORTING_PREPARED; + } + else + { + /* + * Local transaction is in progress, should not resolve the + * foreign transaction. This can happen when the foreign + * transaction is prepared as part of a local prepared + * transaction. Just continue with the next one. + */ + unlock_fdw_xact(fdw_xact); + continue; + } + + /* + * Resolve the foreign transaction. If resolution was not + * successful, unlock the entry so that someone else can pick it + * up + */ + if (!resolve_fdw_xact(fdw_xact, get_prepared_foreign_xact_resolver(fdw_xact))) + unlock_fdw_xact(fdw_xact); + else + /* Update the status in the result set */ + status->fdw_xacts[status->num_xacts - 1].status = FDW_XACT_RESOLVED; + } + } + + /* Print the result set */ + funcctx = SRF_PERCALL_SETUP(); + status = funcctx->user_fctx; + + while (status->cur_xact < status->num_xacts) + { + FDWXact fdw_xact = &status->fdw_xacts[status->cur_xact++]; + Datum values[6]; + bool nulls[6]; + HeapTuple tuple; + Datum result; + + if (!fdw_xact->valid) + continue; + + /* + * Form tuple with appropriate data. + */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = ObjectIdGetDatum(fdw_xact->dboid); + values[1] = TransactionIdGetDatum(fdw_xact->local_xid); + values[2] = ObjectIdGetDatum(fdw_xact->serverid); + values[3] = ObjectIdGetDatum(fdw_xact->userid); + switch (fdw_xact->status) + { + case FDW_XACT_PREPARING: + xact_status = "preparing"; + break; + case FDW_XACT_COMMITTING_PREPARED: + xact_status = "committing"; + break; + case FDW_XACT_ABORTING_PREPARED: + xact_status = "aborting"; + break; + case FDW_XACT_RESOLVED: + xact_status = "resolved"; + break; + default: + xact_status = "unknown"; + break; + } + values[4] = CStringGetTextDatum(xact_status); + /* should this be really interpreted by FDW? */ + values[5] = PointerGetDatum(cstring_to_text_with_len(fdw_xact->fdw_xact_id, + FDW_XACT_ID_LEN)); + + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + result = HeapTupleGetDatum(tuple); + SRF_RETURN_NEXT(funcctx, result); + } + + SRF_RETURN_DONE(funcctx); +} + +/* + * Built-in function to remove prepared foreign transaction entry/s without + * resolving. The function gives a way to forget about such prepared + * transaction in case + * 1. The foreign server where it is prepared is no longer available + * 2. The user which prepared this transaction needs to be dropped + * 3. PITR is recovering before a transaction id, which created the prepared + * foreign transaction + * 4. The database containing the entries needs to be dropped + * + * Or any such conditions in which resolution is no longer possible. + * + * The function accepts 4 arguments transaction id, dbid, serverid and userid, + * which define the criteria in the same way as search_fdw_xact(). The entries + * matching the criteria are removed. The function does not remove an entry + * which is locked by some other backend. + */ +Datum +pg_fdw_xact_remove(PG_FUNCTION_ARGS) +{ +/* Some #defines only for this function to deal with the arguments */ +#define XID_ARGNUM 0 +#define DBID_ARGNUM 1 +#define SRVID_ARGNUM 2 +#define USRID_ARGNUM 3 + + TransactionId xid; + Oid dbid; + Oid serverid; + Oid userid; + List *entries_to_remove; + + xid = PG_ARGISNULL(XID_ARGNUM) ? InvalidTransactionId : + DatumGetTransactionId(PG_GETARG_DATUM(XID_ARGNUM)); + dbid = PG_ARGISNULL(DBID_ARGNUM) ? InvalidOid : + PG_GETARG_OID(DBID_ARGNUM); + serverid = PG_ARGISNULL(SRVID_ARGNUM) ? InvalidOid : + PG_GETARG_OID(SRVID_ARGNUM); + userid = PG_ARGISNULL(USRID_ARGNUM) ? InvalidOid : + PG_GETARG_OID(USRID_ARGNUM); + + search_fdw_xact(xid, dbid, serverid, userid, &entries_to_remove); + + while (entries_to_remove) + { + FDWXact fdw_xact = linitial(entries_to_remove); + + entries_to_remove = list_delete_first(entries_to_remove); + + remove_fdw_xact(fdw_xact); + } + + PG_RETURN_VOID(); +} + +/* + * Code dealing with the on disk files used to store foreign transaction + * information. + */ + +/* + * ReadFDWXactFile + * Read the foreign transction state file and return the contents in a + * structure allocated in-memory. The structure can be later freed by the + * caller. + */ +static FDWXactOnDiskData * +ReadFDWXactFile(TransactionId xid, Oid serverid, Oid userid) +{ + char path[MAXPGPATH]; + int fd; + FDWXactOnDiskData *fdw_xact_file_data; + struct stat stat; + uint32 crc_offset; + pg_crc32c calc_crc; + pg_crc32c file_crc; + char *buf; + + FDWXactFilePath(path, xid, serverid, userid); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0); + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open FDW transaction state file \"%s\": %m", + path))); + + /* + * Check file length. We can determine a lower bound pretty easily. We + * set an upper bound to avoid palloc() failure on a corrupt file, though + * we can't guarantee that we won't get an out of memory error anyway, + * even on a valid file. + */ + if (fstat(fd, &stat)) + { + CloseTransientFile(fd); + + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not stat FDW transaction state file \"%s\": %m", + path))); + return NULL; + } + + if (stat.st_size < offsetof(FDWXactOnDiskData, fdw_xact_id) || + stat.st_size > MaxAllocSize) + { + CloseTransientFile(fd); + ereport(WARNING, + (errcode_for_file_access(), + errmsg("Too large FDW transaction state file \"%s\": %m", + path))); + return NULL; + } + + buf = (char *) palloc(stat.st_size); + fdw_xact_file_data = (FDWXactOnDiskData *) buf; + crc_offset = stat.st_size - sizeof(pg_crc32c); + /* Slurp the file */ + if (read(fd, fdw_xact_file_data, stat.st_size) != stat.st_size) + { + CloseTransientFile(fd); + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not read FDW transaction state file \"%s\": %m", + path))); + pfree(fdw_xact_file_data); + return NULL; + } + + CloseTransientFile(fd); + + /* + * Check the CRC. + */ + INIT_CRC32C(calc_crc); + COMP_CRC32C(calc_crc, buf, crc_offset); + FIN_CRC32C(calc_crc); + + file_crc = *((pg_crc32c *) (buf + crc_offset)); + + if (!EQ_CRC32C(calc_crc, file_crc)) + { + pfree(buf); + return NULL; + } + + if (fdw_xact_file_data->serverid != serverid || + fdw_xact_file_data->userid != userid || + fdw_xact_file_data->local_xid != xid) + { + ereport(WARNING, + (errmsg("removing corrupt foreign transaction state file \"%s\"", + path))); + CloseTransientFile(fd); + pfree(buf); + return NULL; + } + + return fdw_xact_file_data; +} + +/* + * PrescanFDWXacts + * + * Read the foreign prepared transactions directory for oldest active + * transaction. The transactions corresponding to the xids in this directory + * are not necessarily active per say locally. But we still need those XIDs to + * be alive so that + * 1. we can determine whether they are committed or aborted + * 2. the file name contains xid which shouldn't get used again to avoid + * conflicting file names. + * + * The function accepts the oldest active xid determined by other functions + * (e.g. PrescanPreparedTransactions()). It then compares every xid it comes + * across while scanning foreign prepared transactions directory with the oldest + * active xid. It returns the oldest of those xids or oldest active xid + * whichever is older. + * + * If any foreign prepared transaction is part of a future transaction (PITR), + * the function removes the corresponding file as + * 1. We can not know the status of the local transaction which prepared this + * foreign transaction + * 2. The foreign server or the user may not be available as per new timeline + * + * Anyway, the local transaction which prepared the foreign prepared transaction + * does not exist as per the new timeline, so it's better to forget the foreign + * prepared transaction as well. + */ +TransactionId +PrescanFDWXacts(TransactionId oldestActiveXid) +{ + TransactionId nextXid = ShmemVariableCache->nextXid; + DIR *cldir; + struct dirent *clde; + + cldir = AllocateDir(FDW_XACTS_DIR); + while ((clde = ReadDir(cldir, FDW_XACTS_DIR)) != NULL) + { + if (strlen(clde->d_name) == FDW_XACT_FILE_NAME_LEN && + strspn(clde->d_name, "0123456789ABCDEF_") == FDW_XACT_FILE_NAME_LEN) + { + Oid serverid; + Oid userid; + TransactionId local_xid; + + sscanf(clde->d_name, "%08x_%08x_%08x", &local_xid, &serverid, + &userid); + + /* + * Remove a foreign prepared transaction file corresponding to an + * XID, which is too new. + */ + if (TransactionIdFollowsOrEquals(local_xid, nextXid)) + { + ereport(WARNING, + (errmsg("removing future foreign prepared transaction file \"%s\"", + clde->d_name))); + RemoveFDWXactFile(local_xid, serverid, userid, true); + continue; + } + + if (TransactionIdPrecedesOrEquals(local_xid, oldestActiveXid)) + oldestActiveXid = local_xid; + } + } + + FreeDir(cldir); + return oldestActiveXid; +} + +/* + * RecoverFDWXacts + * Read the foreign prepared transaction information and set it up for further + * usage. + */ +void +RecoverFDWXacts(void) +{ + DIR *cldir; + struct dirent *clde; + + cldir = AllocateDir(FDW_XACTS_DIR); + while ((clde = ReadDir(cldir, FDW_XACTS_DIR)) != NULL) + { + if (strlen(clde->d_name) == FDW_XACT_FILE_NAME_LEN && + strspn(clde->d_name, "0123456789ABCDEF_") == FDW_XACT_FILE_NAME_LEN) + { + Oid serverid; + Oid userid; + TransactionId local_xid; + FDWXactOnDiskData *fdw_xact_file_data; + FDWXact fdw_xact; + + sscanf(clde->d_name, "%08x_%08x_%08x", &local_xid, &serverid, + &userid); + + fdw_xact_file_data = ReadFDWXactFile(local_xid, serverid, userid); + + if (!fdw_xact_file_data) + { + ereport(WARNING, + (errmsg("Removing corrupt foreign transaction file \"%s\"", + clde->d_name))); + RemoveFDWXactFile(local_xid, serverid, userid, false); + continue; + } + + ereport(LOG, + (errmsg("recovering foreign transaction entry for xid %u, foreign server %u and user %u", + local_xid, serverid, userid))); + + fdw_xact = get_fdw_xact(local_xid, serverid, userid); + + LWLockAcquire(FDWXactLock, LW_EXCLUSIVE); + if (!fdw_xact) + { + /* + * Add this entry into the table of foreign transactions. The + * status of the transaction is set as preparing, since we do not + * know the exact status right now. Resolver will set it later + * based on the status of local transaction which prepared this + * foreign transaction. + */ + fdw_xact = insert_fdw_xact(fdw_xact_file_data->dboid, local_xid, + serverid, userid, + fdw_xact_file_data->umid, + fdw_xact_file_data->fdw_xact_id); + fdw_xact->locking_backend = MyBackendId; + fdw_xact->status = FDW_XACT_PREPARING; + } + else + { + Assert(fdw_xact->inredo); + fdw_xact->inredo = false; + } + + /* Mark the entry as ready */ + fdw_xact->valid = true; + /* Already synced to disk */ + fdw_xact->ondisk = true; + pfree(fdw_xact_file_data); + LWLockRelease(FDWXactLock); + } + } + + FreeDir(cldir); +} + +/* + * Remove the foreign transaction file for given entry. + * + * If giveWarning is false, do not complain about file-not-present; + * this is an expected case during WAL replay. + */ +static void +RemoveFDWXactFile(TransactionId xid, Oid serverid, Oid userid, bool giveWarning) +{ + char path[MAXPGPATH]; + + FDWXactFilePath(path, xid, serverid, userid); + if (unlink(path)) + if (errno != ENOENT || giveWarning) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove foreign transaction state file \"%s\": %m", + path))); +} + +/* + * FDWXactRedoAdd + * + * Store pointer to the start/end of the WAL record along with the xid in + * a fdw_xact entry in shared memory FDWXactData structure. + */ +void +FDWXactRedoAdd(XLogReaderState *record) +{ + FDWXactOnDiskData *fdw_xact_data = (FDWXactOnDiskData *) XLogRecGetData(record); + FDWXact fdw_xact; + + Assert(RecoveryInProgress()); + + LWLockAcquire(FDWXactLock, LW_EXCLUSIVE); + fdw_xact = insert_fdw_xact(fdw_xact_data->dboid, fdw_xact_data->local_xid, + fdw_xact_data->serverid, fdw_xact_data->userid, + fdw_xact_data->umid, fdw_xact_data->fdw_xact_id); + fdw_xact->status = FDW_XACT_PREPARING; + fdw_xact->fdw_xact_start_lsn = record->ReadRecPtr; + fdw_xact->fdw_xact_end_lsn = record->EndRecPtr; + fdw_xact->inredo = true; + LWLockRelease(FDWXactLock); +} +/* + * FDWXactRedoRemove + * + * Remove the corresponding fdw_xact entry from FDWXactGlobal. + * Also remove fdw_xact file if a foreign transaction was saved + * via an earlier checkpoint. + */ +void +FDWXactRedoRemove(TransactionId xid, Oid serverid, Oid userid) +{ + FDWXact fdw_xact; + + Assert(RecoveryInProgress()); + + fdw_xact = get_fdw_xact(xid, serverid, userid); + + if (fdw_xact) + { + /* Now we can clean up any files we already left */ + Assert(fdw_xact->inredo); + remove_fdw_xact(fdw_xact); + } + else + { + /* + * Entry could be on disk. Call with giveWarning = false + * since it can be expected during replay. + */ + RemoveFDWXactFile(xid, serverid, userid, false); + } +} diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 9368b56..c10a027 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -9,6 +9,7 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdw_xact.h" #include "access/ginxlog.h" #include "access/gistxlog.h" #include "access/generic_xlog.h" diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 83169cc..98f847b 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -58,6 +58,7 @@ #include #include "access/commit_ts.h" +#include "access/fdw_xact.h" #include "access/htup_details.h" #include "access/subtrans.h" #include "access/transam.h" @@ -1455,6 +1456,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit) PredicateLockTwoPhaseFinish(xid, isCommit); + /* + * Commit/Rollback the foreign transactions prepared as part of this + * prepared transaction. + */ + FDWXactTwoPhaseFinish(isCommit, xid); + /* Count the prepared xact as committed or aborted */ AtEOXact_PgStat(isCommit); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 5ca7375..d62a9b2 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -21,6 +21,7 @@ #include #include "access/commit_ts.h" +#include "access/fdw_xact.h" #include "access/multixact.h" #include "access/parallel.h" #include "access/subtrans.h" @@ -1985,6 +1986,9 @@ CommitTransaction(void) break; } + /* Pre-commit step for foreign transcations */ + PreCommit_FDWXacts(); + CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT : XACT_EVENT_PRE_COMMIT); @@ -2143,6 +2147,7 @@ CommitTransaction(void) AtEOXact_PgStat(true); AtEOXact_Snapshot(true); AtCommit_ApplyLauncher(); + AtEOXact_FDWXacts(true); pgstat_report_xact_timestamp(0); CurrentResourceOwner = NULL; @@ -2232,6 +2237,9 @@ PrepareTransaction(void) * the transaction-abort path. */ + /* Prepare step for foreign transactions */ + AtPrepare_FDWXacts(); + /* Shut down the deferred-trigger manager */ AfterTriggerEndXact(true); @@ -2620,6 +2628,7 @@ AbortTransaction(void) AtEOXact_ComboCid(); AtEOXact_HashTables(false); AtEOXact_PgStat(false); + AtEOXact_FDWXacts(false); pgstat_report_xact_timestamp(0); } @@ -4313,6 +4322,10 @@ AbortOutOfAnyTransaction(void) void RegisterTransactionLocalNode(void) { + /* Quick exits if no need to remember */ + if (max_prepared_foreign_xacts == 0) + return; + XactWriteLocalNode = true; } @@ -4322,6 +4335,10 @@ RegisterTransactionLocalNode(void) void UnregisterTransactionLocalNode(void) { + /* Quick exits if no need to remember */ + if (max_prepared_foreign_xacts == 0) + return; + XactWriteLocalNode = false; } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 5d58f09..f862369 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -24,6 +24,7 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdw_xact.h" #include "access/multixact.h" #include "access/rewriteheap.h" #include "access/subtrans.h" @@ -5104,6 +5105,7 @@ BootStrapXLOG(void) ControlFile->MaxConnections = MaxConnections; ControlFile->max_worker_processes = max_worker_processes; ControlFile->max_prepared_xacts = max_prepared_xacts; + ControlFile->max_prepared_foreign_xacts = max_prepared_foreign_xacts; ControlFile->max_locks_per_xact = max_locks_per_xact; ControlFile->wal_level = wal_level; ControlFile->wal_log_hints = wal_log_hints; @@ -6176,6 +6178,9 @@ CheckRequiredParameterValues(void) RecoveryRequiresIntParameter("max_locks_per_transaction", max_locks_per_xact, ControlFile->max_locks_per_xact); + RecoveryRequiresIntParameter("max_prepared_foreign_transactions", + max_prepared_foreign_xacts, + ControlFile->max_prepared_foreign_xacts); } } @@ -6870,7 +6875,10 @@ StartupXLOG(void) InitRecoveryTransactionEnvironment(); if (wasShutdown) + { oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids); + oldestActiveXID = PrescanFDWXacts(oldestActiveXID); + } else oldestActiveXID = checkPoint.oldestActiveXid; Assert(TransactionIdIsValid(oldestActiveXID)); @@ -7495,6 +7503,7 @@ StartupXLOG(void) /* Pre-scan prepared transactions to find out the range of XIDs present */ oldestActiveXID = PrescanPreparedTransactions(NULL, NULL); + oldestActiveXID = PrescanFDWXacts(oldestActiveXID); /* * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE @@ -7681,6 +7690,9 @@ StartupXLOG(void) /* Reload shared-memory state for prepared transactions */ RecoverPreparedTransactions(); + /* Recover foreign transaction state and insert into shared-memory. */ + RecoverFDWXacts(); + /* * Shutdown the recovery environment. This must occur after * RecoverPreparedTransactions(), see notes for lock_twophase_recover() @@ -8993,6 +9005,11 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointReplicationOrigin(); /* We deliberately delay 2PC checkpointing as long as possible */ CheckPointTwoPhase(checkPointRedo); + /* + * We deliberately delay foreign transaction checkpointing as long as + * possible. + */ + CheckPointFDWXact(checkPointRedo); } /* @@ -9430,7 +9447,8 @@ XLogReportParameters(void) max_worker_processes != ControlFile->max_worker_processes || max_prepared_xacts != ControlFile->max_prepared_xacts || max_locks_per_xact != ControlFile->max_locks_per_xact || - track_commit_timestamp != ControlFile->track_commit_timestamp) + track_commit_timestamp != ControlFile->track_commit_timestamp || + max_prepared_foreign_xacts != ControlFile->max_prepared_foreign_xacts) { /* * The change in number of backend slots doesn't need to be WAL-logged @@ -9447,6 +9465,7 @@ XLogReportParameters(void) xlrec.MaxConnections = MaxConnections; xlrec.max_worker_processes = max_worker_processes; xlrec.max_prepared_xacts = max_prepared_xacts; + xlrec.max_prepared_foreign_xacts = max_prepared_foreign_xacts; xlrec.max_locks_per_xact = max_locks_per_xact; xlrec.wal_level = wal_level; xlrec.wal_log_hints = wal_log_hints; @@ -9462,6 +9481,7 @@ XLogReportParameters(void) ControlFile->MaxConnections = MaxConnections; ControlFile->max_worker_processes = max_worker_processes; ControlFile->max_prepared_xacts = max_prepared_xacts; + ControlFile->max_prepared_foreign_xacts = max_prepared_foreign_xacts; ControlFile->max_locks_per_xact = max_locks_per_xact; ControlFile->wal_level = wal_level; ControlFile->wal_log_hints = wal_log_hints; @@ -9658,6 +9678,7 @@ xlog_redo(XLogReaderState *record) RunningTransactionsData running; oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids); + oldestActiveXID = PrescanFDWXacts(oldestActiveXID); /* * Construct a RunningTransactions snapshot representing a shut @@ -9847,6 +9868,7 @@ xlog_redo(XLogReaderState *record) ControlFile->MaxConnections = xlrec.MaxConnections; ControlFile->max_worker_processes = xlrec.max_worker_processes; ControlFile->max_prepared_xacts = xlrec.max_prepared_xacts; + ControlFile->max_prepared_foreign_xacts = xlrec.max_prepared_foreign_xacts; ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact; ControlFile->wal_level = xlrec.wal_level; ControlFile->wal_log_hints = xlrec.wal_log_hints; diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 46c207c..2da7369 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -17,6 +17,7 @@ #include #include +#include "access/fdw_xact.h" #include "access/htup_details.h" #include "bootstrap/bootstrap.h" #include "catalog/index.h" diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index d357c8b..bf5fbc1 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -301,6 +301,9 @@ CREATE VIEW pg_prepared_xacts AS CREATE VIEW pg_prepared_statements AS SELECT * FROM pg_prepared_statement() AS P; +CREATE VIEW pg_fdw_xacts AS + SELECT * FROM pg_fdw_xacts() AS F; + CREATE VIEW pg_seclabels AS SELECT l.objoid, l.classoid, l.objsubid, diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c index 68100df..3c05676 100644 --- a/src/backend/commands/foreigncmds.c +++ b/src/backend/commands/foreigncmds.c @@ -13,6 +13,7 @@ */ #include "postgres.h" +#include "access/fdw_xact.h" #include "access/heapam.h" #include "access/htup_details.h" #include "access/reloptions.h" @@ -1093,6 +1094,20 @@ RemoveForeignServerById(Oid srvId) if (!HeapTupleIsValid(tp)) elog(ERROR, "cache lookup failed for foreign server %u", srvId); + /* + * Check if the foreign server has any foreign transaction prepared on it. + * If there is one, and it gets dropped, we will not have any chance to + * resolve that transaction. + */ + if (fdw_xact_exists(InvalidTransactionId, MyDatabaseId, srvId, InvalidOid)) + { + Form_pg_foreign_server srvForm; + srvForm = (Form_pg_foreign_server) GETSTRUCT(tp); + ereport(ERROR, + (errmsg("server \"%s\" has unresolved prepared transactions on it", + NameStr(srvForm->srvname)))); + } + CatalogTupleDelete(rel, &tp->t_self); ReleaseSysCache(tp); @@ -1403,6 +1418,17 @@ RemoveUserMapping(DropUserMappingStmt *stmt) user_mapping_ddl_aclcheck(useId, srv->serverid, srv->servername); /* + * If there is a foreign prepared transaction with this user mapping, + * dropping the user mapping might result in dangling prepared + * transaction. + */ + if (fdw_xact_exists(InvalidTransactionId, MyDatabaseId, srv->serverid, + useId)) + ereport(ERROR, + (errmsg("server \"%s\" has unresolved prepared transaction for user \"%s\"", + srv->servername, MappingUserName(useId)))); + + /* * Do the deletion */ object.classId = UserMappingRelationId; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 5c13d26..5b09f1d 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -148,6 +148,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor case RM_SPGIST_ID: case RM_BRIN_ID: case RM_COMMIT_TS_ID: + case RM_FDW_XACT_ID: case RM_REPLORIGIN_ID: case RM_GENERIC_ID: /* just deal with xid, and done */ diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 2d1ed14..f32db3a 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -16,6 +16,7 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdw_xact.h" #include "access/heapam.h" #include "access/multixact.h" #include "access/nbtree.h" @@ -150,6 +151,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); size = add_size(size, BackendRandomShmemSize()); + size = add_size(size, FDWXactShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -270,6 +272,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) SyncScanShmemInit(); AsyncShmemInit(); BackendRandomShmemInit(); + FDWXactShmemInit(); #ifdef EXEC_BACKEND diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 3e13394..cdf2d8d 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -494,7 +494,7 @@ RegisterLWLockTranches(void) if (LWLockTrancheArray == NULL) { - LWLockTranchesAllocated = 64; + LWLockTranchesAllocated = 65; LWLockTrancheArray = (char **) MemoryContextAllocZero(TopMemoryContext, LWLockTranchesAllocated * sizeof(char *)); diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index e6025ec..8e7028a 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -50,3 +50,4 @@ OldSnapshotTimeMapLock 42 BackendRandomLock 43 LogicalRepWorkerLock 44 CLogTruncationLock 45 +FDWXactLock 46 diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index e9d561b..bab9a23 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -27,6 +27,7 @@ #endif #include "access/commit_ts.h" +#include "access/fdw_xact.h" #include "access/gin.h" #include "access/rmgr.h" #include "access/transam.h" @@ -2065,6 +2066,19 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + /* + * See also CheckRequiredParameterValues() if this parameter changes + */ + { + {"max_prepared_foreign_transactions", PGC_POSTMASTER, RESOURCES_MEM, + gettext_noop("Sets the maximum number of simultaneously prepared transactions on foreign servers."), + NULL + }, + &max_prepared_foreign_xacts, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + #ifdef LOCK_DEBUG { {"trace_lock_oidmin", PGC_SUSET, DEVELOPER_OPTIONS, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 8a93bdc..1be8858 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -118,6 +118,12 @@ # (change requires restart) # Caution: it is not advisable to set max_prepared_transactions nonzero unless # you actively intend to use prepared transactions. +#max_prepared_foreign_transactions = 0 # zero disables the feature + # (change requires restart) +# Note: Increasing max_prepared_foreign_transactions costs ~600(?) bytes of shared memory +# per foreign transaction slot. +# It is not advisable to set max_prepared_foreign_transactions nonzero unless you +# actively intend to use atomic foreign transactions feature. #work_mem = 4MB # min 64kB #maintenance_work_mem = 64MB # min 1MB #replacement_sort_tuples = 150000 # limits use of replacement selection sort diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d index 214dc71..af2c627 100644 --- a/src/backend/utils/probes.d +++ b/src/backend/utils/probes.d @@ -81,6 +81,8 @@ provider postgresql { probe multixact__checkpoint__done(bool); probe twophase__checkpoint__start(); probe twophase__checkpoint__done(); + probe fdwxact__checkpoint__start(); + probe fdwxact__checkpoint__done(); probe smgr__md__read__start(ForkNumber, BlockNumber, Oid, Oid, Oid, int); probe smgr__md__read__done(ForkNumber, BlockNumber, Oid, Oid, Oid, int, int, int); diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index 8dde1e8..f0fa78a 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -205,6 +205,7 @@ static const char *const subdirs[] = { "pg_snapshots", "pg_subtrans", "pg_twophase", + "pg_fdw_xact", "pg_multixact", "pg_multixact/members", "pg_multixact/offsets", diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index 2ea8931..f703e60 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -276,6 +276,8 @@ main(int argc, char *argv[]) ControlFile->max_worker_processes); printf(_("max_prepared_xacts setting: %d\n"), ControlFile->max_prepared_xacts); + printf(_("max_prepared_foreign_xacts setting: %d\n"), + ControlFile->max_prepared_foreign_xacts); printf(_("max_locks_per_xact setting: %d\n"), ControlFile->max_locks_per_xact); printf(_("track_commit_timestamp setting: %s\n"), diff --git a/src/bin/pg_resetwal/pg_resetwal.c b/src/bin/pg_resetwal/pg_resetwal.c index bcb9ed9..739a475 100644 --- a/src/bin/pg_resetwal/pg_resetwal.c +++ b/src/bin/pg_resetwal/pg_resetwal.c @@ -585,6 +585,7 @@ GuessControlValues(void) ControlFile.MaxConnections = 100; ControlFile.max_worker_processes = 8; ControlFile.max_prepared_xacts = 0; + ControlFile.max_prepared_foreign_xacts = 0; ControlFile.max_locks_per_xact = 64; ControlFile.maxAlign = MAXIMUM_ALIGNOF; @@ -797,6 +798,7 @@ RewriteControlFile(void) ControlFile.MaxConnections = 100; ControlFile.max_worker_processes = 8; ControlFile.max_prepared_xacts = 0; + ControlFile.max_prepared_foreign_xacts = 0; ControlFile.max_locks_per_xact = 64; /* Now we can force the recorded xlog seg size to the right thing. */ diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 852d8ca..41eed51 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -8,6 +8,7 @@ #define FRONTEND 1 #include "postgres.h" +#include "access/fdw_xact.h" #include "access/brin_xlog.h" #include "access/clog.h" #include "access/commit_ts.h" diff --git a/src/include/access/fdw_xact.h b/src/include/access/fdw_xact.h new file mode 100644 index 0000000..0b470b4 --- /dev/null +++ b/src/include/access/fdw_xact.h @@ -0,0 +1,75 @@ +/* + * fdw_xact.h + * + * PostgreSQL distributed transaction manager + * + * Portions Copyright (c) 2017, PostgreSQL Global Development Group + * + * src/include/access/fdw_xact.h + */ +#ifndef FDW_XACT_H +#define FDW_XACT_H + +#include "storage/backendid.h" +#include "foreign/foreign.h" +#include "access/xlogreader.h" +#include "lib/stringinfo.h" +#include "nodes/pg_list.h" + +#define FDW_XACT_ID_LEN (2 + 1 + 8 + 1 + 8 + 1 + 8) +#define FDWXactId(path, prefix, xid, serverid, userid) \ + snprintf((path), FDW_XACT_ID_LEN + 1, "%s_%08X_%08X_%08X", (prefix), \ + (xid), (serverid), (userid)) + +/* + * On disk file structure + */ +typedef struct +{ + Oid dboid; /* database oid where to find foreign server + * and user mapping */ + TransactionId local_xid; + Oid serverid; /* foreign server where transaction takes + * place */ + Oid userid; /* user who initiated the foreign transaction */ + Oid umid; + char fdw_xact_id[FDW_XACT_ID_LEN]; /* foreign txn prepare id */ +} FDWXactOnDiskData; + +typedef struct +{ + TransactionId xid; + Oid serverid; + Oid userid; + Oid dbid; +} FdwRemoveXlogRec; + +extern int max_prepared_foreign_xacts; + +/* Info types for logs related to FDW transactions */ +#define XLOG_FDW_XACT_INSERT 0x00 +#define XLOG_FDW_XACT_REMOVE 0x10 + +extern Size FDWXactShmemSize(void); +extern void FDWXactShmemInit(void); +extern void RecoverFDWXacts(void); +extern TransactionId PrescanFDWXacts(TransactionId oldestActiveXid); +extern bool fdw_xact_has_usermapping(Oid serverid, Oid userid); +extern bool fdw_xact_has_server(Oid serverid); +extern void fdw_xact_redo(XLogReaderState *record); +extern void fdw_xact_desc(StringInfo buf, XLogReaderState *record); +extern const char *fdw_xact_identify(uint8 info); +extern void AtEOXact_FDWXacts(bool is_commit); +extern void AtPrepare_FDWXacts(void); +extern void FDWXactTwoPhaseFinish(bool isCommit, TransactionId xid); +extern bool fdw_xact_exists(TransactionId xid, Oid dboid, Oid serverid, + Oid userid); +extern void CheckPointFDWXact(XLogRecPtr redo_horizon); +extern void RegisterXactForeignServer(Oid serverid, Oid userid, bool can_prepare); +extern bool FdwTwoPhaseNeeded(void); +extern void PreCommit_FDWXacts(void); +extern void FDWXactRedoAdd(XLogReaderState *record); +extern void FDWXactRedoRemove(TransactionId xid, Oid serverid, Oid userid); +extern void KnownFDWXactRecreateFiles(XLogRecPtr redo_horizon); + +#endif /* FDW_XACT_H */ diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 2f43c19..62702de 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask) PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL) +PG_RMGR(RM_FDW_XACT_ID, "Foreign Transactions", fdw_xact_redo, fdw_xact_desc, fdw_xact_identify, NULL, NULL, NULL) diff --git a/src/include/access/xact.h b/src/include/access/xact.h index aee1a07..f30374d 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -93,6 +93,9 @@ extern int MyXactFlags; #define XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK (1U << 1) +/* Foreign transaction support */ +extern bool XactWriteLocalNode; + /* * start- and end-of-transaction callbacks for dynamically loaded modules */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index c09c0f8..be6a412 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -213,6 +213,7 @@ typedef struct xl_parameter_change int MaxConnections; int max_worker_processes; int max_prepared_xacts; + int max_prepared_foreign_xacts; int max_locks_per_xact; int wal_level; bool wal_log_hints; diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 3a25cc8..c57a66f 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -182,6 +182,7 @@ typedef struct ControlFileData int MaxConnections; int max_worker_processes; int max_prepared_xacts; + int max_prepared_foreign_xacts; int max_locks_per_xact; bool track_commit_timestamp; diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 79f9b90..27f0adb 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5324,6 +5324,12 @@ DATA(insert OID = 3992 ( dense_rank PGNSP PGUID 12 1 0 2276 0 t f f f f f i s DESCR("rank of hypothetical row without gaps"); DATA(insert OID = 3993 ( dense_rank_final PGNSP PGUID 12 1 0 2276 0 f f f f f f i s 2 0 20 "2281 2276" "{2281,2276}" "{i,v}" _null_ _null_ _null_ hypothetical_dense_rank_final _null_ _null_ _null_ )); DESCR("aggregate final function"); +DATA(insert OID = 4130 ( pg_fdw_xacts PGNSP PGUID 12 1 1000 0 0 f f f f t t v u 0 0 2249 "" "{26,28,26,26,25,25}" "{o,o,o,o,o,o}" "{dbid, transaction,serverid,userid,status,identifier}" _null_ _null_ pg_fdw_xacts _null_ _null_ _null_ )); +DESCR("view foreign transactions"); +DATA(insert OID = 4131 ( pg_fdw_xact_resolve PGNSP PGUID 12 1 1000 0 0 f f f f t t v u 0 0 2249 "" "{26, 28,26,26,25,25}" "{o,o,o,o,o,o}" "{dbid, transaction,serverid,userid,status,identifier}" _null_ _null_ pg_fdw_xact_resolve _null_ _null_ _null_ )); +DESCR("resolve foreign prepared transactions"); +DATA(insert OID = 4132 ( pg_fdw_xact_remove PGNSP PGUID 12 1 0 0 0 f f f f f f v u 4 0 2278 "28 26 26 26" _null_ _null_ "{transaction,dbid,serverid,userid}" _null_ _null_ pg_fdw_xact_remove _null_ _null_ _null_ )); +DESCR("remove foreign transactions"); /* pg_upgrade support */ DATA(insert OID = 3582 ( binary_upgrade_set_next_pg_type_oid PGNSP PGUID 12 1 0 0 0 f f f f t f v r 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ binary_upgrade_set_next_pg_type_oid _null_ _null_ _null_ )); diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 6ca44f7..7b95f77 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -12,6 +12,7 @@ #ifndef FDWAPI_H #define FDWAPI_H +#include "access/fdw_xact.h" #include "access/parallel.h" #include "nodes/execnodes.h" #include "nodes/relation.h" @@ -143,6 +144,18 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation, typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt, Oid serverOid); +typedef bool (*EndForeignTransaction_function) (Oid serverid, Oid userid, + Oid umid, bool is_commit); + +typedef bool (*PrepareForeignTransaction_function) (Oid serverid, Oid userid, + Oid umid, char *prep_info); + +typedef bool (*ResolvePreparedForeignTransaction_function) (Oid serverid, + Oid userid, + Oid umid, + bool is_commit, + char *prep_info); + typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node, ParallelContext *pcxt); typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node, @@ -220,6 +233,11 @@ typedef struct FdwRoutine /* Support functions for IMPORT FOREIGN SCHEMA */ ImportForeignSchema_function ImportForeignSchema; + /* Support functions for foreign transactions */ + EndForeignTransaction_function EndForeignTransaction; + PrepareForeignTransaction_function PrepareForeignTransaction; + ResolvePreparedForeignTransaction_function ResolvePreparedForeignTransaction; + /* Support functions for parallelism under Gather node */ IsForeignScanParallelSafe_function IsForeignScanParallelSafe; EstimateDSMForeignScan_function EstimateDSMForeignScan; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 1a125d8..f59ecbb 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -266,11 +266,12 @@ extern PGPROC *PreparedXactProcs; * We set aside some extra PGPROC structures for auxiliary processes, * ie things that aren't full-fledged backends but need shmem access. * - * Background writer, checkpointer and WAL writer run during normal operation. + * Background writer, checkpointer, WAL writer and foreign transction resolver + * run during normal operation. * Startup process and WAL receiver also consume 2 slots, but WAL writer is * launched only after startup has exited, so we only need 4 slots. */ -#define NUM_AUXILIARY_PROCS 4 +#define NUM_AUXILIARY_PROCS 5 /* configurable options */ extern int DeadlockTimeout; diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index 1435a7b..843c629 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -121,4 +121,8 @@ extern int32 type_maximum_size(Oid type_oid, int32 typemod); /* quote.c */ extern char *quote_literal_cstr(const char *rawstr); +/* access/transam/fdw_xact.c */ +extern Datum pg_fdw_xacts(PG_FUNCTION_ARGS); +extern Datum pg_fdw_xact_resolve(PG_FUNCTION_ARGS); +extern Datum pg_fdw_xact_remove(PG_FUNCTION_ARGS); #endif /* BUILTINS_H */ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index d706f42..06102ff 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1323,6 +1323,13 @@ pg_cursors| SELECT c.name, c.is_scrollable, c.creation_time FROM pg_cursor() c(name, statement, is_holdable, is_binary, is_scrollable, creation_time); +pg_fdw_xacts| SELECT f.dbid, + f.transaction, + f.serverid, + f.userid, + f.status, + f.identifier + FROM pg_fdw_xacts() f(dbid, transaction, serverid, userid, status, identifier); pg_file_settings| SELECT a.sourcefile, a.sourceline, a.seqno, diff --git a/src/test/regress/pg_regress.c b/src/test/regress/pg_regress.c index b685aeb..478260b 100644 --- a/src/test/regress/pg_regress.c +++ b/src/test/regress/pg_regress.c @@ -2263,9 +2263,11 @@ regression_main(int argc, char *argv[], init_function ifunc, test_function tfunc * Adjust the default postgresql.conf for regression testing. The user * can specify a file to be appended; in any case we expand logging * and set max_prepared_transactions to enable testing of prepared - * xacts. (Note: to reduce the probability of unexpected shmmax - * failures, don't set max_prepared_transactions any higher than - * actually needed by the prepared_xacts regression test.) + * xacts. We also set max_fdw_transctions to enable testing of atomic + * foreign transactions. (Note: to reduce the probability of unexpected + * shmmax failures, don't set max_prepared_transactions or + * max_prepared_foreign_transactions any higher than actually needed by the + * corresponding regression tests.). */ snprintf(buf, sizeof(buf), "%s/data/postgresql.conf", temp_instance); pg_conf = fopen(buf, "a"); @@ -2280,7 +2282,8 @@ regression_main(int argc, char *argv[], init_function ifunc, test_function tfunc fputs("log_line_prefix = '%m [%p] %q%a '\n", pg_conf); fputs("log_lock_waits = on\n", pg_conf); fputs("log_temp_files = 128kB\n", pg_conf); - fputs("max_prepared_transactions = 2\n", pg_conf); + fputs("max_prepared_transactions = 3\n", pg_conf); + fputs("max_prepared_foreign_transactions = 2\n", pg_conf); for (sl = temp_configs; sl != NULL; sl = sl->next) {