From be0d3e08151de723a4a104bd172d8e1d772303e5 Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Thu, 8 Sep 2022 16:24:52 +0200 Subject: [PATCH v1] Add cid to heap xlog records that insert/update/delete tuples This allows physical replicas to participate in remote transactions and statements, and physical replicas to serve pages to the primary without loss of transaction guarantees. Authors: Anastasia Lubennikova, Matthias van de Meent --- src/backend/access/heap/heapam.c | 21 +++++++++++++++------ src/include/access/heapam_xlog.h | 5 +++++ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index aab8d6fa4e..9ddb262808 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2454,6 +2454,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, xlrec->flags = XLH_INSERT_ALL_FROZEN_SET; xlrec->ntuples = nthispage; + xlrec->cid = cid; /* * Write out an xl_multi_insert_tuple and the tuple data itself @@ -2977,6 +2978,7 @@ l1: tp.t_data->t_infomask2); xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self); xlrec.xmax = new_xmax; + xlrec.cid = HeapTupleHeaderGetRawCommandId(tp.t_data); if (old_key_tuple != NULL) { @@ -2999,6 +3001,7 @@ l1: xlhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2; xlhdr.t_infomask = old_key_tuple->t_data->t_infomask; xlhdr.t_hoff = old_key_tuple->t_data->t_hoff; + xlhdr.t_cid = HeapTupleHeaderGetRawCommandId(old_key_tuple->t_data); XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader); XLogRegisterData((char *) old_key_tuple->t_data @@ -3705,6 +3708,7 @@ l2: oldtup.t_data->t_infomask2); xlrec.flags = cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0; + xlrec.cid = HeapTupleHeaderGetRawCommandId(oldtup.t_data); XLogRegisterData((char *) &xlrec, SizeOfHeapLock); recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK); PageSetLSN(page, recptr); @@ -4892,6 +4896,7 @@ failed: xlrec.infobits_set = compute_infobits(new_infomask, tuple->t_data->t_infomask2); xlrec.flags = cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0; + xlrec.cid = HeapTupleHeaderGetRawCommandId(tuple->t_data); XLogRegisterData((char *) &xlrec, SizeOfHeapLock); /* we don't decode row locks atm, so no need to log the origin */ @@ -5941,6 +5946,7 @@ heap_abort_speculative(Relation relation, ItemPointer tid) tp.t_data->t_infomask2); xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self); xlrec.xmax = xid; + xlrec.cid = HeapTupleHeaderGetRawCommandId(tp.t_data); XLogBeginInsert(); XLogRegisterData((char *) &xlrec, SizeOfHeapDelete); @@ -8342,6 +8348,7 @@ log_heap_update(Relation reln, Buffer oldbuf, /* Prepare WAL data for the new page */ xlrec.new_offnum = ItemPointerGetOffsetNumber(&newtup->t_self); xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data); + xlrec.cid = HeapTupleHeaderGetRawCommandId(newtup->t_data); bufflags = REGBUF_STANDARD; if (init) @@ -8379,6 +8386,7 @@ log_heap_update(Relation reln, Buffer oldbuf, xlhdr.t_infomask2 = newtup->t_data->t_infomask2; xlhdr.t_infomask = newtup->t_data->t_infomask; xlhdr.t_hoff = newtup->t_data->t_hoff; + xlhdr.t_cid = HeapTupleHeaderGetRawCommandId(newtup->t_data); Assert(SizeofHeapTupleHeader + prefixlen + suffixlen <= newtup->t_len); /* @@ -8420,6 +8428,7 @@ log_heap_update(Relation reln, Buffer oldbuf, xlhdr_idx.t_infomask2 = old_key_tuple->t_data->t_infomask2; xlhdr_idx.t_infomask = old_key_tuple->t_data->t_infomask; xlhdr_idx.t_hoff = old_key_tuple->t_data->t_hoff; + xlhdr_idx.t_cid = HeapTupleHeaderGetRawCommandId(old_key_tuple->t_data); XLogRegisterData((char *) &xlhdr_idx, SizeOfHeapHeader); @@ -9050,7 +9059,7 @@ heap_xlog_delete(XLogReaderState *record) HeapTupleHeaderSetXmax(htup, xlrec->xmax); else HeapTupleHeaderSetXmin(htup, InvalidTransactionId); - HeapTupleHeaderSetCmax(htup, FirstCommandId, false); + HeapTupleHeaderSetCmax(htup, xlrec->cid, false); /* Mark the page as a candidate for pruning */ PageSetPrunable(page, XLogRecGetXid(record)); @@ -9151,7 +9160,7 @@ heap_xlog_insert(XLogReaderState *record) htup->t_infomask = xlhdr.t_infomask; htup->t_hoff = xlhdr.t_hoff; HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); - HeapTupleHeaderSetCmin(htup, FirstCommandId); + HeapTupleHeaderSetCmin(htup, xlhdr.t_cid); htup->t_ctid = target_tid; if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum, @@ -9294,7 +9303,7 @@ heap_xlog_multi_insert(XLogReaderState *record) htup->t_infomask = xlhdr->t_infomask; htup->t_hoff = xlhdr->t_hoff; HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); - HeapTupleHeaderSetCmin(htup, FirstCommandId); + HeapTupleHeaderSetCmin(htup, xlrec->cid); ItemPointerSetBlockNumber(&htup->t_ctid, blkno); ItemPointerSetOffsetNumber(&htup->t_ctid, offnum); @@ -9434,7 +9443,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask, &htup->t_infomask2); HeapTupleHeaderSetXmax(htup, xlrec->old_xmax); - HeapTupleHeaderSetCmax(htup, FirstCommandId, false); + HeapTupleHeaderSetCmax(htup, xlrec->cid, false); /* Set forward chain link in t_ctid */ htup->t_ctid = newtid; @@ -9567,7 +9576,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) htup->t_hoff = xlhdr.t_hoff; HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); - HeapTupleHeaderSetCmin(htup, FirstCommandId); + HeapTupleHeaderSetCmin(htup, xlhdr.t_cid); HeapTupleHeaderSetXmax(htup, xlrec->new_xmax); /* Make sure there is no forward chain link in t_ctid */ htup->t_ctid = newtid; @@ -9708,7 +9717,7 @@ heap_xlog_lock(XLogReaderState *record) offnum); } HeapTupleHeaderSetXmax(htup, xlrec->locking_xid); - HeapTupleHeaderSetCmax(htup, FirstCommandId, false); + HeapTupleHeaderSetCmax(htup, xlrec->cid, false); PageSetLSN(page, lsn); MarkBufferDirty(buffer); } diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 1705e736be..ab42e0c83f 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -107,6 +107,7 @@ typedef struct xl_heap_delete { TransactionId xmax; /* xmax of the deleted tuple */ + CommandId cid; /* CommandId of the deletion operation */ OffsetNumber offnum; /* deleted tuple's offset */ uint8 infobits_set; /* infomask bits */ uint8 flags; @@ -145,6 +146,7 @@ typedef struct xl_heap_header { uint16 t_infomask2; uint16 t_infomask; + CommandId t_cid; uint8 t_hoff; } xl_heap_header; @@ -176,6 +178,7 @@ typedef struct xl_heap_multi_insert { uint8 flags; uint16 ntuples; + CommandId cid; OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER]; } xl_heap_multi_insert; @@ -216,6 +219,7 @@ typedef struct xl_heap_update uint8 old_infobits_set; /* infomask bits to set on old tuple */ uint8 flags; TransactionId new_xmax; /* xmax of the new tuple */ + CommandId cid; /* CommandId of the update operation */ OffsetNumber new_offnum; /* new tuple's offset */ /* @@ -278,6 +282,7 @@ typedef struct xl_heap_vacuum typedef struct xl_heap_lock { TransactionId locking_xid; /* might be a MultiXactId not xid */ + CommandId cid; /* CommandId of the locking command */ OffsetNumber offnum; /* locked tuple's offset on page */ int8 infobits_set; /* infomask and infomask2 bits to set */ uint8 flags; /* XLH_LOCK_* flag bits */ -- 2.30.2