[PATCH 10/16] Introduce the concept that wal has a 'origin' node - Mailing list pgsql-hackers

From Andres Freund
Subject [PATCH 10/16] Introduce the concept that wal has a 'origin' node
Date
Msg-id 1339586927-13156-10-git-send-email-andres@2ndquadrant.com
Whole thread Raw
In response to [RFC][PATCH] Logical Replication/BDR prototype and architecture  (Andres Freund <andres@2ndquadrant.com>)
Responses Re: [PATCH 10/16] Introduce the concept that wal has a 'origin' node
Re: [PATCH 10/16] Introduce the concept that wal has a 'origin' node
List pgsql-hackers
From: Andres Freund <andres@anarazel.de>

One solution to avoid loops when doing wal based logical replication in
topologies which are more complex than one unidirectional transport is
introducing the concept of an 'origin_id' into the wal stream. Luckily there is
some padding in the XLogRecord struct that allows us to add that field without
further bloating the struct.
This solution was chosen because it allows for just about any topology and is
unobtrusive.

This adds a new configuration parameter multimaster_node_id which determines
the id used for wal originating in one cluster.

When applying changes from another cluster's wal, code can set the variable
current_replication_origin_id. This is a global variable because passing it
through everything which can generate wal would be far too intrusive.
---src/backend/access/transam/xact.c             |   48 +++++++++++++++++++------src/backend/access/transam/xlog.c
      |    3 +-src/backend/access/transam/xlogreader.c       |    2 ++src/backend/replication/logical/Makefile      |
2 +-src/backend/replication/logical/logical.c     |   19 ++++++++++src/backend/utils/misc/guc.c                  |   19
++++++++++src/backend/utils/misc/postgresql.conf.sample|    3 ++src/include/access/xlog.h                     |    4
+--src/include/access/xlogdefs.h                |    2 ++src/include/replication/logical.h             |   22
++++++++++++10files changed, 110 insertions(+), 14 deletions(-)create mode 100644
src/backend/replication/logical/logical.ccreatemode 100644 src/include/replication/logical.h
 

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 3cc2bfa..dc30a17 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -36,8 +36,9 @@#include "libpq/be-fsstubs.h"#include "miscadmin.h"#include "pgstat.h"
-#include "replication/walsender.h"
+#include "replication/logical.h"#include "replication/syncrep.h"
+#include "replication/walsender.h"#include "storage/lmgr.h"#include "storage/predicate.h"#include
"storage/procarray.h"
@@ -4545,12 +4546,13 @@ xactGetCommittedChildren(TransactionId **ptr) * actions for which the order of execution is
critical.*/static void
 
-xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
-                          TransactionId *sub_xids, int nsubxacts,
-                          SharedInvalidationMessage *inval_msgs, int nmsgs,
-                          RelFileNode *xnodes, int nrels,
-                          Oid dbId, Oid tsId,
-                          uint32 xinfo)
+xact_redo_commit_internal(TransactionId xid, RepNodeId originating_node,
+                          XLogRecPtr lsn, XLogRecPtr origin_lsn,
+                          TransactionId *sub_xids, int nsubxacts,
+                          SharedInvalidationMessage *inval_msgs, int nmsgs,
+                          RelFileNode *xnodes, int nrels,
+                          Oid dbId, Oid tsId,
+                          uint32 xinfo){    TransactionId max_xid;    int            i;
@@ -4659,8 +4661,13 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, * Utility function to call
xact_redo_commit_internalafter breaking down xlrec */static void
 
+<<<<<<< HEADxact_redo_commit(xl_xact_commit *xlrec,                 TransactionId xid, XLogRecPtr lsn)
+=======
+xact_redo_commit(xl_xact_commit *xlrec, RepNodeId originating_node,
+                            TransactionId xid, XLogRecPtr lsn)
+>>>>>>> Introduce the concept that wal has a 'origin' node{    TransactionId *subxacts;    SharedInvalidationMessage
*inval_msgs;
@@ -4670,18 +4677,26 @@ xact_redo_commit(xl_xact_commit *xlrec,    /* invalidation messages array follows subxids */
inval_msgs= (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
 
+<<<<<<< HEAD    xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts,
inval_msgs,xlrec->nmsgs,                              xlrec->xnodes, xlrec->nrels,
xlrec->dbId,                             xlrec->tsId,                              xlrec->xinfo);
 
+=======
+    xact_redo_commit_internal(xid, originating_node, lsn, xlrec->origin_lsn,
+                              subxacts, xlrec->nsubxacts, inval_msgs,
+                              xlrec->nmsgs, xlrec->xnodes, xlrec->nrels,
+                              xlrec->dbId, xlrec->tsId, xlrec->xinfo);
+>>>>>>> Introduce the concept that wal has a 'origin' node}/* * Utility function to call xact_redo_commit_internal
forcompact form of message. */static void
 
+<<<<<<< HEADxact_redo_commit_compact(xl_xact_commit_compact *xlrec,                         TransactionId xid,
XLogRecPtrlsn){
 
@@ -4691,6 +4706,18 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec,                              InvalidOid,
      /* dbId */                              InvalidOid,        /* tsId */                              0);        /*
xinfo*/
 
+=======
+xact_redo_commit_compact(xl_xact_commit_compact *xlrec, RepNodeId originating_node,
+                            TransactionId xid, XLogRecPtr lsn)
+{
+    xact_redo_commit_internal(xid, originating_node, lsn, zeroRecPtr, xlrec->subxacts,
+                                xlrec->nsubxacts,
+                                NULL, 0,        /* inval msgs */
+                                NULL, 0,        /* relfilenodes */
+                                InvalidOid,        /* dbId */
+                                InvalidOid,        /* tsId */
+                                0);                /* xinfo */
+>>>>>>> Introduce the concept that wal has a 'origin' node}/*
@@ -4786,17 +4813,18 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)    /* Backup blocks are not used in xact records
*/   Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
 
+    /* FIXME: we probably shouldn't pass xl_origin_id at multiple places, hm */    if (info ==
XLOG_XACT_COMMIT_COMPACT)   {        xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *)
XLogRecGetData(record);
-        xact_redo_commit_compact(xlrec, record->xl_xid, lsn);
+        xact_redo_commit_compact(xlrec, record->xl_origin_id, record->xl_xid, lsn);    }    else if (info ==
XLOG_XACT_COMMIT)   {        xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
 
-        xact_redo_commit(xlrec, record->xl_xid, lsn);
+        xact_redo_commit(xlrec, record->xl_origin_id, record->xl_xid, lsn);    }    else if (info == XLOG_XACT_ABORT)
 {
 
@@ -4814,7 +4842,7 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)    {        xl_xact_commit_prepared *xlrec =
(xl_xact_commit_prepared*) XLogRecGetData(record);
 
-        xact_redo_commit(&xlrec->crec, xlrec->xid, lsn);
+        xact_redo_commit(&xlrec->crec, record->xl_origin_id, xlrec->xid, lsn);        RemoveTwoPhaseFile(xlrec->xid,
false);   }    else if (info == XLOG_XACT_ABORT_PREPARED)
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c6feed0..504b4d0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@#include "postmaster/startup.h"#include "replication/walreceiver.h"#include "replication/walsender.h"
+#include "replication/logical.h"#include "storage/bufmgr.h"#include "storage/fd.h"#include "storage/ipc.h"
@@ -1032,7 +1033,7 @@ begin:;    record->xl_len = len;        /* doesn't include backup blocks */    record->xl_info =
info;   record->xl_rmid = rmid;
 
-
+    record->xl_origin_id = current_replication_origin_id;    /* Now we can finish computing the record's CRC */
COMP_CRC32(rdata_crc,(char *) record + sizeof(pg_crc32),               SizeOfXLogRecord - sizeof(pg_crc32));
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 6f15d66..bacd31e 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -24,6 +24,7 @@#include "access/xlogreader.h"/* FIXME */
+#include "replication/logical.h" /* InvalidMultimasterNodeId */#include "replication/walsender_private.h"#include
"replication/walprotocol.h"
@@ -563,6 +564,7 @@ XLogReaderRead(XLogReaderState* state)                spacer.xl_len = temp_record->xl_tot_len -
SizeOfXLogRecord;               spacer.xl_rmid = RM_XLOG_ID;                spacer.xl_info = XLOG_NOOP;
 
+                spacer.xl_origin_id = InvalidMultimasterNodeId;                state->writeout_data(state,
                       (char*)&spacer,
 
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 7dd9663..c2d6d82 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.globaloverride CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
-OBJS = applycache.o decode.o
+OBJS = applycache.o decode.o logical.oinclude $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
new file mode 100644
index 0000000..4f34488
--- /dev/null
+++ b/src/backend/replication/logical/logical.c
@@ -0,0 +1,19 @@
+/*-------------------------------------------------------------------------
+ *
+ * logical.c
+ *
+ * Support functions for logical/multimaster replication
+ *
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *      src/backend/replication/logical/logical.c
+ *
+ */
+#include "postgres.h"
+#include "replication/logical.h"
+int guc_replication_origin_id = InvalidMultimasterNodeId;
+RepNodeId current_replication_origin_id = InvalidMultimasterNodeId;
+XLogRecPtr current_replication_origin_lsn = {0, 0};
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 93c798b..46b0657 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -60,6 +60,7 @@#include "replication/syncrep.h"#include "replication/walreceiver.h"#include
"replication/walsender.h"
+#include "replication/logical.h"#include "storage/bufmgr.h"#include "storage/standby.h"#include "storage/fd.h"
@@ -198,6 +199,7 @@ static const char *show_tcp_keepalives_interval(void);static const char
*show_tcp_keepalives_count(void);staticbool check_maxconnections(int *newval, void **extra, GucSource source);static
voidassign_maxconnections(int newval, void *extra);
 
+static void assign_replication_node_id(int newval, void *extra);static bool check_maxworkers(int *newval, void
**extra,GucSource source);static void assign_maxworkers(int newval, void *extra);static bool
check_autovacuum_max_workers(int*newval, void **extra, GucSource source);
 
@@ -1598,6 +1600,16 @@ static struct config_int ConfigureNamesInt[] =    },    {
+        {"multimaster_node_id", PGC_POSTMASTER, REPLICATION_MASTER,
+            gettext_noop("node id for multimaster."),
+            NULL
+        },
+        &guc_replication_origin_id,
+        InvalidMultimasterNodeId, InvalidMultimasterNodeId, MaxMultimasterNodeId,
+        NULL, assign_replication_node_id, NULL
+    },
+
+    {        {"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,            gettext_noop("Sets the maximum number
ofconcurrent connections."),            NULL
 
@@ -8629,6 +8641,13 @@ assign_maxconnections(int newval, void *extra)    MaxBackends = newval + MaxWorkers +
autovacuum_max_workers+ 1;}
 
+static void
+assign_replication_node_id(int newval, void *extra)
+{
+    guc_replication_origin_id = newval;
+    current_replication_origin_id = newval;
+}
+static boolcheck_maxworkers(int *newval, void **extra, GucSource source){
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ce3fc08..12f8a3f 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -241,6 +241,9 @@#hot_standby_feedback = off        # send info from standby to prevent                    # query
conflicts
+# - Multi Master Servers -
+
+#multimaster_node_id = 0 #invalid node
id#------------------------------------------------------------------------------#QUERY TUNING
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 2843aca..dd89cff 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -47,8 +47,8 @@ typedef struct XLogRecord    uint32        xl_len;            /* total len of rmgr data */    uint8
    xl_info;        /* flag bits, see below */    RmgrId        xl_rmid;        /* resource manager for this record */
 
-
-    /* Depending on MAXALIGN, there are either 2 or 6 wasted bytes here */
+    RepNodeId   xl_origin_id;   /* what node did originally cause this record to be written */
+    /* Depending on MAXALIGN, there are either 0 or 4 wasted bytes here */    /* ACTUAL LOG DATA FOLLOWS AT END OF
STRUCT*/
 
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 2768427..6b6700a 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -84,6 +84,8 @@ extern XLogRecPtr zeroRecPtr; */typedef uint32 TimeLineID;
+typedef uint16 RepNodeId;
+/* *    Because O_DIRECT bypasses the kernel buffers, and because we never *    read those buffers except during crash
recoveryor if wal_level != minimal,
 
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
new file mode 100644
index 0000000..0698b61
--- /dev/null
+++ b/src/include/replication/logical.h
@@ -0,0 +1,22 @@
+/*
+ * logical.h
+ *
+ * PostgreSQL logical replication support
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/replication/logical.h
+ */
+#ifndef LOGICAL_H
+#define LOGICAL_H
+
+#include "access/xlogdefs.h"
+
+extern int guc_replication_origin_id;
+extern RepNodeId current_replication_origin_id;
+extern XLogRecPtr current_replication_origin_lsn;
+
+#define InvalidMultimasterNodeId 0
+#define MaxMultimasterNodeId (2<<3)
+#endif
-- 
1.7.10.rc3.3.g19a6c.dirty



pgsql-hackers by date:

Previous
From: Andres Freund
Date:
Subject: [PATCH 08/16] Introduce the ApplyCache module which can reassemble transactions from a stream of interspersed changes
Next
From: Andres Freund
Date:
Subject: [PATCH 06/16] Add support for a generic wal reading facility dubbed XLogReader