From d5fcde5bcd6cf093376bdc670c39f74936d1919a Mon Sep 17 00:00:00 2001
From: wangw
Date: Tue, 14 Jun 2022 11:21:25 +0800
Subject: [PATCH v10 3/5] The v4 patch(0001~0003) in thread [1]

[1] - https://www.postgresql.org/message-id/OS0PR01MB5716377C85D4A164E6A7D45F94AB9%40OS0PR01MB5716.jpnprd01.prod.outlook.com
---
 src/backend/replication/logical/relation.c | 233 +++++++++++++--------
 src/backend/replication/logical/worker.c   |  25 ++-
 src/include/replication/logicalrelation.h  |   1 +
 src/test/subscription/t/013_partition.pl   |  85 +++++++-
 4 files changed, 249 insertions(+), 95 deletions(-)

diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 80fb561a9a..f342396310 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -249,6 +249,72 @@ logicalrep_report_missing_attrs(LogicalRepRelation *remoterel,
 	}
 }
 
+/*
+ * Check if replica identity matches and mark the updatable flag.
+ *
+ * We allow for stricter replica identity (fewer columns) on subscriber as
+ * that will not stop us from finding unique tuple. IE, if publisher has
+ * identity (id,timestamp) and subscriber just (id) this will not be a
+ * problem, but in the opposite scenario it will.
+ *
+ * Don't throw any error here just mark the relation entry as not updatable,
+ * as replica identity is only for updates and deletes but inserts can be
+ * replicated even without it.
+ */
+static void
+logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
+{
+	Bitmapset  *idkey;
+	LogicalRepRelation *remoterel = &entry->remoterel;
+	int			i;
+
+	entry->updatable = true;
+
+	/*
+	 * If it is a partitioned table, we don't check it, we will check its
+	 * partition later.
+	 */
+	if (entry->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return;
+
+	idkey = RelationGetIndexAttrBitmap(entry->localrel,
+									   INDEX_ATTR_BITMAP_IDENTITY_KEY);
+	/* fallback to PK if no replica identity */
+	if (idkey == NULL)
+	{
+		idkey = RelationGetIndexAttrBitmap(entry->localrel,
+										   INDEX_ATTR_BITMAP_PRIMARY_KEY);
+		/*
+		 * If no replica identity index and no PK, the published table
+		 * must have replica identity FULL.
+		 */
+		if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
+			entry->updatable = false;
+	}
+
+	i = -1;
+	while ((i = bms_next_member(idkey, i)) >= 0)
+	{
+		int			attnum = i + FirstLowInvalidHeapAttributeNumber;
+
+		if (!AttrNumberIsForUserDefinedAttr(attnum))
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("logical replication target relation \"%s.%s\" uses "
+							"system columns in REPLICA IDENTITY index",
+							remoterel->nspname, remoterel->relname)));
+
+		attnum = AttrNumberGetAttrOffset(attnum);
+
+		if (entry->attrmap->attnums[attnum] < 0 ||
+			!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
+		{
+			entry->updatable = false;
+			break;
+		}
+	}
+}
+
 /*
  * Open the local relation associated with the remote one.
  *
@@ -307,7 +373,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 	if (!entry->localrelvalid)
 	{
 		Oid			relid;
-		Bitmapset  *idkey;
 		TupleDesc	desc;
 		MemoryContext oldctx;
 		int			i;
@@ -365,55 +430,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 		/* be tidy */
 		bms_free(missingatts);
 
