From 4a405e22ec858d9c42d595e55d7612a97ce63073 Mon Sep 17 00:00:00 2001 From: Vigneshwaran C Date: Mon, 4 Jul 2022 21:00:59 +0530 Subject: [PATCH v29 2/3] Check and throw an error if publication tables were also subscribing from other publishers and support force value for copy_data parameter. This patch does a couple of things: 1) Checks and throws an error if 'copy_data = on' and 'origin = local' but the publication tables were also replicated from other publishers. 2) Adds 'force' value for copy_data parameter. ------------------------------------------------------------------------------- The steps below help to demonstrate how the new exception is useful: The initial copy phase has no way to know the origin of the row data, so if 'copy_data = on' in the step 4 below, then an error will be thrown to prevent any potentially non-local data from being copied: e.g. CREATE SUBSCRIPTION sub_node2_node1 CONNECTION '' PUBLICATION pub_node1 WITH (copy_data = on, origin = local); ERROR: CREATE/ALTER SUBSCRIPTION with origin = local and copy_data = on is not allowed when the publisher might have replicated data. ------------------------------------------------------------------------------- The following steps help to demonstrate how the 'copy_data = force' change will be useful: Let's take a scenario where the user wants to set up bidirectional logical replication between node1 and node2 where the same table on node1 has pre-existing data and node2 has no pre-existing data. e.g. 
node1: Table t1 (c1 int) has data 11, 12, 13, 14 node2: Table t1 (c1 int) has no pre-existing data The following steps are required in this case: step 1: node1=# CREATE PUBLICATION pub_node1 FOR TABLE t1; CREATE PUBLICATION step 2: node2=# CREATE PUBLICATION pub_node2 FOR TABLE t1; CREATE PUBLICATION step 3: node1=# CREATE SUBSCRIPTION sub_node1_node2 CONNECTION '' node1-# PUBLICATION pub_node2; CREATE SUBSCRIPTION step 4: node2=# CREATE SUBSCRIPTION sub_node2_node1 CONNECTION '' node2-# PUBLICATION pub_node1; CREATE SUBSCRIPTION After the subscription is created on node2, node1 will be synced to node2 and the newly synced data will be sent back to node1. This process of node1 sending data to node2 and node2 sending data to node1 will repeat infinitely. If table t1 has a unique key, this will lead to a unique key violation and replication won't proceed. This problem can be avoided by using origin and copy_data parameters as given below: Step 1 & Step 2 are same as above. step 3: Create a subscription on node1 to subscribe to node2: node1=# CREATE SUBSCRIPTION sub_node1_node2 CONNECTION '' node1-# PUBLICATION pub_node2 WITH (copy_data = off, origin = local); CREATE SUBSCRIPTION step 4: Create a subscription on node2 to subscribe to node1. 
Use 'copy_data = force' when creating a subscription to node1 so that the existing table data is copied during initial sync: node2=# CREATE SUBSCRIPTION sub_node2_node1 CONNECTION '' node2-# PUBLICATION pub_node1 WITH (copy_data = force, origin = local); CREATE SUBSCRIPTION --- doc/src/sgml/ref/alter_subscription.sgml | 16 +- doc/src/sgml/ref/create_subscription.sgml | 32 +- src/backend/catalog/pg_subscription.c | 86 ++--- src/backend/commands/subscriptioncmds.c | 214 +++++++++++- src/backend/replication/logical/tablesync.c | 3 +- src/include/catalog/pg_subscription_rel.h | 13 +- src/test/regress/expected/subscription.out | 22 +- src/test/regress/sql/subscription.sql | 14 + src/test/subscription/t/032_origin.pl | 341 +++++++++++++++++--- src/tools/pgindent/typedefs.list | 2 + 10 files changed, 616 insertions(+), 127 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 64efc21f53..04e526fb80 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -161,12 +161,22 @@ ALTER SUBSCRIPTION name RENAME TO < - copy_data (boolean) + copy_data (enum) Specifies whether to copy pre-existing data in the publications - that are being subscribed to when the replication starts. - The default is true. + that are being subscribed to when the replication starts. This + parameter may be either true, + false or force. The default is + true. + + + There is some interaction between the origin + parameter and the copy_data parameter. Refer to + the CREATE SUBSCRIPTION + for interaction + details and usage of force for + copy_data parameter. 
Previously subscribed tables are not copied, even if a table's row diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index b0a6ebcb7d..f0bc2ba63d 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -115,7 +115,8 @@ CREATE SUBSCRIPTION subscription_nameconnect to false with setting create_slot, enabled, - or copy_data to true.) + or copy_data to + true/force.) @@ -201,18 +202,27 @@ CREATE SUBSCRIPTION subscription_name - copy_data (boolean) + copy_data (enum) Specifies whether to copy pre-existing data in the publications - that are being subscribed to when the replication starts. - The default is true. + that are being subscribed to when the replication starts. This + parameter may be either true, + false or force. The default is + true. If the publications contain WHERE clauses, it will affect what data is copied. Refer to the for details. + + There is some interaction between the origin + parameter and the copy_data parameter. Refer to + the for interaction + details and usage of force for + copy_data parameter. + @@ -316,6 +326,11 @@ CREATE SUBSCRIPTION subscription_nameany. + + There is some interaction between the origin + parameter and the copy_data parameter. Refer to + the for details. + @@ -387,6 +402,15 @@ CREATE SUBSCRIPTION subscription_name + + If the subscription is created with origin = local and + copy_data = true, it will check if the publisher has + subscribed to the same table from other publishers and, if so, throw an + error to prevent possible non-local data from being copied. The user can + override this check and continue with the copy operation by specifying + copy_data = force. 
+ + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 33ae3da8ae..7049500fea 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -533,65 +533,13 @@ HasSubscriptionRelations(Oid subid) } /* - * Get all relations for subscription. + * Get the relations for the subscription based on the subscriber relation type + * specified. * * Returned list is palloc'ed in current memory context. */ List * -GetSubscriptionRelations(Oid subid) -{ - List *res = NIL; - Relation rel; - HeapTuple tup; - ScanKeyData skey[1]; - SysScanDesc scan; - - rel = table_open(SubscriptionRelRelationId, AccessShareLock); - - ScanKeyInit(&skey[0], - Anum_pg_subscription_rel_srsubid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(subid)); - - scan = systable_beginscan(rel, InvalidOid, false, - NULL, 1, skey); - - while (HeapTupleIsValid(tup = systable_getnext(scan))) - { - Form_pg_subscription_rel subrel; - SubscriptionRelState *relstate; - Datum d; - bool isnull; - - subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); - - relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); - relstate->relid = subrel->srrelid; - relstate->state = subrel->srsubstate; - d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, - Anum_pg_subscription_rel_srsublsn, &isnull); - if (isnull) - relstate->lsn = InvalidXLogRecPtr; - else - relstate->lsn = DatumGetLSN(d); - - res = lappend(res, relstate); - } - - /* Cleanup */ - systable_endscan(scan); - table_close(rel, AccessShareLock); - - return res; -} - -/* - * Get all relations for subscription that are not in a ready state. - * - * Returned list is palloc'ed in current memory context. 
- */ -List * -GetSubscriptionNotReadyRelations(Oid subid) +GetSubscriptionRelations(Oid subid, SubRelStateType relstatetype) { List *res = NIL; Relation rel; @@ -601,16 +549,28 @@ GetSubscriptionNotReadyRelations(Oid subid) SysScanDesc scan; rel = table_open(SubscriptionRelRelationId, AccessShareLock); - ScanKeyInit(&skey[nkeys++], Anum_pg_subscription_rel_srsubid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(subid)); - ScanKeyInit(&skey[nkeys++], - Anum_pg_subscription_rel_srsubstate, - BTEqualStrategyNumber, F_CHARNE, - CharGetDatum(SUBREL_STATE_READY)); + switch (relstatetype) + { + case ALL_STATE: + break; + case READY_STATE: + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_rel_srsubstate, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(SUBREL_STATE_READY)); + break; + case NOT_READY_STATE: + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_rel_srsubstate, + BTEqualStrategyNumber, F_CHARNE, + CharGetDatum(SUBREL_STATE_READY)); + break; + } scan = systable_beginscan(rel, InvalidOid, false, NULL, nkeys, skey); @@ -634,6 +594,12 @@ GetSubscriptionNotReadyRelations(Oid subid) else relstate->lsn = DatumGetLSN(d); + if (relstatetype == READY_STATE) + { + relstate->relname = get_rel_name(subrel->srrelid); + relstate->nspname = get_namespace_name(get_rel_namespace(subrel->srrelid)); + } + res = lappend(res, relstate); } diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 217fdf7339..6a4ef7edc3 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -69,6 +69,16 @@ /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) +/* + * Represents whether copy_data parameter is specified with off, on or force. 
+ */ +typedef enum CopyData +{ + COPY_DATA_OFF = 0, + COPY_DATA_ON, + COPY_DATA_FORCE +} CopyData; + /* * Structure to hold a bitmap representing the user-provided CREATE/ALTER * SUBSCRIPTION command options and the parsed/default values of each of them. @@ -81,7 +91,7 @@ typedef struct SubOpts bool connect; bool enabled; bool create_slot; - bool copy_data; + CopyData copy_data; bool refresh; bool binary; bool streaming; @@ -92,10 +102,67 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static void check_pub_table_subscribed(WalReceiverConn *wrconn, + List *publications, CopyData copydata, + char *origin, Oid subid); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); +/* + * Validate the value specified for copy_data parameter. + */ +static CopyData +DefGetCopyData(DefElem *def) +{ + /* + * If no parameter given, assume "true" is meant. + */ + if (def->arg == NULL) + return COPY_DATA_ON; + + /* + * Allow 0, 1, "true", "false", "on", "off" or "force". + */ + switch (nodeTag(def->arg)) + { + case T_Integer: + switch (intVal(def->arg)) + { + case 0: + return COPY_DATA_OFF; + case 1: + return COPY_DATA_ON; + default: + /* otherwise, error out below */ + break; + } + break; + default: + { + char *sval = defGetString(def); + + /* + * The set of strings accepted here should match up with the + * grammar's opt_boolean_or_string production. 
+ */ + if (pg_strcasecmp(sval, "false") == 0 || + pg_strcasecmp(sval, "off") == 0) + return COPY_DATA_OFF; + if (pg_strcasecmp(sval, "true") == 0 || + pg_strcasecmp(sval, "on") == 0) + return COPY_DATA_ON; + if (pg_strcasecmp(sval, "force") == 0) + return COPY_DATA_FORCE; + } + break; + } + + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("%s requires a boolean or \"force\"", def->defname)); + return COPY_DATA_OFF; /* keep compiler quiet */ +} /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -128,7 +195,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, if (IsSet(supported_opts, SUBOPT_CREATE_SLOT)) opts->create_slot = true; if (IsSet(supported_opts, SUBOPT_COPY_DATA)) - opts->copy_data = true; + opts->copy_data = COPY_DATA_ON; if (IsSet(supported_opts, SUBOPT_REFRESH)) opts->refresh = true; if (IsSet(supported_opts, SUBOPT_BINARY)) @@ -196,7 +263,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_COPY_DATA; - opts->copy_data = defGetBoolean(defel); + opts->copy_data = DefGetCopyData(defel); } else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) && strcmp(defel->defname, "synchronous_commit") == 0) @@ -352,12 +419,12 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options", - "connect = false", "copy_data = true"))); + "connect = false", "copy_data = true/force"))); /* Change the defaults of other options. 
*/ opts->enabled = false; opts->create_slot = false; - opts->copy_data = false; + opts->copy_data = COPY_DATA_OFF; } /* @@ -680,6 +747,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { check_publications(wrconn, publications); + check_pub_table_subscribed(wrconn, publications, opts.copy_data, + opts.origin, InvalidOid); /* * Set sync state based on if we were asked to do data copy or @@ -775,7 +844,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, } static void -AlterSubscription_refresh(Subscription *sub, bool copy_data, +AlterSubscription_refresh(Subscription *sub, CopyData copy_data, List *validate_publications) { char *err; @@ -810,11 +879,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, if (validate_publications) check_publications(wrconn, validate_publications); + check_pub_table_subscribed(wrconn, sub->publications, copy_data, + sub->origin, sub->oid); + /* Get the table list from publisher. */ pubrel_names = fetch_table_list(wrconn, sub->publications); /* Get local table list. */ - subrel_states = GetSubscriptionRelations(sub->oid); + subrel_states = GetSubscriptionRelations(sub->oid, ALL_STATE); /* * Build qsorted array of local table oids for faster lookup. This can @@ -1494,7 +1566,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * the apply and tablesync workers and they can't restart because of * exclusive lock on the subscription. */ - rstates = GetSubscriptionNotReadyRelations(subid); + rstates = GetSubscriptionRelations(subid, NOT_READY_STATE); foreach(lc, rstates) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -1787,6 +1859,132 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) table_close(rel, RowExclusiveLock); } +/* + * Check and throw an error if the publisher has subscribed to the same table + * from some other publisher. 
This check is required only if copydata is ON and + * the origin is local for ALTER SUBSCRIPTION ... REFRESH statement. This check + * need not be performed on the tables that are in ready state as incremental + * sync for ready tables will happen through WAL and the origin of the data can + * be identified from the WAL records. + */ +static void +check_pub_table_subscribed(WalReceiverConn *wrconn, List *publications, + CopyData copydata, char *origin, Oid subid) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {TEXTOID, TEXTOID}; + List *subreadyrels = NIL; + ListCell *lc; + + if (copydata != COPY_DATA_ON || !origin || (strcmp(origin, "local") != 0)) + return; + + initStringInfo(&cmd); + appendStringInfoString(&cmd, + "SELECT DISTINCT N.nspname AS schemaname,\n" + " C.relname AS tablename\n" + "FROM pg_publication P,\n" + " LATERAL pg_get_publication_tables(P.pubname) GPT\n" + " LEFT JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n" + " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n" + "WHERE C.oid = GPT.relid AND PS.srrelid IS NOT NULL AND P.pubname IN ("); + get_publications_str(publications, &cmd, true); + appendStringInfoChar(&cmd, ')'); + + res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not receive list of replicated tables from the publisher: %s", + res->err))); + + /* + * Get the ready relations for the subscription. The subid will be valid + * only for ALTER SUBSCRIPTION ... REFRESH because there will be no + * relations in ready state while the subscription is created. + */ + if (OidIsValid(subid)) + subreadyrels = GetSubscriptionRelations(subid, READY_STATE); + + /* Process tables. 
*/ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *nspname; + char *relname; + bool isnull; + bool isreadytable = false; + + nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + foreach(lc, subreadyrels) + { + SubscriptionRelState *relstate; + + relstate = (SubscriptionRelState *) lfirst(lc); + if ((strcmp(nspname, relstate->nspname) == 0) && + (strcmp(relname, relstate->relname) == 0)) + { + isreadytable = true; + break; + } + } + + ExecClearTuple(slot); + + /* + * No need to throw an error for the tables that are in ready state, + * as the walsender will send the changes from WAL in case of tables + * in ready state. + */ + if (isreadytable) + { + pfree(nspname); + pfree(relname); + continue; + } + + /* + * Throw an error if the publisher has subscribed to the same table + * from some other publisher. We cannot differentiate between the + * local and non-local data that is present in the HEAP during the + * initial sync. Identification of local data can be done only from + * the WAL by using the origin id. + * + * XXX: For simplicity, we don't check whether the table has any data + * or not. If the table doesn't have any data then we don't need to + * distinguish between local and non-local data so we can avoid + * throwing error in that case. 
+ */ + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("table: \"%s.%s\" might have replicated data in the publisher", + nspname, relname), + errdetail("CREATE/ALTER SUBSCRIPTION with origin = local and copy_data = on is not allowed when the publisher might have replicated data."), + errhint("Use CREATE/ALTER SUBSCRIPTION with copy_data = off/force.")); + } + + foreach(lc, subreadyrels) + { + SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); + + pfree(relstate->nspname); + pfree(relstate->relname); + pfree(relstate); + } + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + /* * Get the list of tables which belong to specified publications on the * publisher connection. diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 670c6fcada..61f097f8f8 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1479,7 +1479,8 @@ FetchTableStates(bool *started_tx) } /* Fetch all non-ready tables. */ - rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); + rstates = GetSubscriptionRelations(MySubscription->oid, + NOT_READY_STATE); /* Allocate the tracking info in a permanent memory context. 
*/ oldctx = MemoryContextSwitchTo(CacheMemoryContext); diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 9df99c3418..06176d7fe7 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -78,8 +78,17 @@ typedef struct SubscriptionRelState Oid relid; XLogRecPtr lsn; char state; + char *relname; + char *nspname; } SubscriptionRelState; +typedef enum SubRelStateType +{ + ALL_STATE, + READY_STATE, + NOT_READY_STATE +} SubRelStateType; + extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, @@ -88,7 +97,7 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern bool HasSubscriptionRelations(Oid subid); -extern List *GetSubscriptionRelations(Oid subid); -extern List *GetSubscriptionNotReadyRelations(Oid subid); +extern List *GetSubscriptionRelations(Oid subid, + SubRelStateType subreltype); #endif /* PG_SUBSCRIPTION_REL_H */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index d46c3f8d6a..49c87240a6 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -46,8 +46,18 @@ CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PU ERROR: must be superuser to create subscriptions SET SESSION AUTHORIZATION 'regress_subscription_user'; -- fail - invalid option combinations +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data); +ERROR: connect = false and copy_data = true/force are mutually exclusive options CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true); -ERROR: connect = false and 
copy_data = true are mutually exclusive options +ERROR: connect = false and copy_data = true/force are mutually exclusive options +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = on); +ERROR: connect = false and copy_data = true/force are mutually exclusive options +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = 1); +ERROR: connect = false and copy_data = true/force are mutually exclusive options +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = force); +ERROR: connect = false and copy_data = true/force are mutually exclusive options +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = 2); +ERROR: copy_data requires a boolean or "force" CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = true); ERROR: connect = false and enabled = true are mutually exclusive options CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, create_slot = true); @@ -93,6 +103,16 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); DROP SUBSCRIPTION regress_testsub3; DROP SUBSCRIPTION regress_testsub4; +-- ok - valid copy_data options +CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = false); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = off); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = 0); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +DROP SUBSCRIPTION regress_testsub3; +DROP SUBSCRIPTION regress_testsub4; +DROP SUBSCRIPTION regress_testsub5; -- fail - invalid connection string ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index fff63ce538..597d56f561 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -39,7 +39,12 @@ CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PU SET SESSION AUTHORIZATION 'regress_subscription_user'; -- fail - invalid option combinations +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data); CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true); +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = on); +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = 1); +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = 
force); +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = 2); CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = true); CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, create_slot = true); CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, enabled = true); @@ -66,6 +71,15 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); DROP SUBSCRIPTION regress_testsub3; DROP SUBSCRIPTION regress_testsub4; +-- ok - valid copy_data options +CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = false); +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = off); +CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = 0); + +DROP SUBSCRIPTION regress_testsub3; +DROP SUBSCRIPTION regress_testsub4; +DROP SUBSCRIPTION regress_testsub5; + -- fail - invalid connection string ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; diff --git a/src/test/subscription/t/032_origin.pl b/src/test/subscription/t/032_origin.pl index 8cefb3b1fd..0348a87682 100644 --- a/src/test/subscription/t/032_origin.pl +++ b/src/test/subscription/t/032_origin.pl @@ -1,13 +1,124 @@ # Copyright (c) 2021-2022, PostgreSQL Global Development Group -# Test the CREATE SUBSCRIPTION 'origin' parameter. +# Test the CREATE SUBSCRIPTION 'origin' parameter and its interaction with +# 'copy_data' parameter. 
use strict; use warnings; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +my $result; +my $stdout; +my $stderr; + +my $subname_AB = 'tap_sub_A_B'; +my $subname_AC = 'tap_sub_A_C'; +my $subname_BA = 'tap_sub_B_A'; +my $subname_BC = 'tap_sub_B_C'; +my $subname_CA = 'tap_sub_C_A'; +my $subname_CB = 'tap_sub_C_B'; + +# Detach node_C from the node-group of (node_A, node_B, node_C) and clean the +# table contents from all nodes. +sub detach_node_clean_table_data +{ + my ($node_A, $node_B, $node_C) = @_; + $node_A->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_A_C"); + $node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B_C"); + $node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C_A"); + $node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C_B"); + + $result = + $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); + is($result, qq(1), 'check subscription was dropped on subscriber'); + + $result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); + is($result, qq(1), 'check subscription was dropped on subscriber'); + + $result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); + is($result, qq(0), 'check subscription was dropped on subscriber'); + + $result = $node_A->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); + is($result, qq(1), 'check replication slot was dropped on publisher'); + + $result = $node_B->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); + is($result, qq(1), 'check replication slot was dropped on publisher'); + + $result = $node_C->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); + is($result, qq(0), 'check replication slot was dropped on publisher'); + + $node_A->safe_psql('postgres', "TRUNCATE tab_full"); + $node_B->safe_psql('postgres', "TRUNCATE tab_full"); + $node_C->safe_psql('postgres', "TRUNCATE tab_full"); +} + +# Subroutine to verify the data is replicated 
successfully. +sub verify_data +{ + my ($node_A, $node_B, $node_C, $expect) = @_; + + $node_A->wait_for_catchup($subname_BA); + $node_A->wait_for_catchup($subname_CA); + $node_B->wait_for_catchup($subname_AB); + $node_B->wait_for_catchup($subname_CB); + $node_C->wait_for_catchup($subname_AC); + $node_C->wait_for_catchup($subname_BC); + + # check that data is replicated to all the nodes + $result = + $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); + is($result, qq($expect), + 'Data is replicated as expected' + ); + + $result = + $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); + is($result, qq($expect), + 'Data is replicated as expected' + ); + + $result = + $node_C->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); + is($result, qq($expect), + 'Data is replicated as expected' + ); +} + +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; + +# Subroutine to create subscription and wait until the initial sync is +# completed. Subroutine expects subscriber node, publisher node, subscription +# name, destination connection string, publication name and the subscription +# parameters to be passed as input parameters. +sub create_subscription +{ + my ($node_subscriber, $node_publisher, $sub_name, $node_connstr, + $pub_name, $sub_params) + = @_; + + # Application_name is always assigned the same value as the subscription + # name. 
+ $node_subscriber->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION $sub_name + CONNECTION '$node_connstr application_name=$sub_name' + PUBLICATION $pub_name + WITH ($sub_params)"); + $node_publisher->wait_for_catchup($sub_name); + + # also wait for initial table sync to finish + $node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +} + ############################################################################### # Setup a bidirectional logical replication between node_A & node_B ############################################################################### @@ -33,42 +144,18 @@ $node_B->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; $node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full"); -my $appname_B1 = 'tap_sub_B1'; -$node_B->safe_psql( - 'postgres', " - CREATE SUBSCRIPTION tap_sub_B1 - CONNECTION '$node_A_connstr application_name=$appname_B1' - PUBLICATION tap_pub_A - WITH (origin = local)"); +create_subscription($node_B, $node_A, $subname_BA, $node_A_connstr, + 'tap_pub_A', 'copy_data = on, origin = local'); # node_B (pub) -> node_A (sub) my $node_B_connstr = $node_B->connstr . 
' dbname=postgres'; $node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full"); -my $appname_A = 'tap_sub_A'; -$node_A->safe_psql( - 'postgres', " - CREATE SUBSCRIPTION tap_sub_A - CONNECTION '$node_B_connstr application_name=$appname_A' - PUBLICATION tap_pub_B - WITH (origin = local, copy_data = off)"); - -# Wait for subscribers to finish initialization -$node_A->wait_for_catchup($appname_B1); -$node_B->wait_for_catchup($appname_A); - -# Also wait for initial table sync to finish -my $synced_query = - "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; -$node_A->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; -$node_B->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +create_subscription($node_A, $node_B, $subname_AB, $node_B_connstr, + 'tap_pub_B', 'copy_data = off, origin = local'); is(1, 1, 'Bidirectional replication setup is complete'); -my $result; - ############################################################################### # Check that bidirectional logical replication setup does not cause infinite # recursive insertion. 
@@ -78,8 +165,8 @@ my $result; $node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);"); $node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);"); -$node_A->wait_for_catchup($appname_B1); -$node_B->wait_for_catchup($appname_A); +$node_A->wait_for_catchup($subname_BA); +$node_B->wait_for_catchup($subname_AB); # check that transaction was committed on subscriber(s) $result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); @@ -93,8 +180,8 @@ is($result, qq(11 $node_A->safe_psql('postgres', "DELETE FROM tab_full;"); -$node_A->wait_for_catchup($appname_B1); -$node_B->wait_for_catchup($appname_A); +$node_A->wait_for_catchup($subname_BA); +$node_B->wait_for_catchup($subname_AB); ############################################################################### # Check that remote data of node_B (that originated from node_C) is not @@ -118,25 +205,14 @@ $node_C->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); my $node_C_connstr = $node_C->connstr . 
' dbname=postgres'; $node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab_full"); - -my $appname_B2 = 'tap_sub_B2'; -$node_B->safe_psql( - 'postgres', " - CREATE SUBSCRIPTION tap_sub_B2 - CONNECTION '$node_C_connstr application_name=$appname_B2' - PUBLICATION tap_pub_C - WITH (origin = local)"); - -$node_C->wait_for_catchup($appname_B2); - -$node_B->poll_query_until('postgres', $synced_query) - or die "Timed out while waiting for subscriber to synchronize data"; +create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr, + 'tap_pub_C', 'copy_data = on, origin = local'); # insert a record $node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (13);"); -$node_C->wait_for_catchup($appname_B2); -$node_B->wait_for_catchup($appname_A); +$node_C->wait_for_catchup($subname_BC); +$node_B->wait_for_catchup($subname_AB); $result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); is($result, qq(13), 'The node_C data replicated to node_B' @@ -147,6 +223,175 @@ $result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); is($result, qq(), 'Remote data originating from another node (not the publisher) is not replicated when origin parameter is local' ); +# clear the operations done by this test +$node_B->safe_psql( + 'postgres', " + DROP SUBSCRIPTION $subname_BC"); +$node_C->safe_psql( + 'postgres', " + DELETE FROM tab_full"); +$node_B->safe_psql( + 'postgres', " + DELETE FROM tab_full where a = 13"); + +############################################################################### +# Specify origin as 'local' which indicates that the publisher should only +# replicate the changes that are generated locally from node_B, but in +# this case since the node_B is also subscribing data from node_A, node_B can +# have remotely originated data from node_A. We throw an error, in this case, +# to draw attention to there being possible remote data. 
+############################################################################### +($result, $stdout, $stderr) = $node_A->psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_A2 + CONNECTION '$node_B_connstr application_name=$subname_AB' + PUBLICATION tap_pub_B + WITH (origin = local, copy_data = on)"); +like( + $stderr, + qr/ERROR: ( [A-Z0-9]+:)? table: "public.tab_full" might have replicated data in the publisher/, + "Create subscription with origin and copy_data having replicated table in publisher" +); + +# Creating subscription with origin as local and copy_data as force should be +# successful when the publisher has replicated data +$node_A->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_A2 + CONNECTION '$node_B_connstr application_name=$subname_AC' + PUBLICATION tap_pub_B + WITH (origin = local, copy_data = force)"); + +$node_B->wait_for_catchup($subname_AC); + +# also wait for initial table sync to finish +$node_A->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Alter subscription ... refresh publication should be successful in this case +$node_A->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION tap_sub_A2 REFRESH PUBLICATION"); + +$node_A->safe_psql( + 'postgres', " + DROP SUBSCRIPTION tap_sub_A2"); + +############################################################################### +# Join 3rd node (node_C) to the existing 2 nodes(node_A & node_B) bidirectional +# replication setup when the existing nodes (node_A & node_B) have pre-existing +# data and the new node (node_C) does not have any data. 
+############################################################################### +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is( $result, qq(), 'Check existing data'); + +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is( $result, qq(), 'Check existing data'); + +$result = + $node_C->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is( $result, qq(), 'Check existing data'); + +create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr, + 'tap_pub_C', 'copy_data = off, origin = local'); +create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr, + 'tap_pub_C', 'copy_data = off, origin = local'); +create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr, + 'tap_pub_A', 'copy_data = force, origin = local'); +create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr, + 'tap_pub_B', 'copy_data = off, origin = local'); + +# insert some data in all the nodes +$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);"); +$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (22);"); +$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (32);"); + +verify_data($node_A, $node_B, $node_C, '12 +22 +32'); + +detach_node_clean_table_data($node_A, $node_B, $node_C); + +############################################################################### +# Join 3rd node (node_C) to the existing 2 nodes(node_A & node_B) bidirectional +# replication setup when the existing nodes (node_A & node_B) and the new node +# (node_C) do not have any data. 
+############################################################################### +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is( $result, qq(), 'Check existing data'); + +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is( $result, qq(), 'Check existing data'); + +$result = $node_C->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is( $result, qq(), 'Check existing data'); + +create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr, + 'tap_pub_C', 'copy_data = off, origin = local'); +create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr, + 'tap_pub_C', 'copy_data = off, origin = local'); +create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr, + 'tap_pub_A', 'copy_data = off, origin = local'); +create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr, + 'tap_pub_B', 'copy_data = off, origin = local'); + +# insert some data in all the nodes +$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (13);"); +$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (23);"); +$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (33);"); + +verify_data($node_A, $node_B, $node_C, '13 +23 +33'); + +detach_node_clean_table_data($node_A, $node_B, $node_C); + +############################################################################### +# Join 3rd node (node_C) to the existing 2 nodes(node_A & node_B) bidirectional +# replication setup when the existing nodes (node_A & node_B) have no data and +# the new node (node_C) has some pre-existing data. 
+############################################################################### +$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (34);"); + +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is( $result, qq(), 'Check existing data'); + +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is( $result, qq(), 'Check existing data'); + +$result = $node_C->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is($result, qq(34), 'Check existing data'); + +create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr, + 'tap_pub_C', 'copy_data = on, origin = local'); +create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr, + 'tap_pub_C', 'copy_data = on, origin = local'); + +$node_C->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_C SET (publish='insert,update,delete');"); + +$node_C->safe_psql('postgres', "TRUNCATE tab_full"); + +# include truncates now +$node_C->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_C SET (publish='insert,update,delete,truncate');" +); + +create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr, + 'tap_pub_A', 'copy_data = force, origin = local'); +create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr, + 'tap_pub_B', 'copy_data = off, origin = local'); + +# insert some data in all the nodes +$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (15);"); +$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (25);"); +$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (35);"); + +verify_data($node_A, $node_B, $node_C, '15 +25 +34 +35'); + # shutdown $node_B->stop('fast'); $node_A->stop('fast'); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 4fb746930a..64a14bb3c8 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -456,6 +456,7 @@ ConvProcInfo ConversionLocation ConvertRowtypeExpr CookedConstraint +CopyData CopyDest 
CopyFormatOptions CopyFromState @@ -2621,6 +2622,7 @@ SubOpts SubPlan SubPlanState SubRemoveRels +SubRelStateType SubTransactionId SubXactCallback SubXactCallbackItem -- 2.32.0