[PATCH 12/16] Add state to keep track of logical replication - Mailing list pgsql-hackers

From Andres Freund
Subject [PATCH 12/16] Add state to keep track of logical replication
Msg-id 1339586927-13156-12-git-send-email-andres@2ndquadrant.com
In response to [RFC][PATCH] Logical Replication/BDR prototype and architecture  (Andres Freund <andres@2ndquadrant.com>)
List pgsql-hackers
From: Andres Freund <andres@anarazel.de>

In order to have restartable replication with minimal additional writes, it's
very useful to know up to which point we have replayed/received changes from a
foreign node.

One representation of that is the lsn of changes at the originating cluster.

We need to keep track of the point up to which we received data and up to where
we applied data.

For that we added a field 'origin_lsn' to commit records. This allows us to keep
track of the apply position across crash recovery with minimal additional I/O. We
only added the field to non-compact commit records to reduce the overhead in
case logical replication is not used.

Checkpoints need to keep track of the apply/receive positions as well, because
otherwise it would be hard to determine the lsn from which to restart
receiving/applying after a shutdown/crash if no changes happened since the last
shutdown/crash.
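To illustrate why the checkpoint copy matters, here is a hypothetical C sketch in which one struct stands in for both the shared memory arrays and the checkpoint fields. It assumes integer LSNs, an arbitrary bound of 8 nodes, and node 0 being the invalid id (the patch starts its loops at InvalidMultimasterNodeId + 1). If no commit is replayed after the checkpoint, the restored checkpoint value is the only surviving record of the restart position.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical bound; the patch sizes its arrays with MaxMultimasterNodeId. */
#define SKETCH_MAX_NODES 8

typedef uint64_t SketchLSN;     /* 0 plays the role of zeroRecPtr */

/* Per-origin positions, as kept in shared memory and in a checkpoint. */
typedef struct
{
    SketchLSN apply[SKETCH_MAX_NODES];
    SketchLSN receive[SKETCH_MAX_NODES];
} SketchReplState;

/* Like CreateCheckPoint(): copy the live positions into the checkpoint. */
static void
sketch_checkpoint_save(SketchReplState *checkpoint, const SketchReplState *shared)
{
    memcpy(checkpoint, shared, sizeof(*checkpoint));
}

/* Like StartupXLOG(): seed shared memory from the checkpoint before redo. */
static void
sketch_checkpoint_restore(SketchReplState *shared, const SketchReplState *checkpoint)
{
    memcpy(shared, checkpoint, sizeof(*shared));
}

/* Position from which to resume applying changes from `node`. */
static SketchLSN
sketch_restart_apply_point(const SketchReplState *shared, int node)
{
    assert(node > 0 && node < SKETCH_MAX_NODES);    /* 0 = invalid node id */
    return shared->apply[node];
}
```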

While running, the startup process, the walreceiver and a (future) apply process
will need a coherent picture of those two states, so add shared memory state to
keep track of it. Currently this is represented in the walreceiver's shared
memory segment. This will likely need to change.
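The locking discipline can be sketched as follows, assuming a C11 `atomic_flag` spinlock is an acceptable stand-in for PostgreSQL's `slock_t` and SpinLockAcquire()/SpinLockRelease(); the struct and function names are hypothetical. Writers update one position under the lock, and readers copy both positions for a node under a single lock acquisition. The lock also matters because a 64-bit store is not guaranteed to be atomic on every platform.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define SKETCH_MAX_NODES 8      /* hypothetical bound */

typedef uint64_t SketchLSN;

/* Shared state used by several processes; `mutex` protects both arrays. */
typedef struct
{
    atomic_flag mutex;
    SketchLSN   apply[SKETCH_MAX_NODES];
    SketchLSN   receive[SKETCH_MAX_NODES];
} SketchWalRcvShared;

static void
sketch_lock(atomic_flag *m)
{
    while (atomic_flag_test_and_set_explicit(m, memory_order_acquire))
        ;                       /* spin until the holder releases it */
}

static void
sketch_unlock(atomic_flag *m)
{
    atomic_flag_clear_explicit(m, memory_order_release);
}

static void
sketch_shared_init(SketchWalRcvShared *s)
{
    atomic_flag_clear(&s->mutex);
    for (int i = 0; i < SKETCH_MAX_NODES; i++)
        s->apply[i] = s->receive[i] = 0;
}

/* Walreceiver side: data from `node` has been received up to `lsn`. */
static void
sketch_set_receive(SketchWalRcvShared *s, int node, SketchLSN lsn)
{
    assert(node > 0 && node < SKETCH_MAX_NODES);
    sketch_lock(&s->mutex);
    s->receive[node] = lsn;
    sketch_unlock(&s->mutex);
}

/* Startup/apply side: changes from `node` are applied up to `lsn`. */
static void
sketch_set_apply(SketchWalRcvShared *s, int node, SketchLSN lsn)
{
    assert(node > 0 && node < SKETCH_MAX_NODES);
    sketch_lock(&s->mutex);
    s->apply[node] = lsn;
    sketch_unlock(&s->mutex);
}

/* Readers (e.g. the checkpointer) take both values in one critical section. */
static void
sketch_get_positions(SketchWalRcvShared *s, int node,
                     SketchLSN *apply, SketchLSN *receive)
{
    assert(node > 0 && node < SKETCH_MAX_NODES);
    sketch_lock(&s->mutex);
    *apply = s->apply[node];
    *receive = s->receive[node];
    sketch_unlock(&s->mutex);
}
```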

During crash recovery/physical replication the origin_lsn field of commit
records is used to update the shared memory, and thus the next checkpoint's,
notion of the apply state.
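The redo-time update can be sketched like this, again with hypothetical types and integer LSNs. As in the patch, a zero origin_lsn leaves the state untouched; the patch's additional LogicalWalReceiverActive() check and the spinlock are left out to keep the sketch self-contained.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_MAX_NODES 8      /* hypothetical bound */

typedef uint64_t SketchLSN;     /* 0 plays the role of zeroRecPtr */

/* The fields of a commit record that matter here. */
typedef struct
{
    int       origin_node;      /* id of the originating node */
    SketchLSN origin_lsn;       /* 0 for commits without an origin LSN */
} SketchCommitRecord;

/* Redo of one commit record: only a non-zero origin_lsn moves the state. */
static void
sketch_redo_commit(SketchLSN apply_state[SKETCH_MAX_NODES],
                   const SketchCommitRecord *rec)
{
    if (rec->origin_lsn == 0)
        return;
    assert(rec->origin_node > 0 && rec->origin_node < SKETCH_MAX_NODES);
    apply_state[rec->origin_node] = rec->origin_lsn;
}

/* Replay a sequence of commit records in WAL order. */
static void
sketch_replay_commits(SketchLSN apply_state[SKETCH_MAX_NODES],
                      const SketchCommitRecord *recs, size_t n)
{
    for (size_t i = 0; i < n; i++)
        sketch_redo_commit(apply_state, &recs[i]);
}
```

For example, replaying commits from node 2 at 0x100, node 3 at 0x50 and node 2 at 0x200 leaves node 2 at 0x200 and node 3 at 0x50, which is what the next checkpoint would then record.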

Missing:

- For correct crash recovery we need more state than the 'apply lsn', because transactions on the originating side can
overlap. At the lsn we just applied, many other transactions can be in progress. To handle that correctly we need to keep
track of the oldest start lsn of any transaction currently being reassembled (cf. ApplyCache). Then we can start to
reassemble the ApplyCache from that point and throw away any transaction which committed before the recorded/recovered
apply lsn. It should be sufficient to store that knowledge in shared memory and checkpoint records.
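The proposed fix can be sketched in C under simplifying assumptions: integer LSNs, a hypothetical `SketchRemoteXact` record per transaction on the originating node, and the reading that a transaction committed exactly at the apply lsn was already applied. `sketch_reassembly_start()` computes the position that would have to be stored in shared memory and checkpoints; `sketch_must_apply()` is the filter used after restarting reassembly from it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t SketchLSN;

/* A transaction as seen in the originating node's WAL. */
typedef struct
{
    SketchLSN start_lsn;        /* first change of the transaction */
    SketchLSN commit_lsn;       /* its commit record */
} SketchRemoteXact;

/*
 * Oldest start LSN among transactions that had started, but not yet
 * committed, at apply_lsn. Reassembly has to restart there; if nothing was
 * in progress, apply_lsn itself suffices.
 */
static SketchLSN
sketch_reassembly_start(const SketchRemoteXact *xacts, size_t n,
                        SketchLSN apply_lsn)
{
    SketchLSN oldest = apply_lsn;

    for (size_t i = 0; i < n; i++)
    {
        const SketchRemoteXact *x = &xacts[i];

        assert(x->start_lsn < x->commit_lsn);
        if (x->start_lsn < apply_lsn && x->commit_lsn > apply_lsn &&
            x->start_lsn < oldest)
            oldest = x->start_lsn;
    }
    return oldest;
}

/* Transactions that committed at or before apply_lsn were already applied. */
static bool
sketch_must_apply(const SketchRemoteXact *x, SketchLSN apply_lsn)
{
    return x->commit_lsn > apply_lsn;
}
```

For example, with transactions spanning 0x10-0x40, 0x20-0x80, 0x30-0x50 and 0x60-0x90 and an apply lsn of 0x50, reassembly restarts at 0x20, and only the second and fourth transactions are applied again.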
 
---
 src/backend/access/transam/xact.c          |   22 ++++++++-
 src/backend/access/transam/xlog.c          |   73 ++++++++++++++++++++++++++++
 src/backend/replication/walreceiverfuncs.c |    8 +++
 src/include/access/xact.h                  |    1 +
 src/include/catalog/pg_control.h           |   13 ++++-
 src/include/replication/walreceiver.h      |   13 +++++
 6 files changed, 128 insertions(+), 2 deletions(-)
 

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index dc30a17..40ac965 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -39,11 +39,13 @@
 #include "replication/logical.h"
 #include "replication/syncrep.h"
 #include "replication/walsender.h"
+#include "replication/walreceiver.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
 #include "storage/smgr.h"
+#include "storage/spin.h"
 #include "utils/combocid.h"
 #include "utils/guc.h"
 #include "utils/inval.h"
@@ -1015,7 +1017,8 @@ RecordTransactionCommit(void)
         /*
          * Do we need the long commit record? If not, use the compact format.
          */
-        if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
+        if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
+            (wal_level == WAL_LEVEL_LOGICAL && current_replication_origin_id != guc_replication_origin_id))
         {
             XLogRecData rdata[4];
             int            lastrdata = 0;
@@ -1037,6 +1040,8 @@ RecordTransactionCommit(void)
             xlrec.nrels = nrels;
             xlrec.nsubxacts = nchildren;
             xlrec.nmsgs = nmsgs;
+            xlrec.origin_lsn = current_replication_origin_lsn;
+
             rdata[0].data = (char *) (&xlrec);
             rdata[0].len = MinSizeOfXactCommit;
             rdata[0].buffer = InvalidBuffer;
@@ -4575,6 +4580,21 @@ xact_redo_commit_internal(TransactionId xid, RepNodeId originating_node,
         LWLockRelease(XidGenLock);
     }
 
+    /*
+     * Record where we are wrt. recovery. We need that to know from where on
+     * to restart applying logical change records.
+     */
+    if(LogicalWalReceiverActive() && !XLByteEQ(origin_lsn, zeroRecPtr))
+    {
+        /*
+         * We probably don't need the locking because no lcr receiver can run
+         * yet.
+         */
+        SpinLockAcquire(&WalRcv->mutex);
+        WalRcv->mm_applyState[originating_node] = origin_lsn;
+        SpinLockRelease(&WalRcv->mutex);
+    }
+
     if (standbyState == STANDBY_DISABLED)
     {
         /*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0622726..20a4611 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5183,6 +5183,7 @@ BootStrapXLOG(void)
     uint64        sysidentifier;
     struct timeval tv;
     pg_crc32    crc;
+    int i;
 
     /*
      * Select a hopefully-unique system identifier code for this installation.
@@ -5229,6 +5230,13 @@ BootStrapXLOG(void)
     checkPoint.time = (pg_time_t) time(NULL);
     checkPoint.oldestActiveXid = InvalidTransactionId;
 
+    for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId;
+        i++){
+        checkPoint.logicalReceiveState[i] = zeroRecPtr;
+        checkPoint.logicalApplyState[i] = zeroRecPtr;
+    }
+
+
     ShmemVariableCache->nextXid = checkPoint.nextXid;
     ShmemVariableCache->nextOid = checkPoint.nextOid;
     ShmemVariableCache->oidCount = 0;
@@ -6314,6 +6322,53 @@ StartupXLOG(void)
         InRecovery = true;
     }
 
+    /*
+     * Set up shared memory state for the logical WAL receiver.
+     *
+     * Do this unconditionally so enabling/disabling/enabling logical replay
+     * doesn't lose information due to rewriting pg_control.
+     */
+    {
+        int i;
+
+        Assert(WalRcv);
+        /* locking is not really required here afaics, but ... */
+        SpinLockAcquire(&WalRcv->mutex);
+
+        for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+            i++)
+        {
+            XLogRecPtr* receiveState = &ControlFile->checkPointCopy.logicalReceiveState[i];
+            XLogRecPtr* applyState = &ControlFile->checkPointCopy.logicalApplyState[i];
+            if(i == guc_replication_origin_id && (
+                   !XLByteEQ(*receiveState, zeroRecPtr) ||
+                   !XLByteEQ(*applyState, zeroRecPtr))
+                )
+            {
+                elog(WARNING, "logical recovery state for own db. apply: %X/%X, receive %X/%X, origin %d",
+                     applyState->xlogid, applyState->xrecoff,
+                     receiveState->xlogid, receiveState->xrecoff,
+                     guc_replication_origin_id);
+                WalRcv->mm_receiveState[i] = zeroRecPtr;
+                WalRcv->mm_applyState[i] = zeroRecPtr;
+            }
+            else{
+                WalRcv->mm_receiveState[i] = *receiveState;
+                WalRcv->mm_applyState[i] = *applyState;
+            }
+        }
+        SpinLockRelease(&WalRcv->mutex);
+
+        /* FIXME: remove at some point */
+        for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+            i++){
+            elog(LOG, "restored apply state for node %d to %X/%X, receive %X/%X",
+                 i,
+                 WalRcv->mm_applyState[i].xlogid, WalRcv->mm_applyState[i].xrecoff,
+                 WalRcv->mm_receiveState[i].xlogid, WalRcv->mm_receiveState[i].xrecoff);
+        }
+    }
+
     /* REDO */
     if (InRecovery)
     {
@@ -7906,6 +7961,24 @@ CreateCheckPoint(int flags)
                              &checkPoint.nextMultiOffset);
 
     /*
+     * Fill out where we are wrt. logical replay. Do this unconditionally so we
+     * don't lose information due to rewriting pg_control when toggling
+     * logical replay.
+     */
+    {
+        int i;
+        SpinLockAcquire(&WalRcv->mutex);
+
+        for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+            i++){
+            checkPoint.logicalApplyState[i] = WalRcv->mm_applyState[i];
+            checkPoint.logicalReceiveState[i] = WalRcv->mm_receiveState[i];
+        }
+        SpinLockRelease(&WalRcv->mutex);
+        elog(LOG, "updated logical checkpoint data");
+    }
+
+    /*
      * Having constructed the checkpoint record, ensure all shmem disk buffers
      * and commit-log buffers are flushed to disk.
      *
 
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 876196f..cb49282 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -64,6 +64,14 @@ WalRcvShmemInit(void)
         MemSet(WalRcv, 0, WalRcvShmemSize());
         WalRcv->walRcvState = WALRCV_STOPPED;
         SpinLockInit(&WalRcv->mutex);
 
+
+        memset(&WalRcv->mm_receiveState,
+               0, sizeof(WalRcv->mm_receiveState));
+        memset(&WalRcv->mm_applyState,
+               0, sizeof(WalRcv->mm_applyState));
+
+        memset(&WalRcv->mm_receiveLatch,
+               0, sizeof(WalRcv->mm_receiveLatch));
     }
 }
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b12d2a0..2757782 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -137,6 +137,7 @@ typedef struct xl_xact_commit
     int            nmsgs;            /* number of shared inval msgs */
     Oid            dbId;            /* MyDatabaseId */
     Oid            tsId;            /* MyDatabaseTableSpace */
+    XLogRecPtr    origin_lsn;     /* location of originating commit */
     /* Array of RelFileNode(s) to drop at commit */
     RelFileNode xnodes[1];        /* VARIABLE LENGTH ARRAY */
     /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
 
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 5cff396..bc6316e 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -16,12 +16,13 @@
 #define PG_CONTROL_H
 
 #include "access/xlogdefs.h"
+#include "replication/logical.h"
 #include "pgtime.h"                /* for pg_time_t */
 #include "utils/pg_crc.h"
 
 
 /* Version identifier for this pg_control format */
-#define PG_CONTROL_VERSION    922
+#define PG_CONTROL_VERSION    923
 
 /*
  * Body of CheckPoint XLOG records.  This is declared here because we keep
@@ -50,6 +51,13 @@ typedef struct CheckPoint
      * it's set to InvalidTransactionId.
      */
     TransactionId oldestActiveXid;
+
+    /*
+     * The replay state from every other node. This is only needed if wal_level
+     * >= logical and thus is only filled then.
+     */
+    XLogRecPtr logicalApplyState[MaxMultimasterNodeId - 1];
+    XLogRecPtr logicalReceiveState[MaxMultimasterNodeId - 1];
 } CheckPoint;
 
 /* XLOG info values for XLOG rmgr */
@@ -85,6 +93,9 @@ typedef enum DBState
  * NOTE: try to keep this under 512 bytes so that it will fit on one physical
  * sector of typical disk drives.  This reduces the odds of corruption due to
  * power failure midway through a write.
+ *
+ * FIXME: in order to allow many nodes in mm (which increases checkpoint size)
+ * we should change the writing of this to write(temp_file);fsync();rename();fsync();
  */
 
 typedef struct ControlFileData
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index d21ec94..c9ab1be 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -14,6 +14,8 @@
 #include "access/xlog.h"
 #include "access/xlogdefs.h"
+#include "replication/logical.h"
+#include "storage/latch.h"
 #include "storage/spin.h"
 #include "pgtime.h"
 
@@ -90,6 +92,17 @@ typedef struct
     char        conninfo[MAXCONNINFO];
 
     slock_t        mutex;            /* locks shared variables shown above */
+
+    /*
+     * replay point up to which we replayed for every node
+     * XXX: should possibly be dynamically sized?
+     * FIXME: should go to its own shm segment?
+     */
+    XLogRecPtr  mm_receiveState[MaxMultimasterNodeId - 1];
+    XLogRecPtr  mm_applyState[MaxMultimasterNodeId - 1];
+
+    Latch*       mm_receiveLatch[MaxMultimasterNodeId - 1];
+
 } WalRcvData;
 
 extern WalRcvData *WalRcv;
-- 
1.7.10.rc3.3.g19a6c.dirty


