[PATCH 15/16] Activate/Implement the "apply process" which applies received updates from another node - Mailing list pgsql-hackers

From Andres Freund
Subject [PATCH 15/16] Activate/Implement the "apply process" which applies received updates from another node
Date
Msg-id 1339586927-13156-15-git-send-email-andres@2ndquadrant.com
Whole thread Raw
In response to [RFC][PATCH] Logical Replication/BDR prototype and architecture  (Andres Freund <andres@2ndquadrant.com>)
List pgsql-hackers
From: Andres Freund <andres@anarazel.de>

One apply process currently can only apply changes from one database in another
cluster (with a specific node_id).

Currently synchronous_commit=off is statically set in the apply process because
after a crash we can safely recover all changes which we didn't apply, so there
is no point in incurring the overhead of synchronous commits. This might be
problematic in combination with synchronous replication.

Missing/Todo:
- The foreign node_id is currently hardcoded (2 or 1, depending on the local id), as is the database (postgres). This
obviously needs to change.
 
- Proper mainloop with error handling, PROCESS_INTERRUPTS and everything
- Start multiple apply processes (per node_id per database)
- Possibly switch databases during runtime?
---
 src/backend/postmaster/bgworker.c         |   10 +-
 src/backend/replication/logical/logical.c |  194 +++++++++++++++++++++++++++++
 src/include/replication/logical.h         |    3 +
 3 files changed, 198 insertions(+), 9 deletions(-)

diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 8144050..bbb7e86 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -52,6 +52,7 @@#include "postmaster/bgworker.h"#include "postmaster/fork_process.h"#include
"postmaster/postmaster.h"
+#include "replication/logical.h"#include "storage/bufmgr.h"#include "storage/ipc.h"#include "storage/latch.h"
@@ -91,8 +92,6 @@ static void bgworker_sigterm_handler(SIGNAL_ARGS);NON_EXEC_STATIC void BgWorkerMain(int argc, char
*argv[]);
-static bool do_logicalapply(void);
-/******************************************************************** *                      BGWORKER CODE
********************************************************************/
@@ -394,10 +393,3 @@ NumBgWorkers(void)    return numWorkers;#endif}
-
-static bool
-do_logicalapply(void)
-{
-    elog(LOG, "doing logical apply");
-    return false;
-}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 4f34488..7fadafe 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -13,7 +13,201 @@ * */#include "postgres.h"
+
+#include "access/xlogreader.h"
+
+#include "replication/applycache.h"#include "replication/logical.h"
+#include "replication/apply.h"
+#include "replication/decode.h"
+#include "replication/walreceiver.h"
+/*FIXME: XLogRead*/
+#include "replication/walsender_private.h"
+
+#include "storage/ipc.h"
+#include "storage/proc.h"
+int guc_replication_origin_id = InvalidMultimasterNodeId;RepNodeId current_replication_origin_id =
InvalidMultimasterNodeId;XLogRecPtrcurrent_replication_origin_lsn = {0, 0};
 
