Re: Global snapshots - Mailing list pgsql-hackers

From Arseny Sher
Subject Re: Global snapshots
Date
Msg-id 874lgna6qh.fsf@ars-thinkpad
In response to Global snapshots  (Stas Kelvich <s.kelvich@postgrespro.ru>)
Responses Re: Global snapshots
List pgsql-hackers
Hello,

I have looked through the patches and found them pretty accurate. I have
fixed a lot of small issues here and there; the updated patchset is
attached. But first, some high-level notes:

 * I agree that it would be cool to implement functionality like the current
   "snapshot too old": that is, abort a transaction with an old global
   snapshot only if it actually attempts to touch modified data.

 * I also agree with Stas that any attempts to trade oldestxmin in
   gossip between the nodes would drastically complicate this patch and
   make it discussion-prone; it would be nice first to get some feedback
   on general approach, especially from people trying to distribute
   Postgres.

 * One drawback of these patches is that only REPEATABLE READ is
   supported. For READ COMMITTED, we must export every new snapshot
   generated on the coordinator to all nodes, which is fairly easy to
   do. SERIALIZABLE will definitely require chattering between nodes,
   but that's a much less demanded isolation level (e.g. we still don't
   support it on replicas).

 * Another somewhat serious issue is the risk of violating the recency
   guarantee. If a client starts a transaction at a node with
   lagging clocks, its snapshot might not include some recently
   committed transactions; if the client works with different nodes, she
   might not even see her own changes. CockroachDB describes at [1] how
   they and Google Spanner overcome this problem. In short, both set a
   hard limit on the maximum allowed clock skew.  Spanner uses atomic
   clocks, so this skew is small and they just wait it out at the end of
   each transaction before acknowledging the client. In CockroachDB, if
   a tuple is not visible but we are unsure whether it is truly invisible
   or it's just the skew (the difference between the snapshot and the
   tuple's csn is less than the skew), the transaction is restarted with
   an advanced snapshot. This process is not infinite because the upper
   border (initial snapshot + max skew) stays the same; this is correct
   as we just want to ensure that our xact sees all the transactions
   committed before it started. We can implement the same thing.
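
The restart scheme from the last note can be sketched roughly as below.
This is only an illustration under assumptions: SkewSnapshot,
skew_snapshot_init, check_visibility and restart_with_advanced_snapshot
are hypothetical names that do not exist in the patches, and the
"csn <= snapshot" visibility rule is assumed for simplicity.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t GlobalCSN;

/* Hypothetical three-way answer: the skew window adds an "unsure" case. */
typedef enum
{
    TUPLE_VISIBLE,
    TUPLE_INVISIBLE,
    TUPLE_UNCERTAIN
} SkewVisibility;

/* Hypothetical snapshot carrying a fixed upper border. */
typedef struct
{
    GlobalCSN   snapshot_csn;   /* current read point, may be advanced */
    GlobalCSN   upper_bound;    /* initial snapshot + max skew, never moves */
} SkewSnapshot;

static SkewSnapshot
skew_snapshot_init(GlobalCSN start_csn, GlobalCSN max_skew)
{
    SkewSnapshot s = {start_csn, start_csn + max_skew};

    return s;
}

static SkewVisibility
check_visibility(const SkewSnapshot *s, GlobalCSN tuple_csn)
{
    if (tuple_csn <= s->snapshot_csn)
        return TUPLE_VISIBLE;
    /* Newer than our snapshot, but possibly only because of clock skew. */
    if (tuple_csn <= s->upper_bound)
        return TUPLE_UNCERTAIN;
    /* Beyond the upper border: certainly committed after we started. */
    return TUPLE_INVISIBLE;
}

/* Restart the transaction with a snapshot that includes the unsure tuple. */
static void
restart_with_advanced_snapshot(SkewSnapshot *s, GlobalCSN tuple_csn)
{
    assert(tuple_csn > s->snapshot_csn && tuple_csn <= s->upper_bound);
    s->snapshot_csn = tuple_csn;    /* upper_bound deliberately unchanged */
}
```

Since upper_bound never moves, every restart shrinks the uncertain
window, so the number of restarts is bounded.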

Now, the description of my mostly cosmetic changes:

 * Don't ERROR in BroadcastStmt to allow us to handle failure manually;
 * Check global_snapshot_defer_time in ImportGlobalSnapshot instead of
   failing on an assert;
 * (Arguably) improved comments around locking at circular buffer
   maintenance; also, don't lock procarray during global_snapshot_xmin
   bump.
 * s/snaphot/snapshot, other typos.
 * Don't enable track_global_snapshots by default -- while handy for
   testing, it doesn't look generally good.
 * Set track_global_snapshots = true in tests everywhere.
 * GUC renamed from postgres_fdw.use_tsdtm to
   postgres_fdw.use_global_snapshots for consistency.
 * 003_bank_shared.pl test is removed. In its current shape (loading one
   node) it is useless, and if we bombard both nodes, a deadlock surely
   appears. In general, global snapshots are not needed for such a
   multimaster-like setup -- either there are no conflicts and we are
   fine, or there is a conflict, in which case we get a deadlock.
 * Fix initdb failure with non-zero global_snapshot_defer_time.
 * Enforce REPEATABLE READ since currently we export snap only once in
   xact.
 * Remove assertion that circular buffer entries are monotonic, as
   GetOldestXmin *can* go backwards.
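
For readers of the first attached patch (GlobalCSNLog SLRU): it stores
one 8-byte GlobalCSN per xid, so locating an entry is plain page/offset
arithmetic, and page ordering has to respect xid wraparound. A
standalone sketch of that arithmetic follows. The helper names are
hypothetical stand-ins for the patch's TransactionIdToPage,
TransactionIdToPgIndex and GlobalCSNLogPagePrecedes, and BLCKSZ = 8192
is assumed (the default build setting).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;
typedef uint64_t GlobalCSN;

#define BLCKSZ              8192    /* assumption: default block size */
#define FIRST_NORMAL_XID    ((TransactionId) 3)
#define XACTS_PER_PAGE      (BLCKSZ / sizeof(GlobalCSN))

/* SLRU page that holds the entry for xid. */
static int
xid_to_page(TransactionId xid)
{
    return (int) (xid / (TransactionId) XACTS_PER_PAGE);
}

/* Index of the entry within its page. */
static int
xid_to_index(TransactionId xid)
{
    return (int) (xid % (TransactionId) XACTS_PER_PAGE);
}

/* Byte offset of the entry within its page buffer. */
static size_t
xid_to_byte_offset(TransactionId xid)
{
    return (size_t) xid_to_index(xid) * sizeof(GlobalCSN);
}

/* Wraparound-aware comparison of two normal xids (modulo 2^32). */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
    return (int32_t) (a - b) < 0;
}

/*
 * "Older page" test: convert each page to an xid on it, offset by the
 * first normal xid so that page 0 never maps to a special xid.
 */
static bool
page_precedes(int page1, int page2)
{
    TransactionId xid1;
    TransactionId xid2;

    xid1 = (TransactionId) (((TransactionId) page1) * XACTS_PER_PAGE);
    xid1 += FIRST_NORMAL_XID;
    xid2 = (TransactionId) (((TransactionId) page2) * XACTS_PER_PAGE);
    xid2 += FIRST_NORMAL_XID;

    return xid_precedes(xid1, xid2);
}
```

With these assumptions a page holds 1024 entries, and the last page
before wraparound is considered older than page 0.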


[1] https://www.cockroachlabs.com/blog/living-without-atomic-clocks/

--
Arseny Sher
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

From 21687e75366df03b92e48c6125bb2e90d01bb70a Mon Sep 17 00:00:00 2001
From: Stas Kelvich <stanconn@gmail.com>
Date: Wed, 25 Apr 2018 16:05:46 +0300
Subject: [PATCH 1/3] GlobalCSNLog SLRU

---
 src/backend/access/transam/Makefile         |   3 +-
 src/backend/access/transam/global_csn_log.c | 439 ++++++++++++++++++++++++++++
 src/backend/access/transam/twophase.c       |   1 +
 src/backend/access/transam/varsup.c         |   2 +
 src/backend/access/transam/xlog.c           |  12 +
 src/backend/storage/ipc/ipci.c              |   3 +
 src/backend/storage/ipc/procarray.c         |   3 +
 src/backend/storage/lmgr/lwlocknames.txt    |   1 +
 src/backend/utils/misc/guc.c                |   9 +
 src/backend/utils/probes.d                  |   2 +
 src/bin/initdb/initdb.c                     |   3 +-
 src/include/access/global_csn_log.h         |  30 ++
 src/include/storage/lwlock.h                |   1 +
 src/include/utils/snapshot.h                |   3 +
 14 files changed, 510 insertions(+), 2 deletions(-)
 create mode 100644 src/backend/access/transam/global_csn_log.c
 create mode 100644 src/include/access/global_csn_log.h

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 16fbe47269..03aa360ea3 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -12,7 +12,8 @@ subdir = src/backend/access/transam
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = clog.o commit_ts.o generic_xlog.o multixact.o parallel.o rmgr.o slru.o \
+OBJS = clog.o commit_ts.o global_csn_log.o generic_xlog.o \
+    multixact.o parallel.o rmgr.o slru.o \
     subtrans.o timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
     xact.o xlog.o xlogarchive.o xlogfuncs.o \
     xloginsert.o xlogreader.o xlogutils.o
diff --git a/src/backend/access/transam/global_csn_log.c b/src/backend/access/transam/global_csn_log.c
new file mode 100644
index 0000000000..d9d66528e4
--- /dev/null
+++ b/src/backend/access/transam/global_csn_log.c
@@ -0,0 +1,439 @@
+/*-----------------------------------------------------------------------------
+ *
+ * global_csn_log.c
+ *        Track global commit sequence numbers of finished transactions
+ *
+ * Implementation of cross-node transaction isolation relies on commit sequence
+ * number (CSN) based visibility rules.  This module provides SLRU to store
+ * CSN for each transaction.  This mapping needs to be kept only for xids
+ * greater than oldestXid, but that can require arbitrarily large amounts of
+ * memory in case of long-lived transactions.  Because of the same lifetime and
+ * persistence requirements this module is quite similar to subtrans.c
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/global_csn_log.c
+ *
+ *-----------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/global_csn_log.h"
+#include "access/slru.h"
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "miscadmin.h"
+#include "pg_trace.h"
+#include "utils/snapmgr.h"
+
+bool track_global_snapshots;
+
+/*
+ * Defines for GlobalCSNLog page sizes.  A page is the same BLCKSZ as is used
+ * everywhere else in Postgres.
+ *
+ * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+ * GlobalCSNLog page numbering also wraps around at
+ * 0xFFFFFFFF/GCSNLOG_XACTS_PER_PAGE, and GlobalCSNLog segment numbering at
+ * 0xFFFFFFFF/GCSNLOG_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
+ * explicit notice of that fact in this module, except when comparing segment
+ * and page numbers in TruncateGlobalCSNLog (see GlobalCSNLogPagePrecedes).
+ */
+
+/* We store the commit GlobalCSN for each xid */
+#define GCSNLOG_XACTS_PER_PAGE (BLCKSZ / sizeof(GlobalCSN))
+
+#define TransactionIdToPage(xid)    ((xid) / (TransactionId) GCSNLOG_XACTS_PER_PAGE)
+#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) GCSNLOG_XACTS_PER_PAGE)
+
+/*
+ * Link to shared-memory data structures for GlobalCSNLog control
+ */
+static SlruCtlData GlobalCSNLogCtlData;
+#define GlobalCsnlogCtl (&GlobalCSNLogCtlData)
+
+static int    ZeroGlobalCSNLogPage(int pageno);
+static bool GlobalCSNLogPagePrecedes(int page1, int page2);
+static void GlobalCSNLogSetPageStatus(TransactionId xid, int nsubxids,
+                                      TransactionId *subxids,
+                                      GlobalCSN csn, int pageno);
+static void GlobalCSNLogSetCSNInSlot(TransactionId xid, GlobalCSN csn,
+                                      int slotno);
+
+/*
+ * GlobalCSNLogSetCSN
+ *
+ * Record GlobalCSN of transaction and its subtransaction tree.
+ *
+ * xid is a single xid to set status for. This will typically be the top level
+ * transactionid for a top level commit or abort. It can also be a
+ * subtransaction when we record transaction aborts.
+ *
+ * subxids is an array of xids of length nsubxids, representing subtransactions
+ * in the tree of xid. In various cases nsubxids may be zero.
+ *
+ * csn is the commit sequence number of the transaction. It should be
+ * AbortedGlobalCSN for abort cases.
+ */
+void
+GlobalCSNLogSetCSN(TransactionId xid, int nsubxids,
+                     TransactionId *subxids, GlobalCSN csn)
+{
+    int            pageno;
+    int            i = 0;
+    int            offset = 0;
+
+    /* Callers of GlobalCSNLogSetCSN() must check GUC params */
+    Assert(track_global_snapshots);
+
+    Assert(TransactionIdIsValid(xid));
+
+    pageno = TransactionIdToPage(xid);        /* get page of parent */
+    for (;;)
+    {
+        int            num_on_page = 0;
+
+        while (i < nsubxids && TransactionIdToPage(subxids[i]) == pageno)
+        {
+            num_on_page++;
+            i++;
+        }
+
+        GlobalCSNLogSetPageStatus(xid,
+                            num_on_page, subxids + offset,
+                            csn, pageno);
+        if (i >= nsubxids)
+            break;
+
+        offset = i;
+        pageno = TransactionIdToPage(subxids[offset]);
+        xid = InvalidTransactionId;
+    }
+}
+
+/*
+ * Record the final state of transaction entries in the csn log for
+ * all entries on a single page.  Atomic only on this page.
+ *
+ * Otherwise API is same as TransactionIdSetTreeStatus()
+ */
+static void
+GlobalCSNLogSetPageStatus(TransactionId xid, int nsubxids,
+                           TransactionId *subxids,
+                           GlobalCSN csn, int pageno)
+{
+    int            slotno;
+    int            i;
+
+    LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+
+    slotno = SimpleLruReadPage(GlobalCsnlogCtl, pageno, true, xid);
+
+    /* Subtransactions first, if needed ... */
+    for (i = 0; i < nsubxids; i++)
+    {
+        Assert(GlobalCsnlogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
+        GlobalCSNLogSetCSNInSlot(subxids[i],    csn, slotno);
+    }
+
+    /* ... then the main transaction */
+    if (TransactionIdIsValid(xid))
+        GlobalCSNLogSetCSNInSlot(xid, csn, slotno);
+
+    GlobalCsnlogCtl->shared->page_dirty[slotno] = true;
+
+    LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * Sets the commit status of a single transaction.
+ */
+static void
+GlobalCSNLogSetCSNInSlot(TransactionId xid, GlobalCSN csn, int slotno)
+{
+    int            entryno = TransactionIdToPgIndex(xid);
+    GlobalCSN *ptr;
+
+    Assert(LWLockHeldByMe(GlobalCSNLogControlLock));
+
+    ptr = (GlobalCSN *) (GlobalCsnlogCtl->shared->page_buffer[slotno] + entryno * sizeof(GlobalCSN));
+
+    *ptr = csn;
+}
+
+/*
+ * Interrogate the state of a transaction in the log.
+ *
+ * NB: this is a low-level routine and is NOT the preferred entry point
+ * for most uses; TransactionIdGetGlobalCSN() in global_snapshot.c is the
+ * intended caller.
+ */
+GlobalCSN
+GlobalCSNLogGetCSN(TransactionId xid)
+{
+    int            pageno = TransactionIdToPage(xid);
+    int            entryno = TransactionIdToPgIndex(xid);
+    int            slotno;
+    GlobalCSN *ptr;
+    GlobalCSN    global_csn;
+
+    /* Callers of GlobalCSNLogGetCSN() must check GUC params */
+    Assert(track_global_snapshots);
+
+    /* Can't ask about stuff that might not be around anymore */
+    Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
+
+    /* lock is acquired by SimpleLruReadPage_ReadOnly */
+
+    slotno = SimpleLruReadPage_ReadOnly(GlobalCsnlogCtl, pageno, xid);
+    ptr = (GlobalCSN *) (GlobalCsnlogCtl->shared->page_buffer[slotno] + entryno * sizeof(GlobalCSN));
+    global_csn = *ptr;
+
+    LWLockRelease(GlobalCSNLogControlLock);
+
+    return global_csn;
+}
+
+/*
+ * Number of shared GlobalCSNLog buffers.
+ */
+static Size
+GlobalCSNLogShmemBuffers(void)
+{
+    return Min(32, Max(4, NBuffers / 512));
+}
+
+/*
+ * Reserve shared memory for GlobalCsnlogCtl.
+ */
+Size
+GlobalCSNLogShmemSize(void)
+{
+    if (!track_global_snapshots)
+        return 0;
+
+    return SimpleLruShmemSize(GlobalCSNLogShmemBuffers(), 0);
+}
+
+/*
+ * Initialization of shared memory for GlobalCSNLog.
+ */
+void
+GlobalCSNLogShmemInit(void)
+{
+    if (!track_global_snapshots)
+        return;
+
+    GlobalCsnlogCtl->PagePrecedes = GlobalCSNLogPagePrecedes;
+    SimpleLruInit(GlobalCsnlogCtl, "GlobalCSNLog Ctl", GlobalCSNLogShmemBuffers(), 0,
+                  GlobalCSNLogControlLock, "pg_global_csn", LWTRANCHE_GLOBAL_CSN_LOG_BUFFERS);
+}
+
+/*
+ * This func must be called ONCE on system install.  It creates the initial
+ * GlobalCSNLog segment.  The pg_global_csn directory is assumed to have been
+ * created by initdb, and GlobalCSNLogShmemInit must have been called already.
+ */
+void
+BootStrapGlobalCSNLog(void)
+{
+    int            slotno;
+
+    if (!track_global_snapshots)
+        return;
+
+    LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+
+    /* Create and zero the first page of the GlobalCSNLog */
+    slotno = ZeroGlobalCSNLogPage(0);
+
+    /* Make sure it's written out */
+    SimpleLruWritePage(GlobalCsnlogCtl, slotno);
+    Assert(!GlobalCsnlogCtl->shared->page_dirty[slotno]);
+
+    LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * Initialize (or reinitialize) a page of GlobalCSNLog to zeroes.
+ *
+ * The page is not actually written, just set up in shared memory.
+ * The slot number of the new page is returned.
+ *
+ * Control lock must be held at entry, and will be held at exit.
+ */
+static int
+ZeroGlobalCSNLogPage(int pageno)
+{
+    Assert(LWLockHeldByMe(GlobalCSNLogControlLock));
+    return SimpleLruZeroPage(GlobalCsnlogCtl, pageno);
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend startup,
+ * after StartupXLOG has initialized ShmemVariableCache->nextXid.
+ *
+ * oldestActiveXID is the oldest XID of any prepared transaction, or nextXid
+ * if there are none.
+ */
+void
+StartupGlobalCSNLog(TransactionId oldestActiveXID)
+{
+    int            startPage;
+    int            endPage;
+
+    if (!track_global_snapshots)
+        return;
+
+    /*
+     * Since we don't expect pg_global_csn to be valid across crashes, we
+     * initialize the currently-active page(s) to zeroes during startup.
+     * Whenever we advance into a new page, ExtendGlobalCSNLog will likewise
+     * zero the new page without regard to whatever was previously on disk.
+     */
+    LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+
+    startPage = TransactionIdToPage(oldestActiveXID);
+    endPage = TransactionIdToPage(ShmemVariableCache->nextXid);
+
+    while (startPage != endPage)
+    {
+        (void) ZeroGlobalCSNLogPage(startPage);
+        startPage++;
+        /* must account for wraparound */
+        if (startPage > TransactionIdToPage(MaxTransactionId))
+            startPage = 0;
+    }
+    (void) ZeroGlobalCSNLogPage(startPage);
+
+    LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend shutdown
+ */
+void
+ShutdownGlobalCSNLog(void)
+{
+    if (!track_global_snapshots)
+        return;
+
+    /*
+     * Flush dirty GlobalCSNLog pages to disk.
+     *
+     * This is not actually necessary from a correctness point of view. We do
+     * it merely as a debugging aid.
+     */
+    TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_START(false);
+    SimpleLruFlush(GlobalCsnlogCtl, false);
+    TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_DONE(false);
+}
+
+/*
+ * Perform a checkpoint --- either during shutdown, or on-the-fly
+ */
+void
+CheckPointGlobalCSNLog(void)
+{
+    if (!track_global_snapshots)
+        return;
+
+    /*
+     * Flush dirty GlobalCSNLog pages to disk.
+     *
+     * This is not actually necessary from a correctness point of view. We do
+     * it merely to improve the odds that writing of dirty pages is done by
+     * the checkpoint process and not by backends.
+     */
+    TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_START(true);
+    SimpleLruFlush(GlobalCsnlogCtl, true);
+    TRACE_POSTGRESQL_GLOBALCSNLOG_CHECKPOINT_DONE(true);
+}
+
+/*
+ * Make sure that GlobalCSNLog has room for a newly-allocated XID.
+ *
+ * NB: this is called while holding XidGenLock.  We want it to be very fast
+ * most of the time; even when it's not so fast, no actual I/O need happen
+ * unless we're forced to write out a dirty clog or xlog page to make room
+ * in shared memory.
+ */
+void
+ExtendGlobalCSNLog(TransactionId newestXact)
+{
+    int            pageno;
+
+    if (!track_global_snapshots)
+        return;
+
+    /*
+     * No work except at first XID of a page.  But beware: just after
+     * wraparound, the first XID of page zero is FirstNormalTransactionId.
+     */
+    if (TransactionIdToPgIndex(newestXact) != 0 &&
+        !TransactionIdEquals(newestXact, FirstNormalTransactionId))
+        return;
+
+    pageno = TransactionIdToPage(newestXact);
+
+    LWLockAcquire(GlobalCSNLogControlLock, LW_EXCLUSIVE);
+
+    /* Zero the page */
+    ZeroGlobalCSNLogPage(pageno);
+
+    LWLockRelease(GlobalCSNLogControlLock);
+}
+
+/*
+ * Remove all GlobalCSNLog segments before the one holding the passed
+ * transaction ID.
+ *
+ * This is normally called during checkpoint, with oldestXact being the
+ * oldest TransactionXmin of any running transaction.
+ */
+void
+TruncateGlobalCSNLog(TransactionId oldestXact)
+{
+    int            cutoffPage;
+
+    if (!track_global_snapshots)
+        return;
+
+    /*
+     * The cutoff point is the start of the segment containing oldestXact. We
+     * pass the *page* containing oldestXact to SimpleLruTruncate. We step
+     * back one transaction to avoid passing a cutoff page that hasn't been
+     * created yet in the rare case that oldestXact would be the first item on
+     * a page and oldestXact == next XID.  In that case, if we didn't subtract
+     * one, we'd trigger SimpleLruTruncate's wraparound detection.
+     */
+    TransactionIdRetreat(oldestXact);
+    cutoffPage = TransactionIdToPage(oldestXact);
+
+    SimpleLruTruncate(GlobalCsnlogCtl, cutoffPage);
+}
+
+/*
+ * Decide which of two GlobalCSNLog page numbers is "older" for truncation
+ * purposes.
+ *
+ * We need to use comparison of TransactionIds here in order to do the right
+ * thing with wraparound XID arithmetic.  However, if we are asked about
+ * page number zero, we don't want to hand InvalidTransactionId to
+ * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
+ * offset both xids by FirstNormalTransactionId to avoid that.
+ */
+static bool
+GlobalCSNLogPagePrecedes(int page1, int page2)
+{
+    TransactionId xid1;
+    TransactionId xid2;
+
+    xid1 = ((TransactionId) page1) * GCSNLOG_XACTS_PER_PAGE;
+    xid1 += FirstNormalTransactionId;
+    xid2 = ((TransactionId) page2) * GCSNLOG_XACTS_PER_PAGE;
+    xid2 += FirstNormalTransactionId;
+
+    return TransactionIdPrecedes(xid1, xid2);
+}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 306861bb79..3aee5e50c5 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -77,6 +77,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/global_csn_log.h"
 #include "access/htup_details.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 394843f7e9..4035b90d5e 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -15,6 +15,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/global_csn_log.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -169,6 +170,7 @@ GetNewTransactionId(bool isSubXact)
      * Extend pg_subtrans and pg_commit_ts too.
      */
     ExtendCLOG(xid);
+    ExtendGlobalCSNLog(xid);
     ExtendCommitTs(xid);
     ExtendSUBTRANS(xid);
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 493f1db7b9..ca0d934c76 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -24,6 +24,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/global_csn_log.h"
 #include "access/multixact.h"
 #include "access/rewriteheap.h"
 #include "access/subtrans.h"
@@ -5258,6 +5259,7 @@ BootStrapXLOG(void)
 
     /* Bootstrap the commit log, too */
     BootStrapCLOG();
+    BootStrapGlobalCSNLog();
     BootStrapCommitTs();
     BootStrapSUBTRANS();
     BootStrapMultiXact();
@@ -7066,6 +7068,7 @@ StartupXLOG(void)
              * maintained during recovery and need not be started yet.
              */
             StartupCLOG();
+            StartupGlobalCSNLog(oldestActiveXID);
             StartupSUBTRANS(oldestActiveXID);
 
             /*
@@ -7864,6 +7867,7 @@ StartupXLOG(void)
     if (standbyState == STANDBY_DISABLED)
     {
         StartupCLOG();
+        StartupGlobalCSNLog(oldestActiveXID);
         StartupSUBTRANS(oldestActiveXID);
     }
 
@@ -8527,6 +8531,7 @@ ShutdownXLOG(int code, Datum arg)
         CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
     }
     ShutdownCLOG();
+    ShutdownGlobalCSNLog();
     ShutdownCommitTs();
     ShutdownSUBTRANS();
     ShutdownMultiXact();
@@ -9104,7 +9109,10 @@ CreateCheckPoint(int flags)
      * StartupSUBTRANS hasn't been called yet.
      */
     if (!RecoveryInProgress())
+    {
         TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+        TruncateGlobalCSNLog(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+    }
 
     /* Real work is done, but log and update stats before releasing lock. */
     LogCheckpointEnd(false);
@@ -9180,6 +9188,7 @@ static void
 CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
     CheckPointCLOG();
+    CheckPointGlobalCSNLog();
     CheckPointCommitTs();
     CheckPointSUBTRANS();
     CheckPointMultiXact();
@@ -9463,7 +9472,10 @@ CreateRestartPoint(int flags)
      * this because StartupSUBTRANS hasn't been called yet.
      */
     if (EnableHotStandby)
+    {
         TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+        TruncateGlobalCSNLog(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+    }
 
     /* Real work is done, but log and update before releasing lock. */
     LogCheckpointEnd(true);
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 0c86a581c0..2af468fc6a 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -16,6 +16,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/global_csn_log.h"
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/nbtree.h"
@@ -127,6 +128,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
         size = add_size(size, ProcGlobalShmemSize());
         size = add_size(size, XLOGShmemSize());
         size = add_size(size, CLOGShmemSize());
+        size = add_size(size, GlobalCSNLogShmemSize());
         size = add_size(size, CommitTsShmemSize());
         size = add_size(size, SUBTRANSShmemSize());
         size = add_size(size, TwoPhaseShmemSize());
@@ -219,6 +221,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
      */
     XLOGShmemInit();
     CLOGShmemInit();
+    GlobalCSNLogShmemInit();
     CommitTsShmemInit();
     SUBTRANSShmemInit();
     MultiXactShmemInit();
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index bd20497d81..64ab249615 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -46,6 +46,7 @@
 #include <signal.h>
 
 #include "access/clog.h"
+#include "access/global_csn_log.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -830,6 +831,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
     while (TransactionIdPrecedes(latestObservedXid, running->nextXid))
     {
         ExtendSUBTRANS(latestObservedXid);
+        ExtendGlobalCSNLog(latestObservedXid);
         TransactionIdAdvance(latestObservedXid);
     }
     TransactionIdRetreat(latestObservedXid);    /* = running->nextXid - 1 */
@@ -3209,6 +3211,7 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
         while (TransactionIdPrecedes(next_expected_xid, xid))
         {
             TransactionIdAdvance(next_expected_xid);
+            ExtendGlobalCSNLog(next_expected_xid);
             ExtendSUBTRANS(next_expected_xid);
         }
         Assert(next_expected_xid == xid);
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index e6025ecedb..9615058f29 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,3 +50,4 @@ OldSnapshotTimeMapLock                42
 BackendRandomLock                    43
 LogicalRepWorkerLock                44
 CLogTruncationLock                    45
+GlobalCSNLogControlLock                46
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index a88ea6cfc9..2701528c55 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1019,6 +1019,15 @@ static struct config_bool ConfigureNamesBool[] =
         NULL, NULL, NULL
     },
     {
+        {"track_global_snapshots", PGC_POSTMASTER, RESOURCES_MEM,
+            gettext_noop("Enable global snapshot tracking."),
+            gettext_noop("Used to achieve REPEATABLE READ isolation level for postgres_fdw transactions.")
+        },
+        &track_global_snapshots,
+        true, /* XXX: set true to simplify testing. XXX2: Seems that RESOURCES_MEM isn't the best category */
+        NULL, NULL, NULL
+    },
+    {
         {"ssl", PGC_SIGHUP, CONN_AUTH_SSL,
             gettext_noop("Enables SSL connections."),
             NULL
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index ad06e8e2ea..5ebe2ad888 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -77,6 +77,8 @@ provider postgresql {
     probe clog__checkpoint__done(bool);
     probe subtrans__checkpoint__start(bool);
     probe subtrans__checkpoint__done(bool);
+    probe globalcsnlog__checkpoint__start(bool);
+    probe globalcsnlog__checkpoint__done(bool);
     probe multixact__checkpoint__start(bool);
     probe multixact__checkpoint__done(bool);
     probe twophase__checkpoint__start();
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 3f203c6ca6..40fceb81f8 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -220,7 +220,8 @@ static const char *const subdirs[] = {
     "pg_xact",
     "pg_logical",
     "pg_logical/snapshots",
-    "pg_logical/mappings"
+    "pg_logical/mappings",
+    "pg_global_csn"
 };
 
 
diff --git a/src/include/access/global_csn_log.h b/src/include/access/global_csn_log.h
new file mode 100644
index 0000000000..417c26c8a3
--- /dev/null
+++ b/src/include/access/global_csn_log.h
@@ -0,0 +1,30 @@
+/*
+ * global_csn_log.h
+ *
+ * Commit-Sequence-Number log.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/global_csn_log.h
+ */
+#ifndef CSNLOG_H
+#define CSNLOG_H
+
+#include "access/xlog.h"
+#include "utils/snapshot.h"
+
+extern void GlobalCSNLogSetCSN(TransactionId xid, int nsubxids,
+                               TransactionId *subxids, GlobalCSN csn);
+extern GlobalCSN GlobalCSNLogGetCSN(TransactionId xid);
+
+extern Size GlobalCSNLogShmemSize(void);
+extern void GlobalCSNLogShmemInit(void);
+extern void BootStrapGlobalCSNLog(void);
+extern void StartupGlobalCSNLog(TransactionId oldestActiveXID);
+extern void ShutdownGlobalCSNLog(void);
+extern void CheckPointGlobalCSNLog(void);
+extern void ExtendGlobalCSNLog(TransactionId newestXact);
+extern void TruncateGlobalCSNLog(TransactionId oldestXact);
+
+#endif   /* CSNLOG_H */
\ No newline at end of file
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index c21bfe2f66..ab330b71c2 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -198,6 +198,7 @@ typedef enum BuiltinTrancheIds
     LWTRANCHE_CLOG_BUFFERS = NUM_INDIVIDUAL_LWLOCKS,
     LWTRANCHE_COMMITTS_BUFFERS,
     LWTRANCHE_SUBTRANS_BUFFERS,
+    LWTRANCHE_GLOBAL_CSN_LOG_BUFFERS,
     LWTRANCHE_MXACTOFFSET_BUFFERS,
     LWTRANCHE_MXACTMEMBER_BUFFERS,
     LWTRANCHE_ASYNC_BUFFERS,
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index a8a5a8f4c0..318d41e6f7 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -24,6 +24,9 @@ typedef struct SnapshotData *Snapshot;
 
 #define InvalidSnapshot        ((Snapshot) NULL)
 
+typedef uint64 GlobalCSN;
+extern bool track_global_snapshots;
+
 /*
  * We use SnapshotData structures to represent both "regular" (MVCC)
  * snapshots and "special" snapshots that have non-MVCC semantics.
-- 
2.11.0

From 4000408863fca43b1e79c55d638525c0cd04f92a Mon Sep 17 00:00:00 2001
From: Stas Kelvich <stanconn@gmail.com>
Date: Wed, 25 Apr 2018 16:22:30 +0300
Subject: [PATCH 2/3] Global snapshots

---
 src/backend/access/transam/Makefile           |   2 +-
 src/backend/access/transam/global_snapshot.c  | 754 ++++++++++++++++++++++++++
 src/backend/access/transam/twophase.c         | 156 ++++++
 src/backend/access/transam/xact.c             |  29 +
 src/backend/access/transam/xlog.c             |   2 +
 src/backend/storage/ipc/ipci.c                |   3 +
 src/backend/storage/ipc/procarray.c           |  93 +++-
 src/backend/storage/lmgr/lwlocknames.txt      |   1 +
 src/backend/storage/lmgr/proc.c               |   5 +
 src/backend/utils/misc/guc.c                  |  13 +-
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/backend/utils/time/snapmgr.c              | 103 ++++
 src/backend/utils/time/tqual.c                |  65 ++-
 src/include/access/global_snapshot.h          |  72 +++
 src/include/access/twophase.h                 |   1 +
 src/include/catalog/pg_proc.dat               |  14 +
 src/include/datatype/timestamp.h              |   3 +
 src/include/fmgr.h                            |   1 +
 src/include/portability/instr_time.h          |  10 +
 src/include/storage/proc.h                    |  15 +
 src/include/storage/procarray.h               |   8 +
 src/include/utils/snapmgr.h                   |   3 +
 src/include/utils/snapshot.h                  |   8 +
 23 files changed, 1355 insertions(+), 8 deletions(-)
 create mode 100644 src/backend/access/transam/global_snapshot.c
 create mode 100644 src/include/access/global_snapshot.h

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 03aa360ea3..8ef677cada 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/access/transam
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = clog.o commit_ts.o global_csn_log.o generic_xlog.o \
+OBJS = clog.o commit_ts.o global_csn_log.o global_snapshot.o generic_xlog.o \
     multixact.o parallel.o rmgr.o slru.o \
     subtrans.o timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
     xact.o xlog.o xlogarchive.o xlogfuncs.o \
diff --git a/src/backend/access/transam/global_snapshot.c b/src/backend/access/transam/global_snapshot.c
new file mode 100644
index 0000000000..b9d6c56334
--- /dev/null
+++ b/src/backend/access/transam/global_snapshot.c
@@ -0,0 +1,754 @@
+/*-------------------------------------------------------------------------
+ *
+ * global_snapshot.c
+ *        Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/global_snapshot.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/global_csn_log.h"
+#include "access/global_snapshot.h"
+#include "access/transam.h"
+#include "access/twophase.h"
+#include "access/xact.h"
+#include "portability/instr_time.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+#include "miscadmin.h"
+
+/* Raise a warning if imported global_csn exceeds ours by this value. */
+#define SNAP_DESYNC_COMPLAIN (1*NSECS_PER_SEC) /* 1 second */
+
+/*
+ * GlobalSnapshotState
+ *
+ * Do not trust local clocks to be strictly monotonic and save last acquired
+ * value so later we can compare next timestamp with it. Accessed through
+ * GlobalSnapshotGenerate() and GlobalSnapshotSync().
+ */
+typedef struct
+{
+    GlobalCSN         last_global_csn;
+    volatile slock_t lock;
+} GlobalSnapshotState;
+
+static GlobalSnapshotState *gsState;
+
+
+/*
+ * GUC to delay advance of oldestXid for this amount of time. Also determines
+ * the size of the GlobalSnapshotXidMap circular buffer.
+ */
+int global_snapshot_defer_time;
+
+/*
+ * Enables this module.
+ */
+extern bool track_global_snapshots;
+
+/*
+ * GlobalSnapshotXidMap
+ *
+ * To be able to install global snapshot that points to past we need to keep
+ * old versions of tuples and therefore delay advance of oldestXid.  Here we
+ * keep track of correspondence between snapshot's global_csn and oldestXid
+ * that was set at the time when the snapshot was taken.  This is much like
+ * what snapshot too old's OldSnapshotControlData does, but with finer
+ * (per-second) granularity.
+ *
+ * Different strategies can be employed to hold oldestXid (e.g. we can track
+ * oldest global_csn-based snapshot among cluster nodes and map it to oldestXid
+ * on each node), but the one implemented here tries to avoid cross-node
+ * communication, which is tricky in case of postgres_fdw.
+ *
+ * On each snapshot acquisition GlobalSnapshotMapXmin() is called and stores
+ * correspondence between current global_csn and oldestXmin in a sparse way:
+ * global_csn is rounded to seconds (and here we use the fact that global_csn
+ * is just a timestamp) and oldestXmin is stored in the circular buffer where
+ * rounded global_csn acts as an offset from current circular buffer head.
+ * Size of the circular buffer is controlled by global_snapshot_defer_time GUC.
+ *
+ * When a global snapshot arrives from a different node we check that its
+ * global_csn is still in our map, otherwise we'll error out with "snapshot too
+ * old" message.  If global_csn is successfully mapped to oldestXid we move
+ * backend's pgxact->xmin to proc->originalXmin and set pgxact->xmin to the
+ * mapped oldestXid.  That way GetOldestXmin() can take into account backends
+ * with imported global snapshot and old tuple versions will be preserved.
+ *
+ * Also while calculating oldestXmin for our map in presence of imported
+ * global snapshots we should use proc->originalXmin instead of pgxact->xmin
+ * that was set during import.  Otherwise, we can create a feedback loop:
+ * xmins of imported global snapshots were calculated using our map, and new
+ * entries in the map would be calculated based on those xmins, so there is
+ * a risk of getting stuck forever with one non-increasing oldestXmin.  All
+ * other callers of GetOldestXmin() are using pgxact->xmin so the old tuple
+ * versions are preserved.
+ */
+typedef struct GlobalSnapshotXidMap
+{
+    int                 head;                /* offset of current freshest value */
+    int                 size;                /* total size of circular buffer */
+    GlobalCSN_atomic last_csn_seconds;    /* last rounded global_csn that changed
+                                         * xmin_by_second[] */
+    TransactionId   *xmin_by_second;    /* circular buffer of oldestXmin's */
+}
+GlobalSnapshotXidMap;
+
+static GlobalSnapshotXidMap *gsXidMap;
+
+
+/* Estimate shared memory space needed */
+Size
+GlobalSnapshotShmemSize(void)
+{
+    Size    size = 0;
+
+    if (track_global_snapshots || global_snapshot_defer_time > 0)
+    {
+        size += MAXALIGN(sizeof(GlobalSnapshotState));
+    }
+
+    if (global_snapshot_defer_time > 0)
+    {
+        size += sizeof(GlobalSnapshotXidMap);
+        size += global_snapshot_defer_time*sizeof(TransactionId);
+        size = MAXALIGN(size);
+    }
+
+    return size;
+}
+
+/* Init shared memory structures */
+void
+GlobalSnapshotShmemInit()
+{
+    bool found;
+
+    if (track_global_snapshots || global_snapshot_defer_time > 0)
+    {
+        gsState = ShmemInitStruct("gsState",
+                                sizeof(GlobalSnapshotState),
+                                &found);
+        if (!found)
+        {
+            gsState->last_global_csn = 0;
+            SpinLockInit(&gsState->lock);
+        }
+    }
+
+    if (global_snapshot_defer_time > 0)
+    {
+        gsXidMap = ShmemInitStruct("gsXidMap",
+                                   sizeof(GlobalSnapshotXidMap),
+                                   &found);
+        if (!found)
+        {
+            int i;
+
+            pg_atomic_init_u64(&gsXidMap->last_csn_seconds, 0);
+            gsXidMap->head = 0;
+            gsXidMap->size = global_snapshot_defer_time;
+            gsXidMap->xmin_by_second =
+                            ShmemAlloc(sizeof(TransactionId)*gsXidMap->size);
+
+            for (i = 0; i < gsXidMap->size; i++)
+                gsXidMap->xmin_by_second[i] = InvalidTransactionId;
+        }
+    }
+}
+
+/*
+ * GlobalSnapshotStartup
+ *
+ * Set gsXidMap entries to oldestActiveXID during startup.
+ */
+void
+GlobalSnapshotStartup(TransactionId oldestActiveXID)
+{
+    /*
+     * Run only if we have initialized shared memory and gsXidMap
+     * is enabled.
+     */
+    if (IsNormalProcessingMode() && global_snapshot_defer_time > 0)
+    {
+        int i;
+
+        Assert(TransactionIdIsValid(oldestActiveXID));
+        for (i = 0; i < gsXidMap->size; i++)
+            gsXidMap->xmin_by_second[i] = oldestActiveXID;
+        ProcArraySetGlobalSnapshotXmin(oldestActiveXID);
+    }
+}
+
+/*
+ * GlobalSnapshotMapXmin
+ *
+ * Maintain circular buffer of oldestXmins for several seconds in past. This
+ * buffer allows shifting oldestXmin into the past when a backend is importing
+ * a global transaction. Otherwise old versions of tuples that are needed for
+ * this transaction can be recycled by other processes (vacuum, HOT, etc).
+ *
+ * Locking here is not trivial. Called upon each snapshot creation after
+ * ProcArrayLock is released. Such usage creates several race conditions. It
+ * is possible that a backend calls GlobalSnapshotMapXmin() for its global_csn
+ * only after other backends managed to get a snapshot and complete their
+ * GlobalSnapshotMapXmin() call, or even committed. This is safe because:
+ *
+ *      * We already hold our xmin in MyPgXact, so our snapshot will not be
+ *        harmed even though ProcArrayLock is released.
+ *
+ *      * snapshot_global_csn is always pessimistically rounded up to the next
+ *        second.
+ *
+ *      * For performance reasons, xmin value for particular second is filled
+ *        only once. Because of that instead of writing to buffer just our
+ *        xmin (which is enough for our snapshot), we bump oldestXmin there --
+ *        it mitigates the possibility of damaging someone else's snapshot by
+ *        writing to the buffer too advanced value in case of slowness of
+ *        another backend who generated csn earlier, but didn't manage to
+ *        insert it before us.
+ *
+ *      * If GlobalSnapshotMapXmin() finds a gap of several seconds between
+ *        the current call and the latest completed call then it should fill
+ *        that gap with the latest known values instead of the new one.
+ *        Otherwise it is possible (however highly unlikely) that this gap
+ *        also happened between taking a snapshot and the call to
+ *        GlobalSnapshotMapXmin() for some backend, and we risk filling the
+ *        circular buffer with oldestXmins bigger than they actually were.
+ */
+void
+GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn)
+{
+    int offset, gap, i;
+    GlobalCSN csn_seconds;
+    GlobalCSN last_csn_seconds;
+    volatile TransactionId oldest_deferred_xmin;
+    TransactionId current_oldest_xmin, previous_oldest_xmin;
+
+    /* Callers should check config values */
+    Assert(global_snapshot_defer_time > 0);
+    Assert(gsXidMap != NULL);
+
+    /*
+     * Round up global_csn to the next second -- pessimistically and safely.
+     */
+    csn_seconds = (snapshot_global_csn / NSECS_PER_SEC + 1);
+
+    /*
+     * Fast-path check. Avoid taking exclusive GlobalSnapshotXidMapLock lock
+     * if oldestXid was already written to xmin_by_second[] for this rounded
+     * global_csn.
+     */
+    if (pg_atomic_read_u64(&gsXidMap->last_csn_seconds) >= csn_seconds)
+        return;
+
+    /* Ok, we have new entry (or entries) */
+    LWLockAcquire(GlobalSnapshotXidMapLock, LW_EXCLUSIVE);
+
+    /* Re-check last_csn_seconds under lock */
+    last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds);
+    if (last_csn_seconds >= csn_seconds)
+    {
+        LWLockRelease(GlobalSnapshotXidMapLock);
+        return;
+    }
+    pg_atomic_write_u64(&gsXidMap->last_csn_seconds, csn_seconds);
+
+    /*
+     * Compute oldest_xmin.
+     *
+     * It would be possible to calculate oldest_xmin during the corresponding
+     * snapshot creation, but GetSnapshotData() intentionally reads only
+     * PgXact, not PgProc. And we need originalXmin (see comment to gsXidMap),
+     * which is stored in PgProc because comments around PgXact warn against
+     * extending it with new fields. So just calculate oldest_xmin again;
+     * that happens quite rarely anyway.
+     */
+    current_oldest_xmin = GetOldestXmin(NULL, PROCARRAY_NON_IMPORTED_XMIN);
+
+    previous_oldest_xmin = gsXidMap->xmin_by_second[gsXidMap->head];
+
+    Assert(TransactionIdIsNormal(current_oldest_xmin));
+    Assert(TransactionIdIsNormal(previous_oldest_xmin));
+
+    gap = csn_seconds - last_csn_seconds;
+    offset = csn_seconds % gsXidMap->size;
+
+    /* Sanity check before we update head and gap */
+    Assert( gap >= 1 );
+    Assert( (gsXidMap->head + gap) % gsXidMap->size == offset );
+
+    gap = gap > gsXidMap->size ? gsXidMap->size : gap;
+    gsXidMap->head = offset;
+
+    /* Fill new entry with current_oldest_xmin */
+    gsXidMap->xmin_by_second[offset] = current_oldest_xmin;
+
+    /*
+     * If we have gap then fill it with previous_oldest_xmin for reasons
+     * outlined in comment above this function.
+     */
+    for (i = 1; i < gap; i++)
+    {
+        offset = (offset + gsXidMap->size - 1) % gsXidMap->size;
+        gsXidMap->xmin_by_second[offset] = previous_oldest_xmin;
+    }
+
+    oldest_deferred_xmin =
+        gsXidMap->xmin_by_second[ (gsXidMap->head + 1) % gsXidMap->size ];
+
+    LWLockRelease(GlobalSnapshotXidMapLock);
+
+    /*
+     * Advance procArray->global_snapshot_xmin after we released
+     * GlobalSnapshotXidMapLock. Since we gather not xmin but oldestXmin, it
+     * never goes backwards regardless of how slowly we do that.
+     */
+    Assert(TransactionIdFollowsOrEquals(oldest_deferred_xmin,
+                                        ProcArrayGetGlobalSnapshotXmin()));
+    ProcArraySetGlobalSnapshotXmin(oldest_deferred_xmin);
+}
+
+
+/*
+ * GlobalSnapshotToXmin
+ *
+ * Get oldestXmin that took place when snapshot_global_csn was taken.
+ */
+TransactionId
+GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn)
+{
+    TransactionId xmin;
+    GlobalCSN csn_seconds;
+    volatile GlobalCSN last_csn_seconds;
+
+    /* Callers should check config values */
+    Assert(global_snapshot_defer_time > 0);
+    Assert(gsXidMap != NULL);
+
+    /* Round down to get conservative estimates */
+    csn_seconds = (snapshot_global_csn / NSECS_PER_SEC);
+
+    LWLockAcquire(GlobalSnapshotXidMapLock, LW_SHARED);
+    last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds);
+    if (csn_seconds > last_csn_seconds)
+    {
+        /* we don't have entry for this global_csn yet, return latest known */
+        xmin = gsXidMap->xmin_by_second[gsXidMap->head];
+    }
+    else if (last_csn_seconds - csn_seconds < gsXidMap->size)
+    {
+        /* we are good, retrieve value from our map */
+        Assert(last_csn_seconds % gsXidMap->size == gsXidMap->head);
+        xmin = gsXidMap->xmin_by_second[csn_seconds % gsXidMap->size];
+    }
+    else
+    {
+        /* requested global_csn is too old, let caller know */
+        xmin = InvalidTransactionId;
+    }
+    LWLockRelease(GlobalSnapshotXidMapLock);
+
+    return xmin;
+}
+
+/*
+ * GlobalSnapshotGenerate
+ *
+ * Generate GlobalCSN which is actually a local time. Also we are forcing
+ * this time to be always increasing. Since now it is not uncommon to have
+ * millions of read transactions per second we are trying to use nanoseconds
+ * if such time resolution is available.
+ */
+GlobalCSN
+GlobalSnapshotGenerate(bool locked)
+{
+    instr_time    current_time;
+    GlobalCSN    global_csn;
+
+    Assert(track_global_snapshots || global_snapshot_defer_time > 0);
+
+    /*
+     * TODO: create some macro that adds a small random shift to current time.
+     */
+    INSTR_TIME_SET_CURRENT(current_time);
+    global_csn = (GlobalCSN) INSTR_TIME_GET_NANOSEC(current_time);
+
+    /* TODO: change to atomics? */
+    if (!locked)
+        SpinLockAcquire(&gsState->lock);
+
+    if (global_csn <= gsState->last_global_csn)
+        global_csn = ++gsState->last_global_csn;
+    else
+        gsState->last_global_csn = global_csn;
+
+    if (!locked)
+        SpinLockRelease(&gsState->lock);
+
+    return global_csn;
+}
+
+/*
+ * GlobalSnapshotSync
+ *
+ * Due to time desynchronization on different nodes we can receive global_csn
+ * which is greater than global_csn on this node. To preserve proper isolation
+ * this node needs to wait until its local clock reaches such global_csn.
+ *
+ * This should happen relatively rarely if nodes are running NTP/PTP/etc.
+ * Complain if wait time is more than SNAP_DESYNC_COMPLAIN.
+ */
+void
+GlobalSnapshotSync(GlobalCSN remote_gcsn)
+{
+    GlobalCSN    local_gcsn;
+    GlobalCSN    delta;
+
+    Assert(track_global_snapshots);
+
+    for(;;)
+    {
+        SpinLockAcquire(&gsState->lock);
+        if (gsState->last_global_csn > remote_gcsn)
+        {
+            /* Everything is fine */
+            SpinLockRelease(&gsState->lock);
+            return;
+        }
+        else if ((local_gcsn = GlobalSnapshotGenerate(true)) >= remote_gcsn)
+        {
+            /*
+             * Everything is fine too, but last_global_csn wasn't updated for
+             * some time.
+             */
+            SpinLockRelease(&gsState->lock);
+            return;
+        }
+        SpinLockRelease(&gsState->lock);
+
+        /* Okay we need to sleep now */
+        delta = remote_gcsn - local_gcsn;
+        if (delta > SNAP_DESYNC_COMPLAIN)
+            ereport(WARNING,
+                (errmsg("remote global snapshot exceeds ours by more than a second"),
+                 errhint("Consider running NTPd on servers participating in global transaction")));
+
+        /* TODO: report this sleeptime somewhere? */
+        pg_usleep((long) (delta/NSECS_PER_USEC));
+
+        /*
+         * Loop that checks to ensure that we actually slept for specified
+         * amount of time.
+         */
+    }
+
+    Assert(false); /* Should not happen */
+    return;
+}
+
+/*
+ * TransactionIdGetGlobalCSN
+ *
+ * Get GlobalCSN for specified TransactionId taking care about special xids,
+ * xids beyond TransactionXmin and InDoubt states.
+ */
+GlobalCSN
+TransactionIdGetGlobalCSN(TransactionId xid)
+{
+    GlobalCSN global_csn;
+
+    Assert(track_global_snapshots);
+
+    /* Handle permanent TransactionId's for which we don't have mapping */
+    if (!TransactionIdIsNormal(xid))
+    {
+        if (xid == InvalidTransactionId)
+            return AbortedGlobalCSN;
+        if (xid == FrozenTransactionId || xid == BootstrapTransactionId)
+            return FrozenGlobalCSN;
+        Assert(false); /* Should not happen */
+    }
+
+    /*
+     * For xids which are less than TransactionXmin GlobalCSNLog can be already
+     * trimmed but we know that such transaction is definitely not concurrently
+     * running according to any snapshot including timetravel ones. Callers
+     * should check TransactionDidCommit after.
+     */
+    if (TransactionIdPrecedes(xid, TransactionXmin))
+        return FrozenGlobalCSN;
+
+    /* Read GlobalCSN from SLRU */
+    global_csn = GlobalCSNLogGetCSN(xid);
+
+    /*
+     * If we faced InDoubt state then transaction is being committed and we
+     * should wait until GlobalCSN is assigned so that visibility check
+     * could decide whether tuple is in snapshot. See also comments in
+     * GlobalSnapshotPrecommit().
+     */
+    if (GlobalCSNIsInDoubt(global_csn))
+    {
+        XactLockTableWait(xid, NULL, NULL, XLTW_None);
+        global_csn = GlobalCSNLogGetCSN(xid);
+        Assert(GlobalCSNIsNormal(global_csn) ||
+                GlobalCSNIsAborted(global_csn));
+    }
+
+    Assert(GlobalCSNIsNormal(global_csn) ||
+            GlobalCSNIsInProgress(global_csn) ||
+            GlobalCSNIsAborted(global_csn));
+
+    return global_csn;
+}
+
+/*
+ * XidInvisibleInGlobalSnapshot
+ *
+ * Version of XidInMVCCSnapshot for global transactions. For non-imported
+ * global snapshots this should give same results as XidInLocalMVCCSnapshot
+ * (except that aborts will be shown as invisible without going to clog) and to
+ * ensure such behaviour XidInMVCCSnapshot is coated with asserts that check
+ * equivalence of XidInvisibleInGlobalSnapshot/XidInLocalMVCCSnapshot in
+ * case of ordinary snapshot.
+ */
+bool
+XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot)
+{
+    GlobalCSN csn;
+
+    Assert(track_global_snapshots);
+
+    csn = TransactionIdGetGlobalCSN(xid);
+
+    if (GlobalCSNIsNormal(csn))
+    {
+        if (csn < snapshot->global_csn)
+            return false;
+        else
+            return true;
+    }
+    else if (GlobalCSNIsFrozen(csn))
+    {
+        /* It is bootstrap or frozen transaction */
+        return false;
+    }
+    else
+    {
+        /* It is aborted or in-progress */
+        Assert(GlobalCSNIsAborted(csn) || GlobalCSNIsInProgress(csn));
+        if (GlobalCSNIsAborted(csn))
+            Assert(TransactionIdDidAbort(xid));
+        return true;
+    }
+}
+
+
+/*****************************************************************************
+ * Functions to handle distributed commit on transaction coordinator:
+ * GlobalSnapshotPrepareCurrent() / GlobalSnapshotAssignCsnCurrent().
+ * Corresponding functions for remote nodes are defined in twophase.c:
+ * pg_global_snapshot_prepare/pg_global_snapshot_assign.
+ *****************************************************************************/
+
+
+/*
+ * GlobalSnapshotPrepareCurrent
+ *
+ * Set InDoubt state for currently active transaction and return commit's
+ * global snapshot.
+ */
+GlobalCSN
+GlobalSnapshotPrepareCurrent()
+{
+    TransactionId xid = GetCurrentTransactionIdIfAny();
+
+    if (!track_global_snapshots)
+        ereport(ERROR,
+            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("could not prepare transaction for global commit"),
+                errhint("Make sure the configuration parameter \"%s\" is enabled.",
+                        "track_global_snapshots")));
+
+    if (TransactionIdIsValid(xid))
+    {
+        TransactionId *subxids;
+        int nsubxids = xactGetCommittedChildren(&subxids);
+        GlobalCSNLogSetCSN(xid, nsubxids,
+                                    subxids, InDoubtGlobalCSN);
+    }
+
+    /* Nothing to write if we don't have xid */
+
+    return GlobalSnapshotGenerate(false);
+}
+
+/*
+ * GlobalSnapshotAssignCsnCurrent
+ *
+ * Assign GlobalCSN for currently active transaction. GlobalCSN is supposedly
+ * the maximum of the values returned by GlobalSnapshotPrepareCurrent and
+ * pg_global_snapshot_prepare.
+ */
+void
+GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn)
+{
+    if (!track_global_snapshots)
+        ereport(ERROR,
+            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("could not prepare transaction for global commit"),
+                errhint("Make sure the configuration parameter \"%s\" is enabled.",
+                        "track_global_snapshots")));
+
+    if (!GlobalCSNIsNormal(global_csn))
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("pg_global_snapshot_assign expects normal global_csn")));
+
+    /* Skip empty transactions */
+    if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+        return;
+
+    /* Set global_csn and defuse ProcArrayEndTransaction from assigning one */
+    pg_atomic_write_u64(&MyProc->assignedGlobalCsn, global_csn);
+}
+
+
+/*****************************************************************************
+ * Functions to handle global and local transactions commit.
+ *
+ * For local transactions GlobalSnapshotPrecommit sets InDoubt state before
+ * ProcArrayEndTransaction is called and transaction data potentially becomes
+ * visible to other backends. ProcArrayEndTransaction (or ProcArrayRemove in
+ * twophase case) then acquires global_csn under ProcArray lock and stores it
+ * in proc->assignedGlobalCsn. It's important that global_csn for commit is
+ * generated under ProcArray lock, otherwise global and local snapshots won't
+ * be equivalent. A subsequent call to GlobalSnapshotCommit will write
+ * proc->assignedGlobalCsn to GlobalCSNLog.
+ *
+ * The same rules apply to a global transaction, except that global_csn is
+ * already assigned by GlobalSnapshotAssignCsnCurrent/pg_global_snapshot_assign
+ * and GlobalSnapshotPrecommit is basically a no-op.
+ *
+ * GlobalSnapshotAbort is slightly different compared to commit because abort
+ * can skip InDoubt phase and can be called for transaction subtree.
+ *****************************************************************************/
+
+
+/*
+ * GlobalSnapshotAbort
+ *
+ * Abort transaction in GlobalCsnLog. We can skip InDoubt state for aborts
+ * since no concurrent transactions are allowed to see aborted data anyway.
+ */
+void
+GlobalSnapshotAbort(PGPROC *proc, TransactionId xid,
+                    int nsubxids, TransactionId *subxids)
+{
+    if (!track_global_snapshots)
+        return;
+
+    GlobalCSNLogSetCSN(xid, nsubxids, subxids, AbortedGlobalCSN);
+
+    /*
+     * Clean assignedGlobalCsn anyway, as it was possibly set in
+     * GlobalSnapshotAssignCsnCurrent.
+     */
+    pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN);
+}
+
+/*
+ * GlobalSnapshotPrecommit
+ *
+ * Set InDoubt status for local transaction that we are going to commit.
+ * This step is needed to achieve consistency between local snapshots and
+ * global csn-based snapshots. We don't hold ProcArray lock while writing
+ * csn for transaction in SLRU but instead we set InDoubt status before
+ * transaction is deleted from ProcArray so the readers who will read csn
+ * in the gap between ProcArray removal and GlobalCSN assignment can wait
+ * until GlobalCSN is finally assigned. See also TransactionIdGetGlobalCSN().
+ *
+ * For global transaction this does nothing as InDoubt state was written
+ * earlier.
+ *
+ * This should be called only from parallel group leader before backend is
+ * deleted from ProcArray.
+ */
+void
+GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid,
+                    int nsubxids, TransactionId *subxids)
+{
+    GlobalCSN oldAssignedGlobalCsn = InProgressGlobalCSN;
+    bool in_progress;
+
+    if (!track_global_snapshots)
+        return;
+
+    /* Set InDoubt status if it is local transaction */
+    in_progress = pg_atomic_compare_exchange_u64(&proc->assignedGlobalCsn,
+                                                 &oldAssignedGlobalCsn,
+                                                 InDoubtGlobalCSN);
+    if (in_progress)
+    {
+        Assert(GlobalCSNIsInProgress(oldAssignedGlobalCsn));
+        GlobalCSNLogSetCSN(xid, nsubxids,
+                           subxids, InDoubtGlobalCSN);
+    }
+    else
+    {
+        /* Otherwise we should have valid GlobalCSN by this time */
+        Assert(GlobalCSNIsNormal(oldAssignedGlobalCsn));
+        /* Also global transaction should already be in InDoubt state */
+        Assert(GlobalCSNIsInDoubt(GlobalCSNLogGetCSN(xid)));
+    }
+}
+
+/*
+ * GlobalSnapshotCommit
+ *
+ * Write GlobalCSN that was acquired earlier to GlobalCsnLog. Should be
+ * preceded by GlobalSnapshotPrecommit() so readers can wait until we finally
+ * finished writing to SLRU.
+ *
+ * Should be called after ProcArrayEndTransaction, but before releasing
+ * transaction locks, so that TransactionIdGetGlobalCSN can wait on this
+ * lock for GlobalCSN.
+ */
+void
+GlobalSnapshotCommit(PGPROC *proc, TransactionId xid,
+                    int nsubxids, TransactionId *subxids)
+{
+    volatile GlobalCSN assigned_global_csn;
+
+    if (!track_global_snapshots)
+        return;
+
+    if (!TransactionIdIsValid(xid))
+    {
+        assigned_global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn);
+        Assert(GlobalCSNIsInProgress(assigned_global_csn));
+        return;
+    }
+
+    /* Finally write resulting GlobalCSN in SLRU */
+    assigned_global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn);
+    Assert(GlobalCSNIsNormal(assigned_global_csn));
+    GlobalCSNLogSetCSN(xid, nsubxids,
+                           subxids, assigned_global_csn);
+
+    /* Reset for next transaction */
+    pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN);
+}
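
Reviewer's illustration (not part of the patch): the circular buffer behind
GlobalSnapshotMapXmin() and GlobalSnapshotToXmin() can be hard to follow from
the comments alone, so here is a minimal single-process sketch of the same
arithmetic. It assumes no locking and no atomics, uses a fixed MAP_SIZE in
place of global_snapshot_defer_time, and takes oldest_xmin as a parameter
instead of calling GetOldestXmin(). All names (XidMapSketch, sketch_*) are
hypothetical and do not exist in the patch.

```c
#include <assert.h>
#include <stdint.h>

#define NSECS_PER_SEC 1000000000ULL
#define MAP_SIZE 4              /* stands in for global_snapshot_defer_time */
#define INVALID_XID 0u          /* stands in for InvalidTransactionId */

typedef struct
{
    int      head;                      /* slot of the freshest entry */
    uint64_t last_csn_seconds;          /* last second that was recorded */
    uint32_t xmin_by_second[MAP_SIZE];  /* oldestXmin per second */
} XidMapSketch;

/* Fill every slot with the same starting xid, as the startup hook does. */
static void
sketch_init(XidMapSketch *map, uint32_t oldest_active_xid)
{
    map->head = 0;
    map->last_csn_seconds = 0;
    for (int i = 0; i < MAP_SIZE; i++)
        map->xmin_by_second[i] = oldest_active_xid;
}

/* Record oldest_xmin for the second that follows csn (round up). */
static void
sketch_map_xmin(XidMapSketch *map, uint64_t csn, uint32_t oldest_xmin)
{
    uint64_t csn_seconds = csn / NSECS_PER_SEC + 1;
    uint64_t gap;
    uint32_t previous;
    int      offset;

    if (map->last_csn_seconds >= csn_seconds)
        return;                 /* this second is already filled */

    gap = csn_seconds - map->last_csn_seconds;
    if (gap > MAP_SIZE)
        gap = MAP_SIZE;
    previous = map->xmin_by_second[map->head];

    offset = (int) (csn_seconds % MAP_SIZE);
    map->head = offset;
    map->last_csn_seconds = csn_seconds;
    map->xmin_by_second[offset] = oldest_xmin;

    /* Skipped seconds get the previous value, walking backwards. */
    for (uint64_t i = 1; i < gap; i++)
    {
        offset = (offset + MAP_SIZE - 1) % MAP_SIZE;
        map->xmin_by_second[offset] = previous;
    }
}

/* Look up the xmin to hold for csn (round down); INVALID_XID if too old. */
static uint32_t
sketch_to_xmin(const XidMapSketch *map, uint64_t csn)
{
    uint64_t csn_seconds = csn / NSECS_PER_SEC;

    if (csn_seconds > map->last_csn_seconds)
        return map->xmin_by_second[map->head];
    if (map->last_csn_seconds - csn_seconds < MAP_SIZE)
        return map->xmin_by_second[csn_seconds % MAP_SIZE];
    return INVALID_XID;
}
```

Writing rounds up and reading rounds down, so a lookup can only return an
xmin that is at least as old as the one the snapshot actually needed. A CSN
that falls MAP_SIZE or more seconds behind the newest entry yields
INVALID_XID, which corresponds to the "snapshot too old" case.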
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 3aee5e50c5..2fe8b93617 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -77,6 +77,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/global_snapshot.h"
 #include "access/global_csn_log.h"
 #include "access/htup_details.h"
 #include "access/subtrans.h"
@@ -1526,9 +1527,35 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
                                        hdr->nabortrels, abortrels,
                                        gid);
 
+    /*
+     * GlobalSnapshot callbacks that should be called right before we are
+     * going to become visible. Details in comments to these functions.
+     */
+    if (isCommit)
+        GlobalSnapshotPrecommit(proc, xid, hdr->nsubxacts, children);
+    else
+        GlobalSnapshotAbort(proc, xid, hdr->nsubxacts, children);
+
+
     ProcArrayRemove(proc, latestXid);
 
     /*
+     * Stamp our transaction with GlobalCSN in GlobalCsnLog.
+     * Should be called after ProcArrayRemove, but before releasing
+     * transaction locks, since TransactionIdGetGlobalCSN relies on
+     * XactLockTableWait to await global_csn.
+     */
+    if (isCommit)
+    {
+        GlobalSnapshotCommit(proc, xid, hdr->nsubxacts, children);
+    }
+    else
+    {
+        Assert(GlobalCSNIsInProgress(
+                   pg_atomic_read_u64(&proc->assignedGlobalCsn)));
+    }
+
+    /*
      * In case we fail while running the callbacks, mark the gxact invalid so
      * no one else will try to commit/rollback, and so it will be recycled if
      * we fail after this point.  It is still locked by our backend so it
@@ -2513,3 +2540,132 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
 
     return;
 }
+
+/*
+ * GlobalSnapshotPrepareTwophase
+ *
+ * Set InDoubt state for the prepared transaction 'gid' and return commit's
+ * global snapshot.
+ *
+ * This function is a counterpart of GlobalSnapshotPrepareCurrent() for
+ * twophase transactions.
+ */
+static GlobalCSN
+GlobalSnapshotPrepareTwophase(const char *gid)
+{
+    GlobalTransaction gxact;
+    PGXACT       *pgxact;
+    char       *buf;
+    TransactionId xid;
+    xl_xact_parsed_prepare parsed;
+
+    if (!track_global_snapshots)
+        ereport(ERROR,
+            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("could not prepare transaction for global commit"),
+                errhint("Make sure the configuration parameter \"%s\" is enabled.",
+                        "track_global_snapshots")));
+
+    /*
+     * Validate the GID, and lock the GXACT to ensure that two backends do not
+     * try to access the same GID at once.
+     */
+    gxact = LockGXact(gid, GetUserId());
+    pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+    xid = pgxact->xid;
+
+    if (gxact->ondisk)
+        buf = ReadTwoPhaseFile(xid, true);
+    else
+        XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
+
+    ParsePrepareRecord(0, buf, &parsed);
+
+    GlobalCSNLogSetCSN(xid, parsed.nsubxacts,
+                    parsed.subxacts, InDoubtGlobalCSN);
+
+    /* Unlock our GXACT */
+    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+    gxact->locking_backend = InvalidBackendId;
+    LWLockRelease(TwoPhaseStateLock);
+
+    pfree(buf);
+
+    return GlobalSnapshotGenerate(false);
+}
+
+/*
+ * SQL interface to GlobalSnapshotPrepareTwophase()
+ *
+ * TODO: Rewrite this as PREPARE TRANSACTION 'gid' RETURNING SNAPSHOT
+ */
+Datum
+pg_global_snapshot_prepare(PG_FUNCTION_ARGS)
+{
+    const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
+    GlobalCSN    global_csn;
+
+    global_csn = GlobalSnapshotPrepareTwophase(gid);
+
+    PG_RETURN_INT64(global_csn);
+}
+
+
+/*
+ * GlobalSnapshotAssignCsnTwoPhase
+ *
+ * Assign GlobalCSN for the prepared transaction 'gid'. GlobalCSN is supposedly
+ * the maximum of the values returned by GlobalSnapshotPrepareCurrent and
+ * pg_global_snapshot_prepare.
+ *
+ * This function is a counterpart of GlobalSnapshotAssignCsnCurrent() for
+ * twophase transactions.
+ */
+static void
+GlobalSnapshotAssignCsnTwoPhase(const char *gid, GlobalCSN global_csn)
+{
+    GlobalTransaction gxact;
+    PGPROC       *proc;
+
+    if (!track_global_snapshots)
+        ereport(ERROR,
+            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("could not prepare transaction for global commit"),
+                errhint("Make sure the configuration parameter \"%s\" is enabled.",
+                        "track_global_snapshots")));
+
+    if (!GlobalCSNIsNormal(global_csn))
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("pg_global_snapshot_assign expects normal global_csn")));
+
+    /*
+     * Validate the GID, and lock the GXACT to ensure that two backends do not
+     * try to access the same GID at once.
+     */
+    gxact = LockGXact(gid, GetUserId());
+    proc = &ProcGlobal->allProcs[gxact->pgprocno];
+
+    /* Set global_csn so that ProcArrayRemove() does not assign one itself. */
+    pg_atomic_write_u64(&proc->assignedGlobalCsn, global_csn);
+
+    /* Unlock our GXACT */
+    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+    gxact->locking_backend = InvalidBackendId;
+    LWLockRelease(TwoPhaseStateLock);
+}
+
+/*
+ * SQL interface to GlobalSnapshotAssignCsnTwoPhase()
+ *
+ * TODO: Rewrite this as COMMIT PREPARED 'gid' SNAPSHOT 'global_csn'
+ */
+Datum
+pg_global_snapshot_assign(PG_FUNCTION_ARGS)
+{
+    const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
+    GlobalCSN    global_csn = PG_GETARG_INT64(1);
+
+    GlobalSnapshotAssignCsnTwoPhase(gid, global_csn);
+    PG_RETURN_VOID();
+}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 9aa63c8792..0086adadf1 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -21,6 +21,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/global_snapshot.h"
 #include "access/multixact.h"
 #include "access/parallel.h"
 #include "access/subtrans.h"
@@ -1341,6 +1342,14 @@ RecordTransactionCommit(void)
 
     /* Reset XactLastRecEnd until the next transaction writes something */
     XactLastRecEnd = 0;
+
+    /*
+     * Mark our transaction as InDoubt in GlobalCsnLog and get ready for
+     * commit.
+     */
+    if (markXidCommitted)
+        GlobalSnapshotPrecommit(MyProc, xid, nchildren, children);
+
 cleanup:
     /* Clean up local data */
     if (rels)
@@ -1602,6 +1611,11 @@ RecordTransactionAbort(bool isSubXact)
      */
     TransactionIdAbortTree(xid, nchildren, children);
 
+    /*
+     * Mark our transaction as Aborted in GlobalCsnLog.
+     */
+    GlobalSnapshotAbort(MyProc, xid, nchildren, children);
+
     END_CRIT_SECTION();
 
     /* Compute latestXid while we have the child XIDs handy */
@@ -2060,6 +2074,21 @@ CommitTransaction(void)
     ProcArrayEndTransaction(MyProc, latestXid);
 
     /*
+     * Stamp our transaction with GlobalCSN in GlobalCsnLog.
+     * Should be called after ProcArrayEndTransaction, but before releasing
+     * transaction locks.
+     */
+    if (!is_parallel_worker)
+    {
+        TransactionId  xid = GetTopTransactionIdIfAny();
+        TransactionId *subxids;
+        int               nsubxids;
+
+        nsubxids = xactGetCommittedChildren(&subxids);
+        GlobalSnapshotCommit(MyProc, xid, nsubxids, subxids);
+    }
+
+    /*
      * This is all post-commit cleanup.  Note that if an error is raised here,
      * it's too late to abort the transaction.  This should be just
      * noncritical resource releasing.
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ca0d934c76..edb0d07aca 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7070,6 +7070,7 @@ StartupXLOG(void)
             StartupCLOG();
             StartupGlobalCSNLog(oldestActiveXID);
             StartupSUBTRANS(oldestActiveXID);
+            GlobalSnapshotStartup(oldestActiveXID);
 
             /*
              * If we're beginning at a shutdown checkpoint, we know that
@@ -7869,6 +7870,7 @@ StartupXLOG(void)
         StartupCLOG();
         StartupGlobalCSNLog(oldestActiveXID);
         StartupSUBTRANS(oldestActiveXID);
+        GlobalSnapshotStartup(oldestActiveXID);
     }
 
     /*
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2af468fc6a..3a04f6a824 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/nbtree.h"
 #include "access/subtrans.h"
 #include "access/twophase.h"
+#include "access/global_snapshot.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
         size = add_size(size, WalSndShmemSize());
         size = add_size(size, WalRcvShmemSize());
         size = add_size(size, ApplyLauncherShmemSize());
+        size = add_size(size, GlobalSnapshotShmemSize());
         size = add_size(size, SnapMgrShmemSize());
         size = add_size(size, BTreeShmemSize());
         size = add_size(size, SyncScanShmemSize());
@@ -273,6 +275,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
     SyncScanShmemInit();
     AsyncShmemInit();
     BackendRandomShmemInit();
+    GlobalSnapshotShmemInit();
 
 #ifdef EXEC_BACKEND
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 64ab249615..fb7f74d4cd 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -47,6 +47,7 @@
 
 #include "access/clog.h"
 #include "access/global_csn_log.h"
+#include "access/global_snapshot.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -91,6 +92,8 @@ typedef struct ProcArrayStruct
     TransactionId replication_slot_xmin;
     /* oldest catalog xmin of any replication slot */
     TransactionId replication_slot_catalog_xmin;
+    /* xmin of oldest active global snapshot */
+    TransactionId global_snapshot_xmin;
 
     /* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
     int            pgprocnos[FLEXIBLE_ARRAY_MEMBER];
@@ -246,6 +249,7 @@ CreateSharedProcArray(void)
         procArray->lastOverflowedXid = InvalidTransactionId;
         procArray->replication_slot_xmin = InvalidTransactionId;
         procArray->replication_slot_catalog_xmin = InvalidTransactionId;
+        procArray->global_snapshot_xmin = InvalidTransactionId;
     }
 
     allProcs = ProcGlobal->allProcs;
@@ -352,6 +356,17 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
         if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
                                   latestXid))
             ShmemVariableCache->latestCompletedXid = latestXid;
+
+        /*
+         * Assign global csn while holding ProcArrayLock for non-global
+         * COMMIT PREPARED. After the lock is released, the subsequent
+         * GlobalSnapshotCommit() will write this value to GlobalCsnLog.
+         *
+         * In case of global commit proc->assignedGlobalCsn is already set
+         * by prior AssignGlobalCsn().
+         */
+        if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn)))
+            pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false));
     }
     else
     {
@@ -432,6 +447,8 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 
         proc->lxid = InvalidLocalTransactionId;
         pgxact->xmin = InvalidTransactionId;
+        proc->originalXmin = InvalidTransactionId;
+
         /* must be cleared with xid/xmin: */
         pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
         pgxact->delayChkpt = false; /* be sure this is cleared in abort */
@@ -454,6 +471,8 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
     pgxact->xid = InvalidTransactionId;
     proc->lxid = InvalidLocalTransactionId;
     pgxact->xmin = InvalidTransactionId;
+    proc->originalXmin = InvalidTransactionId;
+
     /* must be cleared with xid/xmin: */
     pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
     pgxact->delayChkpt = false; /* be sure this is cleared in abort */
@@ -467,6 +486,20 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
     if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
                               latestXid))
         ShmemVariableCache->latestCompletedXid = latestXid;
+
+    /*
+     * Assign global csn while holding ProcArrayLock for non-global
+     * COMMIT. After the lock is released, the subsequent GlobalSnapshotCommit()
+     * will write this value to GlobalCsnLog.
+     *
+     * In case of global commit MyProc->assignedGlobalCsn is already set
+     * by prior AssignGlobalCsn().
+     *
+     * TODO: in case of group commit we can generate one GlobalSnapshot for
+     * the whole group to save time on timestamp acquisition.
+     */
+    if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn)))
+        pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false));
 }
 
 /*
@@ -616,6 +649,7 @@ ProcArrayClearTransaction(PGPROC *proc)
     pgxact->xid = InvalidTransactionId;
     proc->lxid = InvalidLocalTransactionId;
     pgxact->xmin = InvalidTransactionId;
+    proc->originalXmin = InvalidTransactionId;
     proc->recoveryConflictPending = false;
 
     /* redundant, but just in case */
@@ -1320,6 +1354,7 @@ GetOldestXmin(Relation rel, int flags)
 
     volatile TransactionId replication_slot_xmin = InvalidTransactionId;
     volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+    volatile TransactionId global_snapshot_xmin = InvalidTransactionId;
 
     /*
      * If we're not computing a relation specific limit, or if a shared
@@ -1356,8 +1391,9 @@ GetOldestXmin(Relation rel, int flags)
             proc->databaseId == MyDatabaseId ||
             proc->databaseId == 0)    /* always include WalSender */
         {
-            /* Fetch xid just once - see GetNewTransactionId */
+            /* Fetch both xids just once - see GetNewTransactionId */
             TransactionId xid = pgxact->xid;
+            TransactionId original_xmin = proc->originalXmin;
 
             /* First consider the transaction's own Xid, if any */
             if (TransactionIdIsNormal(xid) &&
@@ -1370,8 +1406,17 @@ GetOldestXmin(Relation rel, int flags)
              * We must check both Xid and Xmin because a transaction might
              * have an Xmin but not (yet) an Xid; conversely, if it has an
              * Xid, that could determine some not-yet-set Xmin.
+             *
+             * When oldestXmin is computed for GlobalSnapshotMapXmin(),
+             * proc->originalXmin (if set) is used instead of pgxact->xmin.
+             * See the comments to GlobalSnapshotMapXmin() for details.
              */
-            xid = pgxact->xmin; /* Fetch just once */
+            if ((flags & PROCARRAY_NON_IMPORTED_XMIN) &&
+                    TransactionIdIsValid(original_xmin))
+                xid = original_xmin;
+            else
+                xid = pgxact->xmin; /* Fetch just once */
+
             if (TransactionIdIsNormal(xid) &&
                 TransactionIdPrecedes(xid, result))
                 result = xid;
@@ -1381,6 +1426,7 @@ GetOldestXmin(Relation rel, int flags)
     /* fetch into volatile var while ProcArrayLock is held */
     replication_slot_xmin = procArray->replication_slot_xmin;
     replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+    global_snapshot_xmin = procArray->global_snapshot_xmin;
 
     if (RecoveryInProgress())
     {
@@ -1422,6 +1468,11 @@ GetOldestXmin(Relation rel, int flags)
             result = FirstNormalTransactionId;
     }
 
+    if (!(flags & PROCARRAY_NON_IMPORTED_XMIN) &&
+        TransactionIdIsValid(global_snapshot_xmin) &&
+        NormalTransactionIdPrecedes(global_snapshot_xmin, result))
+        result = global_snapshot_xmin;
+
     /*
      * Check whether there are replication slots requiring an older xmin.
      */
@@ -1517,6 +1568,8 @@ GetSnapshotData(Snapshot snapshot)
     bool        suboverflowed = false;
     volatile TransactionId replication_slot_xmin = InvalidTransactionId;
     volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+    volatile TransactionId global_snapshot_xmin = InvalidTransactionId;
+    volatile GlobalCSN       global_csn = FrozenGlobalCSN;
 
     Assert(snapshot != NULL);
 
@@ -1705,10 +1758,18 @@ GetSnapshotData(Snapshot snapshot)
     /* fetch into volatile var while ProcArrayLock is held */
     replication_slot_xmin = procArray->replication_slot_xmin;
     replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+    global_snapshot_xmin = procArray->global_snapshot_xmin;
 
     if (!TransactionIdIsValid(MyPgXact->xmin))
         MyPgXact->xmin = TransactionXmin = xmin;
 
+    /*
+     * Take GlobalCSN under ProcArrayLock so that the local and global
+     * snapshots stay synchronized.
+     */
+    if (track_global_snapshots)
+        global_csn = GlobalSnapshotGenerate(false);
+
     LWLockRelease(ProcArrayLock);
 
     /*
@@ -1724,6 +1785,10 @@ GetSnapshotData(Snapshot snapshot)
     if (!TransactionIdIsNormal(RecentGlobalXmin))
         RecentGlobalXmin = FirstNormalTransactionId;
 
+    if (TransactionIdIsValid(global_snapshot_xmin) &&
+        TransactionIdPrecedes(global_snapshot_xmin, RecentGlobalXmin))
+        RecentGlobalXmin = global_snapshot_xmin;
+
     /* Check whether there's a replication slot requiring an older xmin. */
     if (TransactionIdIsValid(replication_slot_xmin) &&
         NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
@@ -1779,6 +1844,12 @@ GetSnapshotData(Snapshot snapshot)
         MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
     }
 
+    snapshot->imported_global_csn = false;
+    snapshot->global_csn = global_csn;
+    /* if (global_snapshot_defer_time > 0 && IsNormalProcessingMode()) */
+    if (global_snapshot_defer_time > 0 && IsUnderPostmaster)
+        GlobalSnapshotMapXmin(snapshot->global_csn);
+
     return snapshot;
 }
 
@@ -3007,6 +3078,24 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
     LWLockRelease(ProcArrayLock);
 }
 
+/*
+ * ProcArraySetGlobalSnapshotXmin
+ */
+void
+ProcArraySetGlobalSnapshotXmin(TransactionId xmin)
+{
+    /* We rely on atomic fetch/store of xid */
+    procArray->global_snapshot_xmin = xmin;
+}
+
+/*
+ * ProcArrayGetGlobalSnapshotXmin
+ */
+TransactionId
+ProcArrayGetGlobalSnapshotXmin(void)
+{
+    return procArray->global_snapshot_xmin;
+}
 
 #define XidCacheRemove(i) \
     do { \
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index 9615058f29..8ef731d560 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -51,3 +51,4 @@ BackendRandomLock                    43
 LogicalRepWorkerLock                44
 CLogTruncationLock                    45
 GlobalCSNLogControlLock                46
+GlobalSnapshotXidMapLock            47
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 6f30e082b2..ed497185be 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -37,6 +37,7 @@
 
 #include "access/transam.h"
 #include "access/twophase.h"
+#include "access/global_snapshot.h"
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -417,6 +418,9 @@ InitProcess(void)
     MyProc->clogGroupMemberLsn = InvalidXLogRecPtr;
     pg_atomic_init_u32(&MyProc->clogGroupNext, INVALID_PGPROCNO);
 
+    MyProc->originalXmin = InvalidTransactionId;
+    pg_atomic_init_u64(&MyProc->assignedGlobalCsn, InProgressGlobalCSN);
+
     /*
      * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch
      * on it.  That allows us to repoint the process latch, which so far
@@ -559,6 +563,7 @@ InitAuxiliaryProcess(void)
     MyProc->lwWaitMode = 0;
     MyProc->waitLock = NULL;
     MyProc->waitProcLock = NULL;
+    MyProc->originalXmin = InvalidTransactionId;
 #ifdef USE_ASSERT_CHECKING
     {
         int            i;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 2701528c55..434b672ee2 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -28,6 +28,7 @@
 
 #include "access/commit_ts.h"
 #include "access/gin.h"
+#include "access/global_snapshot.h"
 #include "access/rmgr.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -1024,7 +1025,7 @@ static struct config_bool ConfigureNamesBool[] =
             gettext_noop("Used to achieve REPEATEBLE READ isolation level for postgres_fdw transactions.")
         },
         &track_global_snapshots,
-        true, /* XXX: set true to simplify tesing. XXX2: Seems that RESOURCES_MEM isn't the best catagory */
+        false, /* XXX: Seems that RESOURCES_MEM isn't the best category */
         NULL, NULL, NULL
     },
     {
@@ -2349,6 +2350,16 @@ static struct config_int ConfigureNamesInt[] =
         NULL, NULL, NULL
     },
 
+    {
+        {"global_snapshot_defer_time", PGC_POSTMASTER, REPLICATION_MASTER,
+            gettext_noop("Minimal age of records which are allowed to be vacuumed, in seconds."),
+            NULL
+        },
+        &global_snapshot_defer_time,
+        5, 0, INT_MAX,
+        NULL, NULL, NULL
+    },
+
     /*
      * See also CheckRequiredParameterValues() if this parameter changes
      */
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c0d3fb8491..ed1237f5c0 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -254,6 +254,8 @@
                 # and comma-separated list of application_name
                 # from standby(s); '*' = all
 #vacuum_defer_cleanup_age = 0    # number of xacts by which cleanup is delayed
+#global_snapshot_defer_time = 0    # minimal age of records which are allowed
+                # to be vacuumed, in seconds
 
 # - Standby Servers -
 
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index edf59efc29..d4cf0710fc 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -48,6 +48,7 @@
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include "access/global_snapshot.h"
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
@@ -245,6 +246,8 @@ typedef struct SerializedSnapshotData
     CommandId    curcid;
     TimestampTz whenTaken;
     XLogRecPtr    lsn;
+    GlobalCSN    global_csn;
+    bool        imported_global_csn;
 } SerializedSnapshotData;
 
 Size
@@ -992,7 +995,9 @@ SnapshotResetXmin(void)
                                         pairingheap_first(&RegisteredSnapshots));
 
     if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin))
+    {
         MyPgXact->xmin = minSnapshot->xmin;
+    }
 }
 
 /*
@@ -2081,6 +2086,8 @@ SerializeSnapshot(Snapshot snapshot, char *start_address)
     serialized_snapshot.curcid = snapshot->curcid;
     serialized_snapshot.whenTaken = snapshot->whenTaken;
     serialized_snapshot.lsn = snapshot->lsn;
+    serialized_snapshot.global_csn = snapshot->global_csn;
+    serialized_snapshot.imported_global_csn = snapshot->imported_global_csn;
 
     /*
      * Ignore the SubXID array if it has overflowed, unless the snapshot was
@@ -2155,6 +2162,8 @@ RestoreSnapshot(char *start_address)
     snapshot->curcid = serialized_snapshot.curcid;
     snapshot->whenTaken = serialized_snapshot.whenTaken;
     snapshot->lsn = serialized_snapshot.lsn;
+    snapshot->global_csn = serialized_snapshot.global_csn;
+    snapshot->imported_global_csn = serialized_snapshot.imported_global_csn;
 
     /* Copy XIDs, if present. */
     if (serialized_snapshot.xcnt > 0)
@@ -2192,3 +2201,97 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
 {
     SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc);
 }
+
+/*
+ * ExportGlobalSnapshot
+ *
+ * Export global_csn so that caller can expand this transaction to other
+ * nodes.
+ *
+ * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and
+ * add some additional checks that transaction has not yet acquired an xid, but
+ * for current iteration of this patch I don't want to hack on parser.
+ */
+GlobalCSN
+ExportGlobalSnapshot(void)
+{
+    if (!track_global_snapshots)
+        ereport(ERROR,
+            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+             errmsg("could not export global snapshot"),
+             errhint("Make sure the configuration parameter \"%s\" is enabled.",
+                     "track_global_snapshots")));
+
+    return CurrentSnapshot->global_csn;
+}
+
+/* SQL accessor to ExportGlobalSnapshot() */
+Datum
+pg_global_snapshot_export(PG_FUNCTION_ARGS)
+{
+    GlobalCSN    global_csn = ExportGlobalSnapshot();
+    PG_RETURN_UINT64(global_csn);
+}
+
+/*
+ * ImportGlobalSnapshot
+ *
+ * Import global_csn and retract this backend's xmin to the value that was
+ * current when that global_csn was generated.
+ *
+ * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and
+ * add some additional checks that transaction has not yet acquired an xid, but
+ * for current iteration of this patch I don't want to hack on parser.
+ */
+void
+ImportGlobalSnapshot(GlobalCSN snap_global_csn)
+{
+    volatile TransactionId xmin;
+
+    if (!track_global_snapshots)
+        ereport(ERROR,
+            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+             errmsg("could not import global snapshot"),
+             errhint("Make sure the configuration parameter \"%s\" is enabled.",
+                     "track_global_snapshots")));
+
+    if (global_snapshot_defer_time <= 0)
+        ereport(ERROR,
+            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+             errmsg("could not import global snapshot"),
+             errhint("Make sure the configuration parameter \"%s\" is positive.",
+                     "global_snapshot_defer_time")));
+
+    /*
+     * Call GlobalSnapshotToXmin under ProcArrayLock so that the resulting xmin
+     * cannot be evicted from the map before we install it as our backend's
+     * xmin.
+     */
+    LWLockAcquire(ProcArrayLock, LW_SHARED);
+    xmin = GlobalSnapshotToXmin(snap_global_csn);
+    if (!TransactionIdIsValid(xmin))
+    {
+        LWLockRelease(ProcArrayLock);
+        elog(ERROR, "GlobalSnapshotToXmin: global snapshot too old");
+    }
+    MyProc->originalXmin = MyPgXact->xmin;
+    MyPgXact->xmin = TransactionXmin = xmin;
+    LWLockRelease(ProcArrayLock);
+
+    CurrentSnapshot->xmin = xmin; /* defuse SnapshotResetXmin() */
+    CurrentSnapshot->global_csn = snap_global_csn;
+    CurrentSnapshot->imported_global_csn = true;
+    GlobalSnapshotSync(snap_global_csn);
+
+    Assert(TransactionIdPrecedesOrEquals(RecentGlobalXmin, xmin));
+    Assert(TransactionIdPrecedesOrEquals(RecentGlobalDataXmin, xmin));
+}
+
+/* SQL accessor to ImportGlobalSnapshot() */
+Datum
+pg_global_snapshot_import(PG_FUNCTION_ARGS)
+{
+    GlobalCSN    global_csn = PG_GETARG_UINT64(0);
+    ImportGlobalSnapshot(global_csn);
+    PG_RETURN_VOID();
+}
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index f7c4c9188c..f2fbc77fa8 100644
--- a/src/backend/utils/time/tqual.c
+++ b/src/backend/utils/time/tqual.c
@@ -63,6 +63,7 @@
 
 #include "postgres.h"
 
+#include "access/global_snapshot.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
 #include "access/subtrans.h"
@@ -1462,8 +1463,8 @@ HeapTupleIsSurelyDead(HeapTuple htup, TransactionId OldestXmin)
 }
 
 /*
- * XidInMVCCSnapshot
- *        Is the given XID still-in-progress according to the snapshot?
+ * XidInLocalMVCCSnapshot
+ *        Is the given XID still-in-progress according to the local snapshot?
  *
  * Note: GetSnapshotData never stores either top xid or subxids of our own
  * backend into a snapshot, so these xids will not be reported as "running"
@@ -1471,8 +1472,8 @@ HeapTupleIsSurelyDead(HeapTuple htup, TransactionId OldestXmin)
  * TransactionIdIsCurrentTransactionId first, except when it's known the
  * XID could not be ours anyway.
  */
-bool
-XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+static bool
+XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 {
     uint32        i;
 
@@ -1584,6 +1585,62 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 }
 
 /*
+ * XidInMVCCSnapshot
+ *
+ * Check whether this xid is in the snapshot, taking into account the fact
+ * that the snapshot can be global. When track_global_snapshots is switched off
+ * just call XidInLocalMVCCSnapshot().
+ */
+bool
+XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+{
+    bool in_snapshot;
+
+    if (snapshot->imported_global_csn)
+    {
+        Assert(track_global_snapshots);
+        /* No point in using snapshot info other than the CSN */
+        return XidInvisibleInGlobalSnapshot(xid, snapshot);
+    }
+
+    in_snapshot = XidInLocalMVCCSnapshot(xid, snapshot);
+
+    if (!track_global_snapshots)
+    {
+        Assert(GlobalCSNIsFrozen(snapshot->global_csn));
+        return in_snapshot;
+    }
+
+    if (in_snapshot)
+    {
+        /*
+         * This xid may be already in unknown state and in that case
+         * we must wait and recheck.
+         *
+         * TODO: this check can be skipped if we know for sure that there were
+         * no global transactions when this snapshot was taken. That requires
+         * some changes to mechanisms of global snapshots export/import (if
+         * backend set xmin then we should have a-priori knowledge that this
+         * transaction is going to be global or local -- right now this is not
+
+         * enforced). Leave that for future and don't complicate this patch.
+         */
+        return XidInvisibleInGlobalSnapshot(xid, snapshot);
+    }
+    else
+    {
+#ifdef USE_ASSERT_CHECKING
+        /* Check that global snapshot gives the same results as local one */
+        if (XidInvisibleInGlobalSnapshot(xid, snapshot))
+        {
+            GlobalCSN gcsn = TransactionIdGetGlobalCSN(xid);
+            Assert(GlobalCSNIsAborted(gcsn));
+        }
+#endif
+        return false;
+    }
+}
+
+/*
  * Is the tuple really only locked?  That is, is it not updated?
  *
  * It's easy to check just infomask bits if the locker is not a multi; but
diff --git a/src/include/access/global_snapshot.h b/src/include/access/global_snapshot.h
new file mode 100644
index 0000000000..246b180cfd
--- /dev/null
+++ b/src/include/access/global_snapshot.h
@@ -0,0 +1,72 @@
+/*-------------------------------------------------------------------------
+ *
+ * global_snapshot.h
+ *      Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/global_snapshot.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef GLOBAL_SNAPSHOT_H
+#define GLOBAL_SNAPSHOT_H
+
+#include "port/atomics.h"
+#include "storage/lock.h"
+#include "utils/snapshot.h"
+#include "utils/guc.h"
+
+/*
+ * snapshot.h is used in frontend code so atomic variant of GlobalCSN type
+ * is defined here.
+ */
+typedef pg_atomic_uint64 GlobalCSN_atomic;
+
+#define InProgressGlobalCSN     UINT64CONST(0x0)
+#define AbortedGlobalCSN     UINT64CONST(0x1)
+#define FrozenGlobalCSN         UINT64CONST(0x2)
+#define InDoubtGlobalCSN     UINT64CONST(0x3)
+#define FirstNormalGlobalCSN UINT64CONST(0x4)
+
+#define GlobalCSNIsInProgress(csn)    ((csn) == InProgressGlobalCSN)
+#define GlobalCSNIsAborted(csn)        ((csn) == AbortedGlobalCSN)
+#define GlobalCSNIsFrozen(csn)        ((csn) == FrozenGlobalCSN)
+#define GlobalCSNIsInDoubt(csn)        ((csn) == InDoubtGlobalCSN)
+#define GlobalCSNIsNormal(csn)        ((csn) >= FirstNormalGlobalCSN)
+
+
+extern int global_snapshot_defer_time;
+
+
+extern Size GlobalSnapshotShmemSize(void);
+extern void GlobalSnapshotShmemInit(void);
+extern void GlobalSnapshotStartup(TransactionId oldestActiveXID);
+
+extern void GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn);
+extern TransactionId GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn);
+
+extern GlobalCSN GlobalSnapshotGenerate(bool locked);
+
+extern bool XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot);
+
+extern void GlobalSnapshotSync(GlobalCSN remote_gcsn);
+
+extern GlobalCSN TransactionIdGetGlobalCSN(TransactionId xid);
+
+extern GlobalCSN GlobalSnapshotPrepareGlobal(const char *gid);
+extern void GlobalSnapshotAssignCsnGlobal(const char *gid,
+                                          GlobalCSN global_csn);
+
+extern GlobalCSN GlobalSnapshotPrepareCurrent(void);
+extern void GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn);
+
+extern void GlobalSnapshotAbort(PGPROC *proc, TransactionId xid, int nsubxids,
+                                TransactionId *subxids);
+extern void GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid, int nsubxids,
+                                    TransactionId *subxids);
+extern void GlobalSnapshotCommit(PGPROC *proc, TransactionId xid, int nsubxids,
+                                    TransactionId *subxids);
+
+#endif                            /* GLOBAL_SNAPSHOT_H */
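For readers following the header above, here is a minimal standalone sketch of how the reserved GlobalCSN values partition the 64-bit space. It is not part of the patch. The uint64_t typedef for GlobalCSN and the CsnClass / classify_csn names are assumptions made for illustration, and UINT64_C stands in for PostgreSQL's UINT64CONST. Only the five constants and the GlobalCSNIs* macros mirror global_snapshot.h.

```c
#include <assert.h>
#include <stdint.h>

/* Assumption: GlobalCSN is a plain 64-bit unsigned integer. */
typedef uint64_t GlobalCSN;

/* Reserved values, mirroring global_snapshot.h. */
#define InProgressGlobalCSN   UINT64_C(0x0)
#define AbortedGlobalCSN      UINT64_C(0x1)
#define FrozenGlobalCSN       UINT64_C(0x2)
#define InDoubtGlobalCSN      UINT64_C(0x3)
#define FirstNormalGlobalCSN  UINT64_C(0x4)

#define GlobalCSNIsInProgress(csn)  ((csn) == InProgressGlobalCSN)
#define GlobalCSNIsAborted(csn)     ((csn) == AbortedGlobalCSN)
#define GlobalCSNIsFrozen(csn)      ((csn) == FrozenGlobalCSN)
#define GlobalCSNIsInDoubt(csn)     ((csn) == InDoubtGlobalCSN)
#define GlobalCSNIsNormal(csn)      ((csn) >= FirstNormalGlobalCSN)

/* Hypothetical enum, used only by this sketch. */
typedef enum
{
    CSN_IN_PROGRESS,
    CSN_ABORTED,
    CSN_FROZEN,
    CSN_IN_DOUBT,
    CSN_NORMAL
} CsnClass;

/* Hypothetical helper: map any 64-bit value to exactly one class. */
static CsnClass
classify_csn(GlobalCSN csn)
{
    if (GlobalCSNIsInProgress(csn))
        return CSN_IN_PROGRESS;
    if (GlobalCSNIsAborted(csn))
        return CSN_ABORTED;
    if (GlobalCSNIsFrozen(csn))
        return CSN_FROZEN;
    if (GlobalCSNIsInDoubt(csn))
        return CSN_IN_DOUBT;

    /* Everything from FirstNormalGlobalCSN upwards is a real CSN. */
    assert(GlobalCSNIsNormal(csn));
    return CSN_NORMAL;
}
```

The point of the sketch is that the four special states occupy the lowest values, so a single >= comparison is enough to recognize a real CSN. That is the check pg_global_snapshot_assign relies on when it rejects a non-normal global_csn.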
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 0e932daa48..f8b774f393 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -18,6 +18,7 @@
 #include "access/xact.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
+#include "utils/snapshot.h"
 
 /*
  * GlobalTransactionData is defined in twophase.c; other places have no
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a14651010f..e0e20c2e6c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10206,4 +10206,18 @@
   proisstrict => 'f', prorettype => 'bool', proargtypes => 'oid int4 int4 any',
   proargmodes => '{i,i,i,v}', prosrc => 'satisfies_hash_partition' },
 
+# global transaction handling
+{ oid => '3430', descr => 'export global transaction snapshot',
+  proname => 'pg_global_snapshot_export', provolatile => 'v', proparallel => 'u',
+  prorettype => 'int8', proargtypes => '', prosrc => 'pg_global_snapshot_export' },
+{ oid => '3431', descr => 'import global transaction snapshot',
+  proname => 'pg_global_snapshot_import', provolatile => 'v', proparallel => 'u',
+  prorettype => 'void', proargtypes => 'int8', prosrc => 'pg_global_snapshot_import' },
+{ oid => '3432', descr => 'prepare distributed transaction for commit, get global_csn',
+  proname => 'pg_global_snapshot_prepare', provolatile => 'v', proparallel => 'u',
+  prorettype => 'int8', proargtypes => 'text', prosrc => 'pg_global_snapshot_prepare' },
+{ oid => '3433', descr => 'assign global_csn to distributed transaction',
+  proname => 'pg_global_snapshot_assign', provolatile => 'v', proparallel => 'u',
+  prorettype => 'void', proargtypes => 'text int8', prosrc => 'pg_global_snapshot_assign' },
+
 ]
diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h
index f5b6026ef5..75ec93b46b 100644
--- a/src/include/datatype/timestamp.h
+++ b/src/include/datatype/timestamp.h
@@ -93,6 +93,9 @@ typedef struct
 #define USECS_PER_MINUTE INT64CONST(60000000)
 #define USECS_PER_SEC    INT64CONST(1000000)
 
+#define NSECS_PER_SEC    INT64CONST(1000000000)
+#define NSECS_PER_USEC    INT64CONST(1000)
+
 /*
  * We allow numeric timezone offsets up to 15:59:59 either way from Greenwich.
  * Currently, the record holders for wackiest offsets in actual use are zones
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index 101f513ba6..3026e71f83 100644
--- a/src/include/fmgr.h
+++ b/src/include/fmgr.h
@@ -250,6 +250,7 @@ extern struct varlena *pg_detoast_datum_packed(struct varlena *datum);
 #define PG_GETARG_FLOAT4(n)  DatumGetFloat4(PG_GETARG_DATUM(n))
 #define PG_GETARG_FLOAT8(n)  DatumGetFloat8(PG_GETARG_DATUM(n))
 #define PG_GETARG_INT64(n)     DatumGetInt64(PG_GETARG_DATUM(n))
+#define PG_GETARG_UINT64(n)     DatumGetUInt64(PG_GETARG_DATUM(n))
 /* use this if you want the raw, possibly-toasted input datum: */
 #define PG_GETARG_RAW_VARLENA_P(n)    ((struct varlena *) PG_GETARG_POINTER(n))
 /* use this if you want the input datum de-toasted: */
diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h
index f968444671..ebc3836c9c 100644
--- a/src/include/portability/instr_time.h
+++ b/src/include/portability/instr_time.h
@@ -138,6 +138,9 @@ typedef struct timespec instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
     (((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) ((t).tv_nsec / 1000))
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+    (((uint64) (t).tv_sec * (uint64) 1000000000) + (uint64) ((t).tv_nsec))
+
 #else                            /* !HAVE_CLOCK_GETTIME */
 
 /* Use gettimeofday() */
@@ -202,6 +205,10 @@ typedef struct timeval instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
     (((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) (t).tv_usec)
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+    (((uint64) (t).tv_sec * (uint64) 1000000000) + \
+        (uint64) (t).tv_usec * (uint64) 1000)
+
 #endif                            /* HAVE_CLOCK_GETTIME */
 
 #else                            /* WIN32 */
@@ -234,6 +241,9 @@ typedef LARGE_INTEGER instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
     ((uint64) (((double) (t).QuadPart * 1000000.0) / GetTimerFrequency()))
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+    ((uint64) (((double) (t).QuadPart * 1000000000.0) / GetTimerFrequency()))
+
 static inline double
 GetTimerFrequency(void)
 {
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 5c19a61dcf..7e24d539cc 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -15,8 +15,10 @@
 #define _PROC_H_
 
 #include "access/clog.h"
+#include "access/global_snapshot.h"
 #include "access/xlogdefs.h"
 #include "lib/ilist.h"
+#include "utils/snapshot.h"
 #include "storage/latch.h"
 #include "storage/lock.h"
 #include "storage/pg_sema.h"
@@ -57,6 +59,7 @@ struct XidCache
 #define        PROC_IN_LOGICAL_DECODING    0x10    /* currently doing logical
                                                  * decoding outside xact */
 #define        PROC_RESERVED                0x20    /* reserved for procarray */
+#define        PROC_RESERVED2                0x40    /* reserved for procarray */
 
 /* flags reset at EOXact */
 #define        PROC_VACUUM_STATE_MASK \
@@ -200,6 +203,18 @@ struct PGPROC
     PGPROC       *lockGroupLeader;    /* lock group leader, if I'm a member */
     dlist_head    lockGroupMembers;    /* list of members, if I'm a leader */
     dlist_node    lockGroupLink;    /* my member link, if I'm a member */
+
+    /*
+     * assignedGlobalCsn holds GlobalCSN for this transaction.  It is generated
+     * under a ProcArray lock and later is written to a GlobalCSNLog.  This
+     * variable is defined as atomic only for the case of group commit; in all
+     * other scenarios only the backend responsible for this proc entry works
+     * with this variable.
+     */
+    GlobalCSN_atomic assignedGlobalCsn;
+
+    /* Original xmin of this backend before global snapshot was imported */
+    TransactionId originalXmin;
 };
 
 /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 75bab2985f..e68a87575e 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -36,6 +36,10 @@
 
 #define        PROCARRAY_SLOTS_XMIN            0x20    /* replication slot xmin,
                                                      * catalog_xmin */
+
+#define        PROCARRAY_NON_IMPORTED_XMIN        0x40    /* use originalXmin instead
+                                                     * of xmin to properly
+                                                     * maintain gsXidMap */
 /*
  * Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching
  * PGXACT->vacuumFlags. Other flags are used for different purposes and
@@ -124,4 +128,8 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
                                 TransactionId *catalog_xmin);
 
+extern void ProcArraySetGlobalSnapshotXmin(TransactionId xmin);
+
+extern TransactionId ProcArrayGetGlobalSnapshotXmin(void);
+
 #endif                            /* PROCARRAY_H */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 83806f3040..1a066fd8d8 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -87,6 +87,9 @@ extern void AtSubCommit_Snapshot(int level);
 extern void AtSubAbort_Snapshot(int level);
 extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin);
 
+extern GlobalCSN ExportGlobalSnapshot(void);
+extern void ImportGlobalSnapshot(GlobalCSN snap_global_csn);
+
 extern void ImportSnapshot(const char *idstr);
 extern bool XactHasExportedSnapshots(void);
 extern void DeleteAllExportedSnapshotFiles(void);
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 318d41e6f7..1563407824 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -115,6 +115,14 @@ typedef struct SnapshotData
 
     TimestampTz whenTaken;        /* timestamp when snapshot was taken */
     XLogRecPtr    lsn;            /* position in the WAL stream when taken */
+
+    /*
+     * GlobalCSN for cross-node snapshot isolation support.
+     * Will be used only if track_global_snapshots is enabled.
+     */
+    GlobalCSN    global_csn;
+    /* Is global_csn our own, or was it imported from a different node? */
+    bool        imported_global_csn;
 } SnapshotData;
 
 /*
-- 
2.11.0

From 8881170d9a7e01cb1ace337e3392ad7a74725211 Mon Sep 17 00:00:00 2001
From: Stas Kelvich <stanconn@gmail.com>
Date: Wed, 25 Apr 2018 16:39:09 +0300
Subject: [PATCH 3/3] postgres_fdw support for global snapshots

---
 contrib/postgres_fdw/Makefile                  |   9 +
 contrib/postgres_fdw/connection.c              | 292 ++++++++++++++++++++++---
 contrib/postgres_fdw/postgres_fdw.c            |  12 +
 contrib/postgres_fdw/postgres_fdw.h            |   2 +
 contrib/postgres_fdw/t/001_bank_coordinator.pl | 264 ++++++++++++++++++++++
 contrib/postgres_fdw/t/002_bank_participant.pl | 240 ++++++++++++++++++++
 src/test/perl/PostgresNode.pm                  |  31 +++
 7 files changed, 823 insertions(+), 27 deletions(-)
 create mode 100644 contrib/postgres_fdw/t/001_bank_coordinator.pl
 create mode 100644 contrib/postgres_fdw/t/002_bank_participant.pl
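
Note for readers skimming this patch: at pre-commit the coordinator
prepares the transaction on every participant, collects the CSN each one
proposes via pg_global_snapshot_prepare(), takes the maximum (see MaxCsnCB
below), and hands that maximum back through pg_global_snapshot_assign().
The following is only a minimal standalone sketch of that agreement step,
not code from the patch. The Participant struct and agree_on_commit_csn()
are hypothetical names, and starting the maximum at 0 instead of
InProgressGlobalCSN is an assumption made to keep the example
self-contained.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for the patch's GlobalCSN type (assumed to be a 64-bit integer). */
typedef uint64_t GlobalCSN;

/* Hypothetical model of one participant in a distributed transaction. */
typedef struct Participant
{
    GlobalCSN   proposed_csn;   /* value the prepare step would return */
    bool        prepare_ok;     /* did PREPARE TRANSACTION succeed? */
    GlobalCSN   assigned_csn;   /* filled in by the assign step */
} Participant;

/*
 * Pick the maximal proposed CSN and assign it to every participant.
 *
 * Returns true and stores the agreed CSN in *commit_csn if all participants
 * prepared successfully.  Returns false without touching any participant or
 * *commit_csn otherwise; the caller is then expected to roll back.
 */
bool
agree_on_commit_csn(Participant *nodes, size_t n, GlobalCSN *commit_csn)
{
    GlobalCSN   max_csn = 0;    /* assumption: 0 is below any real CSN */
    size_t      i;

    assert(nodes != NULL && commit_csn != NULL);

    /* Phase 1: every node must have prepared; remember the largest CSN. */
    for (i = 0; i < n; i++)
    {
        if (!nodes[i].prepare_ok)
            return false;
        if (nodes[i].proposed_csn > max_csn)
            max_csn = nodes[i].proposed_csn;
    }

    /* Phase 2: all nodes commit with the same, maximal CSN. */
    for (i = 0; i < n; i++)
        nodes[i].assigned_csn = max_csn;

    *commit_csn = max_csn;
    return true;
}
```

Taking the maximum means no participant is asked to commit with a CSN
below the one it proposed itself.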

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index 85394b4f1f..02ae067cd0 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -23,3 +23,12 @@ top_builddir = ../..
 include $(top_builddir)/src/Makefile.global
 include $(top_srcdir)/contrib/contrib-global.mk
 endif
+
+# Global makefile will do temp-install for 'check'. Since REGRESS is defined,
+# PGXS (included from contrib-global.mk or directly) will take care of adding
+# postgres_fdw to it as EXTRA_INSTALL and building pg_regress. It will also
+# actually run pg_regress, so the only thing left is the TAP tests.
+check: tapcheck
+
+tapcheck: temp-install
+    $(prove_check)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index fe4893a8e0..3759eaa8e1 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -14,9 +14,11 @@
 
 #include "postgres_fdw.h"
 
+#include "access/global_snapshot.h"
 #include "access/htup_details.h"
 #include "catalog/pg_user_mapping.h"
 #include "access/xact.h"
+#include "access/xlog.h" /* GetSystemIdentifier() */
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -24,6 +26,8 @@
 #include "utils/hsearch.h"
 #include "utils/inval.h"
 #include "utils/memutils.h"
+#include "utils/snapmgr.h"
+#include "utils/snapshot.h"
 #include "utils/syscache.h"
 
 
@@ -65,6 +69,21 @@ typedef struct ConnCacheEntry
  */
 static HTAB *ConnectionHash = NULL;
 
+/*
+ * FdwTransactionState
+ *
+ * Holds number of open remote transactions and shared state
+ * needed for all connection entries.
+ */
+typedef struct FdwTransactionState
+{
+    char        *gid;
+    int            nparticipants;
+    GlobalCSN    global_csn;
+    bool        two_phase_commit;
+} FdwTransactionState;
+static FdwTransactionState *fdwTransState;
+
 /* for assigning cursor numbers and prepared statement numbers */
 static unsigned int cursor_number = 0;
 static unsigned int prep_stmt_number = 0;
@@ -72,6 +91,9 @@ static unsigned int prep_stmt_number = 0;
 /* tracks whether any work is needed in callback functions */
 static bool xact_got_connection = false;
 
+/* counter of prepared tx made by this backend */
+static int two_phase_xact_count = 0;
+
 /* prototypes of private functions */
 static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void disconnect_pg_server(ConnCacheEntry *entry);
@@ -80,6 +102,7 @@ static void configure_remote_session(PGconn *conn);
 static void do_sql_command(PGconn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
+static void deallocate_prepared_stmts(ConnCacheEntry *entry);
 static void pgfdw_subxact_callback(SubXactEvent event,
                        SubTransactionId mySubid,
                        SubTransactionId parentSubid,
@@ -136,6 +159,15 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
                                       pgfdw_inval_callback, (Datum) 0);
     }
 
+    /* allocate FdwTransactionState */
+    if (fdwTransState == NULL)
+    {
+        MemoryContext oldcxt;
+        oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+        fdwTransState = palloc0(sizeof(FdwTransactionState));
+        MemoryContextSwitchTo(oldcxt);
+    }
+
     /* Set flag that we did GetConnection during the current transaction */
     xact_got_connection = true;
 
@@ -388,7 +420,8 @@ configure_remote_session(PGconn *conn)
 }
 
 /*
- * Convenience subroutine to issue a non-data-returning SQL command to remote
+ * Convenience subroutine to issue a non-data-returning SQL command or
+ * statement to remote node.
  */
 static void
 do_sql_command(PGconn *conn, const char *sql)
@@ -398,7 +431,8 @@ do_sql_command(PGconn *conn, const char *sql)
     if (!PQsendQuery(conn, sql))
         pgfdw_report_error(ERROR, NULL, conn, false, sql);
     res = pgfdw_get_result(conn, sql);
-    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+    if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+            PQresultStatus(res) != PGRES_TUPLES_OK)
         pgfdw_report_error(ERROR, res, conn, true, sql);
     PQclear(res);
 }
@@ -426,6 +460,10 @@ begin_remote_xact(ConnCacheEntry *entry)
         elog(DEBUG3, "starting remote transaction on connection %p",
              entry->conn);
 
+        if (UseGlobalSnapshots && (!IsolationUsesXactSnapshot() ||
+                                   IsolationIsSerializable()))
+            elog(ERROR, "Global snapshots support only REPEATABLE READ");
+
         if (IsolationIsSerializable())
             sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
         else
@@ -434,6 +472,23 @@ begin_remote_xact(ConnCacheEntry *entry)
         do_sql_command(entry->conn, sql);
         entry->xact_depth = 1;
         entry->changing_xact_state = false;
+
+        if (UseGlobalSnapshots)
+        {
+            char import_sql[128];
+
+            /* Export our snapshot */
+            if (fdwTransState->global_csn == 0)
+                fdwTransState->global_csn = ExportGlobalSnapshot();
+
+            snprintf(import_sql, sizeof(import_sql),
+                "SELECT pg_global_snapshot_import("UINT64_FORMAT")",
+                fdwTransState->global_csn);
+
+            do_sql_command(entry->conn, import_sql);
+        }
+
+        fdwTransState->nparticipants += 1;
     }
 
     /*
@@ -643,6 +698,94 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
         PQclear(res);
 }
 
+/* Callback typedef for BroadcastStmt */
+typedef bool (*BroadcastCmdResHandler) (PGresult *result, void *arg);
+
+/* Broadcast sql in parallel to all ConnectionHash entries */
+static bool
+BroadcastStmt(char const * sql, unsigned expectedStatus,
+                BroadcastCmdResHandler handler, void *arg)
+{
+    HASH_SEQ_STATUS scan;
+    ConnCacheEntry *entry;
+    bool        allOk = true;
+
+    /* Broadcast sql */
+    hash_seq_init(&scan, ConnectionHash);
+    while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+    {
+        pgfdw_reject_incomplete_xact_state_change(entry);
+
+        if (entry->xact_depth > 0 && entry->conn != NULL)
+        {
+            if (!PQsendQuery(entry->conn, sql))
+            {
+                PGresult   *res = PQgetResult(entry->conn);
+
+                elog(WARNING, "Failed to send command %s", sql);
+                pgfdw_report_error(WARNING, res, entry->conn, true, sql);
+                PQclear(res);
+            }
+        }
+    }
+
+    /* Collect responses */
+    hash_seq_init(&scan, ConnectionHash);
+    while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+    {
+        if (entry->xact_depth > 0 && entry->conn != NULL)
+        {
+            PGresult   *result = PQgetResult(entry->conn);
+
+            if (PQresultStatus(result) != expectedStatus ||
+                (handler && !handler(result, arg)))
+            {
+                elog(WARNING, "Failed command %s: status=%d, expected status=%d", sql, PQresultStatus(result), expectedStatus);
+                pgfdw_report_error(ERROR, result, entry->conn, true, sql);
+                allOk = false;
+            }
+            PQclear(result);
+            PQgetResult(entry->conn);    /* consume NULL result */
+        }
+    }
+
+    return allOk;
+}
+
+/* Wrapper for broadcasting commands */
+static bool
+BroadcastCmd(char const *sql)
+{
+    return BroadcastStmt(sql, PGRES_COMMAND_OK, NULL, NULL);
+}
+
+/* Wrapper for broadcasting statements */
+static bool
+BroadcastFunc(char const *sql)
+{
+    return BroadcastStmt(sql, PGRES_TUPLES_OK, NULL, NULL);
+}
+
+/* Callback for selecting maximal csn */
+static bool
+MaxCsnCB(PGresult *result, void *arg)
+{
+    char           *resp;
+    GlobalCSN       *max_csn = (GlobalCSN *) arg;
+    GlobalCSN        csn = 0;
+
+    resp = PQgetvalue(result, 0, 0);
+
+    if (resp == NULL || (*resp) == '\0' ||
+            sscanf(resp, UINT64_FORMAT, &csn) != 1)
+        return false;
+
+    if (*max_csn < csn)
+        *max_csn = csn;
+
+    return true;
+}
+
 /*
  * pgfdw_xact_callback --- cleanup at main-transaction end.
  */
@@ -656,6 +799,86 @@ pgfdw_xact_callback(XactEvent event, void *arg)
     if (!xact_got_connection)
         return;
 
+    /* Handle possible two-phase commit */
+    if (event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT)
+    {
+        bool include_local_tx = false;
+
+        /* Should we take into account this node? */
+        if (TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+        {
+            include_local_tx = true;
+            fdwTransState->nparticipants += 1;
+        }
+
+        /* Switch to 2PC mode if there was more than one participant */
+        if (UseGlobalSnapshots && fdwTransState->nparticipants > 1)
+            fdwTransState->two_phase_commit = true;
+
+        if (fdwTransState->two_phase_commit)
+        {
+            GlobalCSN    max_csn = InProgressGlobalCSN,
+                        my_csn = InProgressGlobalCSN;
+            bool    res;
+            char   *sql;
+
+            fdwTransState->gid = psprintf("pgfdw:%lld:%llu:%d:%u:%d:%d",
+                                          (long long) GetCurrentTimestamp(),
+                                          (unsigned long long) GetSystemIdentifier(),
+                                          MyProcPid,
+                                          GetCurrentTransactionIdIfAny(),
+                                          ++two_phase_xact_count,
+                                          fdwTransState->nparticipants);
+
+            /* Broadcast PREPARE */
+            sql = psprintf("PREPARE TRANSACTION '%s'", fdwTransState->gid);
+            res = BroadcastCmd(sql);
+            if (!res)
+                goto error;
+
+            /* Broadcast pg_global_snapshot_prepare() */
+            if (include_local_tx)
+                my_csn = GlobalSnapshotPrepareCurrent();
+
+            sql = psprintf("SELECT pg_global_snapshot_prepare('%s')",
+                                                        fdwTransState->gid);
+            res = BroadcastStmt(sql, PGRES_TUPLES_OK, MaxCsnCB, &max_csn);
+            if (!res)
+                goto error;
+
+            /* select maximal global csn */
+            if (include_local_tx && my_csn > max_csn)
+                max_csn = my_csn;
+
+            /* Broadcast pg_global_snapshot_assign() */
+            if (include_local_tx)
+                GlobalSnapshotAssignCsnCurrent(max_csn);
+            sql = psprintf("SELECT pg_global_snapshot_assign('%s',"UINT64_FORMAT")",
+                            fdwTransState->gid, max_csn);
+            res = BroadcastFunc(sql);
+
+error:
+            if (!res)
+            {
+                sql = psprintf("ROLLBACK PREPARED '%s'", fdwTransState->gid);
+                BroadcastCmd(sql);
+                elog(ERROR, "Failed to PREPARE transaction on remote node");
+            }
+
+            /*
+             * Do not fall through. The subsequent COMMIT event will clean things up.
+             */
+            return;
+        }
+    }
+
+    /* COMMIT open transaction if we were doing 2PC */
+    if (fdwTransState->two_phase_commit &&
+        (event == XACT_EVENT_PARALLEL_COMMIT || event == XACT_EVENT_COMMIT))
+    {
+        BroadcastCmd(psprintf("COMMIT PREPARED '%s'", fdwTransState->gid));
+    }
+
     /*
      * Scan all connection cache entries to find open remote transactions, and
      * close them.
@@ -663,8 +886,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
     hash_seq_init(&scan, ConnectionHash);
     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
     {
-        PGresult   *res;
-
         /* Ignore cache entry if no open connection right now */
         if (entry->conn == NULL)
             continue;
@@ -681,6 +902,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
             {
                 case XACT_EVENT_PARALLEL_PRE_COMMIT:
                 case XACT_EVENT_PRE_COMMIT:
+                    Assert(!fdwTransState->two_phase_commit);
 
                     /*
                      * If abort cleanup previously failed for this connection,
@@ -693,28 +915,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                     do_sql_command(entry->conn, "COMMIT TRANSACTION");
                     entry->changing_xact_state = false;
 
-                    /*
-                     * If there were any errors in subtransactions, and we
-                     * made prepared statements, do a DEALLOCATE ALL to make
-                     * sure we get rid of all prepared statements. This is
-                     * annoying and not terribly bulletproof, but it's
-                     * probably not worth trying harder.
-                     *
-                     * DEALLOCATE ALL only exists in 8.3 and later, so this
-                     * constrains how old a server postgres_fdw can
-                     * communicate with.  We intentionally ignore errors in
-                     * the DEALLOCATE, so that we can hobble along to some
-                     * extent with older servers (leaking prepared statements
-                     * as we go; but we don't really support update operations
-                     * pre-8.3 anyway).
-                     */
-                    if (entry->have_prep_stmt && entry->have_error)
-                    {
-                        res = PQexec(entry->conn, "DEALLOCATE ALL");
-                        PQclear(res);
-                    }
-                    entry->have_prep_stmt = false;
-                    entry->have_error = false;
+                    deallocate_prepared_stmts(entry);
                     break;
                 case XACT_EVENT_PRE_PREPARE:
 
@@ -729,10 +930,15 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                      */
                     ereport(ERROR,
                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                             errmsg("cannot prepare a transaction that modified remote tables")));
+                                errmsg("cannot prepare a transaction that modified remote tables")));
                     break;
                 case XACT_EVENT_PARALLEL_COMMIT:
                 case XACT_EVENT_COMMIT:
+                    if (fdwTransState->two_phase_commit)
+                        deallocate_prepared_stmts(entry);
+                    else /* Pre-commit should have closed the open transaction */
+                        elog(ERROR, "missed cleaning up connection during pre-commit");
+                    break;
                 case XACT_EVENT_PREPARE:
                     /* Pre-commit should have closed the open transaction */
                     elog(ERROR, "missed cleaning up connection during pre-commit");
@@ -828,6 +1034,38 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
     /* Also reset cursor numbering for next transaction */
     cursor_number = 0;
+
+    /* Reset fdwTransState */
+    memset(fdwTransState, '\0', sizeof(FdwTransactionState));
+}
+
+/*
+ * If there were any errors in subtransactions, and we
+ * made prepared statements, do a DEALLOCATE ALL to make
+ * sure we get rid of all prepared statements. This is
+ * annoying and not terribly bulletproof, but it's
+ * probably not worth trying harder.
+ *
+ * DEALLOCATE ALL only exists in 8.3 and later, so this
+ * constrains how old a server postgres_fdw can
+ * communicate with.  We intentionally ignore errors in
+ * the DEALLOCATE, so that we can hobble along to some
+ * extent with older servers (leaking prepared statements
+ * as we go; but we don't really support update operations
+ * pre-8.3 anyway).
+ */
+static void
+deallocate_prepared_stmts(ConnCacheEntry *entry)
+{
+    PGresult   *res;
+
+    if (entry->have_prep_stmt && entry->have_error)
+    {
+        res = PQexec(entry->conn, "DEALLOCATE ALL");
+        PQclear(res);
+    }
+    entry->have_prep_stmt = false;
+    entry->have_error = false;
 }
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 5699252091..64279c8664 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -268,6 +268,9 @@ typedef struct
     List       *already_used;    /* expressions already dealt with */
 } ec_member_foreign_arg;
 
+bool        UseGlobalSnapshots;
+void        _PG_init(void);
+
 /*
  * SQL functions
  */
@@ -5806,3 +5809,12 @@ find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
     /* We didn't find any suitable equivalence class expression */
     return NULL;
 }
+
+void
+_PG_init(void)
+{
+    DefineCustomBoolVariable("postgres_fdw.use_global_snapshots",
+                             "Use global snapshots for FDW transactions", NULL,
+                             &UseGlobalSnapshots, false, PGC_USERSET, 0, NULL,
+                             NULL, NULL);
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 70b538e2f9..8cf5b12798 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -186,4 +186,6 @@ extern const char *get_jointype_name(JoinType jointype);
 extern bool is_builtin(Oid objectId);
 extern bool is_shippable(Oid objectId, Oid classId, PgFdwRelationInfo *fpinfo);
 
+extern bool UseGlobalSnapshots;
+
 #endif                            /* POSTGRES_FDW_H */
diff --git a/contrib/postgres_fdw/t/001_bank_coordinator.pl b/contrib/postgres_fdw/t/001_bank_coordinator.pl
new file mode 100644
index 0000000000..1e31f33349
--- /dev/null
+++ b/contrib/postgres_fdw/t/001_bank_coordinator.pl
@@ -0,0 +1,264 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+my $master = get_new_node("master");
+$master->init;
+$master->append_conf('postgresql.conf', qq(
+    max_prepared_transactions = 30
+    log_checkpoints = true
+    postgres_fdw.use_global_snapshots = on
+    track_global_snapshots = on
+    default_transaction_isolation = 'REPEATABLE READ'
+));
+$master->start;
+
+my $shard1 = get_new_node("shard1");
+$shard1->init;
+$shard1->append_conf('postgresql.conf', qq(
+    max_prepared_transactions = 30
+    global_snapshot_defer_time = 15
+    track_global_snapshots = on
+));
+$shard1->start;
+
+my $shard2 = get_new_node("shard2");
+$shard2->init;
+$shard2->append_conf('postgresql.conf', qq(
+    max_prepared_transactions = 30
+    global_snapshot_defer_time = 15
+    track_global_snapshots = on
+));
+$shard2->start;
+
+###############################################################################
+# Prepare nodes
+###############################################################################
+
+$master->safe_psql('postgres', qq[
+    CREATE EXTENSION postgres_fdw;
+    CREATE TABLE accounts(id integer primary key, amount integer);
+    CREATE TABLE global_transactions(tx_time timestamp);
+]);
+
+foreach my $node ($shard1, $shard2)
+{
+    my $port = $node->port;
+    my $host = $node->host;
+
+    $node->safe_psql('postgres',
+            "CREATE TABLE accounts(id integer primary key, amount integer)");
+
+    $master->safe_psql('postgres', qq[
+        CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw options(dbname 'postgres', host '$host', port '$port');
+        CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts) server shard_$port options(table_name 'accounts');
+        CREATE USER MAPPING for CURRENT_USER SERVER shard_$port;
+    ])
+}
+
+$shard1->safe_psql('postgres', qq[
+    insert into accounts select 2*id-1, 0 from generate_series(1, 10010) as id;
+    CREATE TABLE local_transactions(tx_time timestamp);
+]);
+
+$shard2->safe_psql('postgres', qq[
+    insert into accounts select 2*id, 0 from generate_series(1, 10010) as id;
+    CREATE TABLE local_transactions(tx_time timestamp);
+]);
+
+diag("master: @{[$master->connstr('postgres')]}");
+diag("shard1: @{[$shard1->connstr('postgres')]}");
+diag("shard2: @{[$shard2->connstr('postgres')]}");
+
+###############################################################################
+# pgbench scripts
+###############################################################################
+
+my $bank = File::Temp->new();
+append_to_file($bank, q{
+    \set id random(1, 20000)
+    BEGIN;
+    WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = :id RETURNING *)
+        INSERT into global_transactions SELECT now() FROM upd;
+    UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1);
+    COMMIT;
+});
+
+my $bank1 = File::Temp->new();
+append_to_file($bank1, q{
+    \set id random(1, 10000)
+    BEGIN;
+    WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = (2*:id + 1) RETURNING *)
+        INSERT into local_transactions SELECT now() FROM upd;
+    UPDATE accounts SET amount = amount + 1 WHERE id = (2*:id + 3);
+    COMMIT;
+});
+
+my $bank2 = File::Temp->new();
+append_to_file($bank2, q{
+    \set id random(1, 10000)
+
+    BEGIN;
+    WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = 2*:id RETURNING *)
+        INSERT into local_transactions SELECT now() FROM upd;
+    UPDATE accounts SET amount = amount + 1 WHERE id = (2*:id + 2);
+    COMMIT;
+});
+
+###############################################################################
+# Helpers
+###############################################################################
+
+sub count_and_delete_rows
+{
+    my ($node, $table) = @_;
+    my $count;
+
+    $count = $node->safe_psql('postgres',"select count(*) from $table");
+    $node->safe_psql('postgres',"delete from $table");
+    diag($node->name, ": completed $count transactions");
+    return $count;
+}
+
+###############################################################################
+# Concurrent global transactions
+###############################################################################
+
+my ($err, $rc);
+my $started;
+my $seconds = 30;
+my $selects;
+my $total = '0';
+my $oldtotal = '0';
+my $isolation_errors = 0;
+
+
+my $pgb_handle;
+
+$pgb_handle = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+while (time() - $started < $seconds)
+{
+    $total = $master->safe_psql('postgres', "select sum(amount) from accounts");
+    if ( ($total ne $oldtotal) and ($total ne '') )
+    {
+        $isolation_errors++;
+        $oldtotal = $total;
+        diag("Isolation error. Total = $total");
+    }
+    if ($total ne '') { $selects++; }
+}
+
+$master->pgbench_await($pgb_handle);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+    count_and_delete_rows($master, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction');
+
+###############################################################################
+# Concurrent global and local transactions
+###############################################################################
+
+my ($pgb_handle1, $pgb_handle2, $pgb_handle3);
+
+# global txses
+$pgb_handle1 = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+# concurrent local
+$pgb_handle2 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank1, 'postgres' );
+$pgb_handle3 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank2, 'postgres' );
+
+$started = time();
+$selects = 0;
+$oldtotal = 0;
+while (time() - $started < $seconds)
+{
+    $total = $master->safe_psql('postgres', "select sum(amount) from accounts");
+    if ( ($total ne $oldtotal) and ($total ne '') )
+    {
+        $isolation_errors++;
+        $oldtotal = $total;
+        diag("Isolation error. Total = $total");
+    }
+    if ($total ne '') { $selects++; }
+}
+
+diag("selects = $selects");
+$master->pgbench_await($pgb_handle1);
+$shard1->pgbench_await($pgb_handle2);
+$shard2->pgbench_await($pgb_handle3);
+
+diag("completed $selects selects");
+die "" unless ( $selects > 0 &&
+    count_and_delete_rows($master, 'global_transactions') > 0 &&
+    count_and_delete_rows($shard1, 'local_transactions') > 0 &&
+    count_and_delete_rows($shard2, 'local_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global and local transactions');
+
+
+###############################################################################
+# Snapshot stability
+###############################################################################
+
+my ($hashes, $hash1, $hash2);
+my $stability_errors = 0;
+
+# global txses
+$pgb_handle1 = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+# concurrent local
+$pgb_handle2 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank1, 'postgres' );
+$pgb_handle3 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank2, 'postgres' );
+
+$selects = 0;
+$started = time();
+while (time() - $started < $seconds)
+{
+    foreach my $node ($master, $shard1, $shard2)
+    {
+        ($hash1, $_, $hash2) = split "\n", $node->safe_psql('postgres', qq[
+            begin isolation level repeatable read;
+            select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t;
+            select pg_sleep(3);
+            select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t;
+            commit;
+        ]);
+
+        if ($hash1 ne $hash2)
+        {
+            diag("oops");
+            $stability_errors++;
+        }
+        elsif ($hash1 eq '' or $hash2 eq '')
+        {
+            die;
+        }
+        else
+        {
+            $selects++;
+        }
+    }
+}
+
+$master->pgbench_await($pgb_handle1);
+$shard1->pgbench_await($pgb_handle2);
+$shard2->pgbench_await($pgb_handle3);
+
+die "" unless ( $selects > 0 &&
+    count_and_delete_rows($master, 'global_transactions') > 0 &&
+    count_and_delete_rows($shard1, 'local_transactions') > 0 &&
+    count_and_delete_rows($shard2, 'local_transactions') > 0);
+
+is($stability_errors, 0, 'snapshot is stable during concurrent global and local transactions');
+
+$master->stop;
+$shard1->stop;
+$shard2->stop;
diff --git a/contrib/postgres_fdw/t/002_bank_participant.pl b/contrib/postgres_fdw/t/002_bank_participant.pl
new file mode 100644
index 0000000000..bf80d21d7a
--- /dev/null
+++ b/contrib/postgres_fdw/t/002_bank_participant.pl
@@ -0,0 +1,240 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+my $shard1 = get_new_node("shard1");
+$shard1->init;
+$shard1->append_conf('postgresql.conf', qq(
+    max_prepared_transactions = 30
+    postgres_fdw.use_global_snapshots = on
+    global_snapshot_defer_time = 15
+    track_global_snapshots = on
+    default_transaction_isolation = 'REPEATABLE READ'
+));
+$shard1->start;
+
+my $shard2 = get_new_node("shard2");
+$shard2->init;
+$shard2->append_conf('postgresql.conf', qq(
+    max_prepared_transactions = 30
+    postgres_fdw.use_global_snapshots = on
+    global_snapshot_defer_time = 15
+    track_global_snapshots = on
+    default_transaction_isolation = 'REPEATABLE READ'
+));
+$shard2->start;
+
+###############################################################################
+# Prepare nodes
+###############################################################################
+
+my @shards = ($shard1, $shard2);
+
+foreach my $node (@shards)
+{
+    $node->safe_psql('postgres', qq[
+        CREATE EXTENSION postgres_fdw;
+        CREATE TABLE accounts(id integer primary key, amount integer);
+        CREATE TABLE accounts_local() inherits(accounts);
+        CREATE TABLE global_transactions(tx_time timestamp);
+        CREATE TABLE local_transactions(tx_time timestamp);
+    ]);
+
+    foreach my $neighbor (@shards)
+    {
+        next if ($neighbor eq $node);
+
+        my $port = $neighbor->port;
+        my $host = $neighbor->host;
+
+        $node->safe_psql('postgres', qq[
+            CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw
+                    options(dbname 'postgres', host '$host', port '$port');
+            CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts)
+                    server shard_$port options(table_name 'accounts_local');
+            CREATE USER MAPPING for CURRENT_USER SERVER shard_$port;
+        ]);
+    }
+}
+
+$shard1->psql('postgres', "insert into accounts_local select 2*id-1, 0 from generate_series(1, 10010) as id;");
+$shard2->psql('postgres', "insert into accounts_local select 2*id,   0 from generate_series(1, 10010) as id;");
+
+###############################################################################
+# pgbench scripts
+###############################################################################
+
+my $bank = File::Temp->new();
+append_to_file($bank, q{
+    \set id random(1, 20000)
+    BEGIN;
+    WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = :id RETURNING *)
+        INSERT into global_transactions SELECT now() FROM upd;
+    UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1);
+    COMMIT;
+});
+
+###############################################################################
+# Helpers
+###############################################################################
+
+sub count_and_delete_rows
+{
+    my ($node, $table) = @_;
+    my $count;
+
+    $count = $node->safe_psql('postgres',"select count(*) from $table");
+    $node->safe_psql('postgres',"delete from $table");
+    diag($node->name, ": completed $count transactions");
+    return $count;
+}
+
+###############################################################################
+# Concurrent global transactions
+###############################################################################
+
+my ($err, $rc);
+my $started;
+my $seconds = 30;
+my $selects;
+my $total = '0';
+my $oldtotal = '0';
+my $isolation_errors = 0;
+my $i;
+
+
+my ($pgb_handle1, $pgb_handle2);
+
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+while (time() - $started < $seconds)
+{
+    # The total over all accounts must look the same from every shard.
+    foreach my $shard (@shards)
+    {
+        $total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+        if ( ($total ne $oldtotal) and ($total ne '') )
+        {
+            $isolation_errors++;
+            $oldtotal = $total;
+            diag("$i: Isolation error. Total = $total");
+        }
+        if ($total ne '') { $selects++; }
+    }
+    $i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+    count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+    count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transactions');
+
+###############################################################################
+# And do the same after soft restart
+###############################################################################
+
+$shard1->restart;
+$shard2->restart;
+$shard1->poll_query_until('postgres', "select 't'")
+    or die "Timed out waiting for shard1 to become online";
+$shard2->poll_query_until('postgres', "select 't'")
+    or die "Timed out waiting for shard2 to become online";
+
+$seconds = 15;
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+
+while (time() - $started < $seconds)
+{
+    # The total over all accounts must look the same from every shard.
+    foreach my $shard (@shards)
+    {
+        $total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+        if ( ($total ne $oldtotal) and ($total ne '') )
+        {
+            $isolation_errors++;
+            $oldtotal = $total;
+            diag("$i: Isolation error. Total = $total");
+        }
+        if ($total ne '') { $selects++; }
+    }
+    $i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+    count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+    count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transactions after restart');
+
+###############################################################################
+# And do the same after hard restart
+###############################################################################
+
+$shard1->teardown_node;
+$shard2->teardown_node;
+$shard1->start;
+$shard2->start;
+$shard1->poll_query_until('postgres', "select 't'")
+    or die "Timed out waiting for shard1 to become online";
+$shard2->poll_query_until('postgres', "select 't'")
+    or die "Timed out waiting for shard2 to become online";
+
+
+$seconds = 15;
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+
+while (time() - $started < $seconds)
+{
+    # The total over all accounts must look the same from every shard.
+    foreach my $shard (@shards)
+    {
+        $total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+        if ( ($total ne $oldtotal) and ($total ne '') )
+        {
+            $isolation_errors++;
+            $oldtotal = $total;
+            diag("$i: Isolation error. Total = $total");
+        }
+        if ($total ne '') { $selects++; }
+    }
+    $i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happened" unless ( $selects > 0 &&
+    count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+    count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transactions after hard restart');
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 79fb457075..d72a28f661 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -1796,6 +1796,37 @@ sub pg_recvlogical_upto
     }
 }
 
+sub pgbench
+{
+    my ($self, @args) = @_;
+    my $pgbench_handle = $self->pgbench_async(@args);
+    $self->pgbench_await($pgbench_handle);
+}
+
+sub pgbench_async
+{
+    my ($self, @args) = @_;
+
+    my ($in, $out, $err, $rc);
+    $in = '';
+    $out = '';
+
+    my @pgbench_command = (
+        'pgbench',
+        -h => $self->host,
+        -p => $self->port,
+        @args
+    );
+    my $handle = IPC::Run::start(\@pgbench_command, $in, $out);
+    return $handle;
+}
+
+sub pgbench_await
+{
+    my ($self, $pgbench_handle) = @_;
+    IPC::Run::finish($pgbench_handle) || BAIL_OUT("pgbench exited with $?");
+}
+
 =pod
 
 =back
-- 
2.11.0
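
For readers who want to see what the bank tests above actually check, here is a minimal standalone sketch of the invariant. It is not part of the patch: the two hashes are a hypothetical in-memory stand-in for the shards, and the callback is only a way to model a reader that sees a transfer half-applied. Every transfer moves one unit between accounts, so a reader with a consistent snapshot must always see the same total; a changed total is what the tests count as an isolation error.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the two shards: odd ids live on shard1,
# even ids on shard2, and every balance starts at 0 (as in the test).
my %shard1 = map { (2 * $_ - 1) => 0 } 1 .. 5;
my %shard2 = map { (2 * $_)     => 0 } 1 .. 5;

# Sum balances over both shards, like "select sum(amount) from accounts".
sub total
{
    my $sum = 0;
    $sum += $_ for values(%shard1), values(%shard2);
    return $sum;
}

# Move one unit from account $id to account $id + 1. The optional
# callback runs between the debit and the credit, modelling a reader
# that observes the transfer half-applied.
sub transfer
{
    my ($id, $between) = @_;
    my $from = $id % 2 ? \%shard1 : \%shard2;
    my $to   = $id % 2 ? \%shard2 : \%shard1;
    $from->{$id}--;
    $between->() if $between;
    $to->{ $id + 1 }++;
    return;
}

# Record the total seen in the middle of each transfer.
my @torn_reads;
transfer($_, sub { push @torn_reads, total() }) for 1 .. 9;

print "total after all transfers: ", total(), "\n";
print "totals seen mid-transfer:  @torn_reads\n";
```

The total after all transfers is back at the starting value, while every read taken between the debit and the credit is off by one. The tests in the patch look for exactly that kind of torn read, except that the two halves of a transfer land on different nodes.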

