From de5930bf4440dc8c03c9b96a29ef13596872c389 Mon Sep 17 00:00:00 2001 From: Khanna Date: Thu, 11 Jul 2024 10:11:00 +0530 Subject: [PATCH v17 4/4] Improve include generated column option handling by using bms Improve include generated column option handling by using bms --- src/backend/replication/logical/proto.c | 44 +++------ src/backend/replication/pgoutput/pgoutput.c | 102 ++++++++++++++++---- src/include/replication/logicalproto.h | 12 +-- 3 files changed, 99 insertions(+), 59 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 1c35fb6cff..26956a54ab 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -30,12 +30,10 @@ #define TRUNCATE_RESTART_SEQS (1<<1) static void logicalrep_write_attrs(StringInfo out, Relation rel, - Bitmapset *columns, - bool include_generated_columns); + Bitmapset *columns); static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary, Bitmapset *columns, - bool include_generated_columns); + bool binary, Bitmapset *columns); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -414,8 +412,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - TupleTableSlot *newslot, bool binary, Bitmapset *columns, - bool include_generated_columns) + TupleTableSlot *newslot, bool binary, Bitmapset *columns) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -427,8 +424,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary, columns, - include_generated_columns); + logicalrep_write_tuple(out, rel, newslot, binary, columns); } /* @@ -461,8 +457,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, - bool binary, Bitmapset *columns, - bool include_generated_columns) + bool binary, Bitmapset *columns) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -483,13 +478,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary, columns, - include_generated_columns); + logicalrep_write_tuple(out, rel, oldslot, binary, columns); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary, columns, - include_generated_columns); + logicalrep_write_tuple(out, rel, newslot, binary, columns); } /* @@ -539,7 +532,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, - Bitmapset *columns, bool include_generated_columns) + Bitmapset *columns) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -559,8 +552,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary, columns, - include_generated_columns); + logicalrep_write_tuple(out, rel, oldslot, binary, columns); } /* @@ -676,7 +668,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, */ void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, - Bitmapset *columns, bool include_generated_columns) + Bitmapset *columns) { char *relname; @@ -698,7 +690,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, rel->rd_rel->relreplident); /* send the attribute info */ - logicalrep_write_attrs(out, rel, columns, include_generated_columns); + logicalrep_write_attrs(out, rel, columns); } /* @@ -775,8 +767,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) */ static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary, Bitmapset *columns, - bool include_generated_columns) + bool binary, Bitmapset *columns) { TupleDesc desc; Datum *values; @@ -795,8 +786,6 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, if (att->attgenerated) { - if (!include_generated_columns) - continue; if (att->attgenerated != ATTRIBUTE_GENERATED_STORED) continue; @@ -825,8 +814,6 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, if (att->attgenerated) { - if (!include_generated_columns) - continue; if (att->attgenerated != ATTRIBUTE_GENERATED_STORED) continue; @@ -950,8 +937,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) * Write relation attribute metadata to the stream. */ static void -logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, - bool include_generated_columns) +logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) { TupleDesc desc; int i; @@ -971,8 +957,6 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, if (att->attgenerated) { - if (!include_generated_columns) - continue; if (att->attgenerated != ATTRIBUTE_GENERATED_STORED) continue; @@ -1001,8 +985,6 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, if (att->attgenerated) { - if (!include_generated_columns) - continue; if (att->attgenerated != ATTRIBUTE_GENERATED_STORED) continue; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 944554d5d7..19b6d4e7e8 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -86,8 +86,7 @@ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, - Bitmapset *columns, - bool include_generated_columns); + Bitmapset *columns); static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); @@ -165,8 +164,10 @@ typedef struct RelationSyncEntry AttrMap *attrmap; /* - * Columns included in the publication, or NULL if all columns are - * included implicitly. Note that the attnums in this bitmap are not + * Columns should be publicated, or NULL if all columns are included + * implicitly. This bitmap only considers the column list of the + * publication and include_generated_columns option: other reasons should + * be checked at user side. Note that the attnums in this bitmap are not * publication and include_generated_columns option: other reasons should * be checked at user side. Note that the attnums in this bitmap are not * shifted by FirstLowInvalidHeapAttributeNumber. @@ -746,13 +747,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, { Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid); - send_relation_and_attrs(ancestor, xid, ctx, relentry->columns, - data->include_generated_columns); + send_relation_and_attrs(ancestor, xid, ctx, relentry->columns); RelationClose(ancestor); } - send_relation_and_attrs(relation, xid, ctx, relentry->columns, - data->include_generated_columns); + send_relation_and_attrs(relation, xid, ctx, relentry->columns); if (data->in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); @@ -766,7 +765,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, - Bitmapset *columns, bool include_generated_columns) + Bitmapset *columns) { TupleDesc desc = RelationGetDescr(relation); int i; @@ -786,9 +785,6 @@ send_relation_and_attrs(Relation relation, TransactionId xid, if (att->attisdropped) continue; - if (att->attgenerated && (att->attgenerated != ATTRIBUTE_GENERATED_STORED || !include_generated_columns)) - continue; - if (att->atttypid < FirstGenbkiObjectId) continue; @@ -802,7 +798,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid, } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation, columns, include_generated_columns); + logicalrep_write_rel(ctx->out, xid, relation, columns); OutputPluginWrite(ctx, false); } @@ -1028,6 +1024,34 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications, } } +/* + * Prepare new column list bitmap. This includes all the columns of the table. + */ +static Bitmapset * +prepare_all_columns_bms(PGOutputData *data, RelationSyncEntry *entry, + TupleDesc desc) +{ + Bitmapset *cols = NULL; + MemoryContext oldcxt = NULL; + + pgoutput_ensure_entry_cxt(data, entry); + oldcxt = MemoryContextSwitchTo(entry->entry_cxt); + + for (int i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped) + continue; + + cols = bms_add_member(cols, i + 1); + } + + MemoryContextSwitchTo(oldcxt); + + return cols; +} + /* * Initialize the column list. */ @@ -1118,7 +1142,8 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, * If column list includes all the columns of the table, * set it to NULL. */ - if (bms_num_members(cols) == nliveatts) + if (bms_num_members(cols) == nliveatts && + data->include_generated_columns) { bms_free(cols); cols = NULL; @@ -1129,6 +1154,46 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, } } + /* Do additional checks if the generated columns must be replicated */ + if (!data->include_generated_columns) + { + TupleDesc desc = RelationGetDescr(relation); + int nliveatts = 0; + + for (int i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + /* Skip if the attribute is dropped */ + if (att->attisdropped) + continue; + + /* Count all valid attributes */ + nliveatts++; + + /* Skip if the attribute is not generated */ + if (!att->attgenerated) + continue; + + /* Prepare new bms if not allocated yet */ + if (cols == NULL) + cols = prepare_all_columns_bms(data, entry, desc); + + /* Delete the corresponding column from the bms */ + cols = bms_del_member(cols, i + 1); + } + + /* + * If column list includes all the columns of the table, set it to + * NULL. + */ + if (bms_num_members(cols) == nliveatts) + { + bms_free(cols); + cols = NULL; + } + } + if (first) { entry->columns = cols; @@ -1554,18 +1619,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, - data->binary, relentry->columns, - data->include_generated_columns); + data->binary, relentry->columns); break; case REORDER_BUFFER_CHANGE_UPDATE: logicalrep_write_update(ctx->out, xid, targetrel, old_slot, - new_slot, data->binary, relentry->columns, - data->include_generated_columns); + new_slot, data->binary, relentry->columns); break; case REORDER_BUFFER_CHANGE_DELETE: logicalrep_write_delete(ctx->out, xid, targetrel, old_slot, - data->binary, relentry->columns, - data->include_generated_columns); + data->binary, relentry->columns); break; default: Assert(false); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 34ec40b07e..b9a64d9c95 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -225,22 +225,19 @@ extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, - bool binary, Bitmapset *columns, - bool include_generated_columns); + bool binary, Bitmapset *columns); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, - Bitmapset *columns, - bool include_generated_columns); + Bitmapset *columns); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); extern void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, - bool binary, Bitmapset *columns, - bool include_generated_columns); + bool binary, Bitmapset *columns); extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, @@ -251,8 +248,7 @@ extern List *logicalrep_read_truncate(StringInfo in, extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, - Relation rel, Bitmapset *columns, - bool include_generated_columns); + Relation rel, Bitmapset *columns); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); -- 2.41.0.windows.3