From f8bede65f9d75875821e6aaf577a47b060560d5f Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Tue, 10 Jun 2025 13:48:16 +0800 Subject: [PATCH v35 3/3] Add a retain_conflict_info option to subscriptions This patch adds a subscription option allowing users to specify whether information on the subscriber, which is useful for detecting update_deleted conflicts, should be retained. The default setting is false. If set to true, the detection of update_deleted will be enabled, and an additional replication slot named pg_conflict_detection will be created on the subscriber to prevent conflict information from being removed. Note that if multiple subscriptions on one node enable this option, only one replication slot will be created. The logical launcher will create and maintain a replication slot named pg_conflict_detection only if any local subscription has the retain_conflict_info option enabled. Enabling retain_conflict_info is prohibited if the publisher is currently in recovery mode (operating as a standby server). Bump catalog version --- doc/src/sgml/catalogs.sgml | 11 + doc/src/sgml/logical-replication.sgml | 6 + doc/src/sgml/ref/alter_subscription.sgml | 18 +- doc/src/sgml/ref/create_subscription.sgml | 35 +++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 201 ++++++++++++++++-- .../replication/logical/applyparallelworker.c | 3 +- src/backend/replication/logical/launcher.c | 48 +++-- src/backend/replication/logical/tablesync.c | 3 +- src/backend/replication/logical/worker.c | 85 +++++++- src/bin/pg_dump/pg_dump.c | 18 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_upgrade/check.c | 53 ++++- src/bin/pg_upgrade/info.c | 25 ++- src/bin/pg_upgrade/pg_upgrade.h | 4 +- src/bin/pg_upgrade/t/004_subscription.pl | 48 +++++ src/bin/psql/describe.c | 6 +- src/bin/psql/tab-complete.in.c | 10 +- src/include/catalog/pg_subscription.h | 5 + src/include/commands/subscriptioncmds.h | 2 + src/include/replication/logicalworker.h | 3 + src/include/replication/worker_internal.h | 3 +- src/test/regress/expected/subscription.out | 181 +++++++++------- src/test/regress/sql/subscription.sql | 16 ++ 25 files changed, 655 insertions(+), 134 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index fa86c569dc4..b8f9bf573ea 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8082,6 +8082,17 @@ SCRAM-SHA-256$<iteration count>:&l + + + subretainconflictinfo bool + + + If true, the information (e.g., dead tuples, commit timestamps, and + origins) on the subscriber that is still useful for conflict detection + is retained. + + + subconninfo text diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 686dd441d02..5073d31ca8e 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -2364,6 +2364,12 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER the subscriber, plus some reserve for table synchronization. + + max_replication_slots + must be set to at least 1 when retain_conflict_info + is enabled for any subscription. + + max_logical_replication_workers must be set to at least the number of subscriptions (for leader apply diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index fdc648d007f..e6f1cffeda5 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -235,8 +235,9 @@ ALTER SUBSCRIPTION name RENAME TO < password_required, run_as_owner, origin, - failover, and - two_phase. + failover, + two_phase, and + retain_conflict_info. Only a superuser can set password_required = false. @@ -261,8 +262,9 @@ ALTER SUBSCRIPTION name RENAME TO < - The failover - and two_phase + The failover, + two_phase, and + retain_conflict_info parameters can only be altered when the subscription is disabled. @@ -285,6 +287,14 @@ ALTER SUBSCRIPTION name RENAME TO < option is changed from true to false, the publisher will replicate the transactions again when they are committed. + + + If the retain_conflict_info + option is altered to false and no other subscription + has this option enabled, the additional replication slot named + pg_conflict_detection, created to retain + conflict information, will be dropped. + diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 37fd40252a3..be90088bcd0 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -437,6 +437,41 @@ CREATE SUBSCRIPTION subscription_name + + + retain_conflict_info (boolean) + + + Specifies whether the information (e.g., dead tuples, commit + timestamps, and origins) on the subscriber that is still useful for + conflict detection is retained. The default is + false. If set to true, an additional replication + slot named pg_conflict_detection + will be created on the subscriber to prevent the conflict information + from being removed. + + + + Note that the information useful for conflict detection is retained + only after the creation of the additional slot. You can verify the + existence of this slot by querying pg_replication_slots. + And even if multiple subscriptions on one node enable this option, + only one replication slot will be created. + + + + Note that the information for conflict detection cannot be purged if + the subscription is disabled; thus, the information will accumulate + until the subscription is enabled. To prevent excessive accumulation, + it is recommended to disable retain_conflict_info + if the subscription will be inactive for an extended period. + + + + This option cannot be enabled if the publisher is also a physical standby. + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 1395032413e..39cfae43d6f 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -103,6 +103,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; + sub->retainconflictinfo = subform->subretainconflictinfo; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 08f780a2e63..ec4aa9ea7b4 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1378,7 +1378,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subslotname, subsynccommit, subpublications, suborigin) + subretainconflictinfo, subslotname, subsynccommit, + subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 46d4e65da97..5bcc171ca5b 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/htup_details.h" #include "access/table.h" #include "access/twophase.h" @@ -71,8 +72,9 @@ #define SUBOPT_PASSWORD_REQUIRED 0x00000800 #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 -#define SUBOPT_LSN 0x00004000 -#define SUBOPT_ORIGIN 0x00008000 +#define SUBOPT_RETAIN_CONFLICT_INFO 0x00004000 +#define SUBOPT_LSN 0x00008000 +#define SUBOPT_ORIGIN 0x00010000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -98,6 +100,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; bool failover; + bool retainconflictinfo; char *origin; XLogRecPtr lsn; } SubOpts; @@ -107,6 +110,8 @@ static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname); +static void check_pub_conflict_info_retention(WalReceiverConn *wrconn, + bool retain_conflict_info); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -162,6 +167,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_FAILOVER)) opts->failover = false; + if (IsSet(supported_opts, SUBOPT_RETAIN_CONFLICT_INFO)) + opts->retainconflictinfo = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -307,6 +314,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_FAILOVER; opts->failover = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_RETAIN_CONFLICT_INFO) && + strcmp(defel->defname, "retain_conflict_info") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_RETAIN_CONFLICT_INFO)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_RETAIN_CONFLICT_INFO; + opts->retainconflictinfo = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -563,7 +579,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | + SUBOPT_RETAIN_CONFLICT_INFO | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -608,6 +625,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, errmsg("password_required=false is superuser-only"), errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser."))); + CheckSubConflictInfoRetention(opts.retainconflictinfo, + !opts.enabled || !opts.connect); + /* * If built with appropriate switch, whine when regression-testing * conventions for subscription names are violated. @@ -670,6 +690,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); + values[Anum_pg_subscription_subretainconflictinfo - 1] = + BoolGetDatum(opts.retainconflictinfo); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -724,6 +746,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, check_publications_origin(wrconn, publications, opts.copy_data, opts.origin, NULL, 0, stmt->subname); + check_pub_conflict_info_retention(wrconn, opts.retainconflictinfo); + /* * Set sync state based on if we were asked to do data copy or * not. @@ -1051,7 +1075,8 @@ CheckAlterSubOption(Subscription *sub, const char *option, * two_phase options. */ Assert(strcmp(option, "failover") == 0 || - strcmp(option, "two_phase") == 0); + strcmp(option, "two_phase") == 0 || + strcmp(option, "retain_conflict_info") == 0); /* * Do not allow changing the option if the subscription is enabled. This @@ -1059,6 +1084,41 @@ CheckAlterSubOption(Subscription *sub, const char *option, * publisher cannot be modified if the slot is currently acquired by the * existing walsender. * + * Do not allow changing the retain_conflict_info option when the + * subscription is enabled or the apply worker is active, to prevent race + * conditions arising from the new option value being acknowledged + * asynchronously by the launcher and apply workers. + * + * Without the restriction, a race condition may arise when a user + * disables and immediately re-enables the retain_conflict_info option. In + * this case, the launcher might drop the slot upon noticing the disabled + * action, while the apply worker may keep maintaining + * oldest_nonremovable_xid without noticing the option change. During this + * period, a transaction ID wraparound could falsely make this ID appear + * as if it originates from the future w.r.t the transaction ID stored in + * the slot maintained by launcher. + * + * Similarly, if the user enables retain_conflict_info concurrently with + * the launcher starting the worker, the apply worker may start + * calculating oldest_nonremovable_xid before the launcher notices the + * enable action. Consequently, the launcher may update slot.xmin to a + * newer value than that maintained by the worker. In subsequent cycles, + * upon integrating the worker's oldest_nonremovable_xid, the launcher + * might detect a retreat in the calculated xmin, necessitating additional + * handling. + * + * XXX To address the above race conditions, we can define + * oldest_nonremovable_xid as FullTransactionID and adds the check to + * disallow retreating the conflict slot's xmin. For now, we kept the + * implementation simple by disallowing change to the + * retain_conflict_info, but in the future we can change this after some + * more analysis. + * + * Note that we could restrict only the enabling of retain_conflict_info + * to avoid the race conditions described above, but we maintain the + * restriction for both enable and disable operations for the sake of + * consistency. + * * Note that two_phase is enabled (aka changed from 'false' to 'true') on * the publisher by the existing walsender, so we could have allowed that * even when the subscription is enabled. But we kept this restriction for @@ -1110,6 +1170,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool update_tuple = false; bool update_failover = false; bool update_two_phase = false; + bool retain_conflict_info = false; Subscription *sub; Form_pg_subscription form; bits32 supported_opts; @@ -1165,7 +1226,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_ORIGIN); + SUBOPT_RETAIN_CONFLICT_INFO | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1325,6 +1386,41 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subfailover - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_RETAIN_CONFLICT_INFO)) + { + values[Anum_pg_subscription_subretainconflictinfo - 1] = + BoolGetDatum(opts.retainconflictinfo); + replaces[Anum_pg_subscription_subretainconflictinfo - 1] = true; + + CheckAlterSubOption(sub, "retain_conflict_info", false, isTopLevel); + + /* + * Note that workers may still survive even if the + * subscription has been disabled. + * + * Ensure workers have already been exited to avoid the + * race conditions as described in CheckAlterSubOption(). + */ + if (logicalrep_workers_find(subid, true, true)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot alter retain_conflict_info when logical replication worker is still running"), + errhint("Try again after some time."))); + + CheckSubConflictInfoRetention(opts.retainconflictinfo, + false); + + /* + * Notify the launcher to manage the replication slot for + * conflict detection. This ensures that replication slot + * is efficiently handled (created, updated, or dropped) + * in response to any configuration changes. + */ + ApplyLauncherWakeupAtCommit(); + + retain_conflict_info = opts.retainconflictinfo; + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = @@ -1347,6 +1443,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot enable subscription that does not have a slot name"))); + CheckSubConflictInfoRetention(sub->retainconflictinfo, + !opts.enabled); + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); replaces[Anum_pg_subscription_subenabled - 1] = true; @@ -1355,6 +1454,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, ApplyLauncherWakeupAtCommit(); update_tuple = true; + + /* + * The subscription might be initially created with + * connect=false and retain_conflict_info=true, meaning the + * remote server's status may not be checked. Ensure this + * check is conducted now. + */ + retain_conflict_info = sub->retainconflictinfo; break; } @@ -1369,6 +1476,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, CStringGetTextDatum(stmt->conninfo); replaces[Anum_pg_subscription_subconninfo - 1] = true; update_tuple = true; + + /* + * Since the remote server configuration might have changed, + * perform a check to ensure it permits enabling + * retain_conflict_info. + */ + retain_conflict_info = sub->retainconflictinfo; break; case ALTER_SUBSCRIPTION_SET_PUBLICATION: @@ -1568,14 +1682,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, } /* - * Try to acquire the connection necessary for altering the slot, if - * needed. + * Try to acquire the connection necessary either for modifying the slot + * or for checking if the remote server permits enabling + * retain_conflict_info. * * This has to be at the end because otherwise if there is an error while * doing the database operations we won't be able to rollback altered * slot. */ - if (update_failover || update_two_phase) + if (update_failover || update_two_phase || retain_conflict_info) { bool must_use_password; char *err; @@ -1584,10 +1699,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); - /* Try to connect to the publisher. */ + /* + * Try to connect to the publisher, using the new connection string if + * available. + */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, - sub->name, &err); + wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo, + true, true, must_use_password, sub->name, + &err); if (!wrconn) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), @@ -1596,9 +1715,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, - update_failover ? &opts.failover : NULL, - update_two_phase ? &opts.twophase : NULL); + check_pub_conflict_info_retention(wrconn, retain_conflict_info); + + if (update_failover || update_two_phase) + walrcv_alter_slot(wrconn, sub->slotname, + update_failover ? &opts.failover : NULL, + update_two_phase ? &opts.twophase : NULL); } PG_FINALLY(); { @@ -2196,6 +2318,57 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, walrcv_clear_result(res); } +/* + * Check if the publisher's status permits enabling retain_conflict_info. + * + * Enabling retain_conflict_info is not allowed if the publisher's version is + * prior to PG18 or if the publisher is in recovery (operating as a standby + * server). + * + * Refer to the comments atop maybe_advance_nonremovable_xid() for detailed + * reasons. + */ +static void +check_pub_conflict_info_retention(WalReceiverConn *wrconn, bool retain_conflict_info) +{ + WalRcvExecResult *res; + Oid RecoveryRow[1] = {BOOLOID}; + TupleTableSlot *slot; + bool isnull; + bool remote_in_recovery; + + if (!retain_conflict_info) + return; + + if (walrcv_server_version(wrconn) < 18000) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot enable retain_conflict_info if the publisher is running a version earlier than PostgreSQL 18.")); + + res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not obtain recovery progress from the publisher: %s", + res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + elog(ERROR, "failed to fetch tuple for the recovery progress"); + + remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull)); + + if (remote_in_recovery) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot enable retain_conflict_info if the publisher is in recovery.")); + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + /* * Get the list of tables which belong to specified publications on the * publisher connection. diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index d25085d3515..1fa931a7422 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -441,7 +441,8 @@ pa_launch_parallel_worker(void) MySubscription->name, MyLogicalRepWorker->userid, InvalidOid, - dsm_segment_handle(winfo->dsm_seg)); + dsm_segment_handle(winfo->dsm_seg), + false); if (launched) { diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 9b12b2900e6..494b8de9ef9 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -92,8 +92,8 @@ static dshash_table *last_start_times = NULL; static bool on_commit_launcher_wakeup = false; /* - * Whether the slot used to retain dead tuples for conflict detection has been - * dropped. + * Whether the slot used to retain information useful for conflict detection + * has been dropped. */ static bool conflict_slot_dropped = false; @@ -106,6 +106,7 @@ static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); static void compute_min_nonremovable_xid(LogicalRepWorker *worker, + bool retain_conflict_info, TransactionId *xmin, bool *can_advance_xmin); static void create_conflict_slot_if_not_exists(void); @@ -159,6 +160,7 @@ get_subscription_list(void) sub->owner = subform->subowner; sub->enabled = subform->subenabled; sub->name = pstrdup(NameStr(subform->subname)); + sub->retainconflictinfo = subform->subretainconflictinfo; /* We don't fill fields we are not interested in. */ res = lappend(res, sub); @@ -307,7 +309,8 @@ logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock) bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, - Oid relid, dsm_handle subworker_dsm) + Oid relid, dsm_handle subworker_dsm, + bool retain_conflict_info) { BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; @@ -326,10 +329,13 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, * - must be valid worker type * - tablesync workers are only ones to have relid * - parallel apply worker is the only kind of subworker + * - The replication slot used in conflict detection is created when + * retain_conflict_info is enabled */ Assert(wtype != WORKERTYPE_UNKNOWN); Assert(is_tablesync_worker == OidIsValid(relid)); Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID)); + Assert(!retain_conflict_info || MyReplicationSlot); ereport(DEBUG1, (errmsg_internal("starting logical replication worker for subscription \"%s\"", @@ -452,7 +458,9 @@ retry: worker->stream_fileset = NULL; worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; - worker->oldest_nonremovable_xid = InvalidTransactionId; + worker->oldest_nonremovable_xid = retain_conflict_info + ? MyReplicationSlot->data.xmin + : InvalidTransactionId; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); @@ -1162,6 +1170,7 @@ ApplyLauncherMain(Datum main_arg) MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; bool can_advance_xmin = true; + bool retain_conflict_info = false; TransactionId xmin = InvalidTransactionId; CHECK_FOR_INTERRUPTS(); @@ -1182,9 +1191,12 @@ ApplyLauncherMain(Datum main_arg) TimestampTz now; long elapsed; + retain_conflict_info |= sub->retainconflictinfo; + if (!sub->enabled) { - compute_min_nonremovable_xid(NULL, &xmin, &can_advance_xmin); + compute_min_nonremovable_xid(NULL, sub->retainconflictinfo, + &xmin, &can_advance_xmin); continue; } @@ -1192,7 +1204,8 @@ ApplyLauncherMain(Datum main_arg) w = logicalrep_worker_find(sub->oid, InvalidOid, false); LWLockRelease(LogicalRepWorkerLock); - compute_min_nonremovable_xid(w, &xmin, &can_advance_xmin); + compute_min_nonremovable_xid(w, sub->retainconflictinfo, &xmin, + &can_advance_xmin); if (w != NULL) continue; /* worker is running already */ @@ -1219,7 +1232,8 @@ ApplyLauncherMain(Datum main_arg) logicalrep_worker_launch(WORKERTYPE_APPLY, sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid, - DSM_HANDLE_INVALID); + DSM_HANDLE_INVALID, + sub->retainconflictinfo); } else { @@ -1233,7 +1247,7 @@ ApplyLauncherMain(Datum main_arg) * detection if needed. Otherwise, drop the slot if we're no longer * retaining information useful for conflict detection. */ - if (!sublist) + if (!retain_conflict_info) drop_conflict_slot_if_exists(); else if (can_advance_xmin) advance_conflict_slot_xmin(xmin); @@ -1266,17 +1280,18 @@ ApplyLauncherMain(Datum main_arg) } /* - * Compute the minimum non-removable transaction ID from all apply workers. - * Store the result in *xmin. + * Compute the minimum non-removable transaction ID from all apply workers for + * subscriptions with retain_conflict_info enabled. Store the result in *xmin. * * If the slot cannot be advanced during this cycle, due to either a disabled * subscription or an inactive worker, *can_advance_xmin is set to false. */ static void -compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin, +compute_min_nonremovable_xid(LogicalRepWorker *worker, + bool retain_conflict_info, TransactionId *xmin, bool *can_advance_xmin) { - if (!*can_advance_xmin) + if (!retain_conflict_info || !*can_advance_xmin) return; if (worker) @@ -1319,7 +1334,8 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin, create_conflict_slot_if_not_exists(); /* - * Only collect xmin when all workers for subscriptions are running. + * Only collect xmin when all workers for subscriptions with + * retain_conflict_info enabled are running. */ *can_advance_xmin = false; } @@ -1370,7 +1386,7 @@ create_conflict_slot_if_not_exists(void) /* * Attempt to advance the xmin value of the replication slot used to retain - * dead tuples for conflict detection. + * information useful for conflict detection. */ static void advance_conflict_slot_xmin(TransactionId new_xmin) @@ -1406,8 +1422,8 @@ advance_conflict_slot_xmin(TransactionId new_xmin) } /* - * Drop the replication slot used to retain dead tuples for conflict detection, - * if it exists. + * Drop the replication slot used to retain information useful for conflict + * detection, if it exists. */ static void drop_conflict_slot_if_exists(void) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 8e1e8762f62..1591c1c99d4 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -609,7 +609,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) MySubscription->name, MyLogicalRepWorker->userid, rstate->relid, - DSM_HANDLE_INVALID); + DSM_HANDLE_INVALID, + false); hentry->last_start_time = now; } } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c864f7f52b0..48eb387d41b 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -147,6 +147,7 @@ #include #include +#include "access/commit_ts.h" #include "access/table.h" #include "access/tableam.h" #include "access/twophase.h" @@ -173,6 +174,7 @@ #include "replication/logicalrelation.h" #include "replication/logicalworker.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "rewrite/rewriteHandler.h" @@ -445,6 +447,7 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); static void maybe_advance_nonremovable_xid(RetainConflictInfoData *rci_data, bool status_received); +static bool can_advance_nonremovable_xid(RetainConflictInfoData *rci_data); static void process_rci_phase_transition(RetainConflictInfoData *rci_data, bool status_received); static void get_candidate_xid(RetainConflictInfoData *rci_data); @@ -4114,6 +4117,19 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) static void maybe_advance_nonremovable_xid(RetainConflictInfoData *rci_data, bool status_received) +{ + if (!can_advance_nonremovable_xid(rci_data)) + return; + + process_rci_phase_transition(rci_data, status_received); +} + +/* + * Preliminary check to determine if advancing the non-removable transaction ID + * is allowed. + */ +static bool +can_advance_nonremovable_xid(RetainConflictInfoData *rci_data) { /* * It is sufficient to manage non-removable transaction ID for a @@ -4121,9 +4137,13 @@ maybe_advance_nonremovable_xid(RetainConflictInfoData *rci_data, * even for table sync or parallel apply workers. */ if (!am_leader_apply_worker()) - return; + return false; - process_rci_phase_transition(rci_data, status_received); + /* No need to advance if retaining conflict information is not required */ + if (!MySubscription->retainconflictinfo) + return false; + + return true; } /* @@ -5236,6 +5256,28 @@ InitializeLogRepWorker(void) apply_worker_exit(); } + /* + * Restart the worker if retain_conflict_info was enabled at startup. The + * replication slot for conflict detection may not be created yet, or + * might soon be dropped as the launcher sees retain_conflict_info as + * disabled. To prevent unnecessary maintenance of oldest_nonremovable_xid + * when the slot is absent or at risk of being dropped, a restart is + * initiated. + * + * The oldest_nonremovable_xid should be initialized only when the + * retain_conflict_info is enabled before launching the worker. See + * logicalrep_worker_launch. + */ + if (am_leader_apply_worker() && MySubscription->retainconflictinfo && + !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) + { + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup", + MySubscription->name, "retain_conflict_info")); + + apply_worker_exit(); + } + /* Setup synchronous commit according to the user's wishes */ SetConfigOption("synchronous_commit", MySubscription->synccommit, PGC_BACKEND, PGC_S_OVERRIDE); @@ -5392,6 +5434,8 @@ DisableSubscriptionAndExit(void) errmsg("subscription \"%s\" has been disabled because of an error", MySubscription->name)); + CheckSubConflictInfoRetention(MySubscription->retainconflictinfo, true); + proc_exit(0); } @@ -5754,3 +5798,40 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) return TRANS_LEADER_APPLY; } } + +/* + * Check if the subscriber's configuration is adequate to enable the + * retain_conflict_info option. + * + * Issue a warning if track_commit_timestamp is not enabled. + * + * Issue a warning if the subscription is being disabled. + * + * Provide a notice if retain_conflict_info is enabled for a disabled + * subscription, reminding the user to enable the subscription to prevent the + * accumulation of dead tuples. A warning is not issued since + * retain_conflict_info can be altered only for disabled subscriptions. + */ +void +CheckSubConflictInfoRetention(bool retain_conflict_info, bool disabling_sub) +{ + if (!retain_conflict_info) + return; + + if (!track_commit_timestamp) + ereport(WARNING, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"), + errhint("Consider setting \"%s\" to true.", + "track_commit_timestamp")); + + if (disabling_sub) + ereport(WARNING, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"), + errhint("Consider setting %s to false.", + "retain_conflict_info")); + else + ereport(NOTICE, + errmsg("deleted rows will continue to accumulate for detecting conflicts until the subscription is enabled")); +} diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 37432e66efd..13d57dd3b13 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4960,6 +4960,7 @@ getSubscriptions(Archive *fout) int i_suboriginremotelsn; int i_subenabled; int i_subfailover; + int i_subretainconflictinfo; int i, ntups; @@ -5032,10 +5033,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 170000) appendPQExpBufferStr(query, - " s.subfailover\n"); + " s.subfailover,\n"); else appendPQExpBufferStr(query, - " false AS subfailover\n"); + " false AS subfailover,\n"); + + if (fout->remoteVersion >= 180000) + appendPQExpBufferStr(query, + " s.subretainconflictinfo\n"); + else + appendPQExpBufferStr(query, + " false AS subretainconflictinfo\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -5069,6 +5077,7 @@ getSubscriptions(Archive *fout) i_subpasswordrequired = PQfnumber(res, "subpasswordrequired"); i_subrunasowner = PQfnumber(res, "subrunasowner"); i_subfailover = PQfnumber(res, "subfailover"); + i_subretainconflictinfo = PQfnumber(res, "subretainconflictinfo"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); @@ -5102,6 +5111,8 @@ getSubscriptions(Archive *fout) (strcmp(PQgetvalue(res, i, i_subrunasowner), "t") == 0); subinfo[i].subfailover = (strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0); + subinfo[i].subretainconflictinfo = + (strcmp(PQgetvalue(res, i, i_subretainconflictinfo), "t") == 0); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); if (PQgetisnull(res, i, i_subslotname)) @@ -5360,6 +5371,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (subinfo->subfailover) appendPQExpBufferStr(query, ", failover = true"); + if (subinfo->subretainconflictinfo) + appendPQExpBufferStr(query, ", retain_conflict_info = true"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 7417eab6aef..945b3fce670 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -708,6 +708,7 @@ typedef struct _SubscriptionInfo bool subpasswordrequired; bool subrunasowner; bool subfailover; + bool subretainconflictinfo; char *subconninfo; char *subslotname; char *subsynccommit; diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 940fc77fc2e..3a3c532db47 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -629,7 +629,7 @@ check_and_dump_old_cluster(void) * Before that the logical slots are not upgraded, so we will not be * able to upgrade the logical replication clusters completely. */ - get_subscription_count(&old_cluster); + get_subscription_info(&old_cluster); check_old_cluster_subscription_state(); } @@ -2017,9 +2017,11 @@ check_new_cluster_logical_replication_slots(void) /* * check_new_cluster_subscription_configuration() * - * Verify that the max_active_replication_origins configuration specified is - * enough for creating the subscriptions. This is required to create the - * replication origin for each subscription. + * Verify that the max_active_replication_origins and max_replication_slots + * configurations specified are enough for creating the subscriptions. This is + * required to create the replication origin for each subscription and to + * create the conflict detection slot when any subscription has the + * retain_conflict_info option enabled. */ static void check_new_cluster_subscription_configuration(void) @@ -2027,6 +2029,8 @@ check_new_cluster_subscription_configuration(void) PGresult *res; PGconn *conn; int max_active_replication_origins; + int max_replication_slots; + int nslots_on_old; /* Subscriptions and their dependencies can be migrated since PG17. */ if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700) @@ -2052,6 +2056,31 @@ check_new_cluster_subscription_configuration(void) "subscriptions (%d) on the old cluster", max_active_replication_origins, old_cluster.nsubs); + PQclear(res); + + /* Return if no subscriptions enabled the retain_conflict_info option. */ + if (!old_cluster.sub_retain_conflict_info) + { + PQfinish(conn); + check_ok(); + return; + } + + res = executeQueryOrDie(conn, "SELECT setting FROM pg_settings " + "WHERE name = 'max_replication_slots';"); + + if (PQntuples(res) != 1) + pg_fatal("could not determine parameter settings on new cluster"); + + nslots_on_old = count_old_cluster_logical_slots(); + + max_replication_slots = atoi(PQgetvalue(res, 0, 0)); + if (nslots_on_old + 1 > max_replication_slots) + pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of " + "logical replication slots on the old cluster plus one additional slot required " + "for retaining conflict detection information (%d)", + max_replication_slots, nslots_on_old + 1); + PQclear(res); PQfinish(conn); @@ -2114,6 +2143,22 @@ check_old_cluster_for_valid_slots(void) "The slot \"%s\" has not consumed the WAL yet\n", slot->slotname); } + + /* + * The name "pg_conflict_detection" (defined as + * CONFLICT_DETECTION_SLOT) has been reserved for logical + * replication conflict detection since PG18. + */ + if (strcmp(slot->slotname, "pg_conflict_detection") == 0) + { + if (script == NULL && + (script = fopen_priv(output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", output_path); + + fprintf(script, + "The slot name \"%s\" is reserved\n", + slot->slotname); + } } } diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 4b7a56f5b3b..69658595e0b 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -752,20 +752,33 @@ count_old_cluster_logical_slots(void) } /* - * get_subscription_count() + * get_subscription_info() * - * Gets the number of subscriptions in the cluster. + * Gets the information of subscriptions in the cluster. */ void -get_subscription_count(ClusterInfo *cluster) +get_subscription_info(ClusterInfo *cluster) { PGconn *conn; PGresult *res; + int i_nsub; + int i_retain_conflict_info; conn = connectToServer(cluster, "template1"); - res = executeQueryOrDie(conn, "SELECT count(*) " - "FROM pg_catalog.pg_subscription"); - cluster->nsubs = atoi(PQgetvalue(res, 0, 0)); + if (GET_MAJOR_VERSION(cluster->major_version) >= 1800) + res = executeQueryOrDie(conn, "SELECT count(*) AS nsub," + "COUNT(CASE WHEN subretainconflictinfo THEN 1 END) AS retain_conflict_info " + "FROM pg_catalog.pg_subscription"); + else + res = executeQueryOrDie(conn, "SELECT count(*) AS nsub," + "'f' AS retain_conflict_info " + "FROM pg_catalog.pg_subscription"); + + i_nsub = PQfnumber(res, "nsub"); + i_retain_conflict_info = PQfnumber(res, "retain_conflict_info"); + + cluster->nsubs = atoi(PQgetvalue(res, 0, i_nsub)); + cluster->sub_retain_conflict_info = (strcmp(PQgetvalue(res, 0, i_retain_conflict_info), "1") == 0); PQclear(res); PQfinish(conn); diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index 69c965bb7d0..352c8b6f376 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -302,6 +302,8 @@ typedef struct uint32 bin_version; /* version returned from pg_ctl */ const char *tablespace_suffix; /* directory specification */ int nsubs; /* number of subscriptions */ + bool sub_retain_conflict_info; /* whether a subscription enables + * retain_conflict_info. */ } ClusterInfo; @@ -441,7 +443,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db, const char *new_pgdata); void get_db_rel_and_slot_infos(ClusterInfo *cluster); int count_old_cluster_logical_slots(void); -void get_subscription_count(ClusterInfo *cluster); +void get_subscription_info(ClusterInfo *cluster); /* option.c */ diff --git a/src/bin/pg_upgrade/t/004_subscription.pl b/src/bin/pg_upgrade/t/004_subscription.pl index c545abf6581..dc6deed5557 100644 --- a/src/bin/pg_upgrade/t/004_subscription.pl +++ b/src/bin/pg_upgrade/t/004_subscription.pl @@ -87,6 +87,54 @@ $publisher->safe_psql('postgres', "DROP PUBLICATION regress_pub1"); $old_sub->start; $old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub1;"); + +# ------------------------------------------------------ +# Check that pg_upgrade fails when max_replication_slots configured in the new +# cluster is less than the number of logical slots in the old cluster + 1 when +# subscription's retain_conflict_info option is enabled. +# ------------------------------------------------------ +# It is sufficient to use disabled subscription to test upgrade failure. +$publisher->safe_psql('postgres', "CREATE PUBLICATION regress_pub1"); +$old_sub->safe_psql('postgres', + "CREATE SUBSCRIPTION regress_sub1 CONNECTION '$connstr' PUBLICATION regress_pub1 WITH (enabled = false, retain_conflict_info = true)" +); + +$old_sub->stop; + +$new_sub->append_conf('postgresql.conf', "max_replication_slots = 0"); + +# pg_upgrade will fail because the new cluster has insufficient +# max_replication_slots. +command_checks_all( + [ + 'pg_upgrade', + '--no-sync', + '--old-datadir' => $old_sub->data_dir, + '--new-datadir' => $new_sub->data_dir, + '--old-bindir' => $oldbindir, + '--new-bindir' => $newbindir, + '--socketdir' => $new_sub->host, + '--old-port' => $old_sub->port, + '--new-port' => $new_sub->port, + $mode, + '--check', + ], + 1, + [ + qr/"max_replication_slots" \(0\) must be greater than or equal to the number of logical replication slots on the old cluster plus one additional slot required for retaining conflict detection information \(1\)/ + ], + [qr//], + 'run of pg_upgrade where the new cluster has insufficient max_replication_slots' +); + +# Reset max_replication_slots +$new_sub->append_conf('postgresql.conf', "max_replication_slots = 10"); + +# Cleanup +$publisher->safe_psql('postgres', "DROP PUBLICATION regress_pub1"); +$old_sub->start; +$old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub1;"); + # ------------------------------------------------------ # Check that pg_upgrade refuses to run if: # a) there's a subscription with tables in a state other than 'r' (ready) or diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 24e0100c9f0..c86010c2b89 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6745,7 +6745,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false}; + false, false}; if (pset.sversion < 100000) { @@ -6813,6 +6813,10 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", subfailover AS \"%s\"\n", gettext_noop("Failover")); + if (pset.sversion >= 180000) + appendPQExpBuffer(&buf, + ", subretainconflictinfo AS \"%s\"\n", + gettext_noop("Retain conflict info")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index ec65ab79fec..cbf2703c190 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2298,8 +2298,9 @@ match_previous_words(int pattern_id, /* ALTER SUBSCRIPTION SET ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "(")) COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "password_required", "retain_conflict_info", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* ALTER SUBSCRIPTION SKIP ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3728,8 +3729,9 @@ match_previous_words(int pattern_id, else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", "disable_on_error", "enabled", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "password_required", "retain_conflict_info", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 20fc329992d..0ac7c0b120c 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -78,6 +78,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool subretainconflictinfo; /* True if information useful for + * conflict detection is retained */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -131,6 +134,8 @@ typedef struct Subscription * (i.e. the main slot and the table sync * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool retainconflictinfo; /* True if information useful for conflict + * detection is retained */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index c2262e46a7f..d90d170054e 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -28,4 +28,6 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); extern char defGetStreamingMode(DefElem *def); +extern ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 88912606e4d..15039811de6 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -30,4 +30,7 @@ extern void LogicalRepWorkersWakeupAtCommit(Oid subid); extern void AtEOXact_LogicalRepWorkers(bool isCommit); +extern void CheckSubConflictInfoRetention(bool retain_conflict_info, + bool disabling_sub); + #endif /* LOGICALWORKER_H */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index c9ef5259b68..576626c6557 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -256,7 +256,8 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running, extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, - dsm_handle subworker_dsm); + dsm_handle subworker_dsm, + bool retain_conflict_info); extern void logicalrep_worker_stop(Oid subid, Oid relid); extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 1443e1d9292..373a8484f24 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -409,18 +409,47 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail - retain_conflict_info must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_conflict_info = foo); +ERROR: retain_conflict_info requires a Boolean value +-- ok - but a warning will occur because track_commit_timestamp is not enabled +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_conflict_info = true); +WARNING: commit timestamp and origin data required for detecting conflicts won't be retained +HINT: Consider setting "track_commit_timestamp" to true. +WARNING: deleted rows to detect conflicts would not be removed until the subscription is enabled +HINT: Consider setting retain_conflict_info to false. +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | t | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +-- ok +ALTER SUBSCRIPTION regress_testsub SET (retain_conflict_info = false); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 007c9e70374..c65397e5ac6 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -287,6 +287,22 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - retain_conflict_info must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_conflict_info = foo); + +-- ok - but a warning will occur because track_commit_timestamp is not enabled +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_conflict_info = true); + +\dRs+ + +-- ok +ALTER SUBSCRIPTION regress_testsub SET (retain_conflict_info = false); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; -- 2.30.0.windows.2