From 7a94899f8d92b5878d9551e7c0b3eb620c9e8a4b Mon Sep 17 00:00:00 2001 From: "Zheng (Zane) Li" Date: Thu, 16 Jun 2022 05:25:40 +0000 Subject: [PATCH 12/12] Support replication of ALTER TABLE commands that rewrite table with potentially volatile functions. This is done by enabling logical replication of table rewrite and converting the rewrite inserts to updates which can be replayed on the subscriber without violating primary key constraint. An improvement is to only replicate the rewrite insert/updates when a volatile function is used to generate the rewritten value. --- src/backend/replication/logical/worker.c | 41 ------------------- src/backend/replication/pgoutput/pgoutput.c | 45 +++++++++++++++++++-- src/test/subscription/t/030_rep_ddls.pl | 14 ++++--- 3 files changed, 50 insertions(+), 50 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ef79d10115..698ae14270 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2548,47 +2548,6 @@ preprocess_ddl(RawStmt *command, char **schemaname, char **relname, bool *is_par } break; } - /* - * ALTER TABLE ADD COLUMN col DEFAULT volatile_expr is not supported. - * Until we support logical replication of table rewrite, see ATRewriteTables() - * for details on table rewrite. - */ - case T_AlterTableStmt: - { - AlterTableStmt *atstmt = (AlterTableStmt *) command->stmt; - ListCell *lc; - - foreach(lc, atstmt->cmds) - { - AlterTableCmd *cmd = lfirst_node(AlterTableCmd, lc); - - if (cmd->subtype == AT_AddColumn) - { - ColumnDef *colDef; - ListCell *c; - - colDef = castNode(ColumnDef, cmd->def); - foreach(c, colDef->constraints) - { - Constraint *con = lfirst_node(Constraint, c); - - if (con->contype == CONSTR_DEFAULT) - { - Node *expr; - ParseState *pstate = make_parsestate(NULL); - - expr = transformExpr(pstate, copyObject(con->raw_expr), EXPR_KIND_COLUMN_DEFAULT); - if (contain_volatile_functions(expr)) - { - elog(ERROR, - "Do not support replication of DDL statement that rewrites table using volatile functions"); - } - } - } - } - } - break; - } case T_DropStmt: { DropStmt *dstmt = (DropStmt *) command->stmt; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index a66367fe7a..fd4140484c 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -413,6 +413,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* This plugin uses binary protocol. */ opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; + opt->receive_rewrites = true; /* * This is replication start and not slot initialization. @@ -1368,9 +1369,22 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ReorderBufferChangeType action = change->action; TupleTableSlot *old_slot = NULL; TupleTableSlot *new_slot = NULL; + bool table_rewrite = false; update_replication_progress(ctx, false); + /* + * For heap rewrites, we might need to replicate them if the rewritten + * table publishes rewrite ddl message. So get the actual relation here and + * check the pubaction later. + */ + if (relation->rd_rel->relrewrite) + { + table_rewrite = true; + relation = RelationIdGetRelation(relation->rd_rel->relrewrite); + targetrel = relation; + } + if (!is_publishable_relation(relation)) return; @@ -1404,6 +1418,14 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + /* + * We don't publish table rewrite change unless we publish the rewrite ddl + * message. + */ + if (table_rewrite && + (!relentry->pubactions.pubddl_database || !relentry->pubactions.pubddl_table)) + return; + /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -1433,8 +1455,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* Check row filter */ - if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry, - &action)) + if (!table_rewrite && + !pgoutput_row_filter(targetrel, NULL, &new_slot, relentry, &action)) break; /* @@ -1454,8 +1476,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, maybe_send_schema(ctx, change, relation, relentry); OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, - data->binary, relentry->columns); + + /* + * Convert the rewrite inserts to updates so that the subscriber + * can replay it. This is needed to make sure the data between + * publisher and subscriber is consistent. + */ + if (table_rewrite) + logicalrep_write_update(ctx->out, xid, targetrel, + NULL, new_slot, data->binary, + relentry->columns); + else + logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, + data->binary, relentry->columns); + OutputPluginWrite(ctx, true); break; case REORDER_BUFFER_CHANGE_UPDATE: @@ -1585,6 +1619,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ancestor = NULL; } + if (table_rewrite) + RelationClose(relation); + /* Cleanup */ MemoryContextSwitchTo(old); MemoryContextReset(data->context); diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl index a03d598266..f5c50ffe88 100644 --- a/src/test/subscription/t/030_rep_ddls.pl +++ b/src/test/subscription/t/030_rep_ddls.pl @@ -488,13 +488,17 @@ is($result, qq(1), 'Only root table of the partitioned table is subscribed since $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_subscription_rel where srrelid = 's1.test_part_a_1'::regclass::oid OR srrelid = 's1.test_part_a_2'::regclass::oid;"); is($result, qq(0), 'Leaf tables of the partitioned table are not subscribed since publish_via_partition_root is enabled'); -#TODO TEST certain DDLs are not replicated -# Test DDL statement that rewrites table with volatile functions are not replicated +# Test DDL statement that rewrites table with volatile functions are replicated with the same values from the publisher $node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ADD COLUMN volatile double precision DEFAULT 3 * random();"); -$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM information_schema.columns WHERE table_name = 'test_rep' and column_name = 'volatile';"); -is($result, qq(1), 'Alter table add column default random() is executed on the publisher DB.'); +$result = $node_publisher->safe_psql('postgres', "SELECT avg(volatile) FROM test_rep;;"); + +$node_publisher->wait_for_catchup('mysub'); -$result = $node_subscriber->wait_for_log("Do not support replication of DDL statement that rewrites table using volatile functions", $result); +$result_sub = $node_subscriber->safe_psql('postgres', "SELECT avg(volatile) FROM test_rep;;"); +is($result, qq($result_sub), 'Alter table add column default random() is replicated correctly'); + +#TODO TEST certain DDLs are not replicated +#$result = $node_subscriber->wait_for_log("Do not support replication of DDL statement that rewrites table using volatile functions", $result); pass "DDL replication tests passed!"; -- 2.32.0