From 2a2ac4be202b9226a12934ef7097764d6d8cb638 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Thu, 26 Sep 2019 17:20:53 +0200 Subject: [PATCH 03/13] Issue individual invalidations with wal_level=logical When wal_level=logical, write individual invalidations into WAL so that decoding can use this information. We still add the invalidations to the cache, and write them to WAL at commit time in RecordTransactionCommit(). That commit-time write uses the existing XLOG_INVALIDATIONS xlog record type, from the RM_STANDBY_ID resource manager (see LogStandbyInvalidations for details), so existing code relying on those invalidations (e.g. redo) does not need to be changed. The individual invalidations are written using a new xlog record type XLOG_XACT_INVALIDATIONS, from the RM_XACT_ID resource manager. See LogLogicalInvalidations for details. These new xlog records are ignored by existing redo procedures, which still rely on the invalidations written to commit records. The invalidations are decoded and added as a new ReorderBufferChange type (REORDER_BUFFER_CHANGE_INVALIDATION), and then executed during replay, unlike the existing invalidations (which are either decoded as part of the commit record, or executed immediately during decoding and not added to the reorderbuffer at all). LogStandbyInvalidations accumulates all the invalidations in memory and writes them only once, at commit time, which may reduce the performance impact by amortizing the overhead and deduplicating the invalidations. The new invalidations are written to WAL immediately, without any such caching. Perhaps it would be possible to add similar caching, e.g. at the command level, or something like that? 
--- src/backend/access/rmgrdesc/xactdesc.c | 52 ++++++++++++++++ src/backend/access/transam/xact.c | 7 +++ src/backend/replication/logical/decode.c | 17 ++++++ src/backend/replication/logical/reorderbuffer.c | 52 +++++++++++++++- src/backend/utils/cache/inval.c | 81 +++++++++++++++++++++++++ src/include/access/xact.h | 18 +++++- src/include/replication/reorderbuffer.h | 14 +++++ 7 files changed, 238 insertions(+), 3 deletions(-) diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index 66fc8fb..9cff9f0 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -20,6 +20,11 @@ #include "storage/standbydefs.h" #include "utils/timestamp.h" +static void xact_desc_invalidations(StringInfo buf, + int nmsgs, SharedInvalidationMessage *msgs, + Oid dbId, Oid tsId, + bool relcacheInitFileInval); + /* * Parse the WAL format of an xact commit and abort records into an easier to * understand format. @@ -312,6 +317,14 @@ xact_desc(StringInfo buf, XLogReaderState *record) xact_desc_abort(buf, XLogRecGetInfo(record), xlrec); } + else if (info == XLOG_XACT_INVALIDATIONS) + { + xl_xact_invalidations *xlrec = (xl_xact_invalidations *) rec; + + xact_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs, + xlrec->dbId, xlrec->tsId, + xlrec->relcacheInitFileInval); + } } const char * @@ -336,7 +349,46 @@ xact_identify(uint8 info) case XLOG_XACT_ABORT_PREPARED: id = "ABORT_PREPARED"; break; + case XLOG_XACT_INVALIDATIONS: + id = "INVALIDATION"; + break; } return id; } + +static void +xact_desc_invalidations(StringInfo buf, + int nmsgs, SharedInvalidationMessage *msgs, + Oid dbId, Oid tsId, + bool relcacheInitFileInval) +{ + int i; + + if (relcacheInitFileInval) + appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u", + dbId, tsId); + + appendStringInfoString(buf, "; inval msgs:"); + for (i = 0; i < nmsgs; i++) + { + SharedInvalidationMessage *msg = &msgs[i]; + + if (msg->id >= 0) + 
appendStringInfo(buf, " catcache %d", msg->id); + else if (msg->id == SHAREDINVALCATALOG_ID) + appendStringInfo(buf, " catalog %u", msg->cat.catId); + else if (msg->id == SHAREDINVALRELCACHE_ID) + appendStringInfo(buf, " relcache %u", msg->rc.relId); + /* not expected, but print something anyway */ + else if (msg->id == SHAREDINVALSMGR_ID) + appendStringInfoString(buf, " smgr"); + /* not expected, but print something anyway */ + else if (msg->id == SHAREDINVALRELMAP_ID) + appendStringInfo(buf, " relmap db %u", msg->rm.dbId); + else if (msg->id == SHAREDINVALSNAPSHOT_ID) + appendStringInfo(buf, " snapshot %u", msg->sn.relId); + else + appendStringInfo(buf, " unrecognized id %d", msg->id); + } +} diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 33141fb..dc3633c 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5890,6 +5890,13 @@ xact_redo(XLogReaderState *record) XLogRecGetOrigin(record)); LWLockRelease(TwoPhaseStateLock); } + else if (info == XLOG_XACT_INVALIDATIONS) + { + /* + * XXX we do ignore this for now, what matters are invalidations + * written into the commit record. 
+ */ + } else elog(PANIC, "xact_redo: unknown op code %u", info); } diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index ff74c65..c100054 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -282,6 +282,23 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeAbort(ctx, buf, &parsed, xid); break; } + case XLOG_XACT_INVALIDATIONS: + { + TransactionId xid; + xl_xact_invalidations *invals; + + xid = XLogRecGetXid(r); + invals = (xl_xact_invalidations *) XLogRecGetData(r); + + /* XXX for now we're issuing invalidations one by one */ + Assert(invals->nmsgs == 1); + + ReorderBufferAddInvalidation(reorder, xid, buf->origptr, + invals->dbId, invals->tsId, + invals->relcacheInitFileInval, + invals->msgs[0]); + } + break; case XLOG_XACT_PREPARE: /* diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 6228140..08b4d4f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -460,6 +460,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + case REORDER_BUFFER_CHANGE_INVALIDATION: break; } @@ -1811,6 +1812,18 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, break; + case REORDER_BUFFER_CHANGE_INVALIDATION: + + /* + * Execute the invalidation message locally. + * + * XXX Do we need to care about relcacheInitFileInval and + * the other fields added to ReorderBufferChange, or just + * about the message itself? 
+ */ + LocalExecuteInvalidationMessage(&change->data.inval.msg); + break; + case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: elog(ERROR, "tuplecid value in changequeue"); break; @@ -2204,6 +2217,38 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, /* * Setup the invalidation of the toplevel transaction. + */ +void +ReorderBufferAddInvalidation(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, + Oid dbId, Oid tsId, bool relcacheInitFileInval, + SharedInvalidationMessage msg) +{ + MemoryContext oldcontext; + ReorderBufferChange *change; + + /* XXX Should we even write invalidations without valid XID? */ + if (xid == InvalidTransactionId) + return; + + Assert(xid != InvalidTransactionId); + + oldcontext = MemoryContextSwitchTo(rb->context); + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_INVALIDATION; + change->data.inval.dbId = dbId; + change->data.inval.tsId = tsId; + change->data.inval.relcacheInitFileInval = relcacheInitFileInval; + change->data.inval.msg = msg; + + ReorderBufferQueueChange(rb, xid, lsn, change); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Setup the invalidation of the toplevel transaction. * * This needs to be done before ReorderBufferCommit is called! 
*/ @@ -2643,6 +2688,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + case REORDER_BUFFER_CHANGE_INVALIDATION: /* ReorderBufferChange contains everything important */ break; } @@ -2739,6 +2785,7 @@ ReorderBufferChangeSize(ReorderBufferChange *change) case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + case REORDER_BUFFER_CHANGE_INVALIDATION: /* ReorderBufferChange contains everything important */ break; } @@ -3014,6 +3061,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + case REORDER_BUFFER_CHANGE_INVALIDATION: break; } @@ -3025,8 +3073,8 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, * although we don't check the memory limit when restoring the changes in * this branch (we only do that when initially queueing the changes after * decoding), because we will release the changes later, and that will - * update the accounting too (subtracting the size from the counters). - * And we don't want to underflow there. + * update the accounting too (subtracting the size from the counters). And + * we don't want to underflow there. 
*/ ReorderBufferChangeMemoryUpdate(rb, change, true); } diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index f09e3a9..f921fdf 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -104,6 +104,7 @@ #include "catalog/pg_constraint.h" #include "miscadmin.h" #include "storage/sinval.h" +#include "storage/standby.h" #include "storage/smgr.h" #include "utils/catcache.h" #include "utils/inval.h" @@ -210,6 +211,9 @@ static struct RELCACHECALLBACK static int relcache_callback_count = 0; +static void LogLogicalInvalidations(int nmsgs, SharedInvalidationMessage *msgs, + bool relcacheInitFileInval); + /* ---------------------------------------------------------------- * Invalidation list support functions * @@ -489,6 +493,18 @@ RegisterCatcacheInvalidation(int cacheId, { AddCatcacheInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs, cacheId, hashValue, dbId); + + /* Issue an invalidation WAL record (when wal_level=logical) */ + if (XLogLogicalInfoActive()) + { + SharedInvalidationMessage msg; + + msg.cc.id = (int8) cacheId; + msg.cc.dbId = dbId; + msg.cc.hashValue = hashValue; + + LogLogicalInvalidations(1, &msg, false); + } } /* @@ -501,6 +517,18 @@ RegisterCatalogInvalidation(Oid dbId, Oid catId) { AddCatalogInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs, dbId, catId); + + /* Issue an invalidation WAL record (when wal_level=logical) */ + if (XLogLogicalInfoActive()) + { + SharedInvalidationMessage msg; + + msg.cat.id = SHAREDINVALCATALOG_ID; + msg.cat.dbId = dbId; + msg.cat.catId = catId; + + LogLogicalInvalidations(1, &msg, false); + } } /* @@ -511,6 +539,8 @@ RegisterCatalogInvalidation(Oid dbId, Oid catId) static void RegisterRelcacheInvalidation(Oid dbId, Oid relId) { + bool RelcacheInitFileInval = false; + AddRelcacheInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs, dbId, relId); @@ -529,7 +559,22 @@ RegisterRelcacheInvalidation(Oid dbId, Oid relId) * as well. 
Also zap when we are invalidating whole relcache. */ if (relId == InvalidOid || RelationIdIsInInitFile(relId)) + { transInvalInfo->RelcacheInitFileInval = true; + RelcacheInitFileInval = true; + } + + /* Issue an invalidation WAL record (when wal_level=logical) */ + if (XLogLogicalInfoActive()) + { + SharedInvalidationMessage msg; + + msg.rc.id = SHAREDINVALRELCACHE_ID; + msg.rc.dbId = dbId; + msg.rc.relId = relId; + + LogLogicalInvalidations(1, &msg, RelcacheInitFileInval); + } } /* @@ -543,6 +588,18 @@ RegisterSnapshotInvalidation(Oid dbId, Oid relId) { AddSnapshotInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs, dbId, relId); + + /* Issue an invalidation WAL record (when wal_level=logical) */ + if (XLogLogicalInfoActive()) + { + SharedInvalidationMessage msg; + + msg.sn.id = SHAREDINVALSNAPSHOT_ID; + msg.sn.dbId = dbId; + msg.sn.relId = relId; + + LogLogicalInvalidations(1, &msg, false); + } } /* @@ -1501,3 +1558,27 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue) i = ccitem->link - 1; } } + +/* + * Emit WAL for invalidations. 
+ */ +static void +LogLogicalInvalidations(int nmsgs, SharedInvalidationMessage *msgs, + bool relcacheInitFileInval) +{ + xl_xact_invalidations xlrec; + + /* prepare record */ + memset(&xlrec, 0, sizeof(xlrec)); + xlrec.dbId = MyDatabaseId; + xlrec.tsId = MyDatabaseTableSpace; + xlrec.relcacheInitFileInval = relcacheInitFileInval; + xlrec.nmsgs = nmsgs; + + /* perform insertion */ + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvalidations); + XLogRegisterData((char *) msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS); +} diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 7553f84..b26d399 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -145,7 +145,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XLOG_XACT_ABORT 0x20 #define XLOG_XACT_COMMIT_PREPARED 0x30 #define XLOG_XACT_ABORT_PREPARED 0x40 -/* free opcode 0x50 */ +#define XLOG_XACT_INVALIDATIONS 0x50 /* free opcode 0x60 */ /* free opcode 0x70 */ @@ -189,6 +189,22 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, ((xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT) != 0) /* + * Invalidations logged with wal_level=logical. + * + * XXX Currently nmsgs=1 but that might change in the future. + */ +typedef struct xl_xact_invalidations +{ + Oid dbId; /* MyDatabaseId */ + Oid tsId; /* MyDatabaseTableSpace */ + bool relcacheInitFileInval; /* invalidate relcache init file */ + int nmsgs; /* number of shared inval msgs */ + SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER]; +} xl_xact_invalidations; + +#define MinSizeOfXactInvalidations offsetof(xl_xact_invalidations, msgs) + +/* * Commit and abort records can contain a lot of information. But a large * portion of the records won't need all possible pieces of information. So we * only include what's needed. 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 4dcef80..82dcb7f 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -57,6 +57,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_MESSAGE, + REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, @@ -149,6 +150,16 @@ typedef struct ReorderBufferChange CommandId cmax; CommandId combocid; } tuplecid; + + /* Invalidation. */ + struct + { + Oid dbId; /* MyDatabaseId */ + Oid tsId; /* MyDatabaseTableSpace */ + bool relcacheInitFileInval; /* invalidate relcache init + * file */ + SharedInvalidationMessage msg; /* invalidation message */ + } inval; } data; /* @@ -437,6 +448,9 @@ void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr ls void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid); +void ReorderBufferAddInvalidation(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + Oid dbId, Oid tsId, bool relcacheInitFileInval, + SharedInvalidationMessage msg); void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs); void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, -- 1.8.3.1