From bff45768b066ccf94de6dae9f687cf54212900b1 Mon Sep 17 00:00:00 2001 From: Japin Li Date: Tue, 26 Jan 2021 15:43:52 +0800 Subject: [PATCH v3 1/4] Introduce a new syntax to add/drop publications At present, if we want to update publications in subscription, we can use SET PUBLICATION, however, it requires supply all publications that exists and the new publications if we want to add new publications, it's inconvenient. The new syntax only supply the new publications. When the refresh is true, it only refresh the new publications. --- src/backend/commands/subscriptioncmds.c | 121 ++++++++++++++++++++++++ src/backend/parser/gram.y | 20 ++++ src/include/nodes/parsenodes.h | 2 + 3 files changed, 143 insertions(+) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 082f7855b8..88fa7f1b3f 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -46,6 +46,8 @@ #include "utils/syscache.h" static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static List *merge_subpublications(HeapTuple tuple, TupleDesc tupledesc, + List *publications, bool addpub); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -857,6 +859,51 @@ AlterSubscription(AlterSubscriptionStmt *stmt) break; } + case ALTER_SUBSCRIPTION_ADD_PUBLICATION: + case ALTER_SUBSCRIPTION_DROP_PUBLICATION: + { + bool copy_data = false; + bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; + bool refresh; + List *publist = NIL; + + publist = merge_subpublications(tup, RelationGetDescr(rel), + stmt->publication, isadd); + + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + NULL, NULL, /* no "enabled" */ + NULL, /* no "create_slot" */ + NULL, NULL, /* no "slot_name" */ + isadd ? ©_data : NULL, + NULL, /* no "synchronous_commit" */ + &refresh, + NULL, NULL, /* no "binary" */ + NULL, NULL); /* no "streaming" */ + values[Anum_pg_subscription_subpublications - 1] = + publicationListToArray(publist); + replaces[Anum_pg_subscription_subpublications - 1] = true; + + update_tuple = true; + + /* Refresh if user asked us to. */ + if (refresh) + { + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), + errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); + + /* Only refresh the added/dropped list of publications. */ + sub->publications = stmt->publication; + + AlterSubscription_refresh(sub, copy_data); + } + + break; + } + case ALTER_SUBSCRIPTION_REFRESH: { bool copy_data; @@ -1278,3 +1325,77 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } + +/* + * Merge current subscription's publications and user specified publications + * by ADD/DROP PUBLICATIONS. + * + * If isadd == true, we will add the list of publications into current + * subscription's publications. Otherwise, we will delete the list of + * publications from current subscription's publications. + */ +static List * +merge_subpublications(HeapTuple tuple, TupleDesc tupledesc, + List *newpublist, bool isadd) +{ + int i; + int npublications; + Datum *publications; + bool nulls[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + List *publist = NIL; + ListCell *lc; + ArrayType *array; + + /* deconstruct the subpublications */ + heap_deform_tuple(tuple, tupledesc, values, nulls); + array = DatumGetArrayTypeP(values[Anum_pg_subscription_subpublications - 1]); + deconstruct_array(array, TEXTOID, -1, false, TYPALIGN_INT, + &publications, NULL, &npublications); + + for (i = 0; i < npublications; i++) + publist = lappend(publist, + makeString(TextDatumGetCString((publications[i])))); + + foreach(lc, newpublist) + { + char *name = strVal(lfirst(lc)); + ListCell *cell = NULL; + + foreach(cell, publist) + { + char *pubname = strVal(lfirst(cell)); + + if (strcmp(name, pubname) == 0) + { + if (isadd) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("publication name \"%s\" is already in subscription", + name))); + } + else + { + publist = list_delete_cell(publist, cell); + break; + } + } + } + + if (isadd) + publist = lappend(publist, makeString(name)); + else if (cell == NULL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("publication name \"%s\" do not in subscription", + name))); + } + + if (publist == NIL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("subscription must contain at least one publication"))); + + return publist; +} diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 7574d545e0..d20e513518 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9615,6 +9615,26 @@ AlterSubscriptionStmt: n->options = $7; $$ = (Node *)n; } + | ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_ADD_PUBLICATION; + n->subname = $3; + n->publication = $6; + n->options = $7; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name DROP PUBLICATION name_list opt_definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_DROP_PUBLICATION; + n->subname = $3; + n->publication = $6; + n->options = $7; + $$ = (Node *)n; + } | ALTER SUBSCRIPTION name ENABLE_P { AlterSubscriptionStmt *n = diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index dc2bb40926..9148ca9888 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3553,6 +3553,8 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_PUBLICATION, + ALTER_SUBSCRIPTION_ADD_PUBLICATION, + ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, ALTER_SUBSCRIPTION_ENABLED } AlterSubscriptionType; -- 2.30.0