From 95ac39989edc7743c06123b8f1ddb27f4aefb776 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Thu, 19 Feb 2026 20:16:39 +0530 Subject: [PATCH v50 1/3] Skip publishing the tables specified in EXCEPT TABLE. A new "EXCEPT TABLE" clause for CREATE PUBLICATION allows one or more tables to be excluded. The publisher will not send the data of excluded tables to the subscriber. The new syntax allows specifying excluded relations when creating a publication. For example: CREATE PUBLICATION pub1 FOR ALL TABLES EXCEPT TABLE (t1,t2); A new column "prexcept" is added to table "pg_publication_rel", to flag the relations that the user wants to exclude from the publications. Only root partitioned tables can be specified in the EXCEPT TABLE clause. Specifying a root partitioned table excludes all partitions belonging to that partition hierarchy from publication. --- doc/src/sgml/catalogs.sgml | 10 + doc/src/sgml/logical-replication.sgml | 6 +- doc/src/sgml/ref/create_publication.sgml | 61 +++- doc/src/sgml/ref/psql-ref.sgml | 5 +- src/backend/catalog/pg_publication.c | 224 +++++++++++--- src/backend/commands/publicationcmds.c | 50 +++- src/backend/commands/subscriptioncmds.c | 124 ++++++++ src/backend/commands/tablecmds.c | 20 +- src/backend/parser/gram.y | 39 ++- src/backend/replication/logical/tablesync.c | 1 + src/backend/replication/pgoutput/pgoutput.c | 31 +- src/backend/utils/cache/relcache.c | 21 +- src/bin/pg_dump/pg_dump.c | 68 +++++ src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_dump/t/002_pg_dump.pl | 30 ++ src/bin/psql/describe.c | 87 +++++- src/bin/psql/tab-complete.in.c | 12 +- src/include/catalog/pg_publication.h | 18 +- src/include/catalog/pg_publication_rel.h | 1 + src/include/commands/subscriptioncmds.h | 2 + src/include/nodes/parsenodes.h | 5 +- src/test/regress/expected/publication.out | 130 ++++++++- src/test/regress/sql/publication.sql | 65 ++++- src/test/subscription/meson.build | 1 + .../t/037_rep_changes_except_table.pl | 274 ++++++++++++++++++ 25 files changed, 1166 insertions(+), 120 deletions(-) create mode 100644 src/test/subscription/t/037_rep_changes_except_table.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index e7067c84ece..69588937719 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -6572,6 +6572,16 @@ SCRAM-SHA-256$<iteration count>:&l + + + prexcept bool + + + True if the table is excluded from the publication. See + EXCEPT TABLE. + + + prqual pg_node_tree diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 5028fe9af09..bcb473c078b 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -116,7 +116,11 @@ FOR TABLES IN SCHEMA, FOR ALL TABLES, or FOR ALL SEQUENCES. Unlike tables, sequences can be synchronized at any time. For more information, see - . + . When a publication is + created with FOR ALL TABLES, a table or set of tables can + be explicitly excluded from publication using the + EXCEPT TABLE + clause. diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 6efbb915cec..c305dfc50b7 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -32,12 +32,16 @@ CREATE PUBLICATION name and publication_all_object is one of: - ALL TABLES + ALL TABLES [ EXCEPT [ TABLE ] ( except_table_object [, ... ] ) ] ALL SEQUENCES and table_and_columns is: [ ONLY ] table_name [ * ] [ ( column_name [, ... ] ) ] [ WHERE ( expression ) ] + +and except_table_object is: + + [ ONLY ] table_name [ * ] @@ -164,7 +168,8 @@ CREATE PUBLICATION name Marks the publication as one that replicates changes for all tables in - the database, including tables created in the future. + the database, including tables created in the future. Tables listed in + EXCEPT TABLE are excluded from the publication. @@ -184,6 +189,41 @@ CREATE PUBLICATION name + + EXCEPT TABLE + + + This clause specifies a list of tables to be excluded from the + publication. + + + For inherited tables, if ONLY is specified before the + table name, only that table is excluded from the publication. If + ONLY is not specified, the table and all its descendant + tables (if any) are excluded. Optionally, * can be + specified after the table name to explicitly indicate that descendant + tables are excluded. + + + For partitioned tables, only the root partitioned table may be specified + in EXCEPT TABLE. Doing so excludes the root table and + all of its partitions from replication. The optional + ONLY and * has no effect for + partitioned tables. + + + There can be a case where a subscription includes multiple publications. + In such a case, a table or partition that is included in one publication + and listed in the EXCEPT TABLE clause of another is + considered included for replication. + + + Subscribing to multiple publications that specify + EXCEPT TABLE lists is not supported. + + + + WITH ( publication_parameter [= value] [, ... ] ) @@ -489,6 +529,23 @@ CREATE PUBLICATION all_sequences FOR ALL SEQUENCES; all sequences for synchronization: CREATE PUBLICATION all_tables_sequences FOR ALL TABLES, ALL SEQUENCES; + + + + + Create a publication that publishes all changes in all tables except + users and departments: + +CREATE PUBLICATION all_tables_except FOR ALL TABLES EXCEPT (users, departments); + + + + + Create a publication that publishes all sequences for synchronization, and + all changes in all tables except users and + departments: + +CREATE PUBLICATION all_sequences_tables_except FOR ALL SEQUENCES, ALL TABLES EXCEPT (users, departments); diff --git a/doc/src/sgml/ref/psql-ref.sgml b/doc/src/sgml/ref/psql-ref.sgml index 8b1d948ba05..1045bc6a02c 100644 --- a/doc/src/sgml/ref/psql-ref.sgml +++ b/doc/src/sgml/ref/psql-ref.sgml @@ -2103,8 +2103,9 @@ SELECT $1 \parse stmt1 listed. If x is appended to the command name, the results are displayed in expanded mode. - If + is appended to the command name, the tables and - schemas associated with each publication are shown as well. + If + is appended to the command name, the tables, + excluded tables, and schemas associated with each publication are shown + as well. diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 9a4791c573e..bdc32e77a49 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -32,6 +32,7 @@ #include "catalog/pg_type.h" #include "commands/publicationcmds.h" #include "funcapi.h" +#include "replication/logical.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/catcache.h" @@ -53,37 +54,48 @@ typedef struct * error if not. */ static void -check_publication_add_relation(Relation targetrel) +check_publication_add_relation(PublicationRelInfo *pri, bool puballtables) { + Relation targetrel = pri->relation; + const char *errormsg; + + if (pri->except) + errormsg = gettext_noop("cannot use publication EXCEPT clause for relation \"%s\""); + else + errormsg = gettext_noop("cannot add relation \"%s\" to publication"); + + /* If in EXCEPT clause, must be root partitioned table */ + if (puballtables && pri->except && targetrel->rd_rel->relispartition) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg(errormsg, RelationGetRelationName(targetrel)), + errdetail("This operation is not supported for individual partitions."))); + /* Must be a regular or partitioned table */ if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("cannot add relation \"%s\" to publication", - RelationGetRelationName(targetrel)), + errmsg(errormsg, RelationGetRelationName(targetrel)), errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind))); /* Can't be system table */ if (IsCatalogRelation(targetrel)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("cannot add relation \"%s\" to publication", - RelationGetRelationName(targetrel)), + errmsg(errormsg, RelationGetRelationName(targetrel)), errdetail("This operation is not supported for system tables."))); /* UNLOGGED and TEMP relations cannot be part of publication. */ if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("cannot add relation \"%s\" to publication", - RelationGetRelationName(targetrel)), + errmsg(errormsg, RelationGetRelationName(targetrel)), errdetail("This operation is not supported for temporary tables."))); else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("cannot add relation \"%s\" to publication", - RelationGetRelationName(targetrel)), + errmsg(errormsg, RelationGetRelationName(targetrel)), errdetail("This operation is not supported for unlogged tables."))); } @@ -366,9 +378,11 @@ GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level foreach(lc, ancestors) { Oid ancestor = lfirst_oid(lc); - List *apubids = GetRelationPublications(ancestor); + List *apubids = NIL; List *aschemaPubids = NIL; + GetRelationPublications(ancestor, &apubids, NULL); + level++; if (list_member_oid(apubids, puboid)) @@ -466,7 +480,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, RelationGetRelationName(targetrel), pub->name))); } - check_publication_add_relation(targetrel); + check_publication_add_relation(pri, pub->alltables); /* Validate and translate column names into a Bitmapset of attnums. */ attnums = pub_collist_validate(pri->relation, pri->columns); @@ -482,6 +496,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, ObjectIdGetDatum(pubid); values[Anum_pg_publication_rel_prrelid - 1] = ObjectIdGetDatum(relid); + values[Anum_pg_publication_rel_prexcept - 1] = + BoolGetDatum(pri->except); /* Add qualifications, if available */ if (pri->whereClause != NULL) @@ -530,17 +546,26 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, table_close(rel, RowExclusiveLock); /* - * Invalidate relcache so that publication info is rebuilt. - * - * For the partitioned tables, we must invalidate all partitions contained - * in the respective partition hierarchies, not just the one explicitly - * mentioned in the publication. This is required because we implicitly - * publish the child tables when the parent table is published. + * Relations excluded via the EXCEPT clause do not need explicit + * invalidation as CreatePublication() function invalidates all relations + * as part of defining a FOR ALL TABLES publication. */ - relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL, - relid); + if (!pri->except) + { + /* + * Invalidate relcache so that publication info is rebuilt. + * + * For the partitioned tables, we must invalidate all partitions + * contained in the respective partition hierarchies, not just the one + * explicitly mentioned in the publication. This is required because + * we implicitly publish the child tables when the parent table is + * published. + */ + relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL, + relid); - InvalidatePublicationRels(relids); + InvalidatePublicationRels(relids); + } return myself; } @@ -749,38 +774,54 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) return myself; } -/* Gets list of publication oids for a relation */ -List * -GetRelationPublications(Oid relid) +/* + * Get the list of publication oids associated with a specified relation. + * + * 'pubids' returns the Oids of the publications the relation is part of. + * + * 'except_pubids' returns the Oids of publications the relation is excluded + * from. + * + * This function returns true if the relation is part of any publication. + */ +bool +GetRelationPublications(Oid relid, List **pubids, List **except_pubids) { - List *result = NIL; CatCList *pubrellist; - int i; + bool found = false; /* Find all publications associated with the relation. */ pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP, ObjectIdGetDatum(relid)); - for (i = 0; i < pubrellist->n_members; i++) + for (int i = 0; i < pubrellist->n_members; i++) { HeapTuple tup = &pubrellist->members[i]->tuple; - Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid; + Form_pg_publication_rel pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); + Oid pubid = pubrel->prpubid; + List **target_list = pubrel->prexcept ? except_pubids : pubids; - result = lappend_oid(result, pubid); + if (target_list) + *target_list = lappend_oid(*target_list, pubid); + + if (!pubrel->prexcept) + found = true; } ReleaseSysCacheList(pubrellist); - return result; + return found; } /* - * Gets list of relation oids for a publication. + * Internal function to get the list of relation Oids for a publication. * - * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES - * should use GetAllPublicationRelations(). + * If except_flag is true, returns the list of relations specified in the + * EXCEPT clause of the publication; otherwise, returns the list of relations + * included in the publication. */ -List * -GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) +static List * +get_publication_relations(Oid pubid, PublicationPartOpt pub_partopt, + bool except_flag) { List *result; Relation pubrelsrel; @@ -805,8 +846,10 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) Form_pg_publication_rel pubrel; pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); - result = GetPubPartitionOptionRelations(result, pub_partopt, - pubrel->prrelid); + + if (except_flag == pubrel->prexcept) + result = GetPubPartitionOptionRelations(result, pub_partopt, + pubrel->prrelid); } systable_endscan(scan); @@ -819,6 +862,34 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) return result; } +/* + * Gets list of relation oids that are associated with a publication. + * + * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES + * should use GetAllPublicationRelations(). + */ +List * +GetIncludedPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) +{ + Assert(!GetPublication(pubid)->alltables); + + return get_publication_relations(pubid, pub_partopt, false); +} + +/* + * Gets list of table oids that were specified in the EXCEPT clause for a + * publication. + * + * This should only be used FOR ALL TABLES publications. + */ +List * +GetExcludedPublicationTables(Oid pubid, PublicationPartOpt pub_partopt) +{ + Assert(GetPublication(pubid)->alltables); + + return get_publication_relations(pubid, pub_partopt, true); +} + /* * Gets list of publication oids for publications marked as FOR ALL TABLES. */ @@ -858,24 +929,36 @@ GetAllTablesPublications(void) /* * Gets list of all relations published by FOR ALL TABLES/SEQUENCES - * publication(s). + * publication. * * If the publication publishes partition changes via their respective root * partitioned tables, we must exclude partitions in favor of including the * root partitioned tables. This is not applicable to FOR ALL SEQUENCES * publication. + * + * For a FOR ALL TABLES publication, the returned list excludes tables mentioned + * in EXCEPT TABLE clause. */ List * -GetAllPublicationRelations(char relkind, bool pubviaroot) +GetAllPublicationRelations(Publication *pub, char relkind) { Relation classRel; ScanKeyData key[1]; TableScanDesc scan; HeapTuple tuple; List *result = NIL; + List *exceptlist = NIL; + bool pubviaroot = pub->pubviaroot; + Oid pubid = pub->oid; Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot)); + /* EXCEPT filtering applies only to relations, not sequences */ + if (relkind == RELKIND_RELATION) + exceptlist = GetExcludedPublicationTables(pubid, pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + classRel = table_open(RelationRelationId, AccessShareLock); ScanKeyInit(&key[0], @@ -891,7 +974,8 @@ GetAllPublicationRelations(char relkind, bool pubviaroot) Oid relid = relForm->oid; if (is_publishable_class(relid, relForm) && - !(relForm->relispartition && pubviaroot)) + !(relForm->relispartition && pubviaroot) && + !list_member_oid(exceptlist, relid)) result = lappend_oid(result, relid); } @@ -912,7 +996,8 @@ GetAllPublicationRelations(char relkind, bool pubviaroot) Oid relid = relForm->oid; if (is_publishable_class(relid, relForm) && - !relForm->relispartition) + !relForm->relispartition && + !list_member_oid(exceptlist, relid)) result = lappend_oid(result, relid); } @@ -1116,6 +1201,51 @@ GetPublicationByName(const char *pubname, bool missing_ok) return OidIsValid(oid) ? GetPublication(oid) : NULL; } +/* + * publication_has_any_except_table + * + * Return true if the given publication has any relation entry + * in pg_publication_rel with prexcept set. + */ +bool +publication_has_any_except_table(Oid pubid) +{ + Relation pubrelsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + bool found = false; + + pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_rel_prpubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubrelsrel, + PublicationRelPrpubidIndexId, + true, NULL, 1, &scankey); + + /* We only need to find any occurrence of prexcept = true */ + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_rel pubrel; + + pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); + if (pubrel->prexcept) + { + found = true; + break; + } + } + + systable_endscan(scan); + table_close(pubrelsrel, AccessShareLock); + + return found; +} + /* * Get information of the tables in the given publication array. * @@ -1168,17 +1298,17 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * those. Otherwise, get the partitioned table itself. */ if (pub_elem->alltables) - pub_elem_tables = GetAllPublicationRelations(RELKIND_RELATION, - pub_elem->pubviaroot); + pub_elem_tables = GetAllPublicationRelations(pub_elem, + RELKIND_RELATION); else { List *relids, *schemarelids; - relids = GetPublicationRelations(pub_elem->oid, - pub_elem->pubviaroot ? - PUBLICATION_PART_ROOT : - PUBLICATION_PART_LEAF); + relids = GetIncludedPublicationRelations(pub_elem->oid, + pub_elem->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid, pub_elem->pubviaroot ? PUBLICATION_PART_ROOT : @@ -1367,7 +1497,7 @@ pg_get_publication_sequences(PG_FUNCTION_ARGS) publication = GetPublicationByName(pubname, false); if (publication->allsequences) - sequences = GetAllPublicationRelations(RELKIND_SEQUENCE, false); + sequences = GetAllPublicationRelations(publication, RELKIND_SEQUENCE); funcctx->user_fctx = sequences; diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index fc3a4c19e65..2180cb866f4 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -198,7 +198,12 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, switch (pubobj->pubobjtype) { + case PUBLICATIONOBJ_EXCEPT_TABLE: + pubobj->pubtable->except = true; + *rels = lappend(*rels, pubobj->pubtable); + break; case PUBLICATIONOBJ_TABLE: + pubobj->pubtable->except = false; *rels = lappend(*rels, pubobj->pubtable); break; case PUBLICATIONOBJ_TABLES_IN_SCHEMA: @@ -519,8 +524,8 @@ InvalidatePubRelSyncCache(Oid pubid, bool puballtables) * a target. However, WAL records for TRUNCATE specify both a root and * its leaves. */ - relids = GetPublicationRelations(pubid, - PUBLICATION_PART_ALL); + relids = GetIncludedPublicationRelations(pubid, + PUBLICATION_PART_ALL); schemarelids = GetAllSchemaPublicationRelations(pubid, PUBLICATION_PART_ALL); @@ -845,6 +850,7 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) AclResult aclresult; List *relations = NIL; List *schemaidlist = NIL; + List *rels = NIL; /* must have CREATE privilege on database */ aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE); @@ -929,8 +935,20 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) CommandCounterIncrement(); /* Associate objects with the publication. */ + ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, + &schemaidlist); + if (relations != NIL) + rels = OpenTableList(relations); + if (stmt->for_all_tables) { + /* Process EXCEPT table list */ + if (relations != NIL) + { + Assert(rels != NIL); + PublicationAddTables(puboid, rels, true, NULL); + } + /* * Invalidate relcache so that publication info is rebuilt. Sequences * publication doesn't require invalidation, as replica identity @@ -940,9 +958,6 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) } else if (!stmt->for_all_sequences) { - ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, - &schemaidlist); - /* FOR TABLES IN SCHEMA requires superuser */ if (schemaidlist != NIL && !superuser()) ereport(ERROR, @@ -951,9 +966,6 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) if (relations != NIL) { - List *rels; - - rels = OpenTableList(relations); TransformPubWhereClauses(rels, pstate->p_sourcetext, publish_via_partition_root); @@ -962,7 +974,6 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) publish_via_partition_root); PublicationAddTables(puboid, rels, true, NULL); - CloseTableList(rels); } if (schemaidlist != NIL) @@ -976,6 +987,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) } } + if (rels != NIL) + CloseTableList(rels); + table_close(rel, RowExclusiveLock); InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0); @@ -1050,8 +1064,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, LockDatabaseObject(PublicationRelationId, pubform->oid, 0, AccessShareLock); - root_relids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ROOT); + root_relids = GetIncludedPublicationRelations(pubform->oid, + PUBLICATION_PART_ROOT); foreach(lc, root_relids) { @@ -1170,8 +1184,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, * trees, not just those explicitly mentioned in the publication. */ if (root_relids == NIL) - relids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ALL); + relids = GetIncludedPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); else { /* @@ -1256,8 +1270,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, PublicationDropTables(pubid, rels, false); else /* AP_SetObjects */ { - List *oldrelids = GetPublicationRelations(pubid, - PUBLICATION_PART_ROOT); + List *oldrelids = GetIncludedPublicationRelations(pubid, + PUBLICATION_PART_ROOT); List *delrels = NIL; ListCell *oldlc; @@ -1358,6 +1372,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, oldrel = palloc_object(PublicationRelInfo); oldrel->whereClause = NULL; oldrel->columns = NIL; + oldrel->except = false; oldrel->relation = table_open(oldrelid, ShareUpdateExclusiveLock); delrels = lappend(delrels, oldrel); @@ -1408,7 +1423,8 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, ListCell *lc; List *reloids; - reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT); + reloids = GetIncludedPublicationRelations(pubform->oid, + PUBLICATION_PART_ROOT); foreach(lc, reloids) { @@ -1771,6 +1787,7 @@ OpenTableList(List *tables) pub_rel->relation = rel; pub_rel->whereClause = t->whereClause; pub_rel->columns = t->columns; + pub_rel->except = t->except; rels = lappend(rels, pub_rel); relids = lappend_oid(relids, myrelid); @@ -1843,6 +1860,7 @@ OpenTableList(List *tables) /* child inherits column list from parent */ pub_rel->columns = t->columns; + pub_rel->except = t->except; rels = lappend(rels, pub_rel); relids = lappend_oid(relids, childrelid); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5e3c0964d38..c745d132f9f 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -37,6 +37,7 @@ #include "miscadmin.h" #include "nodes/makefuncs.h" #include "pgstat.h" +#include "replication/logical.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" #include "replication/origin.h" @@ -575,6 +576,126 @@ check_publications(WalReceiverConn *wrconn, List *publications) } } +/* + * Ensure that at most one publication in the given list defines an + * EXCEPT TABLE clause. + */ +void +CheckPublicationsForExceptClauses(List *except_publications) +{ + StringInfoData pubnames; + + /* Nothing to do if zero or one publication has EXCEPT. */ + if (list_length(except_publications) <= 1) + return; + + initStringInfo(&pubnames); + GetPublicationsStr(except_publications, &pubnames, false); + + list_free_deep(except_publications); + + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot combine publications %s with an EXCEPT TABLE clause", + pubnames.data)); +} + +/* + * ERROR if multiple publications with EXCEPT clauses are present. + * + * Logical replication does not support subscribing to more than one + * publication that defines an exclusion list. Allowing this would + * create ambiguous and unpredictable replication behavior. + * + * The ambiguity arises when publications differ in their exclusions + * and in the setting of 'publish_via_partition_root'. In such cases, + * the effective publication set cannot be determined consistently: + * + * - Tables excluded in one publication may be implicitly included + * in another. + * - Conflicting 'publish_via_partition_root' settings alter how + * partitions are represented and deduplicated. + * + * To avoid these conflicts, a subscription may include at most one + * 'FOR ALL TABLES' publication that specifies an EXCEPT clause. + * + * Example: + * + * Partitioned table: tab_root + * Partitions: part1, part2, part3 + * + * pub1: + * FOR ALL TABLES EXCEPT (part1, part2) + * WITH (publish_via_partition_root = true) + * + * pub2: + * FOR ALL TABLES EXCEPT (part3) + * WITH (publish_via_partition_root = false) + * + * Subscribing to both pub1 and pub2 is invalid because: + * - pub1 excludes part1 and part2, publishing remaining partitions + * via the root. + * - pub2 excludes part3, publishing other partitions individually. + * + * The resulting publication set is ambiguous and provides no clear + * benefit. Unless all publications exclude the same tables, combining + * them introduces complex and conflicting partition resolution rules. + */ +static void +check_publications_except_list(WalReceiverConn *wrconn, List *publications) +{ + List *except_publications = NIL; + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[1] = {TEXTOID}; + + if (list_length(publications) <= 1) + return; + + initStringInfo(&cmd); + appendStringInfoString(&cmd, + "SELECT p.pubname\n" + " FROM pg_catalog.pg_publication p\n" + " WHERE p.pubname IN ("); + + GetPublicationsStr(publications, &cmd, true); + + appendStringInfoString(&cmd, + ")\n" + " AND EXISTS (SELECT 1\n" + " FROM pg_catalog.pg_publication_rel pr\n" + " WHERE pr.prpubid = p.oid\n" + " AND pr.prexcept IS TRUE)"); + + res = walrcv_exec(wrconn, cmd.data, 1, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not receive list of publications from the publisher: %s", + res->err)); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *pubname; + bool isnull; + + pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + except_publications = lappend(except_publications, makeString(pubname)); + ExecClearTuple(slot); + } + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + + CheckPublicationsForExceptClauses(except_publications); +} + /* * Auxiliary function to build a text array out of a list of String nodes. */ @@ -832,6 +953,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, char relation_state; check_publications(wrconn, publications); + check_publications_except_list(wrconn, publications); check_publications_origin_tables(wrconn, publications, opts.copy_data, opts.retaindeadtuples, opts.origin, @@ -996,6 +1118,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, if (validate_publications) check_publications(wrconn, validate_publications); + check_publications_except_list(wrconn, sub->publications); + /* Get the relation list from publisher. */ pubrels = fetch_relation_list(wrconn, sub->publications); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index b04b0dbd2a0..08d20bf2834 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -8686,7 +8686,7 @@ ATExecSetExpression(AlteredTableInfo *tab, Relation rel, const char *colName, * expressions. */ if (attgenerated == ATTRIBUTE_GENERATED_VIRTUAL && - GetRelationPublications(RelationGetRelid(rel)) != NIL) + GetRelationPublications(RelationGetRelid(rel), NULL, NULL)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("ALTER TABLE / SET EXPRESSION is not supported for virtual generated columns in tables that are part of a publication"), @@ -18881,7 +18881,7 @@ ATPrepChangePersistence(AlteredTableInfo *tab, Relation rel, bool toLogged) * UNLOGGED, as UNLOGGED tables can't be published. */ if (!toLogged && - GetRelationPublications(RelationGetRelid(rel)) != NIL) + GetRelationPublications(RelationGetRelid(rel), NULL, NULL)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot change table \"%s\" to unlogged because it is part of a publication", @@ -20322,6 +20322,7 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, const char *trigger_name; Oid defaultPartOid; List *partBoundConstraint; + List *exceptpuboids = NIL; ParseState *pstate = make_parsestate(NULL); pstate->p_sourcetext = context->queryString; @@ -20361,6 +20362,21 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot attach a typed table as partition"))); + /* + * Check that the table is not part of any publication when changing to + * UNLOGGED, as UNLOGGED tables can't be published. + */ + GetRelationPublications(RelationGetRelid(attachrel), NULL, &exceptpuboids); + if (exceptpuboids != NIL) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot attach table \"%s\" as partition because it is referenced in a publication EXCEPT clause", + RelationGetRelationName(attachrel)), + errdetail("Tables excluded from publications cannot be attached as partitions."), + errhint("Remove the table from the publication EXCEPT list before attaching it.")); + + list_free(exceptpuboids); + /* * Table being attached should not already be part of inheritance; either * as a child table... diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index c567252acc4..21ef7bee729 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -203,6 +203,7 @@ static void processCASbits(int cas_bits, int location, const char *constrType, static PartitionStrategy parsePartitionStrategy(char *strategy, int location, core_yyscan_t yyscanner); static void preprocess_pub_all_objtype_list(List *all_objects_list, + List **pubobjects, bool *all_tables, bool *all_sequences, core_yyscan_t yyscanner); @@ -455,6 +456,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list drop_option_list pub_obj_list pub_all_obj_type_list + pub_except_obj_list opt_pub_except_clause %type returning_clause %type returning_option @@ -592,6 +594,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type var_value zone_value %type auth_ident RoleSpec opt_granted_by %type PublicationObjSpec +%type PublicationExceptObjSpec %type PublicationAllObjSpec %type unreserved_keyword type_func_name_keyword @@ -10792,7 +10795,7 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec * * pub_all_obj_type is one of: * - * TABLES + * TABLES [EXCEPT [TABLE] ( table [, ...] )] * SEQUENCES * * CREATE PUBLICATION FOR pub_obj [, ...] [WITH options] @@ -10818,7 +10821,8 @@ CreatePublicationStmt: CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; - preprocess_pub_all_objtype_list($5, &n->for_all_tables, + preprocess_pub_all_objtype_list($5, &n->pubobjects, + &n->for_all_tables, &n->for_all_sequences, yyscanner); n->options = $6; @@ -10933,11 +10937,17 @@ pub_obj_list: PublicationObjSpec { $$ = lappend($1, $3); } ; +opt_pub_except_clause: + EXCEPT opt_table '(' pub_except_obj_list ')' { $$ = $4; } + | /*EMPTY*/ { $$ = NIL; } + ; + PublicationAllObjSpec: - ALL TABLES + ALL TABLES opt_pub_except_clause { $$ = makeNode(PublicationAllObjSpec); $$->pubobjtype = PUBLICATION_ALL_TABLES; + $$->except_tables = $3; $$->location = @1; } | ALL SEQUENCES @@ -10954,6 +10964,23 @@ pub_all_obj_type_list: PublicationAllObjSpec { $$ = lappend($1, $3); } ; +PublicationExceptObjSpec: + relation_expr + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_EXCEPT_TABLE; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->except = true; + $$->pubtable->relation = $1; + $$->location = @1; + } + ; + +pub_except_obj_list: PublicationExceptObjSpec + { $$ = list_make1($1); } + | pub_except_obj_list ',' PublicationExceptObjSpec + { $$ = lappend($1, $3); } + ; /***************************************************************************** * @@ -19812,8 +19839,9 @@ parsePartitionStrategy(char *strategy, int location, core_yyscan_t yyscanner) * Also, checks if the pub_object_type has been specified more than once. */ static void -preprocess_pub_all_objtype_list(List *all_objects_list, bool *all_tables, - bool *all_sequences, core_yyscan_t yyscanner) +preprocess_pub_all_objtype_list(List *all_objects_list, List **pubobjects, + bool *all_tables, bool *all_sequences, + core_yyscan_t yyscanner) { if (!all_objects_list) return; @@ -19833,6 +19861,7 @@ preprocess_pub_all_objtype_list(List *all_objects_list, bool *all_tables, parser_errposition(obj->location)); *all_tables = true; + *pubobjects = list_concat(*pubobjects, obj->except_tables); } else if (obj->pubobjtype == PUBLICATION_ALL_SEQUENCES) { diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 2f2f0121ecf..046a50f2e9a 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -104,6 +104,7 @@ #include "nodes/makefuncs.h" #include "parser/parse_relation.h" #include "pgstat.h" +#include "replication/logical.h" #include "replication/logicallauncher.h" #include "replication/logicalrelation.h" #include "replication/logicalworker.h" diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 7a49185d29d..7a6dd841111 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1797,12 +1797,18 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) * to silently continue the replication in the absence of a missing publication. * This is required because we allow the users to create publications after they * have specified the required publications at the time of replication start. + * + * We also enforce that no more than one publication in the list may contain + * an EXCEPT TABLE clause. Using multiple publications with EXCEPT TABLE clause + * is currently unsupported to prevent non-deterministic filtering behavior + * across overlapping publication sets. */ static List * LoadPublications(List *pubnames) { List *result = NIL; ListCell *lc; + List *pubnames_with_except = NIL; foreach(lc, pubnames) { @@ -1810,7 +1816,13 @@ LoadPublications(List *pubnames) Publication *pub = GetPublicationByName(pubname, true); if (pub) + { result = lappend(result, pub); + + /* Check if this publication has an EXCEPT TABLE list. */ + if (publication_has_any_except_table(pub->oid)) + pubnames_with_except = lappend(pubnames_with_except, makeString(pubname)); + } else ereport(WARNING, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -1819,6 +1831,9 @@ LoadPublications(List *pubnames) errhint("Create the publication if it does not exist.")); } + CheckPublicationsForExceptClauses(pubnames_with_except); + + list_free_deep(pubnames_with_except); return result; } @@ -2089,7 +2104,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) if (!entry->replicate_valid) { Oid schemaId = get_rel_namespace(relid); - List *pubids = GetRelationPublications(relid); + List *pubids = NIL; /* * We don't acquire a lock on the namespace system table as we build @@ -2104,6 +2119,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) char relkind = get_rel_relkind(relid); List *rel_publications = NIL; + GetRelationPublications(relid, &pubids, NULL); + /* Reload publications if needed before use. */ if (!publications_valid) { @@ -2206,14 +2223,22 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) */ if (pub->alltables) { - publish = true; - if (pub->pubviaroot && am_partition) + List *exceptpubids = NIL; + + if (am_partition && pub->pubviaroot) { List *ancestors = get_partition_ancestors(relid); pub_relid = llast_oid(ancestors); ancestor_level = list_length(ancestors); } + + GetRelationPublications(pub_relid, NULL, &exceptpubids); + + if (!list_member_oid(exceptpubids, pub->oid)) + publish = true; + + list_free(exceptpubids); } if (!publish) diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 6b634c9fff1..dc021dbb6cd 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5788,7 +5788,9 @@ RelationGetExclusionInfo(Relation indexRelation, void RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) { - List *puboids; + List *puboids = NIL; + List *exceptpuboids = NIL; + List *alltablespuboids; ListCell *lc; MemoryContext oldcxt; Oid schemaid; @@ -5826,7 +5828,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) pubdesc->gencols_valid_for_delete = true; /* Fetch the publication membership info. */ - puboids = GetRelationPublications(relid); + GetRelationPublications(relid, &puboids, &exceptpuboids); schemaid = RelationGetNamespace(relation); puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); @@ -5838,16 +5840,25 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) foreach(lc, ancestors) { Oid ancestor = lfirst_oid(lc); + List *ancestor_puboids = NIL; + List *ancestor_exceptpuboids = NIL; - puboids = list_concat_unique_oid(puboids, - GetRelationPublications(ancestor)); + GetRelationPublications(ancestor, &ancestor_puboids, + &ancestor_exceptpuboids); + + puboids = list_concat_unique_oid(puboids, ancestor_puboids); schemaid = get_rel_namespace(ancestor); puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); + exceptpuboids = list_concat_unique_oid(exceptpuboids, + ancestor_exceptpuboids); } } - puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); + alltablespuboids = GetAllTablesPublications(); + puboids = list_concat_unique_oid(puboids, + list_difference_oid(alltablespuboids, + exceptpuboids)); foreach(lc, puboids) { Oid pubid = lfirst_oid(lc); diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 450cec285b3..4f92054b377 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4623,9 +4623,55 @@ getPublications(Archive *fout) (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0); pubinfo[i].pubgencols_type = *(PQgetvalue(res, i, i_pubgencols)); + pubinfo[i].except_tables = (SimplePtrList) + { + NULL, NULL + }; /* Decide whether we want to dump it */ selectDumpableObject(&(pubinfo[i].dobj), fout); + + /* + * Get the list of tables for publications specified with the EXCEPT + * TABLE clause. This is introduced in PostgreSQL 19. + * + * EXCEPT TABLES is processed here and output directly by + * dumpPublication(). This differs from the approach used in + * dumpPublicationTable() and dumpPublicationNamespace(), since that + * approach would require EXCEPT TABLE support for ALTER PUBLICATION, + * which is not currently supported. + */ + if (fout->remoteVersion >= 190000) + { + int ntbls; + PGresult *res_tbls; + + resetPQExpBuffer(query); + appendPQExpBuffer(query, + "SELECT prrelid\n" + "FROM pg_catalog.pg_publication_rel\n" + "WHERE prpubid = %u and prexcept", + pubinfo[i].dobj.catId.oid); + + res_tbls = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); + + ntbls = PQntuples(res_tbls); + + for (int j = 0; j < ntbls; j++) + { + Oid prrelid; + TableInfo *tbinfo; + + prrelid = atooid(PQgetvalue(res_tbls, j, 0)); + + tbinfo = findTableByOid(prrelid); + + if (tbinfo != NULL) + simple_ptr_list_append(&pubinfo[i].except_tables, tbinfo); + } + + PQclear(res_tbls); + } } cleanup: @@ -4665,7 +4711,25 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo) if (pubinfo->puballtables && pubinfo->puballsequences) appendPQExpBufferStr(query, " FOR ALL TABLES, ALL SEQUENCES"); else if (pubinfo->puballtables) + { + int n_except = 0; + appendPQExpBufferStr(query, " FOR ALL TABLES"); + + /* Include EXCEPT TABLE clause if there are except_tables. */ + for (SimplePtrListCell *cell = pubinfo->except_tables.head; cell; cell = cell->next) + { + TableInfo *tbinfo = (TableInfo *) cell->ptr; + + if (++n_except == 1) + appendPQExpBufferStr(query, " EXCEPT TABLE ("); + else + appendPQExpBufferStr(query, ", "); + appendPQExpBuffer(query, "ONLY %s", fmtQualifiedDumpable(tbinfo)); + } + if (n_except > 0) + appendPQExpBufferStr(query, ")"); + } else if (pubinfo->puballsequences) appendPQExpBufferStr(query, " FOR ALL SEQUENCES"); @@ -4845,6 +4909,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) /* Collect all publication membership info. */ if (fout->remoteVersion >= 150000) + { appendPQExpBufferStr(query, "SELECT tableoid, oid, prpubid, prrelid, " "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual, " @@ -4857,6 +4922,9 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n" " ELSE NULL END) prattrs " "FROM pg_catalog.pg_publication_rel pr"); + if (fout->remoteVersion >= 190000) + appendPQExpBufferStr(query, " WHERE NOT pr.prexcept"); + } else appendPQExpBufferStr(query, "SELECT tableoid, oid, prpubid, prrelid, " diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 6deceef23f3..e138ef1276c 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -676,6 +676,7 @@ typedef struct _PublicationInfo bool pubtruncate; bool pubviaroot; PublishGencolsType pubgencols_type; + SimplePtrList except_tables; } PublicationInfo; /* diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index bc7a082f57a..f0290de9c3d 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -3165,6 +3165,36 @@ my %tests = ( like => { %full_runs, section_post_data => 1, }, }, + 'CREATE PUBLICATION pub8' => { + create_order => 50, + create_sql => + 'CREATE PUBLICATION pub8 FOR ALL TABLES EXCEPT (dump_test.test_table);', + regexp => qr/^ + \QCREATE PUBLICATION pub8 FOR ALL TABLES EXCEPT TABLE (ONLY dump_test.test_table) WITH (publish = 'insert, update, delete, truncate');\E + /xm, + like => { %full_runs, section_post_data => 1, }, + }, + + 'CREATE PUBLICATION pub9' => { + create_order => 50, + create_sql => + 'CREATE PUBLICATION pub9 FOR ALL TABLES EXCEPT TABLE (dump_test.test_table, dump_test.test_second_table);', + regexp => qr/^ + \QCREATE PUBLICATION pub9 FOR ALL TABLES EXCEPT TABLE (ONLY dump_test.test_table, ONLY dump_test.test_second_table) WITH (publish = 'insert, update, delete, truncate');\E + /xm, + like => { %full_runs, section_post_data => 1, }, + }, + + 'CREATE PUBLICATION pub10' => { + create_order => 92, + create_sql => + 'CREATE PUBLICATION pub10 FOR ALL TABLES EXCEPT TABLE (dump_test.test_inheritance_parent);', + regexp => qr/^ + \QCREATE PUBLICATION pub10 FOR ALL TABLES EXCEPT TABLE (ONLY dump_test.test_inheritance_parent, ONLY dump_test.test_inheritance_child) WITH (publish = 'insert, update, delete, truncate');\E + /xm, + like => { %full_runs, section_post_data => 1, }, + }, + 'CREATE SUBSCRIPTION sub1' => { create_order => 50, create_sql => 'CREATE SUBSCRIPTION sub1 diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 571a6a003d5..24df03a1fd3 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -3073,17 +3073,34 @@ describeOneTableDetails(const char *schemaname, " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n" " ELSE NULL END) " "FROM pg_catalog.pg_publication p\n" - " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" - " JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n" - "WHERE pr.prrelid = '%s'\n" + " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" + " JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n" + "WHERE pr.prrelid = '%s'\n", + oid, oid, oid); + + if (pset.sversion >= 190000) + appendPQExpBufferStr(&buf, " AND NOT pr.prexcept\n"); + + appendPQExpBuffer(&buf, "UNION\n" "SELECT pubname\n" - " , NULL\n" - " , NULL\n" + " , NULL\n" + " , NULL\n" "FROM pg_catalog.pg_publication p\n" - "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" - "ORDER BY 1;", - oid, oid, oid, oid); + "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n", + oid); + + if (pset.sversion >= 190000) + appendPQExpBuffer(&buf, + " AND NOT EXISTS (\n" + " SELECT 1\n" + " FROM pg_catalog.pg_publication_rel pr\n" + " JOIN pg_catalog.pg_class pc\n" + " ON pr.prrelid = pc.oid\n" + " WHERE pr.prrelid = '%s' AND pr.prpubid = p.oid)\n", + oid); + + appendPQExpBufferStr(&buf, "ORDER BY 1;"); } else { @@ -3134,6 +3151,35 @@ describeOneTableDetails(const char *schemaname, PQclear(result); } + /* Print publications where the table is in the EXCEPT clause */ + if (pset.sversion >= 190000) + { + printfPQExpBuffer(&buf, + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" + "WHERE pr.prrelid = '%s'\n AND pr.prexcept\n" + "ORDER BY 1;", oid); + + result = PSQLexec(buf.data); + if (!result) + goto error_return; + else + tuples = PQntuples(result); + + if (tuples > 0) + printTableAddFooter(&cont, _("Except Publications:")); + + /* Might be an empty set - that's ok */ + for (i = 0; i < tuples; i++) + { + printfPQExpBuffer(&buf, " \"%s\"", PQgetvalue(result, i, 0)); + + printTableAddFooter(&cont, buf.data); + } + PQclear(result); + } + /* * If verbose, print NOT NULL constraints. */ @@ -6753,8 +6799,12 @@ describePublications(const char *pattern) " pg_catalog.pg_publication_rel pr\n" "WHERE c.relnamespace = n.oid\n" " AND c.oid = pr.prrelid\n" - " AND pr.prpubid = '%s'\n" - "ORDER BY 1,2", pubid); + " AND pr.prpubid = '%s'\n", pubid); + + if (pset.sversion >= 190000) + appendPQExpBuffer(&buf, " AND NOT pr.prexcept\n"); + + appendPQExpBuffer(&buf, "ORDER BY 1,2"); if (!addFooterToPublicationDesc(&buf, _("Tables:"), false, &cont)) goto error_return; @@ -6772,6 +6822,23 @@ describePublications(const char *pattern) goto error_return; } } + else + { + if (pset.sversion >= 190000) + { + /* Get tables in the EXCEPT clause for this publication */ + printfPQExpBuffer(&buf, + "SELECT concat(c.relnamespace::regnamespace, '.', c.relname)\n" + "FROM pg_catalog.pg_class c\n" + " JOIN pg_catalog.pg_publication_rel pr ON c.oid = pr.prrelid\n" + "WHERE pr.prpubid = '%s'\n" + " AND pr.prexcept\n" + "ORDER BY 1", pubid); + if (!addFooterToPublicationDesc(&buf, _("Except tables:"), + true, &cont)) + goto error_return; + } + } printTable(&cont, pset.queryFout, false, pset.logfile); printTableCleanup(&cont); diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 8b91bc00062..39404ea0f69 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -3669,7 +3669,17 @@ match_previous_words(int pattern_id, else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL")) COMPLETE_WITH("TABLES", "SEQUENCES"); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES")) - COMPLETE_WITH("WITH ("); + COMPLETE_WITH("EXCEPT TABLE (", "WITH ("); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "EXCEPT")) + COMPLETE_WITH("TABLE ("); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "EXCEPT", "TABLE")) + COMPLETE_WITH("("); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "EXCEPT", "TABLE", "(")) + COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "EXCEPT", "TABLE", "(", MatchAnyN) && ends_with(prev_wd, ',')) + COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "EXCEPT", "TABLE", "(", MatchAnyN) && !ends_with(prev_wd, ',')) + COMPLETE_WITH(")"); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLES")) COMPLETE_WITH("IN SCHEMA"); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE", MatchAny) && !ends_with(prev_wd, ',')) diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 6e5f73caa9e..c33524e4456 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -150,16 +150,18 @@ typedef struct PublicationRelInfo Relation relation; Node *whereClause; List *columns; + bool except; } PublicationRelInfo; extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); -extern List *GetRelationPublications(Oid relid); +extern bool GetRelationPublications(Oid relid, List **pubids, List **except_pubids); /*--------- - * Expected values for pub_partopt parameter of GetPublicationRelations(), - * which allows callers to specify which partitions of partitioned tables - * mentioned in the publication they expect to see. + * Expected values for pub_partopt parameter of + * GetIncludedPublicationRelations(), which allows callers to specify which + * partitions of partitioned tables mentioned in the publication they expect to + * see. * * ROOT: only the table explicitly mentioned in the publication * LEAF: only leaf partitions in given tree @@ -172,9 +174,12 @@ typedef enum PublicationPartOpt PUBLICATION_PART_ALL, } PublicationPartOpt; -extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); +extern List *GetIncludedPublicationRelations(Oid pubid, + PublicationPartOpt pub_partopt); +extern List *GetExcludedPublicationTables(Oid pubid, + PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); -extern List *GetAllPublicationRelations(char relkind, bool pubviaroot); +extern List *GetAllPublicationRelations(Publication *pub, char relkind); extern List *GetPublicationSchemas(Oid pubid); extern List *GetSchemaPublications(Oid schemaid); extern List *GetSchemaPublicationRelations(Oid schemaid, @@ -189,6 +194,7 @@ extern Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, extern bool is_publishable_relation(Relation rel); extern bool is_schema_publication(Oid pubid); +extern bool publication_has_any_except_table(Oid pubid); extern bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols); extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index 63eb7c75f53..cefc38c9ed8 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -33,6 +33,7 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) Oid oid; /* oid */ Oid prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ Oid prrelid BKI_LOOKUP(pg_class); /* Oid of the relation */ + bool prexcept BKI_DEFAULT(f); /* relation is not published */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ pg_node_tree prqual; /* qualifications */ diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 63504232a14..4bb6f5a4921 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -36,4 +36,6 @@ extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, bool retention_active, bool max_retention_set); +extern void CheckPublicationsForExceptClauses(List *except_publications); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index f37131835be..ff41943a6db 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4299,9 +4299,10 @@ typedef struct AlterTSConfigurationStmt typedef struct PublicationTable { NodeTag type; - RangeVar *relation; /* relation to be published */ + RangeVar *relation; /* publication relation */ Node *whereClause; /* qualifications */ List *columns; /* List of columns in a publication table */ + bool except; /* True if listed in the EXCEPT clause */ } PublicationTable; /* @@ -4310,6 +4311,7 @@ typedef struct PublicationTable typedef enum PublicationObjSpecType { PUBLICATIONOBJ_TABLE, /* A table */ + PUBLICATIONOBJ_EXCEPT_TABLE, /* A table in the EXCEPT clause */ PUBLICATIONOBJ_TABLES_IN_SCHEMA, /* All tables in schema */ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA, /* All tables in first element of * search_path */ @@ -4338,6 +4340,7 @@ typedef struct PublicationAllObjSpec { NodeTag type; PublicationAllObjType pubobjtype; /* type of this publication object */ + List *except_tables; /* tables specified in the EXCEPT clause */ ParseLoc location; /* token location, or -1 if unknown */ } PublicationAllObjSpec; diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 7fb49aaf29b..ae3bebc2b4b 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -213,33 +213,141 @@ Not-null constraints: regress_publication_user | t | f | t | t | f | f | none | f (1 row) -DROP TABLE testpub_tbl2; -DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema, testpub_for_tbl_schema; -CREATE TABLE testpub_tbl3 (a int); -CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); +--------------------------------------------- +-- EXCEPT TABLE tests for normal tables +--------------------------------------------- SET client_min_messages = 'ERROR'; -CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; -CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; +-- Specify table list in the EXCEPT clause of a FOR ALL TABLES publication and +-- the optional TABLE keyword. +CREATE PUBLICATION testpub_foralltables_excepttable FOR ALL TABLES EXCEPT TABLE (testpub_tbl1, testpub_tbl2); +\dRp+ testpub_foralltables_excepttable + Publication testpub_foralltables_excepttable + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_tbl1" + "public.testpub_tbl2" + +-- Specify table in the EXCEPT clause of a FOR ALL TABLES publication +CREATE PUBLICATION testpub_foralltables_excepttable1 FOR ALL TABLES EXCEPT (testpub_tbl1); +\dRp+ testpub_foralltables_excepttable1 + Publication testpub_foralltables_excepttable1 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_tbl1" + +-- Check that the table description shows the publications where it is listed +-- in the EXCEPT clause +\d testpub_tbl1 + Table "public.testpub_tbl1" + Column | Type | Collation | Nullable | Default +--------+---------+-----------+----------+------------------------------------------ + id | integer | | not null | nextval('testpub_tbl1_id_seq'::regclass) + data | text | | | +Indexes: + "testpub_tbl1_pkey" PRIMARY KEY, btree (id) +Publications: + "testpub_foralltables" +Except Publications: + "testpub_foralltables_excepttable" + "testpub_foralltables_excepttable1" + RESET client_min_messages; +DROP TABLE testpub_tbl2; +DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema, testpub_for_tbl_schema, testpub_foralltables_excepttable, testpub_foralltables_excepttable1; +--------------------------------------------- +-- Tests for inherited tables, and +-- EXCEPT TABLE tests for inherited tables +--------------------------------------------- +SET client_min_messages = 'ERROR'; +CREATE TABLE testpub_tbl_parent (a int); +CREATE TABLE testpub_tbl_child (b text) INHERITS (testpub_tbl_parent); +CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl_parent; \dRp+ testpub3 Publication testpub3 Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root --------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- regress_publication_user | f | f | t | t | t | t | none | f Tables: - "public.testpub_tbl3" - "public.testpub_tbl3a" + "public.testpub_tbl_child" + "public.testpub_tbl_parent" +CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl_parent; \dRp+ testpub4 Publication testpub4 Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root --------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- regress_publication_user | f | f | t | t | t | t | none | f Tables: - "public.testpub_tbl3" + "public.testpub_tbl_parent" + +-- List the parent table in the EXCEPT clause (without ONLY or '*') +CREATE PUBLICATION testpub5 FOR ALL TABLES EXCEPT TABLE (testpub_tbl_parent); +\dRp+ testpub5 + Publication testpub5 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_tbl_child" + "public.testpub_tbl_parent" + +-- EXCEPT with '*': list the table and all its descendants in the EXCEPT clause +CREATE PUBLICATION testpub6 FOR ALL TABLES EXCEPT TABLE (testpub_tbl_parent *); +\dRp+ testpub6 + Publication testpub6 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_tbl_child" + "public.testpub_tbl_parent" + +-- EXCEPT with ONLY: list the table in the EXCEPT clause, but not its descendants +CREATE PUBLICATION testpub7 FOR ALL TABLES EXCEPT TABLE (ONLY testpub_tbl_parent); +\dRp+ testpub7 + Publication testpub7 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_tbl_parent" -DROP TABLE testpub_tbl3, testpub_tbl3a; -DROP PUBLICATION testpub3, testpub4; +RESET client_min_messages; +DROP TABLE testpub_tbl_parent, testpub_tbl_child; +DROP PUBLICATION testpub3, testpub4, testpub5, testpub6, testpub7; +--------------------------------------------- +-- EXCEPT TABLE tests for partitioned tables +--------------------------------------------- +SET client_min_messages = 'ERROR'; +CREATE TABLE testpub_root(a int) PARTITION BY RANGE(a); +CREATE TABLE testpub_part1 PARTITION OF testpub_root FOR VALUES FROM (0) TO (100); +CREATE TABLE testpub_part2 PARTITION OF testpub_root FOR VALUES FROM (100) TO (200); +CREATE PUBLICATION testpub8 FOR ALL TABLES EXCEPT TABLE (testpub_root); +\dRp+ testpub8; + Publication testpub8 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_root" + +CREATE PUBLICATION testpub9 FOR ALL TABLES EXCEPT TABLE (testpub_part1); +ERROR: cannot use publication EXCEPT clause for relation "testpub_part1" +DETAIL: This operation is not supported for individual partitions. +CREATE TABLE tab_main (a int) PARTITION BY RANGE(a); +-- Attaching a partition is not allowed if the partitioned table appears in a +-- publication's EXCEPT clause. +ALTER TABLE tab_main ATTACH PARTITION testpub_root FOR VALUES FROM (0) TO (200); +ERROR: cannot attach table "testpub_root" as partition because it is referenced in a publication EXCEPT clause +DETAIL: Tables excluded from publications cannot be attached as partitions. +HINT: Remove the table from the publication EXCEPT list before attaching it. +RESET client_min_messages; +DROP TABLE testpub_root, testpub_part1, testpub_part2, tab_main; +DROP PUBLICATION testpub8; --- Tests for publications with SEQUENCES CREATE SEQUENCE regress_pub_seq0; CREATE SEQUENCE pub_test.regress_pub_seq1; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 85b00bd67c8..31b9b7d4c54 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -105,20 +105,69 @@ SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_forall \d+ testpub_tbl2 \dRp+ testpub_foralltables +--------------------------------------------- +-- EXCEPT TABLE tests for normal tables +--------------------------------------------- +SET client_min_messages = 'ERROR'; +-- Specify table list in the EXCEPT clause of a FOR ALL TABLES publication and +-- the optional TABLE keyword. +CREATE PUBLICATION testpub_foralltables_excepttable FOR ALL TABLES EXCEPT TABLE (testpub_tbl1, testpub_tbl2); +\dRp+ testpub_foralltables_excepttable +-- Specify table in the EXCEPT clause of a FOR ALL TABLES publication +CREATE PUBLICATION testpub_foralltables_excepttable1 FOR ALL TABLES EXCEPT (testpub_tbl1); +\dRp+ testpub_foralltables_excepttable1 +-- Check that the table description shows the publications where it is listed +-- in the EXCEPT clause +\d testpub_tbl1 + +RESET client_min_messages; DROP TABLE testpub_tbl2; -DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema, testpub_for_tbl_schema; +DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema, testpub_for_tbl_schema, testpub_foralltables_excepttable, testpub_foralltables_excepttable1; -CREATE TABLE testpub_tbl3 (a int); -CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); +--------------------------------------------- +-- Tests for inherited tables, and +-- EXCEPT TABLE tests for inherited tables +--------------------------------------------- SET client_min_messages = 'ERROR'; -CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; -CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; -RESET client_min_messages; +CREATE TABLE testpub_tbl_parent (a int); +CREATE TABLE testpub_tbl_child (b text) INHERITS (testpub_tbl_parent); +CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl_parent; \dRp+ testpub3 +CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl_parent; \dRp+ testpub4 +-- List the parent table in the EXCEPT clause (without ONLY or '*') +CREATE PUBLICATION testpub5 FOR ALL TABLES EXCEPT TABLE (testpub_tbl_parent); +\dRp+ testpub5 +-- EXCEPT with '*': list the table and all its descendants in the EXCEPT clause +CREATE PUBLICATION testpub6 FOR ALL TABLES EXCEPT TABLE (testpub_tbl_parent *); +\dRp+ testpub6 +-- EXCEPT with ONLY: list the table in the EXCEPT clause, but not its descendants +CREATE PUBLICATION testpub7 FOR ALL TABLES EXCEPT TABLE (ONLY testpub_tbl_parent); +\dRp+ testpub7 -DROP TABLE testpub_tbl3, testpub_tbl3a; -DROP PUBLICATION testpub3, testpub4; +RESET client_min_messages; +DROP TABLE testpub_tbl_parent, testpub_tbl_child; +DROP PUBLICATION testpub3, testpub4, testpub5, testpub6, testpub7; + +--------------------------------------------- +-- EXCEPT TABLE tests for partitioned tables +--------------------------------------------- +SET client_min_messages = 'ERROR'; +CREATE TABLE testpub_root(a int) PARTITION BY RANGE(a); +CREATE TABLE testpub_part1 PARTITION OF testpub_root FOR VALUES FROM (0) TO (100); +CREATE TABLE testpub_part2 PARTITION OF testpub_root FOR VALUES FROM (100) TO (200); +CREATE PUBLICATION testpub8 FOR ALL TABLES EXCEPT TABLE (testpub_root); +\dRp+ testpub8; +CREATE PUBLICATION testpub9 FOR ALL TABLES EXCEPT TABLE (testpub_part1); + +CREATE TABLE tab_main (a int) PARTITION BY RANGE(a); +-- Attaching a partition is not allowed if the partitioned table appears in a +-- publication's EXCEPT clause. +ALTER TABLE tab_main ATTACH PARTITION testpub_root FOR VALUES FROM (0) TO (200); + +RESET client_min_messages; +DROP TABLE testpub_root, testpub_part1, testpub_part2, tab_main; +DROP PUBLICATION testpub8; --- Tests for publications with SEQUENCES CREATE SEQUENCE regress_pub_seq0; diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index a4c7dbaff59..07282aa3c18 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -46,6 +46,7 @@ tests += { 't/034_temporal.pl', 't/035_conflicts.pl', 't/036_sequences.pl', + 't/037_rep_changes_except_table.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/037_rep_changes_except_table.pl b/src/test/subscription/t/037_rep_changes_except_table.pl new file mode 100644 index 00000000000..a9d1751d054 --- /dev/null +++ b/src/test/subscription/t/037_rep_changes_except_table.pl @@ -0,0 +1,274 @@ + +# Copyright (c) 2021-2026, PostgreSQL Global Development Group + +# Logical replication tests for EXCEPT TABLE publications +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +# Initialize subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $result; + +sub test_except_root_partition +{ + my $pubviaroot = @_; + + # When the root partitioned table is listed in the EXCEPT clause, + # all its partitions are not published. + $node_publisher->safe_psql( + 'postgres', qq( + CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.t1) WITH (publish_via_partition_root = $pubviaroot); + INSERT INTO sch1.t1 VALUES (1), (101), (151); + )); + $node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_part CONNECTION '$publisher_connstr' PUBLICATION tap_pub_part" + ); + $node_subscriber->wait_for_subscription_sync($node_publisher, + 'tap_sub_part'); + $node_publisher->safe_psql('postgres', + "SELECT slot_name FROM pg_replication_slot_advance('test_slot', pg_current_wal_lsn());" + ); + $node_publisher->safe_psql('postgres', + "INSERT INTO sch1.t1 VALUES (2), (102), (152)"); + + # Verify that data inserted into the partitioned table is not published when + # it is in the EXCEPT clause. + $result = $node_publisher->safe_psql('postgres', + "SELECT count(*) = 0 FROM pg_logical_slot_get_binary_changes('test_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'tap_pub_part')" + ); + $node_publisher->wait_for_catchup('tap_sub_part'); + + # Check that no rows are replicated to subscriber + $result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1"); + is($result, qq(), 'check rows on root table'); + + $result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part1"); + is($result, qq(), 'check rows on table sch1.part1'); + + $result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2"); + is($result, qq(), 'check rows on table sch1.part2'); + + $result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2_1"); + is($result, qq(), 'check rows on table sch1.part2_1'); + + $result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2_2"); + is($result, qq(), 'check rows on table sch1.part2_2'); + + $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_part"); + $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_part;"); +} + +# ============================================ +# EXCEPT TABLE test cases for normal tables +# ============================================ +# Create schemas and tables on publisher +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SCHEMA sch1; + CREATE TABLE sch1.tab1 AS SELECT generate_series(1,10) AS a; +)); + +# Create schemas and tables on subscriber +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE SCHEMA sch1; + CREATE TABLE sch1.tab1 (a int); +)); + +# Setup logical replication, and create a logical replication slot to help with +# later tests. +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_schema FOR ALL TABLES EXCEPT TABLE (sch1.tab1)" +); + +$node_publisher->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('test_slot', 'pgoutput')"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, + 'tap_sub_schema'); + +# Check the table data does not sync for the tables specified in EXCEPT clause +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM sch1.tab1"); +is($result, qq(0), + 'check there is no initial data copied for the tables specified in the except clause' +); + +# Insert some data into the table listed in the EXCEPT clause +$node_publisher->safe_psql('postgres', + "INSERT INTO sch1.tab1 VALUES(generate_series(11,20))"); + +# Verify that data inserted into a table listed in the EXCEPT clause is not +# published. +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) = 0 FROM pg_logical_slot_get_binary_changes('test_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'tap_sub_schema')" +); +is($result, qq(t), + 'verify no changes for table listed in the EXCEPT clause are present in the replication slot' +); + +# Verify that data inserted into a table listed in the EXCEPT clause is not +# replicated. +$node_publisher->wait_for_catchup('tap_sub_schema'); +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM sch1.tab1"); +is($result, qq(0), 'check replicated inserts on subscriber'); + +# cleanup +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_schema"); +$node_publisher->safe_psql( + 'postgres', qq( + DROP PUBLICATION tap_pub_schema; + TRUNCATE TABLE sch1.tab1; +)); +$node_subscriber->safe_psql('postgres', "TRUNCATE TABLE sch1.tab1"); + +# ============================================ +# EXCEPT TABLE test cases for partitioned tables +# Check behavior of EXCEPT TABLE with publish_via_partition_root on a +# partitioned table and its partitions. +# ============================================ +# Setup partitioned table and partitions on the publisher that map to normal +# tables on the subscriber +$node_publisher->safe_psql( + 'postgres', qq( + CREATE TABLE sch1.t1(a int) PARTITION BY RANGE(a); + CREATE TABLE sch1.part1 PARTITION OF sch1.t1 FOR VALUES FROM (0) TO (100); + CREATE TABLE sch1.part2 PARTITION OF sch1.t1 FOR VALUES FROM (100) TO (200) PARTITION BY RANGE(a);; + CREATE TABLE sch1.part2_1 PARTITION OF sch1.part2 FOR VALUES FROM (100) TO (150); + CREATE TABLE sch1.part2_2 PARTITION OF sch1.part2 FOR VALUES FROM (150) TO (200); +)); + +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE TABLE sch1.t1(a int); + CREATE TABLE sch1.part1(a int); + CREATE TABLE sch1.part2(a int); + CREATE TABLE sch1.part2_1(a int); + CREATE TABLE sch1.part2_2(a int); +)); + +test_except_root_partition('false'); +test_except_root_partition('true'); + +$node_publisher->safe_psql('postgres', + "SELECT slot_name FROM pg_replication_slot_advance('test_slot', pg_current_wal_lsn());" +); + +# ============================================ +# Test when a subscription is subscribing to multiple publications +# ============================================ +# ERROR if subscribing to multiple publications having EXCEPT TABLE. +my ($stdout, $stderr); + +$node_publisher->safe_psql( + 'postgres', qq( + CREATE PUBLICATION tap_pub1 FOR ALL TABLES EXCEPT (sch1.tab1); + CREATE PUBLICATION tap_pub2 FOR ALL TABLES EXCEPT (sch1.t1); +)); + +($result, $stdout, $stderr) = $node_subscriber->psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub1, tap_pub2" +); +like( + $stderr, + qr/ERROR: cannot combine publications "tap_pub1", "tap_pub2" with an EXCEPT TABLE clause/, + 'subscription with multiple EXCEPT TABLE publication'); + +$node_publisher->safe_psql('postgres', 'DROP PUBLICATION tap_pub2'); + +# OK when a table is excluded by pub1 EXCEPT TABLE, but it is included by pub2 +# FOR TABLE +$node_publisher->safe_psql( + 'postgres', qq( + CREATE PUBLICATION tap_pub2 FOR TABLE sch1.tab1; + INSERT INTO sch1.tab1 VALUES(1); +)); +$node_subscriber->psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub1, tap_pub2" +); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +$node_publisher->safe_psql('postgres', qq(INSERT INTO sch1.tab1 VALUES(2))); +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT * FROM sch1.tab1 ORDER BY a"); +is( $result, qq(1 +2), + "check replication of a table in the EXCEPT clause of one publication but included by another" +); +$node_publisher->safe_psql( + 'postgres', qq( + DROP PUBLICATION tap_pub2; + TRUNCATE sch1.tab1; +)); +$node_subscriber->safe_psql('postgres', qq(TRUNCATE sch1.tab1)); + +# OK when a table is excluded by pub1 EXCEPT TABLE, but it is included by pub2 +# FOR ALL TABLES +$node_publisher->safe_psql( + 'postgres', qq( + CREATE PUBLICATION tap_pub2 FOR ALL TABLES; + INSERT INTO sch1.tab1 VALUES(1); +)); +$node_subscriber->psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub1, tap_pub2" +); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +$node_publisher->safe_psql('postgres', qq(INSERT INTO sch1.tab1 VALUES(2))); +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT * FROM sch1.tab1 ORDER BY a"); +is( $result, qq(1 +2), + "check replication of a table in the EXCEPT clause of one publication but included by another" +); + +# ERROR if ALTER SUBSCRIPTION ... REFRESH PUBLICATION causes the +# subscription to end up with multiple publications having EXCEPT TABLE. +$node_publisher->safe_psql( + 'postgres', qq( + DROP PUBLICATION tap_pub2; + CREATE PUBLICATION tap_pub2 FOR ALL TABLES EXCEPT (sch1.t1); +)); + +($result, $stdout, $stderr) = $node_subscriber->psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); +like( + $stderr, + qr/ERROR: cannot combine publications "tap_pub1", "tap_pub2" with an EXCEPT TABLE clause/, + 'subscription with multiple EXCEPT TABLE publication'); + +$node_subscriber->safe_psql('postgres', 'DROP SUBSCRIPTION tap_sub'); +$node_publisher->safe_psql('postgres', 'DROP PUBLICATION tap_pub1'); +$node_publisher->safe_psql('postgres', 'DROP PUBLICATION tap_pub2'); + +$node_publisher->stop('fast'); + +done_testing(); -- 2.34.1