From 6f81b16a09a604d7293edea5977884a04eb26d50 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Fri, 20 Feb 2026 13:34:17 +0530 Subject: [PATCH v47 2/2] Support specifying partition tables in EXCEPT clause Allow partitioned tables and individual partitions to be explicitly specified in the EXCEPT clause of a publication. Adjust logical replication so that both initial data synchronization and ongoing change replication correctly honor exclusions involving partitioned tables. On the subscriber side, extend fetch_remote_table_info() to compute the effective set of relations for the initial COPY. If exclusions affect a partitioned table, the root relation cannot always be used directly. Instead, derive the list of non-excluded leaf partitions and construct the COPY query using UNION ALL over those partitions. If no relevant exclusions are present, retain the existing behavior and copy from the root relation as before. --- doc/src/sgml/ref/create_publication.sgml | 8 +- src/backend/catalog/pg_publication.c | 447 +++++++++++++++++- src/backend/commands/subscriptioncmds.c | 2 + src/backend/replication/logical/tablesync.c | 148 +++++- src/backend/replication/pgoutput/pgoutput.c | 156 +++++- src/include/catalog/pg_proc.dat | 7 + src/include/catalog/pg_publication.h | 1 + src/include/replication/worker_internal.h | 6 + src/test/regress/expected/publication.out | 11 +- src/test/regress/sql/publication.sql | 3 +- .../t/037_rep_changes_except_table.pl | 90 +++- src/tools/pgindent/typedefs.list | 1 + 12 files changed, 847 insertions(+), 33 deletions(-) diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index f7cf7237061..436e7d694a3 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -205,11 +205,9 @@ CREATE PUBLICATION name tables are excluded. - For partitioned tables, only the root partitioned table may be specified - in EXCEPT TABLE. Doing so excludes the root table and - all of its partitions from replication, regardless of the value of - publish_via_partition_root. The optional - * has no effect for partitioned tables. + For partitioned tables, if a table is specified in EXCEPT TABLE, that + table and all of its partitions (that is, the entire partition subtree + rooted at that table) are excluded from the publication. There can be a case where a subscription includes multiple publications. diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index e3a68ffb6bb..1fdafafb558 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -469,11 +469,6 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, RelationGetRelationName(targetrel), pub->name))); } - if (pub->alltables && pri->except && targetrel->rd_rel->relispartition) - ereport(ERROR, - errmsg("partition \"%s\" cannot be excluded using EXCEPT TABLE", - RelationGetRelationName(targetrel))); - check_publication_add_relation(targetrel); /* Validate and translate column names into a Bitmapset of attnums. */ @@ -921,6 +916,45 @@ GetAllTablesPublications(void) return result; } +/* + * Returns true if the given partitioned table is effectively excluded + * by the EXCEPT list. + * + * A relation is considered excluded if: + * 1) It is explicitly present in exceptlist, or + * 2) All of its leaf partitions are present in exceptlist. + */ +static bool +relation_is_effectively_excluded(Oid relid, List *exceptlist) +{ + List *leaftables = NIL; + + if (exceptlist == NIL) + return false; + + /* Explicitly excluded */ + if (list_member_oid(exceptlist, relid)) + return true; + + /* Get all leaf partitions of relid */ + leaftables = GetPubPartitionOptionRelations(leaftables, + PUBLICATION_PART_LEAF, + relid); + + /* + * If any leaf is NOT present in exceptlist, then the relation is not + * fully excluded. + */ + foreach_oid(leafrelid, leaftables) + { + if (!list_member_oid(exceptlist, leafrelid)) + return false; + } + + /* All leaves are excluded */ + return true; +} + /* * Gets list of all relations published by FOR ALL TABLES/SEQUENCES * publication. @@ -949,7 +983,7 @@ GetAllPublicationRelations(Publication *pub, char relkind) if (relkind == RELKIND_RELATION) exceptlist = GetExcludedPublicationTables(pubid, pubviaroot ? - PUBLICATION_PART_ROOT : + PUBLICATION_PART_ALL : PUBLICATION_PART_LEAF); classRel = table_open(RelationRelationId, AccessShareLock); @@ -990,7 +1024,7 @@ GetAllPublicationRelations(Publication *pub, char relkind) if (is_publishable_class(relid, relForm) && !relForm->relispartition && - !list_member_oid(exceptlist, relid)) + !relation_is_effectively_excluded(relid, exceptlist)) result = lappend_oid(result, relid); } @@ -1239,6 +1273,405 @@ publication_has_any_except_table(Oid pubid) return found; } +/* is_relid_excepted + * + * Check if the relation 'relid' is explicitly specified in the EXCEPT clause + * of the given publication. + */ +bool +is_relid_excepted(Oid relid, Oid pubid) +{ + HeapTuple tup; + bool result = false; + + tup = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), + ObjectIdGetDatum(pubid)); + if (HeapTupleIsValid(tup)) + { + Form_pg_publication_rel prform; + + prform = (Form_pg_publication_rel) GETSTRUCT(tup); + result = prform->prexcept; + + ReleaseSysCache(tup); + } + return result; +} + +/* + * is_relid_or_ancestor_excepted + * + * Check if the relation 'relid' or any of its partition ancestors are + * explicitly specified in the EXCEPT clause of the given publication. + */ +static bool +is_relid_or_ancestor_excepted(Oid relid, Oid pubid) +{ + List *ancestors; + ListCell *lc; + bool in_except = false; + + /* Check the relation itself first */ + if (is_relid_excepted(relid, pubid)) + return true; + + /* Check the inheritance chain */ + ancestors = get_partition_ancestors(relid); + + foreach(lc, ancestors) + { + Oid ancestor = lfirst_oid(lc); + + if (is_relid_excepted(ancestor, pubid)) + { + in_except = true; + break; + } + } + + list_free(ancestors); + + return in_except; +} + +/* + * is_relid_published + * + * Check whether a given table or its schema is included in the specified + * publication. + */ +static bool +is_relid_published(Oid relid, Oid pubid) +{ + HeapTuple tup; + + tup = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), + ObjectIdGetDatum(pubid)); + if (HeapTupleIsValid(tup)) + { + bool published = false; + Form_pg_publication_rel prform; + + prform = (Form_pg_publication_rel) GETSTRUCT(tup); + published = !prform->prexcept; + + ReleaseSysCache(tup); + + if (published) + return true; + } + + return SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP, + ObjectIdGetDatum(get_rel_namespace(relid)), + ObjectIdGetDatum(pubid)); +} + +/* + * is_relid_or_ancestor_published + * + * Check whether a given table or its schema or any of its partition ancestors, + * or its schema included in the specified publication + */ +static bool +is_relid_or_ancestor_published(Oid relid, Oid pubid) +{ + if (is_relid_published(relid, pubid)) + return true; + else + { + List *ancestors = get_partition_ancestors(relid); + + foreach_oid(anc_oid, ancestors) + { + if (is_relid_published(anc_oid, pubid)) + return true; + } + } + + return false; +} + +/* + * pg_get_publication_effective_tables + * + * Given a root partitioned table and a list of publications, calculate the set + * of relations that are effectively published. This is necessary for + * "FOR ALL TABLES" publications that use "EXCEPT TABLE" filters. + * + * The function returns a minimal set of relations that collectively + * include all non-excluded leaf partitions in the partition hierarchy. + */ +Datum +pg_get_publication_effective_tables(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + List *results; + + if (SRF_IS_FIRSTCALL()) + { + Oid root_relid = PG_GETARG_OID(0); + ArrayType *pub_names_array = PG_GETARG_ARRAYTYPE_P(1); + MemoryContext oldcontext; + List *pub_oids = NIL; + Datum *pub_datums; + bool *pub_nulls; + int pub_count; + TupleDesc tupdesc; + List *final_output = NIL; + bool has_clean_all_tables_pub = false; + List *except_pub_names = NIL; + Oid except_pub_id = InvalidOid; + + Assert(get_rel_relkind(root_relid) == RELKIND_PARTITIONED_TABLE); + + funcctx = SRF_FIRSTCALL_INIT(); + + deconstruct_array(pub_names_array, TEXTOID, -1, false, 'i', + &pub_datums, &pub_nulls, &pub_count); + + /* Build the list of pub_oids */ + for (int i = 0; i < pub_count; i++) + { + if (!pub_nulls[i]) + { + char *pubname = TextDatumGetCString(pub_datums[i]); + + pub_oids = lappend_oid(pub_oids, get_publication_oid(pubname, false)); + } + } + + /* + * Determine whether the expensive expansion step can be skipped. If + * any publication is a FOR ALL TABLES publication without an EXCEPT + * clause, the root relation alone is sufficient as the result. + */ + foreach_oid(puboid, pub_oids) + { + HeapTuple pubTup; + Form_pg_publication pubform; + + pubTup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(puboid)); + if (!HeapTupleIsValid(pubTup)) + continue; + + pubform = (Form_pg_publication) GETSTRUCT(pubTup); + if (pubform->puballtables) + { + /* Check whether this publication defines any EXCEPT entries */ + if (publication_has_any_except_table(puboid)) + { + except_pub_names = lappend(except_pub_names, + makeString(pubform->pubname.data)); + except_pub_id = pubform->oid; + } + else + { + /* This publication includes all tables without except */ + has_clean_all_tables_pub = true; + } + } + + ReleaseSysCache(pubTup); + } + + if (list_length(except_pub_names) > 1) + { + StringInfo pub_names = makeStringInfo(); + + GetPublicationsStr(except_pub_names, pub_names, true); + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot combine publications that define EXCEPT TABLE clauses"), + errdetail("The following publications define EXCEPT TABLE clauses: %s.", + pub_names->data)); + } + + /* Return root immediately if no filtering logic is needed */ + if (has_clean_all_tables_pub || !OidIsValid(except_pub_id)) + { + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + final_output = list_make1_oid(root_relid); + MemoryContextSwitchTo(oldcontext); + } + + /* + * Build the effective publication relation list for a partitioned + * hierarchy in the presence of an EXCEPT publication. + * + * 1. Collect all leaf partitions under the given root relation. + * + * 2. Classify each leaf as either: - excepted_leaves: explicitly + * excluded via the EXCEPT clause (either directly or through any of + * its ancestors), or - allowed_leaves: not excluded and therefore + * part of the effective publication set. + * + * 3. Re-evaluate excluded leaves against other publications. A leaf + * excluded in one publication may still be effectively included if it + * (or any of its ancestors, or its schema) is published through + * another publication. Such leaves are added back to allowed_leaves. + * + * 4. Bottom-up collapse of partition branches. For each partitioned + * table in the hierarchy: - If *all* of its leaf partitions are + * present in allowed_leaves, the parent can represent the entire + * branch. - Such parents are added as candidates, allowing + * higher-level representation instead of listing every leaf + * individually. + * + * 5. Deduplicate and normalize the result. Remove any relation whose + * ancestor is already selected as a candidate. This ensures the final + * output contains only the highest-level representative for each + * fully-allowed branch and avoids redundant entries. + * + * The final_output therefore contains a minimal, non-redundant set of + * relations that accurately represents the effective publication set + * after considering EXCEPT rules and multiple publications. + */ + else + { + List *all_tables; + List *all_leaves = NIL; + List *excepted_leaves = NIL; + List *allowed_leaves = NIL; + List *candidate_list = NIL; + + /* Get all the leaf relations */ + all_leaves = GetPubPartitionOptionRelations(all_leaves, + PUBLICATION_PART_LEAF, + root_relid); + foreach_oid(curr_relid, all_leaves) + { + /* + * A leaf table is considered excluded if it, or any of its + * ancestors, is listed in the EXCEPT clause of the + * publication. Otherwise, it remains part of the effective + * publication set. + */ + if (is_relid_or_ancestor_excepted(curr_relid, except_pub_id)) + excepted_leaves = lappend_oid(excepted_leaves, curr_relid); + else + allowed_leaves = lappend_oid(allowed_leaves, curr_relid); + } + + /* + * A table excluded by the EXCEPT clause of one publication may + * still be included if it is explicitly published, or published + * via its schema or any of its ancestors, in another publication. + */ + foreach_oid(curr_relid, excepted_leaves) + { + foreach_oid(pubid, pub_oids) + { + /* Skip the publication that excluded this relation. */ + if (pubid == except_pub_id) + continue; + + if (is_relid_or_ancestor_published(curr_relid, pubid)) + allowed_leaves = lappend_oid(allowed_leaves, + curr_relid); + } + } + + /* Bottom-Up Collapse. Check if parents can represent children */ + all_tables = find_all_inheritors(root_relid, AccessShareLock, NULL); + candidate_list = list_copy(allowed_leaves); + foreach_oid(curr_relid, all_tables) + { + List *branch_leaves = NIL; + bool all_allowed = true; + + /* Only consider partitioned tables as collapse candidates */ + if (get_rel_relkind(curr_relid) != RELKIND_PARTITIONED_TABLE) + continue; + + branch_leaves = GetPubPartitionOptionRelations(branch_leaves, + PUBLICATION_PART_LEAF, + curr_relid); + if (branch_leaves == NIL) + continue; + + foreach_oid(lcb_oid, branch_leaves) + { + if (!list_member_oid(allowed_leaves, lcb_oid)) + { + all_allowed = false; + break; + } + } + + if (all_allowed) + candidate_list = list_append_unique_oid(candidate_list, + curr_relid); + } + + /* + * Deduplicate: Filter out any relation whose ancestor is already + * present in the candidate list. This ensures we only return the + * "highest" representative for each branch. + */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + foreach_oid(curr_relid, candidate_list) + { + List *ancestors = get_partition_ancestors(curr_relid); + bool ancestor_already_included = false; + + /* + * Check if any ancestor of the current relation exists in the + * candidate list. If so, this relation is redundant. + */ + foreach_oid(ancestor_relid, ancestors) + { + if (list_member_oid(candidate_list, ancestor_relid)) + { + ancestor_already_included = true; + break; + } + } + + if (!ancestor_already_included) + final_output = lappend_oid(final_output, curr_relid); + + list_free(ancestors); + } + + MemoryContextSwitchTo(oldcontext); + } + + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + /* Construct a tuple descriptor for the result rows. */ + tupdesc = CreateTemplateTupleDesc(2); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "nspname", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relname", + TEXTOID, -1, 0); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + funcctx->user_fctx = final_output; + + MemoryContextSwitchTo(oldcontext); + } + + /* SRF Per-call Resume */ + funcctx = SRF_PERCALL_SETUP(); + results = (List *) funcctx->user_fctx; + + if (funcctx->call_cntr < list_length(results)) + { + Oid current_relid = list_nth_oid(results, (int) funcctx->call_cntr); + HeapTuple rettuple; + Datum values[2]; + bool nulls[2] = {false, false}; + + values[0] = CStringGetTextDatum(get_namespace_name(get_rel_namespace(current_relid))); + values[1] = CStringGetTextDatum(get_rel_name(current_relid)); + + rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple)); + } + + SRF_RETURN_DONE(funcctx); +} + /* * Get information of the tables in the given publication array. * diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index b0a53c17dd0..1dee31a86c5 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -3051,6 +3051,8 @@ fetch_relation_list(WalReceiverConn *wrconn, List *publications) pub_names.data); } + elog(LOG, "fetch_relation_list: executing query to fetch effective relations: \n%s", + cmd.data); pfree(pub_names.data); res = walrcv_exec(wrconn, cmd.data, column_count, tableRow); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index eee909d912b..a6b6437da58 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -717,21 +717,28 @@ copy_read_data(void *outbuf, int minread, int maxread) * message provides during replication. * * This function also returns (a) the relation qualifications to be used in - * the COPY command, and (b) whether the remote relation has published any - * generated column. + * the COPY command, (b) whether the remote relation has published any + * generated column, and (c) computes the effective set of relations to be used + * as COPY sources when exclusions are present. When no exclusions exist, the + * list remains empty and the root relation is used as-is. When exclusions + * exist, the list contains leaf relations that are not excluded and must be + * combined using UNION ALL. */ static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, - List **qual, bool *gencol_published) + List **qual, bool *gencol_published, + List **effective_relations) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; + Oid tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID}; Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID}; Oid qualRow[] = {TEXTOID}; + Oid filtertableRow[] = {TEXTOID, TEXTOID}; bool isnull; int natt; + bool is_partition; StringInfo pub_names = NULL; Bitmapset *included_cols = NULL; int server_version = walrcv_server_version(LogRepWorkerWalRcvConn); @@ -741,7 +748,7 @@ fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, /* First fetch Oid and replica identity. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition" " FROM pg_catalog.pg_class c" " INNER JOIN pg_catalog.pg_namespace n" " ON (c.relnamespace = n.oid)" @@ -771,6 +778,8 @@ fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, Assert(!isnull); lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); Assert(!isnull); + is_partition = DatumGetBool(slot_getattr(slot, 4, &isnull)); + Assert(!isnull); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); @@ -955,6 +964,75 @@ fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, walrcv_clear_result(res); + if (server_version >= 190000 && !is_partition && + lrel->relkind == RELKIND_PARTITIONED_TABLE) + { + resetStringInfo(&cmd); + + /* + * This query recursively traverses the inheritance (partition) tree + * starting from the given table OID and determines which leaf + * relations should be included for replication. Exclusion propagates + * from parent to child, and a relation is also treated as excluded if + * it is explicitly marked with prexcept = true in pg_publication_rel + * for the specified publications. The final result returns only non + * excluded leaf relations. + */ + appendStringInfo(&cmd, "SELECT nspname, relname FROM pg_get_publication_effective_tables(%u, ARRAY[%s]);", + lrel->remoteid, + pub_names->data); + + elog(LOG, "Executing query to get the partition tables to be copied:\n%s", cmd.data); + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(filtertableRow), filtertableRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not get effective included table list for table \"%s.%s\" from publisher: %s", + nspname, relname, res->err)); + + /* + * Store the tables as a list of schemaname and tablename. + */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + QualifiedRelationName *relinfo = palloc_object(QualifiedRelationName); + + relinfo->nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relinfo->relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + *effective_relations = lappend(*effective_relations, relinfo); + + ExecClearTuple(slot); + } + + ExecDropSingleTupleTableSlot(slot); + + /* + * If there is exactly one item in the effective_relations list and it + * equals the table being processed, that means no actual exclusion + * occurred. + */ + if (list_length(*effective_relations) == 1) + { + QualifiedRelationName *relinfo; + + relinfo = linitial(*effective_relations); + if (strcmp(nspname, relinfo->nspname) == 0 && + strcmp(relname, relinfo->relname) == 0) + { + pfree(relinfo->nspname); + pfree(relinfo->relname); + list_free_deep(*effective_relations); + *effective_relations = NIL; + } + } + } + /* * Get relation's row filter expressions. DISTINCT avoids the same * expression of a table in multiple publications from being included @@ -1044,6 +1122,7 @@ copy_table(Relation rel) LogicalRepRelMapEntry *relmapentry; LogicalRepRelation lrel; List *qual = NIL; + List *effective_relations = NIL; WalRcvExecResult *res; StringInfoData cmd; CopyFromState cstate; @@ -1055,7 +1134,7 @@ copy_table(Relation rel) /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), RelationGetRelationName(rel), &lrel, &qual, - &gencol_published); + &gencol_published, &effective_relations); /* Put the relation into relmap. */ logicalrep_relmap_update(&lrel); @@ -1067,9 +1146,61 @@ copy_table(Relation rel) /* Start copy on the publisher. */ initStringInfo(&cmd); + + if (effective_relations && list_length(effective_relations)) + { + bool first = true; + + /* + * Build a single COPY command to synchronize all resolved relations + * into the root table. + * + * The array 'effective_relations' contains the leaf tables of + * partition hierarchies, with excluded subtrees removed according to + * the EXCEPT clauses. This applies only when + * 'publish_via_partition_root' is enabled, since the initial sync + * must route all changes to the root table. + * + * We construct a UNION ALL query that combines data from multiple + * leaf relations into one sub-COPY statement, ensuring all rows are + * copied consistently into the root table. + */ + appendStringInfoString(&cmd, "COPY (\n"); + foreach_ptr(QualifiedRelationName, relinfo, effective_relations) + { + if (!first) + appendStringInfoString(&cmd, "UNION ALL\n"); + + first = false; + + appendStringInfoString(&cmd, "SELECT "); + + /* If the table has columns, then specify the columns */ + if (lrel.natts) + { + for (int i = 0; i < lrel.natts; i++) + { + if (i > 0) + appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + } + } + else + appendStringInfoString(&cmd, " * "); + + appendStringInfo(&cmd, " FROM %s\n", + quote_qualified_identifier(relinfo->nspname, + relinfo->relname)); + } + + appendStringInfoString(&cmd, ")\n"); + appendStringInfoString(&cmd, "TO STDOUT"); + } /* Regular or partitioned table with no row filter or generated columns */ - if ((lrel.relkind == RELKIND_RELATION || lrel.relkind == RELKIND_PARTITIONED_TABLE) - && qual == NIL && !gencol_published) + else if ((lrel.relkind == RELKIND_RELATION || + lrel.relkind == RELKIND_PARTITIONED_TABLE) && + qual == NIL && !gencol_published) { appendStringInfo(&cmd, "COPY %s", quote_qualified_identifier(lrel.nspname, lrel.relname)); @@ -1158,6 +1289,7 @@ copy_table(Relation rel) } res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL); + elog(LOG, "Tablesync worker: Executing query to get the initial sync data:\n%s", cmd.data); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) ereport(ERROR, diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index dd38eef9303..cbb1a0e7fe7 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -2073,6 +2073,73 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid) MemoryContextSwitchTo(oldctx); } +/* + * publication_has_publishable_partitions + * + * Checks if the publication has any effective data in a partition tree. + * Returns true if at least one leaf is NOT excluded. + */ +static bool +publication_has_publishable_partitions(Oid root_relid, Oid pubid) +{ + List *all_leaves = NIL; + List *except_tables_in_pub = NIL; + + Assert(get_rel_relkind(root_relid) == RELKIND_PARTITIONED_TABLE); + + except_tables_in_pub = GetExcludedPublicationTables(pubid, + PUBLICATION_PART_LEAF); + + if (except_tables_in_pub == NIL) + return false; + + /* Get all leaf relations in the hierarchy */ + all_leaves = GetPubPartitionOptionRelations(NIL, + PUBLICATION_PART_LEAF, + root_relid); + + /* + * Traverse leaves. If we find even ONE leaf that is not excluded by the + * publication's EXCEPT list, the tree is effective. + */ + foreach_oid(curr_relid, all_leaves) + { + bool relation_in_except = false; + List *ancestors; + + /* Check if the leaf itself is an exception */ + if (list_member_oid(except_tables_in_pub, curr_relid)) + continue; + + /* Check if any ancestor is an exception */ + ancestors = get_partition_ancestors(curr_relid); + + foreach_oid(ancestor, ancestors) + { + if (list_member_oid(except_tables_in_pub, ancestor)) + { + relation_in_except = true; + break; + } + } + + list_free(ancestors); + + /* + * If this leaf is not listed in the EXCEPT clause, the publication + * effectively includes data from this partition tree. + */ + if (!relation_in_except) + { + list_free(all_leaves); + return true; + } + } + + list_free(all_leaves); + return false; +} + /* * Find or create entry in the relation schema cache. * @@ -2134,6 +2201,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); List *rel_publications = NIL; + bool root_published_via_alltables = false; + Oid root_ancestor = InvalidOid; + List *ancestors = NIL; GetRelationPublications(relid, &pubids, NULL); @@ -2214,6 +2284,33 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->estate = NULL; memset(entry->exprstate, 0, sizeof(entry->exprstate)); + /* + * For partitions, pre-determine if the top-most ancestor is covered + * by any 'FOR ALL TABLES' publication that uses + * 'publish_via_partition_root' and an 'EXCEPT' clause. + * + * This pre-calculation is vital for resolving overlap conflicts: if a + * partition is excluded globally via an EXCEPT clause but included + * explicitly elsewhere (table/schema level), it must still be routed + * via the root identity if that root is published. + */ + if (am_partition) + { + ancestors = get_partition_ancestors(relid); + root_ancestor = llast_oid(ancestors); + + foreach_ptr(Publication, pubinfo, data->publications) + { + if (pubinfo->alltables && pubinfo->pubviaroot && + publication_has_publishable_partitions(root_ancestor, + pubinfo->oid)) + { + root_published_via_alltables = true; + break; + } + } + } + /* * Build publication cache. We can't use one provided by relcache as * relcache considers all publications that the given relation is in, @@ -2236,28 +2333,42 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) /* * If this is a FOR ALL TABLES publication, pick the partition * root and set the ancestor level accordingly. + * + * If this is a FOR ALL TABLES publication and it has an EXCEPT + * TABLE list: + * + * If the relation is a partition, check whether the current + * relation or any of the ancestors is included in the EXCEPT + * TABLE list. If so, do not publish the change. + * + * "Do not publish the change" is achieved by keeping the variable + * "publish" set to false. And eventually, entry->pubactions will + * remain all false for this publication. */ if (pub->alltables) { List *exceptpubids = NIL; - if (am_partition && pub->pubviaroot) + if (am_partition) { - List *ancestors = get_partition_ancestors(relid); + foreach_oid(ancestor, ancestors) + GetRelationPublications(ancestor, NULL, &exceptpubids); - pub_relid = llast_oid(ancestors); - ancestor_level = list_length(ancestors); + if (pub->pubviaroot) + { + pub_relid = root_ancestor; + ancestor_level = list_length(ancestors); + } } - GetRelationPublications(pub_relid, NULL, &exceptpubids); + GetRelationPublications(relid, NULL, &exceptpubids); if (!list_member_oid(exceptpubids, pub->oid)) publish = true; list_free(exceptpubids); } - - if (!publish) + else if (!publish) { bool ancestor_published = false; @@ -2271,12 +2382,10 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) { Oid ancestor; int level; - List *ancestors = get_partition_ancestors(relid); ancestor = GetTopMostAncestorInPublication(pub->oid, ancestors, &level); - if (ancestor != InvalidOid) { ancestor_published = true; @@ -2291,7 +2400,25 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) if (list_member_oid(pubids, pub->oid) || list_member_oid(schemaPubids, pub->oid) || ancestor_published) + { + /* + * If the root ancestor is effectively published by an ALL + * TABLES publication with publish_via_partition_root, + * then changes for its partitions must be published using + * the root identity. + * + * This applies even if other publications do not specify + * publish_via_partition_root, provided the root is not + * excluded from that ALL TABLES publication. + */ + if (root_published_via_alltables) + { + pub_relid = root_ancestor; + ancestor_level = list_length(ancestors); + } + publish = true; + } } /* @@ -2339,6 +2466,17 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) Assert(publish_as_relid == pub_relid); } + /* + * Partitions whose top-most ancestor is being published via + * an 'ALL TABLES' publication need not be individually + * published via any other publication. Repeated occurrences + * of a partition take the least restricted definition, which + * the 'ALL TABLES' publication always provides. I.e., all + * columns and all rows. + */ + if (root_published_via_alltables) + continue; + /* Track publications for this ancestor. */ rel_publications = lappend(rel_publications, pub); } diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index dac40992cbc..0b951a3d6ea 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12398,6 +12398,13 @@ proname => 'pg_relation_is_publishable', provolatile => 's', prorettype => 'bool', proargtypes => 'regclass', prosrc => 'pg_relation_is_publishable' }, +{ oid => '9002', descr => 'return partition tables eligible for tablesync when publication uses EXCEPT and publish_via_partition_root = true', + proname => 'pg_get_publication_effective_tables', prorows => '1000', + proretset => 't', provolatile => 's', + prorettype => 'record', proargtypes => 'oid _text', + proallargtypes => '{oid,_text,text,text}', proargmodes => '{i,i,o,o}', + proargnames => '{root_relid,pub_names,nspname,relname}', + prosrc => 'pg_get_publication_effective_tables' }, # rls { oid => '3298', diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 8d695276837..24308ad4950 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -190,6 +190,7 @@ extern Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, extern bool is_publishable_relation(Relation rel); extern bool is_schema_publication(Oid pubid); +extern bool is_relid_excepted(Oid relid, Oid pubid); extern bool publication_has_any_except_table(Oid pubid); extern bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 33fb7f552b4..1f9ece056b8 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -234,6 +234,12 @@ typedef struct ParallelApplyWorkerInfo ParallelApplyWorkerShared *shared; } ParallelApplyWorkerInfo; +typedef struct QualifiedRelationName +{ + char *nspname; + char *relname; +} QualifiedRelationName; + /* Main memory context for apply worker. Permanent during worker lifetime. */ extern PGDLLIMPORT MemoryContext ApplyContext; diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 1e5819c6de2..55dfbe9d5e9 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -335,10 +335,17 @@ Except tables: "public.testpub_root" CREATE PUBLICATION testpub9 FOR ALL TABLES EXCEPT TABLE (testpub_part1); -ERROR: partition "testpub_part1" cannot be excluded using EXCEPT TABLE +\dRp+ testpub9; + Publication testpub9 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_part1" + RESET client_min_messages; DROP TABLE testpub_root, testpub_part1, testpub_part2; -DROP PUBLICATION testpub8; +DROP PUBLICATION testpub8, testpub9; --- Tests for publications with SEQUENCES CREATE SEQUENCE regress_pub_seq0; CREATE SEQUENCE pub_test.regress_pub_seq1; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 4de0176bc25..80913d562be 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -158,10 +158,11 @@ CREATE TABLE testpub_part2 PARTITION OF testpub_root FOR VALUES FROM (100) TO (2 CREATE PUBLICATION testpub8 FOR ALL TABLES EXCEPT TABLE (testpub_root); \dRp+ testpub8; CREATE PUBLICATION testpub9 FOR ALL TABLES EXCEPT TABLE (testpub_part1); +\dRp+ testpub9; RESET client_min_messages; DROP TABLE testpub_root, testpub_part1, testpub_part2; -DROP PUBLICATION testpub8; +DROP PUBLICATION testpub8, testpub9; --- Tests for publications with SEQUENCES CREATE SEQUENCE regress_pub_seq0; diff --git a/src/test/subscription/t/037_rep_changes_except_table.pl b/src/test/subscription/t/037_rep_changes_except_table.pl index 361ce1ac996..28f9e59de9d 100644 --- a/src/test/subscription/t/037_rep_changes_except_table.pl +++ b/src/test/subscription/t/037_rep_changes_except_table.pl @@ -178,6 +178,54 @@ $node_publisher->safe_psql('postgres', "SELECT slot_name FROM pg_replication_slot_advance('test_slot', pg_current_wal_lsn());" ); +# Excluding one of the partitions (part2) should not affect replication of the +# other partitions that don't intersect it. So, in this case, all of part1 +# should still be replicated. +$node_publisher->safe_psql( + 'postgres', qq( + TRUNCATE sch1.t1; + INSERT INTO sch1.t1 VALUES (3), (103), (153); + CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.part2) WITH (publish_via_partition_root = true); +)); +$node_subscriber->safe_psql( + 'postgres', qq( + TRUNCATE sch1.t1; + CREATE SUBSCRIPTION tap_sub_part CONNECTION '$publisher_connstr' PUBLICATION tap_pub_part; +)); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_part'); +$node_publisher->safe_psql('postgres', + "SELECT slot_name FROM pg_replication_slot_advance('test_slot', pg_current_wal_lsn());" +); + +# Verify that data inserted to the partition part2 is not published when it +# is in the EXCEPT clause +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) = 0 FROM pg_logical_slot_get_binary_changes('test_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'tap_pub_part')" +); +$node_publisher->wait_for_catchup('tap_sub_part'); + +# Check that table data 103 and 153 which is present in sch1.part2 should +# not be replicated. +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1"); +is($result, qq(3), 'check rows on root table'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part1"); +is($result, qq(), 'check rows on table sch1.part1'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2"); +is($result, qq(), 'check rows on table sch1.part2'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2_1"); +is($result, qq(), 'check rows on table sch1.part2_1'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2_2"); +is($result, qq(), 'check rows on table sch1.part2_2'); + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_part"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_part;"); + # ============================================ # Test when a subscription is subscribing to multiple publications # ============================================ @@ -265,12 +313,52 @@ like( $stderr, qr/ERROR: cannot combine publications that define EXCEPT TABLE clauses.* .*DETAIL:.*The following publications define EXCEPT TABLE clauses: "tap_pub1", "tap_pub2"./, - 'subscription with multiple EXCEPT TABLE publication'); + 'subscription with multiple EXCEPT TABLE publication'); $node_subscriber->safe_psql('postgres', 'DROP SUBSCRIPTION tap_sub'); $node_publisher->safe_psql('postgres', 'DROP PUBLICATION tap_pub1'); $node_publisher->safe_psql('postgres', 'DROP PUBLICATION tap_pub2'); +# ============================================ +# Test multi-publication subscription where a table listed in the +# EXCEPT clause of one publication is included by another publication +# ============================================ +$node_publisher->safe_psql( + 'postgres', qq( + TRUNCATE TABLE sch1.t1; + CREATE PUBLICATION tap_pub_part1 FOR ALL TABLES EXCEPT TABLE (sch1.part2_1) WITH (publish_via_partition_root = true); + CREATE PUBLICATION tap_pub_part2 FOR TABLE sch1.part2_1 WHERE (a > 110) WITH (publish_via_partition_root = true); + INSERT INTO sch1.t1 VALUES (11), (111), (161); +)); +$node_subscriber->safe_psql( + 'postgres', qq( + TRUNCATE TABLE sch1.t1, sch1.part1, sch1.part2, sch1.part2_1, sch1.part2_2; + CREATE SUBSCRIPTION tap_sub_part CONNECTION '$publisher_connstr' PUBLICATION tap_pub_part1, tap_pub_part2; +)); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_part'); + +# Verify that rows for sch1.part2_1 are replicated because the table, although +# listed in the EXCEPT clause of tap_pub_part1, is included by tap_pub_part2. +$result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1 ORDER BY 1"); +is( $result, qq(11 +111 +161), 'initial rows replicated to root table'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO sch1.t1 VALUES (12), (112), (162);"); + +$node_publisher->wait_for_catchup('tap_sub_part'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1 ORDER BY 1"); +is( $result, qq(11 +12 +111 +112 +161 +162), 'subsequent rows replicated to root table'); + $node_publisher->stop('fast'); done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 241945734ec..9c576e3c267 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2421,6 +2421,7 @@ QTNode QUERYTYPE QualCost QualItem +QualifiedRelationName Query QueryCompletion QueryDesc -- 2.43.0