-		/*
-		 * Check that replica identity matches. We allow for stricter replica
-		 * identity (fewer columns) on subscriber as that will not stop us
-		 * from finding unique tuple. IE, if publisher has identity
-		 * (id,timestamp) and subscriber just (id) this will not be a problem,
-		 * but in the opposite scenario it will.
-		 *
-		 * Don't throw any error here just mark the relation entry as not
-		 * updatable, as replica identity is only for updates and deletes but
-		 * inserts can be replicated even without it.
-		 */
-		entry->updatable = true;
-		idkey = RelationGetIndexAttrBitmap(entry->localrel,
-										   INDEX_ATTR_BITMAP_IDENTITY_KEY);
-		/* fallback to PK if no replica identity */
-		if (idkey == NULL)
-		{
-			idkey = RelationGetIndexAttrBitmap(entry->localrel,
-											   INDEX_ATTR_BITMAP_PRIMARY_KEY);
-
-			/*
-			 * If no replica identity index and no PK, the published table
-			 * must have replica identity FULL.
-			 */
-			if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
-				entry->updatable = false;
-		}
-
-		i = -1;
-		while ((i = bms_next_member(idkey, i)) >= 0)
-		{
-			int			attnum = i + FirstLowInvalidHeapAttributeNumber;
-
-			if (!AttrNumberIsForUserDefinedAttr(attnum))
-				ereport(ERROR,
-						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-						 errmsg("logical replication target relation \"%s.%s\" uses "
-								"system columns in REPLICA IDENTITY index",
-								remoterel->nspname, remoterel->relname)));
-
-			attnum = AttrNumberGetAttrOffset(attnum);
-
-			if (entry->attrmap->attnums[attnum] < 0 ||
-				!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
-			{
-				entry->updatable = false;
-				break;
-			}
-		}
+		/* Check that replica identity matches. */
+		logicalrep_rel_mark_updatable(entry);
 
 		entry->localrelvalid = true;
 	}
@@ -436,22 +454,13 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
 	rel->localrel = NULL;
 }
 
-/*
- * Partition cache: look up partition LogicalRepRelMapEntry's
- *
- * Unlike relation map cache, this is keyed by partition OID, not remote
- * relation OID, because we only have to use this cache in the case where
- * partitions are not directly mapped to any remote relation, such as when
- * replication is occurring with one of their ancestors as target.
- */
-
 /*
  * Relcache invalidation callback
  */
 static void
 logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
 {
-	LogicalRepRelMapEntry *entry;
+	LogicalRepPartMapEntry *entry;
 
 	/* Just to be sure. */
 	if (LogicalRepPartMap == NULL)
@@ -464,11 +473,11 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
 		hash_seq_init(&status, LogicalRepPartMap);
 
 		/* TODO, use inverse lookup hashtable? */
-		while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+		while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
 		{
-			if (entry->localreloid == reloid)
+			if (entry->relmapentry.localreloid == reloid)
 			{
-				entry->localrelvalid = false;
+				entry->relmapentry.localrelvalid = false;
 				hash_seq_term(&status);
 				break;
 			}
@@ -481,8 +490,42 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
 
 		hash_seq_init(&status, LogicalRepPartMap);
 
-		while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
-			entry->localrelvalid = false;
+		while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
+			entry->relmapentry.localrelvalid = false;
+	}
+}
+
+/*
+ * Reset the entries in the partition map that refer to remoterel
+ *
+ * Called when new relation mapping is sent by the publisher to update our
+ * expected view of incoming data from said publisher.
+ *
+ * Note that we don't update the remoterel information in the entry here,
+ * we will update the information in logicalrep_partition_open to avoid
+ * unnecessary work.
+ */
+void
+logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
+{
+	HASH_SEQ_STATUS status;
+	LogicalRepPartMapEntry *part_entry;
+	LogicalRepRelMapEntry *entry;
+
+	if (LogicalRepPartMap == NULL)
+		return;
+
+	hash_seq_init(&status, LogicalRepPartMap);
+	while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
+	{
+		entry = &part_entry->relmapentry;
+
+		if (entry->remoterel.remoteid != remoterel->remoteid)
+			continue;
+
+		logicalrep_relmap_free_entry(entry);
+
+		memset(entry, 0, sizeof(LogicalRepRelMapEntry));
 	}
 }
 
@@ -534,7 +577,6 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	Oid			partOid = RelationGetRelid(partrel);
 	AttrMap    *attrmap = root->attrmap;
 	bool		found;
-	int			i;
 	MemoryContext oldctx;
 
 	if (LogicalRepPartMap == NULL)
