From 2bae27e37c6c1ace313f544709c0582e2b94b937 Mon Sep 17 00:00:00 2001 From: vignesh Date: Wed, 7 Apr 2021 22:05:53 +0530 Subject: [PATCH v3] Identify missing publications from publisher while create/alter subscription. Creating/altering subscription is successful when we specify a publication which does not exist in the publisher. This patch checks if the specified publications are present in the publisher and throws an error if any of the publication is missing in the publisher. --- doc/src/sgml/ref/alter_subscription.sgml | 12 ++ doc/src/sgml/ref/create_subscription.sgml | 12 ++ src/backend/commands/subscriptioncmds.c | 242 +++++++++++++++++++--- src/test/subscription/t/100_bugs.pl | 71 ++++++- 4 files changed, 306 insertions(+), 31 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 367ac814f4..85d3b8f17b 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -160,6 +160,18 @@ ALTER SUBSCRIPTION name RENAME TO < + + + validate_publication (boolean) + + + Specifies whether the subscriber must verify if the specified + publications are present in the publisher. By default, the subscriber + will not check if the publications are present in the publisher. + + + + diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index e812beee37..ee9e656c60 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -239,6 +239,18 @@ CREATE SUBSCRIPTION subscription_name + + + validate_publication (boolean) + + + Specifies whether the subscriber must verify if the specified + publications are present in the publisher. By default, the subscriber + will not check if the publications are present in the publisher. + + + + diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 517c8edd3b..e595388472 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -50,6 +50,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); +static void check_publications(WalReceiverConn *wrconn, List *publications); /* @@ -69,7 +70,9 @@ parse_subscription_options(List *options, char **synchronous_commit, bool *refresh, bool *binary_given, bool *binary, - bool *streaming_given, bool *streaming) + bool *streaming_given, bool *streaming, + bool *validate_publication_given, + bool *validate_publication) { ListCell *lc; bool connect_given = false; @@ -111,6 +114,12 @@ parse_subscription_options(List *options, *streaming = false; } + if (validate_publication) + { + *validate_publication_given = false; + *validate_publication = false; + } + /* Parse options */ foreach(lc, options) { @@ -215,6 +224,16 @@ parse_subscription_options(List *options, *streaming_given = true; *streaming = defGetBoolean(defel); } + else if (strcmp(defel->defname, "validate_publication") == 0 && validate_publication) + { + if (*validate_publication_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *validate_publication_given = true; + *validate_publication = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -247,10 +266,18 @@ parse_subscription_options(List *options, errmsg("%s and %s are mutually exclusive options", "connect = false", "copy_data = true"))); + if (validate_publication && validate_publication_given && + *validate_publication) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s and %s are mutually exclusive options", + "connect = false", "validate_publication = true"))); + /* Change the defaults of other options. */ *enabled = false; *create_slot = false; *copy_data = false; + *validate_publication = false; } /* @@ -343,6 +370,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool slotname_given; bool binary; bool binary_given; + bool validate_publication; + bool validate_publication_given; char originname[NAMEDATALEN]; bool create_slot; List *publications; @@ -361,7 +390,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) &synchronous_commit, NULL, /* no "refresh" */ &binary_given, &binary, - &streaming_given, &streaming); + &streaming_given, &streaming, + &validate_publication_given, + &validate_publication); /* * Since creating a replication slot is not transactional, rolling back @@ -472,6 +503,12 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) PG_TRY(); { + if (validate_publication) + { + /* Verify specified publications exists in the publisher. */ + check_publications(wrconn, publications); + } + /* * Set sync state based on if we were asked to do data copy or * not. @@ -539,7 +576,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) } static void -AlterSubscription_refresh(Subscription *sub, bool copy_data) +AlterSubscription_refresh(Subscription *sub, bool copy_data, bool check_pub) { char *err; List *pubrel_names; @@ -568,6 +605,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) ereport(ERROR, (errmsg("could not connect to the publisher: %s", err))); + /* Verify specified publications exists in the publisher. */ + if (check_pub) + check_publications(wrconn, sub->publications); + /* Get the table list from publisher. */ pubrel_names = fetch_table_list(wrconn, sub->publications); @@ -814,7 +855,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) &synchronous_commit, NULL, /* no "refresh" */ &binary_given, &binary, - &streaming_given, &streaming); + &streaming_given, &streaming, + NULL, NULL); if (slotname_given) { @@ -871,6 +913,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) NULL, /* no "synchronous_commit" */ NULL, /* no "refresh" */ NULL, NULL, /* no "binary" */ + NULL, NULL, NULL, NULL); /* no streaming */ Assert(enabled_given); @@ -906,6 +949,10 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) { bool copy_data; bool refresh; + bool validate_publication; + bool validate_publication_given; + char *err; + WalReceiverConn *wrconn; parse_subscription_options(stmt->options, NULL, /* no "connect" */ @@ -916,13 +963,33 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) NULL, /* no "synchronous_commit" */ &refresh, NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + NULL, NULL, /* no "streaming" */ + &validate_publication_given, + &validate_publication); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); replaces[Anum_pg_subscription_subpublications - 1] = true; update_tuple = true; + if (validate_publication) + { + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + + /* Verify specified publications exists in the publisher. */ + check_publications(wrconn, stmt->publication); + + /* We are done with the remote side, close connection. */ + walrcv_disconnect(wrconn); + } + /* Refresh if user asked us to. */ if (refresh) { @@ -937,7 +1004,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, copy_data); + AlterSubscription_refresh(sub, copy_data, false); } break; @@ -950,6 +1017,10 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) bool copy_data; bool refresh; List *publist; + bool validate_publication; + bool validate_publication_given; + char *err; + WalReceiverConn *wrconn; publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname); @@ -963,7 +1034,9 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) NULL, /* no "synchronous_commit" */ &refresh, NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + NULL, NULL, /* no "streaming" */ + &validate_publication_given, + &validate_publication); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publist); @@ -971,6 +1044,24 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) update_tuple = true; + if (validate_publication) + { + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + + /* Verify specified publications exists in the publisher. */ + check_publications(wrconn, stmt->publication); + + /* We are done with the remote side, close connection. */ + walrcv_disconnect(wrconn); + } + /* Refresh if user asked us to. */ if (refresh) { @@ -985,7 +1076,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) /* Only refresh the added/dropped list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, copy_data); + AlterSubscription_refresh(sub, copy_data, false); } break; @@ -994,6 +1085,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_REFRESH: { bool copy_data; + bool validate_publication; + bool validate_publication_given; if (!sub->enabled) ereport(ERROR, @@ -1009,11 +1102,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) NULL, /* no "synchronous_commit" */ NULL, /* no "refresh" */ NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + NULL, NULL, /* no "streaming" */ + &validate_publication_given, + &validate_publication); PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); - AlterSubscription_refresh(sub, copy_data); + AlterSubscription_refresh(sub, copy_data, validate_publication); break; } @@ -1466,27 +1561,20 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) } /* - * Get the list of tables which belong to specified publications on the - * publisher connection. + * Return a query by appending the publications to the input query. */ -static List * -fetch_table_list(WalReceiverConn *wrconn, List *publications) +static StringInfoData * +get_appended_publications_query(char *query, List *publications) { - WalRcvExecResult *res; - StringInfoData cmd; - TupleTableSlot *slot; - Oid tableRow[2] = {TEXTOID, TEXTOID}; + bool first = true; + StringInfoData *cmd = makeStringInfo(); ListCell *lc; - bool first; - List *tablelist = NIL; Assert(list_length(publications) > 0); - initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" - " FROM pg_catalog.pg_publication_tables t\n" - " WHERE t.pubname IN ("); - first = true; + initStringInfo(cmd); + appendStringInfoString(cmd, query); + foreach(lc, publications) { char *pubname = strVal(lfirst(lc)); @@ -1494,14 +1582,108 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) if (first) first = false; else - appendStringInfoString(&cmd, ", "); + appendStringInfoString(cmd, ", "); + + appendStringInfoString(cmd, quote_literal_cstr(pubname)); + } + + appendStringInfoChar(cmd, ')'); + return cmd; +} + +/* + * Verify that the specified publication(s) exists in the publisher. + */ +static void +check_publications(WalReceiverConn *wrconn, List *publications) +{ + WalRcvExecResult *res; + StringInfoData *cmd; + TupleTableSlot *slot; + List *publicationsCopy = NIL; + Oid tableRow[1] = {TEXTOID}; + + cmd = get_appended_publications_query("SELECT t.pubname FROM\n" + " pg_catalog.pg_publication t WHERE\n" + " t.pubname IN (", publications); + + res = walrcv_exec(wrconn, cmd->data, 1, tableRow); + pfree(cmd->data); + pfree(cmd); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive list of publications from the publisher: %s", + res->err))); - appendStringInfoString(&cmd, quote_literal_cstr(pubname)); + publicationsCopy = list_copy(publications); + + /* Process publications. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *pubname; + bool isnull; + + pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + /* Delete the publication present in publisher from the list. */ + publicationsCopy = list_delete(publicationsCopy, makeString(pubname)); + ExecClearTuple(slot); } - appendStringInfoChar(&cmd, ')'); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); - pfree(cmd.data); + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + + if (list_length(publicationsCopy)) + { + StringInfoData nonExistentPublications; + bool first = true; + ListCell *lc; + + /* Convert the publications which does not exist into a string. */ + initStringInfo(&nonExistentPublications); + foreach(lc, publicationsCopy) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&nonExistentPublications, ", "); + appendStringInfoString(&nonExistentPublications, "\""); + appendStringInfoString(&nonExistentPublications, pubname); + appendStringInfoString(&nonExistentPublications, "\""); + } + + ereport(ERROR, + (errmsg("publication(s) %s does not exist in the publisher", + nonExistentPublications.data))); + } +} + +/* + * Get the list of tables which belong to specified publications on the + * publisher connection. + */ +static List * +fetch_table_list(WalReceiverConn *wrconn, List *publications) +{ + WalRcvExecResult *res; + StringInfoData *cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {TEXTOID, TEXTOID}; + List *tablelist = NIL; + + cmd = get_appended_publications_query( + "SELECT DISTINCT t.schemaname, t.tablename\n" + " FROM pg_catalog.pg_publication_tables t\n" + " WHERE t.pubname IN (", publications); + res = walrcv_exec(wrconn, cmd->data, 2, tableRow); + pfree(cmd->data); + pfree(cmd); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl index b8f46f08cc..6ad640600f 100644 --- a/src/test/subscription/t/100_bugs.pl +++ b/src/test/subscription/t/100_bugs.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 5; +use Test::More tests => 11; # Bug #15114 @@ -154,6 +154,75 @@ is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"), is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t2"), $rows * 2, "2x$rows rows in t2"); +# Create subcription for a publication which does not exist. +$node_publisher = get_new_node('testpublisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +$node_subscriber = get_new_node('testsubscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +$publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION testpub1 FOR ALL TABLES"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION testsub1 CONNECTION '$publisher_connstr' PUBLICATION testpub1" +); + +# Specified publication does not exist. +my ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION pub_doesnt_exist WITH (VALIDATE_PUBLICATION = TRUE)" +); +ok( $stderr =~ + /ERROR: publication\(s\) "pub_doesnt_exist" does not exist in the publisher/, + "Create subscription for non existent publication fails"); + +# One of the specified publication exist. +($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION testpub1, pub_doesnt_exist WITH (VALIDATE_PUBLICATION = TRUE)" +); +ok( $stderr =~ + /ERROR: publication\(s\) "pub_doesnt_exist" does not exist in the publisher/, + "Create subscription for non existent publication fails"); + +# Multiple publications does not exist. +($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION pub_doesnt_exist, pub_doesnt_exist1 WITH (VALIDATE_PUBLICATION = TRUE)" +); +ok( $stderr =~ + /ERROR: publication\(s\) "pub_doesnt_exist", "pub_doesnt_exist1" does not exist in the publisher/, + "Create subscription for non existent publication fails"); + +# Specified publication does not exist. +($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "ALTER SUBSCRIPTION testsub1 SET PUBLICATION pub_doesnt_exist WITH (VALIDATE_PUBLICATION = TRUE)"); +ok( $stderr =~ + /ERROR: publication\(s\) "pub_doesnt_exist" does not exist in the publisher/, + "Alter subscription for non existent publication fails"); + +# Specified publication does not exist with refresh = false. +($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "ALTER SUBSCRIPTION testsub1 SET PUBLICATION pub_doesnt_exist WITH (REFRESH = FALSE, VALIDATE_PUBLICATION = TRUE)" +); +ok( $stderr =~ + /ERROR: publication\(s\) "pub_doesnt_exist" does not exist in the publisher/, + "Alter subscription for non existent publication fails"); + +# Set publication on non existent database. +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION testsub1 CONNECTION 'dbname=regress_doesnotexist2'"); +($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "ALTER SUBSCRIPTION testsub1 SET PUBLICATION pub_doesnt_exist WITH (REFRESH = FALSE, VALIDATE_PUBLICATION = TRUE)" +); +ok( $stderr =~ /ERROR: could not connect to the publisher/, + "Alter subscription for non existent publication fails"); + +$node_publisher->stop('fast'); +$node_subscriber->stop('fast'); + # Verify table data is synced with cascaded replication setup. This is mainly # to test whether the data written by tablesync worker gets replicated. my $node_pub = get_new_node('testpublisher1'); -- 2.25.1