From 95618a85136a4545e9f0c39c577e4d83ed3f642d Mon Sep 17 00:00:00 2001 From: Vigneshwaran C Date: Fri, 8 Apr 2022 12:16:05 +0530 Subject: [PATCH v7 2/2] Support force option for copy_data, check and throw an error if publisher tables were also subscribing data in the publisher from other publishers. This patch does couple of things: 1) Added force option for copy_data. 2) Check and throw an error if the publication tables were also subscribing data in the publisher from other publishers. Let's consider an existing Multi master logical replication setup between Node1 and Node2 that is created using the following steps: a) Node1 - Publication publishing employee table. b) Node2 - Subscription subscribing from publication pub1 with local_only. c) Node2 - Publication publishing employee table. d) Node1 - Subscription subscribing from publication pub2 with local_only. Now when user is trying to add another node Node3 to the above Multi master logical replication setup: a) user will have to create one subscription subscribing from Node1 to Node3 b) user wil have to create another subscription subscribing from Node2 to Node3 using local_only option and copy_data as true. While the subscription is created, server will identify that Node2 is subscribing from Node1 and Node1 is subscribing from Node2 and throw an error so that user can handle the initial copy data. Handling of initial data copying in this case is detailed in the documentation section of the patch. --- doc/src/sgml/logical-replication.sgml | 215 +++++++++++++ doc/src/sgml/ref/alter_subscription.sgml | 8 +- doc/src/sgml/ref/create_subscription.sgml | 19 +- src/backend/commands/subscriptioncmds.c | 155 ++++++++-- src/test/regress/expected/subscription.out | 18 +- src/test/regress/sql/subscription.sql | 12 + src/test/subscription/t/032_bidirectional.pl | 298 ++++++++++++++++--- src/tools/pgindent/typedefs.list | 1 + 8 files changed, 652 insertions(+), 74 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 555fbd749c..8f99b04026 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -700,4 +700,219 @@ CREATE SUBSCRIPTION mysub CONNECTION 'dbname=foo host=bar user=repuser' PUBLICAT incremental changes to those tables. + + + Setting Bidirection logical replication between two nodes: + + + + + Create the publication in node1: + +CREATE PUBLICATION pub_node1 FOR TABLE t1; + + + + + + + Create the subscription in node2 to subscribe the changes from node1: + +CREATE SUBSCRIPTION sub_node1_node2 CONNECTION 'dbname=foo host=node1 user=repuser' PUBLICATION pub_node1 WITH (copy_data = off, local_only = on); + + + + + + + Create publication in node2: + +CREATE PUBLICATION pub_node2 FOR TABLE t1; + + + + + + + Create subscription in node1 to subscribe the changes from node2: + +CREATE SUBSCRIPTION sub_node2_node1 CONNECTION 'dbname=foo host=node2 user=repuser' PUBLICATION pub_node2 WITH (copy_data = off, local_only = on); + + + + + + + Now the BiDirectional logical replication setup is complete between node1 + and node2. Any incremental changes from node1 will be replicated to node2 and + the incremental changes from node2 will be replicated to node1. + + + + Adding new node to the two node bidirectional logical replication setup + + Let's see a slightly complex setup where we try to add node3 to the above + setup in various different scenarios: + + + + When there is no data present in any of the nodes node1, node2 or node3 + + + + Create publication in node3: + +CREATE PUBLICATION pub_node3 FOR TABLE t1; + + + + + + + Create subscription in node1 to subscribe the changes from node3: + +CREATE SUBSCRIPTION sub_node1_node3 CONNECTION 'dbname=foo host=node3 user=repuser' PUBLICATION pub_node3 WITH (copy_data = off, local_only = on); + + + + + + + Create subscription in node2 to subscribe the changes from node3: + +CREATE SUBSCRIPTION sub_node2_node3 CONNECTION 'dbname=foo host=node3 user=repuser' PUBLICATION pub_node3 WITH (copy_data = off, local_only = on); + + + + + + + Create subscription in node3 to subscribe the changes from node1: + +CREATE SUBSCRIPTION sub_node3_node1 CONNECTION 'dbname=foo host=node1 user=repuser' PUBLICATION pub_node1 WITH (copy_data = off, local_only = on); + + + + + + + Create subscription in node3 to subscribe the changes from node2: + +CREATE SUBSCRIPTION sub_node3_node2 CONNECTION 'dbname=foo host=node2 user=repuser' PUBLICATION pub_node2 WITH (copy_data = off, local_only = on); + + + + + + + + When data is present in the exiting nodes node1 and node2 + + + + Create publication in node3: + +CREATE PUBLICATION pub_node3 FOR TABLE t1; + + + + + + + Create subscription in node1 to subscribe the changes from node3: + +CREATE SUBSCRIPTION sub_node1_node3 CONNECTION 'dbname=foo host=node3 user=repuser' PUBLICATION pub_node3 WITH (copy_data = off, local_only = on); + + + + + + + Create subscription in node2 to subscribe the changes from node3: + +CREATE SUBSCRIPTION sub_node2_node3 CONNECTION 'dbname=foo host=node3 user=repuser' PUBLICATION pub_node3 WITH (copy_data = off, local_only = on); + + + + + + + Create subscription in node3 to subscribe the changes from node1, here + copy_data is specified as force so that the existing table data is + copied during initial sync: + +CREATE SUBSCRIPTION sub_node3_node1 CONNECTION 'dbname=foo host=node1 user=repuser' PUBLICATION pub_node1 WITH (copy_data = force, local_only = on); + + + + + + + Create subscription in node3 to subscribe the changes from node2: + +CREATE SUBSCRIPTION sub_node3_node2 CONNECTION 'dbname=foo host=node2 user=repuser' PUBLICATION pub_node2 WITH (copy_data = off, local_only = on); + + + + + + + + When data is present in the new node node3 + + + + Create publication in node3: + +CREATE PUBLICATION pub_node3 FOR TABLE t1; + + + + + + + Create subscription in node1 to subscribe the changes from node3, here + copy_data is specified as force so that the existing table data is + copied during initial sync: + +CREATE SUBSCRIPTION sub_node1_node3 CONNECTION 'dbname=foo host=node3 user=repuser' PUBLICATION pub_node3 WITH (copy_data = force, local_only = on); + + + + + + + Create subscription in node2 to subscribe the changes from node3, here + copy_data is specified as force so that the existing table data is + copied during initial sync: + +CREATE SUBSCRIPTION sub_node2_node3 CONNECTION 'dbname=foo host=node3 user=repuser' PUBLICATION pub_node3 WITH (copy_data = force, local_only = on); + + + + + + + Create subscription in node3 to subscribe the changes from node1 and + node2, here copy_data is specified as force when creating subscription + to node1 so that the existing table data is copied during initial sync: + +# Truncate the table data but do not replicate the truncate. +ALTER PUBLICATION pub_node3 SET (publish='insert,update,delete'); +TRUNCATE t1; + +CREATE SUBSCRIPTION sub_node3_node1 CONNECTION 'dbname=foo host=node1 user=repuser' PUBLICATION pub_node1 WITH (copy_data = force, local_only = on); + +CREATE SUBSCRIPTION sub_node3_node2 CONNECTION 'dbname=foo host=node2 user=repuser' PUBLICATION pub_node2 WITH (copy_data = off, local_only = on); + +# Include truncate operations from now +ALTER PUBLICATION pub_node3 SET (publish='insert,update,delete,truncate'); + + + + + + + + diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index bd19abf4bf..4717535bad 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -161,12 +161,14 @@ ALTER SUBSCRIPTION name RENAME TO < - copy_data (boolean) + copy_data (enum) Specifies whether to copy pre-existing data in the publications - that are being subscribed to when the replication starts. - The default is true. + that are being subscribed to when the replication starts. This + parameter may be either true, + false or force. The default is + true. Previously subscribed tables are not copied, even if a table's row diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index c09f7b0600..907331015f 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -161,6 +161,11 @@ CREATE SUBSCRIPTION subscription_namefalse. + + There is some interation between the "local_only" option and + "copy_data" option. Refer to the + for details. + @@ -213,18 +218,26 @@ CREATE SUBSCRIPTION subscription_name - copy_data (boolean) + copy_data (enum) Specifies whether to copy pre-existing data in the publications - that are being subscribed to when the replication starts. - The default is true. + that are being subscribed to when the replication starts. This + parameter may be either true, + false or force. The default is + true. If the publications contain WHERE clauses, it will affect what data is copied. Refer to the for details. + + + There is some interation between the "local_only" option and + "copy_data" option. Refer to the + for details. + diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3d9efb2a06..127cfd12b8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -69,6 +69,18 @@ /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) +#define IS_COPY_DATA_ON_OR_FORCE(copy_data) (copy_data != COPY_DATA_OFF) + +/* + * Represents whether copy_data option is specified with on, off or force. + */ +typedef enum CopyData +{ + COPY_DATA_OFF, + COPY_DATA_ON, + COPY_DATA_FORCE +} CopyData; + /* * Structure to hold a bitmap representing the user-provided CREATE/ALTER * SUBSCRIPTION command options and the parsed/default values of each of them. @@ -81,7 +93,7 @@ typedef struct SubOpts bool connect; bool enabled; bool create_slot; - bool copy_data; + CopyData copy_data; bool refresh; bool binary; bool streaming; @@ -91,11 +103,66 @@ typedef struct SubOpts XLogRecPtr lsn; } SubOpts; -static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static List *fetch_table_list(WalReceiverConn *wrconn, List *publications, + CopyData copydata, bool local_only); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); +/* + * Validate the value specified for copy_data option. + */ +static CopyData +DefGetCopyData(DefElem *def) +{ + /* + * If no parameter given, assume "true" is meant. + */ + if (def->arg == NULL) + return COPY_DATA_ON; + + /* + * Allow 0, 1, "true", "false", "on", "off" or "force". + */ + switch (nodeTag(def->arg)) + { + case T_Integer: + switch (intVal(def->arg)) + { + case 0: + return COPY_DATA_OFF; + case 1: + return COPY_DATA_ON; + default: + /* otherwise, error out below */ + break; + } + break; + default: + { + char *sval = defGetString(def); + + /* + * The set of strings accepted here should match up with + * the grammar's opt_boolean_or_string production. + */ + if (pg_strcasecmp(sval, "true") == 0 || + pg_strcasecmp(sval, "on") == 0) + return COPY_DATA_ON; + if (pg_strcasecmp(sval, "false") == 0 || + pg_strcasecmp(sval, "off") == 0) + return COPY_DATA_OFF; + if (pg_strcasecmp(sval, "force") == 0) + return COPY_DATA_FORCE; + } + break; + } + + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s requires a boolean or \"force\"", def->defname)); + return COPY_DATA_OFF; /* keep compiler quiet */ +} /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -128,7 +195,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, if (IsSet(supported_opts, SUBOPT_CREATE_SLOT)) opts->create_slot = true; if (IsSet(supported_opts, SUBOPT_COPY_DATA)) - opts->copy_data = true; + opts->copy_data = COPY_DATA_ON; if (IsSet(supported_opts, SUBOPT_REFRESH)) opts->refresh = true; if (IsSet(supported_opts, SUBOPT_BINARY)) @@ -196,7 +263,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_COPY_DATA; - opts->copy_data = defGetBoolean(defel); + opts->copy_data = DefGetCopyData(defel); } else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) && strcmp(defel->defname, "synchronous_commit") == 0) @@ -333,17 +400,17 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errmsg("%s and %s are mutually exclusive options", "connect = false", "create_slot = true"))); - if (opts->copy_data && + if (IS_COPY_DATA_ON_OR_FORCE(opts->copy_data) && IsSet(opts->specified_opts, SUBOPT_COPY_DATA)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options", - "connect = false", "copy_data = true"))); + "connect = false", "copy_data = true/force"))); /* Change the defaults of other options. */ opts->enabled = false; opts->create_slot = false; - opts->copy_data = false; + opts->copy_data = COPY_DATA_OFF; } /* @@ -671,13 +738,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * Set sync state based on if we were asked to do data copy or * not. */ - table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + table_state = IS_COPY_DATA_ON_OR_FORCE(opts.copy_data) ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* * Get the table list from publisher and build local table status * info. */ - tables = fetch_table_list(wrconn, publications); + tables = fetch_table_list(wrconn, publications, opts.copy_data, + opts.local_only); foreach(lc, tables) { RangeVar *rv = (RangeVar *) lfirst(lc); @@ -720,7 +788,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH * PUBLICATION to work. */ - if (opts.twophase && !opts.copy_data && tables != NIL) + if (opts.twophase && IS_COPY_DATA_ON_OR_FORCE(opts.copy_data) + && tables != NIL) twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, @@ -761,8 +830,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, } static void -AlterSubscription_refresh(Subscription *sub, bool copy_data, - List *validate_publications) +AlterSubscription_refresh(Subscription *sub, CopyData copy_data, + List *validate_publications, bool local_only) { char *err; List *pubrel_names; @@ -797,7 +866,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, check_publications(wrconn, validate_publications); /* Get the table list from publisher. */ - pubrel_names = fetch_table_list(wrconn, sub->publications); + pubrel_names = fetch_table_list(wrconn, sub->publications, copy_data, + local_only); /* Get local table list. */ subrel_states = GetSubscriptionRelations(sub->oid); @@ -851,7 +921,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, list_length(subrel_states), sizeof(Oid), oid_cmp)) { AddSubscriptionRelState(sub->oid, relid, - copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, + IS_COPY_DATA_ON_OR_FORCE(copy_data) ? SUBREL_STATE_INIT : SUBREL_STATE_READY, InvalidXLogRecPtr); ereport(DEBUG1, (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", @@ -1134,7 +1204,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, case ALTER_SUBSCRIPTION_SET_PUBLICATION: { - supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH; + supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH | SUBOPT_LOCAL_ONLY; parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1157,7 +1227,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * See ALTER_SUBSCRIPTION_REFRESH for details why this is * not allowed. */ - if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && IS_COPY_DATA_ON_OR_FORCE(opts.copy_data)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"), @@ -1170,7 +1240,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, sub->publications = stmt->publication; AlterSubscription_refresh(sub, opts.copy_data, - stmt->publication); + stmt->publication, + opts.local_only); } break; @@ -1182,7 +1253,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, List *publist; bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; - supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA; + supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA | SUBOPT_LOCAL_ONLY; parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1222,7 +1293,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, sub->publications = publist; AlterSubscription_refresh(sub, opts.copy_data, - validate_publications); + validate_publications, + opts.local_only); } break; @@ -1236,7 +1308,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); parse_subscription_options(pstate, stmt->options, - SUBOPT_COPY_DATA, &opts); + SUBOPT_COPY_DATA | SUBOPT_LOCAL_ONLY, + &opts); /* * The subscription option "two_phase" requires that @@ -1255,7 +1328,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * * For more details see comments atop worker.c. */ - if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && + IS_COPY_DATA_ON_OR_FORCE(opts.copy_data)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"), @@ -1264,7 +1338,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); - AlterSubscription_refresh(sub, opts.copy_data, NULL); + AlterSubscription_refresh(sub, opts.copy_data, NULL, + opts.local_only); break; } @@ -1779,22 +1854,27 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) * publisher connection. */ static List * -fetch_table_list(WalReceiverConn *wrconn, List *publications) +fetch_table_list(WalReceiverConn *wrconn, List *publications, CopyData copydata, + bool local_only) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[2] = {TEXTOID, TEXTOID}; + Oid tableRow[3] = {TEXTOID, TEXTOID, CHAROID}; List *tablelist = NIL; initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" - " FROM pg_catalog.pg_publication_tables t\n" - " WHERE t.pubname IN ("); + appendStringInfoString(&cmd, + "SELECT DISTINCT N.nspname AS schemaname, C.relname AS tablename, PS.srsubstate as replicated\n" + "FROM pg_publication P,\n" + "LATERAL pg_get_publication_tables(P.pubname) GPT\n" + "LEFT JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n" + "pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n" + "WHERE C.oid = GPT.relid AND P.pubname in ("); get_publications_str(publications, &cmd, true); appendStringInfoChar(&cmd, ')'); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, 3, tableRow); pfree(cmd.data); if (res->status != WALRCV_OK_TUPLES) @@ -1820,6 +1900,25 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) rv = makeRangeVar(nspname, relname, -1); tablelist = lappend(tablelist, rv); + /* + * XXX: During initial table sync we cannot differentiate between the + * local and non-local data that is present in the HEAP. Identification + * of local data can be done only from the WAL by using the origin id. + * Throw an error so that the user can take care of the initial data + * copying and then create subscription with copy_data off. + * + * It is quite possible that subscriber has not yet pulled data to + * the tables, but in ideal cases the table data will be subscribed. + * To keep the code simple it is not checked if the subscriber table + * has pulled the data or not. + */ + if (copydata == COPY_DATA_ON && local_only && !slot_attisnull(slot, 3)) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("CREATE/ALTER SUBSCRIPTION with local_only and copy_data as true is not allowed when the publisher might have replicated data, table:%s.%s might have replicated data in the publisher", + nspname, relname), + errhint("Use CREATE/ALTER SUBSCRIPTION with copy_data = off or force")); + ExecClearTuple(slot); } ExecDropSingleTupleTableSlot(slot); diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 8bf7c810a5..fd8c49e05f 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -47,7 +47,13 @@ ERROR: must be superuser to create subscriptions SET SESSION AUTHORIZATION 'regress_subscription_user'; -- fail - invalid option combinations CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true); -ERROR: connect = false and copy_data = true are mutually exclusive options +ERROR: connect = false and copy_data = true/force are mutually exclusive options +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = force); +ERROR: connect = false and copy_data = true/force are mutually exclusive options +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = on); +ERROR: connect = false and copy_data = true/force are mutually exclusive options +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = 1); +ERROR: connect = false and copy_data = true/force are mutually exclusive options CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = true); ERROR: connect = false and enabled = true are mutually exclusive options CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, create_slot = true); @@ -93,6 +99,16 @@ ALTER SUBSCRIPTION regress_testsub4 SET (local_only = false); DROP SUBSCRIPTION regress_testsub3; DROP SUBSCRIPTION regress_testsub4; +-- ok - valid coy_data options +CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = off); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = false); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = 0); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +DROP SUBSCRIPTION regress_testsub3; +DROP SUBSCRIPTION regress_testsub4; +DROP SUBSCRIPTION regress_testsub5; -- fail - invalid connection string ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 327a1e2500..11b2c0a7c2 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -40,6 +40,9 @@ SET SESSION AUTHORIZATION 'regress_subscription_user'; -- fail - invalid option combinations CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true); +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = force); +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = on); +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = 1); CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = true); CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, create_slot = true); CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, enabled = true); @@ -66,6 +69,15 @@ ALTER SUBSCRIPTION regress_testsub4 SET (local_only = false); DROP SUBSCRIPTION regress_testsub3; DROP SUBSCRIPTION regress_testsub4; +-- ok - valid coy_data options +CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = off); +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = false); +CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = 0); + +DROP SUBSCRIPTION regress_testsub3; +DROP SUBSCRIPTION regress_testsub4; +DROP SUBSCRIPTION regress_testsub5; + -- fail - invalid connection string ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; diff --git a/src/test/subscription/t/032_bidirectional.pl b/src/test/subscription/t/032_bidirectional.pl index 4a73533672..88c03081f6 100644 --- a/src/test/subscription/t/032_bidirectional.pl +++ b/src/test/subscription/t/032_bidirectional.pl @@ -11,6 +11,120 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +my $result; +my $stdout; +my $stderr; + +my $appname_A1 = 'tap_sub_A_B'; +my $appname_A2 = 'tap_sub_A_C'; +my $appname_B1 = 'tap_sub_B_A'; +my $appname_B2 = 'tap_sub_B_C'; +my $appname_C1 = 'tap_sub_C_A'; +my $appname_C2 = 'tap_sub_C_B'; + +# Subroutine for cleaning up the subscriber contents. +sub clean_subscriber_contents +{ + my ($node_A, $node_B, $node_C) = @_; + $node_A->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_A_C"); + $node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B_C"); + $node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C_A"); + $node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C_B"); + + $result = + $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); + is($result, qq(1), 'check subscription was dropped on subscriber'); + + $result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); + is($result, qq(1), 'check subscription was dropped on subscriber'); + + $result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); + is($result, qq(0), 'check subscription was dropped on subscriber'); + + $result = $node_A->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); + is($result, qq(1), 'check replication slot was dropped on publisher'); + + $result = $node_B->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); + is($result, qq(1), 'check replication slot was dropped on publisher'); + + $result = $node_C->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); + is($result, qq(0), 'check replication slot was dropped on publisher'); + + $node_A->safe_psql('postgres', "TRUNCATE tab_full"); + $node_B->safe_psql('postgres', "TRUNCATE tab_full"); + $node_C->safe_psql('postgres', "TRUNCATE tab_full"); +} + +# Subroutine for verify the data is replicated successfully. +sub verify_data +{ + my ($node_A, $node_B, $node_C) = @_; + + $node_A->wait_for_catchup($appname_B1); + $node_A->wait_for_catchup($appname_C1); + $node_B->wait_for_catchup($appname_A1); + $node_B->wait_for_catchup($appname_C2); + $node_C->wait_for_catchup($appname_A2); + $node_C->wait_for_catchup($appname_B2); + + # check that data is replicated to all the nodes + $result = + $node_A->safe_psql('postgres', "SELECT * FROM tab_full order by 1;"); + is( $result, qq(11 +12 +21 +22 +31), + 'Inserted successfully without leading to infinite recursion in circular replication setup' + ); + + $result = + $node_B->safe_psql('postgres', "SELECT * FROM tab_full order by 1;"); + is( $result, qq(11 +12 +21 +22 +31), + 'Inserted successfully without leading to infinite recursion in circular replication setup' + ); + + $result = + $node_C->safe_psql('postgres', "SELECT * FROM tab_full order by 1;"); + is( $result, qq(11 +12 +21 +22 +31), + 'Inserted successfully without leading to infinite recursion in circular replication setup' + ); +} + +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; + +sub create_subscription +{ + my ($node_subscriber, $node_publisher, $sub_name, $node_connstr, + $application_name, $pub_name, $copy_data_val) + = @_; + $node_subscriber->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION $sub_name + CONNECTION '$node_connstr application_name=$application_name' + PUBLICATION $pub_name + WITH (copy_data = $copy_data_val, local_only = on)"); + $node_publisher->wait_for_catchup($application_name); + + # Also wait for initial table sync to finish + $node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +} + ##################################################################### # Setup a bidirectional logical replication between Node_A & Node_B ##################################################################### @@ -19,7 +133,8 @@ use Test::More; # node_A my $node_A = PostgreSQL::Test::Cluster->new('node_A'); $node_A->init(allows_streaming => 'logical'); -$node_A->append_conf('postgresql.conf', qq( +$node_A->append_conf( + 'postgresql.conf', qq( max_prepared_transactions = 10 logical_decoding_work_mem = 64kB )); @@ -27,58 +142,64 @@ $node_A->start; # node_B my $node_B = PostgreSQL::Test::Cluster->new('node_B'); $node_B->init(allows_streaming => 'logical'); -$node_B->append_conf('postgresql.conf', qq( +$node_B->append_conf( + 'postgresql.conf', qq( max_prepared_transactions = 10 logical_decoding_work_mem = 64kB )); $node_B->start; # Create tables on node_A -$node_A->safe_psql('postgres', - "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_A->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); # Create the same tables on node_B -$node_B->safe_psql('postgres', - "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_B->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); # Setup logical replication # node_A (pub) -> node_B (sub) my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; $node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full"); -my $appname_B = 'tap_sub_B'; -$node_B->safe_psql('postgres', " - CREATE SUBSCRIPTION tap_sub_B - CONNECTION '$node_A_connstr application_name=$appname_B' - PUBLICATION tap_pub_A - WITH (local_only = on)"); + +create_subscription($node_B, $node_A, $appname_B1, $node_A_connstr, + $appname_B1, 'tap_pub_A', 'off'); # node_B (pub) -> node_A (sub) my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; $node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full"); -my $appname_A = 'tap_sub_A'; -$node_A->safe_psql('postgres', " - CREATE SUBSCRIPTION tap_sub_A - CONNECTION '$node_B_connstr application_name=$appname_A' - PUBLICATION tap_pub_B - WITH (local_only = on, copy_data = off)"); - -# Wait for subscribers to finish initialization -$node_A->wait_for_catchup($appname_B); -$node_B->wait_for_catchup($appname_A); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_A->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; -$node_B->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; -is(1,1, "Circular replication setup is complete"); +create_subscription($node_A, $node_B, $appname_A1, $node_B_connstr, + $appname_A1, 'tap_pub_B', 'off'); -my $result; +is(1, 1, "Circular replication setup is complete"); + +# Error when creating susbcription with local_only and copy_data as true when +# the publisher has replicated data +($result, $stdout, $stderr) = $node_A->psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_A3 + CONNECTION '$node_B_connstr application_name=$appname_A1' + PUBLICATION tap_pub_B + WITH (local_only = on, copy_data = on)"); +like( + $stderr, + qr/ERROR: CREATE\/ALTER SUBSCRIPTION with local_only and copy_data as true is not allowed when the publisher might have replicated data/, + "Create subscription with local_only and copy_data having replicated table in publisher" +); + +# Creating subscription with local_only and copy_data as force should be +# successful when the publisher has replicated data +$node_A->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_A2 + CONNECTION '$node_B_connstr application_name=$appname_A2' + PUBLICATION tap_pub_B + WITH (local_only = on, copy_data = force)"); + +$node_A->safe_psql( + 'postgres', " + DROP SUBSCRIPTION tap_sub_A2"); ############################################################################# # check that bidirectional logical replication setup does not cause infinite @@ -87,20 +208,119 @@ my $result; # insert a record $node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);"); -$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);"); +$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (21);"); -$node_A->wait_for_catchup($appname_B); -$node_B->wait_for_catchup($appname_A); +$node_A->wait_for_catchup($appname_B1); +$node_B->wait_for_catchup($appname_A1); # check that transaction was committed on subscriber(s) $result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full;"); -is($result, qq(11 -12), 'Inserted successfully without leading to infinite recursion in circular replication setup'); +is( $result, qq(11 +21), + 'Inserted successfully without leading to infinite recursion in circular replication setup' +); $result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full;"); -is($result, qq(11 -12), 'Inserted successfully without leading to infinite recursion in circular replication setup'); +is( $result, qq(11 +21), + 'Inserted successfully without leading to infinite recursion in circular replication setup' +); + +# Initialize 3rd node +# node_C +my $node_C = PostgreSQL::Test::Cluster->new('node_C'); +$node_C->init(allows_streaming => 'logical'); +$node_C->append_conf( + 'postgresql.conf', qq( +max_prepared_transactions = 10 +logical_decoding_work_mem = 64kB +)); +$node_C->start; + +# Create tables on node_C +$node_C->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +my $node_C_connstr = $node_C->connstr . ' dbname=postgres'; +$node_C->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_C FOR TABLE tab_full"); + +########################################################################## +# Add 3rd node when the existing node has some data +########################################################################## +create_subscription($node_A, $node_C, $appname_A2, $node_C_connstr, + $appname_A2, 'tap_pub_C', 'off'); +create_subscription($node_B, $node_C, $appname_B2, $node_C_connstr, + $appname_B2, 'tap_pub_C', 'off'); +create_subscription($node_C, $node_A, $appname_C1, $node_A_connstr, + $appname_C1, 'tap_pub_A', 'force'); +create_subscription($node_C, $node_B, $appname_C2, $node_B_connstr, + $appname_C2, 'tap_pub_B', 'off'); + +# insert some data in all the nodes +$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);"); +$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (22);"); +$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (31);"); + +verify_data($node_A, $node_B, $node_C); + +clean_subscriber_contents($node_A, $node_B, $node_C); + +########################################################################## +# Add 3rd node when the existing node has no data +########################################################################## +create_subscription($node_A, $node_C, $appname_A2, $node_C_connstr, + $appname_A2, 'tap_pub_C', 'off'); +create_subscription($node_B, $node_C, $appname_B2, $node_C_connstr, + $appname_B2, 'tap_pub_C', 'off'); +create_subscription($node_C, $node_A, $appname_C1, $node_A_connstr, + $appname_C1, 'tap_pub_A', 'off'); +create_subscription($node_C, $node_B, $appname_C2, $node_B_connstr, + $appname_C2, 'tap_pub_B', 'off'); + +# insert some data in all the nodes +$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);"); +$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);"); +$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (21);"); +$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (22);"); +$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (31);"); + +verify_data($node_A, $node_B, $node_C); + +clean_subscriber_contents($node_A, $node_B, $node_C); + +########################################################################## +# Add 3rd node when the new node has some data +########################################################################## +create_subscription($node_A, $node_C, $appname_A2, $node_C_connstr, + $appname_A2, 'tap_pub_C', 'force'); +create_subscription($node_B, $node_C, $appname_B2, $node_C_connstr, + $appname_B2, 'tap_pub_C', 'force'); + +$node_C->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_C SET (publish='insert,update,delete');"); + +$node_C->safe_psql('postgres', "TRUNCATE tab_full"); + +create_subscription($node_C, $node_A, $appname_C1, $node_A_connstr, + $appname_C1, 'tap_pub_A', 'force'); +create_subscription($node_C, $node_B, $appname_C2, $node_B_connstr, + $appname_C2, 'tap_pub_B', 'off'); + +#include truncates now +$node_C->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_C SET (publish='insert,update,delete,truncate');" +); + +# insert some data in all the nodes +$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);"); +$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);"); +$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (21);"); +$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (22);"); +$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (31);"); + +verify_data($node_A, $node_B, $node_C); # shutdown +$node_C->stop('fast'); $node_B->stop('fast'); $node_A->stop('fast'); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index be3fafadf8..cf40b843f0 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -442,6 +442,7 @@ ConvProcInfo ConversionLocation ConvertRowtypeExpr CookedConstraint +CopyData CopyDest CopyFormatOptions CopyFromState -- 2.32.0