From d07cccd57cbc915f2b1b5a7e9329bf1a6147070e Mon Sep 17 00:00:00 2001 From: amit Date: Wed, 13 Nov 2019 17:18:51 +0900 Subject: [PATCH v6 4/4] Publish partitioned table inserts as its own --- src/backend/catalog/pg_publication.c | 11 +- src/backend/commands/subscriptioncmds.c | 85 ++++++-- src/backend/executor/nodeModifyTable.c | 2 + src/backend/replication/logical/tablesync.c | 19 +- src/backend/replication/logical/worker.c | 298 ++++++++++++++++++++++++++-- src/backend/replication/pgoutput/pgoutput.c | 182 +++++++++++++---- src/include/catalog/pg_publication.h | 2 +- src/test/subscription/t/013_partition.pl | 48 ++++- 8 files changed, 560 insertions(+), 87 deletions(-) diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 5ef77f1014..84fc302592 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -243,20 +243,29 @@ GetRelationPublications(Oid relid) /* * Finds all publications that publish changes to the input relation's * ancestors. + * + * *publisher_ancestors will contain one OID for each publication returned, + * of the ancestor which belongs to it. Values in this list can be repeated, + * because a given ancestor may belong to multiple publications. 
*/ List * -GetRelationAncestorPublications(Oid relid) +GetRelationAncestorPublications(Oid relid, List **published_ancestors) { List *ancestors = get_partition_ancestors(relid); List *ancestor_pubids = NIL; ListCell *lc; + *published_ancestors = NIL; foreach(lc, ancestors) { Oid ancestor = lfirst_oid(lc); List *rel_publishers = GetRelationPublications(ancestor); + int n = list_length(rel_publishers), + i; ancestor_pubids = list_concat_copy(ancestor_pubids, rel_publishers); + for (i = 0; i < n; i++) + *published_ancestors = lappend_oid(*published_ancestors, ancestor); } return ancestor_pubids; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f65cad4ac0..143d572702 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -44,6 +44,27 @@ #include "utils/memutils.h" #include "utils/syscache.h" +/* + * Structure used by fetch_publication_tables to describe a published table. + * The information is used by the callers of fetch_publication_tables to + * generate a pg_subscription_rel catalog entry for the table. + */ +typedef struct PublishedTable +{ + RangeVar *rv; + + char relkind; + + /* + * If the published table is partitioned, the following being true means + * its changes are published using own schema rather than the schema of + * its individual partitions. In the latter case, a separate + * PublicationTable instance (and hence pg_subscription_rel entry) for + * each partition will be needed. 
+ */ + bool published_using_root_schema; +} PublishedTable; + static List *fetch_publication_tables(WalReceiverConn *wrconn, List *publications); static Oid ValidateSubscriptionRel(RangeVar *rv); @@ -457,10 +478,21 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) tables = fetch_publication_tables(wrconn, publications); foreach(lc, tables) { - RangeVar *rv = (RangeVar *) lfirst(lc); + PublishedTable *pt = (PublishedTable *) lfirst(lc); + RangeVar *rv = pt->rv; Oid relid; relid = ValidateSubscriptionRel(rv); + + /* + * If a partitioned table is published using the schema of its + * partitions, the initial sync will be performed by copying + * from the partitions, so mark the partitioned table itself + * as ready. + */ + if (pt->relkind == RELKIND_PARTITIONED_TABLE && + !pt->published_using_root_schema) + table_state = SUBREL_STATE_READY; AddSubscriptionRelState(subid, relid, table_state, InvalidXLogRecPtr); } @@ -561,19 +593,31 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) foreach(lc, pubrel_names) { - RangeVar *rv = (RangeVar *) lfirst(lc); + PublishedTable *pt = (PublishedTable *) lfirst(lc); + RangeVar *rv = pt->rv; Oid relid; + char table_state = copy_data ? SUBREL_STATE_INIT : + SUBREL_STATE_READY; /* Check that there's an appropriate relation present locally. */ relid = ValidateSubscriptionRel(rv); pubrel_local_oids[off++] = relid; + /* + * If a partitioned table is published using the schema of its + * partitions, the initial sync will be performed by copying from the + * partitions, so mark the partitioned table itself as ready. + */ + if (pt->relkind == RELKIND_PARTITIONED_TABLE && + !pt->published_using_root_schema) + table_state = SUBREL_STATE_READY; + if (!bsearch(&relid, subrel_local_oids, list_length(subrel_states), sizeof(Oid), oid_cmp)) { AddSubscriptionRelState(sub->oid, relid, - copy_data ? 
SUBREL_STATE_INIT : SUBREL_STATE_READY, + table_state, InvalidXLogRecPtr); ereport(DEBUG1, (errmsg("table \"%s.%s\" added to subscription \"%s\"", @@ -1124,7 +1168,7 @@ fetch_publication_tables(WalReceiverConn *wrconn, List *publications) WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[2] = {TEXTOID, TEXTOID}; + Oid tableRow[4] = {TEXTOID, TEXTOID, CHAROID, BOOLOID}; ListCell *lc; bool first; List *tablelist = NIL; @@ -1132,17 +1176,23 @@ fetch_publication_tables(WalReceiverConn *wrconn, List *publications) Assert(list_length(publications) > 0); initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.tablename FROM (\n" - " SELECT DISTINCT t.pubname, t.schemaname, t.tablename \n" + appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.tablename, s.relkind, s.pubasroot FROM (\n" + " SELECT DISTINCT t.pubname, t.schemaname, t.tablename, c.relkind, p.pubasroot \n" " FROM pg_catalog.pg_publication_tables t\n" + " JOIN pg_catalog.pg_publication p ON t.pubname = p.pubname\n" + " JOIN pg_catalog.pg_class c ON t.schemaname = c.relnamespace::pg_catalog.regnamespace::pg_catalog.name\n" + " AND t.tablename = c.relname\n" " UNION\n" - " SELECT DISTINCT t.pubname, s.schemaname, s.tablename\n" - " FROM pg_catalog.pg_publication_tables t,\n" - " LATERAL (SELECT c.relnamespace::regnamespace::name, c.relname\n" - " FROM pg_class c\n" - " JOIN pg_partition_tree(t.schemaname || '.' || t.tablename) p\n" + " SELECT DISTINCT t.pubname, s.schemaname, s.tablename, c.relkind, p.pubasroot\n" + " FROM pg_catalog.pg_publication_tables t\n" + " JOIN pg_catalog.pg_publication p ON t.pubname = p.pubname AND NOT p.pubasroot,\n" + " LATERAL (SELECT c.relnamespace::pg_catalog.regnamespace::pg_catalog.name, c.relname\n" + " FROM pg_catalog.pg_class c\n" + " JOIN pg_catalog.pg_partition_tree(t.schemaname || '.' 
|| t.tablename) p\n" " ON p.relid = c.oid\n" - " WHERE p.level > 0) AS s(schemaname, tablename)) s\n" + " WHERE p.level > 0) AS s(schemaname, tablename)\n" + " JOIN pg_catalog.pg_class c ON s.schemaname = c.relnamespace::pg_catalog.regnamespace::pg_catalog.name\n" + " AND s.tablename = c.relname) s\n" " WHERE s.pubname IN ("); first = true; @@ -1159,7 +1209,7 @@ fetch_publication_tables(WalReceiverConn *wrconn, List *publications) } appendStringInfoChar(&cmd, ')'); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, 4, tableRow); pfree(cmd.data); if (res->status != WALRCV_OK_TUPLES) @@ -1174,15 +1224,18 @@ fetch_publication_tables(WalReceiverConn *wrconn, List *publications) char *nspname; char *relname; bool isnull; - RangeVar *rv; + PublishedTable *pt = palloc(sizeof(PublishedTable)); nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); Assert(!isnull); relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); Assert(!isnull); + pt->rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1); + pt->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); + pt->published_using_root_schema = DatumGetBool(slot_getattr(slot, 4, &isnull)); + Assert(!isnull); - rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1); - tablelist = lappend(tablelist, rv); + tablelist = lappend(tablelist, pt); ExecClearTuple(slot); } diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index fb97d24f3a..4e22b7b382 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -2299,6 +2299,8 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) { mtstate->rootResultRelInfo = estate->es_root_result_relations + node->rootResultRelIndex; + CheckValidResultRel(mtstate->rootResultRelInfo, + mtstate->rootResultRelInfo, operation); rootResultRelInfo = mtstate->rootResultRelInfo; } diff --git a/src/backend/replication/logical/tablesync.c 
b/src/backend/replication/logical/tablesync.c index 554bdb10d3..56c1e28e1b 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -767,21 +767,14 @@ copy_table(Relation rel) relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); Assert(rel == relmapentry->localrel); - /* - * If either table is partitioned, skip copying. Individual partitions - * will be copied instead. - */ - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE || - remote_relkind == RELKIND_PARTITIONED_TABLE) - { - logicalrep_rel_close(relmapentry, NoLock); - return; - } - /* Start copy on the publisher. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "COPY %s TO STDOUT", - quote_qualified_identifier(lrel.nspname, lrel.relname)); + if (remote_relkind == RELKIND_PARTITIONED_TABLE) + appendStringInfo(&cmd, "COPY (SELECT * FROM %s) TO STDOUT", + quote_qualified_identifier(lrel.nspname, lrel.relname)); + else + appendStringInfo(&cmd, "COPY %s TO STDOUT", + quote_qualified_identifier(lrel.nspname, lrel.relname)); res = walrcv_exec(wrconn, cmd.data, 0, NULL); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 2686fccdc2..3d6bb37f89 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -29,11 +29,14 @@ #include "access/xlog_internal.h" #include "catalog/catalog.h" #include "catalog/namespace.h" +#include "catalog/partition.h" +#include "catalog/pg_inherits.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" +#include "executor/execPartition.h" #include "executor/nodeModifyTable.h" #include "funcapi.h" #include "libpq/pqformat.h" @@ -140,6 +143,22 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) } /* + * Different interface to use when a LogicalRepRelMapEntry 
is not present + * for a given local target relation. + */ +static bool +should_apply_changes_for_relid(Oid localreloid, char state, + XLogRecPtr statelsn) +{ + if (am_tablesync_worker()) + return MyLogicalRepWorker->relid == localreloid; + else + return (state == SUBREL_STATE_READY || + (state == SUBREL_STATE_SYNCDONE && + statelsn <= remote_final_lsn)); +} + +/* * Make sure that we started local transaction. * * Also switches to ApplyMessageContext as necessary. @@ -722,6 +741,168 @@ apply_handle_do_delete(ResultRelInfo *relinfo, EState *estate, } /* + * This handles insert, update, delete on a partitioned table. + */ +static void +apply_handle_tuple_routing(ResultRelInfo *relinfo, + LogicalRepRelMapEntry *relmapentry, + EState *estate, CmdType operation, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup) +{ + Relation rel = relinfo->ri_RelationDesc; + ModifyTableState *mtstate = NULL; + PartitionTupleRouting *proute = NULL; + ResultRelInfo *partrelinfo, + *partrelinfo1; + TupleTableSlot *localslot; + PartitionRoutingInfo *partinfo; + TupleConversionMap *map; + MemoryContext oldctx; + + /* ModifyTableState is needed for ExecFindPartition(). */ + mtstate = makeNode(ModifyTableState); + mtstate->ps.plan = NULL; + mtstate->ps.state = estate; + mtstate->operation = operation; + mtstate->resultRelInfo = relinfo; + proute = ExecSetupPartitionTupleRouting(estate, mtstate, rel); + + /* + * Find a partition for the tuple contained in remoteslot. + * + * For insert, remoteslot is tuple to insert. For update and delete, it + * is the tuple to be replaced and deleted, repectively. + */ + Assert(remoteslot != NULL); + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + /* The following throws error if a suitable partition is not found. */ + partrelinfo = ExecFindPartition(mtstate, relinfo, proute, + remoteslot, estate); + Assert(partrelinfo != NULL); + /* Convert the tuple to match the partition's rowtype. 
*/ + partinfo = partrelinfo->ri_PartitionInfo; + map = partinfo->pi_RootToPartitionMap; + if (map != NULL) + { + TupleTableSlot *part_slot = partinfo->pi_PartitionTupleSlot; + + remoteslot = execute_attr_map_slot(map->attrMap, remoteslot, + part_slot); + } + MemoryContextSwitchTo(oldctx); + + switch (operation) + { + case CMD_INSERT: + /* Just insert into the partition. */ + estate->es_result_relation_info = partrelinfo; + apply_handle_do_insert(partrelinfo, estate, remoteslot); + break; + + case CMD_DELETE: + /* Just delete from the partition. */ + estate->es_result_relation_info = partrelinfo; + apply_handle_do_delete(partrelinfo, estate, remoteslot, + &relmapentry->remoterel); + break; + + case CMD_UPDATE: + + /* + * partrelinfo computed above is the partition which might contain + * the search tuple. Now find the partition for the replacement + * tuple, which might not be the same as partrelinfo. + */ + localslot = table_slot_create(rel, &estate->es_tupleTable); + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_modify_cstrings(localslot, remoteslot, + newtup->values, newtup->changed, + relmapentry->attrmap, &relmapentry->remoterel, + RelationGetRelid(rel)); + partrelinfo1 = ExecFindPartition(mtstate, relinfo, proute, + localslot, estate); + MemoryContextSwitchTo(oldctx); + + /* + * If both search and replacement tuple would be in the same + * partition, we can apply this as an UPDATE on the parttion. + */ + if (partrelinfo == partrelinfo1) + { + AttrNumber *attrmap = relmapentry->attrmap; + + /* + * If the partition's attributes don't match the root + * relation's, we'll need to make a new attrmap mapping + * partition attribute numbers to remoterel's. 
+ */ + if (map) + { + TupleDesc partdesc = RelationGetDescr(partrelinfo1->ri_RelationDesc); + TupleDesc rootdesc = RelationGetDescr(rel); + AttrNumber *partToRootMap, + attno; + + /* Need the reverse map here */ + partToRootMap = convert_tuples_by_name_map(partdesc, rootdesc); + attrmap = palloc(partdesc->natts * sizeof(AttrNumber)); + memset(attrmap, -1, partdesc->natts * sizeof(AttrNumber)); + for (attno = 0; attno < partdesc->natts; attno++) + { + AttrNumber root_attno = partToRootMap[attno]; + + attrmap[attno] = relmapentry->attrmap[root_attno - 1]; + } + } + + /* UPDATE partition. */ + estate->es_result_relation_info = partrelinfo; + apply_handle_do_update(partrelinfo, estate, remoteslot, + newtup, attrmap, + &relmapentry->remoterel); + if (attrmap != relmapentry->attrmap) + pfree(attrmap); + } + else + { + /* Different, so handle this as DELETE followed by INSERT. */ + + /* DELETE from partition partrelinfo. */ + estate->es_result_relation_info = partrelinfo; + apply_handle_do_delete(partrelinfo, estate, remoteslot, + &relmapentry->remoterel); + + /* + * Convert the replacement tuple to match the destination + * partition rowtype. + */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + partinfo = partrelinfo1->ri_PartitionInfo; + map = partinfo->pi_RootToPartitionMap; + if (map != NULL) + { + TupleTableSlot *part_slot = partinfo->pi_PartitionTupleSlot; + + localslot = execute_attr_map_slot(map->attrMap, localslot, + part_slot); + } + MemoryContextSwitchTo(oldctx); + /* INSERT into partition partrelinfo1. */ + estate->es_result_relation_info = partrelinfo1; + apply_handle_do_insert(partrelinfo1, estate, localslot); + } + break; + + default: + elog(ERROR, "unrecognized CmdType: %d", (int) operation); + break; + } + + ExecCleanupTupleRouting(mtstate, proute); +} + +/* * Handle INSERT message. 
*/ static void @@ -763,9 +944,13 @@ apply_handle_insert(StringInfo s) slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_do_insert(estate->es_result_relation_info, estate, - remoteslot); + /* For a partitioned table, insert the tuple into a partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, rel, + estate, CMD_INSERT, remoteslot, NULL); + else + apply_handle_do_insert(estate->es_result_relation_info, estate, + remoteslot); PopActiveSnapshot(); @@ -863,10 +1048,14 @@ apply_handle_update(StringInfo s) has_oldtup ? oldtup.values : newtup.values); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_do_update(estate->es_result_relation_info, estate, - remoteslot, &newtup, rel->attrmap, - &rel->remoterel); + /* For a partitioned table, apply update to correct partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, rel, + estate, CMD_UPDATE, remoteslot, &newtup); + else + apply_handle_do_update(estate->es_result_relation_info, estate, + remoteslot, &newtup, rel->attrmap, + &rel->remoterel); PopActiveSnapshot(); @@ -928,9 +1117,13 @@ apply_handle_delete(StringInfo s) slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_do_delete(estate->es_result_relation_info, estate, - remoteslot, &rel->remoterel); + /* For a partitioned table, apply delete to correct partition. 
*/ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, rel, + estate, CMD_DELETE, remoteslot, NULL); + else + apply_handle_do_delete(estate->es_result_relation_info, estate, + remoteslot, &rel->remoterel); PopActiveSnapshot(); @@ -972,14 +1165,48 @@ apply_handle_truncate(StringInfo s) LogicalRepRelMapEntry *rel; rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) { + bool really_skip = true; + + /* + * If we seem to have gotten sent a leaf partition because an + * ancestor was truncated, confirm before proceeding with + * truncating the partition that an ancestor indeed has a valid + * subscription state. + */ + if (rel->state == SUBREL_STATE_UNKNOWN && + rel->localrel->rd_rel->relispartition) + { + List *ancestors = get_partition_ancestors(rel->localreloid); + ListCell *lc1; + + foreach(lc1, ancestors) + { + Oid ancestor = lfirst_oid(lc1); + XLogRecPtr statelsn; + char state; + + /* Check using the ancestor's subscription state. */ + state = GetSubscriptionRelState(MySubscription->oid, + ancestor, &statelsn, + false); + really_skip &= !should_apply_changes_for_relid(ancestor, + state, + statelsn); + } + } + /* * The relation can't become interesting in the middle of the * transaction so it's safe to unlock it. 
*/ - logicalrep_rel_close(rel, RowExclusiveLock); - continue; + if (really_skip) + { + logicalrep_rel_close(rel, RowExclusiveLock); + continue; + } } remote_rels = lappend(remote_rels, rel); @@ -987,6 +1214,47 @@ apply_handle_truncate(StringInfo s) relids = lappend_oid(relids, rel->localreloid); if (RelationIsLogicallyLogged(rel->localrel)) relids_logged = lappend_oid(relids_logged, rel->localreloid); + + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + ListCell *child; + List *children = find_all_inheritors(rel->localreloid, + RowExclusiveLock, + NULL); + + foreach(child, children) + { + Oid childrelid = lfirst_oid(child); + Relation childrel; + + if (list_member_oid(relids, childrelid)) + continue; + + /* find_all_inheritors already got lock */ + childrel = table_open(childrelid, NoLock); + + /* + * It is possible that the parent table has children that are + * temp tables of other backends. We cannot safely access + * such tables (because of buffering issues), and the best + * thing to do is to silently ignore them. Note that this + * check is the same as one of the checks done in + * truncate_check_activity() called below, still it is kept + * here for simplicity. 
+ */ + if (RELATION_IS_OTHER_TEMP(childrel)) + { + table_close(childrel, RowExclusiveLock); + continue; + } + + rels = lappend(rels, childrel); + relids = lappend_oid(relids, childrelid); + /* Log this relation only if needed for logical decoding */ + if (RelationIsLogicallyLogged(childrel)) + relids_logged = lappend_oid(relids_logged, childrelid); + } + } } /* @@ -996,11 +1264,11 @@ apply_handle_truncate(StringInfo s) */ ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs); - foreach(lc, remote_rels) + foreach(lc, rels) { - LogicalRepRelMapEntry *rel = lfirst(lc); + Relation rel = lfirst(lc); - logicalrep_rel_close(rel, NoLock); + table_close(rel, NoLock); } CommandCounterIncrement(); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 8dc78f1779..4784a3c587 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -12,6 +12,7 @@ */ #include "postgres.h" +#include "access/tupconvert.h" #include "catalog/pg_publication.h" #include "fmgr.h" #include "replication/logical.h" @@ -49,6 +50,7 @@ static bool publications_valid; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); +static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx); /* * Entry in the map used to remember which relation schemas we sent. @@ -59,9 +61,22 @@ static void publication_invalidation_cb(Datum arg, int cacheid, typedef struct RelationSyncEntry { Oid relid; /* relation oid */ - bool schema_sent; /* did we send the schema? */ + + /* + * Did we send the schema? If ancestor relid is set, its schema must also + * have been sent for this to be true. + */ + bool schema_sent; bool replicate_valid; PublicationActions pubactions; + + /* + * Valid if publishing relation's changes as changes to some ancestor, + * that is, if relation is a partition. 
The map, if any, will be used to + * convert the tuples from partition's type to the ancestor's. + */ + Oid replicate_as_relid; + TupleConversionMap *map; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ @@ -259,47 +274,72 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* - * Write the relation schema if the current schema hasn't been sent yet. + * Write the current schema of the relation and its ancestor (if any) if not + * done yet. */ static void maybe_send_schema(LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry) { - if (!relentry->schema_sent) + if (relentry->schema_sent) + return; + + /* If needed, send the ancestor's schema first. */ + if (OidIsValid(relentry->replicate_as_relid)) { - TupleDesc desc; - int i; + Relation ancestor = + RelationIdGetRelation(relentry->replicate_as_relid); + TupleDesc indesc = RelationGetDescr(relation); + TupleDesc outdesc = RelationGetDescr(ancestor); + MemoryContext oldctx; - desc = RelationGetDescr(relation); + /* Map must live as long as the session does. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + relentry->map = convert_tuples_by_name(indesc, outdesc); + MemoryContextSwitchTo(oldctx); + send_relation_and_attrs(ancestor, ctx); + RelationClose(ancestor); + } - /* - * Write out type info if needed. We do that only for user-created - * types. We use FirstGenbkiObjectId as the cutoff, so that we only - * consider objects with hand-assigned OIDs to be "built in", not for - * instance any function or type defined in the information_schema. - * This is important because only hand-assigned OIDs can be expected - * to remain stable across major versions. 
- */ - for (i = 0; i < desc->natts; i++) - { - Form_pg_attribute att = TupleDescAttr(desc, i); + send_relation_and_attrs(relation, ctx); + relentry->schema_sent = true; +} - if (att->attisdropped || att->attgenerated) - continue; +/* + * Sends a relation + */ +static void +send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx) +{ + TupleDesc desc = RelationGetDescr(relation); + int i; - if (att->atttypid < FirstGenbkiObjectId) - continue; + /* + * Write out type info if needed. We do that only for user-created types. + * We use FirstGenbkiObjectId as the cutoff, so that we only consider + * objects with hand-assigned OIDs to be "built in", not for instance any + * function or type defined in the information_schema. This is important + * because only hand-assigned OIDs can be expected to remain stable across + * major versions. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); - OutputPluginPrepareWrite(ctx, false); - logicalrep_write_typ(ctx->out, att->atttypid); - OutputPluginWrite(ctx, false); - } + if (att->attisdropped || att->attgenerated) + continue; + + if (att->atttypid < FirstGenbkiObjectId) + continue; OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, relation); + logicalrep_write_typ(ctx->out, att->atttypid); OutputPluginWrite(ctx, false); - relentry->schema_sent = true; } + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_rel(ctx->out, relation); + OutputPluginWrite(ctx, false); } /* @@ -346,28 +386,56 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, relation, - &change->data.tp.newtuple->tuple); - OutputPluginWrite(ctx, true); - break; + { + HeapTuple tuple = &change->data.tp.newtuple->tuple; + + if (OidIsValid(relentry->replicate_as_relid)) + { + relation = 
RelationIdGetRelation(relentry->replicate_as_relid); + if (relentry->map) + tuple = execute_attr_map_tuple(tuple, relentry->map); + } + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_insert(ctx->out, relation, tuple); + OutputPluginWrite(ctx, true); + break; + } case REORDER_BUFFER_CHANGE_UPDATE: { HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL; + HeapTuple newtuple = &change->data.tp.newtuple->tuple; + + if (OidIsValid(relentry->replicate_as_relid)) + { + relation = RelationIdGetRelation(relentry->replicate_as_relid); + if (relentry->map) + { + oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); + newtuple = execute_attr_map_tuple(newtuple, relentry->map); + } + } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, relation, oldtuple, - &change->data.tp.newtuple->tuple); + logicalrep_write_update(ctx->out, relation, oldtuple, newtuple); OutputPluginWrite(ctx, true); break; } case REORDER_BUFFER_CHANGE_DELETE: if (change->data.tp.oldtuple) { + HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; + + if (OidIsValid(relentry->replicate_as_relid)) + { + relation = RelationIdGetRelation(relentry->replicate_as_relid); + if (relentry->map) + oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); + } + OutputPluginPrepareWrite(ctx, true); - logicalrep_write_delete(ctx->out, relation, - &change->data.tp.oldtuple->tuple); + logicalrep_write_delete(ctx->out, relation, oldtuple); OutputPluginWrite(ctx, true); } else @@ -411,6 +479,28 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!relentry->pubactions.pubtruncate) continue; + /* + * If this partition was not *directly* truncated, don't bother + * sending it to the subscriber. 
+ */ + if (OidIsValid(relentry->replicate_as_relid)) + { + int j; + bool can_skip_part_trunc = false; + + for (j = 0; j < nrelids; j++) + { + if (relentry->replicate_as_relid == relids[j]) + { + can_skip_part_trunc = true; + break; + } + } + + if (can_skip_part_trunc) + continue; + } + relids[nrelids++] = relid; maybe_send_schema(ctx, relation, relentry); } @@ -529,6 +619,11 @@ init_rel_sync_cache(MemoryContext cachectx) /* * Find or create entry in the relation schema cache. + * + * For a partition, the schema of the top-most ancestor that is published + * will be used in some cases, instead of that of the partition itself, so + * the information about ancestor's publications is looked up here and saved in + * the schema cache entry. */ static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Relation rel) @@ -553,8 +648,11 @@ get_rel_sync_entry(PGOutputData *data, Relation rel) { List *pubids = GetRelationPublications(relid); ListCell *lc, - *lc1; + *lc1, + *lc2; List *ancestor_pubids = NIL; + List *published_ancestors = NIL; + Oid topmost_published_ancestor = InvalidOid; /* Reload publications if needed before use. */ if (!publications_valid) @@ -579,7 +677,9 @@ get_rel_sync_entry(PGOutputData *data, Relation rel) /* For partitions, also consider publications of ancestors. 
*/ if (rel->rd_rel->relispartition) ancestor_pubids = - GetRelationAncestorPublications(RelationGetRelid(rel)); + GetRelationAncestorPublications(RelationGetRelid(rel), + &published_ancestors); + Assert(list_length(ancestor_pubids) == list_length(published_ancestors)); foreach(lc, data->publications) { @@ -597,7 +697,7 @@ get_rel_sync_entry(PGOutputData *data, Relation rel) entry->pubactions.pubdelete && entry->pubactions.pubtruncate) break; - foreach(lc1, ancestor_pubids) + forboth(lc1, ancestor_pubids, lc2, published_ancestors) { if (lfirst_oid(lc1) == pub->oid) { @@ -605,6 +705,8 @@ get_rel_sync_entry(PGOutputData *data, Relation rel) entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + if (pub->publish_using_root_schema) + topmost_published_ancestor = lfirst_oid(lc2); } } @@ -615,7 +717,9 @@ get_rel_sync_entry(PGOutputData *data, Relation rel) list_free(pubids); list_free(ancestor_pubids); + list_free(published_ancestors); + entry->replicate_as_relid = topmost_published_ancestor; entry->replicate_valid = true; } diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 61d338b110..15bf4a7d4c 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -83,7 +83,7 @@ typedef struct Publication extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); extern List *GetRelationPublications(Oid relid); -extern List *GetRelationAncestorPublications(Oid relid); +extern List *GetRelationAncestorPublications(Oid relid, List **published_ancestors); extern List *GetPublicationRelations(Oid pubid); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(void); diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 
2b8a5025dc..2e3c7991f8 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 10; +use Test::More tests => 16; # setup @@ -39,21 +39,38 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6)"); $node_subscriber1->safe_psql('postgres', - "CREATE TABLE tab1_2 PARTITION OF tab1 (b DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6)"); + "CREATE TABLE tab1_2 PARTITION OF tab1 (b DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6) PARTITION BY LIST (a)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2_1 PARTITION OF tab1_2 FOR VALUES IN (5)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2_2 PARTITION OF tab1_2 FOR VALUES IN (6)"); $node_subscriber2->safe_psql('postgres', "CREATE TABLE tab1_2 (a int PRIMARY KEY, b text DEFAULT 'sub2_tab1_2')"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b text DEFAULT 'sub2_tab1') PARTITION BY HASH (a)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_part1 (b text, a int NOT NULL)"); +$node_subscriber2->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_part1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_part2 PARTITION OF tab1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)"); + $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1 FOR TABLE tab1, tab1_1"); $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub2 FOR TABLE tab1_2"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub3 FOR TABLE tab1 WITH (publish_using_root_schema = true)"); $node_subscriber1->safe_psql('postgres', "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"); $node_subscriber2->safe_psql('postgres', "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2"); 
+$node_subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3"); # Wait for initial sync of all subscriptions my $synced_query = @@ -83,17 +100,26 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT b, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); is($result, qq(sub2_tab1_2|1|5|5), 'inserts into tab1_2 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT b, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub2_tab1|3|1|5), 'inserts into tab1 replicated'); + # update a row (no partition change) $node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 2 WHERE a = 1"); $node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); $result = $node_subscriber1->safe_psql('postgres', "SELECT b, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT b, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub2_tab1|3|2|5), 'update of tab1_1 replicated'); + # update a row (partition changes) $node_publisher->safe_psql('postgres', @@ -110,6 +136,10 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT b, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); is($result, qq(sub2_tab1_2|2|5|6), 'insert into tab1_2 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT b, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub2_tab1|3|3|6), 'delete from tab1_1 replicated'); + # delete rows (some from the root parent, some directly from the partition) $node_publisher->safe_psql('postgres', @@ -128,12 +158,18 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab1_2"); is($result, qq(0||), 'delete from tab1_2 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result,
qq(0||), 'delete from tab1_1, tab1_2 replicated'); + # truncate (root parent and partition directly) $node_subscriber1->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), (5)"); $node_subscriber2->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (5)"); +$node_subscriber2->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (5)"); $node_publisher->safe_psql('postgres', "TRUNCATE tab1_2"); @@ -149,6 +185,10 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab1_2"); is($result, qq(0||), 'truncate of tab1_2 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(3|1|5), 'no change, because only truncate of tab1 will be replicated'); + $node_publisher->safe_psql('postgres', "TRUNCATE tab1"); @@ -157,3 +197,7 @@ $node_publisher->wait_for_catchup('sub1'); $result = $node_subscriber1->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab1"); is($result, qq(0||), 'truncate of tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'truncate of tab1 replicated'); -- 2.11.0