From d5524bb4287dd6c9d1e69404561dffc2b8c7c3e7 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Fri, 1 Oct 2021 17:20:29 +0530 Subject: [PATCH v1] Changes by Amit. --- src/backend/catalog/pg_publication.c | 146 ++++++++++---------- src/backend/commands/publicationcmds.c | 34 +++-- src/backend/nodes/copyfuncs.c | 26 ++-- src/backend/nodes/equalfuncs.c | 22 +-- src/backend/parser/gram.y | 29 ++-- src/backend/replication/pgoutput/pgoutput.c | 2 +- src/backend/utils/cache/syscache.c | 2 +- src/include/catalog/pg_publication.h | 10 +- src/include/nodes/nodes.h | 2 +- src/include/nodes/parsenodes.h | 4 +- 10 files changed, 147 insertions(+), 130 deletions(-) diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 4e8ccdabc6..644eb17680 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -303,8 +303,6 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) ObjectAddress myself, referenced; - check_publication_add_schema(schemaid); - rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock); /* @@ -327,6 +325,8 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) get_namespace_name(schemaid), pub->name))); } + check_publication_add_schema(schemaid); + /* Form a tuple */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); @@ -358,7 +358,10 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) /* Close the table */ table_close(rel, RowExclusiveLock); - /* Invalidate relcache so that publication info is rebuilt. */ + /* + * Invalidate relcache so that publication info is rebuilt. See + * publication_add_relation for why we need to consider all the partitions. + */ schemaRels = GetSchemaPublicationRelations(schemaid, PUBLICATION_PART_ALL); InvalidatePublicationRels(schemaRels); @@ -431,73 +434,6 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) return result; } -/* - * Gets the list of schema oids for a publication. - * - * This should only be used FOR ALL TABLES IN SCHEMA publications. - */ -List * -GetPublicationSchemas(Oid pubid) -{ - List *result = NIL; - Relation pubschsrel; - ScanKeyData scankey; - SysScanDesc scan; - HeapTuple tup; - - /* Find all publications associated with the schema */ - pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock); - - ScanKeyInit(&scankey, - Anum_pg_publication_namespace_pnpubid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(pubid)); - - scan = systable_beginscan(pubschsrel, - PublicationNamespacePnnspidPnpubidIndexId, - true, NULL, 1, &scankey); - while (HeapTupleIsValid(tup = systable_getnext(scan))) - { - Form_pg_publication_namespace pubsch; - - pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup); - - result = lappend_oid(result, pubsch->pnnspid); - } - - systable_endscan(scan); - table_close(pubschsrel, AccessShareLock); - - return result; -} - - -/* - * Gets the list of publication oids associated with a specified schema. - */ -List * -GetSchemaPublications(Oid schemaid) -{ - List *result = NIL; - CatCList *pubschlist; - int i; - - /* Find all publications associated with the schema */ - pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP, - ObjectIdGetDatum(schemaid)); - for (i = 0; i < pubschlist->n_members; i++) - { - HeapTuple tup = &pubschlist->members[i]->tuple; - Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid; - - result = lappend_oid(result, pubid); - } - - ReleaseSysCacheList(pubschlist); - - return result; -} - /* * Gets list of publication oids for publications marked as FOR ALL TABLES. */ @@ -536,7 +472,7 @@ GetAllTablesPublications(void) } /* - * Gets the list of relations published. + * Gets the list of relation published by FOR ALL TABLES publication(s). * * If the publication publishes partition changes via their respective root * partitioned tables, we must exclude partitions in favor of including the @@ -598,6 +534,72 @@ GetAllTablesPublicationRelations(bool pubviaroot) return result; } +/* + * Gets the list of schema oids for a publication. + * + * This should only be used FOR ALL TABLES IN SCHEMA publications. + */ +List* +GetPublicationSchemas(Oid pubid) +{ + List* result = NIL; + Relation pubschsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all publications associated with the schema */ + pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_namespace_pnpubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubschsrel, + PublicationNamespacePnnspidPnpubidIndexId, + true, NULL, 1, &scankey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_namespace pubsch; + + pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup); + + result = lappend_oid(result, pubsch->pnnspid); + } + + systable_endscan(scan); + table_close(pubschsrel, AccessShareLock); + + return result; +} + +/* + * Gets the list of publication oids associated with a specified schema. + */ +List* +GetSchemaPublications(Oid schemaid) +{ + List* result = NIL; + CatCList* pubschlist; + int i; + + /* Find all publications associated with the schema */ + pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP, + ObjectIdGetDatum(schemaid)); + for (i = 0; i < pubschlist->n_members; i++) + { + HeapTuple tup = &pubschlist->members[i]->tuple; + Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid; + + result = lappend_oid(result, pubid); + } + + ReleaseSysCacheList(pubschlist); + + return result; +} + /* * Get the list of publishable relation oids for a specified schema. */ @@ -820,7 +822,9 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * need those. */ if (publication->alltables) + { tables = GetAllTablesPublicationRelations(publication->pubviaroot); + } else { List *relids, diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 2f4d0b1544..73d46612c4 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -176,7 +176,6 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, *schemas = list_append_unique_oid(*schemas, schemaid); break; case PUBLICATIONOBJ_CURRSCHEMA: - search_path = fetch_search_path(false); if (search_path == NIL) /* nothing valid in search_path? */ ereport(ERROR, @@ -190,7 +189,9 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, *schemas = list_append_unique_oid(*schemas, schemaid); break; default: - Assert(0); + /* shouldn't happen */ + elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype); + break; } } } @@ -320,6 +321,7 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) /* Make the changes visible. */ CommandCounterIncrement(); + /* Associate objects with the publication. */ if (stmt->for_all_tables) { /* Invalidate relcache so that publication info is rebuilt. */ @@ -350,9 +352,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication")); /* - * The schemas specified in the schema list are locked in - * AccessShareLock mode in order to prevent concurrent schema - * deletion. + * Schema lock is held until the publication is created to prevent + * concurrent schema deletion. */ LockSchemaList(schemaidlist); PublicationAddSchemas(puboid, schemaidlist, true, NULL); @@ -496,6 +497,10 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); Oid pubid = pubform->oid; + /* + * It is quite possible that for the SET case user has not specified any + * tables in which case we need to remove all the existing tables. + */ if (!tables && stmt->action != DEFELEM_SET) return; @@ -571,7 +576,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, /* * Alter the publication schemas. * - * Add/Remove/Set all tables from schemas to/from publication. + * Add or remove schemas to/from publication. */ static void AlterPublicationSchemas(AlterPublicationStmt *stmt, @@ -583,8 +588,8 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, return; /* - * The schemas specified in the schema list are locked in AccessShareLock - * mode in order to prevent concurrent schema deletion. + * Schema lock is held until the publication is altered to prevent + * concurrent schema deletion. */ LockSchemaList(schemaidlist); if (stmt->action == DEFELEM_ADD) @@ -612,9 +617,8 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, delschemas = list_difference_oid(oldschemaids, schemaidlist); /* - * The schemas specified in the schema list are locked in - * AccessShareLock mode in order to prevent concurrent schema - * deletion. + * Schema lock is held until the publication is altered to prevent + * concurrent schema deletion. */ LockSchemaList(delschemas); @@ -820,7 +824,10 @@ RemovePublicationSchemaById(Oid psoid) pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup); - /* Invalidate relcache so that publication info is rebuilt. */ + /* + * Invalidate relcache so that publication info is rebuilt. See + * RemovePublicationRelById for why we need to consider all the partitions. + */ schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid, PUBLICATION_PART_ALL); InvalidatePublicationRels(schemaRels); @@ -834,8 +841,7 @@ RemovePublicationSchemaById(Oid psoid) /* * The schemas specified in the schema list are locked in AccessShareLock mode - * in order to prevent concurrent schema deletion. No need to unlock the - * schemas, the locks will be released at the end of the command. + * in order to prevent concurrent schema deletion. */ static void LockSchemaList(List *schemalist) diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 553cd834e6..dfa5d8d705 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4810,6 +4810,19 @@ _copyPartitionCmd(const PartitionCmd *from) return newnode; } +static PublicationObjSpec* +_copyPublicationObject(const PublicationObjSpec *from) +{ + PublicationObjSpec *newnode = makeNode(PublicationObjSpec); + + COPY_SCALAR_FIELD(pubobjtype); + COPY_STRING_FIELD(name); + COPY_NODE_FIELD(rangevar); + COPY_LOCATION_FIELD(location); + + return newnode; +} + static CreatePublicationStmt * _copyCreatePublicationStmt(const CreatePublicationStmt *from) { @@ -4958,19 +4971,6 @@ _copyForeignKeyCacheInfo(const ForeignKeyCacheInfo *from) return newnode; } -static PublicationObjSpec * -_copyPublicationObject(const PublicationObjSpec *from) -{ - PublicationObjSpec *newnode = makeNode(PublicationObjSpec); - - COPY_SCALAR_FIELD(pubobjtype); - COPY_STRING_FIELD(name); - COPY_NODE_FIELD(rangevar); - COPY_LOCATION_FIELD(location); - - return newnode; -} - /* * copyObjectImpl -- implementation of copyObject(); see nodes/nodes.h * diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 054b2d94e5..0532bb20ee 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -3038,6 +3038,18 @@ _equalPartitionCmd(const PartitionCmd *a, const PartitionCmd *b) return true; } +static bool +_equalPublicationObject(const PublicationObjSpec* a, + const PublicationObjSpec* b) +{ + COMPARE_SCALAR_FIELD(pubobjtype); + COMPARE_STRING_FIELD(name); + COMPARE_NODE_FIELD(rangevar); + COMPARE_LOCATION_FIELD(location); + + return true; +} + /* * Stuff from pg_list.h */ @@ -3133,16 +3145,6 @@ _equalBitString(const BitString *a, const BitString *b) return true; } -static bool -_equalPublicationObject(const PublicationObjSpec *a, - const PublicationObjSpec *b) -{ - COMPARE_STRING_FIELD(name); - COMPARE_NODE_FIELD(rangevar); - - return true; -} - /* * equal * returns whether two nodes are equal diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 4f11ef5cbc..e7b33de27f 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -559,8 +559,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type createdb_opt_name plassign_target %type var_value zone_value %type auth_ident RoleSpec opt_granted_by - %type PublicationObjSpec + %type unreserved_keyword type_func_name_keyword %type col_name_keyword reserved_keyword %type bare_label_keyword @@ -17108,9 +17108,11 @@ static RangeVar * makeRangeVarFromQualifiedName(char *name, List *namelist, int location, core_yyscan_t yyscanner) { - RangeVar *r = makeRangeVar(NULL, NULL, location); + RangeVar *r; check_qualified_name(namelist, yyscanner); + r = makeRangeVar(NULL, NULL, location); + switch (list_length(namelist)) { case 1: @@ -17315,10 +17317,12 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE) { - /* relation name was specified as CURRENT_SCHEMA */ + /* relation name or rangevar must be set for this type of object */ if (!pubobj->name && !pubobj->rangevar) - pubobj->rangevar = makeRangeVar(NULL, "CURRENT_SCHEMA", - pubobj->location); + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid table name at or near"), + parser_errposition(pubobj->location)); else if (pubobj->name) { /* convert it to rangevar */ @@ -17327,14 +17331,16 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) pubobj->name = NULL; } } - else if (pubobj->pubobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA) + else if (pubobj->pubobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA || + pubobj->pubobjtype == PUBLICATIONOBJ_CURRSCHEMA) { /* - * Schema name was specified as CURRENT_SCHEMA, set pubobjtype as - * PUBLICATIONOBJ_CURRSCHEMA to indicate the schema name should be - * set with the first schema in search_path. + * We can distinguish between the different type of schema + * objects based on whether name and rangevar is set. */ - if (!pubobj->name && !pubobj->rangevar) + if (pubobj->name) + pubobj->pubobjtype = PUBLICATIONOBJ_REL_IN_SCHEMA; + else if (!pubobj->name && !pubobj->rangevar) pubobj->pubobjtype = PUBLICATIONOBJ_CURRSCHEMA; else if (!pubobj->name) ereport(ERROR, @@ -17343,8 +17349,7 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) parser_errposition(pubobj->location)); } - prevobjtype = (pubobj->pubobjtype == PUBLICATIONOBJ_CURRSCHEMA) ? - PUBLICATIONOBJ_REL_IN_SCHEMA : pubobj->pubobjtype; + prevobjtype = pubobj->pubobjtype; } } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index e902ed73da..6f6a203dea 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1358,7 +1358,7 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) } /* - * Publication relation map syscache invalidation callback + * Publication relation/schema map syscache invalidation callback */ static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index d6c656edc8..56870b46e4 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -618,7 +618,7 @@ static const struct cachedesc cacheinfo[] = { }, 8 }, - {PublicationNamespaceRelationId, /* PUBLICATIONNAMESPCE */ + {PublicationNamespaceRelationId, /* PUBLICATIONNAMESPACE */ PublicationNamespaceObjectIndexId, 1, { diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 5911824d09..a4c894ec9d 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -106,17 +106,17 @@ typedef enum PublicationPartOpt extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); -extern List *GetPubPartitionOptionRelations(List *result, - PublicationPartOpt pub_partopt, - Oid relid); extern List *GetPublicationSchemas(Oid pubid); extern List *GetSchemaPublications(Oid schemaid); -extern List *GetAllSchemaPublicationRelations(Oid puboid, - PublicationPartOpt pub_partopt); extern List *GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt); +extern List *GetAllSchemaPublicationRelations(Oid puboid, + PublicationPartOpt pub_partopt); extern bool is_publishable_relation(Relation rel); +extern List *GetPubPartitionOptionRelations(List* result, + PublicationPartOpt pub_partopt, + Oid relid); extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, bool if_not_exists); extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 8220c72469..d34b4ac8e5 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -479,7 +479,6 @@ typedef enum NodeTag T_CTESearchClause, T_CTECycleClause, T_CommonTableExpr, - T_PublicationObjSpec, T_RoleSpec, T_TriggerTransition, T_PartitionElem, @@ -488,6 +487,7 @@ typedef enum NodeTag T_PartitionRangeDatum, T_PartitionCmd, T_VacuumRelation, + T_PublicationObjSpec, /* * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h) diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 35a6b8ddde..c75dbece52 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -360,8 +360,8 @@ typedef enum PublicationObjSpecType { PUBLICATIONOBJ_TABLE, /* Table type */ PUBLICATIONOBJ_REL_IN_SCHEMA, /* Relations in schema type */ - PUBLICATIONOBJ_CONTINUATION, /* Continuation of previous type */ - PUBLICATIONOBJ_CURRSCHEMA /* Get the first element from search_path */ + PUBLICATIONOBJ_CURRSCHEMA, /* Get the first element from search_path */ + PUBLICATIONOBJ_CONTINUATION /* Continuation of previous type */ } PublicationObjSpecType; typedef struct PublicationObjSpec -- 2.28.0.windows.1