From cff177a82f36cca1cd7e216898c29f9b2677edac Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 28 Feb 2023 13:27:14 +0200 Subject: [PATCH 4/4] Include command ID in heapam records on catalog tables. Logical decoding needs to track the cmin/cmax of catalog tuples, so that it can build the "historic snapshots". The cmin/cmax are not WAL-logged, so visibility checks on historic snapshots could not rely on the cmin/cmax on the heap tuple itself. Instead, every modification on a catalog table wrote an additional XLOG_HEAP2_NEW_CID record, with the cmin/cmax of the tuple, and we would track those in the reorder buffer. --- src/backend/access/heap/heapam.c | 190 ++++++------------ src/backend/access/rmgrdesc/heapdesc.c | 16 -- src/backend/replication/logical/decode.c | 185 ++++++++++++++++- .../replication/logical/reorderbuffer.c | 49 ++++- src/backend/replication/logical/snapbuild.c | 45 ++--- src/include/access/heapam_xlog.h | 33 ++- src/include/replication/reorderbuffer.h | 3 +- src/include/replication/snapbuild.h | 4 +- 8 files changed, 330 insertions(+), 195 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index f82176491a1..df304eef9b4 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -75,7 +75,7 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options); -static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, +static XLogRecPtr log_heap_update(Relation reln, CommandId cid, Buffer oldbuf, Buffer newbuf, HeapTuple oldtup, HeapTuple newtup, HeapTuple old_key_tuple, bool all_visible_cleared, bool new_all_visible_cleared); @@ -111,7 +111,6 @@ static bool ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status uint16 infomask, Relation rel, int *remaining); static void index_delete_sort(TM_IndexDeleteOp *delstate); static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate); -static 
XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup); static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required, bool *copy); @@ -1903,13 +1902,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, Page page = BufferGetPage(buffer); uint8 info = XLOG_HEAP_INSERT; int bufflags = 0; - - /* - * If this is a catalog, we need to transmit combo CIDs to properly - * decode, so log that as well. - */ - if (RelationIsAccessibleInLogicalDecoding(relation)) - log_heap_new_cid(relation, heaptup); + bool need_cid = RelationIsAccessibleInLogicalDecoding(relation); /* * If this is the single and first tuple on page, we can reinit the @@ -1929,6 +1922,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, xlrec.flags |= XLH_INSERT_ALL_VISIBLE_CLEARED; if (options & HEAP_INSERT_SPECULATIVE) xlrec.flags |= XLH_INSERT_IS_SPECULATIVE; + if (need_cid) + xlrec.flags |= XLH_INSERT_ON_CATALOG_RELATION | XLH_INSERT_CONTAINS_CID; + Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer)); /* @@ -1953,6 +1949,10 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, xlhdr.t_infomask = heaptup->t_data->t_infomask; xlhdr.t_hoff = heaptup->t_data->t_hoff; + /* include command id if needed for logical decoding */ + if (need_cid) + XLogRegisterData((char *) &cid, sizeof(CommandId)); + /* * note we mark xlhdr as belonging to buffer; if XLogInsert decides to * write the whole page to the xlog, we don't need to store @@ -2075,7 +2075,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, bool needwal; Size saveFreeSpace; bool need_tuple_data = RelationIsLogicallyLogged(relation); - bool need_cids = RelationIsAccessibleInLogicalDecoding(relation); + bool need_cid = RelationIsAccessibleInLogicalDecoding(relation); /* currently not needed (thus unsupported) for heap_multi_insert() */ Assert(!(options & HEAP_INSERT_NO_LOGICAL)); @@ -2159,13 +2159,6 @@ heap_multi_insert(Relation 
relation, TupleTableSlot **slots, int ntuples, */ RelationPutHeapTuple(relation, buffer, heaptuples[ndone], false); - /* - * For logical decoding we need combo CIDs to properly decode the - * catalog. - */ - if (needwal && need_cids) - log_heap_new_cid(relation, heaptuples[ndone]); - for (nthispage = 1; ndone + nthispage < ntuples; nthispage++) { HeapTuple heaptup = heaptuples[ndone + nthispage]; @@ -2174,13 +2167,6 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, break; RelationPutHeapTuple(relation, buffer, heaptup, false); - - /* - * For logical decoding we need combo CIDs to properly decode the - * catalog. - */ - if (needwal && need_cids) - log_heap_new_cid(relation, heaptup); } /* @@ -2216,6 +2202,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, char *tupledata; int totaldatalen; char *scratchptr = scratch.data; + uint16 *offsets = NULL; bool init; int bufflags = 0; @@ -2229,6 +2216,13 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, xlrec = (xl_heap_multi_insert *) scratchptr; scratchptr += SizeOfHeapMultiInsert; + /* include command id if needed for logical decoding */ + if (need_cid) + { + memcpy(scratchptr, &cid, sizeof(CommandId)); + scratchptr += sizeof(CommandId); + } + /* * Allocate offsets array. Unless we're reinitializing the page, * in that case the tuples are stored in order starting at @@ -2236,7 +2230,10 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, * explicitly. 
*/ if (!init) + { + offsets = (uint16 *) scratchptr; scratchptr += nthispage * sizeof(OffsetNumber); + } /* the rest of the scratch space is used for tuple data */ tupledata = scratchptr; @@ -2263,7 +2260,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, int datalen; if (!init) - xlrec->offsets[i] = ItemPointerGetOffsetNumber(&heaptup->t_self); + offsets[i] = ItemPointerGetOffsetNumber(&heaptup->t_self); /* xl_multi_insert_tuple needs two-byte alignment. */ tuphdr = (xl_multi_insert_tuple *) SHORTALIGN(scratchptr); scratchptr = ((char *) tuphdr) + SizeOfMultiInsertTuple; @@ -2286,6 +2283,9 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, if (need_tuple_data) xlrec->flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; + if (need_cid) + xlrec->flags |= XLH_INSERT_ON_CATALOG_RELATION | XLH_INSERT_CONTAINS_CID; + /* * Signal that this is the last xl_heap_multi_insert record * emitted by this call to heap_multi_insert(). Needed for logical @@ -2766,13 +2766,7 @@ l1: xl_heap_delete xlrec; xl_heap_header xlhdr; XLogRecPtr recptr; - - /* - * For logical decode we need combo CIDs to properly decode the - * catalog - */ - if (RelationIsAccessibleInLogicalDecoding(relation)) - log_heap_new_cid(relation, &tp); + bool need_cid = RelationIsAccessibleInLogicalDecoding(relation); xlrec.flags = 0; if (all_visible_cleared) @@ -2791,10 +2785,16 @@ l1: else xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY; } + if (need_cid) + xlrec.flags |= XLH_DELETE_ON_CATALOG_RELATION | XLH_DELETE_CONTAINS_CID; XLogBeginInsert(); XLogRegisterData((char *) &xlrec, SizeOfHeapDelete); + /* include command id if needed for logical decoding */ + if (need_cid) + XLogRegisterData((char *) &cid, sizeof(CommandId)); + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); /* @@ -2963,6 +2963,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, infomask2_old_tuple, infomask_new_tuple, infomask2_new_tuple; + CommandId orig_cid = cid; 
Assert(ItemPointerIsValid(otid)); @@ -3728,17 +3729,7 @@ l2: { XLogRecPtr recptr; - /* - * For logical decoding we need combo CIDs to properly decode the - * catalog. - */ - if (RelationIsAccessibleInLogicalDecoding(relation)) - { - log_heap_new_cid(relation, &oldtup); - log_heap_new_cid(relation, heaptup); - } - - recptr = log_heap_update(relation, buffer, + recptr = log_heap_update(relation, orig_cid, buffer, newbuf, &oldtup, heaptup, old_key_tuple, all_visible_cleared, @@ -8267,9 +8258,13 @@ log_heap_visible(RelFileLocator rlocator, Buffer heap_buffer, Buffer vm_buffer, /* * Perform XLogInsert for a heap-update operation. Caller must already * have modified the buffer(s) and marked them dirty. + * + * 'cid' is the command ID performing the update. On the old tuple, it + * might've been replaced with a combocid; 'cid' is the original, real + * command ID. */ static XLogRecPtr -log_heap_update(Relation reln, Buffer oldbuf, +log_heap_update(Relation reln, CommandId cid, Buffer oldbuf, Buffer newbuf, HeapTuple oldtup, HeapTuple newtup, HeapTuple old_key_tuple, bool all_visible_cleared, bool new_all_visible_cleared) @@ -8284,6 +8279,7 @@ log_heap_update(Relation reln, Buffer oldbuf, XLogRecPtr recptr; Page page = BufferGetPage(newbuf); bool need_tuple_data = RelationIsLogicallyLogged(reln); + bool need_cid = RelationIsAccessibleInLogicalDecoding(reln); bool init; int bufflags; @@ -8359,6 +8355,9 @@ log_heap_update(Relation reln, Buffer oldbuf, xlrec.flags |= XLH_UPDATE_PREFIX_FROM_OLD; if (suffixlen > 0) xlrec.flags |= XLH_UPDATE_SUFFIX_FROM_OLD; + if (need_cid) + xlrec.flags |= XLH_UPDATE_ON_CATALOG_RELATION | XLH_UPDATE_CONTAINS_CID; + if (need_tuple_data) { xlrec.flags |= XLH_UPDATE_CONTAINS_NEW_TUPLE; @@ -8403,6 +8402,10 @@ log_heap_update(Relation reln, Buffer oldbuf, XLogRegisterData((char *) &xlrec, SizeOfHeapUpdate); + /* include command id if needed for logical decoding */ + if (need_cid) + XLogRegisterData((char *) &cid, sizeof(CommandId)); + /* * Prepare 
WAL data for the new tuple. */ @@ -8484,78 +8487,6 @@ log_heap_update(Relation reln, Buffer oldbuf, return recptr; } -/* - * Perform XLogInsert of an XLOG_HEAP2_NEW_CID record - * - * This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog - * tuples. - */ -static XLogRecPtr -log_heap_new_cid(Relation relation, HeapTuple tup) -{ - xl_heap_new_cid xlrec; - - XLogRecPtr recptr; - HeapTupleHeader hdr = tup->t_data; - - Assert(ItemPointerIsValid(&tup->t_self)); - Assert(tup->t_tableOid != InvalidOid); - - xlrec.top_xid = GetTopTransactionId(); - xlrec.target_locator = relation->rd_locator; - xlrec.target_tid = tup->t_self; - - /* - * If the tuple got inserted & deleted in the same TX we definitely have a - * combo CID, set cmin and cmax. - */ - if (hdr->t_infomask & HEAP_COMBOCID) - { - Assert(!(hdr->t_infomask & HEAP_XMAX_INVALID)); - Assert(!HeapTupleHeaderXminInvalid(hdr)); - xlrec.cmin = HeapTupleHeaderGetCmin(hdr); - xlrec.cmax = HeapTupleHeaderGetCmax(hdr); - } - /* No combo CID, so only cmin or cmax can be set by this TX */ - else - { - /* - * Tuple inserted. - * - * We need to check for LOCK ONLY because multixacts might be - * transferred to the new tuple in case of FOR KEY SHARE updates in - * which case there will be an xmax, although the tuple just got - * inserted. - */ - if (hdr->t_infomask & HEAP_XMAX_INVALID || - HEAP_XMAX_IS_LOCKED_ONLY(hdr->t_infomask)) - { - xlrec.cmin = HeapTupleHeaderGetRawCommandId(hdr); - xlrec.cmax = InvalidCommandId; - } - /* Tuple from a different tx updated or deleted. */ - else - { - xlrec.cmin = InvalidCommandId; - xlrec.cmax = HeapTupleHeaderGetRawCommandId(hdr); - } - } - - /* - * Note that we don't need to register the buffer here, because this - * operation does not modify the page. The insert/update/delete that - * called us certainly did, but that's WAL-logged separately. 
- */ - XLogBeginInsert(); - XLogRegisterData((char *) &xlrec, SizeOfHeapNewCid); - - /* will be looked at irrespective of origin */ - - recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID); - - return recptr; -} - /* * Build a heap tuple representing the configured REPLICA IDENTITY to represent * the old tuple in an UPDATE or DELETE. @@ -9253,13 +9184,29 @@ heap_xlog_multi_insert(XLogReaderState *record) Size freespace = 0; int i; bool isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0; + CommandId cid = InvalidCommandId; + OffsetNumber *offsets = NULL; XLogRedoAction action; + char *recdata; /* * Insertion doesn't overwrite MVCC data, so no conflict processing is * required. */ - xlrec = (xl_heap_multi_insert *) XLogRecGetData(record); + recdata = XLogRecGetData(record); + xlrec = (xl_heap_multi_insert *) recdata; + recdata += SizeOfHeapMultiInsert; + + if ((xlrec->flags & XLH_INSERT_CONTAINS_CID) != 0) + { + memcpy(&cid, recdata, sizeof(CommandId)); + recdata += sizeof(CommandId); + } + if (!isinit) + { + offsets = (OffsetNumber *) recdata; + recdata += xlrec->ntuples * sizeof(OffsetNumber); + } XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno); @@ -9316,7 +9263,7 @@ heap_xlog_multi_insert(XLogReaderState *record) if (isinit) offnum = FirstOffsetNumber + i; else - offnum = xlrec->offsets[i]; + offnum = offsets[i]; if (PageGetMaxOffsetNumber(page) + 1 < offnum) elog(PANIC, "invalid max offset number"); @@ -9932,13 +9879,6 @@ heap2_redo(XLogReaderState *record) case XLOG_HEAP2_LOCK_UPDATED: heap_xlog_lock_updated(record); break; - case XLOG_HEAP2_NEW_CID: - - /* - * Nothing to do on a real replay, only used during logical - * decoding. 
- */ - break; case XLOG_HEAP2_REWRITE: heap_xlog_logical_rewrite(record); break; diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c index ae35f2c88dc..e617949cec7 100644 --- a/src/backend/access/rmgrdesc/heapdesc.c +++ b/src/backend/access/rmgrdesc/heapdesc.c @@ -165,19 +165,6 @@ heap2_desc(StringInfo buf, XLogReaderState *record) xlrec->offnum, xlrec->xmax, xlrec->flags); out_infobits(buf, xlrec->infobits_set); } - else if (info == XLOG_HEAP2_NEW_CID) - { - xl_heap_new_cid *xlrec = (xl_heap_new_cid *) rec; - - appendStringInfo(buf, "rel %u/%u/%u; tid %u/%u", - xlrec->target_locator.spcOid, - xlrec->target_locator.dbOid, - xlrec->target_locator.relNumber, - ItemPointerGetBlockNumber(&(xlrec->target_tid)), - ItemPointerGetOffsetNumber(&(xlrec->target_tid))); - appendStringInfo(buf, "; cmin: %u, cmax: %u", - xlrec->cmin, xlrec->cmax); - } } const char * @@ -253,9 +240,6 @@ heap2_identify(uint8 info) case XLOG_HEAP2_LOCK_UPDATED: id = "LOCK_UPDATED"; break; - case XLOG_HEAP2_NEW_CID: - id = "NEW_CID"; - break; case XLOG_HEAP2_REWRITE: id = "REWRITE"; break; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index d8962345da4..1b4767b950e 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -51,6 +51,11 @@ static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeInsertOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeUpdateOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeDeleteOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeMultiInsertOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); + static void 
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid, bool two_phase); @@ -396,16 +401,8 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) case XLOG_HEAP2_MULTI_INSERT: if (SnapBuildProcessChange(builder, xid, buf->origptr)) DecodeMultiInsert(ctx, buf); + DecodeMultiInsertOnCatalog(ctx, buf); break; - case XLOG_HEAP2_NEW_CID: - { - xl_heap_new_cid *xlrec; - - xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record); - SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec); - - break; - } case XLOG_HEAP2_REWRITE: /* @@ -455,6 +452,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) case XLOG_HEAP_INSERT: if (SnapBuildProcessChange(builder, xid, buf->origptr)) DecodeInsert(ctx, buf); + DecodeInsertOnCatalog(ctx, buf); break; /* @@ -466,11 +464,13 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) case XLOG_HEAP_UPDATE: if (SnapBuildProcessChange(builder, xid, buf->origptr)) DecodeUpdate(ctx, buf); + DecodeUpdateOnCatalog(ctx, buf); break; case XLOG_HEAP_DELETE: if (SnapBuildProcessChange(builder, xid, buf->origptr)) DecodeDelete(ctx, buf); + DecodeDeleteOnCatalog(ctx, buf); break; case XLOG_HEAP_TRUNCATE: @@ -1060,6 +1060,173 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) buf->origptr, change, false); } +/* If this is an INSERT on a catalog table, extract cmin */ +static void +DecodeInsertOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + xl_heap_insert *xlrec; + RelFileLocator rlocator; + BlockNumber blknum; + ItemPointerData ctid; + CommandId cid; + char *recdata; + + recdata = XLogRecGetData(r); + + xlrec = (xl_heap_insert *) recdata; + recdata += SizeOfHeapInsert; + + if (!(xlrec->flags & XLH_INSERT_ON_CATALOG_RELATION)) + return; + Assert((xlrec->flags & XLH_INSERT_CONTAINS_CID) != 0); + + memcpy(&cid, recdata, sizeof(CommandId)); + recdata += sizeof(CommandId); + + 
XLogRecGetBlockTag(r, 0, &rlocator, NULL, &blknum); + + ItemPointerSet(&ctid, blknum, xlrec->offnum); + + SnapBuildProcessNewCid(ctx->snapshot_builder, + XLogRecGetXid(r), + rlocator, + ctid, + buf->origptr, cid, InvalidCommandId); +} + +/* If this is an UPDATE on a catalog table, extract cmin/cmax */ +static void +DecodeUpdateOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + xl_heap_update *xlrec; + RelFileLocator rlocator; + BlockNumber newblk; + BlockNumber oldblk; + ItemPointerData ctid; + CommandId cid; + char *recdata; + + recdata = XLogRecGetData(r); + xlrec = (xl_heap_update *) recdata; + recdata += SizeOfHeapUpdate; + + if (!(xlrec->flags & XLH_UPDATE_ON_CATALOG_RELATION)) + return; + Assert((xlrec->flags & XLH_UPDATE_CONTAINS_CID) != 0); + + memcpy(&cid, recdata, sizeof(CommandId)); + recdata += sizeof(CommandId); + + /* cmin on new tuple */ + XLogRecGetBlockTag(r, 0, &rlocator, NULL, &newblk); + ItemPointerSet(&ctid, newblk, xlrec->new_offnum); + SnapBuildProcessNewCid(ctx->snapshot_builder, + XLogRecGetXid(r), + rlocator, + ctid, + buf->origptr, cid, InvalidCommandId); + + /* cmax on old tuple */ + if (!XLogRecGetBlockTagExtended(r, 1, NULL, NULL, &oldblk, NULL)) + oldblk = newblk; + ItemPointerSet(&ctid, oldblk, xlrec->old_offnum); + SnapBuildProcessNewCid(ctx->snapshot_builder, + XLogRecGetXid(r), + rlocator, + ctid, + buf->origptr, InvalidCommandId, cid); +} + +/* If this is a DELETE on a catalog table, extract cmax (the command ID stored right after the xl_heap_delete header) */ +static void +DecodeDeleteOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + xl_heap_delete *xlrec; + RelFileLocator rlocator; + BlockNumber blknum; + ItemPointerData ctid; + CommandId cid; + char *recdata; + + recdata = XLogRecGetData(r); + + xlrec = (xl_heap_delete *) recdata; + recdata += SizeOfHeapDelete; + + if (!(xlrec->flags & XLH_DELETE_ON_CATALOG_RELATION)) + return; + Assert((xlrec->flags & 
XLH_DELETE_CONTAINS_CID) != 0); + + memcpy(&cid, recdata, sizeof(CommandId)); + recdata += sizeof(CommandId); + + XLogRecGetBlockTag(r, 0, &rlocator, NULL, &blknum); + ItemPointerSet(&ctid, blknum, xlrec->offnum); + SnapBuildProcessNewCid(ctx->snapshot_builder, + XLogRecGetXid(r), + rlocator, + ctid, + buf->origptr, InvalidCommandId, cid); +} + +/* If this is a multi-INSERT on a catalog table, extract cmin */ +static void +DecodeMultiInsertOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + xl_heap_multi_insert *xlrec; + RelFileLocator rlocator; + BlockNumber blknum; + ItemPointerData ctid; + bool isinit; + CommandId cid; + OffsetNumber *offsets = NULL; + char *recdata; + + recdata = XLogRecGetData(r); + + xlrec = (xl_heap_multi_insert *) recdata; + recdata += SizeOfHeapMultiInsert; + + if (!(xlrec->flags & XLH_INSERT_ON_CATALOG_RELATION)) + return; + Assert((xlrec->flags & XLH_INSERT_CONTAINS_CID) != 0); + + memcpy(&cid, recdata, sizeof(CommandId)); + recdata += sizeof(CommandId); + + isinit = (XLogRecGetInfo(buf->record) & XLOG_HEAP_INIT_PAGE) != 0; + if (!isinit) + { + offsets = (OffsetNumber *) recdata; + recdata += xlrec->ntuples * sizeof(OffsetNumber); + } + + XLogRecGetBlockTag(r, 0, &rlocator, NULL, &blknum); + + /* Record the same command ID for all the inserted tuples */ + ItemPointerSetBlockNumber(&ctid, blknum); + for (int i = 0; i < xlrec->ntuples; i++) + { + OffsetNumber offnum; + + if (isinit) + offnum = FirstOffsetNumber + i; + else + offnum = offsets[i]; + ItemPointerSetOffsetNumber(&ctid, offnum); + + SnapBuildProcessNewCid(ctx->snapshot_builder, + XLogRecGetXid(r), + rlocator, + ctid, + buf->origptr, cid, InvalidCommandId); + } +} + /* * Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs. 
* diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 1e7c7e55f45..5aeab8b40af 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1137,6 +1137,8 @@ static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn) { + dlist_mutable_iter iter; + Assert(subtxn->toplevel_xid == txn->xid); if (subtxn->base_snapshot != NULL) @@ -1181,6 +1183,28 @@ ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, subtxn->base_snapshot_lsn = InvalidXLogRecPtr; } } + + /* + * Also move the tuplecids entries to the parent + */ + dlist_foreach_modify(iter, &subtxn->tuplecids) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* Check we're not mixing changes from different transactions. */ + Assert(change->txn == subtxn); + Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); + + dlist_delete(&change->node); + subtxn->ntuplecids--; + + dlist_push_tail(&txn->tuplecids, &change->node); + txn->ntuplecids++; + + change->txn = txn; + } } /* @@ -1542,7 +1566,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* * Cleanup the tuplecids we stored for decoding catalog snapshot access. - * They are always stored in the toplevel transaction. */ dlist_foreach_modify(iter, &txn->tuplecids) { @@ -1672,8 +1695,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep { /* * If this is a prepared txn, cleanup the tuplecids we stored for - * decoding catalog snapshot access. They are always stored in the - * toplevel transaction. + * decoding catalog snapshot access. */ dlist_foreach_modify(iter, &txn->tuplecids) { @@ -1776,10 +1798,10 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) else { /* - * Maybe we already saw this tuple before in this transaction, but - * if so it must have the same cmin. 
+ * Maybe we already saw this tuple before in this transaction. + * Cmin should be set only in the original insert, and not change afterwards. */ - Assert(ent->cmin == change->data.tuplecid.cmin); + Assert(change->data.tuplecid.cmin == InvalidCommandId); /* * cmax may be initially invalid, but once set it can only grow, @@ -3231,7 +3253,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, } /* - * Add new (relfilelocator, tid) -> (cmin, cmax) mappings. + * Add new (relfilelocator, tid) -> (cmin, cmax) mapping. * * We do not include this change type in memory accounting, because we * keep CIDs in a separate list and do not evict them when reaching @@ -3248,6 +3270,12 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + if (TransactionIdIsValid(txn->toplevel_xid)) + { + xid = txn->toplevel_xid; + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + } + change->data.tuplecid.locator = locator; change->data.tuplecid.tid = tid; change->data.tuplecid.cmin = cmin; @@ -5090,7 +5118,10 @@ ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname) * old records that did not yet have a cmax (e.g. pg_class' own * entry while rewriting it) during rewrites, so allow that. */ - Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin); + if (new_ent->cmin == InvalidCommandId) + new_ent->cmin = ent->cmin; + else + Assert(new_ent->cmin == ent->cmin); Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax); } else @@ -5214,7 +5245,7 @@ UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot) /* * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on - * combo CIDs. + * cmin/cmax stored on the tuple. 
*/ bool ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index a86d5b1c74d..da081f0694f 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -41,16 +41,15 @@ * transactions we need Snapshots that see intermediate versions of the * catalog in a transaction. During normal operation this is achieved by using * CommandIds/cmin/cmax. The problem with that however is that for space - * efficiency reasons, the cmin and cmax are not included in WAL records. We - * cannot read the cmin/cmax from the tuple itself, either, because it is - * reset on crash recovery. Even if we could, we could not decode combocids + * efficiency reasons, the cmin and cmax are not normally included in WAL + * records. We cannot read the value from the tuple itself, either, because it + * is reset on crash recovery. Even if we could, we could not decode combocids * which are only tracked in the original backend's memory. To work around - * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a - * catalog row is modified, which includes the cmin and cmax of the - * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the - * reorder buffer, and use them at visibility checks instead of the cmin/cmax - * on the tuple itself. Check the reorderbuffer.c's comment above - * ResolveCminCmaxDuringDecoding() for details. + * that, we do include the command ID in heap records on catalog tables, if + * wal_level >= logical. During decoding, we insert the ctid -> (cmin,cmax) + * mappings into the reorder buffer, and use them at visibility checks instead + * of the cmin/cmax on the tuple itself. Check the reorderbuffer.c's comment + * above ResolveCminCmaxDuringDecoding() for details. * * To facilitate all this we need our own visibility routine, as the normal * ones are optimized for different usecases. 
@@ -811,13 +810,14 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) } /* - * Do CommandId/combo CID handling after reading an xl_heap_new_cid record. - * This implies that a transaction has done some form of write to system - * catalogs. + * Do CommandId/combo CID handling after reading a heap insert/update/delete + * record on a catalog table. This implies that a transaction has done some + * form of write to system catalogs. */ void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, - XLogRecPtr lsn, xl_heap_new_cid *xlrec) + RelFileLocator rlocator, ItemPointerData ctid, + XLogRecPtr lsn, CommandId cmin, CommandId cmax) { CommandId cid; @@ -827,18 +827,17 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, */ ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); - ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn, - xlrec->target_locator, xlrec->target_tid, - xlrec->cmin, xlrec->cmax); + ReorderBufferAddNewTupleCids(builder->reorder, xid, lsn, + rlocator, ctid, + cmin, cmax); /* figure out new command id */ - if (xlrec->cmin != InvalidCommandId && - xlrec->cmax != InvalidCommandId) - cid = Max(xlrec->cmin, xlrec->cmax); - else if (xlrec->cmax != InvalidCommandId) - cid = xlrec->cmax; - else if (xlrec->cmin != InvalidCommandId) - cid = xlrec->cmin; + if (cmin != InvalidCommandId && cmax != InvalidCommandId) + cid = Max(cmin, cmax); + else if (cmax != InvalidCommandId) + cid = cmax; + else if (cmin != InvalidCommandId) + cid = cmin; else { cid = InvalidCommandId; /* silence compiler */ diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index d3b475e3144..13029a8bc50 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -57,7 +57,7 @@ #define XLOG_HEAP2_VISIBLE 0x40 #define XLOG_HEAP2_MULTI_INSERT 0x50 #define XLOG_HEAP2_LOCK_UPDATED 0x60 -#define XLOG_HEAP2_NEW_CID 0x70 +/* 0x70 was XLOG_HEAP2_NEW_CID */ /* * 
xl_heap_insert/xl_heap_multi_insert flag values, 8 bits are available. @@ -68,12 +68,13 @@ #define XLH_INSERT_IS_SPECULATIVE (1<<2) #define XLH_INSERT_CONTAINS_NEW_TUPLE (1<<3) #define XLH_INSERT_ON_TOAST_RELATION (1<<4) - /* all_frozen_set always implies all_visible_set */ #define XLH_INSERT_ALL_FROZEN_SET (1<<5) +#define XLH_INSERT_CONTAINS_CID (1<<6) +#define XLH_INSERT_ON_CATALOG_RELATION (1<<7) /* - * xl_heap_update flag values, 8 bits are available. + * xl_heap_update flag values, 16 bits are available. */ /* PD_ALL_VISIBLE was cleared */ #define XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED (1<<0) @@ -84,6 +85,8 @@ #define XLH_UPDATE_CONTAINS_NEW_TUPLE (1<<4) #define XLH_UPDATE_PREFIX_FROM_OLD (1<<5) #define XLH_UPDATE_SUFFIX_FROM_OLD (1<<6) +#define XLH_UPDATE_CONTAINS_CID (1<<7) +#define XLH_UPDATE_ON_CATALOG_RELATION (1<<8) /* convenience macro for checking whether any form of old tuple was logged */ #define XLH_UPDATE_CONTAINS_OLD \ @@ -98,6 +101,8 @@ #define XLH_DELETE_CONTAINS_OLD_KEY (1<<2) #define XLH_DELETE_IS_SUPER (1<<3) #define XLH_DELETE_IS_PARTITION_MOVE (1<<4) +#define XLH_DELETE_CONTAINS_CID (1<<5) +#define XLH_DELETE_ON_CATALOG_RELATION (1<<6) /* convenience macro for checking whether any form of old tuple was logged */ #define XLH_DELETE_CONTAINS_OLD \ @@ -110,6 +115,8 @@ typedef struct xl_heap_delete OffsetNumber offnum; /* deleted tuple's offset */ uint8 infobits_set; /* infomask bits */ uint8 flags; + + /* If XLH_DELETE_CONTAINS_CID is set, command ID follows */ } xl_heap_delete; #define SizeOfHeapDelete (offsetof(xl_heap_delete, flags) + sizeof(uint8)) @@ -156,6 +163,8 @@ typedef struct xl_heap_insert OffsetNumber offnum; /* inserted tuple's offset */ uint8 flags; + /* If XLH_INSERT_CONTAINS_CID is set, command ID follows */ + /* xl_heap_header & TUPLE DATA in backup block 0 */ } xl_heap_insert; @@ -176,10 +185,14 @@ typedef struct xl_heap_multi_insert { uint8 flags; uint16 ntuples; - OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER]; + + /* If 
XLH_INSERT_CONTAINS_CID is set, command ID follows */ + + /* If XLOG_HEAP_INIT_PAGE is not set, offsets array follows */ + } xl_heap_multi_insert; -#define SizeOfHeapMultiInsert offsetof(xl_heap_multi_insert, offsets) +#define SizeOfHeapMultiInsert (offsetof(xl_heap_multi_insert, ntuples) + sizeof(uint16)) typedef struct xl_multi_insert_tuple { @@ -212,11 +225,13 @@ typedef struct xl_multi_insert_tuple typedef struct xl_heap_update { TransactionId old_xmax; /* xmax of the old tuple */ - OffsetNumber old_offnum; /* old tuple's offset */ - uint8 old_infobits_set; /* infomask bits to set on old tuple */ - uint8 flags; TransactionId new_xmax; /* xmax of the new tuple */ + OffsetNumber old_offnum; /* old tuple's offset */ OffsetNumber new_offnum; /* new tuple's offset */ + uint16 flags; + uint8 old_infobits_set; /* infomask bits to set on old tuple */ + + /* If XLH_UPDATE_CONTAINS_CID is set, command ID follows */ /* * If XLH_UPDATE_CONTAINS_OLD_TUPLE or XLH_UPDATE_CONTAINS_OLD_KEY flags @@ -224,7 +239,7 @@ typedef struct xl_heap_update */ } xl_heap_update; -#define SizeOfHeapUpdate (offsetof(xl_heap_update, new_offnum) + sizeof(OffsetNumber)) +#define SizeOfHeapUpdate (offsetof(xl_heap_update, old_infobits_set) + sizeof(uint8)) /* * This is what we need to know about page pruning (both during VACUUM and diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index d13cef4eae2..0fd783dc98d 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -356,8 +356,7 @@ typedef struct ReorderBufferTXN /* * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples. - * Those are always assigned to the toplevel transaction. 
(Keep track of - * #entries to create a hash of the right size) + * (Keep track of #entries to create a hash of the right size) */ dlist_head tuplecids; uint64 ntuplecids; diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index f49b941b53e..11cad6c5363 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -85,8 +85,8 @@ extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn); extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, - XLogRecPtr lsn, - struct xl_heap_new_cid *xlrec); + RelFileLocator rlocator, ItemPointerData ctid, + XLogRecPtr lsn, CommandId cmin, CommandId cmax); extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, struct xl_running_xacts *running); extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn); -- 2.30.2