From 19a3952e7db725a41d726bbfe54bbda6bf401975 Mon Sep 17 00:00:00 2001 From: "Zheng (Zane) Li" Date: Fri, 13 May 2022 19:47:44 +0000 Subject: [PATCH 10/12] Handle partitioned table creation on the apply worker: whether a partitioned table should be added to pg_subscription_rel catalog depends on the setting of publish_via_partition_root of the publication. Thus we need to connect to the source DB and check whehter the partitioned table should be subscribed. --- src/backend/commands/subscriptioncmds.c | 54 +++++++++++++++++++++- src/backend/replication/logical/worker.c | 35 ++++++++++++--- src/include/commands/subscriptioncmds.h | 3 ++ src/test/regress/expected/psql.out | 6 +-- src/test/subscription/t/030_rep_ddls.pl | 57 ++++++++++++++++++++++++ 5 files changed, 145 insertions(+), 10 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 83e6eae855..25d0a23db9 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -23,7 +23,6 @@ #include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" -#include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/defrem.h" @@ -746,6 +745,59 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, return myself; } +/* + * Check if a partitioned table is being published by any publication subscirbed by the subscription. + * Whether a partitioned table is published also depends on the publication option + * publish_via_partition_root. But the subscriber doesn't know the setting of publish_via_partition_root, + * this is why we need to check the source DB so that we can decide whether to subscribe to the partitioned + * table (could be either root or leaf table) during replication of create partitioned table. + */ +bool +IsPartitionedTablePublishedOnSource(Subscription *sub, char *schema_name, char *table_name) +{ + char *err; + WalRcvExecResult *res; + StringInfoData cmd; + Oid tableRow[1] = {BOOLOID}; + bool result; + + WalReceiverConn *wrconn; + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); + + PG_TRY(); + { + initStringInfo(&cmd); + appendStringInfoString(&cmd, "SELECT TRUE\n" + " FROM pg_catalog.pg_publication_tables t\n" + " WHERE t.pubname IN ("); + get_publications_str(sub->publications, &cmd, true); + appendStringInfoString(&cmd, ") AND t.schemaname = '"); + appendStringInfoString(&cmd, schema_name); + appendStringInfoString(&cmd, "' AND t.tablename = '"); + appendStringInfoString(&cmd, table_name); + appendStringInfoString(&cmd, "'"); + + res = walrcv_exec(wrconn, cmd.data, 1, tableRow); + pfree(cmd.data); + result = tuplestore_tuple_count(res->tuplestore) > 0; + } + PG_FINALLY(); + { + walrcv_disconnect(wrconn); + } + PG_END_TRY(); + + return result; +} + static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 39702f4a0f..1108030179 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -143,6 +143,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_tablespace.h" +#include "commands/subscriptioncmds.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" #include "commands/trigger.h" @@ -2548,6 +2549,7 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear bool snapshot_set = false; char *schemaname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */ char *relname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */ + bool is_partitioned_table = false; commandTag = CreateCommandTag((Node *)command); @@ -2564,6 +2566,8 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear RangeVar *rv = cstmt->relation; schemaname = rv->schemaname; relname = rv->relname; + if (cstmt->inhRelations != NIL || cstmt->partspec != NULL) + is_partitioned_table = true; } else if (commandTag == CMDTAG_CREATE_TABLE_AS) { @@ -2765,12 +2769,31 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear if (relid != InvalidOid) { - AddSubscriptionRelState(MySubscription->oid, relid, - SUBREL_STATE_INIT, - InvalidXLogRecPtr); - ereport(DEBUG1, - (errmsg_internal("table \"%s\" added to subscription \"%s\"", - relname, MySubscription->name))); + bool subscribe_table = true; + + if (is_partitioned_table) + { + Relation rel = RelationIdGetRelation(relid); + char *table_name = RelationGetRelationName(rel); + char *schema_name = get_namespace_name(RelationGetNamespace(rel)); + /* + * Connect to the source DB and check whehter the partitioned table should be subscribed. + * Because it depends on the setting of publish_via_partition_root, which the subscription + * doesn't know. + */ + subscribe_table = IsPartitionedTablePublishedOnSource(MySubscription, schema_name, table_name); + RelationClose(rel); + } + + if (subscribe_table) + { + AddSubscriptionRelState(MySubscription->oid, relid, + SUBREL_STATE_INIT, + InvalidXLogRecPtr); + ereport(DEBUG1, + (errmsg_internal("table \"%s\" added to subscription \"%s\"", + relname, MySubscription->name))); + } } } } diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 2cbe7d7b65..451953af60 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -16,6 +16,7 @@ #define SUBSCRIPTIONCMDS_H #include "catalog/objectaddress.h" +#include "catalog/pg_subscription.h" #include "parser/parse_node.h" extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, @@ -26,4 +27,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel); extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId); extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); +extern bool IsPartitionedTablePublishedOnSource(Subscription *sub, char *schema_name, char *table_name); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/test/regress/expected/psql.out b/src/test/regress/expected/psql.out index 60acbd1241..3429e34339 100644 --- a/src/test/regress/expected/psql.out +++ b/src/test/regress/expected/psql.out @@ -5969,9 +5969,9 @@ List of schemas (0 rows) \dRp "no.such.publication" - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root -------+-------+------------+---------+---------+---------+-----------+---------- + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDLs | Table level DDLs +------+-------+------------+---------+---------+---------+-----------+----------+---------------------+------------------ (0 rows) \dRs "no.such.subscription" diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl index 51126b489b..a03d598266 100644 --- a/src/test/subscription/t/030_rep_ddls.pl +++ b/src/test/subscription/t/030_rep_ddls.pl @@ -431,6 +431,63 @@ $node_publisher->wait_for_catchup('mysub'); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from test_rep where non_volatile = 0.01;"); is($result, qq(2), 'Alter table add column default 0.01 is replicated'); +# Test partitioned table creation is replicated based on the setting of publish_via_partition_root +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE s1.test_part_a (a int, b int, c int) PARTITION BY LIST (a); + + CREATE TABLE s1.test_part_a_1 PARTITION OF s1.test_part_a FOR VALUES IN (1,2,3,4,5); + ALTER TABLE s1.test_part_a_1 ADD PRIMARY KEY (a); + ALTER TABLE s1.test_part_a_1 REPLICA IDENTITY USING INDEX test_part_a_1_pkey; + + CREATE TABLE s1.test_part_a_2 PARTITION OF s1.test_part_a FOR VALUES IN (6,7,8,9,10); + ALTER TABLE s1.test_part_a_2 ADD PRIMARY KEY (b); + ALTER TABLE s1.test_part_a_2 REPLICA IDENTITY USING INDEX test_part_a_2_pkey; + + -- initial data, one row in each partition + INSERT INTO s1.test_part_a VALUES (1, 3); + INSERT INTO s1.test_part_a VALUES (6, 4); +)); +$result = $node_publisher->safe_psql('postgres', "SELECT count(*) from s1.test_part_a;"); +is($result, qq(2), 'Partitioned table is created and populated'); + +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from s1.test_part_a;"); +is($result, qq(2), 'Partitioned table and data is replicated'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_subscription_rel where srrelid = 's1.test_part_a'::regclass::oid;"); +is($result, qq(0), 'Root table of the partitioned table is not subscribed since publish_via_partition_root is false by default'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_subscription_rel where srrelid = 's1.test_part_a_1'::regclass::oid OR srrelid = 's1.test_part_a_2'::regclass::oid;"); +is($result, qq(2), 'Only leaf tables of the partitioned table are subscribed since publish_via_partition_root is false by default'); + +# Test only root partition table is subscribed when publish_via_partition_root is enabled +$node_publisher->safe_psql('postgres', qq( + DROP TABLE s1.test_part_a; + ALTER PUBLICATION mypub SET (publish_via_partition_root = true); + CREATE TABLE s1.test_part_a (a int, b int, c int) PARTITION BY LIST (a); + + CREATE TABLE s1.test_part_a_1 PARTITION OF s1.test_part_a FOR VALUES IN (1,2,3,4,5); + ALTER TABLE s1.test_part_a_1 ADD PRIMARY KEY (a); + ALTER TABLE s1.test_part_a_1 REPLICA IDENTITY USING INDEX test_part_a_1_pkey; + + CREATE TABLE s1.test_part_a_2 PARTITION OF s1.test_part_a FOR VALUES IN (6,7,8,9,10); + ALTER TABLE s1.test_part_a_2 ADD PRIMARY KEY (b); + ALTER TABLE s1.test_part_a_2 REPLICA IDENTITY USING INDEX test_part_a_2_pkey; + + -- initial data, one row in each partition + INSERT INTO s1.test_part_a VALUES (1, 3); + INSERT INTO s1.test_part_a VALUES (6, 4); +)); + +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_subscription_rel where srrelid = 's1.test_part_a'::regclass::oid;"); +is($result, qq(1), 'Only root table of the partitioned table is subscribed since publish_via_partition_root is enabled'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_subscription_rel where srrelid = 's1.test_part_a_1'::regclass::oid OR srrelid = 's1.test_part_a_2'::regclass::oid;"); +is($result, qq(0), 'Leaf tables of the partitioned table are not subscribed since publish_via_partition_root is enabled'); + #TODO TEST certain DDLs are not replicated # Test DDL statement that rewrites table with volatile functions are not replicated $node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ADD COLUMN volatile double precision DEFAULT 3 * random();"); -- 2.32.0