From d04d5a49de338042241a2ac86a608bf6dbe8a02c Mon Sep 17 00:00:00 2001 From: amit Date: Thu, 7 Nov 2019 18:19:33 +0900 Subject: [PATCH v5] Support adding partitioned tables to publications When a partitioned table is added to a publication, any direct and indirect changes of its leaf partitions are published as if its own. --- doc/src/sgml/logical-replication.sgml | 12 +- doc/src/sgml/ref/create_publication.sgml | 19 ++- src/backend/catalog/pg_publication.c | 50 +++++-- src/backend/commands/publicationcmds.c | 12 +- src/backend/commands/subscriptioncmds.c | 89 +++++++++---- src/backend/executor/execReplication.c | 17 +-- src/backend/replication/logical/tablesync.c | 27 ++-- src/backend/replication/logical/worker.c | 89 ++++++++++++- src/backend/replication/pgoutput/pgoutput.c | 198 ++++++++++++++++++++++------ src/bin/pg_dump/pg_dump.c | 5 +- src/include/catalog/pg_publication.h | 2 + src/test/regress/expected/publication.out | 19 ++- src/test/regress/sql/publication.sql | 12 +- 13 files changed, 428 insertions(+), 123 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index f657d1d06e..87c950b9c8 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -402,13 +402,11 @@ - Replication is only possible from base tables to base tables. That is, - the tables on the publication and on the subscription side must be normal - tables, not views, materialized views, partition root tables, or foreign - tables. In the case of partitions, you can therefore replicate a - partition hierarchy one-to-one, but you cannot currently replicate to a - differently partitioned setup. Attempts to replicate tables other than - base tables will result in an error. + Replication is only supported by regular and partitioned tables, although + when using partitioned tables as replication target, only + inserts can be replicated. Attempts to replicate relations other than + regular and partitioned tables, such as views, materialized views, or + foreign tables, will result in an error. diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 99f87ca393..9a4efc06f8 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -68,14 +68,18 @@ CREATE PUBLICATION name that table is added to the publication. If ONLY is not specified, the table and all its descendant tables (if any) are added. Optionally, * can be specified after the table name to - explicitly indicate that descendant tables are included. + explicitly indicate that descendant tables are included. However, adding + a partitioned table to a publication never explicitly adds its partitions, + because partitions are implicitly published due to the partitioned table + being added to the publication. - Only persistent base tables can be part of a publication. Temporary - tables, unlogged tables, foreign tables, materialized views, regular - views, and partitioned tables cannot be part of a publication. To - replicate a partitioned table, add the individual partitions to the + Only persistent base tables and partitioned tables can be part of a + publication. Temporary tables, unlogged tables, foreign tables, + materialized views, regular views cannot be part of a publication. + When a partitioned table is added to a publication, all of its existing + and future partitions are also implicitly considered to be part of the publication. @@ -133,6 +137,11 @@ CREATE PUBLICATION name + Partitioned tables are not considered when FOR ALL TABLES + is specified. + + + The creation of a publication does not start replication. It only defines a grouping and filtering logic for future subscribers. diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index d442c8e0bb..de8f1afc65 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -26,6 +26,7 @@ #include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" +#include "catalog/partition.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" @@ -47,17 +48,9 @@ static void check_publication_add_relation(Relation targetrel) { - /* Give more specific error for partitioned tables */ - if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("\"%s\" is a partitioned table", - RelationGetRelationName(targetrel)), - errdetail("Adding partitioned tables to publications is not supported."), - errhint("You can add the table partitions individually."))); - - /* Must be table */ - if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION) + /* Must be a regular or partitioned table */ + if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && + RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("\"%s\" is not a table", @@ -103,7 +96,8 @@ check_publication_add_relation(Relation targetrel) static bool is_publishable_class(Oid relid, Form_pg_class reltuple) { - return reltuple->relkind == RELKIND_RELATION && + return (reltuple->relkind == RELKIND_RELATION || + reltuple->relkind == RELKIND_PARTITIONED_TABLE) && !IsCatalogRelationOid(relid) && reltuple->relpersistence == RELPERSISTENCE_PERMANENT && relid >= FirstNormalObjectId; @@ -247,6 +241,38 @@ GetRelationPublications(Oid relid) } /* + * Finds all publications that publish changes to the input relation' + * ancestors. + * + * *published_ancestors will contain the OIDs of ancestors, one for each + * publication returned. The ancestor OIDs can be repeated, because a given + * ancestor may be published via multiple publications. + */ +List * +GetRelationAncestorPublications(Relation rel, + List **published_ancestors) +{ + List *ancestors = get_partition_ancestors(RelationGetRelid(rel)); + List *ancestor_pubids = NIL; + ListCell *lc; + + *published_ancestors = NIL; + foreach(lc, ancestors) + { + Oid relid = lfirst_oid(lc); + List *rel_publishers = GetRelationPublications(relid); + int n = list_length(rel_publishers), + i; + + ancestor_pubids = list_concat_copy(ancestor_pubids, rel_publishers); + for (i = 0; i < n; i++) + *published_ancestors = lappend_oid(*published_ancestors, relid); + } + + return ancestor_pubids; +} + +/* * Gets list of relation oids for a publication. * * This should only be used for normal publications, the FOR ALL TABLES diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index fbf11c86aa..ee56acf3f3 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -498,7 +498,8 @@ RemovePublicationRelById(Oid proid) /* * Open relations specified by a RangeVar list. - * The returned tables are locked in ShareUpdateExclusiveLock mode. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. */ static List * OpenTableList(List *tables) @@ -539,8 +540,13 @@ OpenTableList(List *tables) rels = lappend(rels, rel); relids = lappend_oid(relids, myrelid); - /* Add children of this rel, if requested */ - if (recurse) + /* + * Add children of this rel, if requested, so that they too are added + * to the publication. A partitioned table can't have any inheritance + * children other than its partitions, which need not be explicitly + * added to the publication. + */ + if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) { List *children; ListCell *child; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5408edcfc2..9056409fee 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -44,7 +44,20 @@ #include "utils/memutils.h" #include "utils/syscache.h" +/* + * Structure for fetch_table_list() to store the information about + * a given published table. + */ +typedef struct PublicationTable +{ + char *schemaname; + char *relname; + bool pubupdate; + bool pubdelete; +} PublicationTable; + static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static Oid ValidateSubscriptionRel(PublicationTable *pt); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -456,15 +469,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) tables = fetch_table_list(wrconn, publications); foreach(lc, tables) { - RangeVar *rv = (RangeVar *) lfirst(lc); + PublicationTable *pt = (PublicationTable *) lfirst(lc); Oid relid; - relid = RangeVarGetRelid(rv, AccessShareLock, false); - - /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); - + relid = ValidateSubscriptionRel(pt); AddSubscriptionRelState(subid, relid, table_state, InvalidXLogRecPtr); } @@ -565,14 +573,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) foreach(lc, pubrel_names) { - RangeVar *rv = (RangeVar *) lfirst(lc); + PublicationTable *pt = (PublicationTable *) lfirst(lc); Oid relid; - relid = RangeVarGetRelid(rv, AccessShareLock, false); - - /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); + /* Check that there's an appropriate relation present locally. */ + relid = ValidateSubscriptionRel(pt); pubrel_local_oids[off++] = relid; @@ -584,7 +589,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) InvalidXLogRecPtr); ereport(DEBUG1, (errmsg("table \"%s.%s\" added to subscription \"%s\"", - rv->schemaname, rv->relname, sub->name))); + pt->schemaname, pt->relname, sub->name))); } } @@ -1129,7 +1134,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[2] = {TEXTOID, TEXTOID}; + Oid tableRow[4] = {TEXTOID, TEXTOID, BOOLOID, BOOLOID}; ListCell *lc; bool first; List *tablelist = NIL; @@ -1137,9 +1142,11 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) Assert(list_length(publications) > 0); initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" + appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, p.pubupdate, p.pubdelete\n" " FROM pg_catalog.pg_publication_tables t\n" + " JOIN pg_catalog.pg_publication p ON t.pubname = p.pubname\n" " WHERE t.pubname IN ("); + first = true; foreach(lc, publications) { @@ -1154,7 +1161,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) } appendStringInfoChar(&cmd, ')'); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, 4, tableRow); pfree(cmd.data); if (res->status != WALRCV_OK_TUPLES) @@ -1166,18 +1173,19 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { - char *nspname; - char *relname; + PublicationTable *pt = palloc0(sizeof(PublicationTable)); bool isnull; - RangeVar *rv; - nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + pt->schemaname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); Assert(!isnull); - relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + pt->relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + pt->pubupdate = DatumGetBool(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + pt->pubdelete = DatumGetBool(slot_getattr(slot, 4, &isnull)); Assert(!isnull); - rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1); - tablelist = lappend(tablelist, rv); + tablelist = lappend(tablelist, pt); ExecClearTuple(slot); } @@ -1187,3 +1195,36 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } + +/* + * Looks up a local relation matching the given publication table and + * checks that it's appropriate to use as replication target, erroring + * out if not. + * + * Oid of the successfully validated local relation is returned. + */ +static Oid +ValidateSubscriptionRel(PublicationTable *pt) +{ + Oid relid; + char local_relkind; + RangeVar *rv; + + rv = makeRangeVar(pstrdup(pt->schemaname), pstrdup(pt->relname), -1); + relid = RangeVarGetRelid(rv, AccessShareLock, false); + Assert(OidIsValid(relid)); + + /* Check for supported relkind. */ + local_relkind = get_rel_relkind(relid); + CheckSubscriptionRelkind(local_relkind, rv->schemaname, rv->relname); + + if (local_relkind == RELKIND_PARTITIONED_TABLE && + (pt->pubupdate || pt->pubdelete)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot use partitioned table \"%s.%s\" as logical replication target", + pt->schemaname, pt->relname), + errdetail("Partitioned tables can accept only insert and truncate operations via logical replication."))); + + return relid; +} diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 95e027c970..11a2293b56 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -396,7 +396,7 @@ ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot) ResultRelInfo *resultRelInfo = estate->es_result_relation_info; Relation rel = resultRelInfo->ri_RelationDesc; - /* For now we support only tables. */ + /* For now we support only regular tables. */ Assert(rel->rd_rel->relkind == RELKIND_RELATION); CheckCmdReplicaIdentity(rel, CMD_INSERT); @@ -591,17 +591,10 @@ CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname) { /* - * We currently only support writing to regular tables. However, give a - * more specific error for partitioned and foreign tables. + * We currently only support writing to regular and partitioned tables. + * However, give a more specific error for foreign tables. */ - if (relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot use relation \"%s.%s\" as logical replication target", - nspname, relname), - errdetail("\"%s.%s\" is a partitioned table.", - nspname, relname))); - else if (relkind == RELKIND_FOREIGN_TABLE) + if (relkind == RELKIND_FOREIGN_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", @@ -609,7 +602,7 @@ CheckSubscriptionRelkind(char relkind, const char *nspname, errdetail("\"%s.%s\" is a foreign table.", nspname, relname))); - if (relkind != RELKIND_RELATION) + if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e01d18c3a1..cccbe0e9c1 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -630,16 +630,17 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Get information about remote relation in similar fashion the RELATION - * message provides during replication. + * message provides during replication. XXX - While we fetch relkind too + * here, the RELATION message doesn't provide it. */ static void fetch_remote_table_info(char *nspname, char *relname, - LogicalRepRelation *lrel) + LogicalRepRelation *lrel, char *relkind) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[2] = {OIDOID, CHAROID}; + Oid tableRow[3] = {OIDOID, CHAROID, CHAROID}; Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID}; bool isnull; int natt; @@ -649,16 +650,16 @@ fetch_remote_table_info(char *nspname, char *relname, /* First fetch Oid and replica identity. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT c.oid, c.relreplident" + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" " FROM pg_catalog.pg_class c" " INNER JOIN pg_catalog.pg_namespace n" " ON (c.relnamespace = n.oid)" " WHERE n.nspname = %s" " AND c.relname = %s" - " AND c.relkind = 'r'", + " AND pg_relation_is_publishable(c.oid)", quote_literal_cstr(nspname), quote_literal_cstr(relname)); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, 3, tableRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, @@ -675,6 +676,8 @@ fetch_remote_table_info(char *nspname, char *relname, Assert(!isnull); lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull)); Assert(!isnull); + *relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); @@ -750,10 +753,12 @@ copy_table(Relation rel) CopyState cstate; List *attnamelist; ParseState *pstate; + char remote_relkind; /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel); + RelationGetRelationName(rel), &lrel, + &remote_relkind); /* Put the relation into relmap. */ logicalrep_relmap_update(&lrel); @@ -764,8 +769,12 @@ copy_table(Relation rel) /* Start copy on the publisher. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "COPY %s TO STDOUT", - quote_qualified_identifier(lrel.nspname, lrel.relname)); + if (remote_relkind == RELKIND_PARTITIONED_TABLE) + appendStringInfo(&cmd, "COPY (SELECT * FROM %s) TO STDOUT", + quote_qualified_identifier(lrel.nspname, lrel.relname)); + else + appendStringInfo(&cmd, "COPY %s TO STDOUT", + quote_qualified_identifier(lrel.nspname, lrel.relname)); res = walrcv_exec(wrconn, cmd.data, 0, NULL); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ff62303638..78fcb4ffe9 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -29,11 +29,13 @@ #include "access/xlog_internal.h" #include "catalog/catalog.h" #include "catalog/namespace.h" +#include "catalog/partition.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" +#include "executor/execPartition.h" #include "executor/nodeModifyTable.h" #include "funcapi.h" #include "libpq/pqformat.h" @@ -139,6 +141,21 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) } /* + * Different interface to use when a LogicalRepRelMapEntry is not present + * for a given local target relation. + */ +static bool +should_apply_changes_for_relid(Oid localreloid, char state, + XLogRecPtr statelsn) +{ + if (am_tablesync_worker()) + return MyLogicalRepWorker->relid == localreloid; + else + return (state == SUBREL_STATE_READY || + (state == SUBREL_STATE_SYNCDONE && + statelsn <= remote_final_lsn)); +} +/* * Make sure that we started local transaction. * * Also switches to ApplyMessageContext as necessary. @@ -573,6 +590,8 @@ apply_handle_insert(StringInfo s) EState *estate; TupleTableSlot *remoteslot; MemoryContext oldctx; + ModifyTableState *mtstate = NULL; + PartitionTupleRouting *proute = NULL; ensure_transaction(); @@ -601,6 +620,36 @@ apply_handle_insert(StringInfo s) oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); slot_store_cstrings(remoteslot, rel, newtup.values); slot_fill_defaults(rel, estate, remoteslot); + + /* Tuple routing for a partitioned table. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + ResultRelInfo *partrelinfo; + PartitionRoutingInfo *partinfo; + TupleConversionMap *map; + + mtstate = makeNode(ModifyTableState); + mtstate->ps.plan = NULL; + mtstate->ps.state = estate; + mtstate->operation = CMD_INSERT; + mtstate->resultRelInfo = estate->es_result_relations; + proute = ExecSetupPartitionTupleRouting(estate, mtstate, + rel->localrel); + partrelinfo = ExecFindPartition(mtstate, + estate->es_result_relation_info, + proute, remoteslot, estate); + estate->es_result_relation_info = partrelinfo; + partinfo = partrelinfo->ri_PartitionInfo; + map = partinfo->pi_RootToPartitionMap; + if (map != NULL) + { + TupleTableSlot *new_slot = partinfo->pi_PartitionTupleSlot; + + remoteslot = execute_attr_map_slot(map->attrMap, remoteslot, + new_slot); + } + } + MemoryContextSwitchTo(oldctx); ExecOpenIndices(estate->es_result_relation_info, false); @@ -610,6 +659,8 @@ apply_handle_insert(StringInfo s) /* Cleanup. */ ExecCloseIndices(estate->es_result_relation_info); + if (proute) + ExecCleanupTupleRouting(mtstate, proute); PopActiveSnapshot(); /* Handle queued AFTER triggers. */ @@ -906,14 +957,48 @@ apply_handle_truncate(StringInfo s) LogicalRepRelMapEntry *rel; rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) { + bool really_skip = true; + + /* + * If we seem to have gotten sent a leaf partition because an + * ancestor was truncated, confirm before proceeding with + * truncating the partition that an ancestor indeed has a valid + * subscription state. + */ + if (rel->state == SUBREL_STATE_UNKNOWN && + rel->localrel->rd_rel->relispartition) + { + List *ancestors = get_partition_ancestors(rel->localreloid); + ListCell *lc1; + + foreach(lc1, ancestors) + { + Oid ancestor = lfirst_oid(lc1); + XLogRecPtr statelsn; + char state; + + /* Check using the ancestor's subscription state. */ + state = GetSubscriptionRelState(MySubscription->oid, + ancestor, &statelsn, + false); + really_skip &= !should_apply_changes_for_relid(ancestor, + state, + statelsn); + } + } + /* * The relation can't become interesting in the middle of the * transaction so it's safe to unlock it. */ - logicalrep_rel_close(rel, RowExclusiveLock); - continue; + if (really_skip) + { + logicalrep_rel_close(rel, RowExclusiveLock); + continue; + } } remote_rels = lappend(remote_rels, rel); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 3483c1b877..a0fc37542b 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -12,6 +12,7 @@ */ #include "postgres.h" +#include "access/tupconvert.h" #include "catalog/pg_publication.h" #include "fmgr.h" #include "replication/logical.h" @@ -49,21 +50,34 @@ static bool publications_valid; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); +static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx); /* Entry in the map used to remember which relation schemas we sent. */ typedef struct RelationSyncEntry { Oid relid; /* relation oid */ - bool schema_sent; /* did we send the schema? */ + + /* + * Did we send the schema -- either own or ancestor's? + */ + bool schema_sent; bool replicate_valid; PublicationActions pubactions; + + /* + * Valid if publishing relation's changes as changes to some ancestor, + * that is, if relation is a partition. The map, if any, will be used + * to convert the tuples from partition's type to the ancestor's. + */ + Oid replicate_as_relid; + TupleConversionMap *map; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ static HTAB *RelationSyncCache = NULL; static void init_rel_sync_cache(MemoryContext decoding_context); -static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid); +static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation rel); static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue); @@ -254,47 +268,72 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* - * Write the relation schema if the current schema hasn't been sent yet. + * Write the current schema of the relation and its ancestor (if any) if not + * done yet. */ static void maybe_send_schema(LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry) { - if (!relentry->schema_sent) + if (relentry->schema_sent) + return; + + /* If needed, send the ancestor's schema first. */ + if (OidIsValid(relentry->replicate_as_relid)) { - TupleDesc desc; - int i; + Relation ancestor = + RelationIdGetRelation(relentry->replicate_as_relid); + TupleDesc indesc = RelationGetDescr(relation); + TupleDesc outdesc = RelationGetDescr(ancestor); + MemoryContext oldctx; - desc = RelationGetDescr(relation); + /* Map must live as long as the session does. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + relentry->map = convert_tuples_by_name(indesc, outdesc); + MemoryContextSwitchTo(oldctx); + send_relation_and_attrs(ancestor, ctx); + RelationClose(ancestor); + } - /* - * Write out type info if needed. We do that only for user-created - * types. We use FirstGenbkiObjectId as the cutoff, so that we only - * consider objects with hand-assigned OIDs to be "built in", not for - * instance any function or type defined in the information_schema. - * This is important because only hand-assigned OIDs can be expected - * to remain stable across major versions. - */ - for (i = 0; i < desc->natts; i++) - { - Form_pg_attribute att = TupleDescAttr(desc, i); + send_relation_and_attrs(relation, ctx); + relentry->schema_sent = true; +} - if (att->attisdropped || att->attgenerated) - continue; +/* + * Sends a relation + */ +static void +send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx) +{ + TupleDesc desc = RelationGetDescr(relation); + int i; - if (att->atttypid < FirstGenbkiObjectId) - continue; + /* + * Write out type info if needed. We do that only for user-created + * types. We use FirstGenbkiObjectId as the cutoff, so that we only + * consider objects with hand-assigned OIDs to be "built in", not for + * instance any function or type defined in the information_schema. + * This is important because only hand-assigned OIDs can be expected + * to remain stable across major versions. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); - OutputPluginPrepareWrite(ctx, false); - logicalrep_write_typ(ctx->out, att->atttypid); - OutputPluginWrite(ctx, false); - } + if (att->attisdropped || att->attgenerated) + continue; + + if (att->atttypid < FirstGenbkiObjectId) + continue; OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, relation); + logicalrep_write_typ(ctx->out, att->atttypid); OutputPluginWrite(ctx, false); - relentry->schema_sent = true; } + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_rel(ctx->out, relation); + OutputPluginWrite(ctx, false); } /* @@ -311,7 +350,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!is_publishable_relation(relation)) return; - relentry = get_rel_sync_entry(data, RelationGetRelid(relation)); + relentry = get_rel_sync_entry(data, relation); /* First check the table filter */ switch (change->action) @@ -341,28 +380,56 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, relation, - &change->data.tp.newtuple->tuple); - OutputPluginWrite(ctx, true); - break; + { + HeapTuple tuple = &change->data.tp.newtuple->tuple; + + if (OidIsValid(relentry->replicate_as_relid)) + { + relation = RelationIdGetRelation(relentry->replicate_as_relid); + if (relentry->map) + tuple = execute_attr_map_tuple(tuple, relentry->map); + } + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_insert(ctx->out, relation, tuple); + OutputPluginWrite(ctx, true); + break; + } case REORDER_BUFFER_CHANGE_UPDATE: { HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL; + HeapTuple newtuple = &change->data.tp.newtuple->tuple; + + if (OidIsValid(relentry->replicate_as_relid)) + { + relation = RelationIdGetRelation(relentry->replicate_as_relid); + if (relentry->map) + { + oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); + newtuple = execute_attr_map_tuple(newtuple, relentry->map); + } + } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, relation, oldtuple, - &change->data.tp.newtuple->tuple); + logicalrep_write_update(ctx->out, relation, oldtuple, newtuple); OutputPluginWrite(ctx, true); break; } case REORDER_BUFFER_CHANGE_DELETE: if (change->data.tp.oldtuple) { + HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; + + if (OidIsValid(relentry->replicate_as_relid)) + { + relation = RelationIdGetRelation(relentry->replicate_as_relid); + if (relentry->map) + oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); + } + OutputPluginPrepareWrite(ctx, true); - logicalrep_write_delete(ctx->out, relation, - &change->data.tp.oldtuple->tuple); + logicalrep_write_delete(ctx->out, relation, oldtuple); OutputPluginWrite(ctx, true); } else @@ -401,7 +468,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!is_publishable_relation(relation)) continue; - relentry = get_rel_sync_entry(data, relid); + relentry = get_rel_sync_entry(data, relation); if (!relentry->pubactions.pubtruncate) continue; @@ -524,10 +591,16 @@ init_rel_sync_cache(MemoryContext cachectx) /* * Find or create entry in the relation schema cache. + * + * For a partition, the schema of the top-most ancestor that is published + * will be used instead of that of the partition itself, so the information + * about ancestor's publications is looked up here and saved in the schema + * cache entry. */ static RelationSyncEntry * -get_rel_sync_entry(PGOutputData *data, Oid relid) +get_rel_sync_entry(PGOutputData *data, Relation rel) { + Oid relid = RelationGetRelid(rel); RelationSyncEntry *entry; bool found; MemoryContext oldctx; @@ -585,6 +658,51 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) break; } + /* + * For partitions, we prefer to publish their changes using an + * ancestor's schema (usually the top-most ancestor) if it is + * published, but only if a publication explicitly lists the ancestor + * as its member (that is, not a FOR ALL TABLES publication). + */ + if (rel->rd_rel->relispartition) + { + List *ancestor_pubids; + List *published_ancestors = NIL; + Oid topmost_published_ancestor = InvalidOid; + + ancestor_pubids = + GetRelationAncestorPublications(rel, &published_ancestors); + + foreach(lc, data->publications) + { + Publication *pub = lfirst(lc); + ListCell *lc1, + *lc2; + + forboth(lc1, ancestor_pubids, lc2, published_ancestors) + { + if (lfirst_oid(lc1) == pub->oid) + { + entry->pubactions.pubinsert |= pub->pubactions.pubinsert; + entry->pubactions.pubupdate |= pub->pubactions.pubupdate; + entry->pubactions.pubdelete |= pub->pubactions.pubdelete; + entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + topmost_published_ancestor = lfirst_oid(lc2); + } + } + + if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && + entry->pubactions.pubdelete && entry->pubactions.pubtruncate) + break; + } + + if (OidIsValid(topmost_published_ancestor)) + entry->replicate_as_relid = topmost_published_ancestor; + + list_free(ancestor_pubids); + list_free(published_ancestors); + } + list_free(pubids); entry->replicate_valid = true; diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index bf69adc2f4..57b4d1a8c1 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3969,8 +3969,9 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) { TableInfo *tbinfo = &tblinfo[i]; - /* Only plain tables can be aded to publications. */ - if (tbinfo->relkind != RELKIND_RELATION) + /* Only plain and partitioned tables can be added to publications. */ + if (tbinfo->relkind != RELKIND_RELATION && + tbinfo->relkind != RELKIND_PARTITIONED_TABLE) continue; /* diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 20a2f0ac1b..a67a626a71 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -81,6 +81,8 @@ typedef struct Publication extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); extern List *GetRelationPublications(Oid relid); +extern List *GetRelationAncestorPublications(Relation rel, + List **published_ancestors); extern List *GetPublicationRelations(Oid pubid); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(void); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index feb51e4add..b41e90e8ad 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -116,6 +116,20 @@ Tables: DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +-- Tests for partitioned tables +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forparted; +RESET client_min_messages; +-- should add only the parent to publication, not the partition +CREATE TABLE testpub_parted1 PARTITION OF testpub_parted FOR VALUES IN (1); +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; +SELECT tablename FROM pg_publication_tables WHERE pubname = 'testpub_forparted'; + tablename +---------------- + testpub_parted +(1 row) + +DROP PUBLICATION testpub_forparted; -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; ERROR: "testpub_view" is not a table @@ -142,11 +156,6 @@ Tables: ALTER PUBLICATION testpub_default ADD TABLE testpub_view; ERROR: "testpub_view" is not a table DETAIL: Only tables can be added to publications. --- fail - partitioned table -ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted; -ERROR: "testpub_parted" is a partitioned table -DETAIL: Adding partitioned tables to publications is not supported. -HINT: You can add the table partitions individually. ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1; ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1; ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 5773a755cf..bed6e7d54c 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -69,6 +69,16 @@ RESET client_min_messages; DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +-- Tests for partitioned tables +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forparted; +RESET client_min_messages; +-- should add only the parent to publication, not the partition +CREATE TABLE testpub_parted1 PARTITION OF testpub_parted FOR VALUES IN (1); +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; +SELECT tablename FROM pg_publication_tables WHERE pubname = 'testpub_forparted'; +DROP PUBLICATION testpub_forparted; + -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; SET client_min_messages = 'ERROR'; @@ -83,8 +93,6 @@ CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; -- fail - view ALTER PUBLICATION testpub_default ADD TABLE testpub_view; --- fail - partitioned table -ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted; ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1; ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1; -- 2.11.0