@@ -545,31 +587,47 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 														(void *) &partOid,
 														HASH_ENTER, &found);
 
-	if (found)
-		return &part_entry->relmapentry;
+	entry = &part_entry->relmapentry;
 
-	memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
+	/*
+	 * A cached entry that is still valid can be reused, but its localrel
+	 * still holds the Relation passed by whichever call built the entry,
+	 * which may have been closed since.  Always point it at partrel.
+	 */
+	if (found && entry->localrelvalid)
+	{
+		entry->localrel = partrel;
+		return entry;
+	}
 
 	/* Switch to longer-lived context. */
 	oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
 
-	part_entry->partoid = partOid;
+	if (!found)
+	{
+		memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
+		part_entry->partoid = partOid;
+	}
 
-	/* Remote relation is copied as-is from the root entry. */
-	entry = &part_entry->relmapentry;
-	entry->remoterel.remoteid = remoterel->remoteid;
-	entry->remoterel.nspname = pstrdup(remoterel->nspname);
-	entry->remoterel.relname = pstrdup(remoterel->relname);
-	entry->remoterel.natts = remoterel->natts;
-	entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
-	entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
-	for (i = 0; i < remoterel->natts; i++)
+	if (!entry->remoterel.remoteid)
 	{
-		entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
-		entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+		int			i;
+
+		/* Remote relation is copied as-is from the root entry. */
+		entry->remoterel.remoteid = remoterel->remoteid;
+		entry->remoterel.nspname = pstrdup(remoterel->nspname);
+		entry->remoterel.relname = pstrdup(remoterel->relname);
+		entry->remoterel.natts = remoterel->natts;
+		entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
+		entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
+		for (i = 0; i < remoterel->natts; i++)
+		{
+			entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
+			entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+		}
+		entry->remoterel.replident = remoterel->replident;
+		entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
 	}
-	entry->remoterel.replident = remoterel->replident;
-	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
 
 	entry->localrel = partrel;
 	entry->localreloid = partOid;
@@ -594,7 +652,11 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 		{
 			AttrNumber	root_attno = map->attnums[attno];
 
-			entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
+			/* 0 means it's a dropped attribute */
+			if (root_attno == 0)
+				entry->attrmap->attnums[attno] = -1;
+			else
+				entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
 		}
 	}
 	else
@@ -605,7 +667,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 			   attrmap->maplen * sizeof(AttrNumber));
 	}
 
-	entry->updatable = root->updatable;
+	/* Check that replica identity matches. */
+	logicalrep_rel_mark_updatable(entry);
 
 	entry->localrelvalid = true;
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3f8dea0fc1..4e3bcf7c28 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1803,6 +1803,11 @@ apply_handle_relation(StringInfo s)
 
 	rel = logicalrep_read_rel(s);
 	logicalrep_relmap_update(rel);
+
+	/*
+	 * Also reset all entries in the partition map that refer to remoterel.
+	 */
+	logicalrep_partmap_reset_relmap(rel);
 }
 
 /*
@@ -2359,6 +2364,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	TupleTableSlot *remoteslot_part;
 	TupleConversionMap *map;
 	MemoryContext oldctx;
+	LogicalRepRelMapEntry *part_entry = NULL;
+	AttrMap    *attrmap = NULL;
 
 	/* ModifyTableState is needed for ExecFindPartition(). */
 	edata->mtstate = mtstate = makeNode(ModifyTableState);
@@ -2390,8 +2397,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 		remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
 	map = partrelinfo->ri_RootToPartitionMap;
 	if (map != NULL)
-		remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
+	{
+		attrmap = map->attrMap;
+		remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
 												remoteslot_part);
+	}
 	else
 	{
 		remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
@@ -2399,6 +2409,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	}
 	MemoryContextSwitchTo(oldctx);
 
