From 5729539bde15f1810b01590610fa4b2b2f01c4db Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Fri, 1 Aug 2025 22:26:06 +0530 Subject: [PATCH v3_PG14] Fix ALTER SUBSCRIPTION ... SET PUBLICATION ... command. The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will lead to restarting of apply worker and after the restart, the apply worker will use the existing slot and replication origin corresponding to the subscription. Now, it is possible that before the restart, the origin has not been updated, and the WAL start location points to a location before where PUBLICATION pointed to by SET PUBLICATION doesn't exist, and that can lead to an error like: "ERROR: publication "pub1" does not exist". Once this error occurs, apply worker will never be able to proceed and will always return the same error. We decided to skip loading the publication if the publication does not exist. The publication is loaded later and updates the relation entry when the publication gets created. --- src/backend/replication/pgoutput/pgoutput.c | 16 +++++- src/test/subscription/t/021_alter_sub_pub.pl | 51 +++++++++++++++++++- 2 files changed, 63 insertions(+), 4 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index a81215cff86..f463e4d6d60 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -841,6 +841,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we skip the publications that don't exist yet. This will allow us + * to silently continue the replication in the absence of a missing publication. + * This is required because we allow the users to create publications after they + * have specified the required publications at the time of replication start. */ static List * LoadPublications(List *pubnames) @@ -851,9 +856,16 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipped loading publication: %s", pubname), + errdetail("The publication does not exist at this point in the WAL."), + errhint("Create the publication if it does not exist.")); } return result; diff --git a/src/test/subscription/t/021_alter_sub_pub.pl b/src/test/subscription/t/021_alter_sub_pub.pl index 4c59d44e33f..c4d0b99a66a 100644 --- a/src/test/subscription/t/021_alter_sub_pub.pl +++ b/src/test/subscription/t/021_alter_sub_pub.pl @@ -1,12 +1,14 @@ # Copyright (c) 2021, PostgreSQL Global Development Group -# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION +# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and +# ensures that creating a publication associated with a subscription at a later +# point of time does not break logical replication. use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 3; +use Test::More tests => 4; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -81,6 +83,51 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_1"); is($result, qq(20|1|10), 'check initial data is copied to subscriber'); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +# Ensure that setting a missing publication to the subscription does not +# disrupt existing logical replication. Instead, it should log a warning +# while allowing replication to continue. Additionally, verify that replication +# resumes after the missing publication is created for the publication table. + +# Create table on publisher and subscriber +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub_3" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +my $offset = -s $node_publisher->logfile; + +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_3"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(1)"); + +# Verify that a warning is logged. +$node_publisher->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/, + $offset); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Verify that the insert operation gets replicated to subscriber after +# publication is created. +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_3"); +is($result, qq(2), + 'check that the incremental data is replicated after the publication is created' +); + # shutdown $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.43.0