From 834702b565c2476b72d1366004f188b943874e67 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 5 Feb 2025 16:40:18 +0800 Subject: [PATCH v28 3/4] Add a retain_conflict_info option to subscriptions This patch adds a subscription option allowing users to specify whether information on the subscriber, which is useful for detecting update_deleted conflicts, should be retained. The default setting is false. If set to true, the detection of update_deleted will be enabled, and an additional replication slot named pg_conflict_detection will be created on the subscriber to prevent conflict information from being removed. Note that if multiple subscriptions on one node enable this option, only one replication slot will be created. The logical launcher will create and maintain a replication slot named pg_conflict_detection only if any local subscription has the retain_conflict_info option enabled. Enabling retain_conflict_info is prohibited if the publisher is currently in recovery mode (operating as a standby server). Bump catalog version --- doc/src/sgml/catalogs.sgml | 12 ++ doc/src/sgml/logical-replication.sgml | 3 +- doc/src/sgml/ref/alter_subscription.sgml | 12 +- doc/src/sgml/ref/create_subscription.sgml | 29 ++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 151 +++++++++++++++-- src/backend/replication/logical/launcher.c | 52 ++++-- src/backend/replication/logical/worker.c | 16 +- src/bin/pg_dump/pg_dump.c | 17 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_upgrade/check.c | 27 +++- src/bin/pg_upgrade/info.c | 25 ++- src/bin/pg_upgrade/pg_upgrade.h | 4 +- src/bin/psql/describe.c | 6 +- src/bin/psql/tab-complete.in.c | 10 +- src/include/catalog/pg_subscription.h | 5 + src/test/regress/expected/subscription.out | 178 ++++++++++++--------- src/test/regress/sql/subscription.sql | 16 ++ 19 files changed, 441 insertions(+), 127 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index ee59a7e15d0..00fc5dac170 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8067,6 +8067,18 @@ SCRAM-SHA-256$<iteration count>:&l + + + subretainconflictinfo bool + + + If true, the detection of is + enabled and the information (e.g., dead tuples, commit timestamps, and + origins) on the subscriber that is still useful for conflict detection + is retained. + + + subconninfo text diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 3d18e507bbc..632c102c2ae 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -2387,7 +2387,8 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER max_replication_slots must be set to at least the number of subscriptions expected to connect, - plus some reserve for table synchronization. + plus some reserve for table synchronization and one if + retain_conflict_info is enabled. diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index fdc648d007f..e5415c3150d 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -235,8 +235,9 @@ ALTER SUBSCRIPTION name RENAME TO < password_required, run_as_owner, origin, - failover, and - two_phase. + failover, + two_phase, and + retain_conflict_info. Only a superuser can set password_required = false. @@ -285,6 +286,13 @@ ALTER SUBSCRIPTION name RENAME TO < option is changed from true to false, the publisher will replicate the transactions again when they are committed. + + + If the retain_conflict_info + option is altered to false and no other subscription + has this option enabled, the additional replication slot that was created + to retain conflict information will be dropped. + diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index e95e0cd2715..61374b74d1c 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -437,6 +437,35 @@ CREATE SUBSCRIPTION subscription_name + + + retain_conflict_info (boolean) + + + Specifies whether the information (e.g., dead tuples, commit + timestamps, and origins) on the subscriber that is still useful for + conflict detection is retained. The default is + false. If set to true, the detection of + is enabled, and an + additional replication slot named + pg_conflict_detection will be + created on the subscriber to prevent the conflict information from + being removed. + + + + Note that the information useful for conflict detection is retained + only after the creation of the additional slot. You can verify the + existence of this slot by querying pg_replication_slots.conflicting + And even if multiple subscriptions on one node enable this option, + only one replication slot will be created. + + + + This option cannot be enabled if the publisher is also a physical standby. + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 1395032413e..39cfae43d6f 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -103,6 +103,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; + sub->retainconflictinfo = subform->subretainconflictinfo; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index eff0990957e..8afbba16f8b 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1373,7 +1373,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subslotname, subsynccommit, subpublications, suborigin) + subretainconflictinfo, subslotname, subsynccommit, + subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3cf1e055394..51bd52e48a3 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/htup_details.h" #include "access/table.h" #include "access/twophase.h" @@ -71,8 +72,9 @@ #define SUBOPT_PASSWORD_REQUIRED 0x00000800 #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 -#define SUBOPT_LSN 0x00004000 -#define SUBOPT_ORIGIN 0x00008000 +#define SUBOPT_RETAIN_CONFLICT_INFO 0x00004000 +#define SUBOPT_LSN 0x00008000 +#define SUBOPT_ORIGIN 0x00010000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -98,6 +100,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; bool failover; + bool retainconflictinfo; char *origin; XLogRecPtr lsn; } SubOpts; @@ -107,6 +110,8 @@ static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname); +static void check_conflict_info_retaintion(WalReceiverConn *wrconn, + bool retain_conflict_info); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -162,6 +167,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_FAILOVER)) opts->failover = false; + if (IsSet(supported_opts, SUBOPT_RETAIN_CONFLICT_INFO)) + opts->retainconflictinfo = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -307,6 +314,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_FAILOVER; opts->failover = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_RETAIN_CONFLICT_INFO) && + strcmp(defel->defname, "retain_conflict_info") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_RETAIN_CONFLICT_INFO)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_RETAIN_CONFLICT_INFO; + opts->retainconflictinfo = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -563,7 +579,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | + SUBOPT_RETAIN_CONFLICT_INFO | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -608,6 +625,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, errmsg("password_required=false is superuser-only"), errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser."))); + if (opts.retainconflictinfo && !track_commit_timestamp) + ereport(WARNING, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("information for detecting conflicts cannot be fully retained when \"%s\" is disabled", + "track_commit_timestamp")); + /* * If built with appropriate switch, whine when regression-testing * conventions for subscription names are violated. @@ -670,6 +693,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); + values[Anum_pg_subscription_subretainconflictinfo - 1] = + BoolGetDatum(opts.retainconflictinfo); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -724,6 +749,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, check_publications_origin(wrconn, publications, opts.copy_data, opts.origin, NULL, 0, stmt->subname); + check_conflict_info_retaintion(wrconn, opts.retainconflictinfo); + /* * Set sync state based on if we were asked to do data copy or * not. @@ -1110,6 +1137,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool update_tuple = false; bool update_failover = false; bool update_two_phase = false; + bool retain_conflict_info = false; Subscription *sub; Form_pg_subscription form; bits32 supported_opts; @@ -1165,7 +1193,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_ORIGIN); + SUBOPT_RETAIN_CONFLICT_INFO | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1325,6 +1353,29 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subfailover - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_RETAIN_CONFLICT_INFO)) + { + values[Anum_pg_subscription_subretainconflictinfo - 1] = + BoolGetDatum(opts.retainconflictinfo); + replaces[Anum_pg_subscription_subretainconflictinfo - 1] = true; + + if (opts.retainconflictinfo && !track_commit_timestamp) + ereport(WARNING, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("information for detecting conflicts cannot be fully retained when \"%s\" is disabled", + "track_commit_timestamp")); + + /* + * Notify the launcher to manage the replication slot for + * conflict detection. This ensures that replication slot + * is efficiently handled (created, updated, or dropped) + * in response to any configuration changes. + */ + ApplyLauncherWakeupAtCommit(); + + retain_conflict_info = opts.retainconflictinfo; + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = @@ -1355,6 +1406,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, ApplyLauncherWakeupAtCommit(); update_tuple = true; + + /* + * The subscription might be initially created with + * connect=false and retain_conflict_info=true, meaning the + * remote server's status may not be checked. Ensure this + * check is conducted now. + */ + retain_conflict_info = sub->retainconflictinfo; break; } @@ -1369,6 +1428,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, CStringGetTextDatum(stmt->conninfo); replaces[Anum_pg_subscription_subconninfo - 1] = true; update_tuple = true; + + /* + * Since the remote server configuration might have changed, + * perform a check to ensure it permits enabling + * retain_conflict_info. + */ + retain_conflict_info = sub->retainconflictinfo; break; case ALTER_SUBSCRIPTION_SET_PUBLICATION: @@ -1568,14 +1634,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, } /* - * Try to acquire the connection necessary for altering the slot, if - * needed. + * Try to acquire the connection necessary either for modifying the slot + * or for checking if the remote server permits enabling + * retain_conflict_info. * * This has to be at the end because otherwise if there is an error while * doing the database operations we won't be able to rollback altered * slot. */ - if (update_failover || update_two_phase) + if (update_failover || update_two_phase || retain_conflict_info) { bool must_use_password; char *err; @@ -1584,10 +1651,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); - /* Try to connect to the publisher. */ + /* + * Try to connect to the publisher, using the new connection string if + * available. + */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, - sub->name, &err); + wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo, + true, true, must_use_password, sub->name, + &err); if (!wrconn) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), @@ -1596,9 +1667,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, - update_failover ? &opts.failover : NULL, - update_two_phase ? &opts.twophase : NULL); + check_conflict_info_retaintion(wrconn, retain_conflict_info); + + if (update_failover || update_two_phase) + walrcv_alter_slot(wrconn, sub->slotname, + update_failover ? &opts.failover : NULL, + update_two_phase ? &opts.twophase : NULL); } PG_FINALLY(); { @@ -2193,6 +2267,57 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, walrcv_clear_result(res); } +/* + * Check if the publisher's status permits enabling retain_conflict_info. + * + * Enabling retain_conflict_info is not allowed if the publisher's version is + * prior to PG18 or if the publisher is in recovery (operating as a standby + * server). + * + * Refer to the comments atop maybe_advance_nonremovable_xid() for detailed + * reasons. + */ +static void +check_conflict_info_retaintion(WalReceiverConn *wrconn, bool retain_conflict_info) +{ + WalRcvExecResult *res; + Oid RecoveryRow[1] = {BOOLOID}; + TupleTableSlot *slot; + bool isnull; + bool remote_in_recovery; + + if (!retain_conflict_info) + return; + + if (walrcv_server_version(wrconn) < 18000) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot enable retain_conflict_info if the publisher is running a version earlier than PostgreSQL 18.")); + + res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not obtain recovery progress from the publisher: %s", + res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + elog(ERROR, "failed to fetch tuple for the recovery progress"); + + remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull)); + + if (remote_in_recovery) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot enable retain_conflict_info if the publisher is in recovery.")); + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + /* * Get the list of tables which belong to specified publications on the * publisher connection. diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 2ef6594b5ce..c176bc0d87d 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -163,6 +163,7 @@ get_subscription_list(void) sub->owner = subform->subowner; sub->enabled = subform->subenabled; sub->name = pstrdup(NameStr(subform->subname)); + sub->retainconflictinfo = subform->subretainconflictinfo; /* We don't fill fields we are not interested in. */ res = lappend(res, sub); @@ -1169,6 +1170,7 @@ ApplyLauncherMain(Datum main_arg) MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; bool can_advance_xmin = true; + bool retain_conflict_info = false; FullTransactionId xmin = InvalidFullTransactionId; CHECK_FOR_INTERRUPTS(); @@ -1190,16 +1192,26 @@ ApplyLauncherMain(Datum main_arg) long elapsed; /* - * Create the conflict slot before starting the worker to prevent - * it from unnecessarily maintaining its oldest_nonremovable_xid. + * Create a replication slot to retain information (e.g., dead + * tuples, commit timestamps, and origins) useful for conflict + * detection if any subscription requests it. Only advance xmin + * when all such subscriptions are enabled. */ - create_conflict_slot_if_not_exists(); + if (sub->retainconflictinfo) + { + retain_conflict_info = true; + can_advance_xmin &= sub->enabled; + + /* + * Create the conflict slot before starting the worker to + * prevent it from unnecessarily maintaining its + * oldest_nonremovable_xid. + */ + create_conflict_slot_if_not_exists(); + } if (!sub->enabled) - { - can_advance_xmin = false; continue; - } LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); w = logicalrep_worker_find(sub->oid, InvalidOid, false); @@ -1209,10 +1221,11 @@ ApplyLauncherMain(Datum main_arg) { /* * Collect non-removable transaction IDs from all apply - * workers to determine the xmin for advancing the replication - * slot used in conflict detection. + * workers that enabled retain_conflict_info. This determines + * the new xmin for advancing the replication slot used in + * conflict detection. */ - if (can_advance_xmin) + if (sub->retainconflictinfo && can_advance_xmin) { FullTransactionId nonremovable_xid; @@ -1238,7 +1251,8 @@ ApplyLauncherMain(Datum main_arg) * The worker has not yet started, so there is no valid * non-removable transaction ID available for advancement. */ - can_advance_xmin = false; + if (sub->retainconflictinfo) + can_advance_xmin = false; /* * If the worker is eligible to start now, launch it. Otherwise, @@ -1276,7 +1290,7 @@ ApplyLauncherMain(Datum main_arg) * detection if needed, and update the sleep time before the next * attempt. */ - if (sublist) + if (retain_conflict_info) { bool updated = false; @@ -1290,12 +1304,14 @@ ApplyLauncherMain(Datum main_arg) } /* - * Drop the slot if we're no longer retaining dead tuples. + * Drop the slot if we're no longer retaining information useful for + * conflict detection */ else if (slot_maybe_exist) { drop_conflict_slot_if_exists(); slot_maybe_exist = false; + slot_update_wait_time = MIN_NAPTIME_PER_SLOT_UPDATE; } /* Switch back to original memory context. */ @@ -1326,7 +1342,7 @@ ApplyLauncherMain(Datum main_arg) } /* - * Create and acquire the replication slot used to retain dead tuples for + * Create and acquire the replication slot used to retain information for * conflict detection, if not yet. */ static void @@ -1368,7 +1384,7 @@ create_conflict_slot_if_not_exists(void) /* * Attempt to advance the xmin value of the replication slot used to retain - * dead tuples for conflict detection. + * information useful for conflict detection. */ static bool advance_conflict_slot_xmin(FullTransactionId new_xmin) @@ -1377,7 +1393,9 @@ advance_conflict_slot_xmin(FullTransactionId new_xmin) FullTransactionId next_full_xid; Assert(MyReplicationSlot); - Assert(FullTransactionIdIsValid(new_xmin)); + + if (!FullTransactionIdIsValid(new_xmin)) + return false; next_full_xid = ReadNextFullTransactionId(); @@ -1445,8 +1463,8 @@ compute_slot_update_naptime(bool slot_updated, long *sleep_time) } /* - * Drop the replication slot used to retain dead tuples for conflict detection, - * if it exists. + * Drop the replication slot used to retain information useful for conflict + * detection, if it exists. */ static void drop_conflict_slot_if_exists(void) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 74e39c231ed..923df059ff5 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4111,6 +4111,10 @@ static void maybe_advance_nonremovable_xid(RetainConflictInfoData *data, bool status_received) { + /* Exit early if retaining conflict information is not required */ + if (!MySubscription->retainconflictinfo) + return; + /* * It is sufficient to manage non-removable transaction ID for a * subscription by the main apply worker to detect update_deleted conflict @@ -4520,6 +4524,15 @@ maybe_reread_subscription(void) * worker won't restart if the streaming option's value is changed from * 'parallel' to any other value or the server decides not to stream the * in-progress transaction. + * + * Additionally, exit if the retain_conflict_info option is disabled. This + * is necessary to reset the oldest non-removable transaction ID and the + * state of advancement. Direct resetting could not work without a + * restart, as the worker might be in an intermediate state (e.g., waiting + * publisher status). If the option is re-enabled before the old publisher + * status is received, it could incorrectly use the old status in a new + * transaction ID advancement cycle, leading to premature advancement of + * the non-removable transaction ID. */ if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 || strcmp(newsub->name, MySubscription->name) != 0 || @@ -4529,7 +4542,8 @@ maybe_reread_subscription(void) newsub->passwordrequired != MySubscription->passwordrequired || strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || - !equal(newsub->publications, MySubscription->publications)) + !equal(newsub->publications, MySubscription->publications) || + (!newsub->retainconflictinfo && MySubscription->retainconflictinfo)) { if (am_parallel_apply_worker()) ereport(LOG, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 6370bb711c0..d2a2f738e70 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4857,6 +4857,7 @@ getSubscriptions(Archive *fout) int i_suboriginremotelsn; int i_subenabled; int i_subfailover; + int i_subretainconflictinfo; int i, ntups; @@ -4929,11 +4930,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 170000) appendPQExpBufferStr(query, - " s.subfailover\n"); + " s.subfailover,\n"); else appendPQExpBuffer(query, - " false AS subfailover\n"); + " false AS subfailover,\n"); + if (fout->remoteVersion >= 180000) + appendPQExpBufferStr(query, + " s.subretainconflictinfo\n"); + else + appendPQExpBuffer(query, + " false AS subretainconflictinfo\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -4966,6 +4973,7 @@ getSubscriptions(Archive *fout) i_subpasswordrequired = PQfnumber(res, "subpasswordrequired"); i_subrunasowner = PQfnumber(res, "subrunasowner"); i_subfailover = PQfnumber(res, "subfailover"); + i_subretainconflictinfo = PQfnumber(res, "subretainconflictinfo"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); @@ -4999,6 +5007,8 @@ getSubscriptions(Archive *fout) (strcmp(PQgetvalue(res, i, i_subrunasowner), "t") == 0); subinfo[i].subfailover = (strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0); + subinfo[i].subretainconflictinfo = + (strcmp(PQgetvalue(res, i, i_subretainconflictinfo), "t") == 0); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); if (PQgetisnull(res, i, i_subslotname)) @@ -5257,6 +5267,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (subinfo->subfailover) appendPQExpBufferStr(query, ", failover = true"); + if (subinfo->subretainconflictinfo) + appendPQExpBufferStr(query, ", retain_conflict_info = true"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 7139c88a69a..117d5a81f1e 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -681,6 +681,7 @@ typedef struct _SubscriptionInfo bool subpasswordrequired; bool subrunasowner; bool subfailover; + bool subretainconflictinfo; char *subconninfo; char *subslotname; char *subsynccommit; diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 7ca1d8fffc9..70489b93745 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -627,7 +627,7 @@ check_and_dump_old_cluster(void) * Before that the logical slots are not upgraded, so we will not be * able to upgrade the logical replication clusters completely. */ - get_subscription_count(&old_cluster); + get_subscription_info(&old_cluster); check_old_cluster_subscription_state(); } @@ -1845,7 +1845,13 @@ check_new_cluster_subscription_configuration(void) pg_fatal("could not determine parameter settings on new cluster"); max_replication_slots = atoi(PQgetvalue(res, 0, 0)); - if (old_cluster.nsubs > max_replication_slots) + + if (old_cluster.sub_retain_conflict_info && + old_cluster.nsubs + 1 > max_replication_slots) + pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of " + "subscriptions plus one (%d) on the old cluster", + max_replication_slots, old_cluster.nsubs + 1); + else if (old_cluster.nsubs > max_replication_slots) pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of " "subscriptions (%d) on the old cluster", max_replication_slots, old_cluster.nsubs); @@ -1912,6 +1918,23 @@ check_old_cluster_for_valid_slots(void) "The slot \"%s\" has not consumed the WAL yet\n", slot->slotname); } + + /* + * The name "pg_conflict_detection" (defined as + * CONFLICT_DETECTION_SLOT) has been reserved for logical + * replication conflict detection since PG18. + */ + if (GET_MAJOR_VERSION(new_cluster.major_version) >= 1800 && + strcmp(slot->slotname, "pg_conflict_detection") == 0) + { + if (script == NULL && + (script = fopen_priv(output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", output_path); + + fprintf(script, + "The slot name \"%s\" is reserved\n", + slot->slotname); + } } } diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index ad52de8b607..1f84ed292a5 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -750,20 +750,33 @@ count_old_cluster_logical_slots(void) } /* - * get_subscription_count() + * get_subscription_info() * - * Gets the number of subscriptions in the cluster. + * Gets the information of subscriptions in the cluster. */ void -get_subscription_count(ClusterInfo *cluster) +get_subscription_info(ClusterInfo *cluster) { PGconn *conn; PGresult *res; + int i_nsub; + int i_retain_conflict_info; conn = connectToServer(cluster, "template1"); - res = executeQueryOrDie(conn, "SELECT count(*) " - "FROM pg_catalog.pg_subscription"); - cluster->nsubs = atoi(PQgetvalue(res, 0, 0)); + if (GET_MAJOR_VERSION(cluster->major_version) >= 1800) + res = executeQueryOrDie(conn, "SELECT count(*) AS nsub," + "COUNT(CASE WHEN subretainconflictinfo THEN 1 END) AS retain_conflict_info " + "FROM pg_catalog.pg_subscription"); + else + res = executeQueryOrDie(conn, "SELECT count(*) AS nsub," + "'f' AS retain_conflict_info " + "FROM pg_catalog.pg_subscription"); + + i_nsub = PQfnumber(res, "nsub"); + i_retain_conflict_info = PQfnumber(res, "retain_conflict_info"); + + cluster->nsubs = atoi(PQgetvalue(res, 0, i_nsub)); + cluster->sub_retain_conflict_info = (strcmp(PQgetvalue(res, 0, i_retain_conflict_info), "t") == 0); PQclear(res); PQfinish(conn); diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index 0cdd675e4f1..58395ab31cf 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -295,6 +295,8 @@ typedef struct uint32 bin_version; /* version returned from pg_ctl */ const char *tablespace_suffix; /* directory specification */ int nsubs; /* number of subscriptions */ + bool sub_retain_conflict_info; /* whether a subscription enables + * retain_conflict_info. */ } ClusterInfo; @@ -430,7 +432,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db, const char *new_pgdata); void get_db_rel_and_slot_infos(ClusterInfo *cluster); int count_old_cluster_logical_slots(void); -void get_subscription_count(ClusterInfo *cluster); +void get_subscription_info(ClusterInfo *cluster); /* option.c */ diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 3b7ba66fad0..8c7ef1ba80b 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6747,7 +6747,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false}; + false, false}; if (pset.sversion < 100000) { @@ -6815,6 +6815,10 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", subfailover AS \"%s\"\n", gettext_noop("Failover")); + if (pset.sversion >= 180000) + appendPQExpBuffer(&buf, + ", subretainconflictinfo AS \"%s\"\n", + gettext_noop("Retain conflict info")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index eb8bc128720..2097267cdef 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2297,8 +2297,9 @@ match_previous_words(int pattern_id, /* ALTER SUBSCRIPTION SET ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "(")) COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "password_required", "retain_conflict_info", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* ALTER SUBSCRIPTION SKIP ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3718,8 +3719,9 @@ match_previous_words(int pattern_id, else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", "disable_on_error", "enabled", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "password_required", "retain_conflict_info", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 20fc329992d..0ac7c0b120c 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -78,6 +78,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool subretainconflictinfo; /* True if information useful for + * conflict detection is retained */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -131,6 +134,8 @@ typedef struct Subscription * (i.e. the main slot and the table sync * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool retainconflictinfo; /* True if information useful for conflict + * detection is retained */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 1443e1d9292..bff4cc051db 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -409,18 +409,44 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail - retain_conflict_info must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_conflict_info = foo); +ERROR: retain_conflict_info requires a Boolean value +-- ok - but a warning will occur because track_commit_timestamp is not enabled +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_conflict_info = true); +WARNING: information for detecting conflicts cannot be fully retained when "track_commit_timestamp" is disabled +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | t | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +-- ok +ALTER SUBSCRIPTION regress_testsub SET (retain_conflict_info = false); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 007c9e70374..c65397e5ac6 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -287,6 +287,22 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - retain_conflict_info must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_conflict_info = foo); + +-- ok - but a warning will occur because track_commit_timestamp is not enabled +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_conflict_info = true); + +\dRs+ + +-- ok +ALTER SUBSCRIPTION regress_testsub SET (retain_conflict_info = false); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; -- 2.30.0.windows.2