From 447c15912c16e98fb3b67d088afbbc6ff05848f8 Mon Sep 17 00:00:00 2001 From: Shlok Kyal Date: Thu, 13 Nov 2025 10:56:20 +0530 Subject: [PATCH v27 2/3] Skip publishing the tables specified in EXCEPT TABLE. A new "EXCEPT TABLE" clause for CREATE/ALTER PUBLICATION allows one or more tables to be excluded. The publisher will not send the data of excluded tables to the subscriber. The new syntax allows specifying excluded relations when creating or altering a publication. For example: CREATE PUBLICATION pub1 FOR ALL TABLES EXCEPT TABLE (t1,t2); or ALTER PUBLICATION pub1 ADD ALL TABLES EXCEPT TABLE (t1,t2); A new column "prexcept" is added to table "pg_publication_rel", to flag the relations that the user wants to exclude from the publications. pg_dump is updated to identify and dump the excluded tables of the publications. The psql \d family of commands can now display excluded tables. e.g. psql \dRp+ variant will now display associated "except tables" if any. Bump catalog version. --- doc/src/sgml/catalogs.sgml | 9 + doc/src/sgml/logical-replication.sgml | 10 +- doc/src/sgml/ref/alter_publication.sgml | 22 +- doc/src/sgml/ref/create_publication.sgml | 47 +++- doc/src/sgml/ref/psql-ref.sgml | 5 +- src/backend/catalog/pg_publication.c | 99 +++++-- src/backend/commands/publicationcmds.c | 247 ++++++++++++++---- src/backend/commands/tablecmds.c | 4 +- src/backend/parser/gram.y | 42 ++- src/backend/replication/pgoutput/pgoutput.c | 25 +- src/backend/utils/cache/relcache.c | 17 +- src/bin/pg_dump/pg_dump.c | 56 +++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_dump/pg_dump_sort.c | 18 ++ src/bin/pg_dump/t/002_pg_dump.pl | 22 +- src/bin/psql/describe.c | 58 +++- src/bin/psql/tab-complete.in.c | 15 +- src/include/catalog/pg_publication.h | 10 +- src/include/catalog/pg_publication_rel.h | 1 + src/include/commands/publicationcmds.h | 5 +- src/include/nodes/parsenodes.h | 4 + src/test/regress/expected/publication.out | 99 ++++++- src/test/regress/sql/publication.sql | 52 +++- src/test/subscription/meson.build | 1 + .../t/037_rep_changes_except_table.pl | 186 +++++++++++++ 25 files changed, 910 insertions(+), 145 deletions(-) create mode 100644 src/test/subscription/t/037_rep_changes_except_table.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 2fc63442980..a4d32de58ec 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -6581,6 +6581,15 @@ SCRAM-SHA-256$<iteration count>:&l if there is no publication qualifying condition. + + + prexcept bool + + + True if the relation must be excluded + + + prattrs int2vector diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 79ecd09614f..c27d7462efd 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -2547,10 +2547,12 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER - To add tables to a publication, the user must have ownership rights on the - table. To add all tables in schema to a publication, the user must be a - superuser. To create a publication that publishes all tables, all tables in - schema, or all sequences automatically, the user must be a superuser. + To create a publication using FOR ALL TABLES, + FOR ALL SEQUENCES or FOR TABLES IN SCHEMA, the + user must be a superuser. To add ALL TABLES or + TABLES IN SCHEMA to a publication, the user must be a + superuser. To add tables to a publication, the user must have ownership + rights on the table. diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index d8c24efd787..8c3d219b9ea 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -22,6 +22,7 @@ PostgreSQL documentation ALTER PUBLICATION name ADD publication_object [, ...] +ALTER PUBLICATION name ADD ALL TABLES [ EXCEPT [ TABLE ] ( table_exception_object [, ... ] ) ] ALTER PUBLICATION name SET publication_object [, ...] ALTER PUBLICATION name DROP publication_drop_object [, ...] ALTER PUBLICATION name SET ( publication_parameter [= value] [, ... ] ) @@ -38,6 +39,11 @@ ALTER PUBLICATION name RESET TABLE [ ONLY ] table_name [ * ] [, ... ] TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] + +where table_exception_object is: + + [ ONLY ] table_name [ * ] + @@ -93,8 +99,9 @@ ALTER PUBLICATION name RESET You must own the publication to use ALTER PUBLICATION. - Adding a table to a publication additionally requires owning that table. - The ADD TABLES IN SCHEMA, + Adding or excluding a table from a publication requires ownership of that + table. The ADD ALL TABLES, + ADD TABLES IN SCHEMA, SET TABLES IN SCHEMA to a publication and RESET of publication requires the invoking user to be a superuser. To alter the owner, you must be able to @@ -135,7 +142,8 @@ ALTER PUBLICATION name RESET table name, only that table is affected. If ONLY is not specified, the table and all its descendant tables (if any) are affected. Optionally, * can be specified after the table - name to explicitly indicate that descendant tables are included. + name to explicitly indicate that descendant tables are affected. For + partitioned tables, ONLY does not have any effect. @@ -244,6 +252,14 @@ ALTER PUBLICATION sales_publication ADD TABLES IN SCHEMA marketing, sales; + + Alter publication production_publication to publish + all tables except users and + departments: + +ALTER PUBLICATION production_publication ADD ALL TABLES EXCEPT (users, departments); + + Add tables users, departments and schema diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 66a70e5c5b5..8b616651272 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -32,8 +32,12 @@ CREATE PUBLICATION name where all_publication_object is one of: - ALL TABLES + ALL TABLES [ EXCEPT [ TABLE ] ( table_exception_object [, ... ] ) ] ALL SEQUENCES + +where table_exception_object is: + + [ ONLY ] table_name [ * ] @@ -160,7 +164,9 @@ CREATE PUBLICATION name Marks the publication as one that replicates changes for all tables in - the database, including tables created in the future. + the database, including tables created in the future. If + EXCEPT TABLE is specified, then exclude replicating + the changes for the specified tables. @@ -180,6 +186,35 @@ CREATE PUBLICATION name + + EXCEPT TABLE + + + This clause specifies a list of tables to be excluded from the + publication. It can only be used with FOR ALL TABLES. + If ONLY is specified before the table name, only + that table is excluded from the publication. If ONLY is + not specified, the table and all its descendant tables (if any) are + excluded. Optionally, * can be specified after the + table name to explicitly indicate that descendant tables are excluded. + + + The partitioned table or its partitions are excluded from the publication + based on the parameter publish_via_partition_root. + When publish_via_partition_root is set to + true, specifying a root partitioned table in + EXCEPT TABLE excludes it and all its partitions from + replication. Specifying a leaf partition has no effect, as its changes are + still replicated via the root partitioned table. When + publish_via_partition_root is set to + false, specifying a partitioned table or non-leaf + partition has no effect, as changes are replicated via the leaf + partitions. Specifying a leaf partition excludes only that partition from + replication. + + + + WITH ( publication_parameter [= value] [, ... ] ) @@ -463,6 +498,14 @@ CREATE PUBLICATION production_publication FOR TABLE users, departments, TABLES I CREATE PUBLICATION sales_publication FOR TABLES IN SCHEMA marketing, sales; + + Create a publication that publishes all changes in all the tables except + users and departments: + +CREATE PUBLICATION mypublication FOR ALL TABLES EXCEPT (users, departments); + + + Create a publication that publishes all changes for table users, but replicates only columns user_id and diff --git a/doc/src/sgml/ref/psql-ref.sgml b/doc/src/sgml/ref/psql-ref.sgml index f56c70263e0..f1b3ce380b6 100644 --- a/doc/src/sgml/ref/psql-ref.sgml +++ b/doc/src/sgml/ref/psql-ref.sgml @@ -2103,8 +2103,9 @@ SELECT $1 \parse stmt1 listed. If x is appended to the command name, the results are displayed in expanded mode. - If + is appended to the command name, the tables and - schemas associated with each publication are shown as well. + If + is appended to the command name, the tables, + excluded tables, and schemas associated with each publication are shown + as well. diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index ac2f4ee3561..bec3a34e48f 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -354,7 +354,8 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, * ancestor is at the end of the list. */ Oid -GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level) +GetTopMostAncestorInPublication(Oid puboid, List *ancestors, + int *ancestor_level, bool puballtables) { ListCell *lc; Oid topmost_relid = InvalidOid; @@ -366,32 +367,44 @@ GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level foreach(lc, ancestors) { Oid ancestor = lfirst_oid(lc); - List *apubids = GetRelationPublications(ancestor); - List *aschemaPubids = NIL; + List *apubids = GetRelationPublications(ancestor, false); + List *aschemapubids = NIL; + List *aexceptpubids = NIL; + bool set_top = false; level++; - if (list_member_oid(apubids, puboid)) + /* check if member of table publications */ + set_top = list_member_oid(apubids, puboid); + if (!set_top) { - topmost_relid = ancestor; + aschemapubids = GetSchemaPublications(get_rel_namespace(ancestor)); - if (ancestor_level) - *ancestor_level = level; + /* check if member of schema publications */ + set_top = list_member_oid(aschemapubids, puboid); + + /* + * If the publication is all tables publication and the table is + * not part of exception tables. + */ + if (!set_top && puballtables) + { + aexceptpubids = GetRelationPublications(ancestor, true); + set_top = !list_member_oid(aexceptpubids, puboid); + } } - else + + if (set_top) { - aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor)); - if (list_member_oid(aschemaPubids, puboid)) - { - topmost_relid = ancestor; + topmost_relid = ancestor; - if (ancestor_level) - *ancestor_level = level; - } + if (ancestor_level) + *ancestor_level = level; } list_free(apubids); - list_free(aschemaPubids); + list_free(aschemapubids); + list_free(aexceptpubids); } return topmost_relid; @@ -466,6 +479,17 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, RelationGetRelationName(targetrel), pub->name))); } + /* + * Check for partitions of partitioned table which are specified with + * EXCEPT clause and partitioned table is published with + * publish_via_partition_root = true. + */ + if (pub->alltables && pri->except && targetrel->rd_rel->relispartition && + pub->pubviaroot) + ereport(WARNING, + (errmsg("partition \"%s\" will be replicated as publish_via_partition_root is \"%s\"", + RelationGetRelationName(targetrel), "true"))); + check_publication_add_relation(targetrel); /* Validate and translate column names into a Bitmapset of attnums. */ @@ -482,6 +506,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, ObjectIdGetDatum(pubid); values[Anum_pg_publication_rel_prrelid - 1] = ObjectIdGetDatum(relid); + values[Anum_pg_publication_rel_prexcept - 1] = + BoolGetDatum(pri->except); /* Add qualifications, if available */ if (pri->whereClause != NULL) @@ -749,9 +775,9 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) return myself; } -/* Gets list of publication oids for a relation */ +/* Gets list of publication oids for a relation that matches the except_flag */ List * -GetRelationPublications(Oid relid) +GetRelationPublications(Oid relid, bool except_flag) { List *result = NIL; CatCList *pubrellist; @@ -765,7 +791,8 @@ GetRelationPublications(Oid relid) HeapTuple tup = &pubrellist->members[i]->tuple; Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid; - result = lappend_oid(result, pubid); + if (except_flag == ((Form_pg_publication_rel) GETSTRUCT(tup))->prexcept) + result = lappend_oid(result, pubid); } ReleaseSysCacheList(pubrellist); @@ -774,13 +801,14 @@ GetRelationPublications(Oid relid) } /* - * Gets list of relation oids for a publication. + * Gets list of relation oids for a publication that matches the except_flag. * * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES * should use GetAllPublicationRelations(). */ List * -GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) +GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt, + bool except_flag) { List *result; Relation pubrelsrel; @@ -805,8 +833,11 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) Form_pg_publication_rel pubrel; pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); - result = GetPubPartitionOptionRelations(result, pub_partopt, - pubrel->prrelid); + + if (except_flag == pubrel->prexcept) + result = GetPubPartitionOptionRelations(result, pub_partopt, + pubrel->prrelid); + } systable_endscan(scan); @@ -866,13 +897,19 @@ GetAllTablesPublications(void) * publication. */ List * -GetAllPublicationRelations(char relkind, bool pubviaroot) +GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot) { Relation classRel; ScanKeyData key[1]; TableScanDesc scan; HeapTuple tuple; List *result = NIL; + List *exceptlist; + + exceptlist = GetPublicationRelations(pubid, pubviaroot ? + PUBLICATION_PART_ALL : + PUBLICATION_PART_ROOT, + true); Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot)); @@ -891,7 +928,8 @@ GetAllPublicationRelations(char relkind, bool pubviaroot) Oid relid = relForm->oid; if (is_publishable_class(relid, relForm) && - !(relForm->relispartition && pubviaroot)) + !(relForm->relispartition && pubviaroot) && + !list_member_oid(exceptlist, relid)) result = lappend_oid(result, relid); } @@ -912,7 +950,8 @@ GetAllPublicationRelations(char relkind, bool pubviaroot) Oid relid = relForm->oid; if (is_publishable_class(relid, relForm) && - !relForm->relispartition) + !relForm->relispartition && + !list_member_oid(exceptlist, relid)) result = lappend_oid(result, relid); } @@ -1168,7 +1207,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * those. Otherwise, get the partitioned table itself. */ if (pub_elem->alltables) - pub_elem_tables = GetAllPublicationRelations(RELKIND_RELATION, + pub_elem_tables = GetAllPublicationRelations(pub_elem->oid, + RELKIND_RELATION, pub_elem->pubviaroot); else { @@ -1178,7 +1218,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) relids = GetPublicationRelations(pub_elem->oid, pub_elem->pubviaroot ? PUBLICATION_PART_ROOT : - PUBLICATION_PART_LEAF); + PUBLICATION_PART_LEAF, + false); schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid, pub_elem->pubviaroot ? PUBLICATION_PART_ROOT : @@ -1367,7 +1408,7 @@ pg_get_publication_sequences(PG_FUNCTION_ARGS) publication = GetPublicationByName(pubname, false); if (publication->allsequences) - sequences = GetAllPublicationRelations(RELKIND_SEQUENCE, false); + sequences = GetAllPublicationRelations(publication->oid, RELKIND_SEQUENCE, false); funcctx->user_fctx = (void *) sequences; diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 50239513e3f..6fb69e3f3ba 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -170,6 +170,39 @@ parse_publication_options(ParseState *pstate, } } +/* + * Convert the PublicationObjSpec list which is part of + * PublicationAllObjSpecType list into PublicationTable list. + */ +static void +ObjectsInAllPublicationToOids(List *puballobjspec_list, + ParseState *pstate, List **rels) +{ + if (!puballobjspec_list) + return; + + foreach_ptr(PublicationAllObjSpec, puballobj, puballobjspec_list) + { + switch (puballobj->pubobjtype) + { + case PUBLICATION_ALL_SEQUENCES: + break; + case PUBLICATION_ALL_TABLES: + foreach_ptr(PublicationObjSpec, pubobj, puballobj->except_objects) + { + pubobj->pubtable->except = true; + *rels = lappend(*rels, pubobj->pubtable); + } + break; + default: + /* shouldn't happen */ + elog(ERROR, "invalid publication object type %d", + puballobj->pubobjtype); + break; + } + } +} + /* * Convert the PublicationObjSpecType list into schema oid list and * PublicationTable list. @@ -194,6 +227,11 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, switch (pubobj->pubobjtype) { case PUBLICATIONOBJ_TABLE: + pubobj->pubtable->except = false; + *rels = lappend(*rels, pubobj->pubtable); + break; + case PUBLICATIONOBJ_EXCEPT_TABLE: + pubobj->pubtable->except = true; *rels = lappend(*rels, pubobj->pubtable); break; case PUBLICATIONOBJ_TABLES_IN_SCHEMA: @@ -268,7 +306,7 @@ contain_invalid_rfcolumn_walker(Node *node, rf_context *context) */ bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, - bool pubviaroot) + bool pubviaroot, bool puballtables) { HeapTuple rftuple; Oid relid = RelationGetRelid(relation); @@ -295,7 +333,8 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, if (pubviaroot && relation->rd_rel->relispartition) { publish_as_relid - = GetTopMostAncestorInPublication(pubid, ancestors, NULL); + = GetTopMostAncestorInPublication(pubid, ancestors, NULL, + puballtables); if (!OidIsValid(publish_as_relid)) publish_as_relid = relid; @@ -355,7 +394,7 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot, char pubgencols_type, - bool *invalid_column_list, + bool puballtables, bool *invalid_column_list, bool *invalid_gen_col) { Oid relid = RelationGetRelid(relation); @@ -379,7 +418,8 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, */ if (pubviaroot && relation->rd_rel->relispartition) { - publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL); + publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, + NULL, puballtables); if (!OidIsValid(publish_as_relid)) publish_as_relid = relid; @@ -515,7 +555,7 @@ InvalidatePubRelSyncCache(Oid pubid, bool puballtables) * its leaves. */ relids = GetPublicationRelations(pubid, - PUBLICATION_PART_ALL); + PUBLICATION_PART_ALL, false); schemarelids = GetAllSchemaPublicationRelations(pubid, PUBLICATION_PART_ALL); @@ -923,56 +963,54 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) /* Make the changes visible. */ CommandCounterIncrement(); - /* Associate objects with the publication. */ if (stmt->for_all_tables) - { - /* - * Invalidate relcache so that publication info is rebuilt. Sequences - * publication doesn't require invalidation, as replica identity - * checks don't apply to them. - */ - CacheInvalidateRelcacheAll(); - } + ObjectsInAllPublicationToOids(stmt->pubobjects, pstate, &relations); + else if (!stmt->for_all_sequences) - { ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, &schemaidlist); - /* FOR TABLES IN SCHEMA requires superuser */ - if (schemaidlist != NIL && !superuser()) - ereport(ERROR, - errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("must be superuser to create FOR TABLES IN SCHEMA publication")); + /* FOR TABLES IN SCHEMA requires superuser */ + if (schemaidlist != NIL && !superuser()) + ereport(ERROR, + errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to create FOR TABLES IN SCHEMA publication")); - if (relations != NIL) - { - List *rels; + if (relations != NIL) + { + List *rels; - rels = OpenTableList(relations); - TransformPubWhereClauses(rels, pstate->p_sourcetext, - publish_via_partition_root); + rels = OpenTableList(relations); + TransformPubWhereClauses(rels, pstate->p_sourcetext, + publish_via_partition_root); - CheckPubRelationColumnList(stmt->pubname, rels, - schemaidlist != NIL, - publish_via_partition_root); + CheckPubRelationColumnList(stmt->pubname, rels, + schemaidlist != NIL, + publish_via_partition_root); - PublicationAddTables(puboid, rels, true, NULL); - CloseTableList(rels); - } + PublicationAddTables(puboid, rels, true, NULL); + CloseTableList(rels); + } - if (schemaidlist != NIL) - { - /* - * Schema lock is held until the publication is created to prevent - * concurrent schema deletion. - */ - LockSchemaList(schemaidlist); - PublicationAddSchemas(puboid, schemaidlist, true, NULL); - } + if (schemaidlist != NIL) + { + /* + * Schema lock is held until the publication is created to prevent + * concurrent schema deletion. + */ + LockSchemaList(schemaidlist); + PublicationAddSchemas(puboid, schemaidlist, true, NULL); } table_close(rel, RowExclusiveLock); + /* Associate objects with the publication. */ + if (stmt->for_all_tables) + { + /* Invalidate relcache so that publication info is rebuilt. */ + CacheInvalidateRelcacheAll(); + } + InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0); if (wal_level != WAL_LEVEL_LOGICAL) @@ -1041,7 +1079,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, AccessShareLock); root_relids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ROOT); + PUBLICATION_PART_ROOT, false); foreach(lc, root_relids) { @@ -1161,7 +1199,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, */ if (root_relids == NIL) relids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ALL); + PUBLICATION_PART_ALL, false); else { /* @@ -1263,6 +1301,27 @@ AlterPublicationReset(ParseState *pstate, AlterPublicationStmt *stmt, values[Anum_pg_publication_puballsequences - 1] = BoolGetDatum(PUB_DEFAULT_ALL_SEQUENCES); replaces[Anum_pg_publication_puballsequences - 1] = true; + /* + * Lock the publication so nobody else can do anything with it. This + * prevents concurrent publication parameter changes, add/drop tables(s) + * to the publication and add/drop schema(s) to the publication. + */ + LockDatabaseObject(PublicationRelationId, pubid, 0, + AccessExclusiveLock); + + /* + * It is possible that by the time we acquire the lock on publication, + * concurrent DDL has removed it. We can test this by checking the + * existence of publication. We get the tuple again to avoid the risk of + * any publication option getting changed. + */ + tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); + if (!HeapTupleIsValid(tup)) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("publication \"%s\" does not exist", + stmt->pubname)); + if (pubform->puballtables) CacheInvalidateRelcacheAll(); @@ -1285,7 +1344,10 @@ AlterPublicationReset(ParseState *pstate, AlterPublicationStmt *stmt, PublicationDropSchemas(pubid, schemaids, true); /* Get all relations associated with the publication */ - relids = GetPublicationRelations(pubid, PUBLICATION_PART_ROOT); + if (pubform->puballtables) + relids = GetPublicationRelations(pubid, PUBLICATION_PART_ROOT, true); + else + relids = GetPublicationRelations(pubid, PUBLICATION_PART_ROOT, false); foreach_oid(relid, relids) { @@ -1303,6 +1365,80 @@ AlterPublicationReset(ParseState *pstate, AlterPublicationStmt *stmt, CloseTableList(rels); } +/* + * Check if the publication has default values. + * + * Returns true if the publication satisfies all the following conditions: + * a) Publication is not set with "FOR ALL TABLES" + * b) Publication is having default publication parameter values + * c) Publication is not associated with schemas + * d) Publication is not associated with relations + */ +static bool +CheckPublicationDefValues(HeapTuple tup) +{ + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + Oid pubid = pubform->oid; + List *pubobjs = NIL; + + if (pubform->puballtables != PUB_DEFAULT_ALL_TABLES || + pubform->puballsequences != PUB_DEFAULT_ALL_SEQUENCES) + return false; + + if (pubform->pubinsert != PUB_DEFAULT_ACTION_INSERT || + pubform->pubupdate != PUB_DEFAULT_ACTION_UPDATE || + pubform->pubdelete != PUB_DEFAULT_ACTION_DELETE || + pubform->pubtruncate != PUB_DEFAULT_ACTION_TRUNCATE || + pubform->pubviaroot != PUB_DEFAULT_VIA_ROOT) + return false; + + pubobjs = GetPublicationSchemas(pubid); + if (list_length(pubobjs)) + return false; + + pubobjs = GetPublicationRelations(pubid, PUBLICATION_PART_ROOT, false); + if (list_length(pubobjs)) + return false; + + return true; +} + +/* + * Set publication to publish all tables. + */ +static void +AlterPublicationSetAllTables(Relation rel, HeapTuple tup) +{ + Form_pg_publication pubform PG_USED_FOR_ASSERTS_ONLY = (Form_pg_publication) GETSTRUCT(tup); + bool nulls[Natts_pg_publication]; + bool replaces[Natts_pg_publication]; + Datum values[Natts_pg_publication]; + +#ifdef USE_ASSERT_CHECKING + Assert(!pubform->puballtables); +#endif + + /* Add ALL TABLES to the publication requires superuser */ + if (!superuser()) + ereport(ERROR, + errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to ADD ALL TABLES to the publication")); + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* Set ALL TABLES flag */ + values[Anum_pg_publication_puballtables - 1] = BoolGetDatum(true); + replaces[Anum_pg_publication_puballtables - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + CatalogTupleUpdate(rel, &tup->t_self, tup); +} + /* * Add or remove table to/from publication. */ @@ -1341,7 +1477,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, else /* AP_SetObjects */ { List *oldrelids = GetPublicationRelations(pubid, - PUBLICATION_PART_ROOT); + PUBLICATION_PART_ROOT, + false); List *delrels = NIL; ListCell *oldlc; @@ -1442,6 +1579,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, oldrel = palloc(sizeof(PublicationRelInfo)); oldrel->whereClause = NULL; oldrel->columns = NIL; + oldrel->except = false; oldrel->relation = table_open(oldrelid, ShareUpdateExclusiveLock); delrels = lappend(delrels, oldrel); @@ -1492,7 +1630,8 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, ListCell *lc; List *reloids; - reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT); + reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT, + false); foreach(lc, reloids) { @@ -1645,6 +1784,20 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION, stmt->pubname); + if (stmt->for_all_tables) + { + bool isdefault = CheckPublicationDefValues(tup); + + if (!isdefault) + ereport(ERROR, + errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("adding ALL TABLES requires the publication to have default publication parameter values"), + errdetail("ALL TABLES flag should not be set and no tables/schemas should be associated."), + errhint("Use ALTER PUBLICATION ... RESET to reset the publication")); + + AlterPublicationSetAllTables(rel, tup); + } + if (stmt->options) AlterPublicationOptions(pstate, stmt, rel, tup); else if (stmt->action == AP_Reset) @@ -1857,6 +2010,7 @@ OpenTableList(List *tables) pub_rel->relation = rel; pub_rel->whereClause = t->whereClause; pub_rel->columns = t->columns; + pub_rel->except = t->except; rels = lappend(rels, pub_rel); relids = lappend_oid(relids, myrelid); @@ -1929,6 +2083,7 @@ OpenTableList(List *tables) /* child inherits column list from parent */ pub_rel->columns = t->columns; + pub_rel->except = t->except; rels = lappend(rels, pub_rel); relids = lappend_oid(relids, childrelid); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 23ebaa3f230..55773cc2ecd 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -8651,7 +8651,7 @@ ATExecSetExpression(AlteredTableInfo *tab, Relation rel, const char *colName, * expressions. */ if (attgenerated == ATTRIBUTE_GENERATED_VIRTUAL && - GetRelationPublications(RelationGetRelid(rel)) != NIL) + GetRelationPublications(RelationGetRelid(rel), false) != NIL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("ALTER TABLE / SET EXPRESSION is not supported for virtual generated columns in tables that are part of a publication"), @@ -18846,7 +18846,7 @@ ATPrepChangePersistence(AlteredTableInfo *tab, Relation rel, bool toLogged) * UNLOGGED, as UNLOGGED tables can't be published. */ if (!toLogged && - GetRelationPublications(RelationGetRelid(rel)) != NIL) + list_length(GetRelationPublications(RelationGetRelid(rel), false)) > 0) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot change table \"%s\" to unlogged because it is part of a publication", diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index a8b9ae6182d..f2970cc3fdf 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -454,6 +454,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list drop_option_list pub_obj_list pub_all_obj_type_list + except_pub_obj_list opt_except_clause %type returning_clause %type returning_option @@ -591,6 +592,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type var_value zone_value %type auth_ident RoleSpec opt_granted_by %type PublicationObjSpec +%type ExceptPublicationObjSpec %type PublicationAllObjSpec %type unreserved_keyword type_func_name_keyword @@ -10761,6 +10763,7 @@ CreatePublicationStmt: CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; + n->pubobjects = $5; preprocess_pub_all_objtype_list($5, &n->for_all_tables, &n->for_all_sequences, yyscanner); @@ -10801,6 +10804,7 @@ PublicationObjSpec: $$->pubtable->relation = $2; $$->pubtable->columns = $3; $$->pubtable->whereClause = $4; + $$->location = @1; } | TABLES IN_P SCHEMA ColId { @@ -10877,10 +10881,13 @@ pub_obj_list: PublicationObjSpec ; PublicationAllObjSpec: - ALL TABLES + ALL TABLES opt_except_clause { $$ = makeNode(PublicationAllObjSpec); $$->pubobjtype = PUBLICATION_ALL_TABLES; + $$->except_objects = (List *) $3; + if($$->except_objects != NULL) + preprocess_pubobj_list($$->except_objects, yyscanner); $$->location = @1; } | ALL SEQUENCES @@ -10897,6 +10904,28 @@ pub_all_obj_type_list: PublicationAllObjSpec { $$ = lappend($1, $3); } ; +opt_except_clause: + EXCEPT opt_table '(' except_pub_obj_list ')' { $$ = $4; } + | /*EMPTY*/ { $$ = NIL; } + ; + +ExceptPublicationObjSpec: + relation_expr + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_EXCEPT_TABLE; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->except = true; + $$->pubtable->relation = $1; + $$->location = @1; + } + ; + +except_pub_obj_list: ExceptPublicationObjSpec + { $$ = list_make1($1); } + | except_pub_obj_list ',' ExceptPublicationObjSpec + { $$ = lappend($1, $3); } + ; /***************************************************************************** * @@ -10913,6 +10942,8 @@ pub_all_obj_type_list: PublicationAllObjSpec * TABLE table_name [, ...] * TABLES IN SCHEMA schema_name [, ...] * + * ALTER PUBLICATION name ADD ALL TABLES EXCEPT [TABLE] (table_name [, ...]) + * * ALTER PUBLICATION name RESET * *****************************************************************************/ @@ -10956,6 +10987,15 @@ AlterPublicationStmt: n->action = AP_DropObjects; $$ = (Node *) n; } + | ALTER PUBLICATION name ADD_P ALL TABLES opt_except_clause + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->pubobjects = (List *) $7; + n->for_all_tables = true; + n->action = AP_AddObjects; + $$ = (Node *)n; + } | ALTER PUBLICATION name RESET { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 942e1abdb58..a9593c5d9da 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -2084,7 +2084,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) if (!entry->replicate_valid) { Oid schemaId = get_rel_namespace(relid); - List *pubids = GetRelationPublications(relid); + List *pubids = GetRelationPublications(relid, false); + List *exceptTablePubids = GetRelationPublications(relid, true); /* * We don't acquire a lock on the namespace system table as we build @@ -2195,22 +2196,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) Oid pub_relid = relid; int ancestor_level = 0; - /* - * If this is a FOR ALL TABLES publication, pick the partition - * root and set the ancestor level accordingly. - */ - if (pub->alltables) - { - publish = true; - if (pub->pubviaroot && am_partition) - { - List *ancestors = get_partition_ancestors(relid); - - pub_relid = llast_oid(ancestors); - ancestor_level = list_length(ancestors); - } - } - if (!publish) { bool ancestor_published = false; @@ -2229,7 +2214,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) ancestor = GetTopMostAncestorInPublication(pub->oid, ancestors, - &level); + &level, + pub->alltables); if (ancestor != InvalidOid) { @@ -2244,6 +2230,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) if (list_member_oid(pubids, pub->oid) || list_member_oid(schemaPubids, pub->oid) || + (pub->alltables && + !list_member_oid(exceptTablePubids, pub->oid)) || ancestor_published) publish = true; } @@ -2322,6 +2310,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) list_free(pubids); list_free(schemaPubids); + list_free(exceptTablePubids); list_free(rel_publications); entry->replicate_valid = true; diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 915d0bc9084..96dd0ccf41a 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5794,6 +5794,8 @@ void RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) { List *puboids; + List *alltablespuboids; + List *exceptpuboids = NIL; ListCell *lc; MemoryContext oldcxt; Oid schemaid; @@ -5831,9 +5833,10 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) pubdesc->gencols_valid_for_delete = true; /* Fetch the publication membership info. */ - puboids = GetRelationPublications(relid); + puboids = GetRelationPublications(relid, false); schemaid = RelationGetNamespace(relation); puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); + exceptpuboids = GetRelationPublications(relid, true); if (relation->rd_rel->relispartition) { @@ -5845,14 +5848,19 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) Oid ancestor = lfirst_oid(lc); puboids = list_concat_unique_oid(puboids, - GetRelationPublications(ancestor)); + GetRelationPublications(ancestor, false)); schemaid = get_rel_namespace(ancestor); puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); + exceptpuboids = list_concat_unique_oid(exceptpuboids, + GetRelationPublications(ancestor, true)); } } - puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); + alltablespuboids = GetAllTablesPublications(); + puboids = list_concat_unique_oid(puboids, + list_difference_oid(alltablespuboids, + exceptpuboids)); foreach(lc, puboids) { Oid pubid = lfirst_oid(lc); @@ -5883,7 +5891,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) if (!pubform->puballtables && (pubform->pubupdate || pubform->pubdelete) && pub_rf_contains_invalid_column(pubid, relation, ancestors, - pubform->pubviaroot)) + pubform->pubviaroot, pubform->puballtables)) { if (pubform->pubupdate) pubdesc->rf_valid_for_update = false; @@ -5901,6 +5909,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) pub_contains_invalid_column(pubid, relation, ancestors, pubform->pubviaroot, pubform->pubgencols, + pubform->puballtables, &invalid_column_list, &invalid_gen_col)) { diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index a00918bacb4..e34aaba7937 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -186,6 +186,8 @@ static SimpleOidList extension_include_oids = {NULL, NULL}; static SimpleStringList extension_exclude_patterns = {NULL, NULL}; static SimpleOidList extension_exclude_oids = {NULL, NULL}; +static SimplePtrList exceptinfo = {NULL, NULL}; + static const CatalogId nilCatalogId = {0, 0}; /* override for standard extra_float_digits setting */ @@ -4662,7 +4664,34 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo) if (pubinfo->puballtables && pubinfo->puballsequences) appendPQExpBufferStr(query, " FOR ALL TABLES, ALL SEQUENCES"); else if (pubinfo->puballtables) + { + SimplePtrListCell *cell; + appendPQExpBufferStr(query, " FOR ALL TABLES"); + + /* Include exception tables if the publication has except tables */ + for (cell = exceptinfo.head; cell; cell = cell->next) + { + PublicationRelInfo *pubrinfo = (PublicationRelInfo *) cell->ptr; + TableInfo *tbinfo; + + if (pubinfo == pubrinfo->publication) + { + tbinfo = pubrinfo->pubtable; + + if (first) + { + appendPQExpBufferStr(query, " EXCEPT TABLE ("); + first = false; + } + else + appendPQExpBufferStr(query, ", "); + appendPQExpBuffer(query, "ONLY %s", fmtQualifiedDumpable(tbinfo)); + } + } + if (!first) + appendPQExpBufferStr(query, ")"); + } else if (pubinfo->puballsequences) appendPQExpBufferStr(query, " FOR ALL SEQUENCES"); @@ -4831,6 +4860,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) int i_prrelid; int i_prrelqual; int i_prattrs; + int i_prexcept; int i, j, ntups; @@ -4842,8 +4872,16 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) /* Collect all publication membership info. */ if (fout->remoteVersion >= 150000) + { + appendPQExpBufferStr(query, + "SELECT tableoid, oid, prpubid, prrelid,\n"); + + if (fout->remoteVersion >= 190000) + appendPQExpBufferStr(query, " prexcept,\n"); + else + appendPQExpBufferStr(query, " false AS prexcept,\n"); + appendPQExpBufferStr(query, - "SELECT tableoid, oid, prpubid, prrelid, " "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual, " "(CASE\n" " WHEN pr.prattrs IS NOT NULL THEN\n" @@ -4854,6 +4892,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n" " ELSE NULL END) prattrs " "FROM pg_catalog.pg_publication_rel pr"); + } else appendPQExpBufferStr(query, "SELECT tableoid, oid, prpubid, prrelid, " @@ -4869,6 +4908,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) i_prrelid = PQfnumber(res, "prrelid"); i_prrelqual = PQfnumber(res, "prrelqual"); i_prattrs = PQfnumber(res, "prattrs"); + i_prexcept = PQfnumber(res, "prexcept"); /* this allocation may be more than we need */ pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); @@ -4880,6 +4920,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) Oid prrelid = atooid(PQgetvalue(res, i, i_prrelid)); PublicationInfo *pubinfo; TableInfo *tbinfo; + char *prexcept = pg_strdup(PQgetvalue(res, i, i_prexcept)); /* * Ignore any entries for which we aren't interested in either the @@ -4893,7 +4934,11 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) continue; /* OK, make a DumpableObject for this relationship */ - pubrinfo[j].dobj.objType = DO_PUBLICATION_REL; + if (strcmp(prexcept, "f") == 0) + pubrinfo[j].dobj.objType = DO_PUBLICATION_REL; + else + pubrinfo[j].dobj.objType = DO_PUBLICATION_EXCEPT_REL; + pubrinfo[j].dobj.catId.tableoid = atooid(PQgetvalue(res, i, i_tableoid)); pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid)); @@ -4934,6 +4979,9 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) /* Decide whether we want to dump it */ selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout); + if (strcmp(prexcept, "t") == 0) + simple_ptr_list_append(&exceptinfo, &pubrinfo[j]); + j++; } @@ -11812,6 +11860,9 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj) case DO_PUBLICATION: dumpPublication(fout, (const PublicationInfo *) dobj); break; + case DO_PUBLICATION_EXCEPT_REL: + /* will be dumped in dumpPublication */ + break; case DO_PUBLICATION_REL: dumpPublicationTable(fout, (const PublicationRelInfo *) dobj); break; @@ -20182,6 +20233,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs, case DO_DEFAULT_ACL: case DO_POLICY: case DO_PUBLICATION: + case DO_PUBLICATION_EXCEPT_REL: case DO_PUBLICATION_REL: case DO_PUBLICATION_TABLE_IN_SCHEMA: case DO_SUBSCRIPTION: diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 72a00e1bc20..723b5575c53 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -81,6 +81,7 @@ typedef enum DO_REFRESH_MATVIEW, DO_POLICY, DO_PUBLICATION, + DO_PUBLICATION_EXCEPT_REL, DO_PUBLICATION_REL, DO_PUBLICATION_TABLE_IN_SCHEMA, DO_REL_STATS, diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index 164c76e0864..f3c30f3be37 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -92,6 +92,7 @@ enum dbObjectTypePriorities PRIO_FK_CONSTRAINT, PRIO_POLICY, PRIO_PUBLICATION, + PRIO_PUBLICATION_EXCEPT_REL, PRIO_PUBLICATION_REL, PRIO_PUBLICATION_TABLE_IN_SCHEMA, PRIO_SUBSCRIPTION, @@ -147,6 +148,7 @@ static const int dbObjectTypePriority[] = [DO_REFRESH_MATVIEW] = PRIO_REFRESH_MATVIEW, [DO_POLICY] = PRIO_POLICY, [DO_PUBLICATION] = PRIO_PUBLICATION, + [DO_PUBLICATION_EXCEPT_REL] = PRIO_PUBLICATION_EXCEPT_REL, [DO_PUBLICATION_REL] = PRIO_PUBLICATION_REL, [DO_PUBLICATION_TABLE_IN_SCHEMA] = PRIO_PUBLICATION_TABLE_IN_SCHEMA, [DO_REL_STATS] = PRIO_STATISTICS_DATA_DATA, @@ -443,6 +445,17 @@ DOTypeNameCompare(const void *p1, const void *p2) if (cmpval != 0) return cmpval; } + else if (obj1->objType == DO_PUBLICATION_EXCEPT_REL) + { + PublicationRelInfo *probj1 = *(PublicationRelInfo *const *) p1; + PublicationRelInfo *probj2 = *(PublicationRelInfo *const *) p2; + + /* Sort by publication name, since (namespace, name) match the rel */ + cmpval = strcmp(probj1->publication->dobj.name, + probj2->publication->dobj.name); + if (cmpval != 0) + return cmpval; + } else if (obj1->objType == DO_PUBLICATION_TABLE_IN_SCHEMA) { PublicationSchemaInfo *psobj1 = *(PublicationSchemaInfo *const *) p1; @@ -1715,6 +1728,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize) "PUBLICATION (ID %d OID %u)", obj->dumpId, obj->catId.oid); return; + case DO_PUBLICATION_EXCEPT_REL: + snprintf(buf, bufsize, + "PUBLICATION EXCEPT TABLE (ID %d OID %u)", + obj->dumpId, obj->catId.oid); + return; case DO_PUBLICATION_REL: snprintf(buf, bufsize, "PUBLICATION TABLE (ID %d OID %u)", diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index 445a541abf6..156319b8038 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -3170,6 +3170,26 @@ my %tests = ( like => { %full_runs, section_post_data => 1, }, }, + 'CREATE PUBLICATION pub8' => { + create_order => 50, + create_sql => + 'CREATE PUBLICATION pub8 FOR ALL TABLES EXCEPT (dump_test.test_table);', + regexp => qr/^ + \QCREATE PUBLICATION pub8 FOR ALL TABLES EXCEPT TABLE (ONLY dump_test.test_table) WITH (publish = 'insert, update, delete, truncate');\E + /xm, + like => { %full_runs, section_post_data => 1, }, + }, + + 'CREATE PUBLICATION pub9' => { + create_order => 50, + create_sql => + 'CREATE PUBLICATION pub9 FOR ALL TABLES EXCEPT TABLE (dump_test.test_table, dump_test.test_second_table);', + regexp => qr/^ + \QCREATE PUBLICATION pub9 FOR ALL TABLES EXCEPT TABLE (ONLY dump_test.test_table, ONLY dump_test.test_second_table) WITH (publish = 'insert, update, delete, truncate');\E + /xm, + like => { %full_runs, section_post_data => 1, }, + }, + 'CREATE SUBSCRIPTION sub1' => { create_order => 50, create_sql => 'CREATE SUBSCRIPTION sub1 @@ -5157,7 +5177,7 @@ foreach my $run (sort keys %pgdump_runs) # # Either "all_runs" should be set or there should be a "like" list, # even if it is empty. (This makes the test more self-documenting.) - if (!defined($tests{$test}->{all_runs}) + if ( !defined($tests{$test}->{all_runs}) && !defined($tests{$test}->{like})) { die "missing \"like\" in test \"$test\""; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 36f24502842..50b1d435359 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -3073,17 +3073,34 @@ describeOneTableDetails(const char *schemaname, " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n" " ELSE NULL END) " "FROM pg_catalog.pg_publication p\n" - " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" - " JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n" - "WHERE pr.prrelid = '%s'\n" + " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" + " JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n" + "WHERE pr.prrelid = '%s'\n", + oid, oid, oid); + + if (pset.sversion >= 190000) + appendPQExpBufferStr(&buf, " AND NOT pr.prexcept\n"); + + appendPQExpBuffer(&buf, "UNION\n" "SELECT pubname\n" - " , NULL\n" - " , NULL\n" + " , NULL\n" + " , NULL\n" "FROM pg_catalog.pg_publication p\n" - "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" - "ORDER BY 1;", - oid, oid, oid, oid); + "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n", + oid); + + if (pset.sversion >= 190000) + appendPQExpBuffer(&buf, + " AND NOT EXISTS (\n" + " SELECT 1\n" + " FROM pg_catalog.pg_publication_rel pr\n" + " JOIN pg_catalog.pg_class pc\n" + " ON pr.prrelid = pc.oid\n" + " WHERE pr.prrelid = '%s' AND pr.prpubid = p.oid)\n", + oid); + + appendPQExpBufferStr(&buf, "ORDER BY 1;"); } else { @@ -6753,8 +6770,12 @@ describePublications(const char *pattern) " pg_catalog.pg_publication_rel pr\n" "WHERE c.relnamespace = n.oid\n" " AND c.oid = pr.prrelid\n" - " AND pr.prpubid = '%s'\n" - "ORDER BY 1,2", pubid); + " AND pr.prpubid = '%s'\n", pubid); + + if (pset.sversion >= 190000) + appendPQExpBuffer(&buf, " AND NOT pr.prexcept\n"); + + appendPQExpBuffer(&buf, "ORDER BY 1,2"); if (!addFooterToPublicationDesc(&buf, _("Tables:"), false, &cont)) goto error_return; @@ -6772,6 +6793,23 @@ describePublications(const char *pattern) goto error_return; } } + else + { + if (pset.sversion >= 190000) + { + /* Get the excluded tables for the specified publication */ + printfPQExpBuffer(&buf, + "SELECT concat(c.relnamespace::regnamespace, '.', c.relname)\n" + "FROM pg_catalog.pg_class c\n" + " JOIN pg_catalog.pg_publication_rel pr ON c.oid = pr.prrelid\n" + "WHERE pr.prpubid = '%s'\n" + " AND pr.prexcept\n" + "ORDER BY 1", pubid); + if (!addFooterToPublicationDesc(&buf, _("Except tables:"), + true, &cont)) + goto error_return; + } + } printTable(&cont, pset.queryFout, false, pset.logfile); printTableCleanup(&cont); diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 5d918abaa87..b376c400c69 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2292,11 +2292,16 @@ match_previous_words(int pattern_id, COMPLETE_WITH("ADD", "DROP", "OWNER TO", "RENAME TO", "RESET", "SET"); /* ALTER PUBLICATION ADD */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD")) - COMPLETE_WITH("TABLES IN SCHEMA", "TABLE"); + COMPLETE_WITH("ALL TABLES", "TABLES IN SCHEMA", "TABLE"); + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD", "ALL", "TABLES")) + COMPLETE_WITH("EXCEPT TABLE"); else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE")) COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables); - else if (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE") && - ends_with(prev_wd, ',')) + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD", "ALL", "TABLES", "EXCEPT", "TABLE")) + COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables); + else if (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE") && ends_with(prev_wd, ',')) + COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables); + else if (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD", "ALL", "TABLES", "EXCEPT", "TABLE") && ends_with(prev_wd, ',')) COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables); /* @@ -3623,6 +3628,10 @@ match_previous_words(int pattern_id, else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL")) COMPLETE_WITH("TABLES", "SEQUENCES"); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES")) + COMPLETE_WITH("EXCEPT TABLE", "WITH ("); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "EXCEPT")) + COMPLETE_WITH("TABLE"); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "EXCEPT", "TABLE", MatchAny) && !ends_with(prev_wd, ',')) COMPLETE_WITH("WITH ("); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLES")) COMPLETE_WITH("IN SCHEMA"); diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 77b0a2f9eb8..0d39cb67779 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -146,11 +146,12 @@ typedef struct PublicationRelInfo Relation relation; Node *whereClause; List *columns; + bool except; } PublicationRelInfo; extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); -extern List *GetRelationPublications(Oid relid); +extern List *GetRelationPublications(Oid relid, bool except_flag); /* default values for flags and publication parameters */ #define PUB_DEFAULT_ACTION_INSERT true @@ -178,9 +179,10 @@ typedef enum PublicationPartOpt PUBLICATION_PART_ALL, } PublicationPartOpt; -extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); +extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt, + bool except_flag); extern List *GetAllTablesPublications(void); -extern List *GetAllPublicationRelations(char relkind, bool pubviaroot); +extern List *GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot); extern List *GetPublicationSchemas(Oid pubid); extern List *GetSchemaPublications(Oid schemaid); extern List *GetSchemaPublicationRelations(Oid schemaid, @@ -191,7 +193,7 @@ extern List *GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid); extern Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, - int *ancestor_level); + int *ancestor_level, bool puballtables); extern bool is_publishable_relation(Relation rel); extern bool is_schema_publication(Oid pubid); diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index 92cc36dfdf6..e7d7f3ba85c 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -31,6 +31,7 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) Oid oid; /* oid */ Oid prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ Oid prrelid BKI_LOOKUP(pg_class); /* Oid of the relation */ + bool prexcept BKI_DEFAULT(f); /* exclude the relation */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ pg_node_tree prqual; /* qualifications */ diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index f90cf1ef896..0ad5d28754d 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -32,10 +32,11 @@ extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId); extern void InvalidatePublicationRels(List *relids); extern bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, - List *ancestors, bool pubviaroot); + List *ancestors, bool pubviaroot, + bool puballtables); extern bool pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot, - char pubgencols_type, + char pubgencols_type, bool puballtables, bool *invalid_column_list, bool *invalid_gen_col); extern void InvalidatePubRelSyncCache(Oid pubid, bool puballtables); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 8cf75724a7b..a14ecedb27f 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4271,6 +4271,7 @@ typedef struct PublicationTable RangeVar *relation; /* relation to be published */ Node *whereClause; /* qualifications */ List *columns; /* List of columns in a publication table */ + bool except; /* exclude the relation */ } PublicationTable; /* @@ -4279,6 +4280,7 @@ typedef struct PublicationTable typedef enum PublicationObjSpecType { PUBLICATIONOBJ_TABLE, /* A table */ + PUBLICATIONOBJ_EXCEPT_TABLE, /* A table to be excluded */ PUBLICATIONOBJ_TABLES_IN_SCHEMA, /* All tables in schema */ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA, /* All tables in first element of * search_path */ @@ -4307,6 +4309,7 @@ typedef struct PublicationAllObjSpec { NodeTag type; PublicationAllObjType pubobjtype; /* type of this publication object */ + List *except_objects; /* List of publication object to be excluded */ ParseLoc location; /* token location, or -1 if unknown */ } PublicationAllObjSpec; @@ -4342,6 +4345,7 @@ typedef struct AlterPublicationStmt * objects. */ List *pubobjects; /* Optional list of publication objects */ + bool for_all_tables; /* Special publication for all tables in db */ AlterPublicationAction action; /* What action to perform with the given * objects */ } AlterPublicationStmt; diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index e3be29e378d..55cc7d5ee71 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -213,13 +213,37 @@ Not-null constraints: regress_publication_user | t | f | t | t | f | f | none | f (1 row) +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_foralltables_excepttable FOR ALL TABLES EXCEPT TABLE (testpub_tbl1, testpub_tbl2); +-- specify EXCEPT without TABLE +CREATE PUBLICATION testpub_foralltables_excepttable1 FOR ALL TABLES EXCEPT (testpub_tbl1); +RESET client_min_messages; +\dRp+ testpub_foralltables_excepttable + Publication testpub_foralltables_excepttable + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_tbl1" + "public.testpub_tbl2" + +\dRp+ testpub_foralltables_excepttable1 + Publication testpub_foralltables_excepttable1 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_tbl1" + DROP TABLE testpub_tbl2; -DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema, testpub_for_tbl_schema; +DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema, testpub_for_tbl_schema, testpub_foralltables_excepttable, testpub_foralltables_excepttable1; CREATE TABLE testpub_tbl3 (a int); CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; +CREATE PUBLICATION testpub5 FOR ALL TABLES EXCEPT TABLE (testpub_tbl3); +CREATE PUBLICATION testpub6 FOR ALL TABLES EXCEPT TABLE (ONLY testpub_tbl3); RESET client_min_messages; \dRp+ testpub3 Publication testpub3 @@ -238,8 +262,25 @@ Tables: Tables: "public.testpub_tbl3" +\dRp+ testpub5 + Publication testpub5 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_tbl3" + "public.testpub_tbl3a" + +\dRp+ testpub6 + Publication testpub6 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_tbl3" + DROP TABLE testpub_tbl3, testpub_tbl3a; -DROP PUBLICATION testpub3, testpub4; +DROP PUBLICATION testpub3, testpub4, testpub5, testpub6; --- Tests for publications with SEQUENCES CREATE SEQUENCE regress_pub_seq0; CREATE SEQUENCE pub_test.regress_pub_seq1; @@ -2012,6 +2053,7 @@ DROP TABLE gencols; -- Tests for ALTER PUBLICATION ... RESET CREATE SCHEMA pub_sch1; CREATE TABLE pub_sch1.tbl1 (a int); +CREATE TABLE pub_sch1.tbl2 (a int); SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub_reset FOR ALL TABLES; RESET client_min_messages; @@ -2020,8 +2062,18 @@ ALTER PUBLICATION testpub_reset OWNER TO regress_publication_user2; SET ROLE regress_publication_user2; ALTER PUBLICATION testpub_reset RESET; -- fail - must be superuser ERROR: must be superuser to RESET publication +-- Verify that only superuser can ADD ALL TABLES +ALTER PUBLICATION testpub_reset ADD ALL TABLES; +ERROR: adding ALL TABLES requires the publication to have default publication parameter values +DETAIL: ALL TABLES flag should not be set and no tables/schemas should be associated. +HINT: Use ALTER PUBLICATION ... RESET to reset the publication ALTER PUBLICATION testpub_reset OWNER TO regress_publication_user; SET ROLE regress_publication_user; +-- Can't add EXCEPT TABLE to 'FOR ALL TABLES' publication +ALTER PUBLICATION testpub_reset ADD ALL TABLES EXCEPT TABLE (pub_sch1.tbl1); +ERROR: adding ALL TABLES requires the publication to have default publication parameter values +DETAIL: ALL TABLES flag should not be set and no tables/schemas should be associated. +HINT: Use ALTER PUBLICATION ... RESET to reset the publication -- Verify that 'ALL TABLES' flag is reset \dRp+ testpub_reset Publication testpub_reset @@ -2038,8 +2090,25 @@ ALTER PUBLICATION testpub_reset RESET; regress_publication_user | f | f | t | t | t | t | none | f (1 row) --- Verify that associated tables are removed from the publication after RESET +-- Should work now after resetting the publication +ALTER PUBLICATION testpub_reset ADD ALL TABLES EXCEPT TABLE (pub_sch1.tbl1, pub_sch1.tbl2); +\dRp+ testpub_reset + Publication testpub_reset + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "pub_sch1.tbl1" + "pub_sch1.tbl2" + +ALTER PUBLICATION testpub_reset RESET; +-- Can't add EXCEPT TABLE to 'FOR TABLE' publication ALTER PUBLICATION testpub_reset ADD TABLE pub_sch1.tbl1; +ALTER PUBLICATION testpub_reset ADD ALL TABLES EXCEPT TABLE (pub_sch1.tbl1); +ERROR: adding ALL TABLES requires the publication to have default publication parameter values +DETAIL: ALL TABLES flag should not be set and no tables/schemas should be associated. +HINT: Use ALTER PUBLICATION ... RESET to reset the publication +-- Verify that associated tables are removed from the publication after RESET \dRp+ testpub_reset Publication testpub_reset Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root @@ -2056,8 +2125,13 @@ ALTER PUBLICATION testpub_reset RESET; regress_publication_user | f | f | t | t | t | t | none | f (1 row) --- Verify that associated schemas are removed from the publication after RESET +-- Can't add EXCEPT TABLE to 'FOR ALL TABLES IN SCHEMA' publication ALTER PUBLICATION testpub_reset ADD TABLES IN SCHEMA public; +ALTER PUBLICATION testpub_reset ADD ALL TABLES EXCEPT TABLE (pub_sch1.tbl1); +ERROR: adding ALL TABLES requires the publication to have default publication parameter values +DETAIL: ALL TABLES flag should not be set and no tables/schemas should be associated. +HINT: Use ALTER PUBLICATION ... RESET to reset the publication +-- Verify that associated schemas are removed from the publication after RESET \dRp+ testpub_reset Publication testpub_reset Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root @@ -2074,8 +2148,14 @@ ALTER PUBLICATION testpub_reset RESET; regress_publication_user | f | f | t | t | t | t | none | f (1 row) --- Verify that 'PUBLISH' parameter is reset +-- Can't add EXCEPT TABLE when the 'PUBLISH' parameter does not have default +-- value ALTER PUBLICATION testpub_reset SET (PUBLISH = ''); +ALTER PUBLICATION testpub_reset ADD ALL TABLES EXCEPT TABLE (pub_sch1.tbl1); +ERROR: adding ALL TABLES requires the publication to have default publication parameter values +DETAIL: ALL TABLES flag should not be set and no tables/schemas should be associated. +HINT: Use ALTER PUBLICATION ... RESET to reset the publication +-- Verify that 'PUBLISH' parameter is reset \dRp+ testpub_reset Publication testpub_reset Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root @@ -2091,8 +2171,14 @@ ALTER PUBLICATION testpub_reset RESET; regress_publication_user | f | f | t | t | t | t | none | f (1 row) --- Verify that 'PUBLISH_VIA_PARTITION_ROOT' parameter is reset +-- Can't add EXCEPT TABLE when 'PUBLISH_VIA_PARTITION_ROOT' parameter does not +-- have default value ALTER PUBLICATION testpub_reset SET (PUBLISH_VIA_PARTITION_ROOT = 'true'); +ALTER PUBLICATION testpub_reset ADD ALL TABLES EXCEPT TABLE (pub_sch1.tbl1); +ERROR: adding ALL TABLES requires the publication to have default publication parameter values +DETAIL: ALL TABLES flag should not be set and no tables/schemas should be associated. +HINT: Use ALTER PUBLICATION ... RESET to reset the publication +-- Verify that 'PUBLISH_VIA_PARTITION_ROOT' parameter is reset \dRp+ testpub_reset Publication testpub_reset Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root @@ -2127,6 +2213,7 @@ ALTER PUBLICATION testpub_reset RESET; DROP PUBLICATION testpub_reset; DROP TABLE pub_sch1.tbl1; +DROP TABLE pub_sch1.tbl2; DROP SCHEMA pub_sch1; RESET client_min_messages; -- Test that the INSERT ON CONFLICT command correctly checks REPLICA IDENTITY diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 84deaaf5a1f..d3c03f54278 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -105,20 +105,33 @@ SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_forall \d+ testpub_tbl2 \dRp+ testpub_foralltables +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_foralltables_excepttable FOR ALL TABLES EXCEPT TABLE (testpub_tbl1, testpub_tbl2); +-- specify EXCEPT without TABLE +CREATE PUBLICATION testpub_foralltables_excepttable1 FOR ALL TABLES EXCEPT (testpub_tbl1); +RESET client_min_messages; + +\dRp+ testpub_foralltables_excepttable +\dRp+ testpub_foralltables_excepttable1 + DROP TABLE testpub_tbl2; -DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema, testpub_for_tbl_schema; +DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema, testpub_for_tbl_schema, testpub_foralltables_excepttable, testpub_foralltables_excepttable1; CREATE TABLE testpub_tbl3 (a int); CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; +CREATE PUBLICATION testpub5 FOR ALL TABLES EXCEPT TABLE (testpub_tbl3); +CREATE PUBLICATION testpub6 FOR ALL TABLES EXCEPT TABLE (ONLY testpub_tbl3); RESET client_min_messages; \dRp+ testpub3 \dRp+ testpub4 +\dRp+ testpub5 +\dRp+ testpub6 DROP TABLE testpub_tbl3, testpub_tbl3a; -DROP PUBLICATION testpub3, testpub4; +DROP PUBLICATION testpub3, testpub4, testpub5, testpub6; --- Tests for publications with SEQUENCES CREATE SEQUENCE regress_pub_seq0; @@ -1271,6 +1284,7 @@ DROP TABLE gencols; -- Tests for ALTER PUBLICATION ... RESET CREATE SCHEMA pub_sch1; CREATE TABLE pub_sch1.tbl1 (a int); +CREATE TABLE pub_sch1.tbl2 (a int); SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub_reset FOR ALL TABLES; RESET client_min_messages; @@ -1279,34 +1293,59 @@ RESET client_min_messages; ALTER PUBLICATION testpub_reset OWNER TO regress_publication_user2; SET ROLE regress_publication_user2; ALTER PUBLICATION testpub_reset RESET; -- fail - must be superuser + +-- Verify that only superuser can ADD ALL TABLES +ALTER PUBLICATION testpub_reset ADD ALL TABLES; ALTER PUBLICATION testpub_reset OWNER TO regress_publication_user; SET ROLE regress_publication_user; +-- Can't add EXCEPT TABLE to 'FOR ALL TABLES' publication +ALTER PUBLICATION testpub_reset ADD ALL TABLES EXCEPT TABLE (pub_sch1.tbl1); + -- Verify that 'ALL TABLES' flag is reset \dRp+ testpub_reset ALTER PUBLICATION testpub_reset RESET; \dRp+ testpub_reset --- Verify that associated tables are removed from the publication after RESET +-- Should work now after resetting the publication +ALTER PUBLICATION testpub_reset ADD ALL TABLES EXCEPT TABLE (pub_sch1.tbl1, pub_sch1.tbl2); +\dRp+ testpub_reset +ALTER PUBLICATION testpub_reset RESET; + +-- Can't add EXCEPT TABLE to 'FOR TABLE' publication ALTER PUBLICATION testpub_reset ADD TABLE pub_sch1.tbl1; +ALTER PUBLICATION testpub_reset ADD ALL TABLES EXCEPT TABLE (pub_sch1.tbl1); + +-- Verify that associated tables are removed from the publication after RESET \dRp+ testpub_reset ALTER PUBLICATION testpub_reset RESET; \dRp+ testpub_reset --- Verify that associated schemas are removed from the publication after RESET +-- Can't add EXCEPT TABLE to 'FOR ALL TABLES IN SCHEMA' publication ALTER PUBLICATION testpub_reset ADD TABLES IN SCHEMA public; +ALTER PUBLICATION testpub_reset ADD ALL TABLES EXCEPT TABLE (pub_sch1.tbl1); + +-- Verify that associated schemas are removed from the publication after RESET \dRp+ testpub_reset ALTER PUBLICATION testpub_reset RESET; \dRp+ testpub_reset --- Verify that 'PUBLISH' parameter is reset +-- Can't add EXCEPT TABLE when the 'PUBLISH' parameter does not have default +-- value ALTER PUBLICATION testpub_reset SET (PUBLISH = ''); +ALTER PUBLICATION testpub_reset ADD ALL TABLES EXCEPT TABLE (pub_sch1.tbl1); + +-- Verify that 'PUBLISH' parameter is reset \dRp+ testpub_reset ALTER PUBLICATION testpub_reset RESET; \dRp+ testpub_reset --- Verify that 'PUBLISH_VIA_PARTITION_ROOT' parameter is reset +-- Can't add EXCEPT TABLE when 'PUBLISH_VIA_PARTITION_ROOT' parameter does not +-- have default value ALTER PUBLICATION testpub_reset SET (PUBLISH_VIA_PARTITION_ROOT = 'true'); +ALTER PUBLICATION testpub_reset ADD ALL TABLES EXCEPT TABLE (pub_sch1.tbl1); + +-- Verify that 'PUBLISH_VIA_PARTITION_ROOT' parameter is reset \dRp+ testpub_reset ALTER PUBLICATION testpub_reset RESET; \dRp+ testpub_reset @@ -1319,6 +1358,7 @@ ALTER PUBLICATION testpub_reset RESET; DROP PUBLICATION testpub_reset; DROP TABLE pub_sch1.tbl1; +DROP TABLE pub_sch1.tbl2; DROP SCHEMA pub_sch1; RESET client_min_messages; diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index 85d10a89994..b8e5c54c314 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -46,6 +46,7 @@ tests += { 't/034_temporal.pl', 't/035_conflicts.pl', 't/036_sequences.pl', + 't/037_rep_changes_except_table.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/037_rep_changes_except_table.pl b/src/test/subscription/t/037_rep_changes_except_table.pl new file mode 100644 index 00000000000..096e0606365 --- /dev/null +++ b/src/test/subscription/t/037_rep_changes_except_table.pl @@ -0,0 +1,186 @@ + +# Copyright (c) 2021-2025, PostgreSQL Global Development Group + +# Logical replication tests for except table publications +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Test replication with publications created using FOR ALL TABLES EXCEPT TABLE +# clause. +# Create schemas and tables on publisher +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SCHEMA sch1; + CREATE TABLE sch1.tab1 AS SELECT generate_series(1,10) AS a; + CREATE TABLE public.tab1(a int); +)); + +# Create schemas and tables on subscriber +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE SCHEMA sch1; + CREATE TABLE sch1.tab1 (a int); + CREATE TABLE public.tab1 (a int); +)); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_schema FOR ALL TABLES EXCEPT TABLE (sch1.tab1)" +); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, + 'tap_sub_schema'); + +# Check the table data does not sync for excluded table +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM sch1.tab1"); +is($result, qq(0||), + 'check there is no initial data copied for the excluded table'); + +# Insert some data and verify that inserted data is not replicated +$node_publisher->safe_psql('postgres', + "INSERT INTO sch1.tab1 VALUES(generate_series(11,20))"); + +$node_publisher->wait_for_catchup('tap_sub_schema'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM sch1.tab1"); +is($result, qq(0||), 'check replicated inserts on subscriber'); + +# Alter publication to exclude data changes in public.tab1 and verify that +# subscriber does not get the changed data for this table. +$node_publisher->safe_psql( + 'postgres', qq( + ALTER PUBLICATION tap_pub_schema RESET; + ALTER PUBLICATION tap_pub_schema ADD ALL TABLES EXCEPT TABLE (sch1.tab1, public.tab1); + INSERT INTO public.tab1 VALUES(generate_series(1,10)); +)); +$node_publisher->wait_for_catchup('tap_sub_schema'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM public.tab1"); +is($result, qq(0||), 'check rows on subscriber catchup'); + +# cleanup +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_schema"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_schema"); + + +# Check behaviour of publish_via_partition_root and EXCEPT clause with +# partitioned table or partiitions of partitioned table. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE TABLE sch1.t1(a int) PARTITION BY RANGE(a); + CREATE TABLE sch1.part1 PARTITION OF sch1.t1 FOR VALUES FROM (0) TO (5); + INSERT INTO sch1.t1 VALUES (1); +)); + +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE TABLE sch1.t1(a int); + CREATE TABLE sch1.part1(a int); +)); + +# publish_via_partition_root = false and EXCEPT sch1.part1 +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.part1)"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_part CONNECTION '$publisher_connstr' PUBLICATION tap_pub_part" +); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_part'); +$node_publisher->safe_psql('postgres', "INSERT INTO sch1.t1 VALUES (2)"); +$node_publisher->wait_for_catchup('tap_sub_part'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1"); +is($result, qq(), 'check rows on partitioned table'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part1"); +is($result, qq(), 'check rows on partitions'); + +# publish_via_partition_root = false and EXCEPT sch1.t1 +$node_publisher->safe_psql( + 'postgres', qq( + ALTER PUBLICATION tap_pub_part RESET; + ALTER PUBLICATION tap_pub_part ADD ALL TABLES EXCEPT (sch1.t1); +)); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_part REFRESH PUBLICATION"); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_part'); +$node_publisher->safe_psql('postgres', "INSERT INTO sch1.t1 VALUES (3)"); +$node_publisher->wait_for_catchup('tap_sub_part'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1"); +is($result, qq(), 'check rows on partitioned table'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part1"); +is( $result, qq(1 +2 +3), 'check rows on partitions'); +$node_publisher->safe_psql('postgres', "TRUNCATE sch1.t1"); +$node_publisher->wait_for_catchup('tap_sub_part'); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_part"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_part;"); + +# publish_via_partition_root = true and EXCEPT sch1.t1 +$node_publisher->safe_psql( + 'postgres', qq( + CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.t1) WITH (publish_via_partition_root); + INSERT INTO sch1.t1 VALUES (1) +)); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_part CONNECTION '$publisher_connstr' PUBLICATION tap_pub_part" +); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_part'); +$node_publisher->safe_psql('postgres', "INSERT INTO sch1.t1 VALUES (2)"); +$node_publisher->wait_for_catchup('tap_sub_part'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1"); +is($result, qq(), 'check rows on partitioned table'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part1"); +is($result, qq(), 'check rows on partitions'); + +# publish_via_partition_root = true and EXCEPT sch1.part1 +$node_publisher->safe_psql( + 'postgres', qq( + ALTER PUBLICATION tap_pub_part RESET; + ALTER PUBLICATION tap_pub_part ADD ALL TABLES EXCEPT (sch1.part1); + ALTER PUBLICATION tap_pub_part SET (publish_via_partition_root); +)); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_part REFRESH PUBLICATION"); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_part'); +$node_publisher->safe_psql('postgres', "INSERT INTO sch1.t1 VALUES (3)"); +$node_publisher->wait_for_catchup('tap_sub_part'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1"); +is( $result, qq(1 +2 +3), 'check rows on partitioned table'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part1"); +is($result, qq(), 'check rows on partitions'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); + +done_testing(); -- 2.34.1