From 3498e773879b3b87cd7ad46a1430f32b9c628ac7 Mon Sep 17 00:00:00 2001 From: Mikhail Kharitonov Date: Tue, 12 Aug 2025 14:15:54 +0300 Subject: [PATCH v3 1/2] logical replication: add *_extended API; pgoutput uses leaf based O/K; doc note --- doc/src/sgml/logical-replication.sgml | 8 +++ src/backend/replication/logical/proto.c | 79 +++++++++++++-------- src/backend/replication/pgoutput/pgoutput.c | 4 +- src/include/replication/logicalproto.h | 23 ++++-- 4 files changed, 76 insertions(+), 38 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index a0761cfee3f..9c34423204f 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -946,6 +946,14 @@ HINT: To initiate replication, you must manually create the replication slot, e row filter is used. + + When publish_via_partition_root is true, the relation OID and + the tuple layout in logical replication messages correspond to the root + partitioned table. However, for UPDATE and DELETE, the + old-tuple flag (O vs K) is determined by the replica identity of the + leaf partition that actually stored the old row. + + diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 1b3d9eb49dd..bec198dc162 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -447,39 +447,51 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) * Write UPDATE to the output stream. */ void -logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, - TupleTableSlot *oldslot, TupleTableSlot *newslot, - bool binary, Bitmapset *columns, - PublishGencolsType include_gencols_type) +logicalrep_write_update_extended(StringInfo out, TransactionId xid, + Relation leafrel, Relation pubrel, + TupleTableSlot *oldslot, TupleTableSlot *newslot, + bool binary, Bitmapset *columns, + PublishGencolsType include_gencols_type) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); - Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || - rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || - rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); - /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) pq_sendint32(out, xid); /* use Oid as relation identifier */ - pq_sendint32(out, RelationGetRelid(rel)); + pq_sendint32(out, RelationGetRelid(pubrel)); if (oldslot != NULL) { - if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + Assert(leafrel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || + leafrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || + leafrel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); + + if (leafrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary, columns, + logicalrep_write_tuple(out, pubrel, oldslot, binary, columns, include_gencols_type); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary, columns, + logicalrep_write_tuple(out, pubrel, newslot, binary, columns, include_gencols_type); } +/* Backward-compatible wrappers keep the old exported symbols alive. */ +void +logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, + TupleTableSlot *oldslot, TupleTableSlot *newslot, + bool binary, Bitmapset *columns, + PublishGencolsType include_gencols_type) +{ + logicalrep_write_update_extended(out, xid, rel, rel, oldslot, newslot, + binary, columns, include_gencols_type); +} + /* * Read UPDATE from stream. */ @@ -521,19 +533,13 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, return relid; } -/* - * Write DELETE to the output stream. - */ void -logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, - TupleTableSlot *oldslot, bool binary, - Bitmapset *columns, - PublishGencolsType include_gencols_type) +logicalrep_write_delete_extended(StringInfo out, TransactionId xid, + Relation leafrel, Relation pubrel, + TupleTableSlot *oldslot, bool binary, + Bitmapset *columns, + PublishGencolsType include_gencols_type) { - Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || - rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || - rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); - pq_sendbyte(out, LOGICAL_REP_MSG_DELETE); /* transaction ID (if not valid, we're not streaming) */ @@ -541,15 +547,26 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, xid); /* use Oid as relation identifier */ - pq_sendint32(out, RelationGetRelid(rel)); - - if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) - pq_sendbyte(out, 'O'); /* old tuple follows */ - else - pq_sendbyte(out, 'K'); /* old key follows */ + pq_sendint32(out, RelationGetRelid(pubrel)); + Assert(oldslot != NULL); + Assert(leafrel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || + leafrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || + leafrel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); + pq_sendbyte(out, (leafrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) ? 'O' : 'K'); + logicalrep_write_tuple(out, pubrel, oldslot, binary, columns, include_gencols_type); +} - logicalrep_write_tuple(out, rel, oldslot, binary, columns, - include_gencols_type); +/* + * Write DELETE to the output stream. + */ +void +logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, + TupleTableSlot *oldslot, bool binary, + Bitmapset *columns, + PublishGencolsType include_gencols_type) +{ + logicalrep_write_delete_extended(out, xid, rel, rel, oldslot, binary, + columns, include_gencols_type); } /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 80540c017bd..81056768587 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1589,12 +1589,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, relentry->include_gencols_type); break; case REORDER_BUFFER_CHANGE_UPDATE: - logicalrep_write_update(ctx->out, xid, targetrel, old_slot, + logicalrep_write_update_extended(ctx->out, xid, relation, targetrel, old_slot, new_slot, data->binary, relentry->columns, relentry->include_gencols_type); break; case REORDER_BUFFER_CHANGE_DELETE: - logicalrep_write_delete(ctx->out, xid, targetrel, old_slot, + logicalrep_write_delete_extended(ctx->out, xid, relation, targetrel, old_slot, data->binary, relentry->columns, relentry->include_gencols_type); break; diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index b261c60d3fa..81932c065c5 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -228,17 +228,30 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid, PublishGencolsType include_gencols_type); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, - Relation rel, TupleTableSlot *oldslot, - TupleTableSlot *newslot, bool binary, - Bitmapset *columns, + Relation rel, + TupleTableSlot *oldslot, TupleTableSlot *newslot, + bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type); + +extern void logicalrep_write_update_extended(StringInfo out, TransactionId xid, + Relation leafrel, Relation pubrel, + TupleTableSlot *oldslot, TupleTableSlot *newslot, + bool binary, Bitmapset *columns, + PublishGencolsType include_gencols_type); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); extern void logicalrep_write_delete(StringInfo out, TransactionId xid, - Relation rel, TupleTableSlot *oldslot, - bool binary, Bitmapset *columns, + Relation rel, + TupleTableSlot *oldslot, bool binary, + Bitmapset *columns, PublishGencolsType include_gencols_type); + +extern void logicalrep_write_delete_extended(StringInfo out, TransactionId xid, + Relation leafrel, Relation pubrel, + TupleTableSlot *oldslot, bool binary, + Bitmapset *columns, + PublishGencolsType include_gencols_type); extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, base-commit: b227b0bb4e032e19b3679bedac820eba3ac0d1cf -- 2.34.1