From 5be4b36317f8fd6cecd2c5f9d02e4c5f87026811 Mon Sep 17 00:00:00 2001 From: Vigneshwaran C Date: Thu, 31 Mar 2022 17:08:35 +0530 Subject: [PATCH v6 1/2] Skip replication of non local data. Add an option publish_local_only which will subscribe only to the locally generated data in the publisher node. If subscriber is created with this option, publisher will skip publishing the data that was subscribed from other nodes. It can be created using following syntax: ex: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname =postgres port=9999' PUBLICATION pub1 with (publish_local_only = on); --- doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 12 ++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 4 +- src/backend/commands/subscriptioncmds.c | 26 +++- .../libpqwalreceiver/libpqwalreceiver.c | 4 + src/backend/replication/logical/worker.c | 2 + src/backend/replication/pgoutput/pgoutput.c | 23 +++ src/bin/pg_dump/pg_dump.c | 13 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 8 +- src/bin/psql/tab-complete.c | 4 +- src/include/catalog/pg_subscription.h | 3 + src/include/replication/pgoutput.h | 1 + src/include/replication/walreceiver.h | 1 + src/test/regress/expected/subscription.out | 139 ++++++++++-------- src/test/regress/sql/subscription.sql | 7 + src/test/subscription/t/032_circular.pl | 108 ++++++++++++++ 18 files changed, 289 insertions(+), 73 deletions(-) create mode 100644 src/test/subscription/t/032_circular.pl diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index fe13ab9a2d..8f667c916e 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -207,8 +207,9 @@ ALTER SUBSCRIPTION name RENAME TO < information. The parameters that can be altered are slot_name, synchronous_commit, - binary, streaming, and - disable_on_error. + binary, streaming, + disable_on_error, and + subscribe_local_only. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index ebf7db57c5..36dfbc888b 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -152,6 +152,18 @@ CREATE SUBSCRIPTION subscription_name + + subscribe_local_only (boolean) + + + Specifies whether the subscription will request the publisher to send + locally generated changes or both the locally generated changes and + the replicated changes that was generated from other nodes. The + default is false. + + + + slot_name (string) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 0ff0982f7b..8e0d0ccced 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; sub->skiplsn = subform->subskiplsn; + sub->only_local = subform->sublocal; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 9eaa51df29..673ef29301 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1286,8 +1286,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary, - substream, subtwophasestate, subdisableonerr, subskiplsn, subslotname, - subsynccommit, subpublications) + substream, subtwophasestate, subdisableonerr, sublocal, + subskiplsn, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 85dacbe93d..55c48af2c9 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -64,6 +64,7 @@ #define SUBOPT_TWOPHASE_COMMIT 0x00000200 #define SUBOPT_DISABLE_ON_ERR 0x00000400 #define SUBOPT_LSN 0x00000800 +#define SUBOPT_LOCAL_ONLY 0x00001000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -86,6 +87,7 @@ typedef struct SubOpts bool streaming; bool twophase; bool disableonerr; + bool only_local; XLogRecPtr lsn; } SubOpts; @@ -138,6 +140,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->twophase = false; if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR)) opts->disableonerr = false; + if (IsSet(supported_opts, SUBOPT_LOCAL_ONLY)) + opts->only_local = false; /* Parse options */ foreach(lc, stmt_options) @@ -236,6 +240,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_STREAMING; opts->streaming = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_LOCAL_ONLY) && + strcmp(defel->defname, "subscribe_local_only") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_LOCAL_ONLY)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_LOCAL_ONLY; + opts->only_local = defGetBoolean(defel); + } else if (strcmp(defel->defname, "two_phase") == 0) { /* @@ -532,7 +545,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | - SUBOPT_DISABLE_ON_ERR); + SUBOPT_DISABLE_ON_ERR | SUBOPT_LOCAL_ONLY); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -602,6 +615,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary); values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming); + values[Anum_pg_subscription_sublocal - 1] = BoolGetDatum(opts.only_local); values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(opts.twophase ? LOGICALREP_TWOPHASE_STATE_PENDING : @@ -1019,7 +1033,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR); + SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | + SUBOPT_LOCAL_ONLY); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1076,6 +1091,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, = true; } + if (IsSet(opts.specified_opts, SUBOPT_LOCAL_ONLY)) + { + values[Anum_pg_subscription_sublocal - 1] = + BoolGetDatum(opts.only_local); + replaces[Anum_pg_subscription_sublocal - 1] = true; + } + update_tuple = true; break; } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 0d89db4e6a..b36d23d419 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -453,6 +453,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQserverVersion(conn->streamConn) >= 150000) appendStringInfoString(&cmd, ", two_phase 'on'"); + if (options->proto.logical.only_local && + PQserverVersion(conn->streamConn) >= 150000) + appendStringInfoString(&cmd, ", subscribe_local_only 'on'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index f3868b3e1f..8cd1622419 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3106,6 +3106,7 @@ maybe_reread_subscription(void) newsub->binary != MySubscription->binary || newsub->stream != MySubscription->stream || newsub->owner != MySubscription->owner || + newsub->only_local != MySubscription->only_local || !equal(newsub->publications, MySubscription->publications)) { ereport(LOG, @@ -3786,6 +3787,7 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; options.proto.logical.twophase = false; + options.proto.logical.only_local = MySubscription->only_local; if (!am_tablesync_worker()) { diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 20d0b1e125..298ceb87d9 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -294,12 +294,15 @@ parse_output_parameters(List *options, PGOutputData *data) bool sequences_option_given = false; bool streaming_given = false; bool two_phase_option_given = false; + bool subscribe_local_only_option_given = false; + data->binary = false; data->streaming = false; data->messages = false; data->two_phase = false; data->sequences = true; + data->only_local = false; foreach(lc, options) { @@ -398,6 +401,16 @@ parse_output_parameters(List *options, PGOutputData *data) data->two_phase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "subscribe_local_only") == 0) + { + if (subscribe_local_only_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + subscribe_local_only_option_given = true; + + data->only_local = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -496,6 +509,12 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, else ctx->twophase_opt_given = true; + if (data->only_local && data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("requested proto_version=%d does not support subscribe_local_only, need %d or higher", + data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM))); + /* Init publication state. */ data->publications = NIL; publications_valid = false; @@ -1773,6 +1792,10 @@ static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + + if (data->only_local && origin_id != InvalidRepOriginId) + return true; return false; } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 535b160165..4160642134 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4407,6 +4407,7 @@ getSubscriptions(Archive *fout) int i_subsynccommit; int i_subpublications; int i_subbinary; + int i_sublocal; int i, ntups; @@ -4451,11 +4452,13 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 150000) appendPQExpBufferStr(query, " s.subtwophasestate,\n" - " s.subdisableonerr\n"); + " s.subdisableonerr,\n" + " s.sublocal\n"); else appendPQExpBuffer(query, " '%c' AS subtwophasestate,\n" - " false AS subdisableonerr\n", + " false AS subdisableonerr,\n" + " false AS s.sublocal\n", LOGICALREP_TWOPHASE_STATE_DISABLED); appendPQExpBufferStr(query, @@ -4483,6 +4486,7 @@ getSubscriptions(Archive *fout) i_substream = PQfnumber(res, "substream"); i_subtwophasestate = PQfnumber(res, "subtwophasestate"); i_subdisableonerr = PQfnumber(res, "subdisableonerr"); + i_sublocal = PQfnumber(res, "sublocal"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4512,6 +4516,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subtwophasestate)); subinfo[i].subdisableonerr = pg_strdup(PQgetvalue(res, i, i_subdisableonerr)); + subinfo[i].sublocal = + pg_strdup(PQgetvalue(res, i, i_sublocal)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -4585,6 +4591,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subdisableonerr, "t") == 0) appendPQExpBufferStr(query, ", disable_on_error = true"); + if (strcmp(subinfo->sublocal, "f") != 0) + appendPQExpBufferStr(query, ", subscribe_local_only = on"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 688093c55e..d40b54e081 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -664,6 +664,7 @@ typedef struct _SubscriptionInfo char *subdisableonerr; char *subsynccommit; char *subpublications; + char *sublocal; } SubscriptionInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 4dddf08789..200c08d75e 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6280,7 +6280,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false, false}; + false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6318,9 +6318,11 @@ describeSubscriptions(const char *pattern, bool verbose) if (pset.sversion >= 150000) appendPQExpBuffer(&buf, ", subtwophasestate AS \"%s\"\n" - ", subdisableonerr AS \"%s\"\n", + ", subdisableonerr AS \"%s\"\n" + ", sublocal AS \"%s\"\n", gettext_noop("Two phase commit"), - gettext_noop("Disable on error")); + gettext_noop("Disable on error"), + gettext_noop("Only local")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 3f9dfffd57..9258b8dd01 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1882,7 +1882,7 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) - COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error"); + COMPLETE_WITH("binary", "slot_name", "streaming", "subscribe_local_only", "synchronous_commit", "disable_on_error"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3167,7 +3167,7 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "enabled", "slot_name", "streaming", + "enabled", "slot_name", "streaming", "subscribe_local_only", "synchronous_commit", "two_phase", "disable_on_error"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 599c2e4422..38031bd587 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -66,6 +66,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool substream; /* Stream in-progress transactions. */ + bool sublocal; /* skip copying of remote origin data */ + char subtwophasestate; /* Stream two-phase transactions */ bool subdisableonerr; /* True if a worker error should cause the @@ -109,6 +111,7 @@ typedef struct Subscription bool binary; /* Indicates if the subscription wants data in * binary format */ bool stream; /* Allow streaming in-progress transactions. */ + bool only_local; /* Skip copying of remote orgin data */ char twophasestate; /* Allow streaming two-phase transactions */ bool disableonerr; /* Indicates if the subscription should be * automatically disabled if a worker error diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index f4e9f35d09..3407a1eff6 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -30,6 +30,7 @@ typedef struct PGOutputData bool messages; bool two_phase; bool sequences; + bool only_local; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 92f73a55b8..65c83977a3 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -183,6 +183,7 @@ typedef struct bool streaming; /* Streaming of large transactions */ bool twophase; /* Streaming of two-phase transactions at * prepare time */ + bool only_local; /* publish only locally generated data */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 7fcfad1591..17b991b1fc 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -70,16 +70,35 @@ ALTER SUBSCRIPTION regress_testsub3 ENABLE; ERROR: cannot enable subscription that does not have a slot name ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION; ERROR: ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions +-- ok - with subscribe_local_only = true +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, subscribe_local_only = true); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +\dRs+ regress_testsub4 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | t | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub4 SET (subscribe_local_only = false); +\dRs+ regress_testsub4 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 +(1 row) + DROP SUBSCRIPTION regress_testsub3; +DROP SUBSCRIPTION regress_testsub4; -- fail - invalid connection string ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -96,10 +115,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | f | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -108,10 +127,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -143,10 +162,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | f | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -179,19 +198,19 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -202,19 +221,19 @@ ERROR: streaming requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -229,10 +248,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more then once @@ -247,10 +266,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -284,10 +303,10 @@ ERROR: two_phase requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -296,10 +315,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -308,10 +327,10 @@ DROP SUBSCRIPTION regress_testsub; CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -323,18 +342,18 @@ ERROR: disable_on_error requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 74c38ead5d..badc00b7c7 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -54,7 +54,14 @@ CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PU ALTER SUBSCRIPTION regress_testsub3 ENABLE; ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION; +-- ok - with subscribe_local_only = true +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, subscribe_local_only = true); +\dRs+ regress_testsub4 +ALTER SUBSCRIPTION regress_testsub4 SET (subscribe_local_only = false); +\dRs+ regress_testsub4 + DROP SUBSCRIPTION regress_testsub3; +DROP SUBSCRIPTION regress_testsub4; -- fail - invalid connection string ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; diff --git a/src/test/subscription/t/032_circular.pl b/src/test/subscription/t/032_circular.pl new file mode 100644 index 0000000000..30e14bf727 --- /dev/null +++ b/src/test/subscription/t/032_circular.pl @@ -0,0 +1,108 @@ + +# Copyright (c) 2021-2022, PostgreSQL Global Development Group + +# Test circular logical replication. +# +# Includes tests for circulation replication using subscribe_local_only option. +# +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +################################################### +# Setup a circulation replication of pub/sub nodes. +# node_A -> node_B -> node_A +################################################### + +# Initialize nodes +# node_A +my $node_A = PostgreSQL::Test::Cluster->new('node_A'); +$node_A->init(allows_streaming => 'logical'); +$node_A->append_conf('postgresql.conf', qq( +max_prepared_transactions = 10 +logical_decoding_work_mem = 64kB +)); +$node_A->start; +# node_B +my $node_B = PostgreSQL::Test::Cluster->new('node_B'); +$node_B->init(allows_streaming => 'logical'); +$node_B->append_conf('postgresql.conf', qq( +max_prepared_transactions = 10 +logical_decoding_work_mem = 64kB +)); +$node_B->start; + +# Create tables on node_A +$node_A->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Create the same tables on node_B +$node_B->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Setup logical replication +# node_A (pub) -> node_B (sub) +my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; +$node_A->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full"); +my $appname_B = 'tap_sub_B'; +$node_B->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub_B + CONNECTION '$node_A_connstr application_name=$appname_B' + PUBLICATION tap_pub_A + WITH (subscribe_local_only = on)"); + +# node_B (pub) -> node_A (sub) +my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; +$node_B->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full"); +my $appname_A = 'tap_sub_A'; +$node_A->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub_A + CONNECTION '$node_B_connstr application_name=$appname_A' + PUBLICATION tap_pub_B + WITH (subscribe_local_only = on, copy_data = off)"); + +# Wait for subscribers to finish initialization +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_A); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_A->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_B->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +is(1,1, "Circular replication setup is complete"); + +my $result; + +########################################################################## +# check that circular replication setup does not cause infinite recursive +# insertion. +########################################################################## + +# insert a record +$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);"); +$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_A); + +# check that transaction was committed on subscriber(s) +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full;"); +is($result, qq(11 +12), 'Inserted successfully without leading to infinite recursion in circular replication setup'); +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full;"); +is($result, qq(11 +12), 'Inserted successfully without leading to infinite recursion in circular replication setup'); + +# shutdown +$node_B->stop('fast'); +$node_A->stop('fast'); + +done_testing(); -- 2.32.0