From e2a28c6d4cd0d735fd0ff1c287b0e289c2d29022 Mon Sep 17 00:00:00 2001 From: Hannu Krosing Date: Thu, 4 Dec 2025 21:21:04 +0100 Subject: [PATCH] Made tuple ids and info about HOT updates available to logical decoding Modified test_decoding to show both old tid has format -(pageno, slot) new tid has format +(pageno, slot) if it is a HOT update, ith is decoded prefixed with 'HOT ' Sample usage: hannu=# SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding'); pg_create_logical_replication_slot ------------------------------------ (test_slot,0/1BF1B38) (1 row) hannu=# CREATE TABLE nokey(data text); CREATE TABLE hannu=# insert into nokey (data) values('a'); INSERT 0 1 hannu=# update nokey set data = 'b'; UPDATE 1 hannu=# delete from nokey ; DELETE 1 hannu=# SELECT lsn, xid, data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL); lsn | xid | data -----------+-----+------------------------------------------------------------ 0/1C20538 | 767 | BEGIN 767 0/1C2B1E8 | 767 | COMMIT 767 0/1C2B220 | 768 | BEGIN 768 0/1C2B220 | 768 | table public.nokey: INSERT:+(0,1) data[text]:'a' 0/1C2B290 | 768 | COMMIT 768 0/1C2B300 | 769 | BEGIN 769 0/1C2B300 | 769 | table public.nokey: HOT UPDATE:-(0,1)+(0,2) data[text]:'b' 0/1C2B378 | 769 | COMMIT 769 0/1C2B3B0 | 770 | BEGIN 770 0/1C2B3B0 | 770 | table public.nokey: DELETE:-(0,2) (no-tuple-data) 0/1C2B418 | 770 | COMMIT 770 (11 rows) --- contrib/test_decoding/test_decoding.c | 33 +++++++++++++++++++- src/backend/replication/logical/decode.c | 38 +++++++++++++++++++----- src/include/replication/reorderbuffer.h | 7 +++++ 3 files changed, 70 insertions(+), 8 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index bb495563200..7a1805d5a97 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -596,6 +596,23 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_ } } + +static inline char* _format_tid(char *tidbuf, char prefix, ItemPointer itemPtr) +{ + BlockNumber blockNumber; + OffsetNumber offsetNumber; + + blockNumber = ItemPointerGetBlockNumberNoCheck(itemPtr); + offsetNumber = ItemPointerGetOffsetNumberNoCheck(itemPtr); + + tidbuf[0] = prefix; + /* Perhaps someday we should output this as a record. */ + snprintf(tidbuf+1, 32-1, "(%u,%u)", blockNumber, offsetNumber); + + return tidbuf; +} + + /* * callback for individual changed tuples */ @@ -608,6 +625,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Form_pg_class class_form; TupleDesc tupdesc; MemoryContext old; + char tidbuf[32]; data = ctx->output_plugin_private; txndata = txn->output_plugin_private; @@ -639,6 +657,9 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: appendStringInfoString(ctx->out, " INSERT:"); + if (change->data.tp.newctid.ip_posid) + appendStringInfoString(ctx->out, + _format_tid(tidbuf, '+', &(change->data.tp.newctid))); if (change->data.tp.newtuple == NULL) appendStringInfoString(ctx->out, " (no-tuple-data)"); else @@ -647,7 +668,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, false); break; case REORDER_BUFFER_CHANGE_UPDATE: + if (change->data.tp.is_hot_update) + appendStringInfoString(ctx->out, " HOT"); appendStringInfoString(ctx->out, " UPDATE:"); + if (change->data.tp.oldctid.ip_posid) + appendStringInfoString(ctx->out, + _format_tid(tidbuf, '-', &(change->data.tp.oldctid))); + if (change->data.tp.newctid.ip_posid) + appendStringInfoString(ctx->out, + _format_tid(tidbuf, '+', &(change->data.tp.newctid))); if (change->data.tp.oldtuple != NULL) { appendStringInfoString(ctx->out, " old-key:"); @@ -666,7 +695,9 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, break; case REORDER_BUFFER_CHANGE_DELETE: appendStringInfoString(ctx->out, " DELETE:"); - + if (change->data.tp.oldctid.ip_posid) + appendStringInfoString(ctx->out, + _format_tid(tidbuf, '-', &(change->data.tp.oldctid))); /* if there was no PK, we only know that a delete happened */ if (change->data.tp.oldtuple == NULL) appendStringInfoString(ctx->out, " (no-tuple-data)"); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index cc03f0706e9..a04ae2a717a 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -42,7 +42,7 @@ /* individual record(group)'s handlers */ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, bool is_hot); static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -502,7 +502,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) case XLOG_HEAP_UPDATE: if (SnapBuildProcessChange(builder, xid, buf->origptr) && !ctx->fast_forward) - DecodeUpdate(ctx, buf); + DecodeUpdate(ctx, buf, info == XLOG_HEAP_HOT_UPDATE); break; case XLOG_HEAP_DELETE: @@ -909,6 +909,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_heap_insert *xlrec; ReorderBufferChange *change; RelFileLocator target_locator; + BlockNumber blkno; xlrec = (xl_heap_insert *) XLogRecGetData(r); @@ -920,7 +921,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blkno); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -945,6 +946,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple); + ItemPointerSet(&change->data.tp.newctid, blkno, xlrec->offnum); change->data.tp.clear_toast_afterwards = true; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, @@ -959,18 +961,20 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * Updates can possibly contain a new tuple and the old primary key. */ static void -DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, bool is_hot) { XLogReaderState *r = buf->record; xl_heap_update *xlrec; ReorderBufferChange *change; char *data; RelFileLocator target_locator; + BlockNumber blkno; + BlockNumber old_blkno = InvalidBlockNumber; xlrec = (xl_heap_update *) XLogRecGetData(r); /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blkno); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -983,6 +987,20 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator)); + ItemPointerSet(&change->data.tp.newctid, blkno, xlrec->new_offnum); + + /* If block 1 is present, it contains old tuple */ + if (XLogRecHasBlockRef(r, 1)) + { + RelFileLocator old_locator; + XLogRecGetBlockTag(r, 1, &old_locator, NULL, &old_blkno); + } + + if (BlockNumberIsValid(old_blkno)) + ItemPointerSet(&change->data.tp.oldctid, old_blkno, xlrec->old_offnum); + else + ItemPointerSet(&change->data.tp.oldctid, blkno, xlrec->old_offnum); + if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE) { Size datalen; @@ -1015,6 +1033,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } change->data.tp.clear_toast_afterwards = true; + change->data.tp.is_hot_update = is_hot; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change, false); @@ -1032,11 +1051,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_heap_delete *xlrec; ReorderBufferChange *change; RelFileLocator target_locator; + BlockNumber blkno; xlrec = (xl_heap_delete *) XLogRecGetData(r); /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blkno); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -1070,6 +1090,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) datalen, change->data.tp.oldtuple); } + ItemPointerSet(&change->data.tp.oldctid, blkno, xlrec->offnum); change->data.tp.clear_toast_afterwards = true; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, @@ -1127,6 +1148,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) char *tupledata; Size tuplelen; RelFileLocator rlocator; + BlockNumber blkno; xlrec = (xl_heap_multi_insert *) XLogRecGetData(r); @@ -1138,7 +1160,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &rlocator, NULL, &blkno); if (rlocator.dbOid != ctx->slot->data.database) return; @@ -1168,6 +1190,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator)); + ItemPointerSet(&change->data.tp.newctid, blkno, xlrec->offsets[i]); + xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); data = ((char *) xlhdr) + SizeOfMultiInsertTuple; datalen = xlhdr->datalen; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index fa0745552f8..8c305ec6e41 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -104,6 +104,13 @@ typedef struct ReorderBufferChange HeapTuple oldtuple; /* valid for INSERT || UPDATE */ HeapTuple newtuple; + + /* ctid for old tuple; valid for DELETE || UPDATE */ + ItemPointerData oldctid; + /* ctid for new tuple; valid for INSERT || UPDATE */ + ItemPointerData newctid; + /* update type - only valid for UPDATE */ + bool is_hot_update; } tp; /* -- 2.43.0