From bd8ec0915c68460b68c6447796b7eaf6cbf33f8c Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Mon, 25 Aug 2025 10:06:10 +0800 Subject: [PATCH v65 1/4] Introduce a 'max_conflict_retention_duration' option to subscriptions. This commit introduces a subscription option max_conflict_retention_duration, designed to prevent excessive accumulation of dead tuples when subscription with retain_dead_tuples enabled is present and the apply worker cannot catch up with the publisher's workload. If the time spent advancing non-removable transaction ID surpasses the max_conflict_retention_duration threshold, the apply worker would stop retaining information for conflict detection. The replication slot pg_conflict_detection.xmin will be set to InvalidTransactionId if all apply workers associated with the subscription, where retain_dead_tuples is enabled, confirm that the retention duration exceeded the max_conflict_retention_duration. Additionally, retention status is recorded in the pg_subscription catalog (subretentionactive) to prevent unnecessary retention initiation upon server restarts. In this patch, a replication slot will not be automatically re-initialized. Users can disable retain_dead_tuples and re-enable it manually to resume the retention. An upcoming patch will include support for automatic slot re-initialization once at least one apply worker confirms that the retention duration is within the max_conflict_retention_duration limit. --- doc/src/sgml/catalogs.sgml | 25 +++ doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 43 +++- src/backend/catalog/pg_subscription.c | 41 ++++ src/backend/catalog/system_views.sql | 4 +- src/backend/commands/subscriptioncmds.c | 135 ++++++++++-- src/backend/replication/logical/launcher.c | 71 ++++--- src/backend/replication/logical/worker.c | 236 ++++++++++++++++++++- src/bin/pg_dump/pg_dump.c | 18 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 12 +- src/bin/psql/tab-complete.in.c | 6 +- src/include/catalog/pg_subscription.h | 19 ++ src/include/catalog/pg_subscription_rel.h | 2 + src/include/commands/subscriptioncmds.h | 6 +- src/include/replication/worker_internal.h | 3 + src/test/regress/expected/subscription.out | 186 +++++++++------- src/test/regress/sql/subscription.sql | 16 ++ 18 files changed, 683 insertions(+), 146 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index da8a7882580..98bfa96b17b 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8094,6 +8094,31 @@ SCRAM-SHA-256$<iteration count>:&l + + + submaxconflretention int4 + + + The maximum duration (in milliseconds) for which information (e.g., dead + tuples, commit timestamps, and origins) useful for conflict detection can + be retained. + + + + + + subretentionactive bool + + + The retention status of information (e.g., dead tuples, commit + timestamps, and origins) useful for conflict detection. True if + retain_dead_tuples + is enabled, and the retention duration has not exceeded + max_conflict_retention_duration, + when defined. + + + subconninfo text diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index d48cdc76bd3..f2c2e147472 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -236,8 +236,9 @@ ALTER SUBSCRIPTION name RENAME TO < run_as_owner, origin, failover, - two_phase, and - retain_dead_tuples. + two_phase, + retain_dead_tuples, and + max_conflict_retention_duration. Only a superuser can set password_required = false. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 247c5bd2604..8de0cd0d53f 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -448,7 +448,7 @@ CREATE SUBSCRIPTION subscription_nametrue, the detection of is enabled, and a physical replication slot named pg_conflict_detection - created on the subscriber to prevent the information for detecting + is created on the subscriber to prevent the information for detecting conflicts from being removed. @@ -521,6 +521,47 @@ CREATE SUBSCRIPTION subscription_name + + + max_conflict_retention_duration (integer) + + + Maximum duration for which this subscription's apply worker is allowed + to retain the information useful for conflict detection when + retain_dead_tuples is enabled for the associated + subscriptions. The default value is 0, indicating + that the information is retained until it is no longer needed for + detection purposes. This value is taken as milliseconds. + + + The information useful for conflict detection is no longer retained if + all apply workers associated with the subscriptions, where + retain_dead_tuples is enabled, confirm that the + retention duration exceeded the + max_conflict_retention_duration set within the + corresponding subscription. To re-enable retention manually, you can + disable retain_dead_tuples for all subscriptions and + re-enable it after confirming this replication slot has been dropped. + + + Note that overall retention will not stop if other subscriptions + specify a greater value and have not exceeded it, or if they set this + option to 0. + + + This option is effective only when + retain_conflict_info is enabled and the apply + worker associated with the subscription is active. + + + + Note that setting a non-zero value for this option could lead to + information for conflict detection being removed prematurely, + potentially missing some conflict detections. + + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 244acf52f36..16eb5c16a0b 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -104,6 +104,8 @@ GetSubscription(Oid subid, bool missing_ok) sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; sub->retaindeadtuples = subform->subretaindeadtuples; + sub->maxconflretention = subform->submaxconflretention; + sub->retentionactive = subform->subretentionactive; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, @@ -598,3 +600,42 @@ GetSubscriptionRelations(Oid subid, bool not_ready) return res; } + +/* + * Update the dead tuple retention status for the given subscription. + */ +void +UpdateDeadTupleRetentionStatus(Oid subid, bool active) +{ + Relation rel; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + HeapTuple tup; + + /* Look up the subscription in the catalog */ + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for subscription %u", subid); + + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* Set the subscription to disabled. */ + values[Anum_pg_subscription_subretentionactive - 1] = active; + replaces[Anum_pg_subscription_subretentionactive - 1] = true; + + /* Update the catalog */ + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + heap_freetuple(tup); + + table_close(rel, NoLock); +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 1b3c5a55882..f2e8d6a3057 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1389,8 +1389,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subretaindeadtuples, subslotname, subsynccommit, - subpublications, suborigin) + subretaindeadtuples, submaxconflretention, subretentionactive, + subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 4c01d21b2f3..f94c2c6db43 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -73,8 +73,9 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 #define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000 -#define SUBOPT_LSN 0x00008000 -#define SUBOPT_ORIGIN 0x00010000 +#define SUBOPT_MAX_CONFLICT_RETENTION_DURATION 0x00008000 +#define SUBOPT_LSN 0x00010000 +#define SUBOPT_ORIGIN 0x00020000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -101,6 +102,7 @@ typedef struct SubOpts bool runasowner; bool failover; bool retaindeadtuples; + int32 maxconflretention; char *origin; XLogRecPtr lsn; } SubOpts; @@ -112,6 +114,7 @@ static void check_publications_origin(WalReceiverConn *wrconn, Oid *subrel_local_oids, int subrel_count, char *subname); static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn); +static void notify_ineffective_max_retention(bool update_maxretention); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -169,6 +172,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->failover = false; if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES)) opts->retaindeadtuples = false; + if (IsSet(supported_opts, SUBOPT_MAX_CONFLICT_RETENTION_DURATION)) + opts->maxconflretention = 0; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -323,6 +328,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES; opts->retaindeadtuples = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_MAX_CONFLICT_RETENTION_DURATION) && + strcmp(defel->defname, "max_conflict_retention_duration") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_MAX_CONFLICT_RETENTION_DURATION)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_MAX_CONFLICT_RETENTION_DURATION; + opts->maxconflretention = defGetInt32(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -580,7 +594,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); + SUBOPT_RETAIN_DEAD_TUPLES | + SUBOPT_MAX_CONFLICT_RETENTION_DURATION | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -647,9 +662,13 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, stmt->subname))); } - /* Ensure that we can enable retain_dead_tuples */ - if (opts.retaindeadtuples) - CheckSubDeadTupleRetention(true, !opts.enabled, WARNING); + /* + * Ensure that the configurations for retain_dead_tuples and + * max_conflict_retention_duration is appropriate. + */ + CheckSubDeadTupleRetention(true, !opts.enabled, WARNING, + opts.retaindeadtuples, opts.retaindeadtuples, + true, opts.maxconflretention); if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) && opts.slot_name == NULL) @@ -693,6 +712,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); values[Anum_pg_subscription_subretaindeadtuples - 1] = BoolGetDatum(opts.retaindeadtuples); + values[Anum_pg_subscription_submaxconflretention - 1] = + Int32GetDatum(opts.maxconflretention); + values[Anum_pg_subscription_subretentionactive - 1] = + Int32GetDatum(opts.retaindeadtuples); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1175,6 +1198,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool update_failover = false; bool update_two_phase = false; bool check_pub_rdt = false; + bool ineffective_maxconflretention = false; + bool update_maxretention = false; bool retain_dead_tuples; char *origin; Subscription *sub; @@ -1235,7 +1260,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); + SUBOPT_RETAIN_DEAD_TUPLES | + SUBOPT_MAX_CONFLICT_RETENTION_DURATION | + SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1397,10 +1424,34 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES)) { + bool retention_active = sub->retentionactive; + values[Anum_pg_subscription_subretaindeadtuples - 1] = BoolGetDatum(opts.retaindeadtuples); replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true; + /* + * Update the retention status only when there is a change + * in the retain_dead_tuples option value. + * + * It might not be ideal to blindly mark retention as + * active upon enabling the retain_dead_tuples, when + * retention was previously ceased and the user toggles + * retain_dead_tuples without adjusting the publisher + * workload. However, since retention will be stopped + * again soon in such cases, and this approach offers a + * convenient way for the user to manually refresh the + * retention status, it is suitable for now. + */ + if (opts.retaindeadtuples != sub->retaindeadtuples) + { + values[Anum_pg_subscription_subretentionactive - 1] = + BoolGetDatum(opts.retaindeadtuples); + replaces[Anum_pg_subscription_subretentionactive - 1] = true; + + retention_active = opts.retaindeadtuples; + } + CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel); /* @@ -1421,8 +1472,10 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * Remind the user that enabling subscription will prevent * the accumulation of dead tuples. */ - if (opts.retaindeadtuples) - CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE); + CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE, + opts.retaindeadtuples, + retention_active, false, + sub->maxconflretention); /* * Notify the launcher to manage the replication slot for @@ -1434,6 +1487,20 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, check_pub_rdt = opts.retaindeadtuples; retain_dead_tuples = opts.retaindeadtuples; + + ineffective_maxconflretention = (!opts.retaindeadtuples && + sub->maxconflretention); + } + + if (IsSet(opts.specified_opts, SUBOPT_MAX_CONFLICT_RETENTION_DURATION)) + { + values[Anum_pg_subscription_submaxconflretention - 1] = + Int32GetDatum(opts.maxconflretention); + replaces[Anum_pg_subscription_submaxconflretention - 1] = true; + + update_maxretention = true; + ineffective_maxconflretention = (!retain_dead_tuples && + opts.maxconflretention); } if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) @@ -1453,6 +1520,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, origin = opts.origin; } + if (ineffective_maxconflretention) + notify_ineffective_max_retention(update_maxretention); + update_tuple = true; break; } @@ -1473,9 +1543,10 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * subscription in case it was disabled after creation. See * comments atop CheckSubDeadTupleRetention() for details. */ - if (sub->retaindeadtuples) - CheckSubDeadTupleRetention(opts.enabled, !opts.enabled, - WARNING); + CheckSubDeadTupleRetention(opts.enabled, !opts.enabled, + WARNING, sub->retaindeadtuples, + sub->retentionactive, false, + sub->maxconflretention); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); @@ -2468,38 +2539,62 @@ check_pub_dead_tuple_retention(WalReceiverConn *wrconn) * this setting can be adjusted after subscription creation. Without it, the * apply worker will simply skip conflict detection. * - * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an - * ERROR since users can only modify retain_dead_tuples for disabled - * subscriptions. And as long as the subscription is enabled promptly, it will - * not pose issues. + * Issue a WARNING or NOTICE if the subscription is disabled and the retention + * is active. Do not raise an ERROR since users can only modify + * retain_dead_tuples for disabled subscriptions. And as long as the + * subscription is enabled promptly, it will not pose issues. + * + * Issue a NOTICE to inform users that max_conflict_retention_duration is + * ineffective (See notify_ineffective_max_retention). */ void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, - int elevel_for_sub_disabled) + int elevel_for_sub_disabled, + bool retain_dead_tuples, bool retention_active, + bool check_max_retention, int max_retention) { Assert(elevel_for_sub_disabled == NOTICE || elevel_for_sub_disabled == WARNING); - if (check_guc && wal_level < WAL_LEVEL_REPLICA) + if (retain_dead_tuples && check_guc && wal_level < WAL_LEVEL_REPLICA) ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"), errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start.")); - if (check_guc && !track_commit_timestamp) + if (retain_dead_tuples && check_guc && !track_commit_timestamp) ereport(WARNING, errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"), errhint("Consider setting \"%s\" to true.", "track_commit_timestamp")); - if (sub_disabled) + if (retain_dead_tuples && sub_disabled && retention_active) ereport(elevel_for_sub_disabled, errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"), (elevel_for_sub_disabled > NOTICE) ? errhint("Consider setting %s to false.", "retain_dead_tuples") : 0); + + if (!retain_dead_tuples && check_max_retention && max_retention) + notify_ineffective_max_retention(true); +} + +/* + * Report a NOTICE to inform users that max_conflict_retention_duration is + * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR + * is not issued because setting max_conflict_retention_duration causes no harm, + * even when it is ineffective. + */ +static void +notify_ineffective_max_retention(bool update_maxretention) +{ + ereport(NOTICE, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + update_maxretention + ? errmsg("max_conflict_retention_duration has no effect when retain_dead_tuples is disabled") + : errmsg("disabling retain_dead_tuples will render max_conflict_retention_duration ineffective")); } /* diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 37377f7eb63..add5fc1ad18 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -23,6 +23,7 @@ #include "access/tableam.h" #include "access/xact.h" #include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_d.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" #include "lib/dshash.h" @@ -43,6 +44,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" /* max sleep time between cycles (3min) */ #define DEFAULT_NAPTIME_PER_CYCLE 180000L @@ -102,7 +104,7 @@ static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin); static bool acquire_conflict_slot_if_exists(void); -static void advance_conflict_slot_xmin(TransactionId new_xmin); +static void update_conflict_slot_xmin(TransactionId new_xmin); /* @@ -152,6 +154,7 @@ get_subscription_list(void) sub->enabled = subform->subenabled; sub->name = pstrdup(NameStr(subform->subname)); sub->retaindeadtuples = subform->subretaindeadtuples; + sub->retentionactive = subform->subretentionactive; /* We don't fill fields we are not interested in. */ res = lappend(res, sub); @@ -1183,6 +1186,7 @@ ApplyLauncherMain(Datum main_arg) long wait_time = DEFAULT_NAPTIME_PER_CYCLE; bool can_advance_xmin = true; bool retain_dead_tuples = false; + bool retention_inactive = false; TransactionId xmin = InvalidTransactionId; CHECK_FOR_INTERRUPTS(); @@ -1214,17 +1218,6 @@ ApplyLauncherMain(Datum main_arg) { retain_dead_tuples = true; - /* - * Can't advance xmin of the slot unless all the subscriptions - * with retain_dead_tuples are enabled. This is required to - * ensure that we don't advance the xmin of - * CONFLICT_DETECTION_SLOT if one of the subscriptions is not - * enabled. Otherwise, we won't be able to detect conflicts - * reliably for such a subscription even though it has set the - * retain_dead_tuples option. - */ - can_advance_xmin &= sub->enabled; - /* * Create a replication slot to retain information necessary * for conflict detection such as dead tuples, commit @@ -1240,6 +1233,28 @@ ApplyLauncherMain(Datum main_arg) * subscription was enabled. */ CreateConflictDetectionSlot(); + + if (sub->retentionactive) + { + /* + * Can't advance xmin of the slot unless all the + * subscriptions actively retaining dead tuples are + * enabled. This is required to ensure that we don't + * advance the xmin of CONFLICT_DETECTION_SLOT if one of + * the subscriptions is not enabled. Otherwise, we won't + * be able to detect conflicts reliably for such a + * subscription even though it has set the + * retain_dead_tuples option. + */ + can_advance_xmin &= sub->enabled; + + /* + * Consider overall retention inactive only when all + * subscriptions with retain_dead_tuples enabled have + * marked it as inactive. + */ + retention_inactive = false; + } } if (!sub->enabled) @@ -1256,7 +1271,8 @@ ApplyLauncherMain(Datum main_arg) * required for conflict detection among all running apply * workers that enables retain_dead_tuples. */ - if (sub->retaindeadtuples && can_advance_xmin) + if (sub->retaindeadtuples && sub->retentionactive && + can_advance_xmin) compute_min_nonremovable_xid(w, &xmin); /* worker is running already */ @@ -1265,11 +1281,11 @@ ApplyLauncherMain(Datum main_arg) /* * Can't advance xmin of the slot unless all the workers - * corresponding to subscriptions with retain_dead_tuples are - * running, disabling the further computation of the minimum + * corresponding to subscriptions actively retaining dead tuples + * are running, disabling the further computation of the minimum * nonremovable xid. */ - if (sub->retaindeadtuples) + if (sub->retaindeadtuples && sub->retentionactive) can_advance_xmin = false; /* @@ -1295,7 +1311,8 @@ ApplyLauncherMain(Datum main_arg) sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid, DSM_HANDLE_INVALID, - sub->retaindeadtuples)) + sub->retaindeadtuples && + sub->retentionactive)) { /* * We get here either if we failed to launch a worker @@ -1320,13 +1337,19 @@ ApplyLauncherMain(Datum main_arg) * that requires us to retain dead tuples. Otherwise, if required, * advance the slot's xmin to protect dead tuples required for the * conflict detection. + * + * However, if all apply workers for subscriptions with + * retain_dead_tuples enabled have requested to cease retention, + * marking it as inactive, the new xmin will be set to + * InvalidTransactionId. We then update slot.xmin accordingly to + * permit the removal of dead tuples. */ if (MyReplicationSlot) { if (!retain_dead_tuples) ReplicationSlotDropAcquired(); - else if (can_advance_xmin) - advance_conflict_slot_xmin(xmin); + else if (can_advance_xmin || retention_inactive) + update_conflict_slot_xmin(xmin); } /* Switch back to original memory context. */ @@ -1402,17 +1425,17 @@ acquire_conflict_slot_if_exists(void) } /* - * Advance the xmin the replication slot used to retain information required + * Update the xmin the replication slot used to retain information required * for conflict detection. */ static void -advance_conflict_slot_xmin(TransactionId new_xmin) +update_conflict_slot_xmin(TransactionId new_xmin) { Assert(MyReplicationSlot); - Assert(TransactionIdIsValid(new_xmin)); - Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); + Assert(!TransactionIdIsValid(new_xmin) || + TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); - /* Return if the xmin value of the slot cannot be advanced */ + /* Return if the xmin value of the slot cannot be updated */ if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin)) return; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 22ad9051db3..29d0c9a6e45 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -190,6 +190,16 @@ * update_deleted is necessary, as the UPDATEs in remote transactions should be * ignored if their timestamp is earlier than that of the dead tuples. * + * If max_conflict_retention_duration is defined, one additional phase is + * involved: + * + * - RDT_STOP_CONFLICT_INFO_RETENTION: + * This phase is triggered when the wait time in either the + * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds + * max_conflict_retention_duration. During this phase, + * pg_subscription.subretentionactive is updated to false within a new + * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId. + * * Note that advancing the non-removable transaction ID is not supported if the * publisher is also a physical standby. This is because the logical walsender * on the standby can only get the WAL replay position but there may be more @@ -373,7 +383,8 @@ typedef enum RDT_GET_CANDIDATE_XID, RDT_REQUEST_PUBLISHER_STATUS, RDT_WAIT_FOR_PUBLISHER_STATUS, - RDT_WAIT_FOR_LOCAL_FLUSH + RDT_WAIT_FOR_LOCAL_FLUSH, + RDT_STOP_CONFLICT_INFO_RETENTION, } RetainDeadTuplesPhase; /* @@ -415,6 +426,9 @@ typedef struct RetainDeadTuplesData * updated in final phase * (RDT_WAIT_FOR_LOCAL_FLUSH) */ + long table_sync_wait_time; /* time spent waiting for table sync + * to finish */ + /* * The following fields are used to determine the timing for the next * round of transaction ID advancement. @@ -555,6 +569,9 @@ static void request_publisher_status(RetainDeadTuplesData *rdt_data); static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received); static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); +static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); +static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data); +static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found); @@ -3220,6 +3237,7 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, { TransactionId oldestxmin; ReplicationSlot *slot; + bool retention_active; /* * Return false if either dead tuples are not retained or commit timestamp @@ -3228,6 +3246,49 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, if (!MySubscription->retaindeadtuples || !track_commit_timestamp) return false; + /* + * Check whether the leader apply worker has stopped retaining information + * for detecting conflicts. + * + * Use the worker's oldest_nonremovable_xid instead of + * pg_subscription.subretentionactive to determine whether retention is + * active, as retention resumption might not be complete even when + * subretentionactive is set to true; this is because the launcher assigns + * the initial oldest_nonremovable_xid after the apply worker updates the + * catalog (see resume_conflict_info_retention). + */ + if (am_leader_apply_worker()) + { + retention_active = + TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid); + } + else + { + LogicalRepWorker *leader; + + /* + * Obtain the information from the leader apply worker as only the + * leader manages conflict retention (see + * maybe_advance_nonremovable_xid() for details). + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + leader = logicalrep_worker_find(MyLogicalRepWorker->subid, + InvalidOid, false); + + SpinLockAcquire(&leader->relmutex); + retention_active = TransactionIdIsValid(leader->oldest_nonremovable_xid); + SpinLockRelease(&leader->relmutex); + LWLockRelease(LogicalRepWorkerLock); + } + + /* + * Return false if the leader apply worker has stopped retaining + * information for detecting conflicts. This implies that update_deleted + * can no longer be reliably detected. + */ + if (!retention_active) + return false; + /* * For conflict detection, we use the conflict slot's xmin value instead * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as @@ -3254,7 +3315,15 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, oldestxmin = slot->data.xmin; SpinLockRelease(&slot->mutex); - Assert(TransactionIdIsValid(oldestxmin)); + /* + * Return false if the conflict detection slot.xmin is set to + * InvalidTransactionId. This situation arises if the current worker is + * either a table synchronization or parallel apply worker, and the leader + * stopped retention immediately after checking the + * oldest_nonremovable_xid above. + */ + if (!TransactionIdIsValid(oldestxmin)) + return false; if (OidIsValid(localidxoid) && IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin)) @@ -4110,7 +4179,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) * Ensure to wake up when it's possible to advance the non-removable * transaction ID. */ - if (rdt_data.phase == RDT_GET_CANDIDATE_XID && + if (TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid) && + rdt_data.phase == RDT_GET_CANDIDATE_XID && rdt_data.xid_advance_interval) wait_time = Min(wait_time, rdt_data.xid_advance_interval); @@ -4325,6 +4395,10 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) if (!MySubscription->retaindeadtuples) return false; + /* No need to advance if we have already stopped retaining */ + if (!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) + return false; + return true; } @@ -4350,6 +4424,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, case RDT_WAIT_FOR_LOCAL_FLUSH: wait_for_local_flush(rdt_data); break; + case RDT_STOP_CONFLICT_INFO_RETENTION: + stop_conflict_info_retention(rdt_data); + break; } } @@ -4468,6 +4545,13 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data, if (!status_received) return; + /* + * Stop retaining conflict information if required (See + * should_stop_conflict_info_retention() for details). + */ + if (should_stop_conflict_info_retention(rdt_data)) + return; + if (!FullTransactionIdIsValid(rdt_data->remote_wait_for)) rdt_data->remote_wait_for = rdt_data->remote_nextxid; @@ -4549,6 +4633,27 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * have a WAL position greater than the rdt_data->remote_lsn. */ if (!AllTablesyncsReady()) + { + TimestampTz now; + + now = rdt_data->last_recv_time + ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Record the time spent waiting for table sync, it is needed for the + * timeout check in should_stop_conflict_info_retention(). + */ + rdt_data->table_sync_wait_time = + TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now); + + return; + } + + /* + * Stop retaining conflict information if required (See + * should_stop_conflict_info_retention() for details). + */ + if (should_stop_conflict_info_retention(rdt_data)) return; /* @@ -4594,12 +4699,67 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) /* Notify launcher to update the xmin of the conflict slot */ ApplyLauncherWakeup(); + reset_retention_data_fields(rdt_data); + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase. + */ +static void +stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + /* + * Do not update the catalog during an active transaction. The transaction + * may be started during change application, leading to a possible + * rollback of catalog updates if the application fails subsequently. + */ + if (IsTransactionState()) + return; + + StartTransactionCommand(); + /* - * Reset all data fields except those used to determine the timing for the - * next round of transaction ID advancement. We can even use - * flushpos_update_time in the next round to decide whether to get the - * latest flush position. + * Updating pg_subscription might involve TOAST table access, so ensure we + * have a valid snapshot. */ + PushActiveSnapshot(GetTransactionSnapshot()); + + /* Set pg_subscription.subretentionactive to false */ + UpdateDeadTupleRetentionStatus(MySubscription->oid, false); + + PopActiveSnapshot(); + CommitTransactionCommand(); + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts", + MySubscription->name), + errdetail("The retention duration for information used in conflict detection has exceeded the maximum limit of %u ms.", + MySubscription->maxconflretention), + errhint("You might need to increase \"%s\".", + "max_conflict_retention_duration")); + + /* Notify launcher to update the conflict slot */ + ApplyLauncherWakeup(); + + reset_retention_data_fields(rdt_data); +} + +/* + * Reset all data fields of RetainDeadTuplesData except those used to + * determine the timing for the next round of transaction ID advancement. We + * can even use flushpos_update_time in the next round to decide whether to get + * the latest flush position. + */ +static void +reset_retention_data_fields(RetainDeadTuplesData *rdt_data) +{ rdt_data->phase = RDT_GET_CANDIDATE_XID; rdt_data->remote_lsn = InvalidXLogRecPtr; rdt_data->remote_oldestxid = InvalidFullTransactionId; @@ -4607,9 +4767,56 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) rdt_data->reply_time = 0; rdt_data->remote_wait_for = InvalidFullTransactionId; rdt_data->candidate_xid = InvalidTransactionId; + rdt_data->table_sync_wait_time = 0; +} + +/* + * Check whether conflict information retention should be stopped because the + * wait time has exceeded the maximum limit (max_conflict_retention_duration). + * + * If retention should be stopped, proceed to the + * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return + * false. + * + * Currently, the retention will not resume automatically unless user manually + * disables retain_dead_tuples and re-enables it after confirming that the + * replication slot has been dropped. + */ +static bool +should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + TimestampTz now; + + Assert(TransactionIdIsValid(rdt_data->candidate_xid)); + Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS || + rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH); + + if (!MySubscription->maxconflretention) + return false; + + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not available, + * obtain the current timestamp. + */ + now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Return if the wait time has not exceeded the maximum limit + * (max_conflict_retention_duration). The time spent waiting for table + * synchronization is not counted, as it's an infrequent operation. + */ + if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now, + MySubscription->maxconflretention + + rdt_data->table_sync_wait_time)) + return false; + + rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION; /* process the next phase */ process_rdt_phase_transition(rdt_data, false); + + return true; } /* @@ -4621,8 +4828,8 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * 3 minutes which should be sufficient to avoid using CPU or network * resources without much benefit. * - * The interval is reset to a minimum value of 100ms once there is some - * activity on the node. + * The interval is reset to the lesser of 100ms and + * max_conflict_retention_duration once there is some activities on the node. * * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can * consider the other interval or a separate GUC if the need arises. @@ -4642,6 +4849,10 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) */ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2, max_interval); + + /* Ensure the wait time remains within the maximum limit */ + rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval, + MySubscription->maxconflretention); } else { @@ -5462,7 +5673,7 @@ InitializeLogRepWorker(void) * logicalrep_worker_launch. */ if (am_leader_apply_worker() && - MySubscription->retaindeadtuples && + MySubscription->retaindeadtuples && MySubscription->retentionactive && !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) { ereport(LOG, @@ -5634,7 +5845,10 @@ DisableSubscriptionAndExit(void) * context. */ if (MySubscription->retaindeadtuples) - CheckSubDeadTupleRetention(false, true, WARNING); + CheckSubDeadTupleRetention(false, true, WARNING, + MySubscription->retaindeadtuples, + MySubscription->retentionactive, false, + MySubscription->maxconflretention); proc_exit(0); } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index fc7a6639163..8d8bcf61075 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -5048,6 +5048,7 @@ getSubscriptions(Archive *fout) int i_subenabled; int i_subfailover; int i_subretaindeadtuples; + int i_submaxconflretention; int i, ntups; @@ -5127,10 +5128,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 190000) appendPQExpBufferStr(query, - " s.subretaindeadtuples\n"); + " s.subretaindeadtuples,\n"); else appendPQExpBufferStr(query, - " false AS subretaindeadtuples\n"); + " false AS subretaindeadtuples,\n"); + + if (fout->remoteVersion >= 190000) + appendPQExpBufferStr(query, + " s.submaxconflretention\n"); + else + appendPQExpBuffer(query, + " 0 AS submaxconflretention\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -5165,6 +5173,7 @@ getSubscriptions(Archive *fout) i_subrunasowner = PQfnumber(res, "subrunasowner"); i_subfailover = PQfnumber(res, "subfailover"); i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples"); + i_submaxconflretention = PQfnumber(res, "submaxconflretention"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); @@ -5200,6 +5209,8 @@ getSubscriptions(Archive *fout) (strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0); subinfo[i].subretaindeadtuples = (strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0); + subinfo[i].submaxconflretention = + atoi(PQgetvalue(res, i, i_submaxconflretention)); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); if (PQgetisnull(res, i, i_subslotname)) @@ -5461,6 +5472,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (subinfo->subretaindeadtuples) appendPQExpBufferStr(query, ", retain_dead_tuples = true"); + if (subinfo->submaxconflretention) + appendPQExpBuffer(query, ", max_conflict_retention_duration = %d", subinfo->submaxconflretention); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index dde85ed156c..e6b94422af7 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -717,6 +717,7 @@ typedef struct _SubscriptionInfo bool subrunasowner; bool subfailover; bool subretaindeadtuples; + int submaxconflretention; char *subconninfo; char *subslotname; char *subsynccommit; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 7a06af48842..b6d57d02778 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6746,7 +6746,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false, false}; + false, false, false, false}; if (pset.sversion < 100000) { @@ -6815,10 +6815,20 @@ describeSubscriptions(const char *pattern, bool verbose) ", subfailover AS \"%s\"\n", gettext_noop("Failover")); if (pset.sversion >= 190000) + { appendPQExpBuffer(&buf, ", subretaindeadtuples AS \"%s\"\n", gettext_noop("Retain dead tuples")); + appendPQExpBuffer(&buf, + ", submaxconflretention AS \"%s\"\n", + gettext_noop("Max conflict retention duration")); + + appendPQExpBuffer(&buf, + ", subretentionactive AS \"%s\"\n", + gettext_noop("Dead tuple retention active")); + } + appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 8b10f2313f3..2d7016fe717 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2321,7 +2321,8 @@ match_previous_words(int pattern_id, COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", + COMPLETE_WITH("binary", "disable_on_error", "failover", + "max_conflict_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase"); @@ -3780,7 +3781,8 @@ match_previous_words(int pattern_id, /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "failover", "origin", + "disable_on_error", "enabled", "failover", + "max_conflict_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase"); diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 231ef84ec9a..fcc1ad173ca 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -81,6 +81,17 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subretaindeadtuples; /* True if dead tuples useful for * conflict detection are retained */ + int32 submaxconflretention; /* The maximum duration (in + * milliseconds) for which information + * useful for conflict detection can + * be retained */ + + bool subretentionactive; /* True if retain_dead_tuples is enabled + * and the retention duration has not + * exceeded + * max_conflict_retention_duration, when + * defined */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -136,6 +147,14 @@ typedef struct Subscription * to be synchronized to the standbys. */ bool retaindeadtuples; /* True if dead tuples useful for conflict * detection are retained */ + int32 maxconflretention; /* The maximum duration (in milliseconds) + * for which information useful for + * conflict detection can be retained */ + bool retentionactive; /* True if retain_dead_tuples is enabled + * and the retention duration has not + * exceeded + * max_conflict_retention_duration, when + * defined */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index f458447a0e5..02f97a547dd 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -92,4 +92,6 @@ extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern bool HasSubscriptionRelations(Oid subid); extern List *GetSubscriptionRelations(Oid subid, bool not_ready); +extern void UpdateDeadTupleRetentionStatus(Oid subid, bool active); + #endif /* PG_SUBSCRIPTION_REL_H */ diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 9b288ad22a6..85f6f45d4fa 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -31,6 +31,10 @@ extern char defGetStreamingMode(DefElem *def); extern ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel); extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, - int elevel_for_sub_disabled); + int elevel_for_sub_disabled, + bool retain_dead_tuples, + bool retention_active, + bool check_max_retention, + int max_retention); #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 7c0204dd6f4..b86c759394f 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -94,6 +94,9 @@ typedef struct LogicalRepWorker * The logical replication launcher manages an internal replication slot * named "pg_conflict_detection". It asynchronously collects this ID to * decide when to advance the xmin value of the slot. + * + * This ID would be set to InvalidTransactionId if the apply worker has + * stopped retaining information useful for conflict detection. */ TransactionId oldest_nonremovable_xid; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index a98c97f7616..c6b0784b253 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00012345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+------------------------------+------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -409,18 +409,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -433,10 +433,36 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail - max_conflict_retention_duration must be integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_conflict_retention_duration = foo); +ERROR: max_conflict_retention_duration requires an integer value +-- ok +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_conflict_retention_duration = 1000); +NOTICE: max_conflict_retention_duration has no effect when retain_dead_tuples is disabled +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 +(1 row) + +-- ok +ALTER SUBSCRIPTION regress_testsub SET (max_conflict_retention_duration = 0); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Dead tuple retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+-----------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index f0f714fe747..9b2c489adaf 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -298,6 +298,22 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - max_conflict_retention_duration must be integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_conflict_retention_duration = foo); + +-- ok +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_conflict_retention_duration = 1000); + +\dRs+ + +-- ok +ALTER SUBSCRIPTION regress_testsub SET (max_conflict_retention_duration = 0); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; -- 2.51.0.windows.1