+	/* Check if we can do the update or delete on the leaf partition. */
+	if (operation == CMD_UPDATE || operation == CMD_DELETE)
+	{
+		part_entry = logicalrep_partition_open(relmapentry, partrel,
+											   attrmap);
+		check_relation_updatable(part_entry);
+	}
+
 	switch (operation)
 	{
 		case CMD_INSERT:
@@ -2420,15 +2438,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 			 * suitable partition.
 			 */
 			{
-				AttrMap    *attrmap = map ? map->attrMap : NULL;
-				LogicalRepRelMapEntry *part_entry;
 				TupleTableSlot *localslot;
 				ResultRelInfo *partrelinfo_new;
 				bool		found;
 
-				part_entry = logicalrep_partition_open(relmapentry, partrel,
-													   attrmap);
-
 				/* Get the matching local tuple from the partition. */
 				found = FindReplTupleInLocalRel(estate, partrel,
 												&part_entry->remoterel,
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 7bf8cd22bd..78cd7e77f5 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -38,6 +38,7 @@ typedef struct LogicalRepRelMapEntry
 } LogicalRepRelMapEntry;
 
 extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
+extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel);
 
 extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
 												  LOCKMODE lockmode);
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index e7f4a94f19..df897c86db 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -800,9 +800,86 @@ ok( $logfile =~
 	  qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/,
 	'delete target row is missing in tab2_1');
 
-# No need for this until more tests are added.
-# $node_subscriber1->append_conf('postgresql.conf',
-# 	"log_min_messages = warning");
-# $node_subscriber1->reload;
+$node_subscriber1->append_conf('postgresql.conf',
+	"log_min_messages = warning");
+$node_subscriber1->reload;
+
+# Test the case that target table on subscriber is a partitioned table and
+# check that the changes are replicated correctly after changing the schema of
+# table on subscriber.
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	CREATE TABLE tab5 (a int NOT NULL, b int);
+	CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
+	ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;});
+
+$node_subscriber2->safe_psql(
+	'postgres', q{
+	CREATE TABLE tab5 (a int NOT NULL, b int, c int) PARTITION BY LIST (a);
+	CREATE TABLE tab5_1 PARTITION OF tab5 DEFAULT;
+	CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
+	ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;
+	ALTER TABLE tab5_1 REPLICA IDENTITY USING INDEX tab5_1_a_idx;});
+
+$node_subscriber2->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
+
+$node_subscriber2->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Make partition map cache
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 2 WHERE a = 1");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b FROM tab5 ORDER BY 1");
+is($result, qq(2|1), 'updates of tab5 replicated correctly');
+
+# Change the column order of partition on subscriber
+$node_subscriber2->safe_psql(
+	'postgres', q{
+	ALTER TABLE tab5 DETACH PARTITION tab5_1;
+	ALTER TABLE tab5_1 DROP COLUMN b;
+	ALTER TABLE tab5_1 ADD COLUMN b int;
+	ALTER TABLE tab5 ATTACH PARTITION tab5_1 DEFAULT});
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 3 WHERE a = 2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b, c FROM tab5 ORDER BY 1");
+is($result, qq(3|1|), 'updates of tab5 replicated correctly after altering table on subscriber');
+
+# Change the column order of table on publisher
+$node_publisher->safe_psql(
+	'postgres', q{
+	ALTER TABLE tab5 DROP COLUMN b, ADD COLUMN c INT;
+	ALTER TABLE tab5 ADD COLUMN b INT;});
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET c = 1 WHERE a = 3");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b, c FROM tab5 ORDER BY 1");
+is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher');
+
+# Alter REPLICA IDENTITY on subscriber.
+# No REPLICA IDENTITY in the partitioned table on subscriber, but what we check
+# is the partition, so it works fine.
+$node_subscriber2->safe_psql('postgres',
+	"ALTER TABLE tab5 REPLICA IDENTITY NOTHING");
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 4 WHERE a = 3");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b, c FROM tab5_1 ORDER BY 1");
+is($result, qq(4||1), 'updates of tab5 replicated correctly after altering REPLICA IDENTITY on subscriber');
 
 done_testing();
-- 
2.23.0.windows.1