From 4ab8be0e2b4e7d8183e6b74d0f37778e6a489c50 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Thu, 23 Sep 2021 13:40:27 +0800 Subject: [PATCH v33 1/6] Added schema level support for publication. This patch adds schema-level support for publication. A new option "FOR ALL TABLES IN SCHEMA" allows one or more schemas to be specified, whose tables are selected by the publisher for sending the data to the subscriber. A new system table "pg_publication_namespace" has been added, to maintain the schemas that the user wants to publish through the publication. The schema/publication/publication_namespace dependency was created to handle the corresponding renaming/removal of schemas to the publication/publication_namespace when the schema is renamed/dropped. The Decoder identifies if the relation is part of the publication and replicates it to the subscriber. CATALOG_VERSION_NO needs to be updated while committing, as this feature involves a catalog change. --- src/backend/catalog/Makefile | 4 +- src/backend/catalog/aclchk.c | 2 + src/backend/catalog/dependency.c | 9 + src/backend/catalog/objectaddress.c | 149 ++++++ src/backend/catalog/pg_publication.c | 296 ++++++++++- src/backend/commands/alter.c | 1 + src/backend/commands/event_trigger.c | 4 + src/backend/commands/publicationcmds.c | 468 +++++++++++++++--- src/backend/commands/seclabel.c | 1 + src/backend/commands/tablecmds.c | 4 +- src/backend/nodes/copyfuncs.c | 20 +- src/backend/nodes/equalfuncs.c | 15 +- src/backend/parser/gram.y | 266 +++++++--- src/backend/replication/pgoutput/pgoutput.c | 17 +- src/backend/utils/cache/relcache.c | 7 + src/backend/utils/cache/syscache.c | 23 + src/include/catalog/dependency.h | 1 + src/include/catalog/pg_publication.h | 15 +- .../catalog/pg_publication_namespace.h | 47 ++ src/include/commands/publicationcmds.h | 1 + src/include/nodes/nodes.h | 2 +- src/include/nodes/parsenodes.h | 36 +- src/include/utils/syscache.h | 2 + src/test/regress/expected/oidjoins.out | 2 + src/test/regress/expected/sanity_check.out | 1 + src/tools/pgindent/typedefs.list | 5 +- 26 files changed, 1225 insertions(+), 173 deletions(-) create mode 100644 src/include/catalog/pg_publication_namespace.h diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index d297e77361..4e6efda97f 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -68,8 +68,8 @@ CATALOG_HEADERS := \ pg_foreign_table.h pg_policy.h pg_replication_origin.h \ pg_default_acl.h pg_init_privs.h pg_seclabel.h pg_shseclabel.h \ pg_collation.h pg_partitioned_table.h pg_range.h pg_transform.h \ - pg_sequence.h pg_publication.h pg_publication_rel.h pg_subscription.h \ - pg_subscription_rel.h + pg_sequence.h pg_publication.h pg_publication_namespace.h \ + pg_publication_rel.h pg_subscription.h pg_subscription_rel.h GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index 89792b154e..ce0a4ff14e 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -3427,6 +3427,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFAULT: case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_RULE: @@ -3566,6 +3567,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFAULT: case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_TRANSFORM: diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 91c3e976e0..9f8eb1a37f 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -49,6 +49,7 @@ #include "catalog/pg_policy.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" @@ -178,6 +179,7 @@ static const Oid object_classes[] = { ExtensionRelationId, /* OCLASS_EXTENSION */ EventTriggerRelationId, /* OCLASS_EVENT_TRIGGER */ PolicyRelationId, /* OCLASS_POLICY */ + PublicationNamespaceRelationId, /* OCLASS_PUBLICATION_NAMESPACE */ PublicationRelationId, /* OCLASS_PUBLICATION */ PublicationRelRelationId, /* OCLASS_PUBLICATION_REL */ SubscriptionRelationId, /* OCLASS_SUBSCRIPTION */ @@ -1456,6 +1458,10 @@ doDeletion(const ObjectAddress *object, int flags) RemovePolicyById(object->objectId); break; + case OCLASS_PUBLICATION_NAMESPACE: + RemovePublicationSchemaById(object->objectId); + break; + case OCLASS_PUBLICATION_REL: RemovePublicationRelById(object->objectId); break; @@ -2850,6 +2856,9 @@ getObjectClass(const ObjectAddress *object) case PolicyRelationId: return OCLASS_POLICY; + case PublicationNamespaceRelationId: + return OCLASS_PUBLICATION_NAMESPACE; + case PublicationRelationId: return OCLASS_PUBLICATION; diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index 8c94939baa..2bae3fbb17 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -48,6 +48,7 @@ #include "catalog/pg_policy.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" @@ -825,6 +826,10 @@ static const struct object_type_map { "publication", OBJECT_PUBLICATION }, + /* OCLASS_PUBLICATION_NAMESPACE */ + { + "publication namespace", OBJECT_PUBLICATION_NAMESPACE + }, /* OCLASS_PUBLICATION_REL */ { "publication relation", OBJECT_PUBLICATION_REL @@ -875,6 +880,8 @@ static ObjectAddress get_object_address_usermapping(List *object, static ObjectAddress get_object_address_publication_rel(List *object, Relation *relp, bool missing_ok); +static ObjectAddress get_object_address_publication_schema(List *object, + bool missing_ok); static ObjectAddress get_object_address_defacl(List *object, bool missing_ok); static const ObjectPropertyType *get_object_property_data(Oid class_id); @@ -1113,6 +1120,10 @@ get_object_address(ObjectType objtype, Node *object, address = get_object_address_usermapping(castNode(List, object), missing_ok); break; + case OBJECT_PUBLICATION_NAMESPACE: + address = get_object_address_publication_schema(castNode(List, object), + missing_ok); + break; case OBJECT_PUBLICATION_REL: address = get_object_address_publication_rel(castNode(List, object), &relation, @@ -1935,6 +1946,49 @@ get_object_address_publication_rel(List *object, return address; } +/* + * Find the ObjectAddress for a publication schema. The first element of the + * object parameter is the schema name, the second is the publication name. + */ +static ObjectAddress +get_object_address_publication_schema(List *object, bool missing_ok) +{ + ObjectAddress address; + Publication *pub; + char *pubname; + char *schemaname; + Oid schemaid; + + ObjectAddressSet(address, PublicationNamespaceRelationId, InvalidOid); + + /* Fetch schema name and publication name from input list */ + schemaname = strVal(linitial(object)); + pubname = strVal(lsecond(object)); + + schemaid = get_namespace_oid(schemaname, missing_ok); + if (!OidIsValid(schemaid)) + return address; + + /* Now look up the pg_publication tuple */ + pub = GetPublicationByName(pubname, missing_ok); + if (!pub) + return address; + + /* Find the publication schema mapping in syscache */ + address.objectId = + GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, + Anum_pg_publication_namespace_oid, + ObjectIdGetDatum(schemaid), + ObjectIdGetDatum(pub->oid)); + if (!OidIsValid(address.objectId) && !missing_ok) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("publication schema \"%s\" in publication \"%s\" does not exist", + schemaname, pubname))); + + return address; +} + /* * Find the ObjectAddress for a default ACL. */ @@ -2206,6 +2260,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_DOMCONSTRAINT: case OBJECT_CAST: case OBJECT_USER_MAPPING: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_DEFACL: case OBJECT_TRANSFORM: @@ -2299,6 +2354,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_PUBLICATION_REL: objnode = (Node *) list_make2(name, linitial(args)); break; + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_USER_MAPPING: objnode = (Node *) list_make2(linitial(name), linitial(args)); break; @@ -2848,6 +2904,55 @@ get_catalog_object_by_oid(Relation catalog, AttrNumber oidcol, Oid objectId) return tuple; } +/* + * getPublicationSchemaInfo + * + * Get publication name and schema name from the object address into pubname and + * nspname. Both pubname and nspname are palloc'd strings which will be freed by + * the caller. + */ +static bool +getPublicationSchemaInfo(const ObjectAddress *object, bool missing_ok, + char **pubname, char **nspname) +{ + HeapTuple tup; + Form_pg_publication_namespace pnform; + + tup = SearchSysCache1(PUBLICATIONNAMESPACE, + ObjectIdGetDatum(object->objectId)); + if (!HeapTupleIsValid(tup)) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for publication schema %u", + object->objectId); + return false; + } + + pnform = (Form_pg_publication_namespace) GETSTRUCT(tup); + *pubname = get_publication_name(pnform->pnpubid, missing_ok); + if (!(*pubname)) + { + ReleaseSysCache(tup); + return false; + } + + *nspname = get_namespace_name(pnform->pnnspid); + if (!(*nspname)) + { + Oid schemaid = pnform->pnnspid; + + pfree(*pubname); + ReleaseSysCache(tup); + if (!missing_ok) + elog(ERROR, "cache lookup failed for schema %u", + schemaid); + return false; + } + + ReleaseSysCache(tup); + return true; +} + /* * getObjectDescription: build an object description for messages * @@ -3872,6 +3977,22 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok) break; } + case OCLASS_PUBLICATION_NAMESPACE: + { + char *pubname; + char *nspname; + + if (!getPublicationSchemaInfo(object, missing_ok, + &pubname, &nspname)) + break; + + appendStringInfo(&buffer, _("publication of schema %s in publication %s"), + nspname, pubname); + pfree(pubname); + pfree(nspname); + break; + } + case OCLASS_PUBLICATION_REL: { HeapTuple tup; @@ -4473,6 +4594,10 @@ getObjectTypeDescription(const ObjectAddress *object, bool missing_ok) appendStringInfoString(&buffer, "publication"); break; + case OCLASS_PUBLICATION_NAMESPACE: + appendStringInfoString(&buffer, "publication namespace"); + break; + case OCLASS_PUBLICATION_REL: appendStringInfoString(&buffer, "publication relation"); break; @@ -5683,6 +5808,30 @@ getObjectIdentityParts(const ObjectAddress *object, break; } + case OCLASS_PUBLICATION_NAMESPACE: + { + char *pubname; + char *nspname; + + if (!getPublicationSchemaInfo(object, missing_ok, &pubname, + &nspname)) + break; + appendStringInfo(&buffer, "%s in publication %s", + nspname, pubname); + + if (objargs) + *objargs = list_make1(pubname); + else + pfree(pubname); + + if (objname) + *objname = list_make1(nspname); + else + pfree(nspname); + + break; + } + case OCLASS_PUBLICATION_REL: { HeapTuple tup; diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 9cd0c82f93..04e785b192 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -28,7 +28,9 @@ #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" #include "commands/publicationcmds.h" @@ -38,7 +40,6 @@ #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/fmgroids.h" -#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/syscache.h" @@ -76,6 +77,30 @@ check_publication_add_relation(Relation targetrel) errdetail("Temporary and unlogged relations cannot be replicated."))); } +/* + * Check if schema can be in given publication and throws appropriate + * error if not. + */ +static void +check_publication_add_schema(Oid schemaid) +{ + /* Can't be system namespace */ + if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(schemaid)), + errdetail("This operation is not supported for system schemas."))); + + /* Can't be temporary namespace */ + if (isAnyTempNamespace(schemaid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(schemaid)), + errdetail("Temporary schemas cannot be replicated."))); +} + /* * Returns if relation represented by oid and Form_pg_class entry * is publishable. @@ -152,7 +177,7 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, NULL); if (pub_partopt == PUBLICATION_PART_ALL) - result = list_concat(result, all_parts); + result = list_concat_unique_oid(result, all_parts); else if (pub_partopt == PUBLICATION_PART_LEAF) { ListCell *lc; @@ -162,14 +187,14 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid partOid = lfirst_oid(lc); if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE) - result = lappend_oid(result, partOid); + result = list_append_unique_oid(result, partOid); } } else Assert(false); } else - result = lappend_oid(result, relid); + result = list_append_unique_oid(result, relid); return result; } @@ -178,14 +203,14 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, * Insert new publication / relation mapping. */ ObjectAddress -publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, +publication_add_relation(Oid pubid, Relation targetrel, bool if_not_exists) { Relation rel; HeapTuple tup; Datum values[Natts_pg_publication_rel]; bool nulls[Natts_pg_publication_rel]; - Oid relid = RelationGetRelid(targetrel->relation); + Oid relid = RelationGetRelid(targetrel); Oid prrelid; Publication *pub = GetPublication(pubid); ObjectAddress myself, @@ -210,10 +235,10 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("relation \"%s\" is already member of publication \"%s\"", - RelationGetRelationName(targetrel->relation), pub->name))); + RelationGetRelationName(targetrel), pub->name))); } - check_publication_add_relation(targetrel->relation); + check_publication_add_relation(targetrel); /* Form a tuple. */ memset(values, 0, sizeof(values)); @@ -262,6 +287,84 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, return myself; } +/* + * Insert new publication / schema mapping. + */ +ObjectAddress +publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) +{ + Relation rel; + HeapTuple tup; + Datum values[Natts_pg_publication_namespace]; + bool nulls[Natts_pg_publication_namespace]; + Oid psschid; + Publication *pub = GetPublication(pubid); + List *schemaRels = NIL; + ObjectAddress myself, + referenced; + + check_publication_add_schema(schemaid); + + rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock); + + /* + * Check for duplicates. Note that this does not really prevent + * duplicates, it's here just to provide nicer error message in common + * case. The real protection is the unique key on the catalog. + */ + if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP, + ObjectIdGetDatum(schemaid), + ObjectIdGetDatum(pubid))) + { + table_close(rel, RowExclusiveLock); + + if (if_not_exists) + return InvalidObjectAddress; + + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("schema \"%s\" is already member of publication \"%s\"", + get_namespace_name(schemaid), pub->name))); + } + + /* Form a tuple */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + + psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId, + Anum_pg_publication_namespace_oid); + values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid); + values[Anum_pg_publication_namespace_pnpubid - 1] = + ObjectIdGetDatum(pubid); + values[Anum_pg_publication_namespace_pnnspid - 1] = + ObjectIdGetDatum(schemaid); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + + /* Insert tuple into catalog */ + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + + ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid); + + /* Add dependency on the publication */ + ObjectAddressSet(referenced, PublicationRelationId, pubid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Add dependency on the schema */ + ObjectAddressSet(referenced, NamespaceRelationId, schemaid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Close the table */ + table_close(rel, RowExclusiveLock); + + /* Invalidate relcache so that publication info is rebuilt. */ + schemaRels = GetSchemaPublicationRelations(schemaid, PUBLICATION_PART_ALL); + InvalidatePublicationRels(schemaRels); + + return myself; +} + /* Gets list of publication oids for a relation */ List * GetRelationPublications(Oid relid) @@ -328,6 +431,73 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) return result; } +/* + * Gets the list of schema oids for a publication. + * + * This should only be used FOR ALL TABLES IN SCHEMA publications. + */ +List * +GetPublicationSchemas(Oid pubid) +{ + List *result = NIL; + Relation pubschsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all publications associated with the schema */ + pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_namespace_pnpubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubschsrel, + PublicationNamespacePnnspidPnpubidIndexId, + true, NULL, 1, &scankey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_namespace pubsch; + + pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup); + + result = lappend_oid(result, pubsch->pnnspid); + } + + systable_endscan(scan); + table_close(pubschsrel, AccessShareLock); + + return result; +} + + +/* + * Gets the list of publication oids associated with a specified schema. + */ +List * +GetSchemaPublications(Oid schemaid) +{ + List *result = NIL; + CatCList *pubschlist; + int i; + + /* Find all publications associated with the schema */ + pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP, + ObjectIdGetDatum(schemaid)); + for (i = 0; i < pubschlist->n_members; i++) + { + HeapTuple tup = &pubschlist->members[i]->tuple; + Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid; + + result = lappend_oid(result, pubid); + } + + ReleaseSysCacheList(pubschlist); + + return result; +} + /* * Gets list of publication oids for publications marked as FOR ALL TABLES. */ @@ -366,7 +536,7 @@ GetAllTablesPublications(void) } /* - * Gets list of all relation published by FOR ALL TABLES publication(s). + * Gets the list of relations published. * * If the publication publishes partition changes via their respective root * partitioned tables, we must exclude partitions in favor of including the @@ -428,6 +598,100 @@ GetAllTablesPublicationRelations(bool pubviaroot) return result; } +/* + * Get the list of publishable relation oids for a specified schema. + */ +List * +GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) +{ + Relation classRel; + ScanKeyData key[3]; + TableScanDesc scan; + HeapTuple tuple; + List *result = NIL; + int keycount = 0; + + Assert(OidIsValid(schemaid)); + + classRel = table_open(RelationRelationId, AccessShareLock); + + ScanKeyInit(&key[keycount++], + Anum_pg_class_relkind, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(RELKIND_RELATION)); + + ScanKeyInit(&key[keycount++], + Anum_pg_class_relnamespace, + BTEqualStrategyNumber, F_OIDEQ, + schemaid); + + /* get all the relations present in the given schema */ + scan = table_beginscan_catalog(classRel, keycount, key); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + Oid relid = relForm->oid; + + if (is_publishable_class(relid, relForm)) + result = lappend_oid(result, relid); + } + + table_endscan(scan); + + keycount = 0; + ScanKeyInit(&key[keycount++], + Anum_pg_class_relkind, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(RELKIND_PARTITIONED_TABLE)); + + ScanKeyInit(&key[keycount++], + Anum_pg_class_relnamespace, + BTEqualStrategyNumber, F_OIDEQ, + schemaid); + + /* + * It is quite possible that some of the partitions are in a different + * schema than the parent table, so we need to get such partitions + * separately. + */ + scan = table_beginscan_catalog(classRel, keycount, key); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + + if (is_publishable_class(relForm->oid, relForm)) + result = GetPubPartitionOptionRelations(result, pub_partopt, + relForm->oid); + } + + table_endscan(scan); + table_close(classRel, AccessShareLock); + return result; +} + +/* + * Gets the list of all relations published by FOR ALL TABLES IN SCHEMA + * publication. + */ +List * +GetAllSchemaPublicationRelations(Oid puboid, PublicationPartOpt pub_partopt) +{ + List *result = NIL; + List *pubschemalist = GetPublicationSchemas(puboid); + ListCell *cell; + + foreach(cell, pubschemalist) + { + Oid schemaid = lfirst_oid(cell); + List *schemaRels = NIL; + + schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt); + result = list_concat(result, schemaRels); + } + + return result; +} + /* * Get publication using oid * @@ -557,10 +821,22 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) if (publication->alltables) tables = GetAllTablesPublicationRelations(publication->pubviaroot); else - tables = GetPublicationRelations(publication->oid, + { + List *relids, + *schemarelids; + + relids = GetPublicationRelations(publication->oid, publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); + schemarelids = GetAllSchemaPublicationRelations(publication->oid, + publication->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + + tables = list_concat_unique_oid(relids, schemarelids); + } + funcctx->user_fctx = (void *) tables; MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index c47d54e96b..40044070cf 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -660,6 +660,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid, case OCLASS_EVENT_TRIGGER: case OCLASS_POLICY: case OCLASS_PUBLICATION: + case OCLASS_PUBLICATION_NAMESPACE: case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 71612d577e..df264329d8 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -973,6 +973,7 @@ EventTriggerSupportsObjectType(ObjectType obtype) case OBJECT_POLICY: case OBJECT_PROCEDURE: case OBJECT_PUBLICATION: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROUTINE: case OBJECT_RULE: @@ -1050,6 +1051,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass) case OCLASS_EXTENSION: case OCLASS_POLICY: case OCLASS_PUBLICATION: + case OCLASS_PUBLICATION_NAMESPACE: case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: @@ -2126,6 +2128,7 @@ stringify_grant_objtype(ObjectType objtype) case OBJECT_OPFAMILY: case OBJECT_POLICY: case OBJECT_PUBLICATION: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_RULE: @@ -2208,6 +2211,7 @@ stringify_adefprivs_objtype(ObjectType objtype) case OBJECT_OPFAMILY: case OBJECT_POLICY: case OBJECT_PUBLICATION: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_ROLE: case OBJECT_RULE: diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 9c7f91611d..c59cd5ad49 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -25,7 +25,9 @@ #include "catalog/objectaddress.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" @@ -34,22 +36,28 @@ #include "commands/publicationcmds.h" #include "funcapi.h" #include "miscadmin.h" +#include "nodes/makefuncs.h" +#include "storage/lmgr.h" #include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/fmgroids.h" -#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/syscache.h" #include "utils/varlena.h" +static List *OpenReliIdList(List *relids); static List *OpenTableList(List *tables); static void CloseTableList(List *rels); static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt); static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); +static void LockSchemaList(List *schemalist); +static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt); +static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); static void parse_publication_options(ParseState *pstate, @@ -135,6 +143,90 @@ parse_publication_options(ParseState *pstate, } } +/* + * Convert the PublicationObjSpecType list into schema oid list and rangevar + * list. + */ +static void +ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, + List **rels, List **schemas) +{ + ListCell *cell; + PublicationObjSpec *pubobj; + + if (!pubobjspec_list) + return; + + foreach(cell, pubobjspec_list) + { + pubobj = (PublicationObjSpec *) lfirst(cell); + if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE) + *rels = lappend(*rels, pubobj->object); + else if (pubobj->pubobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA) + { + Oid schemaid; + char *schemaname; + + schemaname = strVal(pubobj->object); + if (strcmp(schemaname, "CURRENT_SCHEMA") == 0) + { + List *search_path; + + search_path = fetch_search_path(false); + if (search_path == NIL) /* nothing valid in search_path? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected for CURRENT_SCHEMA")); + + schemaid = linitial_oid(search_path); + list_free(search_path); + } + else + schemaid = get_namespace_oid(schemaname, false); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + *schemas = list_append_unique_oid(*schemas, schemaid); + } + } +} + +/* + * Check if any of the given relation's schema is a member of the given schema + * list. + */ +static void +CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, + PublicationObjSpecType checkobjtype) +{ + ListCell *lc; + + foreach(lc, rels) + { + Relation rel = (Relation) lfirst(lc); + Oid relSchemaId = RelationGetNamespace(rel); + + if (list_member_oid(schemaidlist, relSchemaId)) + { + if (checkobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema \"%s\" to publication", + get_namespace_name(relSchemaId)), + errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.", + RelationGetRelationName(rel), + get_namespace_name(relSchemaId))); + else if (checkobjtype == PUBLICATIONOBJ_TABLE) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add relation \"%s.%s\" to publication", + get_namespace_name(relSchemaId), + RelationGetRelationName(rel)), + errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.", + get_namespace_name(relSchemaId))); + } + } +} + /* * Create new publication. */ @@ -152,6 +244,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) bool publish_via_partition_root_given; bool publish_via_partition_root; AclResult aclresult; + List *relations = NIL; + List *schemaidlist = NIL; /* must have CREATE privilege on database */ aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); @@ -221,17 +315,36 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) /* Make the changes visible. */ CommandCounterIncrement(); - if (stmt->tables) + ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + &schemaidlist); + if (relations != NIL) { List *rels; - Assert(list_length(stmt->tables) > 0); + Assert(list_length(relations) > 0); - rels = OpenTableList(stmt->tables); + rels = OpenTableList(relations); + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_TABLE); PublicationAddTables(puboid, rels, true, NULL); CloseTableList(rels); } - else if (stmt->for_all_tables) + + if (schemaidlist != NIL) + { + /* FOR ALL TABLES IN SCHEMA requires superuser */ + if (!superuser()) + ereport(ERROR, + errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication")); + + Assert(list_length(schemaidlist) > 0); + + LockSchemaList(schemaidlist); + PublicationAddSchemas(puboid, schemaidlist, true, NULL); + } + + if (stmt->for_all_tables) { /* Invalidate relcache so that publication info is rebuilt. */ CacheInvalidateRelcacheAll(); @@ -318,13 +431,19 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, } else { + List *relids = NIL; + List *schemarelids = NIL; + /* * For any partitioned tables contained in the publication, we must * invalidate all partitions contained in the respective partition * trees, not just those explicitly mentioned in the publication. */ - List *relids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ALL); + relids = GetPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + schemarelids = GetAllSchemaPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + relids = list_concat_unique_oid(relids, schemarelids); InvalidatePublicationRels(relids); } @@ -361,28 +480,32 @@ InvalidatePublicationRels(List *relids) * Add or remove table to/from publication. */ static void -AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, - HeapTuple tup) +AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, + List *tables, List *schemaidlist) { List *rels = NIL; Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); Oid pubid = pubform->oid; - /* Check that user is allowed to manipulate the publication tables. */ - if (pubform->puballtables) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("publication \"%s\" is defined as FOR ALL TABLES", - NameStr(pubform->pubname)), - errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); + if (!tables && stmt->action != DEFELEM_SET) + return; - Assert(list_length(stmt->tables) > 0); + rels = OpenTableList(tables); - rels = OpenTableList(stmt->tables); + if (stmt->action == DEFELEM_ADD) + { + List *schemas = NIL; - if (stmt->tableAction == DEFELEM_ADD) + /* + * Check if the relation is member of the existing schema in the + * publication or member of the schema list specified. + */ + schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid)); + CheckObjSchemaNotAlreadyInPublication(rels, schemas, + PUBLICATIONOBJ_TABLE); PublicationAddTables(pubid, rels, false, stmt); - else if (stmt->tableAction == DEFELEM_DROP) + } + else if (stmt->action == DEFELEM_DROP) PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ { @@ -391,6 +514,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, List *delrels = NIL; ListCell *oldlc; + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_TABLE); + /* Calculate which relations to drop. */ foreach(oldlc, oldrelids) { @@ -400,10 +526,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, foreach(newlc, rels) { - PublicationRelInfo *newpubrel; + Relation newrel = (Relation) lfirst(newlc); - newpubrel = (PublicationRelInfo *) lfirst(newlc); - if (RelationGetRelid(newpubrel->relation) == oldrelid) + if (RelationGetRelid(newrel) == oldrelid) { found = true; break; @@ -412,16 +537,10 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, /* Not yet in the list, open it and add to the list */ if (!found) { - Relation oldrel; - PublicationRelInfo *pubrel; + Relation oldrel = table_open(oldrelid, + ShareUpdateExclusiveLock); - /* Wrap relation into PublicationRelInfo */ - oldrel = table_open(oldrelid, ShareUpdateExclusiveLock); - - pubrel = palloc(sizeof(PublicationRelInfo)); - pubrel->relation = oldrel; - - delrels = lappend(delrels, pubrel); + delrels = lappend(delrels, oldrel); } } @@ -440,11 +559,102 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, CloseTableList(rels); } +/* + * Alter the publication schemas. + * + * Add/Remove/Set all tables from schemas to/from publication. + */ +static void +AlterPublicationSchemas(AlterPublicationStmt *stmt, + HeapTuple tup, List *schemaidlist) +{ + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + + if (!schemaidlist && stmt->action != DEFELEM_SET) + return; + + /* + * Schema lock is held until the publication is altered to prevent + * concurrent schema deletion. No need to unlock the schemas, the locks + * will be released automatically at the end of alter publication command. + */ + LockSchemaList(schemaidlist); + if (stmt->action == DEFELEM_ADD) + { + List *rels; + List *reloids; + + reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT); + rels = OpenReliIdList(reloids); + + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_REL_IN_SCHEMA); + + CloseTableList(rels); + PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt); + } + else if (stmt->action == DEFELEM_DROP) + PublicationDropSchemas(pubform->oid, schemaidlist, false); + else /* DEFELEM_SET */ + { + List *oldschemaids = GetPublicationSchemas(pubform->oid); + List *delschemas = NIL; + + /* Identify which schemas should be dropped */ + delschemas = list_difference_oid(oldschemaids, schemaidlist); + LockSchemaList(delschemas); + + /* And drop them */ + PublicationDropSchemas(pubform->oid, delschemas, true); + + /* + * Don't bother calculating the difference for adding, we'll catch and + * skip existing ones when doing catalog update. + */ + PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt); + } + + return; +} + +/* + * Check if relations and schemas can be in given publication and throws + * appropriate error if not. + */ +static void +CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, + List *tables, List *schemaidlist) +{ + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + + if ((stmt->action == DEFELEM_ADD || stmt->action == DEFELEM_SET) && + schemaidlist && !superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to add or set schemas"))); + + /* Check that user is allowed to manipulate the publication tables in schema */ + if (schemaidlist && pubform->puballtables) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications."))); + + /* Check that user is allowed to manipulate the publication tables. */ + if (tables && pubform->puballtables) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); +} + /* * Alter the existing publication. * - * This is dispatcher function for AlterPublicationOptions and - * AlterPublicationTables. + * This is dispatcher function for AlterPublicationOptions, + * AlterPublicationSchemas and AlterPublicationTables. */ void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) @@ -474,7 +684,22 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) if (stmt->options) AlterPublicationOptions(pstate, stmt, rel, tup); else - AlterPublicationTables(stmt, rel, tup); + { + List *relations = NIL; + List *schemaidlist = NIL; + + ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + &schemaidlist); + + CheckAlterPublication(stmt, tup, relations, schemaidlist); + + /* Lock the publication so nobody else can do anything with it. */ + LockDatabaseObject(PublicationRelationId, pubform->oid, 0, + AccessExclusiveLock); + + AlterPublicationTables(stmt, tup, relations, schemaidlist); + AlterPublicationSchemas(stmt, tup, schemaidlist); + } /* Cleanup. */ heap_freetuple(tup); @@ -538,7 +763,7 @@ RemovePublicationById(Oid pubid) if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for publication %u", pubid); - pubform = (Form_pg_publication)GETSTRUCT(tup); + pubform = (Form_pg_publication) GETSTRUCT(tup); /* Invalidate relcache so that publication info is rebuilt. */ if (pubform->puballtables) @@ -552,9 +777,84 @@ RemovePublicationById(Oid pubid) } /* - * Open relations specified by a PublicationTable list. - * In the returned list of PublicationRelInfo, tables are locked - * in ShareUpdateExclusiveLock mode in order to add them to a publication. + * Remove schema from publication by mapping OID. + */ +void +RemovePublicationSchemaById(Oid psoid) +{ + Relation rel; + HeapTuple tup; + List *schemaRels = NIL; + Form_pg_publication_namespace pubsch; + + rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock); + + tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for publication schema %u", psoid); + + pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup); + + /* Invalidate relcache so that publication info is rebuilt. */ + schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid, + PUBLICATION_PART_ALL); + InvalidatePublicationRels(schemaRels); + + CatalogTupleDelete(rel, &tup->t_self); + + ReleaseSysCache(tup); + + table_close(rel, RowExclusiveLock); +} + +/* + * The schemas specified in the schema list are locked in AccessShareLock mode + * in order to prevent concurrent schema deletion. No need to unlock the + * schemas, the locks will be released at the end of the command. + */ +static void +LockSchemaList(List *schemalist) +{ + ListCell *lc; + + foreach(lc, schemalist) + { + Oid schemaid = lfirst_oid(lc); + + /* Allow query cancel in case this takes a long time */ + CHECK_FOR_INTERRUPTS(); + LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock); + } +} + +/* + * Open relations specified by a relid list. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. + */ +static List * +OpenReliIdList(List *relids) +{ + ListCell *lc; + List *rels = NIL; + + foreach(lc, relids) + { + Oid relid = lfirst_oid(lc); + Relation rel = table_open(relid, + ShareUpdateExclusiveLock); + + rels = lappend(rels, rel); + } + + return rels; +} + +/* + * Open relations specified by a RangeVar list. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. */ static List * OpenTableList(List *tables) @@ -568,16 +868,15 @@ OpenTableList(List *tables) */ foreach(lc, tables) { - PublicationTable *t = lfirst_node(PublicationTable, lc); - bool recurse = t->relation->inh; + RangeVar *rv = lfirst_node(RangeVar, lc); + bool recurse = rv->inh; Relation rel; Oid myrelid; - PublicationRelInfo *pub_rel; /* Allow query cancel in case this takes a long time */ CHECK_FOR_INTERRUPTS(); - rel = table_openrv(t->relation, ShareUpdateExclusiveLock); + rel = table_openrv(rv, ShareUpdateExclusiveLock); myrelid = RelationGetRelid(rel); /* @@ -593,9 +892,7 @@ OpenTableList(List *tables) continue; } - pub_rel = palloc(sizeof(PublicationRelInfo)); - pub_rel->relation = rel; - rels = lappend(rels, pub_rel); + rels = lappend(rels, rel); relids = lappend_oid(relids, myrelid); /* @@ -628,9 +925,7 @@ OpenTableList(List *tables) /* find_all_inheritors already got lock */ rel = table_open(childrelid, NoLock); - pub_rel = palloc(sizeof(PublicationRelInfo)); - pub_rel->relation = rel; - rels = lappend(rels, pub_rel); + rels = lappend(rels, rel); relids = lappend_oid(relids, childrelid); } } @@ -651,10 +946,9 @@ CloseTableList(List *rels) foreach(lc, rels) { - PublicationRelInfo *pub_rel; + Relation rel = (Relation) lfirst(lc); - pub_rel = (PublicationRelInfo *) lfirst(lc); - table_close(pub_rel->relation, NoLock); + table_close(rel, NoLock); } } @@ -671,8 +965,7 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, foreach(lc, rels) { - PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc); - Relation rel = pub_rel->relation; + Relation rel = (Relation) lfirst(lc); ObjectAddress obj; /* Must be owner of the table or superuser. */ @@ -680,7 +973,7 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind), RelationGetRelationName(rel)); - obj = publication_add_relation(pubid, pub_rel, if_not_exists); + obj = publication_add_relation(pubid, rel, if_not_exists); if (stmt) { EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, @@ -692,6 +985,34 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, } } +/* + * Add listed schemas to the publication. + */ +static void +PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt) +{ + ListCell *lc; + + Assert(!stmt || !stmt->for_all_tables); + + foreach(lc, schemas) + { + Oid schemaid = lfirst_oid(lc); + ObjectAddress obj; + + obj = publication_add_schema(pubid, schemaid, if_not_exists); + if (stmt) + { + EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, + (Node *) stmt); + + InvokeObjectPostCreateHook(PublicationNamespaceRelationId, + obj.objectId, 0); + } + } +} + /* * Remove listed tables from the publication. */ @@ -704,8 +1025,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) foreach(lc, rels) { - PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc); - Relation rel = pubrel->relation; + Relation rel = (Relation) lfirst(lc); Oid relid = RelationGetRelid(rel); prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, @@ -727,6 +1047,40 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) } } +/* + * Remove listed schemas from the publication. + */ +static void +PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) +{ + ObjectAddress obj; + ListCell *lc; + Oid psid; + + foreach(lc, schemas) + { + Oid schemaid = lfirst_oid(lc); + + psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, + Anum_pg_publication_namespace_oid, + ObjectIdGetDatum(schemaid), + ObjectIdGetDatum(pubid)); + if (!OidIsValid(psid)) + { + if (missing_ok) + continue; + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("tables from schema \"%s\" are not part of the publication", + get_namespace_name(schemaid)))); + } + + ObjectAddressSet(obj, PublicationNamespaceRelationId, psid); + performDeletion(&obj, DROP_CASCADE, 0); + } +} + /* * Internal workhorse for changing a publication owner */ diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c index ddc019cb39..73cd9f04a5 100644 --- a/src/backend/commands/seclabel.c +++ b/src/backend/commands/seclabel.c @@ -79,6 +79,7 @@ SecLabelSupportsObjectType(ObjectType objtype) case OBJECT_OPERATOR: case OBJECT_OPFAMILY: case OBJECT_POLICY: + case OBJECT_PUBLICATION_NAMESPACE: case OBJECT_PUBLICATION_REL: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index dbee6ae199..f91b9963c7 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -12252,6 +12252,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, case OCLASS_EXTENSION: case OCLASS_EVENT_TRIGGER: case OCLASS_PUBLICATION: + case OCLASS_PUBLICATION_NAMESPACE: case OCLASS_PUBLICATION_REL: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: @@ -15832,7 +15833,8 @@ ATPrepChangePersistence(Relation rel, bool toLogged) * UNLOGGED as UNLOGGED tables can't be published. */ if (!toLogged && - list_length(GetRelationPublications(RelationGetRelid(rel))) > 0) + (list_length(GetRelationPublications(RelationGetRelid(rel))) > 0 || + list_length(GetSchemaPublications(rel->rd_rel->relnamespace)) > 0)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot change table \"%s\" to unlogged because it is part of a publication", diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 228387eaee..ade93023b8 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4817,7 +4817,7 @@ _copyCreatePublicationStmt(const CreatePublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); - COPY_NODE_FIELD(tables); + COPY_NODE_FIELD(pubobjects); COPY_SCALAR_FIELD(for_all_tables); return newnode; @@ -4830,9 +4830,9 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); - COPY_NODE_FIELD(tables); + COPY_NODE_FIELD(pubobjects); COPY_SCALAR_FIELD(for_all_tables); - COPY_SCALAR_FIELD(tableAction); + COPY_SCALAR_FIELD(action); return newnode; } @@ -4958,12 +4958,14 @@ _copyForeignKeyCacheInfo(const ForeignKeyCacheInfo *from) return newnode; } -static PublicationTable * -_copyPublicationTable(const PublicationTable *from) +static PublicationObjSpec * +_copyPublicationObject(const PublicationObjSpec *from) { - PublicationTable *newnode = makeNode(PublicationTable); + PublicationObjSpec *newnode = makeNode(PublicationObjSpec); - COPY_NODE_FIELD(relation); + COPY_SCALAR_FIELD(pubobjtype); + COPY_NODE_FIELD(object); + COPY_LOCATION_FIELD(location); return newnode; } @@ -5887,8 +5889,8 @@ copyObjectImpl(const void *from) case T_PartitionCmd: retval = _copyPartitionCmd(from); break; - case T_PublicationTable: - retval = _copyPublicationTable(from); + case T_PublicationObjSpec: + retval = _copyPublicationObject(from); break; /* diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 800f588b5c..1dcd63d64f 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2302,7 +2302,7 @@ _equalCreatePublicationStmt(const CreatePublicationStmt *a, { COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); - COMPARE_NODE_FIELD(tables); + COMPARE_NODE_FIELD(pubobjects); COMPARE_SCALAR_FIELD(for_all_tables); return true; @@ -2314,9 +2314,9 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a, { COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); - COMPARE_NODE_FIELD(tables); + COMPARE_NODE_FIELD(pubobjects); COMPARE_SCALAR_FIELD(for_all_tables); - COMPARE_SCALAR_FIELD(tableAction); + COMPARE_SCALAR_FIELD(action); return true; } @@ -3134,9 +3134,10 @@ _equalBitString(const BitString *a, const BitString *b) } static bool -_equalPublicationTable(const PublicationTable *a, const PublicationTable *b) +_equalPublicationObject(const PublicationObjSpec *a, + const PublicationObjSpec *b) { - COMPARE_NODE_FIELD(relation); + COMPARE_NODE_FIELD(object); return true; } @@ -3894,8 +3895,8 @@ equal(const void *a, const void *b) case T_PartitionCmd: retval = _equalPartitionCmd(a, b); break; - case T_PublicationTable: - retval = _equalPublicationTable(a, b); + case T_PublicationObjSpec: + retval = _equalPublicationObject(a, b); break; default: diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index e3068a374e..ed6a4ffd9b 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -195,12 +195,17 @@ static Node *makeXmlExpr(XmlExprOp op, char *name, List *named_args, static List *mergeTableFuncParameters(List *func_args, List *columns); static TypeName *TableFuncTypeName(List *columns); static RangeVar *makeRangeVarFromAnyName(List *names, int position, core_yyscan_t yyscanner); +static RangeVar *makeRangeVarFromQualifiedName(char *name, List *rels, + int location, + core_yyscan_t yyscanner); static void SplitColQualList(List *qualList, List **constraintList, CollateClause **collClause, core_yyscan_t yyscanner); static void processCASbits(int cas_bits, int location, const char *constrType, bool *deferrable, bool *initdeferred, bool *not_valid, bool *no_inherit, core_yyscan_t yyscanner); +static void preprocess_pubobj_list (List *pubobjspec_list, + core_yyscan_t yyscanner); static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %} @@ -256,6 +261,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); PartitionSpec *partspec; PartitionBoundSpec *partboundspec; RoleSpec *rolespec; + PublicationObjSpec *publicationobjectspec; struct SelectLimit *selectlimit; SetQuantifier setquantifier; struct GroupClause *groupclause; @@ -425,14 +431,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); transform_element_list transform_type_list TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list - drop_option_list publication_table_list + drop_option_list pub_obj_list %type opt_routine_body %type group_clause %type group_by_list %type group_by_item empty_grouping_set rollup_clause cube_clause %type grouping_sets_clause -%type opt_publication_for_tables publication_for_tables publication_table %type opt_fdw_options fdw_options %type fdw_option @@ -517,6 +522,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type table_ref %type joined_table %type relation_expr +%type extended_relation_expr %type relation_expr_opt_alias %type tablesample_clause opt_repeatable_clause %type target_el set_target insert_column_item @@ -554,6 +560,9 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type var_value zone_value %type auth_ident RoleSpec opt_granted_by +%type PublicationObjSpec +%type pubobj_expr +%type pubobj_name %type unreserved_keyword type_func_name_keyword %type col_name_keyword reserved_keyword %type bare_label_keyword @@ -9591,69 +9600,120 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec /***************************************************************************** * - * CREATE PUBLICATION name [ FOR TABLE ] [ WITH options ] + * CREATE PUBLICATION name [WITH options] + * + * CREATE PUBLICATION FOR ALL TABLES [WITH options] + * + * CREATE PUBLICATION FOR pub_obj [, ...] [WITH options] + * + * pub_obj is one of: + * + * TABLE table [, ...] + * ALL TABLES IN SCHEMA schema [[, ...] * *****************************************************************************/ CreatePublicationStmt: - CREATE PUBLICATION name opt_publication_for_tables opt_definition + CREATE PUBLICATION name opt_definition { CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; - n->options = $5; - if ($4 != NULL) - { - /* FOR TABLE */ - if (IsA($4, List)) - n->tables = (List *)$4; - /* FOR ALL TABLES */ - else - n->for_all_tables = true; - } + n->options = $4; + $$ = (Node *)n; + } + | CREATE PUBLICATION name FOR ALL TABLES opt_definition + { + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $7; + n->for_all_tables = true; + $$ = (Node *)n; + } + | CREATE PUBLICATION name FOR pub_obj_list opt_definition + { + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $6; + n->pubobjects = (List *)$5; + preprocess_pubobj_list(n->pubobjects, yyscanner); $$ = (Node *)n; } ; -opt_publication_for_tables: - publication_for_tables { $$ = $1; } - | /* EMPTY */ { $$ = NULL; } +pubobj_expr: + pubobj_name + { + $$ = makeNode(PublicationObjSpec); + $$->object = $1; + $$->location = @1; + } + | extended_relation_expr + { + $$ = makeNode(PublicationObjSpec); + $$->object = (Node *)$1; + $$->location = @1; + } + | CURRENT_SCHEMA + { + $$ = makeNode(PublicationObjSpec); + $$->object = (Node *)makeString("CURRENT_SCHEMA"); + $$->location = @1; + } ; -publication_for_tables: - FOR TABLE publication_table_list +/* + * This can be either a schema or relation name. For relations, the inheritance + * will be implicit. + */ +pubobj_name: + ColId { - $$ = (Node *) $3; + $$ = (Node *)makeString($1); } - | FOR ALL TABLES + | ColId indirection { - $$ = (Node *) makeInteger(true); + $$ = (Node *)makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); } ; -publication_table_list: - publication_table - { $$ = list_make1($1); } - | publication_table_list ',' publication_table - { $$ = lappend($1, $3); } +/* FOR TABLE and FOR ALL TABLES IN SCHEMA specifications */ +PublicationObjSpec: TABLE pubobj_expr + { + $$ = $2; + $$->pubobjtype = PUBLICATIONOBJ_TABLE; + } + | ALL TABLES IN_P SCHEMA pubobj_expr + { + $$ = $5; + $$->pubobjtype = PUBLICATIONOBJ_REL_IN_SCHEMA; + } + | pubobj_expr + { + $$ = $1; + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + } ; -publication_table: relation_expr - { - PublicationTable *n = makeNode(PublicationTable); - n->relation = $1; - $$ = (Node *) n; - } +pub_obj_list: PublicationObjSpec + { $$ = list_make1($1); } + | pub_obj_list ',' PublicationObjSpec + { $$ = lappend($1, $3); } ; /***************************************************************************** * * ALTER PUBLICATION name SET ( options ) * - * ALTER PUBLICATION name ADD TABLE table [, table2] + * ALTER PUBLICATION name ADD pub_obj [, ...] + * + * ALTER PUBLICATION name DROP pub_obj [, ...] * - * ALTER PUBLICATION name DROP TABLE table [, table2] + * ALTER PUBLICATION name SET pub_obj [, ...] * - * ALTER PUBLICATION name SET TABLE table [, table2] + * pub_obj is one of: + * + * TABLE table_name [, ...] + * ALL TABLES IN SCHEMA schema_name [, ...] * *****************************************************************************/ @@ -9665,28 +9725,31 @@ AlterPublicationStmt: n->options = $5; $$ = (Node *)n; } - | ALTER PUBLICATION name ADD_P TABLE publication_table_list + | ALTER PUBLICATION name ADD_P pub_obj_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; - n->tables = $6; - n->tableAction = DEFELEM_ADD; + n->pubobjects = $5; + preprocess_pubobj_list(n->pubobjects, yyscanner); + n->action = DEFELEM_ADD; $$ = (Node *)n; } - | ALTER PUBLICATION name SET TABLE publication_table_list + | ALTER PUBLICATION name SET pub_obj_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; - n->tables = $6; - n->tableAction = DEFELEM_SET; + n->pubobjects = $5; + preprocess_pubobj_list(n->pubobjects, yyscanner); + n->action = DEFELEM_SET; $$ = (Node *)n; } - | ALTER PUBLICATION name DROP TABLE publication_table_list + | ALTER PUBLICATION name DROP pub_obj_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; - n->tables = $6; - n->tableAction = DEFELEM_DROP; + n->pubobjects = $5; + preprocess_pubobj_list(n->pubobjects, yyscanner); + n->action = DEFELEM_DROP; $$ = (Node *)n; } ; @@ -12430,7 +12493,14 @@ relation_expr: $$->inh = true; $$->alias = NULL; } - | qualified_name '*' + | extended_relation_expr + { + $$ = $1; + } + ; + +extended_relation_expr: + qualified_name '*' { /* inheritance query, explicitly */ $$ = $1; @@ -15104,28 +15174,7 @@ qualified_name: } | ColId indirection { - check_qualified_name($2, yyscanner); - $$ = makeRangeVar(NULL, NULL, @1); - switch (list_length($2)) - { - case 1: - $$->catalogname = NULL; - $$->schemaname = $1; - $$->relname = strVal(linitial($2)); - break; - case 2: - $$->catalogname = $1; - $$->schemaname = strVal(linitial($2)); - $$->relname = strVal(lsecond($2)); - break; - default: - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("improper qualified name (too many dotted names): %s", - NameListToString(lcons(makeString($1), $2))), - parser_errposition(@1))); - break; - } + $$ = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); } ; @@ -17045,6 +17094,41 @@ TableFuncTypeName(List *columns) return result; } +/* + * Convert a relation_name with name and namelist to a RangeVar using + * makeRangeVar. + */ +static RangeVar * +makeRangeVarFromQualifiedName(char *name, List *namelist, int location, + core_yyscan_t yyscanner) +{ + RangeVar *r = makeRangeVar(NULL, NULL, location); + + check_qualified_name(namelist, yyscanner); + switch (list_length(namelist)) + { + case 1: + r->catalogname = NULL; + r->schemaname = name; + r->relname = strVal(linitial(namelist)); + break; + case 2: + r->catalogname = name; + r->schemaname = strVal(linitial(namelist)); + r->relname = strVal(lsecond(namelist)); + break; + default: + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("improper qualified name (too many dotted names): %s", + NameListToString(lcons(makeString(name), namelist))), + parser_errposition(location)); + break; + } + + return r; +} + /* * Convert a list of (dotted) names to a RangeVar (like * makeRangeVarFromNameList, but with position support). The @@ -17195,6 +17279,52 @@ processCASbits(int cas_bits, int location, const char *constrType, } } +/* + * Process pubobjspec_list to check for errors in any of the objects and + * convert PUBLICATIONOBJ_CONTINUATION into appropriate PublicationObjSpecType + * type. + */ +static void +preprocess_pubobj_list (List *pubobjspec_list, core_yyscan_t yyscanner) +{ + ListCell *cell; + PublicationObjSpec *pubobj; + PublicationObjSpecType prevobjtype = PUBLICATIONOBJ_CONTINUATION; + + if (!pubobjspec_list) + return; + + pubobj = (PublicationObjSpec *) linitial(pubobjspec_list); + if (pubobj->pubobjtype == PUBLICATIONOBJ_CONTINUATION) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("FOR TABLE/FOR ALL TABLES IN SCHEMA should be specified before the table/schema name(s)"), + parser_errposition(pubobj->location)); + + foreach(cell, pubobjspec_list) + { + Node *node; + + pubobj = (PublicationObjSpec *) lfirst(cell); + node = (Node *) pubobj->object; + + if (pubobj->pubobjtype == PUBLICATIONOBJ_CONTINUATION) + pubobj->pubobjtype = prevobjtype; + + if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE && IsA(node, String)) + pubobj->object = (Node *)makeRangeVar(NULL, strVal(node), + pubobj->location); + else if (pubobj->pubobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA && + !IsA(node, String)) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid schema name at or near"), + parser_errposition(pubobj->location)); + + prevobjtype = pubobj->pubobjtype; + } +} + /*---------- * Recursive view transformation * diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 14d737fd93..e902ed73da 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1068,6 +1068,9 @@ init_rel_sync_cache(MemoryContext cachectx) CacheRegisterSyscacheCallback(PUBLICATIONRELMAP, rel_sync_cache_publication_cb, (Datum) 0); + CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP, + rel_sync_cache_publication_cb, + (Datum) 0); } /* @@ -1146,7 +1149,15 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) /* Validate the entry */ if (!entry->replicate_valid) { + Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); + + /* + * We don't acquire a lock on the namespace system table as we build + * the cache entry using a historic snapshot and all the later changes + * are absorbed while decoding WAL. + */ + List *schemaPubids = GetSchemaPublications(schemaId); ListCell *lc; Oid publish_as_relid = relid; @@ -1203,6 +1214,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) Oid ancestor = lfirst_oid(lc2); if (list_member_oid(GetRelationPublications(ancestor), + pub->oid) || + list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)), pub->oid)) { ancestor_published = true; @@ -1212,7 +1225,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) } } - if (list_member_oid(pubids, pub->oid) || ancestor_published) + if (list_member_oid(pubids, pub->oid) || + list_member_oid(schemaPubids, pub->oid) || + ancestor_published) publish = true; } diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 13d9994af3..20a80034ab 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5447,6 +5447,7 @@ GetRelationPublicationActions(Relation relation) List *puboids; ListCell *lc; MemoryContext oldcxt; + Oid schemaid; PublicationActions *pubactions = palloc0(sizeof(PublicationActions)); /* @@ -5462,6 +5463,9 @@ GetRelationPublicationActions(Relation relation) /* Fetch the publication membership info. */ puboids = GetRelationPublications(RelationGetRelid(relation)); + schemaid = RelationGetNamespace(relation); + puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); + if (relation->rd_rel->relispartition) { /* Add publications that the ancestors are in too. */ @@ -5474,6 +5478,9 @@ GetRelationPublicationActions(Relation relation) puboids = list_concat_unique_oid(puboids, GetRelationPublications(ancestor)); + schemaid = get_rel_namespace(ancestor); + puboids = list_concat_unique_oid(puboids, + GetSchemaPublications(schemaid)); } } puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index d6cb78dea8..d6c656edc8 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -50,6 +50,7 @@ #include "catalog/pg_partitioned_table.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_range.h" #include "catalog/pg_replication_origin.h" @@ -617,6 +618,28 @@ static const struct cachedesc cacheinfo[] = { }, 8 }, + {PublicationNamespaceRelationId, /* PUBLICATIONNAMESPCE */ + PublicationNamespaceObjectIndexId, + 1, + { + Anum_pg_publication_namespace_oid, + 0, + 0, + 0 + }, + 64 + }, + {PublicationNamespaceRelationId, /* PUBLICATIONNAMESPACEMAP */ + PublicationNamespacePnnspidPnpubidIndexId, + 2, + { + Anum_pg_publication_namespace_pnnspid, + Anum_pg_publication_namespace_pnpubid, + 0, + 0 + }, + 64 + }, {PublicationRelationId, /* PUBLICATIONOID */ PublicationObjectIndexId, 1, diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h index 2885f35ccd..3eca295ff4 100644 --- a/src/include/catalog/dependency.h +++ b/src/include/catalog/dependency.h @@ -122,6 +122,7 @@ typedef enum ObjectClass OCLASS_EVENT_TRIGGER, /* pg_event_trigger */ OCLASS_POLICY, /* pg_policy */ OCLASS_PUBLICATION, /* pg_publication */ + OCLASS_PUBLICATION_NAMESPACE, /* pg_publication_namespace */ OCLASS_PUBLICATION_REL, /* pg_publication_rel */ OCLASS_SUBSCRIPTION, /* pg_subscription */ OCLASS_TRANSFORM /* pg_transform */ diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 82f2536c65..5911824d09 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -83,11 +83,6 @@ typedef struct Publication PublicationActions pubactions; } Publication; -typedef struct PublicationRelInfo -{ - Relation relation; -} PublicationRelInfo; - extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); extern List *GetRelationPublications(Oid relid); @@ -114,10 +109,18 @@ extern List *GetAllTablesPublicationRelations(bool pubviaroot); extern List *GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid); +extern List *GetPublicationSchemas(Oid pubid); +extern List *GetSchemaPublications(Oid schemaid); +extern List *GetAllSchemaPublicationRelations(Oid puboid, + PublicationPartOpt pub_partopt); +extern List *GetSchemaPublicationRelations(Oid schemaid, + PublicationPartOpt pub_partopt); extern bool is_publishable_relation(Relation rel); -extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, +extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, bool if_not_exists); +extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, + bool if_not_exists); extern Oid get_publication_oid(const char *pubname, bool missing_ok); extern char *get_publication_name(Oid pubid, bool missing_ok); diff --git a/src/include/catalog/pg_publication_namespace.h b/src/include/catalog/pg_publication_namespace.h new file mode 100644 index 0000000000..b7e16af819 --- /dev/null +++ b/src/include/catalog/pg_publication_namespace.h @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * pg_publication_namespace.h + * definition of the system catalog for mappings between schemas and + * publications (pg_publication_namespace) + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_publication_namespace.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_PUBLICATION_NAMESPACE_H +#define PG_PUBLICATION_NAMESPACE_H + +#include "catalog/genbki.h" +#include "catalog/pg_publication_namespace_d.h" + + +/* ---------------- + * pg_publication_namespace definition. cpp turns this into + * typedef struct FormData_pg_publication_namespace + * ---------------- + */ +CATALOG(pg_publication_namespace,8901,PublicationNamespaceRelationId) +{ + Oid oid; /* oid */ + Oid pnpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ + Oid pnnspid BKI_LOOKUP(pg_namespace); /* Oid of the schema */ +} FormData_pg_publication_namespace; + +/* ---------------- + * Form_pg_publication_namespace corresponds to a pointer to a tuple with + * the format of pg_publication_namespace relation. + * ---------------- + */ +typedef FormData_pg_publication_namespace *Form_pg_publication_namespace; + +DECLARE_UNIQUE_INDEX_PKEY(pg_publication_namespace_oid_index, 8902, PublicationNamespaceObjectIndexId, on pg_publication_namespace using btree(oid oid_ops)); +DECLARE_UNIQUE_INDEX(pg_publication_namespace_pnnspid_pnpubid_index, 8903, PublicationNamespacePnnspidPnpubidIndexId, on pg_publication_namespace using btree(pnnspid oid_ops, pnpubid oid_ops)); + +#endif /* PG_PUBLICATION_NAMESPACE_H */ diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index 77a299bb18..4ba68c70ee 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -26,6 +26,7 @@ extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt); extern void RemovePublicationById(Oid pubid); extern void RemovePublicationRelById(Oid proid); +extern void RemovePublicationSchemaById(Oid psoid); extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index e0057daa06..8220c72469 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -479,6 +479,7 @@ typedef enum NodeTag T_CTESearchClause, T_CTECycleClause, T_CommonTableExpr, + T_PublicationObjSpec, T_RoleSpec, T_TriggerTransition, T_PartitionElem, @@ -487,7 +488,6 @@ typedef enum NodeTag T_PartitionRangeDatum, T_PartitionCmd, T_VacuumRelation, - T_PublicationTable, /* * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h) diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 3138877553..15aacf7165 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -353,6 +353,26 @@ typedef struct RoleSpec int location; /* token location, or -1 if unknown */ } RoleSpec; +/* + * Publication object type + */ +typedef enum PublicationObjSpecType +{ + PUBLICATIONOBJ_TABLE, /* Table type */ + PUBLICATIONOBJ_REL_IN_SCHEMA, /* Relations in schema type */ + PUBLICATIONOBJ_CONTINUATION /* Continuation of previous type */ +} PublicationObjSpecType; + +typedef struct PublicationObjSpec +{ + NodeTag type; + PublicationObjSpecType pubobjtype; /* type of this publication object */ + Node *object; /* publication object could be: + * RangeVar - table object + * String - tablename or schemaname */ + int location; /* token location, or -1 if unknown */ +} PublicationObjSpec; + /* * FuncCall - a function or aggregate invocation * @@ -1816,6 +1836,7 @@ typedef enum ObjectType OBJECT_POLICY, OBJECT_PROCEDURE, OBJECT_PUBLICATION, + OBJECT_PUBLICATION_NAMESPACE, OBJECT_PUBLICATION_REL, OBJECT_ROLE, OBJECT_ROUTINE, @@ -3636,18 +3657,12 @@ typedef struct AlterTSConfigurationStmt bool missing_ok; /* for DROP - skip error if missing? */ } AlterTSConfigurationStmt; -typedef struct PublicationTable -{ - NodeTag type; - RangeVar *relation; /* relation to be published */ -} PublicationTable; - typedef struct CreatePublicationStmt { NodeTag type; char *pubname; /* Name of the publication */ List *options; /* List of DefElem nodes */ - List *tables; /* Optional list of tables to add */ + List *pubobjects; /* Optional list of publication objects */ bool for_all_tables; /* Special publication for all tables in db */ } CreatePublicationStmt; @@ -3659,10 +3674,11 @@ typedef struct AlterPublicationStmt /* parameters used for ALTER PUBLICATION ... WITH */ List *options; /* List of DefElem nodes */ - /* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */ - List *tables; /* List of tables to add/drop */ + /* ALTER PUBLICATION ... ADD/DROP TABLE/ALL TABLES IN SCHEMA parameters */ + List *pubobjects; /* Optional list of publication objects */ bool for_all_tables; /* Special publication for all tables in db */ - DefElemAction tableAction; /* What action to perform with the tables */ + DefElemAction action; /* What action to perform with the + * tables/schemas */ } AlterPublicationStmt; typedef struct CreateSubscriptionStmt diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h index d74a348600..c8cfbc30f6 100644 --- a/src/include/utils/syscache.h +++ b/src/include/utils/syscache.h @@ -76,6 +76,8 @@ enum SysCacheIdentifier PROCNAMEARGSNSP, PROCOID, PUBLICATIONNAME, + PUBLICATIONNAMESPACE, + PUBLICATIONNAMESPACEMAP, PUBLICATIONOID, PUBLICATIONREL, PUBLICATIONRELMAP, diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index 1461e947cd..215eb899be 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -258,6 +258,8 @@ NOTICE: checking pg_transform {trftosql} => pg_proc {oid} NOTICE: checking pg_sequence {seqrelid} => pg_class {oid} NOTICE: checking pg_sequence {seqtypid} => pg_type {oid} NOTICE: checking pg_publication {pubowner} => pg_authid {oid} +NOTICE: checking pg_publication_namespace {pnpubid} => pg_publication {oid} +NOTICE: checking pg_publication_namespace {pnnspid} => pg_namespace {oid} NOTICE: checking pg_publication_rel {prpubid} => pg_publication {oid} NOTICE: checking pg_publication_rel {prrelid} => pg_class {oid} NOTICE: checking pg_subscription {subdbid} => pg_database {oid} diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out index 982b6aff53..d04dc66db9 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -140,6 +140,7 @@ pg_partitioned_table|t pg_policy|t pg_proc|t pg_publication|t +pg_publication_namespace|t pg_publication_rel|t pg_range|t pg_replication_origin|t diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 402a6617a9..bfc6909a43 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -778,6 +778,7 @@ FormData_pg_partitioned_table FormData_pg_policy FormData_pg_proc FormData_pg_publication +FormData_pg_publication_namespace FormData_pg_publication_rel FormData_pg_range FormData_pg_replication_origin @@ -834,6 +835,7 @@ Form_pg_partitioned_table Form_pg_policy Form_pg_proc Form_pg_publication +Form_pg_publication_namespace Form_pg_publication_rel Form_pg_range Form_pg_replication_origin @@ -2046,9 +2048,10 @@ PsqlSettings Publication PublicationActions PublicationInfo +PublicationObjSpec +PublicationObjSpecType PublicationPartOpt PublicationRelInfo -PublicationTable PullFilter PullFilterOps PushFilter -- 2.30.2