[PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical - Mailing list pgsql-hackers

From Andres Freund
Subject [PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical
Date
Msg-id 1339586927-13156-7-git-send-email-andres@2ndquadrant.com
Whole thread Raw
In response to [RFC][PATCH] Logical Replication/BDR prototype and architecture  (Andres Freund <andres@2ndquadrant.com>)
Responses Re: [PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical
Re: [PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical
List pgsql-hackers
From: Andres Freund <andres@anarazel.de>

This adds a new wal_level value 'logical'

Missing cases:
- heap_multi_insert
- primary key changes for updates
- no primary key
- LOG_NEWPAGE
---src/backend/access/heap/heapam.c        |  135 ++++++++++++++++++++++++++++---src/backend/access/transam/xlog.c
|    1 +src/backend/catalog/index.c             |   74 +++++++++++++++++src/bin/pg_controldata/pg_controldata.c |    2
+src/include/access/xlog.h              |    3 +-src/include/catalog/index.h             |    4 +6 files changed, 207
insertions(+),12 deletions(-)
 

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 9519e73..9149d53 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -52,6 +52,7 @@#include "access/xact.h"#include "access/xlogutils.h"#include "catalog/catalog.h"
+#include "catalog/index.h"#include "catalog/namespace.h"#include "miscadmin.h"#include "pgstat.h"
@@ -1937,10 +1938,19 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,        xl_heap_insert xlrec;
xl_heap_header xlhdr;        XLogRecPtr    recptr;
 
-        XLogRecData rdata[3];
+        XLogRecData rdata[4];        Page        page = BufferGetPage(buffer);        uint8        info =
XLOG_HEAP_INSERT;
+        /*
+         * For the logical replication case we need the tuple even if we're
+         * doing a full page write. We could alternatively store a pointer into
+         * the fpw though.
+         * For that to work we add another rdata entry for the buffer in that
+         * case.
+         */
+        bool        need_tuple = wal_level == WAL_LEVEL_LOGICAL;
+        xlrec.all_visible_cleared = all_visible_cleared;        xlrec.target.node = relation->rd_node;
xlrec.target.tid= heaptup->t_self;
 
@@ -1960,18 +1970,32 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,         */        rdata[1].data =
(char*) &xlhdr;        rdata[1].len = SizeOfHeapHeader;
 
-        rdata[1].buffer = buffer;
+        rdata[1].buffer = need_tuple ? InvalidBuffer : buffer;        rdata[1].buffer_std = true;        rdata[1].next
=&(rdata[2]);        /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */        rdata[2].data = (char *)
heaptup->t_data+ offsetof(HeapTupleHeaderData, t_bits);        rdata[2].len = heaptup->t_len -
offsetof(HeapTupleHeaderData,t_bits);
 
-        rdata[2].buffer = buffer;
+        rdata[2].buffer = need_tuple ? InvalidBuffer : buffer;        rdata[2].buffer_std = true;        rdata[2].next
=NULL;        /*
 
+         * add record for the buffer without actual content that's removed if
+         * fpw is done for that buffer
+         */
+        if(need_tuple){
+            rdata[2].next = &(rdata[3]);
+
+            rdata[3].data = NULL;
+            rdata[3].len = 0;
+            rdata[3].buffer = buffer;
+            rdata[3].buffer_std = true;
+            rdata[3].next = NULL;
+        }
+
+        /*         * If this is the single and first tuple on page, we can reinit the         * page instead of
restoringthe whole thing.  Set flag, and hide         * buffer references from XLogInsert.
 
@@ -1980,7 +2004,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
PageGetMaxOffsetNumber(page)== FirstOffsetNumber)        {            info |= XLOG_HEAP_INIT_PAGE;
 
-            rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
+            rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer;        }        recptr =
XLogInsert(RM_HEAP_ID,info, rdata);
 
@@ -2568,7 +2592,9 @@ l1:    {        xl_heap_delete xlrec;        XLogRecPtr    recptr;
-        XLogRecData rdata[2];
+        XLogRecData rdata[4];
+
+        bool need_tuple = wal_level == WAL_LEVEL_LOGICAL && relation->rd_id  >= FirstNormalObjectId;
xlrec.all_visible_cleared= all_visible_cleared;        xlrec.target.node = relation->rd_node;
 
@@ -2584,6 +2610,73 @@ l1:        rdata[1].buffer_std = true;        rdata[1].next = NULL;
+        /*
+         * XXX: We could decide not to log changes when the origin is not the
+         * local node, that should reduce redundant logging.
+         */
+        if(need_tuple){
+            xl_heap_header xlhdr;
+
+            Oid indexoid = InvalidOid;
+            int16 pknratts;
+            int16 pkattnum[INDEX_MAX_KEYS];
+            Oid pktypoid[INDEX_MAX_KEYS];
+            Oid pkopclass[INDEX_MAX_KEYS];
+            TupleDesc desc = RelationGetDescr(relation);
+            Relation index_rel;
+            TupleDesc indexdesc;
+            int natt;
+
+            Datum idxvals[INDEX_MAX_KEYS];
+            bool idxisnull[INDEX_MAX_KEYS];
+            HeapTuple idxtuple;
+
+            MemSet(pkattnum, 0, sizeof(pkattnum));
+            MemSet(pktypoid, 0, sizeof(pktypoid));
+            MemSet(pkopclass, 0, sizeof(pkopclass));
+            MemSet(idxvals, 0, sizeof(idxvals));
+            MemSet(idxisnull, 0, sizeof(idxisnull));
+            relationFindPrimaryKey(relation, &indexoid, &pknratts, pkattnum, pktypoid, pkopclass);
+
+            if(!indexoid){
+                elog(WARNING, "Could not find primary key for table with oid %u",
+                     relation->rd_id);
+                goto no_index_found;
+            }
+
+            index_rel = index_open(indexoid, AccessShareLock);
+
+            indexdesc = RelationGetDescr(index_rel);
+
+            for(natt = 0; natt < indexdesc->natts; natt++){
+                idxvals[natt] =
+                    fastgetattr(&tp, pkattnum[natt], desc, &idxisnull[natt]);
+                Assert(!idxisnull[natt]);
+            }
+
+            idxtuple = heap_form_tuple(indexdesc, idxvals, idxisnull);
+
+            xlhdr.t_infomask2 = idxtuple->t_data->t_infomask2;
+            xlhdr.t_infomask = idxtuple->t_data->t_infomask;
+            xlhdr.t_hoff = idxtuple->t_data->t_hoff;
+
+            rdata[1].next = &(rdata[2]);
+            rdata[2].data = (char*)&xlhdr;
+            rdata[2].len = SizeOfHeapHeader;
+            rdata[2].buffer = InvalidBuffer;
+            rdata[2].next = NULL;
+
+            rdata[2].next = &(rdata[3]);
+            rdata[3].data = (char *) idxtuple->t_data + offsetof(HeapTupleHeaderData, t_bits);
+            rdata[3].len = idxtuple->t_len - offsetof(HeapTupleHeaderData, t_bits);
+            rdata[3].buffer = InvalidBuffer;
+            rdata[3].next = NULL;
+
+            heap_close(index_rel, NoLock);
+        no_index_found:
+            ;
+        }
+        recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);        PageSetLSN(page, recptr);
@@ -4413,9 +4506,14 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,    xl_heap_header xlhdr;
uint8       info;    XLogRecPtr    recptr;
 
-    XLogRecData rdata[4];
+    XLogRecData rdata[5];    Page        page = BufferGetPage(newbuf);
+    /*
+     * Just as for XLOG_HEAP_INSERT we need to make sure the tuple
+     */
+    bool        need_tuple = wal_level == WAL_LEVEL_LOGICAL;
+    /* Caller should not call me on a non-WAL-logged relation */    Assert(RelationNeedsWAL(reln));
@@ -4446,28 +4544,43 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,    xlhdr.t_hoff =
newtup->t_data->t_hoff;   /*
 
-     * As with insert records, we need not store the rdata[2] segment if we
-     * decide to store the whole buffer instead.
+     * As with insert's logging, we need not store the Datum containing
+     * tuples separately from the buffer if we do logical replication that
+     * is...     */    rdata[2].data = (char *) &xlhdr;    rdata[2].len = SizeOfHeapHeader;
-    rdata[2].buffer = newbuf;
+    rdata[2].buffer = need_tuple ? InvalidBuffer : newbuf;    rdata[2].buffer_std = true;    rdata[2].next =
&(rdata[3]);   /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */    rdata[3].data = (char *) newtup->t_data +
offsetof(HeapTupleHeaderData,t_bits);    rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
 
-    rdata[3].buffer = newbuf;
+    rdata[3].buffer = need_tuple ? InvalidBuffer : newbuf;    rdata[3].buffer_std = true;    rdata[3].next = NULL;
+    /*
+     * separate storage for the buffer reference of the new page in the
+     * wal_level=logical case
+    */
+    if(need_tuple){
+        rdata[3].next = &(rdata[4]);
+
+        rdata[4].data = NULL,
+        rdata[4].len = 0;
+        rdata[4].buffer = newbuf;
+        rdata[4].buffer_std = true;
+        rdata[4].next = NULL;
+    }
+    /* If new tuple is the single and first tuple on page... */    if (ItemPointerGetOffsetNumber(&(newtup->t_self))
==FirstOffsetNumber &&        PageGetMaxOffsetNumber(page) == FirstOffsetNumber)    {        info |=
XLOG_HEAP_INIT_PAGE;
-        rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
+        rdata[2].buffer = rdata[3].buffer = rdata[4].buffer = InvalidBuffer;    }    recptr = XLogInsert(RM_HEAP_ID,
info,rdata); 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 166efb0..c6feed0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ const struct config_enum_entry wal_level_options[] = {    {"minimal", WAL_LEVEL_MINIMAL, false},
{"archive",WAL_LEVEL_ARCHIVE, false},    {"hot_standby", WAL_LEVEL_HOT_STANDBY, false},
 
+    {"logical", WAL_LEVEL_LOGICAL, false},    {NULL, 0, false}};
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 9e8b1cc..4cddcac 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -49,6 +49,7 @@#include "nodes/nodeFuncs.h"#include "optimizer/clauses.h"#include "parser/parser.h"
+#include "parser/parse_relation.h"#include "storage/bufmgr.h"#include "storage/lmgr.h"#include "storage/predicate.h"
@@ -3311,3 +3312,76 @@ ResetReindexPending(void){    pendingReindexedIndexes = NIL;}
+
+/*
+ * relationFindPrimaryKey
+ *        Find the primary key index of a relation, if it has one.
+ *
+ * Walks the index list of pkrel and picks the first index whose pg_index
+ * entry has both indisprimary and indimmediate set.
+ *
+ * Outputs:
+ *    *indexOid    OID of that index, or InvalidOid if no such index exists
+ *    *nratts      the index's indnatts, or 0 if no such index exists
+ *    attnums[]    heap attribute numbers taken from the index's indkey
+ *    atttypids[]  attnumTypeId() of each of those attributes of pkrel
+ *    opclasses[]  the index's indclass entry for each index column
+ *
+ * The three arrays must have room for indnatts entries (e.g. be sized
+ * INDEX_MAX_KEYS); they are left untouched if no primary key is found.
+ *
+ * Raises an ERROR if the syscache lookup for one of the relation's indexes
+ * fails.
+ *
+ * This is quite similar to tablecmds.c's transformFkeyGetPrimaryKey.
+ *
+ * XXX: It might be a good idea to change pg_class.relhaspkey into a bool to
+ * make this more efficient.
+ */
+void
+relationFindPrimaryKey(Relation pkrel, Oid *indexOid,
+                       int16 *nratts, int16 *attnums, Oid *atttypids,
+                       Oid *opclasses)
+{
+    List       *indexoidlist;
+    ListCell   *indexoidscan;
+    HeapTuple   indexTuple = NULL;
+    Form_pg_index indexStruct = NULL;
+    Datum       indclassDatum;
+    bool        isnull;
+    oidvector  *indclass;
+    int         i;
+
+    /*
+     * Give both scalar outputs a defined value up front, so that a caller
+     * passing uninitialized variables never sees garbage in the "no primary
+     * key" case.
+     */
+    *indexOid = InvalidOid;
+    *nratts = 0;
+
+    indexoidlist = RelationGetIndexList(pkrel);
+
+    foreach(indexoidscan, indexoidlist)
+    {
+        Oid         indexoid = lfirst_oid(indexoidscan);
+
+        indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
+        if (!HeapTupleIsValid(indexTuple))
+            elog(ERROR, "cache lookup failed for index %u", indexoid);
+
+        indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
+        if (indexStruct->indisprimary && indexStruct->indimmediate)
+        {
+            /*
+             * Found it.  Leave the loop while still holding the syscache
+             * reference: indexTuple and indexStruct are read below and the
+             * reference is only released at the end of the function.
+             */
+            *indexOid = indexoid;
+            break;
+        }
+
+        /* Not the primary key; drop the reference before trying the next. */
+        ReleaseSysCache(indexTuple);
+    }
+    list_free(indexoidlist);
+
+    /* No primary key: nothing is held at this point, so just return. */
+    if (!OidIsValid(*indexOid))
+        return;
+
+    /* Must get indclass the hard way */
+    indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
+                                    Anum_pg_index_indclass, &isnull);
+    Assert(!isnull);
+    indclass = (oidvector *) DatumGetPointer(indclassDatum);
+
+    *nratts = indexStruct->indnatts;
+
+    /*
+     * Now build the list of PK attributes from the indkey definition (we
+     * assume a primary key cannot have expressional elements)
+     */
+    for (i = 0; i < indexStruct->indnatts; i++)
+    {
+        int         pkattno = indexStruct->indkey.values[i];
+
+        attnums[i] = pkattno;
+        atttypids[i] = attnumTypeId(pkrel, pkattno);
+        opclasses[i] = indclass->values[i];
+    }
+
+    /* Done with the pg_index entry of the primary key index. */
+    ReleaseSysCache(indexTuple);
+}
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index c00183a..47715c9 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -82,6 +82,8 @@ wal_level_str(WalLevel wal_level)            return "archive";        case WAL_LEVEL_HOT_STANDBY:
      return "hot_standby";
 
+        case WAL_LEVEL_LOGICAL:
+            return "logical";    }    return _("unrecognized wal_level");}
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index df5f232..2843aca 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -199,7 +199,8 @@ typedef enum WalLevel{    WAL_LEVEL_MINIMAL = 0,    WAL_LEVEL_ARCHIVE,
-    WAL_LEVEL_HOT_STANDBY
+    WAL_LEVEL_HOT_STANDBY,
+    WAL_LEVEL_LOGICAL} WalLevel;extern int    wal_level;
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 7c8198f..2ba0ac3 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -101,4 +101,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid);extern bool ReindexIsProcessingIndex(Oid
indexOid);externOid    IndexGetRelation(Oid indexId, bool missing_ok);
 
+extern void relationFindPrimaryKey(Relation pkrel, Oid *indexOid,
+                                   int16 *nratts, int16 *attnums, Oid *atttypids,
+                                   Oid *opclasses);
+#endif   /* INDEX_H */
-- 
1.7.10.rc3.3.g19a6c.dirty



pgsql-hackers by date:

Previous
From: Andres Freund
Date:
Subject: [PATCH 05/16] Preliminary: Introduce Bgworker process
Next
From: Andres Freund
Date:
Subject: [PATCH 09/16] Decode wal (with wal_level=logical) into changes in an ApplyCache instance