From d4d2c5a8a3b0b7d154e1c20928a1e4796675a8d5 Mon Sep 17 00:00:00 2001 From: dilipkumar Date: Wed, 30 Sep 2020 12:13:31 +0530 Subject: [PATCH v1] Collect command invalidation in form of changes --- .../replication/logical/reorderbuffer.c | 100 +++++++++++++++--- src/include/replication/reorderbuffer.h | 11 ++ 2 files changed, 94 insertions(+), 17 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 1975d629a6..70d3ca0dac 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -235,7 +235,7 @@ static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state); static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state); -static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs); /* * --------------------------------------- @@ -482,6 +482,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, pfree(change->data.msg.message); change->data.msg.message = NULL; break; + case REORDER_BUFFER_CHANGE_INVALIDATION: + if (change->data.inval.invalidations) + pfree(change->data.inval.invalidations); + change->data.inval.invalidations = NULL; + break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: if (change->data.snapshot) { @@ -2233,17 +2238,24 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, TeardownHistoricSnapshot(false); SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); - - /* - * Every time the CommandId is incremented, we could - * see new catalog contents, so execute all - * invalidations. - */ - ReorderBufferExecuteInvalidations(rb, txn); } break; + case REORDER_BUFFER_CHANGE_INVALIDATION: + + /* + * Execute the invalidation messages locally. + * + * XXX Do we need to care about relcacheInitFileInval and + * the other fields added to ReorderBufferChange, or just + * about the message itself? + */ + ReorderBufferExecuteInvalidations( + change->data.inval.ninvalidations, + change->data.inval.invalidations); + break; + case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: elog(ERROR, "tuplecid value in changequeue"); break; @@ -2306,7 +2318,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, AbortCurrentTransaction(); /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(rb, txn); + ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -2345,7 +2357,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, AbortCurrentTransaction(); /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(rb, txn); + ReorderBufferExecuteInvalidations(txn->ninvalidations, + txn->invalidations); if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -2813,12 +2826,27 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage *msgs) { ReorderBufferTXN *txn; + MemoryContext oldcontext; + ReorderBufferChange *change; txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + oldcontext = MemoryContextSwitchTo(rb->context); + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_INVALIDATION; + change->data.inval.ninvalidations = nmsgs; + change->data.inval.invalidations = (SharedInvalidationMessage *) + MemoryContextAlloc(rb->context, + sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(change->data.inval.invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + + ReorderBufferQueueChange(rb, xid, lsn, change, false); + /* - * We collect all the invalidations under the top transaction so that we - * can execute them all together. + * Additionally, collect all the invalidations under the top transaction + * so that we can execute them all together. */ if (txn->toptxn) txn = txn->toptxn; @@ -2830,8 +2858,7 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, { txn->ninvalidations = nmsgs; txn->invalidations = (SharedInvalidationMessage *) - MemoryContextAlloc(rb->context, - sizeof(SharedInvalidationMessage) * nmsgs); + palloc(sizeof(SharedInvalidationMessage) * nmsgs); memcpy(txn->invalidations, msgs, sizeof(SharedInvalidationMessage) * nmsgs); } @@ -2845,6 +2872,8 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, nmsgs * sizeof(SharedInvalidationMessage)); txn->ninvalidations += nmsgs; } + + MemoryContextSwitchTo(oldcontext); } /* @@ -2852,12 +2881,12 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, * in the changestream but we don't know which those are. */ static void -ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn) +ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs) { int i; - for (i = 0; i < txn->ninvalidations; i++) - LocalExecuteInvalidationMessage(&txn->invalidations[i]); + for (i = 0; i < nmsgs; i++) + LocalExecuteInvalidationMessage(&msgs[i]); } /* @@ -3279,6 +3308,24 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message_size); data += change->data.msg.message_size; + break; + } + case REORDER_BUFFER_CHANGE_INVALIDATION: + { + char *data; + Size inval_size = sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + + sz += inval_size; + + ReorderBufferSerializeReserve(rb, sz); + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + memcpy(data, change->data.inval.invalidations, inval_size); + data += inval_size; + break; } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: @@ -3556,6 +3603,11 @@ ReorderBufferChangeSize(ReorderBufferChange *change) break; } + case REORDER_BUFFER_CHANGE_INVALIDATION: + sz += sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + break; + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshot snap; @@ -3822,6 +3874,20 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message_size); data += change->data.msg.message_size; + break; + } + case REORDER_BUFFER_CHANGE_INVALIDATION: + { + Size inval_size = sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + + change->data.inval.invalidations = + MemoryContextAlloc(rb->context, inval_size); + + /* read the message */ + memcpy(change->data.inval.invalidations, data, inval_size); + data += inval_size; + break; } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 1ae17d5f11..69d0aebb56 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -57,6 +57,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_MESSAGE, + REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, @@ -149,6 +150,14 @@ typedef struct ReorderBufferChange CommandId cmax; CommandId combocid; } tuplecid; + + /* Invalidation. */ + struct + { + uint32 ninvalidations; /* Number of messages */ + SharedInvalidationMessage *invalidations; /* invalidation + * message */ + } inval; } data; /* @@ -562,6 +571,8 @@ void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr ls void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid); +void ReorderBufferAddInvalidation(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + int nmsgs, SharedInvalidationMessage *msgs); void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs); void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, -- 2.23.0