From 168764a0b756a07d3ed9cd84f022f2233e4a7dfc Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 2 Aug 2021 14:23:18 +0900 Subject: [PATCH v7 3/4] Add RESET command to ALTER SUBSCRIPTION command. ALTER SUBSCRIPTION ... RESET command resets subscription parameters. The parameters that can be set are streaming, binary, synchronous_commit. RESET command is reuiqred by follow-up commit introducing to a new parameter skip_xid to reset. --- doc/src/sgml/ref/alter_subscription.sgml | 8 ++- src/backend/commands/subscriptioncmds.c | 78 +++++++++++++++++----- src/backend/parser/gram.y | 11 ++- src/include/nodes/parsenodes.h | 5 +- src/test/regress/expected/subscription.out | 14 +++- src/test/regress/sql/subscription.sql | 13 ++++ 6 files changed, 109 insertions(+), 20 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index a6f994450d..8c3c28b7e7 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -29,6 +29,7 @@ ALTER SUBSCRIPTION name REFRESH PUB ALTER SUBSCRIPTION name ENABLE ALTER SUBSCRIPTION name DISABLE ALTER SUBSCRIPTION name SET ( subscription_parameter [= value] [, ... ] ) +ALTER SUBSCRIPTION name RESET ( subscription_parameter [, ... ] ) ALTER SUBSCRIPTION name OWNER TO { new_owner | CURRENT_ROLE | CURRENT_USER | SESSION_USER } ALTER SUBSCRIPTION name RENAME TO new_name @@ -192,16 +193,21 @@ ALTER SUBSCRIPTION name RENAME TO < SET ( subscription_parameter [= value] [, ... ] ) + RESET ( subscription_parameter [, ... ] ) This clause alters parameters originally set by . See there for more - information. The parameters that can be altered + information. The parameters that can be set are slot_name, synchronous_commit, binary, and streaming. + + The parameters that can be reset are: streaming, + binary, synchronous_commit. + diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5157f44058..cc390ce95a 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -99,7 +99,8 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, */ static void parse_subscription_options(ParseState *pstate, List *stmt_options, - bits32 supported_opts, SubOpts *opts) + bits32 supported_opts, SubOpts *opts, + bool is_reset) { ListCell *lc; @@ -134,6 +135,11 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, { DefElem *defel = (DefElem *) lfirst(lc); + if (is_reset && defel->arg != NULL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("RESET must not include values for parameters"))); + if (IsSet(supported_opts, SUBOPT_CONNECT) && strcmp(defel->defname, "connect") == 0) { @@ -192,12 +198,18 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT; - opts->synchronous_commit = defGetString(defel); + if (!is_reset) + { + opts->synchronous_commit = defGetString(defel); - /* Test if the given value is valid for synchronous_commit GUC. */ - (void) set_config_option("synchronous_commit", opts->synchronous_commit, - PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, - false, 0, false); + /* + * Test if the given value is valid for synchronous_commit + * GUC. + */ + (void) set_config_option("synchronous_commit", opts->synchronous_commit, + PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, + false, 0, false); + } } else if (IsSet(supported_opts, SUBOPT_REFRESH) && strcmp(defel->defname, "refresh") == 0) @@ -215,7 +227,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_BINARY; - opts->binary = defGetBoolean(defel); + if (!is_reset) + opts->binary = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_STREAMING) && strcmp(defel->defname, "streaming") == 0) @@ -224,7 +237,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_STREAMING; - opts->streaming = defGetBoolean(defel); + if (!is_reset) + opts->streaming = defGetBoolean(defel); } else if (strcmp(defel->defname, "two_phase") == 0) { @@ -397,7 +411,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT); - parse_subscription_options(pstate, stmt->options, supported_opts, &opts); + parse_subscription_options(pstate, stmt->options, supported_opts, &opts, + false); /* * Since creating a replication slot is not transactional, rolling back @@ -866,14 +881,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, switch (stmt->kind) { - case ALTER_SUBSCRIPTION_OPTIONS: + case ALTER_SUBSCRIPTION_SET_OPTIONS: { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING); parse_subscription_options(pstate, stmt->options, - supported_opts, &opts); + supported_opts, &opts, false); if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) { @@ -923,10 +938,43 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, break; } + case ALTER_SUBSCRIPTION_RESET_OPTIONS: + { + supported_opts = (SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | + SUBOPT_STREAMING); + + parse_subscription_options(pstate, stmt->options, + supported_opts, &opts, true); + + if (IsSet(opts.specified_opts, SUBOPT_SYNCHRONOUS_COMMIT)) + { + values[Anum_pg_subscription_subsynccommit - 1] = + CStringGetTextDatum("off"); + replaces[Anum_pg_subscription_subsynccommit - 1] = true; + } + + if (IsSet(opts.specified_opts, SUBOPT_BINARY)) + { + values[Anum_pg_subscription_subbinary - 1] = + BoolGetDatum(false); + replaces[Anum_pg_subscription_subbinary - 1] = true; + } + + if (IsSet(opts.specified_opts, SUBOPT_STREAMING)) + { + values[Anum_pg_subscription_substream - 1] = + BoolGetDatum(false); + replaces[Anum_pg_subscription_substream - 1] = true; + } + + update_tuple = true; + break; + } case ALTER_SUBSCRIPTION_ENABLED: { parse_subscription_options(pstate, stmt->options, - SUBOPT_ENABLED, &opts); + SUBOPT_ENABLED, &opts, false); + Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED)); if (!sub->slotname && opts.enabled) @@ -961,7 +1009,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH; parse_subscription_options(pstate, stmt->options, - supported_opts, &opts); + supported_opts, &opts, false); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -1011,7 +1059,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, supported_opts |= SUBOPT_COPY_DATA; parse_subscription_options(pstate, stmt->options, - supported_opts, &opts); + supported_opts, &opts, false); publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname); values[Anum_pg_subscription_subpublications - 1] = @@ -1059,7 +1107,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); parse_subscription_options(pstate, stmt->options, - SUBOPT_COPY_DATA, &opts); + SUBOPT_COPY_DATA, &opts, false); /* * The subscription option "two_phase" requires that diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 39a2849eba..bcf85e8980 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9707,7 +9707,16 @@ AlterSubscriptionStmt: { AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); - n->kind = ALTER_SUBSCRIPTION_OPTIONS; + n->kind = ALTER_SUBSCRIPTION_SET_OPTIONS; + n->subname = $3; + n->options = $5; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name RESET definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_RESET_OPTIONS; n->subname = $3; n->options = $5; $$ = (Node *)n; diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index e28248af32..504d65f7d6 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3659,7 +3659,8 @@ typedef struct CreateSubscriptionStmt typedef enum AlterSubscriptionType { - ALTER_SUBSCRIPTION_OPTIONS, + ALTER_SUBSCRIPTION_SET_OPTIONS, + ALTER_SUBSCRIPTION_RESET_OPTIONS, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_ADD_PUBLICATION, @@ -3671,7 +3672,7 @@ typedef enum AlterSubscriptionType typedef struct AlterSubscriptionStmt { NodeTag type; - AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */ + AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_SET_OPTIONS, etc */ char *subname; /* Name of the subscription */ char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 77b4437b69..b87f67fe55 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -284,11 +284,23 @@ ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +-- ok +ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit); +ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit, binary, streaming); +-- fail - unsupported parameters +ALTER SUBSCRIPTION regress_testsub RESET (connect); +ERROR: unrecognized subscription parameter: "connect" +ALTER SUBSCRIPTION regress_testsub RESET (enabled); +ERROR: unrecognized subscription parameter: "enabled" +-- fail - RESET must not include values +ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit = off); +ERROR: RESET must not include values for parameters \dRs+ List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo -----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index d42104c191..aa90560691 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -218,6 +218,19 @@ ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); + +-- ok +ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit); +ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit, binary, streaming); + +-- fail - unsupported parameters +ALTER SUBSCRIPTION regress_testsub RESET (connect); +ALTER SUBSCRIPTION regress_testsub RESET (enabled); + +-- fail - RESET must not include values +ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit = off); + \dRs+ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); -- 2.24.3 (Apple Git-128)