From 37a504cd37a500c38e63a0a118c2e5407148ed37 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Wed, 13 Jan 2021 16:35:00 +0530 Subject: [PATCH v1 1/2] Fix ALTER PUBLICATION...DROP TABLE behaviour Currently, in logical replication, publisher/walsender publishes the tables even though they aren't part of the publication i.e they are dropped from the publication. Because of this ALTER PUBLICATION...DROP TABLE doesn't work as expected. This patch adds a new function is_relation_part_of_publication, which looks up in the pg_publication_rel system catalogue, to check whether the given table is part of the publication. If the table is not part of the publication, the publisher/walsender can skip sending changes related to it. --- src/backend/replication/pgoutput/pgoutput.c | 33 +++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 2f01137b42..d4ff953dc0 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -66,6 +66,7 @@ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx); +static bool is_relation_part_of_publication(Oid relid, Oid puboid); /* * Entry in the map used to remember which relation schemas we sent. @@ -1055,6 +1056,18 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } + else if (!is_relation_part_of_publication(relid, pub->oid)) + { + /* + * Relation is not associated with the publication anymore i.e + * it would have been dropped from the publication. So no need + * to publish the changes for it. + */ + entry->pubactions.pubinsert = false; + entry->pubactions.pubupdate = false; + entry->pubactions.pubdelete = false; + entry->pubactions.pubtruncate = false; + } if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && entry->pubactions.pubdelete && entry->pubactions.pubtruncate) @@ -1181,3 +1194,23 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) entry->replicate_valid = false; } + +/* + * Check whether the relation is associated with the given publication. If yes, + * return true, otherwise false. + */ +static bool +is_relation_part_of_publication(Oid relid, Oid puboid) +{ + HeapTuple tup; + + tup = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid), ObjectIdGetDatum(puboid)); + + if (!HeapTupleIsValid(tup)) + return false; + + ReleaseSysCache(tup); + + return true; +} -- 2.25.1