From 856403e4943954267b0e9b4fa96ccc6955838210 Mon Sep 17 00:00:00 2001 From: Vignesh Date: Mon, 3 Mar 2025 11:54:27 +0530 Subject: [PATCH v6] Fix logical replication breakage after ALTER SUBSCRIPTION ... SET PUBLICATION Altering a subscription with `ALTER SUBSCRIPTION ... SET PUBLICATION` could cause logical replication to break under certain conditions. When the apply worker restarts after executing SET PUBLICATION, it continues using the existing replication slot and origin. If the replication origin was not updated before the restart, the WAL start location could point to a position prior to the existence of the specified publication, leading to persistent start of apply worker and reporting errors. This patch skips loading the publication if the publication does not exist and loads the publication and updates the relation entry when the publication gets created. Discussion: https://www.postgresql.org/message-id/flat/CALDaNm0-n8FGAorM%2BbTxkzn%2BAOUyx5%3DL_XmnvOP6T24%2B-NcBKg%40mail.gmail.com Discussion: https://www.postgresql.org/message-id/CAA4eK1+T-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q@mail.gmail.com --- src/backend/replication/pgoutput/pgoutput.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 7d464f656aa..4cfd2b02023 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1762,6 +1762,8 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we just skip the publications that don't exist yet. */ static List * LoadPublications(List *pubnames) @@ -1772,9 +1774,12 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + elog(WARNING, "skipped loading publication: %s", pubname); } return result; @@ -2053,8 +2058,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->attrmap = NULL; } - /* Validate the entry */ - if (!entry->replicate_valid) + /* + * Validate the entry if (a) the entry is not valid, or (b) there is a + * change in the publications. + */ + if (!entry->replicate_valid || !publications_valid) { Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); -- 2.43.0