From 4eafa47f2751076c6caaa4f16011d1953a869da4 Mon Sep 17 00:00:00 2001 From: amitlan Date: Thu, 2 Dec 2021 12:12:51 +0900 Subject: [PATCH] wip: don't add partition to publication if parent present --- src/backend/catalog/pg_publication.c | 77 +++++++++++++++--- src/backend/commands/publicationcmds.c | 96 ++++++++++++++++++----- src/backend/commands/tablecmds.c | 48 ++++++++++++ src/backend/nodes/list.c | 26 ++++++ src/include/commands/publicationcmds.h | 1 + src/include/nodes/pg_list.h | 3 +- src/test/regress/expected/publication.out | 43 +++++++++- src/test/regress/sql/publication.sql | 25 +++++- 8 files changed, 286 insertions(+), 33 deletions(-) diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 63579b2f82..e98f575dcc 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -253,6 +253,42 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, return result; } +/* + * Checks if the relation being added or its ancestor is already present in the + * publication. If so, error out if asked to do so. + */ +static bool +publication_relation_exists(Publication *pub, + Oid relid, Oid ancestor_oid, + bool error_if_exists) +{ + if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), + ObjectIdGetDatum(pub->oid))) + { + if (error_if_exists) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("relation \"%s\" is already member of publication \"%s\"", + get_rel_name(relid), pub->name))); + return true; + } + else if (OidIsValid(ancestor_oid) && + SearchSysCacheExists2(PUBLICATIONRELMAP, + ObjectIdGetDatum(ancestor_oid), + ObjectIdGetDatum(pub->oid))) + { + if (error_if_exists) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("ancestor \"%s\" of relation \"%s\" is already member of publication \"%s\"", + get_rel_name(ancestor_oid), get_rel_name(relid), + pub->name))); + return true; + } + + return false; +} + /* * Insert new publication / relation mapping. */ @@ -271,25 +307,42 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, referenced; List *relids = NIL; + /* If table already present in the publication, don't add it again. */ + if (publication_relation_exists(pub, relid, InvalidOid, !if_not_exists)) + return InvalidObjectAddress; + rel = table_open(PublicationRelRelationId, RowExclusiveLock); /* - * Check for duplicates. Note that this does not really prevent - * duplicates, it's here just to provide nicer error message in common - * case. The real protection is the unique key on the catalog. + * If a partition's ancestor is already present in the publication, don't + * add it. Partition will be implicitly considered to be a part of the + * publication via the ancestor. */ - if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), - ObjectIdGetDatum(pubid))) + if (targetrel->relation->rd_rel->relispartition) { - table_close(rel, RowExclusiveLock); + List *ancestors = get_partition_ancestors(relid); + ListCell *lc; + bool skip_add = false; - if (if_not_exists) - return InvalidObjectAddress; + foreach(lc, ancestors) + { + Oid ancestor_oid = lfirst_oid(lc); - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("relation \"%s\" is already member of publication \"%s\"", - RelationGetRelationName(targetrel->relation), pub->name))); + if (publication_relation_exists(pub, relid, ancestor_oid, + !if_not_exists)) + { + skip_add = true; + break; + } + } + + list_free(ancestors); + + if (skip_add) + { + table_close(rel, RowExclusiveLock); + return InvalidObjectAddress; + } } check_publication_add_relation(targetrel->relation); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 7d4a0e95f6..d1fc307c95 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -899,6 +899,7 @@ OpenTableList(List *tables) List *relids = NIL; List *rels = NIL; ListCell *lc; + bool contains_partition = false; /* * Open, share-lock, and check all the explicitly-specified relations @@ -917,6 +918,9 @@ OpenTableList(List *tables) rel = table_openrv(t->relation, ShareUpdateExclusiveLock); myrelid = RelationGetRelid(rel); + if (rel->rd_rel->relispartition) + contains_partition = true; + /* * Filter out duplicates if user specifies "foo, foo". * @@ -973,6 +977,52 @@ OpenTableList(List *tables) } } + /* + * De-duplicate partitions. + * + * This is to handle the case where a user inadvertently specifies + * "foo, partition_of_foo" or "partition_of_foo, foo". We'd only need to + * add the parent table "foo" in either of those cases. + */ + if (list_length(relids) > 1 && contains_partition) + { + List *new_rels = NIL; + + foreach(lc, rels) + { + PublicationRelInfo *pubrel = lfirst(lc); + Relation rel = pubrel->relation; + bool skip_rel = false; + + if (rel->rd_rel->relispartition) + { + List *ancestors = get_partition_ancestors(RelationGetRelid(rel)); + ListCell *l; + + foreach(l, ancestors) + { + Oid ancestor_oid = lfirst_oid(l); + + if (list_member_oid(relids, ancestor_oid)) + { + table_close(rel, ShareUpdateExclusiveLock); + skip_rel = true; + } + } + + list_free(ancestors); + } + + if (!skip_rel) + new_rels = lappend(new_rels, pubrel); + else + pfree(pubrel); + } + + list_free(rels); + rels = new_rels; + } + list_free(relids); return rels; @@ -1058,15 +1108,40 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, } } +/* + * Removes the relation's membership in given publication by dropping the + * corresponding pg_publication_rel entry. + */ +void +RemovePublicationRel(Oid pubid, Oid relid, bool missing_ok) +{ + ObjectAddress obj; + Oid pubrelid = GetSysCacheOid2(PUBLICATIONRELMAP, + Anum_pg_publication_rel_oid, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(pubid)); + if (!OidIsValid(pubrelid)) + { + if (missing_ok) + return; + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("relation \"%s\" is not part of the publication", + get_rel_name(relid)))); + } + + ObjectAddressSet(obj, PublicationRelRelationId, pubrelid); + performDeletion(&obj, DROP_CASCADE, 0); +} + /* * Remove listed tables from the publication. */ static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok) { - ObjectAddress obj; ListCell *lc; - Oid prid; foreach(lc, rels) { @@ -1074,22 +1149,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) Relation rel = pubrel->relation; Oid relid = RelationGetRelid(rel); - prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, - ObjectIdGetDatum(relid), - ObjectIdGetDatum(pubid)); - if (!OidIsValid(prid)) - { - if (missing_ok) - continue; - - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("relation \"%s\" is not part of the publication", - RelationGetRelationName(rel)))); - } - - ObjectAddressSet(obj, PublicationRelRelationId, prid); - performDeletion(&obj, DROP_CASCADE, 0); + RemovePublicationRel(pubid, relid, missing_ok); } } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index c35f09998c..7117ba7539 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -52,6 +52,7 @@ #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/policy.h" +#include "commands/publicationcmds.h" #include "commands/sequence.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" @@ -586,6 +587,8 @@ static void QueuePartitionConstraintValidation(List **wqueue, Relation scanrel, List *partConstraint, bool validate_default); static void CloneRowTriggersToPartition(Relation parent, Relation partition); +static void AttachPartitionRemoveDuplicatePublications(Relation parentrel, + Relation attachrel); static void DetachAddConstraintIfNeeded(List **wqueue, Relation partRel); static void DropClonedTriggersFromPartition(Oid partitionId); static ObjectAddress ATExecDetachPartition(List **wqueue, AlteredTableInfo *tab, @@ -17471,6 +17474,12 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, */ CloneForeignKeyConstraints(wqueue, rel, attachrel); + /* + * Check if the partition is in publications that the parent or some + * other higher-level ancestor is also present in. + */ + AttachPartitionRemoveDuplicatePublications(rel, attachrel); + /* * Generate partition constraint from the partition bound specification. * If the parent itself is a partition, make sure to include its @@ -17893,6 +17902,45 @@ CloneRowTriggersToPartition(Relation parent, Relation partition) table_close(pg_trigger, RowExclusiveLock); } +/* + * Removes a table being attached to the parent relation from any publications + * in which the parent relation or its ancestor is already present. + * + * This it to maintain the invariant that a partition is not present + * duplicatively in the publications where one of its partition ancestors is + * already present. The partition will be implicitly present in the + * publication after the successful ATTACH, because the parent or its ancestor + * is present. + */ +static void +AttachPartitionRemoveDuplicatePublications(Relation parentrel, + Relation attachrel) +{ + List *attachrel_pubs; + ListCell *lc; + Oid attachrelid = RelationGetRelid(attachrel); + Oid parentid = RelationGetRelid(parentrel); + List *ancestors = list_make1_oid(parentid); + + if (parentrel->rd_rel->relispartition) + ancestors = list_concat(ancestors, get_partition_ancestors(parentid)); + + attachrel_pubs = GetRelationPublications(RelationGetRelid(attachrel)); + foreach(lc, attachrel_pubs) + { + Oid pubid = lfirst_oid(lc); + List *pub_rels = GetPublicationRelations(pubid, + PUBLICATION_PART_ROOT); + + if (list_intersection_oid(pub_rels, ancestors) != NIL) + RemovePublicationRel(pubid, attachrelid, false); + list_free(pub_rels); + } + + list_free(ancestors); + list_free(attachrel_pubs); +} + /* * ALTER TABLE DETACH PARTITION * diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c index 410ffe0835..27d4be2758 100644 --- a/src/backend/nodes/list.c +++ b/src/backend/nodes/list.c @@ -1178,6 +1178,32 @@ list_intersection_int(const List *list1, const List *list2) return result; } +/* + * As list_intersection but operates on lists of oids. + */ +List * +list_intersection_oid(const List *list1, const List *list2) +{ + List *result; + const ListCell *cell; + + if (list1 == NIL || list2 == NIL) + return NIL; + + Assert(IsOidList(list1)); + Assert(IsOidList(list2)); + + result = NIL; + foreach(cell, list1) + { + if (list_member_oid(list2, lfirst_oid(cell))) + result = lappend_oid(result, lfirst_oid(cell)); + } + + check_list_invariants(result); + return result; +} + /* * Return a list that contains all the cells in list1 that are not in * list2. The returned list is freshly allocated via palloc(), but the diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index 4ba68c70ee..397675ba7b 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -25,6 +25,7 @@ extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt); extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt); extern void RemovePublicationById(Oid pubid); +extern void RemovePublicationRel(Oid pubid, Oid relid, bool missing_ok); extern void RemovePublicationRelById(Oid proid); extern void RemovePublicationSchemaById(Oid psoid); diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h index c3f47db888..69d78793a2 100644 --- a/src/include/nodes/pg_list.h +++ b/src/include/nodes/pg_list.h @@ -575,8 +575,9 @@ extern List *list_union_oid(const List *list1, const List *list2); extern List *list_intersection(const List *list1, const List *list2); extern List *list_intersection_int(const List *list1, const List *list2); +extern List *list_intersection_oid(const List *list1, const List *list2); -/* currently, there's no need for list_intersection_ptr etc */ +/* currently, there's no need for list_intersection_ptr */ extern List *list_difference(const List *list1, const List *list2); extern List *list_difference_ptr(const List *list1, const List *list2); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 1feb558968..f5a15dfcc1 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -237,8 +237,49 @@ HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted; -- works again, because update is no longer replicated UPDATE testpub_parted2 SET a = 2; -DROP TABLE testpub_parted1, testpub_parted2; DROP PUBLICATION testpub_forparted, testpub_forparted1; +-- test behavior where a partition is added to publication where the parent +-- is already present +ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1); +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forparted FOR TABLE testpub_parted1, testpub_parted, testpub_parted2; +RESET client_min_messages; +-- must show only testpub_parted +\dRp+ testpub_forparted + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables: + "public.testpub_parted" + +ALTER TABLE testpub_parted DETACH PARTITION testpub_parted2; +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted2; +-- must show only testpub_parted, testpub_parted2 +\dRp+ testpub_forparted + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables: + "public.testpub_parted" + "public.testpub_parted2" + +ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted2 FOR VALUES IN (2); +-- must show only testpub_parted +\dRp+ testpub_forparted + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | t | t | t | f +Tables: + "public.testpub_parted" + +-- errors because parent already in publication +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted2; +ERROR: ancestor "testpub_parted" of relation "testpub_parted2" is already member of publication "testpub_forparted" +DROP PUBLICATION testpub_forparted; +DROP TABLE testpub_parted1, testpub_parted2; -- Test cache invalidation FOR ALL TABLES publication SET client_min_messages = 'ERROR'; CREATE TABLE testpub_tbl4(a int); diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 8fa0435c32..ca5708335f 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -131,9 +131,32 @@ UPDATE testpub_parted2 SET a = 2; ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted; -- works again, because update is no longer replicated UPDATE testpub_parted2 SET a = 2; -DROP TABLE testpub_parted1, testpub_parted2; + DROP PUBLICATION testpub_forparted, testpub_forparted1; +-- test behavior where a partition is added to publication where the parent +-- is already present +ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1); +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forparted FOR TABLE testpub_parted1, testpub_parted, testpub_parted2; +RESET client_min_messages; +-- must show only testpub_parted +\dRp+ testpub_forparted +ALTER TABLE testpub_parted DETACH PARTITION testpub_parted2; +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted2; +-- must show only testpub_parted, testpub_parted2 +\dRp+ testpub_forparted +ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted2 FOR VALUES IN (2); +-- must show only testpub_parted +\dRp+ testpub_forparted +-- errors because parent already in publication +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted2; + +DROP PUBLICATION testpub_forparted; +DROP TABLE testpub_parted1, testpub_parted2; + + + -- Test cache invalidation FOR ALL TABLES publication SET client_min_messages = 'ERROR'; CREATE TABLE testpub_tbl4(a int); -- 2.24.1