From f2a0af132a8fbcd9cae94bce713b076e4a95747e Mon Sep 17 00:00:00 2001 From: "houzj.fnst" Date: Mon, 18 Oct 2021 14:07:14 +0800 Subject: [PATCH v44 1/5] Allow publishing the tables of schema. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A new option "FOR ALL TABLES IN SCHEMA" in Create/Alter Publication allows one or more schemas to be specified, whose tables are selected by the publisher for sending the data to the subscriber. The new syntax allows specifying both the tables and schemas. For example: CREATE PUBLICATION pub1 FOR TABLE t1,t2,t3, ALL TABLES IN SCHEMA s1,s2; OR ALTER PUBLICATION pub1 ADD TABLE t1,t2,t3, ALL TABLES IN SCHEMA s1,s2; A new system table "pg_publication_namespace" has been added, to maintain the schemas that the user wants to publish through the publication. Modified the output plugin (pgoutput) to publish the changes if the relation is part of schema publication. CATALOG_VERSION_NO needs to be updated while committing, as this feature involves a catalog change. Author: Vignesh C, Hou Zhijie, Amit Kapila Syntax-by: Tom Lane, Álvaro Herrera, Peter Eisentraut Reviewed-by: Greg Nancarrow, Masahiko Sawada, Hou Zhijie, Amit Kapila, Haiying Tang, Ajin Cherian, Rahila Syed, Bharath Rupireddy Tested-by: Haiying Tang Discussion: https://www.postgresql.org/message-id/CALDaNm0OANxuJ6RXqwZsM1MSY4s19nuH3734j4a72etDwvBETQ%40mail.gmail.com --- src/backend/catalog/Makefile | 4 +- src/backend/catalog/aclchk.c | 2 + src/backend/catalog/dependency.c | 9 + src/backend/catalog/objectaddress.c | 149 ++++++ src/backend/catalog/pg_publication.c | 329 +++++++++++- src/backend/commands/alter.c | 1 + src/backend/commands/event_trigger.c | 4 + src/backend/commands/publicationcmds.c | 472 ++++++++++++++++-- src/backend/commands/seclabel.c | 1 + src/backend/commands/tablecmds.c | 28 ++ src/backend/nodes/copyfuncs.c | 22 +- src/backend/nodes/equalfuncs.c | 21 +- src/backend/parser/gram.y | 307 +++++++++--- src/backend/replication/pgoutput/pgoutput.c | 19 +- src/backend/utils/cache/relcache.c | 7 + src/backend/utils/cache/syscache.c | 23 + src/include/catalog/dependency.h | 1 + src/include/catalog/pg_publication.h | 11 +- .../catalog/pg_publication_namespace.h | 47 ++ src/include/commands/publicationcmds.h | 1 + src/include/nodes/nodes.h | 1 + src/include/nodes/parsenodes.h | 36 +- src/include/utils/syscache.h | 2 + src/test/regress/expected/oidjoins.out | 2 + src/test/regress/expected/sanity_check.out | 1 + src/tools/pgindent/typedefs.list | 4 + 26 files changed, 1384 insertions(+), 120 deletions(-) create mode 100644 src/include/catalog/pg_publication_namespace.h diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index d297e77361..4e6efda97f 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -68,8 +68,8 @@ CATALOG_HEADERS := \ pg_foreign_table.h pg_policy.h pg_replication_origin.h \ pg_default_acl.h pg_init_privs.h pg_seclabel.h pg_shseclabel.h \ pg_collation.h pg_partitioned_table.h pg_range.h pg_transform.h \ - pg_sequence.h pg_publication.h pg_publication_rel.h pg_subscription.h \ - pg_subscription_rel.h + pg_sequence.h pg_publication.h pg_publication_namespace.h \ + pg_publication_rel.h pg_subscription.h pg_subscription_rel.h GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index 89792b154e..ce0a4ff14e 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -3427,6 +3427,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFAULT: case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_RULE: @@ -3566,6 +3567,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFAULT: case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_TRANSFORM: diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 91c3e976e0..9f8eb1a37f 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -49,6 +49,7 @@ #include "catalog/pg_policy.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" @@ -178,6 +179,7 @@ static const Oid object_classes[] = { ExtensionRelationId, /* OCLASS_EXTENSION */ EventTriggerRelationId, /* OCLASS_EVENT_TRIGGER */ PolicyRelationId, /* OCLASS_POLICY */ + PublicationNamespaceRelationId, /* OCLASS_PUBLICATION_NAMESPACE */ PublicationRelationId, /* OCLASS_PUBLICATION */ PublicationRelRelationId, /* OCLASS_PUBLICATION_REL */ SubscriptionRelationId, /* OCLASS_SUBSCRIPTION */ @@ -1456,6 +1458,10 @@ doDeletion(const ObjectAddress *object, int flags) RemovePolicyById(object->objectId); break; + case OCLASS_PUBLICATION_NAMESPACE: + RemovePublicationSchemaById(object->objectId); + break; + case OCLASS_PUBLICATION_REL: RemovePublicationRelById(object->objectId); break; @@ -2850,6 +2856,9 @@ getObjectClass(const ObjectAddress *object) case PolicyRelationId: return OCLASS_POLICY; + case PublicationNamespaceRelationId: + return OCLASS_PUBLICATION_NAMESPACE; + case PublicationRelationId: return OCLASS_PUBLICATION; diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index 8c94939baa..2bae3fbb17 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -48,6 +48,7 @@ #include "catalog/pg_policy.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" @@ -825,6 +826,10 @@ static const struct object_type_map { "publication", OBJECT_PUBLICATION }, + /* OCLASS_PUBLICATION_NAMESPACE */ + { + "publication namespace", OBJECT_PUBLICATION_NAMESPACE + }, /* OCLASS_PUBLICATION_REL */ { "publication relation", OBJECT_PUBLICATION_REL @@ -875,6 +880,8 @@ static ObjectAddress get_object_address_usermapping(List *object, static ObjectAddress get_object_address_publication_rel(List *object, Relation *relp, bool missing_ok); +static ObjectAddress get_object_address_publication_schema(List *object, + bool missing_ok); static ObjectAddress get_object_address_defacl(List *object, bool missing_ok); static const ObjectPropertyType *get_object_property_data(Oid class_id); @@ -1113,6 +1120,10 @@ get_object_address(ObjectType objtype, Node *object, address = get_object_address_usermapping(castNode(List, object), missing_ok); break; + case OBJECT_PUBLICATION_NAMESPACE: + address = get_object_address_publication_schema(castNode(List, object), + missing_ok); + break; case OBJECT_PUBLICATION_REL: address = get_object_address_publication_rel(castNode(List, object), &relation, @@ -1935,6 +1946,49 @@ get_object_address_publication_rel(List *object, return address; } +/* + * Find the ObjectAddress for a publication schema. The first element of the + * object parameter is the schema name, the second is the publication name. + */ +static ObjectAddress +get_object_address_publication_schema(List *object, bool missing_ok) +{ + ObjectAddress address; + Publication *pub; + char *pubname; + char *schemaname; + Oid schemaid; + + ObjectAddressSet(address, PublicationNamespaceRelationId, InvalidOid); + + /* Fetch schema name and publication name from input list */ + schemaname = strVal(linitial(object)); + pubname = strVal(lsecond(object)); + + schemaid = get_namespace_oid(schemaname, missing_ok); + if (!OidIsValid(schemaid)) + return address; + + /* Now look up the pg_publication tuple */ + pub = GetPublicationByName(pubname, missing_ok); + if (!pub) + return address; + + /* Find the publication schema mapping in syscache */ + address.objectId = + GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, + Anum_pg_publication_namespace_oid, + ObjectIdGetDatum(schemaid), + ObjectIdGetDatum(pub->oid)); + if (!OidIsValid(address.objectId) && !missing_ok) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("publication schema \"%s\" in publication \"%s\" does not exist", + schemaname, pubname))); + + return address; +} + /* * Find the ObjectAddress for a default ACL. */ @@ -2206,6 +2260,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_DOMCONSTRAINT: case OBJECT_CAST: case OBJECT_USER_MAPPING: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_DEFACL: case OBJECT_TRANSFORM: @@ -2299,6 +2354,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_PUBLICATION_REL: objnode = (Node *) list_make2(name, linitial(args)); break; + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_USER_MAPPING: objnode = (Node *) list_make2(linitial(name), linitial(args)); break; @@ -2848,6 +2904,55 @@ get_catalog_object_by_oid(Relation catalog, AttrNumber oidcol, Oid objectId) return tuple; } +/* + * getPublicationSchemaInfo + * + * Get publication name and schema name from the object address into pubname and + * nspname. Both pubname and nspname are palloc'd strings which will be freed by + * the caller. + */ +static bool +getPublicationSchemaInfo(const ObjectAddress *object, bool missing_ok, + char **pubname, char **nspname) +{ + HeapTuple tup; + Form_pg_publication_namespace pnform; + + tup = SearchSysCache1(PUBLICATIONNAMESPACE, + ObjectIdGetDatum(object->objectId)); + if (!HeapTupleIsValid(tup)) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for publication schema %u", + object->objectId); + return false; + } + + pnform = (Form_pg_publication_namespace) GETSTRUCT(tup); + *pubname = get_publication_name(pnform->pnpubid, missing_ok); + if (!(*pubname)) + { + ReleaseSysCache(tup); + return false; + } + + *nspname = get_namespace_name(pnform->pnnspid); + if (!(*nspname)) + { + Oid schemaid = pnform->pnnspid; + + pfree(*pubname); + ReleaseSysCache(tup); + if (!missing_ok) + elog(ERROR, "cache lookup failed for schema %u", + schemaid); + return false; + } + + ReleaseSysCache(tup); + return true; +} + /* * getObjectDescription: build an object description for messages * @@ -3872,6 +3977,22 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok) break; } + case OCLASS_PUBLICATION_NAMESPACE: + { + char *pubname; + char *nspname; + + if (!getPublicationSchemaInfo(object, missing_ok, + &pubname, &nspname)) + break; + + appendStringInfo(&buffer, _("publication of schema %s in publication %s"), + nspname, pubname); + pfree(pubname); + pfree(nspname); + break; + } + case OCLASS_PUBLICATION_REL: { HeapTuple tup; @@ -4473,6 +4594,10 @@ getObjectTypeDescription(const ObjectAddress *object, bool missing_ok) appendStringInfoString(&buffer, "publication"); break; + case OCLASS_PUBLICATION_NAMESPACE: + appendStringInfoString(&buffer, "publication namespace"); + break; + case OCLASS_PUBLICATION_REL: appendStringInfoString(&buffer, "publication relation"); break; @@ -5683,6 +5808,30 @@ getObjectIdentityParts(const ObjectAddress *object, break; } + case OCLASS_PUBLICATION_NAMESPACE: + { + char *pubname; + char *nspname; + + if (!getPublicationSchemaInfo(object, missing_ok, &pubname, + &nspname)) + break; + appendStringInfo(&buffer, "%s in publication %s", + nspname, pubname); + + if (objargs) + *objargs = list_make1(pubname); + else + pfree(pubname); + + if (objname) + *objname = list_make1(nspname); + else + pfree(nspname); + + break; + } + case OCLASS_PUBLICATION_REL: { HeapTuple tup; diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 9cd0c82f93..b67c95b9ae 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -28,7 +28,9 @@ #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" #include "commands/publicationcmds.h" @@ -76,6 +78,30 @@ check_publication_add_relation(Relation targetrel) errdetail("Temporary and unlogged relations cannot be replicated."))); } +/* + * Check if schema can be in given publication and throws appropriate + * error if not. + */ +static void +check_publication_add_schema(Oid schemaid) +{ + /* Can't be system namespace */ + if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(schemaid)), + errdetail("This operation is not supported for system schemas."))); + + /* Can't be temporary namespace */ + if (isAnyTempNamespace(schemaid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(schemaid)), + errdetail("Temporary schemas cannot be replicated."))); +} + /* * Returns if relation represented by oid and Form_pg_class entry * is publishable. @@ -105,6 +131,45 @@ is_publishable_class(Oid relid, Form_pg_class reltuple) relid >= FirstNormalObjectId; } +/* + * Filter out the partitions whose parent tables were also specified in + * the publication. + */ +static List * +filter_partitions(List *relids) +{ + List *result = NIL; + ListCell *lc; + ListCell *lc2; + + foreach(lc, relids) + { + bool skip = false; + List *ancestors = NIL; + Oid relid = lfirst_oid(lc); + + if (get_rel_relispartition(relid)) + ancestors = get_partition_ancestors(relid); + + foreach(lc2, ancestors) + { + /* + * Check if the parent table exists in the published table list. + */ + if (list_member_oid(relids, lfirst_oid(lc2))) + { + skip = true; + break; + } + } + + if (!skip) + result = lappend_oid(result, relid); + } + + return result; +} + /* * Another variant of this, taking a Relation. */ @@ -262,6 +327,89 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, return myself; } +/* + * Insert new publication / schema mapping. + */ +ObjectAddress +publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) +{ + Relation rel; + HeapTuple tup; + Datum values[Natts_pg_publication_namespace]; + bool nulls[Natts_pg_publication_namespace]; + Oid psschid; + Publication *pub = GetPublication(pubid); + List *schemaRels = NIL; + ObjectAddress myself, + referenced; + + rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock); + + /* + * Check for duplicates. Note that this does not really prevent + * duplicates, it's here just to provide nicer error message in common + * case. The real protection is the unique key on the catalog. + */ + if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP, + ObjectIdGetDatum(schemaid), + ObjectIdGetDatum(pubid))) + { + table_close(rel, RowExclusiveLock); + + if (if_not_exists) + return InvalidObjectAddress; + + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("schema \"%s\" is already member of publication \"%s\"", + get_namespace_name(schemaid), pub->name))); + } + + check_publication_add_schema(schemaid); + + /* Form a tuple */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + + psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId, + Anum_pg_publication_namespace_oid); + values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid); + values[Anum_pg_publication_namespace_pnpubid - 1] = + ObjectIdGetDatum(pubid); + values[Anum_pg_publication_namespace_pnnspid - 1] = + ObjectIdGetDatum(schemaid); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + + /* Insert tuple into catalog */ + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + + ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid); + + /* Add dependency on the publication */ + ObjectAddressSet(referenced, PublicationRelationId, pubid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Add dependency on the schema */ + ObjectAddressSet(referenced, NamespaceRelationId, schemaid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Close the table */ + table_close(rel, RowExclusiveLock); + + /* + * Invalidate relcache so that publication info is rebuilt. See + * publication_add_relation for why we need to consider all the + * partitions. + */ + schemaRels = GetSchemaPublicationRelations(schemaid, + PUBLICATION_PART_ALL); + InvalidatePublicationRels(schemaRels); + + return myself; +} + /* Gets list of publication oids for a relation */ List * GetRelationPublications(Oid relid) @@ -428,6 +576,151 @@ GetAllTablesPublicationRelations(bool pubviaroot) return result; } +/* + * Gets the list of schema oids for a publication. + * + * This should only be used FOR ALL TABLES IN SCHEMA publications. + */ +List * +GetPublicationSchemas(Oid pubid) +{ + List *result = NIL; + Relation pubschsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all schemas associated with the publication */ + pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_namespace_pnpubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubschsrel, + PublicationNamespacePnnspidPnpubidIndexId, + true, NULL, 1, &scankey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_namespace pubsch; + + pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup); + + result = lappend_oid(result, pubsch->pnnspid); + } + + systable_endscan(scan); + table_close(pubschsrel, AccessShareLock); + + return result; +} + +/* + * Gets the list of publication oids associated with a specified schema. + */ +List * +GetSchemaPublications(Oid schemaid) +{ + List *result = NIL; + CatCList *pubschlist; + int i; + + /* Find all publications associated with the schema */ + pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP, + ObjectIdGetDatum(schemaid)); + for (i = 0; i < pubschlist->n_members; i++) + { + HeapTuple tup = &pubschlist->members[i]->tuple; + Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid; + + result = lappend_oid(result, pubid); + } + + ReleaseSysCacheList(pubschlist); + + return result; +} + +/* + * Get the list of publishable relation oids for a specified schema. + */ +List * +GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) +{ + Relation classRel; + ScanKeyData key[1]; + TableScanDesc scan; + HeapTuple tuple; + List *result = NIL; + + Assert(OidIsValid(schemaid)); + + classRel = table_open(RelationRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_class_relnamespace, + BTEqualStrategyNumber, F_OIDEQ, + schemaid); + + /* get all the relations present in the specified schema */ + scan = table_beginscan_catalog(classRel, 1, key); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + Oid relid = relForm->oid; + char relkind; + + if (!is_publishable_class(relid, relForm)) + continue; + + relkind = get_rel_relkind(relid); + if (relkind == RELKIND_RELATION) + result = lappend_oid(result, relid); + else if (relkind == RELKIND_PARTITIONED_TABLE) + { + List *partitionrels = NIL; + + /* + * It is quite possible that some of the partitions are in a + * different schema than the parent table, so we need to get such + * partitions separately. + */ + partitionrels = GetPubPartitionOptionRelations(partitionrels, + pub_partopt, + relForm->oid); + result = list_concat_unique_oid(result, partitionrels); + } + } + + table_endscan(scan); + table_close(classRel, AccessShareLock); + return result; +} + +/* + * Gets the list of all relations published by FOR ALL TABLES IN SCHEMA + * publication. + */ +List * +GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) +{ + List *result = NIL; + List *pubschemalist = GetPublicationSchemas(pubid); + ListCell *cell; + + foreach(cell, pubschemalist) + { + Oid schemaid = lfirst_oid(cell); + List *schemaRels = NIL; + + schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt); + result = list_concat(result, schemaRels); + } + + return result; +} + /* * Get publication using oid * @@ -555,12 +848,46 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * need those. */ if (publication->alltables) + { tables = GetAllTablesPublicationRelations(publication->pubviaroot); + } else - tables = GetPublicationRelations(publication->oid, + { + List *relids, + *schemarelids; + + relids = GetPublicationRelations(publication->oid, publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); + schemarelids = GetAllSchemaPublicationRelations(publication->oid, + publication->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + + if (schemarelids) + { + /* + * If the publication publishes partition changes via their + * respective root partitioned tables, we must exclude + * partitions in favor of including the root partitioned + * tables. Otherwise, the function could return both the child + * and parent tables which could cause data of the child table + * to be double-published on the subscriber side. + * + * XXX As of now, we do this when a publication has associated + * schema or for all tables publication. See + * GetAllTablesPublicationRelations(). + */ + tables = list_concat_unique_oid(relids, schemarelids); + if (publication->pubviaroot) + tables = filter_partitions(tables); + } + else + tables = relids; + + } + funcctx->user_fctx = (void *) tables; MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index c47d54e96b..40044070cf 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -660,6 +660,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid, case OCLASS_EVENT_TRIGGER: case OCLASS_POLICY: case OCLASS_PUBLICATION: + case OCLASS_PUBLICATION_NAMESPACE: case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 71612d577e..df264329d8 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -973,6 +973,7 @@ EventTriggerSupportsObjectType(ObjectType obtype) case OBJECT_POLICY: case OBJECT_PROCEDURE: case OBJECT_PUBLICATION: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROUTINE: case OBJECT_RULE: @@ -1050,6 +1051,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass) case OCLASS_EXTENSION: case OCLASS_POLICY: case OCLASS_PUBLICATION: + case OCLASS_PUBLICATION_NAMESPACE: case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: @@ -2126,6 +2128,7 @@ stringify_grant_objtype(ObjectType objtype) case OBJECT_OPFAMILY: case OBJECT_POLICY: case OBJECT_PUBLICATION: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_RULE: @@ -2208,6 +2211,7 @@ stringify_adefprivs_objtype(ObjectType objtype) case OBJECT_OPFAMILY: case OBJECT_POLICY: case OBJECT_PUBLICATION: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_RULE: diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 9c7f91611d..0e23028c94 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -25,7 +25,9 @@ #include "catalog/objectaddress.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" @@ -34,6 +36,7 @@ #include "commands/publicationcmds.h" #include "funcapi.h" #include "miscadmin.h" +#include "storage/lmgr.h" #include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" @@ -45,11 +48,16 @@ #include "utils/syscache.h" #include "utils/varlena.h" +static List *OpenReliIdList(List *relids); static List *OpenTableList(List *tables); static void CloseTableList(List *rels); static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt); static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); +static void LockSchemaList(List *schemalist); +static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt); +static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); static void parse_publication_options(ParseState *pstate, @@ -135,6 +143,97 @@ parse_publication_options(ParseState *pstate, } } +/* + * Convert the PublicationObjSpecType list into schema oid list and rangevar + * list. + */ +static void +ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, + List **rels, List **schemas) +{ + ListCell *cell; + PublicationObjSpec *pubobj; + + if (!pubobjspec_list) + return; + + foreach(cell, pubobjspec_list) + { + Oid schemaid; + List *search_path; + + pubobj = (PublicationObjSpec *) lfirst(cell); + + switch (pubobj->pubobjtype) + { + case PUBLICATIONOBJ_TABLE: + *rels = lappend(*rels, pubobj->pubtable); + break; + case PUBLICATIONOBJ_REL_IN_SCHEMA: + schemaid = get_namespace_oid(pubobj->name, false); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + *schemas = list_append_unique_oid(*schemas, schemaid); + break; + case PUBLICATIONOBJ_CURRSCHEMA: + search_path = fetch_search_path(false); + if (search_path == NIL) /* nothing valid in search_path? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected for CURRENT_SCHEMA")); + + schemaid = linitial_oid(search_path); + list_free(search_path); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + *schemas = list_append_unique_oid(*schemas, schemaid); + break; + default: + /* shouldn't happen */ + elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype); + break; + } + } +} + +/* + * Check if any of the given relation's schema is a member of the given schema + * list. + */ +static void +CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, + PublicationObjSpecType checkobjtype) +{ + ListCell *lc; + + foreach(lc, rels) + { + PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc); + Relation rel = pub_rel->relation; + Oid relSchemaId = RelationGetNamespace(rel); + + if (list_member_oid(schemaidlist, relSchemaId)) + { + if (checkobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(relSchemaId)), + errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.", + RelationGetRelationName(rel), + get_namespace_name(relSchemaId))); + else if (checkobjtype == PUBLICATIONOBJ_TABLE) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add relation \"%s.%s\" to publication", + get_namespace_name(relSchemaId), + RelationGetRelationName(rel)), + errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.", + get_namespace_name(relSchemaId))); + } + } +} + /* * Create new publication. */ @@ -152,6 +251,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) bool publish_via_partition_root_given; bool publish_via_partition_root; AclResult aclresult; + List *relations = NIL; + List *schemaidlist = NIL; /* must have CREATE privilege on database */ aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); @@ -221,21 +322,44 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) /* Make the changes visible. */ CommandCounterIncrement(); - if (stmt->tables) - { - List *rels; - - Assert(list_length(stmt->tables) > 0); - - rels = OpenTableList(stmt->tables); - PublicationAddTables(puboid, rels, true, NULL); - CloseTableList(rels); - } - else if (stmt->for_all_tables) + /* Associate objects with the publication. */ + if (stmt->for_all_tables) { /* Invalidate relcache so that publication info is rebuilt. */ CacheInvalidateRelcacheAll(); } + else + { + ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + &schemaidlist); + + /* FOR ALL TABLES IN SCHEMA requires superuser */ + if (list_length(schemaidlist) > 0 && !superuser()) + ereport(ERROR, + errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication")); + + if (list_length(relations) > 0) + { + List *rels; + + rels = OpenTableList(relations); + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_TABLE); + PublicationAddTables(puboid, rels, true, NULL); + CloseTableList(rels); + } + + if (list_length(schemaidlist) > 0) + { + /* + * Schema lock is held until the publication is created to prevent + * concurrent schema deletion. + */ + LockSchemaList(schemaidlist); + PublicationAddSchemas(puboid, schemaidlist, true, NULL); + } + } table_close(rel, RowExclusiveLock); @@ -318,13 +442,19 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, } else { + List *relids = NIL; + List *schemarelids = NIL; + /* * For any partitioned tables contained in the publication, we must * invalidate all partitions contained in the respective partition * trees, not just those explicitly mentioned in the publication. */ - List *relids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ALL); + relids = GetPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + schemarelids = GetAllSchemaPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + relids = list_concat_unique_oid(relids, schemarelids); InvalidatePublicationRels(relids); } @@ -361,28 +491,36 @@ InvalidatePublicationRels(List *relids) * Add or remove table to/from publication. */ static void -AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, - HeapTuple tup) +AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, + List *tables, List *schemaidlist) { List *rels = NIL; Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); Oid pubid = pubform->oid; - /* Check that user is allowed to manipulate the publication tables. */ - if (pubform->puballtables) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("publication \"%s\" is defined as FOR ALL TABLES", - NameStr(pubform->pubname)), - errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); + /* + * It is quite possible that for the SET case user has not specified any + * tables in which case we need to remove all the existing tables. + */ + if (!tables && stmt->action != DEFELEM_SET) + return; - Assert(list_length(stmt->tables) > 0); + rels = OpenTableList(tables); - rels = OpenTableList(stmt->tables); + if (stmt->action == DEFELEM_ADD) + { + List *schemas = NIL; - if (stmt->tableAction == DEFELEM_ADD) + /* + * Check if the relation is member of the existing schema in the + * publication or member of the schema list specified. + */ + schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid)); + CheckObjSchemaNotAlreadyInPublication(rels, schemas, + PUBLICATIONOBJ_TABLE); PublicationAddTables(pubid, rels, false, stmt); - else if (stmt->tableAction == DEFELEM_DROP) + } + else if (stmt->action == DEFELEM_DROP) PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ { @@ -391,6 +529,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, List *delrels = NIL; ListCell *oldlc; + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_TABLE); + /* Calculate which relations to drop. */ foreach(oldlc, oldrelids) { @@ -440,11 +581,113 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, CloseTableList(rels); } +/* + * Alter the publication schemas. + * + * Add or remove schemas to/from publication. + */ +static void +AlterPublicationSchemas(AlterPublicationStmt *stmt, + HeapTuple tup, List *schemaidlist) +{ + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + + /* + * It is quite possible that for the SET case user has not specified any + * schema in which case we need to remove all the existing schemas. + */ + if (!schemaidlist && stmt->action != DEFELEM_SET) + return; + + /* + * Schema lock is held until the publication is altered to prevent + * concurrent schema deletion. + */ + LockSchemaList(schemaidlist); + if (stmt->action == DEFELEM_ADD) + { + List *rels; + List *reloids; + + reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT); + rels = OpenReliIdList(reloids); + + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_REL_IN_SCHEMA); + + CloseTableList(rels); + PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt); + } + else if (stmt->action == DEFELEM_DROP) + PublicationDropSchemas(pubform->oid, schemaidlist, false); + else /* DEFELEM_SET */ + { + List *oldschemaids = GetPublicationSchemas(pubform->oid); + List *delschemas = NIL; + + /* Identify which schemas should be dropped */ + delschemas = list_difference_oid(oldschemaids, schemaidlist); + + /* + * Schema lock is held until the publication is altered to prevent + * concurrent schema deletion. + */ + LockSchemaList(delschemas); + + /* And drop them */ + PublicationDropSchemas(pubform->oid, delschemas, true); + + /* + * Don't bother calculating the difference for adding, we'll catch and + * skip existing ones when doing catalog update. + */ + PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt); + } + + return; +} + +/* + * Check if relations and schemas can be in a given publication and throw + * appropriate error if not. + */ +static void +CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, + List *tables, List *schemaidlist) +{ + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + + if ((stmt->action == DEFELEM_ADD || stmt->action == DEFELEM_SET) && + schemaidlist && !superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to add or set schemas"))); + + /* + * Check that user is allowed to manipulate the publication tables in + * schema + */ + if (schemaidlist && pubform->puballtables) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications."))); + + /* Check that user is allowed to manipulate the publication tables. */ + if (tables && pubform->puballtables) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); +} + /* * Alter the existing publication. * - * This is dispatcher function for AlterPublicationOptions and - * AlterPublicationTables. + * This is dispatcher function for AlterPublicationOptions, + * AlterPublicationSchemas and AlterPublicationTables. */ void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) @@ -474,7 +717,29 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) if (stmt->options) AlterPublicationOptions(pstate, stmt, rel, tup); else - AlterPublicationTables(stmt, rel, tup); + { + List *relations = NIL; + List *schemaidlist = NIL; + + ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + &schemaidlist); + + CheckAlterPublication(stmt, tup, relations, schemaidlist); + + /* + * Lock the publication so nobody else can do anything with it. This + * prevents concurrent alter to add table(s) that were already going + * to become part of the publication by adding corresponding schema(s) + * via this command and similarly it will prevent the concurrent + * addition of schema(s) for which there is any corresponding table + * being added by this command. + */ + LockDatabaseObject(PublicationRelationId, pubform->oid, 0, + AccessExclusiveLock); + + AlterPublicationTables(stmt, tup, relations, schemaidlist); + AlterPublicationSchemas(stmt, tup, schemaidlist); + } /* Cleanup. */ heap_freetuple(tup); @@ -552,9 +817,90 @@ RemovePublicationById(Oid pubid) } /* - * Open relations specified by a PublicationTable list. - * In the returned list of PublicationRelInfo, tables are locked - * in ShareUpdateExclusiveLock mode in order to add them to a publication. + * Remove schema from publication by mapping OID. + */ +void +RemovePublicationSchemaById(Oid psoid) +{ + Relation rel; + HeapTuple tup; + List *schemaRels = NIL; + Form_pg_publication_namespace pubsch; + + rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock); + + tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for publication schema %u", psoid); + + pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup); + + /* + * Invalidate relcache so that publication info is rebuilt. See + * RemovePublicationRelById for why we need to consider all the + * partitions. + */ + schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid, + PUBLICATION_PART_ALL); + InvalidatePublicationRels(schemaRels); + + CatalogTupleDelete(rel, &tup->t_self); + + ReleaseSysCache(tup); + + table_close(rel, RowExclusiveLock); +} + +/* + * Lock the schemas specified in the schema list in AccessShareLock mode in + * order to prevent concurrent schema deletion. + */ +static void +LockSchemaList(List *schemalist) +{ + ListCell *lc; + + foreach(lc, schemalist) + { + Oid schemaid = lfirst_oid(lc); + + /* Allow query cancel in case this takes a long time */ + CHECK_FOR_INTERRUPTS(); + LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock); + } +} + +/* + * Open relations specified by a relid list. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. + */ +static List * +OpenReliIdList(List *relids) +{ + ListCell *lc; + List *rels = NIL; + + foreach(lc, relids) + { + PublicationRelInfo *pub_rel; + Oid relid = lfirst_oid(lc); + Relation rel = table_open(relid, + ShareUpdateExclusiveLock); + + pub_rel = palloc(sizeof(PublicationRelInfo)); + pub_rel->relation = rel; + rels = lappend(rels, pub_rel); + } + + return rels; +} + +/* + * Open relations specified by a RangeVar list. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. */ static List * OpenTableList(List *tables) @@ -692,6 +1038,34 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, } } +/* + * Add listed schemas to the publication. + */ +static void +PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt) +{ + ListCell *lc; + + Assert(!stmt || !stmt->for_all_tables); + + foreach(lc, schemas) + { + Oid schemaid = lfirst_oid(lc); + ObjectAddress obj; + + obj = publication_add_schema(pubid, schemaid, if_not_exists); + if (stmt) + { + EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, + (Node *) stmt); + + InvokeObjectPostCreateHook(PublicationNamespaceRelationId, + obj.objectId, 0); + } + } +} + /* * Remove listed tables from the publication. */ @@ -727,6 +1101,40 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) } } +/* + * Remove listed schemas from the publication. + */ +static void +PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) +{ + ObjectAddress obj; + ListCell *lc; + Oid psid; + + foreach(lc, schemas) + { + Oid schemaid = lfirst_oid(lc); + + psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, + Anum_pg_publication_namespace_oid, + ObjectIdGetDatum(schemaid), + ObjectIdGetDatum(pubid)); + if (!OidIsValid(psid)) + { + if (missing_ok) + continue; + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("tables from schema \"%s\" are not part of the publication", + get_namespace_name(schemaid)))); + } + + ObjectAddressSet(obj, PublicationNamespaceRelationId, psid); + performDeletion(&obj, DROP_CASCADE, 0); + } +} + /* * Internal workhorse for changing a publication owner */ diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c index 308e0adb55..53c18628a7 100644 --- a/src/backend/commands/seclabel.c +++ b/src/backend/commands/seclabel.c @@ -79,6 +79,7 @@ SecLabelSupportsObjectType(ObjectType objtype) case OBJECT_OPERATOR: case OBJECT_OPFAMILY: case OBJECT_POLICY: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 1a2f159f24..857cc5ce6e 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -12286,6 +12286,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, case OCLASS_EXTENSION: case OCLASS_EVENT_TRIGGER: case OCLASS_PUBLICATION: + case OCLASS_PUBLICATION_NAMESPACE: case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: @@ -15994,6 +15995,33 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema) newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1); nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL); + /* + * Check that setting the relation to a different schema won't result in a + * publication having both a schema and the same schema's table, as this + * is not supported. + */ + if (stmt->objectType == OBJECT_TABLE) + { + ListCell *lc; + List *schemaPubids = GetSchemaPublications(nspOid); + List *relPubids = GetRelationPublications(RelationGetRelid(rel)); + + foreach(lc, relPubids) + { + Oid pubid = lfirst_oid(lc); + + if (list_member_oid(schemaPubids, pubid)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot move table \"%s\" to schema \"%s\"", + RelationGetRelationName(rel), stmt->newschema), + errdetail("The schema \"%s\" and same schema's table \"%s\" cannot be part of the same publication \"%s\".", + stmt->newschema, + RelationGetRelationName(rel), + get_publication_name(pubid, false))); + } + } + /* common checks on switching namespaces */ CheckSetNamespace(oldNspOid, nspOid); diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 70e9e54d3e..f3606bfd81 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4820,6 +4820,19 @@ _copyPublicationTable(const PublicationTable *from) return newnode; } +static PublicationObjSpec* +_copyPublicationObject(const PublicationObjSpec *from) +{ + PublicationObjSpec *newnode = makeNode(PublicationObjSpec); + + COPY_SCALAR_FIELD(pubobjtype); + COPY_STRING_FIELD(name); + COPY_NODE_FIELD(pubtable); + COPY_LOCATION_FIELD(location); + + return newnode; +} + static CreatePublicationStmt * _copyCreatePublicationStmt(const CreatePublicationStmt *from) { @@ -4827,7 +4840,7 @@ _copyCreatePublicationStmt(const CreatePublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); - COPY_NODE_FIELD(tables); + COPY_NODE_FIELD(pubobjects); COPY_SCALAR_FIELD(for_all_tables); return newnode; @@ -4840,9 +4853,9 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); - COPY_NODE_FIELD(tables); + COPY_NODE_FIELD(pubobjects); COPY_SCALAR_FIELD(for_all_tables); - COPY_SCALAR_FIELD(tableAction); + COPY_SCALAR_FIELD(action); return newnode; } @@ -5890,6 +5903,9 @@ copyObjectImpl(const void *from) case T_PublicationTable: retval = _copyPublicationTable(from); break; + case T_PublicationObjSpec: + retval = _copyPublicationObject(from); + break; /* * MISCELLANEOUS NODES diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 19eff20102..abd827ed79 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2310,7 +2310,7 @@ _equalCreatePublicationStmt(const CreatePublicationStmt *a, { COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); - COMPARE_NODE_FIELD(tables); + COMPARE_NODE_FIELD(pubobjects); COMPARE_SCALAR_FIELD(for_all_tables); return true; @@ -2322,9 +2322,9 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a, { COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); - COMPARE_NODE_FIELD(tables); + COMPARE_NODE_FIELD(pubobjects); COMPARE_SCALAR_FIELD(for_all_tables); - COMPARE_SCALAR_FIELD(tableAction); + COMPARE_SCALAR_FIELD(action); return true; } @@ -3046,6 +3046,18 @@ _equalPartitionCmd(const PartitionCmd *a, const PartitionCmd *b) return true; } +static bool +_equalPublicationObject(const PublicationObjSpec* a, + const PublicationObjSpec* b) +{ + COMPARE_SCALAR_FIELD(pubobjtype); + COMPARE_STRING_FIELD(name); + COMPARE_NODE_FIELD(pubtable); + COMPARE_LOCATION_FIELD(location); + + return true; +} + /* * Stuff from pg_list.h */ @@ -3897,6 +3909,9 @@ equal(const void *a, const void *b) case T_PublicationTable: retval = _equalPublicationTable(a, b); break; + case T_PublicationObjSpec: + retval = _equalPublicationObject(a, b); + break; default: elog(ERROR, "unrecognized node type: %d", diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 08f1bf1031..871fa20c93 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -195,12 +195,17 @@ static Node *makeXmlExpr(XmlExprOp op, char *name, List *named_args, static List *mergeTableFuncParameters(List *func_args, List *columns); static TypeName *TableFuncTypeName(List *columns); static RangeVar *makeRangeVarFromAnyName(List *names, int position, core_yyscan_t yyscanner); +static RangeVar *makeRangeVarFromQualifiedName(char *name, List *rels, + int location, + core_yyscan_t yyscanner); static void SplitColQualList(List *qualList, List **constraintList, CollateClause **collClause, core_yyscan_t yyscanner); static void processCASbits(int cas_bits, int location, const char *constrType, bool *deferrable, bool *initdeferred, bool *not_valid, bool *no_inherit, core_yyscan_t yyscanner); +static void preprocess_pubobj_list(List *pubobjspec_list, + core_yyscan_t yyscanner); static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %} @@ -256,6 +261,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); PartitionSpec *partspec; PartitionBoundSpec *partboundspec; RoleSpec *rolespec; + PublicationObjSpec *publicationobjectspec; struct SelectLimit *selectlimit; SetQuantifier setquantifier; struct GroupClause *groupclause; @@ -425,14 +431,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); transform_element_list transform_type_list TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list - drop_option_list publication_table_list + drop_option_list pub_obj_list %type opt_routine_body %type group_clause %type group_by_list %type group_by_item empty_grouping_set rollup_clause cube_clause %type grouping_sets_clause -%type opt_publication_for_tables publication_for_tables publication_table %type opt_fdw_options fdw_options %type fdw_option @@ -517,6 +522,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type table_ref %type joined_table %type relation_expr +%type extended_relation_expr %type relation_expr_opt_alias %type tablesample_clause opt_repeatable_clause %type target_el set_target insert_column_item @@ -553,6 +559,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type createdb_opt_name plassign_target %type var_value zone_value %type auth_ident RoleSpec opt_granted_by +%type PublicationObjSpec %type unreserved_keyword type_func_name_keyword %type col_name_keyword reserved_keyword @@ -9591,69 +9598,131 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec /***************************************************************************** * - * CREATE PUBLICATION name [ FOR TABLE ] [ WITH options ] + * CREATE PUBLICATION name [WITH options] + * + * CREATE PUBLICATION FOR ALL TABLES [WITH options] + * + * CREATE PUBLICATION FOR pub_obj [, ...] [WITH options] + * + * pub_obj is one of: + * + * TABLE table [, ...] + * ALL TABLES IN SCHEMA schema [, ...] * *****************************************************************************/ CreatePublicationStmt: - CREATE PUBLICATION name opt_publication_for_tables opt_definition + CREATE PUBLICATION name opt_definition { CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; - n->options = $5; - if ($4 != NULL) - { - /* FOR TABLE */ - if (IsA($4, List)) - n->tables = (List *)$4; - /* FOR ALL TABLES */ - else - n->for_all_tables = true; - } + n->options = $4; + $$ = (Node *)n; + } + | CREATE PUBLICATION name FOR ALL TABLES opt_definition + { + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $7; + n->for_all_tables = true; + $$ = (Node *)n; + } + | CREATE PUBLICATION name FOR pub_obj_list opt_definition + { + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $6; + n->pubobjects = (List *)$5; + preprocess_pubobj_list(n->pubobjects, yyscanner); $$ = (Node *)n; } ; -opt_publication_for_tables: - publication_for_tables { $$ = $1; } - | /* EMPTY */ { $$ = NULL; } - ; - -publication_for_tables: - FOR TABLE publication_table_list +/* + * FOR TABLE and FOR ALL TABLES IN SCHEMA specifications + * + * This rule parses publication objects with and without keyword prefixes. + * + * The actual type of the object without keyword prefix depends on the previous + * one with keyword prefix. It will be preprocessed in preprocess_pubobj_list(). + * + * For the object without keyword prefix, we cannot just use relation_expr here, + * because some extended expressions in relation_expr cannot be used as a + * schemaname and we cannot differentiate it. So, we extract the rules from + * relation_expr here. + */ +PublicationObjSpec: + TABLE relation_expr { - $$ = (Node *) $3; + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_TABLE; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = $2; } - | FOR ALL TABLES + | ALL TABLES IN_P SCHEMA ColId { - $$ = (Node *) makeInteger(true); + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_REL_IN_SCHEMA; + $$->name = $5; + $$->location = @5; } - ; + | ALL TABLES IN_P SCHEMA CURRENT_SCHEMA + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CURRSCHEMA; + $$->location = @5; + } + | ColId + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->name = $1; + $$->location = @1; + } + | ColId indirection + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); + $$->location = @1; + } + /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */ + | extended_relation_expr + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = $1; + } + | CURRENT_SCHEMA + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->location = @1; + } + ; -publication_table_list: - publication_table +pub_obj_list: PublicationObjSpec { $$ = list_make1($1); } - | publication_table_list ',' publication_table - { $$ = lappend($1, $3); } - ; - -publication_table: relation_expr - { - PublicationTable *n = makeNode(PublicationTable); - n->relation = $1; - $$ = (Node *) n; - } + | pub_obj_list ',' PublicationObjSpec + { $$ = lappend($1, $3); } ; /***************************************************************************** * * ALTER PUBLICATION name SET ( options ) * - * ALTER PUBLICATION name ADD TABLE table [, table2] + * ALTER PUBLICATION name ADD pub_obj [, ...] + * + * ALTER PUBLICATION name DROP pub_obj [, ...] * - * ALTER PUBLICATION name DROP TABLE table [, table2] + * ALTER PUBLICATION name SET pub_obj [, ...] * - * ALTER PUBLICATION name SET TABLE table [, table2] + * pub_obj is one of: + * + * TABLE table_name [, ...] + * ALL TABLES IN SCHEMA schema_name [, ...] * *****************************************************************************/ @@ -9665,28 +9734,31 @@ AlterPublicationStmt: n->options = $5; $$ = (Node *)n; } - | ALTER PUBLICATION name ADD_P TABLE publication_table_list + | ALTER PUBLICATION name ADD_P pub_obj_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; - n->tables = $6; - n->tableAction = DEFELEM_ADD; + n->pubobjects = $5; + preprocess_pubobj_list(n->pubobjects, yyscanner); + n->action = DEFELEM_ADD; $$ = (Node *)n; } - | ALTER PUBLICATION name SET TABLE publication_table_list + | ALTER PUBLICATION name SET pub_obj_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; - n->tables = $6; - n->tableAction = DEFELEM_SET; + n->pubobjects = $5; + preprocess_pubobj_list(n->pubobjects, yyscanner); + n->action = DEFELEM_SET; $$ = (Node *)n; } - | ALTER PUBLICATION name DROP TABLE publication_table_list + | ALTER PUBLICATION name DROP pub_obj_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; - n->tables = $6; - n->tableAction = DEFELEM_DROP; + n->pubobjects = $5; + preprocess_pubobj_list(n->pubobjects, yyscanner); + n->action = DEFELEM_DROP; $$ = (Node *)n; } ; @@ -12430,7 +12502,14 @@ relation_expr: $$->inh = true; $$->alias = NULL; } - | qualified_name '*' + | extended_relation_expr + { + $$ = $1; + } + ; + +extended_relation_expr: + qualified_name '*' { /* inheritance query, explicitly */ $$ = $1; @@ -15104,28 +15183,7 @@ qualified_name: } | ColId indirection { - check_qualified_name($2, yyscanner); - $$ = makeRangeVar(NULL, NULL, @1); - switch (list_length($2)) - { - case 1: - $$->catalogname = NULL; - $$->schemaname = $1; - $$->relname = strVal(linitial($2)); - break; - case 2: - $$->catalogname = $1; - $$->schemaname = strVal(linitial($2)); - $$->relname = strVal(lsecond($2)); - break; - default: - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("improper qualified name (too many dotted names): %s", - NameListToString(lcons(makeString($1), $2))), - parser_errposition(@1))); - break; - } + $$ = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); } ; @@ -17060,6 +17118,43 @@ TableFuncTypeName(List *columns) return result; } +/* + * Convert a relation_name with name and namelist to a RangeVar using + * makeRangeVar. + */ +static RangeVar * +makeRangeVarFromQualifiedName(char *name, List *namelist, int location, + core_yyscan_t yyscanner) +{ + RangeVar *r; + + check_qualified_name(namelist, yyscanner); + r = makeRangeVar(NULL, NULL, location); + + switch (list_length(namelist)) + { + case 1: + r->catalogname = NULL; + r->schemaname = name; + r->relname = strVal(linitial(namelist)); + break; + case 2: + r->catalogname = name; + r->schemaname = strVal(linitial(namelist)); + r->relname = strVal(lsecond(namelist)); + break; + default: + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("improper qualified name (too many dotted names): %s", + NameListToString(lcons(makeString(name), namelist))), + parser_errposition(location)); + break; + } + + return r; +} + /* * Convert a list of (dotted) names to a RangeVar (like * makeRangeVarFromNameList, but with position support). The @@ -17210,6 +17305,74 @@ processCASbits(int cas_bits, int location, const char *constrType, } } +/* + * Process pubobjspec_list to check for errors in any of the objects and + * convert PUBLICATIONOBJ_CONTINUATION into appropriate PublicationObjSpecType. + */ +static void +preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) +{ + ListCell *cell; + PublicationObjSpec *pubobj; + PublicationObjSpecType prevobjtype = PUBLICATIONOBJ_CONTINUATION; + + if (!pubobjspec_list) + return; + + pubobj = (PublicationObjSpec *) linitial(pubobjspec_list); + if (pubobj->pubobjtype == PUBLICATIONOBJ_CONTINUATION) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("FOR TABLE/FOR ALL TABLES IN SCHEMA should be specified before the table/schema name(s)"), + parser_errposition(pubobj->location)); + + foreach(cell, pubobjspec_list) + { + pubobj = (PublicationObjSpec *) lfirst(cell); + + if (pubobj->pubobjtype == PUBLICATIONOBJ_CONTINUATION) + pubobj->pubobjtype = prevobjtype; + + if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE) + { + /* relation name or rangevar must be set for this type of object */ + if (!pubobj->name && !pubobj->pubtable) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid table name at or near"), + parser_errposition(pubobj->location)); + else if (pubobj->name) + { + /* convert it to rangevar */ + PublicationTable *pubtable = makeNode(PublicationTable); + pubtable->relation = makeRangeVar(NULL, pubobj->name, + pubobj->location); + pubobj->pubtable = pubtable; + pubobj->name = NULL; + } + } + else if (pubobj->pubobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA || + pubobj->pubobjtype == PUBLICATIONOBJ_CURRSCHEMA) + { + /* + * We can distinguish between the different type of schema + * objects based on whether name and rangevar is set. + */ + if (pubobj->name) + pubobj->pubobjtype = PUBLICATIONOBJ_REL_IN_SCHEMA; + else if (!pubobj->name && !pubobj->pubtable) + pubobj->pubobjtype = PUBLICATIONOBJ_CURRSCHEMA; + else if (!pubobj->name) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid schema name at or near"), + parser_errposition(pubobj->location)); + } + + prevobjtype = pubobj->pubobjtype; + } +} + /*---------- * Recursive view transformation * diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 14d737fd93..6f6a203dea 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1068,6 +1068,9 @@ init_rel_sync_cache(MemoryContext cachectx) CacheRegisterSyscacheCallback(PUBLICATIONRELMAP, rel_sync_cache_publication_cb, (Datum) 0); + CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP, + rel_sync_cache_publication_cb, + (Datum) 0); } /* @@ -1146,7 +1149,15 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) /* Validate the entry */ if (!entry->replicate_valid) { + Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); + + /* + * We don't acquire a lock on the namespace system table as we build + * the cache entry using a historic snapshot and all the later changes + * are absorbed while decoding WAL. + */ + List *schemaPubids = GetSchemaPublications(schemaId); ListCell *lc; Oid publish_as_relid = relid; @@ -1203,6 +1214,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) Oid ancestor = lfirst_oid(lc2); if (list_member_oid(GetRelationPublications(ancestor), + pub->oid) || + list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)), pub->oid)) { ancestor_published = true; @@ -1212,7 +1225,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) } } - if (list_member_oid(pubids, pub->oid) || ancestor_published) + if (list_member_oid(pubids, pub->oid) || + list_member_oid(schemaPubids, pub->oid) || + ancestor_published) publish = true; } @@ -1343,7 +1358,7 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) } /* - * Publication relation map syscache invalidation callback + * Publication relation/schema map syscache invalidation callback */ static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 13d9994af3..20a80034ab 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5447,6 +5447,7 @@ GetRelationPublicationActions(Relation relation) List *puboids; ListCell *lc; MemoryContext oldcxt; + Oid schemaid; PublicationActions *pubactions = palloc0(sizeof(PublicationActions)); /* @@ -5462,6 +5463,9 @@ GetRelationPublicationActions(Relation relation) /* Fetch the publication membership info. */ puboids = GetRelationPublications(RelationGetRelid(relation)); + schemaid = RelationGetNamespace(relation); + puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); + if (relation->rd_rel->relispartition) { /* Add publications that the ancestors are in too. */ @@ -5474,6 +5478,9 @@ GetRelationPublicationActions(Relation relation) puboids = list_concat_unique_oid(puboids, GetRelationPublications(ancestor)); + schemaid = get_rel_namespace(ancestor); + puboids = list_concat_unique_oid(puboids, + GetSchemaPublications(schemaid)); } } puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index d6cb78dea8..56870b46e4 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -50,6 +50,7 @@ #include "catalog/pg_partitioned_table.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_range.h" #include "catalog/pg_replication_origin.h" @@ -617,6 +618,28 @@ static const struct cachedesc cacheinfo[] = { }, 8 }, + {PublicationNamespaceRelationId, /* PUBLICATIONNAMESPACE */ + PublicationNamespaceObjectIndexId, + 1, + { + Anum_pg_publication_namespace_oid, + 0, + 0, + 0 + }, + 64 + }, + {PublicationNamespaceRelationId, /* PUBLICATIONNAMESPACEMAP */ + PublicationNamespacePnnspidPnpubidIndexId, + 2, + { + Anum_pg_publication_namespace_pnnspid, + Anum_pg_publication_namespace_pnpubid, + 0, + 0 + }, + 64 + }, {PublicationRelationId, /* PUBLICATIONOID */ PublicationObjectIndexId, 1, diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h index 2885f35ccd..3eca295ff4 100644 --- a/src/include/catalog/dependency.h +++ b/src/include/catalog/dependency.h @@ -122,6 +122,7 @@ typedef enum ObjectClass OCLASS_EVENT_TRIGGER, /* pg_event_trigger */ OCLASS_POLICY, /* pg_policy */ OCLASS_PUBLICATION, /* pg_publication */ + OCLASS_PUBLICATION_NAMESPACE, /* pg_publication_namespace */ OCLASS_PUBLICATION_REL, /* pg_publication_rel */ OCLASS_SUBSCRIPTION, /* pg_subscription */ OCLASS_TRANSFORM /* pg_transform */ diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 82f2536c65..f816d14b4d 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -111,13 +111,22 @@ typedef enum PublicationPartOpt extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); +extern List *GetPublicationSchemas(Oid pubid); +extern List *GetSchemaPublications(Oid schemaid); +extern List *GetSchemaPublicationRelations(Oid schemaid, + PublicationPartOpt pub_partopt); +extern List *GetAllSchemaPublicationRelations(Oid puboid, + PublicationPartOpt pub_partopt); extern List *GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid); extern bool is_publishable_relation(Relation rel); -extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, +extern ObjectAddress publication_add_relation(Oid pubid, + PublicationRelInfo *targetrel, bool if_not_exists); +extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, + bool if_not_exists); extern Oid get_publication_oid(const char *pubname, bool missing_ok); extern char *get_publication_name(Oid pubid, bool missing_ok); diff --git a/src/include/catalog/pg_publication_namespace.h b/src/include/catalog/pg_publication_namespace.h new file mode 100644 index 0000000000..b7e16af819 --- /dev/null +++ b/src/include/catalog/pg_publication_namespace.h @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * pg_publication_namespace.h + * definition of the system catalog for mappings between schemas and + * publications (pg_publication_namespace) + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_publication_namespace.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_PUBLICATION_NAMESPACE_H +#define PG_PUBLICATION_NAMESPACE_H + +#include "catalog/genbki.h" +#include "catalog/pg_publication_namespace_d.h" + + +/* ---------------- + * pg_publication_namespace definition. cpp turns this into + * typedef struct FormData_pg_publication_namespace + * ---------------- + */ +CATALOG(pg_publication_namespace,8901,PublicationNamespaceRelationId) +{ + Oid oid; /* oid */ + Oid pnpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ + Oid pnnspid BKI_LOOKUP(pg_namespace); /* Oid of the schema */ +} FormData_pg_publication_namespace; + +/* ---------------- + * Form_pg_publication_namespace corresponds to a pointer to a tuple with + * the format of pg_publication_namespace relation. + * ---------------- + */ +typedef FormData_pg_publication_namespace *Form_pg_publication_namespace; + +DECLARE_UNIQUE_INDEX_PKEY(pg_publication_namespace_oid_index, 8902, PublicationNamespaceObjectIndexId, on pg_publication_namespace using btree(oid oid_ops)); +DECLARE_UNIQUE_INDEX(pg_publication_namespace_pnnspid_pnpubid_index, 8903, PublicationNamespacePnnspidPnpubidIndexId, on pg_publication_namespace using btree(pnnspid oid_ops, pnpubid oid_ops)); + +#endif /* PG_PUBLICATION_NAMESPACE_H */ diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index 77a299bb18..4ba68c70ee 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -26,6 +26,7 @@ extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt); extern void RemovePublicationById(Oid pubid); extern void RemovePublicationRelById(Oid proid); +extern void RemovePublicationSchemaById(Oid psoid); extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index e0057daa06..c9b55a66f3 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -487,6 +487,7 @@ typedef enum NodeTag T_PartitionRangeDatum, T_PartitionCmd, T_VacuumRelation, + T_PublicationObjSpec, T_PublicationTable, /* diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 3138877553..42eacd530f 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -353,6 +353,29 @@ typedef struct RoleSpec int location; /* token location, or -1 if unknown */ } RoleSpec; +/* forward declaration */ +struct PublicationTable; + +/* + * Publication object type + */ +typedef enum PublicationObjSpecType +{ + PUBLICATIONOBJ_TABLE, /* Table type */ + PUBLICATIONOBJ_REL_IN_SCHEMA, /* Relations in schema type */ + PUBLICATIONOBJ_CURRSCHEMA, /* Get the first element from search_path */ + PUBLICATIONOBJ_CONTINUATION /* Continuation of previous type */ +} PublicationObjSpecType; + +typedef struct PublicationObjSpec +{ + NodeTag type; + PublicationObjSpecType pubobjtype; /* type of this publication object */ + char *name; + struct PublicationTable *pubtable; + int location; /* token location, or -1 if unknown */ +} PublicationObjSpec; + /* * FuncCall - a function or aggregate invocation * @@ -1816,6 +1839,7 @@ typedef enum ObjectType OBJECT_POLICY, OBJECT_PROCEDURE, OBJECT_PUBLICATION, + OBJECT_PUBLICATION_NAMESPACE, OBJECT_PUBLICATION_REL, OBJECT_ROLE, OBJECT_ROUTINE, @@ -3647,7 +3671,7 @@ typedef struct CreatePublicationStmt NodeTag type; char *pubname; /* Name of the publication */ List *options; /* List of DefElem nodes */ - List *tables; /* Optional list of tables to add */ + List *pubobjects; /* Optional list of publication objects */ bool for_all_tables; /* Special publication for all tables in db */ } CreatePublicationStmt; @@ -3659,10 +3683,14 @@ typedef struct AlterPublicationStmt /* parameters used for ALTER PUBLICATION ... WITH */ List *options; /* List of DefElem nodes */ - /* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */ - List *tables; /* List of tables to add/drop */ + /* + * Parameters used for ALTER PUBLICATION ... ADD/DROP/SET publication + * objects. + */ + List *pubobjects; /* Optional list of publication objects */ bool for_all_tables; /* Special publication for all tables in db */ - DefElemAction tableAction; /* What action to perform with the tables */ + DefElemAction action; /* What action to perform with the + * tables/schemas */ } AlterPublicationStmt; typedef struct CreateSubscriptionStmt diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h index d74a348600..c8cfbc30f6 100644 --- a/src/include/utils/syscache.h +++ b/src/include/utils/syscache.h @@ -76,6 +76,8 @@ enum SysCacheIdentifier PROCNAMEARGSNSP, PROCOID, PUBLICATIONNAME, + PUBLICATIONNAMESPACE, + PUBLICATIONNAMESPACEMAP, PUBLICATIONOID, PUBLICATIONREL, PUBLICATIONRELMAP, diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index 1461e947cd..215eb899be 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -258,6 +258,8 @@ NOTICE: checking pg_transform {trftosql} => pg_proc {oid} NOTICE: checking pg_sequence {seqrelid} => pg_class {oid} NOTICE: checking pg_sequence {seqtypid} => pg_type {oid} NOTICE: checking pg_publication {pubowner} => pg_authid {oid} +NOTICE: checking pg_publication_namespace {pnpubid} => pg_publication {oid} +NOTICE: checking pg_publication_namespace {pnnspid} => pg_namespace {oid} NOTICE: checking pg_publication_rel {prpubid} => pg_publication {oid} NOTICE: checking pg_publication_rel {prrelid} => pg_class {oid} NOTICE: checking pg_subscription {subdbid} => pg_database {oid} diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out index 982b6aff53..d04dc66db9 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -140,6 +140,7 @@ pg_partitioned_table|t pg_policy|t pg_proc|t pg_publication|t +pg_publication_namespace|t pg_publication_rel|t pg_range|t pg_replication_origin|t diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index cb5b5ec74c..4d9a9f0d4a 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -778,6 +778,7 @@ FormData_pg_partitioned_table FormData_pg_policy FormData_pg_proc FormData_pg_publication +FormData_pg_publication_namespace FormData_pg_publication_rel FormData_pg_range FormData_pg_replication_origin @@ -834,6 +835,7 @@ Form_pg_partitioned_table Form_pg_policy Form_pg_proc Form_pg_publication +Form_pg_publication_namespace Form_pg_publication_rel Form_pg_range Form_pg_replication_origin @@ -2046,6 +2048,8 @@ PsqlSettings Publication PublicationActions PublicationInfo +PublicationObjSpec +PublicationObjSpecType PublicationPartOpt PublicationRelInfo PublicationTable -- 2.30.2