+
+/* reader used by the apply process; allocated during setup in do_logicalapply() */
+static XLogReaderState *xlogreader_state = NULL;
+
+/*
+ * XLogReader callback: decide whether a record should be processed further.
+ *
+ * Every record is accepted, because filtering already happened in the
+ * sender. Should per-table filtering (after a proxy or such) ever be
+ * wanted, this is the place to add it.
+ */
+static bool
+replay_record_is_interesting(XLogReaderState *state, XLogRecord *r)
+{
+    (void) state;
+    (void) r;
+
+    return true;
+}
+
+/*
+ * XLogReader callback for writing out data.
+ *
+ * Deliberately does nothing: no data needs to persist after this point.
+ */
+static void
+replay_writeout_data(XLogReaderState *state, char *data, Size len)
+{
+    (void) state;
+    (void) data;
+    (void) len;
+}
+
+/*
+ * XLogReader callback invoked for a finished record: hand the record buffer
+ * to the ApplyCache kept in the reader's private ReaderApplyState.
+ */
+static void
+replay_finished_record(XLogReaderState *state, XLogRecordBuffer *buf)
+{
+    ReaderApplyState *reader_state = (ReaderApplyState *) state->private_data;
+
+    DecodeRecordIntoApplyCache(reader_state->apply_cache, buf);
+}
+
+/*
+ * XLogReader callback: read XLOG_BLCKSZ bytes starting at startptr into
+ * cur_page and check the magic value in the page header.
+ *
+ * startptr is asserted to be a multiple of XLOG_BLCKSZ. A page whose magic
+ * differs from XLOG_PAGE_MAGIC is reported with elog(FATAL).
+ */
+static void
+replay_read_page(XLogReaderState *state, char *cur_page, XLogRecPtr startptr)
+{
+    /* the page header lives at the very start of the page buffer */
+    XLogPageHeader hdr = (XLogPageHeader) cur_page;
+
+    Assert((startptr.xrecoff % XLOG_BLCKSZ) == 0);
+
+    /* FIXME: more sensible/efficient implementation */
+    XLogRead(cur_page, receiving_from_node_id, startptr, XLOG_BLCKSZ);
+
+    if (hdr->xlp_magic == XLOG_PAGE_MAGIC)
+        return;
+
+    elog(FATAL, "page header magic %x, should be %x at %X/%X", hdr->xlp_magic,
+         XLOG_PAGE_MAGIC, startptr.xlogid, startptr.xrecoff);
+}
+
+/*
+ * Run one iteration of the logical apply process.
+ *
+ * Each call resets the process latch, snapshots the apply and receive
+ * positions of the node we are receiving from, lets the xlogreader read up
+ * to the receive position, publishes the reader's current position as the
+ * new apply position and then waits on the process latch if the reader
+ * either needs more input or has nothing left to do.
+ *
+ * The first call that finds a receive position in shared memory also does
+ * the one-time setup of the xlogreader and the ApplyCache.
+ *
+ * Returns false if no receive position has been set yet (nothing was done),
+ * true otherwise.
+ */
+bool
+do_logicalapply(void)
+{
+    XLogRecPtr from;
+    XLogRecPtr to;
+    int res;
+
+    static bool initialized = false;
+
+    if (!initialized)
+    {
+        /*
+         * FIXME: We need a sensible implementation for choosing this.
+         */
+        if (guc_replication_origin_id == 1)
+        {
+            receiving_from_node_id = 2;
+        }
+        else
+        {
+            receiving_from_node_id = 1;
+        }
+    }
+
+    ResetLatch(&MyProc->procLatch);
+
+    /*
+     * Copy both positions while holding the spinlock. Merely taking pointers
+     * into shared memory here and dereferencing them after the lock has been
+     * released would leave the actual reads unprotected.
+     */
+    SpinLockAcquire(&WalRcv->mutex);
+    from = WalRcv->mm_applyState[receiving_from_node_id];
+    to = WalRcv->mm_receiveState[receiving_from_node_id];
+    SpinLockRelease(&WalRcv->mutex);
+
+    if (XLByteEQ(to, zeroRecPtr))
+    {
+        /* shmem state not ready, walreceivers didn't start up yet */
+        return false;
+    }
+
+    if (!initialized)
+    {
+        ReaderApplyState *apply_state;
+        ApplyCache *apply_cache;
+
+        elog(LOG, "at node %u were receiving from %u",
+             guc_replication_origin_id,
+             receiving_from_node_id);
+
+        /* FIXME: do we want to set that permanently? */
+        current_replication_origin_id = receiving_from_node_id;
+
+        /* we cannot lose anything due to this as we just restart replay */
+        SetConfigOption("synchronous_commit", "off",
+                        PGC_SUSET, PGC_S_OVERRIDE);
+
+        WalRcv->mm_receiveLatch[receiving_from_node_id] = &MyProc->procLatch;
+
+        /* initialize xlogreader */
+        xlogreader_state = XLogReaderAllocate();
+        XLogReaderReset(xlogreader_state);
+
+        xlogreader_state->is_record_interesting = replay_record_is_interesting;
+        xlogreader_state->finished_record = replay_finished_record;
+        xlogreader_state->writeout_data = replay_writeout_data;
+        xlogreader_state->read_page = replay_read_page;
+        xlogreader_state->private_data = malloc(sizeof(ReaderApplyState));
+        if (!xlogreader_state->private_data)
+            elog(ERROR, "Could not allocate the ReaderApplyState struct");
+
+        /* start reading at the last applied position */
+        xlogreader_state->startptr = from;
+        xlogreader_state->curptr = from;
+
+        apply_state = (ReaderApplyState *) xlogreader_state->private_data;
+
+        /*
+         * allocate an ApplyCache that will apply data using lowlevel calls
+         * without type conversion et al. This requires binary compatibility
+         * between both systems.
+         * XXX: This would be the place to hook different apply methods, like
+         * producing sql and applying it.
+         */
+        apply_cache = ApplyCacheAllocate();
+        apply_cache->begin = apply_begin_txn;
+        apply_cache->apply_change = apply_change;
+        apply_cache->commit = apply_commit_txn;
+        apply_state->apply_cache = apply_cache;
+
+        apply_cache->private_data = malloc(sizeof(ApplyApplyCacheState));
+        if (!apply_cache->private_data)
+            elog(ERROR, "Could not allocate the ApplyApplyCacheState struct");
+
+        /*
+         * Only mark the setup as done once everything above succeeded, so
+         * that a failed allocation is not mistaken for a completed setup on
+         * a later call.
+         */
+        initialized = true;
+
+        elog(WARNING, "initialized");
+    }
+
+    /* nothing received beyond what has been applied already */
+    if (XLByteLT(to, from))
+    {
+        goto wait;
+    }
+
+    xlogreader_state->endptr = to;
+
+    XLogReaderRead(xlogreader_state);
+
+    SpinLockAcquire(&WalRcv->mutex);
+    /*
+     * FIXME: This is not enough to recover properly after a crash because we
+     * lose in-progress transactions. For that we need two pointers: One to
+     * remember which is the lsn we committed last and which is the lsn with
+     * the oldest, in-progress, transaction. Then we can start reading at the
+     * latter and just throw away everything which commits before the former.
+     */
+    WalRcv->mm_applyState[receiving_from_node_id] = xlogreader_state->curptr;
+    SpinLockRelease(&WalRcv->mutex);
+
+wait:
+    /*
+     * if we either need data to complete reading or have finished everything
+     * up to this point
+     */
+    if (xlogreader_state->needs_input || !xlogreader_state->incomplete)
+    {
+        res = WaitLatch(&MyProc->procLatch,
+                        WL_LATCH_SET|WL_POSTMASTER_DEATH, 0);
+        if (res & WL_POSTMASTER_DEATH)
+        {
+            elog(WARNING, "got deathsig");
+            proc_exit(0);
+        }
+    }
+    return true;
+}
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index fc9e120..aa19ab9 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -25,4 +25,7 @@ extern XLogRecPtr current_replication_origin_lsn;#define MaxMultimasterNodeId (2<<3)#define LCRDIR
           "pg_lcr"
 
+
+/* one iteration of the logical apply process, see logical.c */
+extern bool do_logicalapply(void);
+#endif
-- 
1.7.10.rc3.3.g19a6c.dirty



pgsql-hackers by date:

Previous
From: Andres Freund
Date:
Subject: [PATCH 16/16] current version of the design document
Next
From: Andres Freund
Date:
Subject: [PATCH 14/16] Add module to apply changes from an apply-cache using low-level functions