[PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical - Mailing list pgsql-hackers
From | Andres Freund |
---|---|
Subject | [PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical |
Date | |
Msg-id | 1339586927-13156-7-git-send-email-andres@2ndquadrant.com Whole thread Raw |
In response to | [RFC][PATCH] Logical Replication/BDR prototype and architecture (Andres Freund <andres@2ndquadrant.com>) |
Responses |
Re: [PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical
Re: [PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical |
List | pgsql-hackers |
From: Andres Freund <andres@anarazel.de> This adds a new wal_level value 'logical' Missing cases: - heap_multi_insert - primary key changes for updates - no primary key - LOG_NEWPAGE ---src/backend/access/heap/heapam.c | 135 ++++++++++++++++++++++++++++---src/backend/access/transam/xlog.c | 1 +src/backend/catalog/index.c | 74 +++++++++++++++++src/bin/pg_controldata/pg_controldata.c | 2 +src/include/access/xlog.h | 3 +-src/include/catalog/index.h | 4 +6 files changed, 207 insertions(+),12 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 9519e73..9149d53 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -52,6 +52,7 @@#include "access/xact.h"#include "access/xlogutils.h"#include "catalog/catalog.h" +#include "catalog/index.h"#include "catalog/namespace.h"#include "miscadmin.h"#include "pgstat.h" @@ -1937,10 +1938,19 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, xl_heap_insert xlrec; xl_heap_header xlhdr; XLogRecPtr recptr; - XLogRecData rdata[3]; + XLogRecData rdata[4]; Page page = BufferGetPage(buffer); uint8 info = XLOG_HEAP_INSERT; + /* + * For the logical replication case we need the tuple even if were + * doing a full page write. We could alternatively store a pointer into + * the fpw though. + * For that to work we add another rdata entry for the buffer in that + * case. + */ + bool need_tuple = wal_level == WAL_LEVEL_LOGICAL; + xlrec.all_visible_cleared = all_visible_cleared; xlrec.target.node = relation->rd_node; xlrec.target.tid= heaptup->t_self; @@ -1960,18 +1970,32 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, */ rdata[1].data = (char*) &xlhdr; rdata[1].len = SizeOfHeapHeader; - rdata[1].buffer = buffer; + rdata[1].buffer = need_tuple ? 
InvalidBuffer : buffer; rdata[1].buffer_std = true; rdata[1].next =&(rdata[2]); /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ rdata[2].data = (char *) heaptup->t_data+ offsetof(HeapTupleHeaderData, t_bits); rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData,t_bits); - rdata[2].buffer = buffer; + rdata[2].buffer = need_tuple ? InvalidBuffer : buffer; rdata[2].buffer_std = true; rdata[2].next =NULL; /* + * add record for the buffer without actual content thats removed if + * fpw is done for that buffer + */ + if(need_tuple){ + rdata[2].next = &(rdata[3]); + + rdata[3].data = NULL; + rdata[3].len = 0; + rdata[3].buffer = buffer; + rdata[3].buffer_std = true; + rdata[3].next = NULL; + } + + /* * If this is the single and first tuple on page, we can reinit the * page instead of restoringthe whole thing. Set flag, and hide * buffer references from XLogInsert. @@ -1980,7 +2004,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, PageGetMaxOffsetNumber(page)== FirstOffsetNumber) { info |= XLOG_HEAP_INIT_PAGE; - rdata[1].buffer = rdata[2].buffer = InvalidBuffer; + rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer; } recptr = XLogInsert(RM_HEAP_ID,info, rdata); @@ -2568,7 +2592,9 @@ l1: { xl_heap_delete xlrec; XLogRecPtr recptr; - XLogRecData rdata[2]; + XLogRecData rdata[4]; + + bool need_tuple = wal_level == WAL_LEVEL_LOGICAL && relation->rd_id >= FirstNormalObjectId; xlrec.all_visible_cleared= all_visible_cleared; xlrec.target.node = relation->rd_node; @@ -2584,6 +2610,73 @@ l1: rdata[1].buffer_std = true; rdata[1].next = NULL; + /* + * XXX: We could decide not to log changes when the origin is not the + * local node, that should reduce redundant logging. 
+ */ + if(need_tuple){ + xl_heap_header xlhdr; + + Oid indexoid = InvalidOid; + int16 pknratts; + int16 pkattnum[INDEX_MAX_KEYS]; + Oid pktypoid[INDEX_MAX_KEYS]; + Oid pkopclass[INDEX_MAX_KEYS]; + TupleDesc desc = RelationGetDescr(relation); + Relation index_rel; + TupleDesc indexdesc; + int natt; + + Datum idxvals[INDEX_MAX_KEYS]; + bool idxisnull[INDEX_MAX_KEYS]; + HeapTuple idxtuple; + + MemSet(pkattnum, 0, sizeof(pkattnum)); + MemSet(pktypoid, 0, sizeof(pktypoid)); + MemSet(pkopclass, 0, sizeof(pkopclass)); + MemSet(idxvals, 0, sizeof(idxvals)); + MemSet(idxisnull, 0, sizeof(idxisnull)); + relationFindPrimaryKey(relation, &indexoid, &pknratts, pkattnum, pktypoid, pkopclass); + + if(!indexoid){ + elog(WARNING, "Could not find primary key for table with oid %u", + relation->rd_id); + goto no_index_found; + } + + index_rel = index_open(indexoid, AccessShareLock); + + indexdesc = RelationGetDescr(index_rel); + + for(natt = 0; natt < indexdesc->natts; natt++){ + idxvals[natt] = + fastgetattr(&tp, pkattnum[natt], desc, &idxisnull[natt]); + Assert(!idxisnull[natt]); + } + + idxtuple = heap_form_tuple(indexdesc, idxvals, idxisnull); + + xlhdr.t_infomask2 = idxtuple->t_data->t_infomask2; + xlhdr.t_infomask = idxtuple->t_data->t_infomask; + xlhdr.t_hoff = idxtuple->t_data->t_hoff; + + rdata[1].next = &(rdata[2]); + rdata[2].data = (char*)&xlhdr; + rdata[2].len = SizeOfHeapHeader; + rdata[2].buffer = InvalidBuffer; + rdata[2].next = NULL; + + rdata[2].next = &(rdata[3]); + rdata[3].data = (char *) idxtuple->t_data + offsetof(HeapTupleHeaderData, t_bits); + rdata[3].len = idxtuple->t_len - offsetof(HeapTupleHeaderData, t_bits); + rdata[3].buffer = InvalidBuffer; + rdata[3].next = NULL; + + heap_close(index_rel, NoLock); + no_index_found: + ; + } + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata); PageSetLSN(page, recptr); @@ -4413,9 +4506,14 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, xl_heap_header xlhdr; uint8 info; XLogRecPtr recptr; - 
XLogRecData rdata[4]; + XLogRecData rdata[5]; Page page = BufferGetPage(newbuf); + /* + * Just as for XLOG_HEAP_INSERT we need to make sure the tuple + */ + bool need_tuple = wal_level == WAL_LEVEL_LOGICAL; + /* Caller should not call me on a non-WAL-logged relation */ Assert(RelationNeedsWAL(reln)); @@ -4446,28 +4544,43 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, xlhdr.t_hoff = newtup->t_data->t_hoff; /* - * As with insert records, we need not store the rdata[2] segment if we - * decide to store the whole buffer instead. + * As with insert's logging , we need not store the the Datum containing + * tuples separately from the buffer if we do logical replication that + * is... */ rdata[2].data = (char *) &xlhdr; rdata[2].len = SizeOfHeapHeader; - rdata[2].buffer = newbuf; + rdata[2].buffer = need_tuple ? InvalidBuffer : newbuf; rdata[2].buffer_std = true; rdata[2].next = &(rdata[3]); /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData,t_bits); rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits); - rdata[3].buffer = newbuf; + rdata[3].buffer = need_tuple ? InvalidBuffer : newbuf; rdata[3].buffer_std = true; rdata[3].next = NULL; + /* + * separate storage for the buffer reference of the new page in the + * wal_level=logical case + */ + if(need_tuple){ + rdata[3].next = &(rdata[4]); + + rdata[4].data = NULL, + rdata[4].len = 0; + rdata[4].buffer = newbuf; + rdata[4].buffer_std = true; + rdata[4].next = NULL; + } + /* If new tuple is the single and first tuple on page... 
*/ if (ItemPointerGetOffsetNumber(&(newtup->t_self)) ==FirstOffsetNumber && PageGetMaxOffsetNumber(page) == FirstOffsetNumber) { info |= XLOG_HEAP_INIT_PAGE; - rdata[2].buffer = rdata[3].buffer = InvalidBuffer; + rdata[2].buffer = rdata[3].buffer = rdata[4].buffer = InvalidBuffer; } recptr = XLogInsert(RM_HEAP_ID, info,rdata); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 166efb0..c6feed0 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -105,6 +105,7 @@ const struct config_enum_entry wal_level_options[] = { {"minimal", WAL_LEVEL_MINIMAL, false}, {"archive",WAL_LEVEL_ARCHIVE, false}, {"hot_standby", WAL_LEVEL_HOT_STANDBY, false}, + {"logical", WAL_LEVEL_LOGICAL, false}, {NULL, 0, false}}; diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 9e8b1cc..4cddcac 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -49,6 +49,7 @@#include "nodes/nodeFuncs.h"#include "optimizer/clauses.h"#include "parser/parser.h" +#include "parser/parse_relation.h"#include "storage/bufmgr.h"#include "storage/lmgr.h"#include "storage/predicate.h" @@ -3311,3 +3312,76 @@ ResetReindexPending(void){ pendingReindexedIndexes = NIL;} + +/* + * relationFindPrimaryKey + * Find primary key for a relation if it exists. + * + * If no primary key is found *indexOid is set to InvalidOid + * + * This is quite similar to tablecmd.c's transformFkeyGetPrimaryKey. + * + * XXX: It might be a good idea to change pg_class.relhaspkey into an bool to + * make this more efficient. 
+ */ +void +relationFindPrimaryKey(Relation pkrel, Oid *indexOid, + int16 *nratts, int16 *attnums, Oid *atttypids, + Oid *opclasses){ + List *indexoidlist; + ListCell *indexoidscan; + HeapTuple indexTuple = NULL; + Datum indclassDatum; + bool isnull; + oidvector *indclass; + int i; + Form_pg_index indexStruct = NULL; + + *indexOid = InvalidOid; + + indexoidlist = RelationGetIndexList(pkrel); + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + + indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); + if(!HeapTupleIsValid(indexTuple)) + elog(ERROR, "cache lookup failed for index %u", indexoid); + + indexStruct = (Form_pg_index) GETSTRUCT(indexTuple); + if(indexStruct->indisprimary && indexStruct->indimmediate) + { + *indexOid = indexoid; + break; + } + ReleaseSysCache(indexTuple); + + } + list_free(indexoidlist); + + if (!OidIsValid(*indexOid)) + return; + + /* Must get indclass the hard way */ + indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple, + Anum_pg_index_indclass, &isnull); + Assert(!isnull); + indclass = (oidvector *) DatumGetPointer(indclassDatum); + + *nratts = indexStruct->indnatts; + /* + * Now build the list of PK attributes from the indkey definition (we + * assume a primary key cannot have expressional elements) + */ + for (i = 0; i < indexStruct->indnatts; i++) + { + int pkattno = indexStruct->indkey.values[i]; + + attnums[i] = pkattno; + atttypids[i] = attnumTypeId(pkrel, pkattno); + opclasses[i] = indclass->values[i]; + } + + ReleaseSysCache(indexTuple); +} diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index c00183a..47715c9 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -82,6 +82,8 @@ wal_level_str(WalLevel wal_level) return "archive"; case WAL_LEVEL_HOT_STANDBY: return "hot_standby"; + case WAL_LEVEL_LOGICAL: + return "logical"; } return _("unrecognized wal_level");} diff --git 
a/src/include/access/xlog.h b/src/include/access/xlog.h index df5f232..2843aca 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -199,7 +199,8 @@ typedef enum WalLevel{ WAL_LEVEL_MINIMAL = 0, WAL_LEVEL_ARCHIVE, - WAL_LEVEL_HOT_STANDBY + WAL_LEVEL_HOT_STANDBY, + WAL_LEVEL_LOGICAL} WalLevel;extern int wal_level; diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h index 7c8198f..2ba0ac3 100644 --- a/src/include/catalog/index.h +++ b/src/include/catalog/index.h @@ -101,4 +101,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid);extern bool ReindexIsProcessingIndex(Oid indexOid);externOid IndexGetRelation(Oid indexId, bool missing_ok); +extern void relationFindPrimaryKey(Relation pkrel, Oid *indexOid, + int16 *nratts, int16 *attnums, Oid *atttypids, + Oid *opclasses); +#endif /* INDEX_H */ -- 1.7.10.rc3.3.g19a6c.dirty
pgsql-hackers by date: