From a8fc7b1876055b4483b1a6070e08d115f6e9e230 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Wed, 23 Jul 2025 13:38:12 +0800 Subject: [PATCH v61 1/2] Introduce a 'max_conflict_retention_duration' option to subscriptions. This commit introduces a subscription option max_conflict_retention_duration, designed to prevent excessive accumulation of dead tuples when subscription with retain_dead_tuples enabled is present and the apply worker cannot catch up with the publisher's workload. If the time spent advancing non-removable transaction ID surpasses the max_conflict_retention_duration threshold, the apply worker would stop retaining information for conflict detection. The replication slot pg_conflict_detection.xmin will be set to InvalidTransactionId if all apply workers associated with the subscription, where retain_dead_tuples is enabled, confirm that the retention duration exceeded the max_conflict_retention_duration. In this patch, a replication slot will not be automatically re-initialized. Users can disable retain_dead_tuples and re-enable it after confirming that the replication slot has been dropped. An upcoming patch will include support for automatic slot re-initialization once at least one apply worker confirms that the retention duration is within the max_conflict_retention_duration limit. To monitor worker's conflict retention status, this patch also introduces a new column 'dead_tuple_retention_active' in the pg_stat_subscription view. This column indicates whether the apply worker is effectively retaining conflict information. The value is set to true only if retain_dead_tuples is enabled for the associated subscription, and the retention duration for conflict detection by the apply worker has not exceeded max_conflict_retention_duration. --- doc/src/sgml/monitoring.sgml | 13 ++ doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 41 +++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 7 +- src/backend/commands/subscriptioncmds.c | 62 ++++++- src/backend/replication/logical/launcher.c | 45 +++-- src/backend/replication/logical/tablesync.c | 22 +-- src/backend/replication/logical/worker.c | 183 +++++++++++++++++-- src/bin/pg_dump/pg_dump.c | 18 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 8 +- src/bin/psql/tab-complete.in.c | 6 +- src/include/catalog/pg_proc.dat | 6 +- src/include/catalog/pg_subscription.h | 7 + src/include/replication/worker_internal.h | 10 +- src/test/regress/expected/rules.out | 5 +- src/test/regress/expected/subscription.out | 186 +++++++++++--------- src/test/regress/sql/subscription.sql | 16 ++ src/test/subscription/t/035_conflicts.pl | 10 +- 20 files changed, 514 insertions(+), 138 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 3f4a27a736e..e9d519bf382 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2114,6 +2114,19 @@ description | Waiting for a newly initialized WAL file to reach durable storage sender; NULL for parallel apply workers + + + + dead_tuple_retention_active boolean + + + True if retain_dead_tuples + is enabled and the duration for which information useful for conflict + detection is retained by this apply worker does not exceed + max_conflict_retention_duration; NULL for + parallel apply workers and table synchronization workers. + + diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index d48cdc76bd3..f2c2e147472 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -236,8 +236,9 @@ ALTER SUBSCRIPTION name RENAME TO < run_as_owner, origin, failover, - two_phase, and - retain_dead_tuples. + two_phase, + retain_dead_tuples, and + max_conflict_retention_duration. Only a superuser can set password_required = false. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 247c5bd2604..4190c190ae8 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -521,6 +521,47 @@ CREATE SUBSCRIPTION subscription_name + + + max_conflict_retention_duration (integer) + + + Maximum duration that the apply worker, according to this subscription, + is allowed to retain the information useful for conflict detection when + retain_dead_tuples is enabled for the associated + subscriptions. The default value is 0, indicating + that the information is retained until it is no longer needed for + detection purposes. This value is taken as milliseconds. + + + The information useful for conflict detection is no longer retained if + all apply workers associated with the subscriptions, where + retain_dead_tuples is enabled, confirm that the + retention duration exceeded the + max_conflict_retention_duration set within the + corresponding subscription. To re-enable retention, you can disable + retain_dead_tuples for all subscriptions and + re-enable it after confirming this replication slot has been dropped. + + + Note that overall retention will not stop if other subscriptions + specify a greater value and have not exceeded it, or if they set this + option to 0. + + + This option is effective only when + retain_conflict_info is enabled and the apply + worker associated with the subscription is active. + + + + Note that setting a non-zero value for this option could lead to + information for conflict detection being removed prematurely, + potentially missing some conflict detections. + + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 244acf52f36..9da33cbcc28 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -104,6 +104,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; sub->retaindeadtuples = subform->subretaindeadtuples; + sub->maxconflretention = subform->maxconflretention; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 1b3c5a55882..0c803ca6c43 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -996,7 +996,8 @@ CREATE VIEW pg_stat_subscription AS st.last_msg_send_time, st.last_msg_receipt_time, st.latest_end_lsn, - st.latest_end_time + st.latest_end_time, + st.dead_tuple_retention_active FROM pg_subscription su LEFT JOIN pg_stat_get_subscription(NULL) st ON (st.subid = su.oid); @@ -1389,8 +1390,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subretaindeadtuples, subslotname, subsynccommit, - subpublications, suborigin) + subretaindeadtuples, maxconflretention, subslotname, + subsynccommit, subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index faa3650d287..111151756fb 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -73,8 +73,9 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 #define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000 -#define SUBOPT_LSN 0x00008000 -#define SUBOPT_ORIGIN 0x00010000 +#define SUBOPT_MAX_CONFLICT_RETENTION_DURATION 0x00008000 +#define SUBOPT_LSN 0x00010000 +#define SUBOPT_ORIGIN 0x00020000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -101,6 +102,7 @@ typedef struct SubOpts bool runasowner; bool failover; bool retaindeadtuples; + int32 maxconflretention; char *origin; XLogRecPtr lsn; } SubOpts; @@ -112,6 +114,7 @@ static void check_publications_origin(WalReceiverConn *wrconn, Oid *subrel_local_oids, int subrel_count, char *subname); static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn); +static void notify_ineffective_max_conflict_retention(void); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -169,6 +172,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->failover = false; if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES)) opts->retaindeadtuples = false; + if (IsSet(supported_opts, SUBOPT_MAX_CONFLICT_RETENTION_DURATION)) + opts->maxconflretention = 0; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -323,6 +328,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES; opts->retaindeadtuples = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_MAX_CONFLICT_RETENTION_DURATION) && + strcmp(defel->defname, "max_conflict_retention_duration") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_MAX_CONFLICT_RETENTION_DURATION)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_MAX_CONFLICT_RETENTION_DURATION; + opts->maxconflretention = defGetInt32(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -580,7 +594,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); + SUBOPT_RETAIN_DEAD_TUPLES | + SUBOPT_MAX_CONFLICT_RETENTION_DURATION | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -651,6 +666,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, if (opts.retaindeadtuples) CheckSubDeadTupleRetention(true, !opts.enabled, WARNING); + /* Notify that max_conflict_retention_duration is ineffective */ + else if (opts.maxconflretention) + notify_ineffective_max_conflict_retention(); + if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) && opts.slot_name == NULL) opts.slot_name = stmt->subname; @@ -693,6 +712,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); values[Anum_pg_subscription_subretaindeadtuples - 1] = BoolGetDatum(opts.retaindeadtuples); + values[Anum_pg_subscription_maxconflretention - 1] = + Int32GetDatum(opts.maxconflretention); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1175,6 +1196,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool update_failover = false; bool update_two_phase = false; bool check_pub_rdt = false; + bool ineffective_maxconflretention = false; bool retain_dead_tuples; char *origin; Subscription *sub; @@ -1235,7 +1257,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); + SUBOPT_RETAIN_DEAD_TUPLES | + SUBOPT_MAX_CONFLICT_RETENTION_DURATION | + SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1434,6 +1458,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, check_pub_rdt = opts.retaindeadtuples; retain_dead_tuples = opts.retaindeadtuples; + + ineffective_maxconflretention = (!opts.retaindeadtuples && + sub->maxconflretention); + } + + if (IsSet(opts.specified_opts, SUBOPT_MAX_CONFLICT_RETENTION_DURATION)) + { + values[Anum_pg_subscription_maxconflretention - 1] = + Int32GetDatum(opts.maxconflretention); + replaces[Anum_pg_subscription_maxconflretention - 1] = true; + + ineffective_maxconflretention = (!retain_dead_tuples && + opts.maxconflretention); } if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) @@ -1453,6 +1490,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, origin = opts.origin; } + if (ineffective_maxconflretention) + notify_ineffective_max_conflict_retention(); + update_tuple = true; break; } @@ -2500,6 +2540,20 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, "retain_dead_tuples") : 0); } +/* + * Report a NOTICE to inform users that max_conflict_retention_duration is + * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR + * is not issued because setting max_conflict_retention_duration causes no harm, + * even when it is ineffective. + */ +static void +notify_ineffective_max_conflict_retention(void) +{ + ereport(NOTICE, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("max_conflict_retention_duration has no effect when retain_dead_tuples is disabled")); +} + /* * Get the list of tables which belong to specified publications on the * publisher connection. diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 37377f7eb63..f1dfb51ccf8 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -23,6 +23,7 @@ #include "access/tableam.h" #include "access/xact.h" #include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_d.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" #include "lib/dshash.h" @@ -43,6 +44,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" /* max sleep time between cycles (3min) */ #define DEFAULT_NAPTIME_PER_CYCLE 180000L @@ -102,7 +104,7 @@ static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin); static bool acquire_conflict_slot_if_exists(void); -static void advance_conflict_slot_xmin(TransactionId new_xmin); +static void update_conflict_slot_xmin(TransactionId new_xmin); /* @@ -998,7 +1000,7 @@ ApplyLauncherShmemInit(void) LogicalRepWorker *worker = &LogicalRepCtx->workers[slot]; memset(worker, 0, sizeof(LogicalRepWorker)); - SpinLockInit(&worker->relmutex); + SpinLockInit(&worker->mutex); } } } @@ -1320,13 +1322,18 @@ ApplyLauncherMain(Datum main_arg) * that requires us to retain dead tuples. Otherwise, if required, * advance the slot's xmin to protect dead tuples required for the * conflict detection. + * + * However, if all apply workers for subscriptions with + * retain_dead_tuples enabled have requested to cease retention, the + * new xmin will be set to InvalidTransactionId. We then update + * slot.xmin accordingly to permit the removal of dead tuples. */ if (MyReplicationSlot) { if (!retain_dead_tuples) ReplicationSlotDropAcquired(); else if (can_advance_xmin) - advance_conflict_slot_xmin(xmin); + update_conflict_slot_xmin(xmin); } /* Switch back to original memory context. */ @@ -1374,11 +1381,16 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) */ Assert(MyReplicationSlot); - SpinLockAcquire(&worker->relmutex); + SpinLockAcquire(&worker->mutex); nonremovable_xid = worker->oldest_nonremovable_xid; - SpinLockRelease(&worker->relmutex); + SpinLockRelease(&worker->mutex); - Assert(TransactionIdIsValid(nonremovable_xid)); + /* + * Skip collecting oldest_nonremovable_xid for workers that have stopped + * conflict retention. + */ + if (!TransactionIdIsValid(nonremovable_xid)) + return; if (!TransactionIdIsValid(*xmin) || TransactionIdPrecedes(nonremovable_xid, *xmin)) @@ -1402,17 +1414,17 @@ acquire_conflict_slot_if_exists(void) } /* - * Advance the xmin the replication slot used to retain information required + * Update the xmin the replication slot used to retain information required * for conflict detection. */ static void -advance_conflict_slot_xmin(TransactionId new_xmin) +update_conflict_slot_xmin(TransactionId new_xmin) { Assert(MyReplicationSlot); - Assert(TransactionIdIsValid(new_xmin)); - Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); + Assert(!TransactionIdIsValid(new_xmin) || + TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); - /* Return if the xmin value of the slot cannot be advanced */ + /* Return if the xmin value of the slot cannot be updated */ if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin)) return; @@ -1518,7 +1530,7 @@ GetLeaderApplyWorkerPid(pid_t pid) Datum pg_stat_get_subscription(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_COLS 10 +#define PG_STAT_GET_SUBSCRIPTION_COLS 11 Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); int i; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; @@ -1595,6 +1607,15 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) elog(ERROR, "unknown worker type"); } + /* + * Only the leader apply worker manages conflict retention (see + * maybe_advance_nonremovable_xid() for details). + */ + if (!isParallelApplyWorker(&worker) && !isTablesyncWorker(&worker)) + values[10] = TransactionIdIsValid(worker.oldest_nonremovable_xid); + else + nulls[10] = true; + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d3356bc84ee..1ab5496f63f 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -293,7 +293,7 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) static void process_syncing_tables_for_sync(XLogRecPtr current_lsn) { - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && current_lsn >= MyLogicalRepWorker->relstate_lsn) @@ -305,7 +305,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); /* * UpdateSubscriptionRelState must be called within a transaction. @@ -390,7 +390,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) finish_sync_worker(); } else - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); } /* @@ -534,7 +534,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (syncworker) { /* Found one, update our copy of its state */ - SpinLockAcquire(&syncworker->relmutex); + SpinLockAcquire(&syncworker->mutex); rstate->state = syncworker->relstate; rstate->lsn = syncworker->relstate_lsn; if (rstate->state == SUBREL_STATE_SYNCWAIT) @@ -547,7 +547,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) syncworker->relstate_lsn = Max(syncworker->relstate_lsn, current_lsn); } - SpinLockRelease(&syncworker->relmutex); + SpinLockRelease(&syncworker->mutex); /* If we told worker to catch up, wait for it. */ if (rstate->state == SUBREL_STATE_SYNCWAIT) @@ -1342,10 +1342,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) must_use_password = MySubscription->passwordrequired && !MySubscription->ownersuperuser; - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); MyLogicalRepWorker->relstate = relstate; MyLogicalRepWorker->relstate_lsn = relstate_lsn; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); /* * If synchronization is already done or no longer necessary, exit now @@ -1428,10 +1428,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) goto copy_table_done; } - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); /* Update the state and make it visible to others. */ StartTransactionCommand(); @@ -1586,10 +1586,10 @@ copy_table_done: /* * We are done with the initial data synchronization, update the state. */ - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; MyLogicalRepWorker->relstate_lsn = *origin_startpos; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); /* * Finally, wait until the leader apply worker tells us to catch up and diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 0fdc5de57ba..afa3f96ea3c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -415,6 +415,9 @@ typedef struct RetainDeadTuplesData * updated in final phase * (RDT_WAIT_FOR_LOCAL_FLUSH) */ + long table_sync_wait_time; /* time spent waiting for table sync + * to finish */ + /* * The following fields are used to determine the timing for the next * round of transaction ID advancement. @@ -555,6 +558,8 @@ static void request_publisher_status(RetainDeadTuplesData *rdt_data); static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received); static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); +static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data); +static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found); @@ -3220,6 +3225,7 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, { TransactionId oldestxmin; ReplicationSlot *slot; + bool stop_retention; /* * Return false if either dead tuples are not retained or commit timestamp @@ -3228,6 +3234,42 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, if (!MySubscription->retaindeadtuples || !track_commit_timestamp) return false; + /* + * Check whether the leader apply worker has stopped retaining information + * for detecting conflicts. + */ + if (am_leader_apply_worker()) + { + stop_retention = + !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid); + } + else + { + LogicalRepWorker *leader; + + /* + * Obtain the information from the leader apply worker as only the + * leader manages conflict retention (see + * maybe_advance_nonremovable_xid() for details). + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + leader = logicalrep_worker_find(MyLogicalRepWorker->subid, + InvalidOid, false); + + SpinLockAcquire(&leader->mutex); + stop_retention = !TransactionIdIsValid(leader->oldest_nonremovable_xid); + SpinLockRelease(&leader->mutex); + LWLockRelease(LogicalRepWorkerLock); + } + + /* + * Return false if the leader apply worker has stopped retaining + * information for detecting conflicts. This implies that update_deleted + * can no longer be reliably detected. + */ + if (stop_retention) + return false; + /* * For conflict detection, we use the conflict slot's xmin value instead * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as @@ -3254,7 +3296,15 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, oldestxmin = slot->data.xmin; SpinLockRelease(&slot->mutex); - Assert(TransactionIdIsValid(oldestxmin)); + /* + * Return false if the conflict detection slot.xmin is set to + * InvalidTransactionId. This situation arises if the current worker is + * either a table synchronization or parallel apply worker, and the leader + * stopped retention immediately after checking the + * oldest_nonremovable_xid above. + */ + if (!TransactionIdIsValid(oldestxmin)) + return false; if (OidIsValid(localidxoid) && IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin)) @@ -4110,7 +4160,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) * Ensure to wake up when it's possible to advance the non-removable * transaction ID. */ - if (rdt_data.phase == RDT_GET_CANDIDATE_XID && + if (TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid) && + rdt_data.phase == RDT_GET_CANDIDATE_XID && rdt_data.xid_advance_interval) wait_time = Min(wait_time, rdt_data.xid_advance_interval); @@ -4325,6 +4376,10 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) if (!MySubscription->retaindeadtuples) return false; + /* No need to advance if we have already stopped retaining */ + if (!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) + return false; + return true; } @@ -4468,6 +4523,13 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data, if (!status_received) return; + /* + * Stop retaining conflict information if required (See + * should_stop_conflict_info_retention() for details). + */ + if (should_stop_conflict_info_retention(rdt_data)) + return; + if (!FullTransactionIdIsValid(rdt_data->remote_wait_for)) rdt_data->remote_wait_for = rdt_data->remote_nextxid; @@ -4549,6 +4611,27 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * have a WAL position greater than the rdt_data->remote_lsn. */ if (!AllTablesyncsReady()) + { + TimestampTz now; + + now = rdt_data->last_recv_time + ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Record the time spent waiting for table sync, it is needed for the + * timeout check in should_stop_conflict_info_retention(). + */ + rdt_data->table_sync_wait_time = + TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now); + + return; + } + + /* + * Stop retaining conflict information if required (See + * should_stop_conflict_info_retention() for details). + */ + if (should_stop_conflict_info_retention(rdt_data)) return; /* @@ -4583,9 +4666,9 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * transactions up to that position on the publisher have been applied and * flushed locally. So, we can advance the non-removable transaction ID. */ - SpinLockAcquire(&MyLogicalRepWorker->relmutex); + SpinLockAcquire(&MyLogicalRepWorker->mutex); MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid; - SpinLockRelease(&MyLogicalRepWorker->relmutex); + SpinLockRelease(&MyLogicalRepWorker->mutex); elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u", LSN_FORMAT_ARGS(rdt_data->remote_lsn), @@ -4594,12 +4677,21 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) /* Notify launcher to update the xmin of the conflict slot */ ApplyLauncherWakeup(); - /* - * Reset all data fields except those used to determine the timing for the - * next round of transaction ID advancement. We can even use - * flushpos_update_time in the next round to decide whether to get the - * latest flush position. - */ + reset_retention_data_fields(rdt_data); + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Reset all data fields of RetainDeadTuplesData except those used to + * determine the timing for the next round of transaction ID advancement. We + * can even use flushpos_update_time in the next round to decide whether to get + * the latest flush position. + */ +static void +reset_retention_data_fields(RetainDeadTuplesData *rdt_data) +{ rdt_data->phase = RDT_GET_CANDIDATE_XID; rdt_data->remote_lsn = InvalidXLogRecPtr; rdt_data->remote_oldestxid = InvalidFullTransactionId; @@ -4607,9 +4699,68 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) rdt_data->reply_time = 0; rdt_data->remote_wait_for = InvalidFullTransactionId; rdt_data->candidate_xid = InvalidTransactionId; + rdt_data->table_sync_wait_time = 0; +} - /* process the next phase */ - process_rdt_phase_transition(rdt_data, false); +/* + * Check whether conflict information retention should be stopped because the + * wait time has exceeded the maximum limit (max_conflict_retention_duration). + * + * If retention should be stopped, set LogicalRepWorker->oldest_nonremovable_xid + * to InvalidTransactionId, notify the launcher to set the slot.xmin to + * InvalidTransactionId as well, and return true. Return false otherwise. + * + * Currently, the retention will not resume automatically unless user manually + * disables retain_dead_tuples and re-enables it after confirming that the + * replication slot has been dropped. + */ +static bool +should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + TimestampTz now; + + Assert(TransactionIdIsValid(rdt_data->candidate_xid)); + Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS || + rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH); + + if (!MySubscription->maxconflretention) + return false; + + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not available, + * obtain the current timestamp. + */ + now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Return if the wait time has not exceeded the maximum limit + * (max_conflict_retention_duration). The time spent waiting for table + * synchronization is not counted, as it's an infrequent operation. + */ + if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now, + MySubscription->maxconflretention + + rdt_data->table_sync_wait_time)) + return false; + + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information", + MySubscription->name), + errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.", + MySubscription->maxconflretention), + errhint("You might need to increase \"%s\".", + "max_conflict_retention_duration")); + + SpinLockAcquire(&MyLogicalRepWorker->mutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + SpinLockRelease(&MyLogicalRepWorker->mutex); + + /* Notify launcher to update the conflict slot */ + ApplyLauncherWakeup(); + + reset_retention_data_fields(rdt_data); + + return true; } /* @@ -4621,8 +4772,8 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * 3 minutes which should be sufficient to avoid using CPU or network * resources without much benefit. * - * The interval is reset to a minimum value of 100ms once there is some - * activity on the node. + * The interval is reset to the lesser of 100ms and + * max_conflict_retention_duration once there is some activities on the node. * * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can * consider the other interval or a separate GUC if the need arises. @@ -4642,6 +4793,10 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) */ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2, max_interval); + + /* Ensure the wait time remains within the maximum limit */ + rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval, + MySubscription->maxconflretention); } else { diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index fc7a6639163..b993071eb78 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -5048,6 +5048,7 @@ getSubscriptions(Archive *fout) int i_subenabled; int i_subfailover; int i_subretaindeadtuples; + int i_maxconflretention; int i, ntups; @@ -5127,10 +5128,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 190000) appendPQExpBufferStr(query, - " s.subretaindeadtuples\n"); + " s.subretaindeadtuples,\n"); else appendPQExpBufferStr(query, - " false AS subretaindeadtuples\n"); + " false AS subretaindeadtuples,\n"); + + if (fout->remoteVersion >= 190000) + appendPQExpBufferStr(query, + " s.maxconflretention\n"); + else + appendPQExpBuffer(query, + " 0 AS maxconflretention\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -5165,6 +5173,7 @@ getSubscriptions(Archive *fout) i_subrunasowner = PQfnumber(res, "subrunasowner"); i_subfailover = PQfnumber(res, "subfailover"); i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples"); + i_maxconflretention = PQfnumber(res, "maxconflretention"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); @@ -5200,6 +5209,8 @@ getSubscriptions(Archive *fout) (strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0); subinfo[i].subretaindeadtuples = (strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0); + subinfo[i].maxconflretention = + atoi(PQgetvalue(res, i, i_maxconflretention)); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); if (PQgetisnull(res, i, i_subslotname)) @@ -5461,6 +5472,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (subinfo->subretaindeadtuples) appendPQExpBufferStr(query, ", retain_dead_tuples = true"); + if (subinfo->maxconflretention) + appendPQExpBuffer(query, ", maxconflretention = %d", subinfo->maxconflretention); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index dde85ed156c..6c302177f4a 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -717,6 +717,7 @@ typedef struct _SubscriptionInfo bool subrunasowner; bool subfailover; bool subretaindeadtuples; + int maxconflretention; char *subconninfo; char *subslotname; char *subsynccommit; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 7a06af48842..f2940fbf7a4 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6746,7 +6746,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false, false}; + false, false, false}; if (pset.sversion < 100000) { @@ -6815,10 +6815,16 @@ describeSubscriptions(const char *pattern, bool verbose) ", subfailover AS \"%s\"\n", gettext_noop("Failover")); if (pset.sversion >= 190000) + { appendPQExpBuffer(&buf, ", subretaindeadtuples AS \"%s\"\n", gettext_noop("Retain dead tuples")); + appendPQExpBuffer(&buf, + ", maxconflretention AS \"%s\"\n", + gettext_noop("Max conflict retention duration")); + } + appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 8b10f2313f3..2d7016fe717 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2321,7 +2321,8 @@ match_previous_words(int pattern_id, COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", + COMPLETE_WITH("binary", "disable_on_error", "failover", + "max_conflict_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase"); @@ -3780,7 +3781,8 @@ match_previous_words(int pattern_id, /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "failover", "origin", + "disable_on_error", "enabled", "failover", + "max_conflict_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase"); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 118d6da1ace..3810f3883b7 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5696,9 +5696,9 @@ proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}', + proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text,bool}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type,dead_tuple_retention_active}', prosrc => 'pg_stat_get_subscription' }, { oid => '2026', descr => 'statistics: current backend PID', proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r', diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 231ef84ec9a..e7ed9dafae4 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -81,6 +81,10 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subretaindeadtuples; /* True if dead tuples useful for * conflict detection are retained */ + int32 maxconflretention; /* The maximum duration (in milliseconds) + * for which information useful for + * conflict detection can be retained */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -136,6 +140,9 @@ typedef struct Subscription * to be synchronized to the standbys. */ bool retaindeadtuples; /* True if dead tuples useful for conflict * detection are retained */ + int32 maxconflretention; /* The maximum duration (in milliseconds) + * for which information useful for + * conflict detection can be retained */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 7c0204dd6f4..9c0c2b8050c 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -64,7 +64,12 @@ typedef struct LogicalRepWorker Oid relid; char relstate; XLogRecPtr relstate_lsn; - slock_t relmutex; + + /* + * Spinlock used to protect table synchronization information and the + * oldest_nonremovable_xid. + */ + slock_t mutex; /* * Used to create the changes and subxact files for the streaming @@ -94,6 +99,9 @@ typedef struct LogicalRepWorker * The logical replication launcher manages an internal replication slot * named "pg_conflict_detection". It asynchronously collects this ID to * decide when to advance the xmin value of the slot. + * + * This ID would be set to InvalidTransactionId if the apply worker has + * stopped retaining information useful for conflict detection. */ TransactionId oldest_nonremovable_xid; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 35e8aad7701..183fc193ad3 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2174,9 +2174,10 @@ pg_stat_subscription| SELECT su.oid AS subid, st.last_msg_send_time, st.last_msg_receipt_time, st.latest_end_lsn, - st.latest_end_time + st.latest_end_time, + st.dead_tuple_retention_active FROM (pg_subscription su - LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid))); + LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type, dead_tuple_retention_active) ON ((st.subid = su.oid))); pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index a98c97f7616..16b173a7ea0 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | off | dbname=regress_doesnotexist2 | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00012345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist2 | 0/00012345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist2 | 0/00000000 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+------------------------------+------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | local | dbname=regress_doesnotexist2 | 0/00000000 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -409,18 +409,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -433,10 +433,36 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail - max_conflict_retention_duration must be integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_conflict_retention_duration = foo); +ERROR: max_conflict_retention_duration requires an integer value +-- ok +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_conflict_retention_duration = 1000); +NOTICE: max_conflict_retention_duration has no effect when retain_dead_tuples is disabled +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | off | dbname=regress_doesnotexist | 0/00000000 +(1 row) + +-- ok +ALTER SUBSCRIPTION regress_testsub SET (max_conflict_retention_duration = 0); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max conflict retention duration | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+---------------------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index f0f714fe747..9b2c489adaf 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -298,6 +298,22 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - max_conflict_retention_duration must be integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_conflict_retention_duration = foo); + +-- ok +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_conflict_retention_duration = 1000); + +\dRs+ + +-- ok +ALTER SUBSCRIPTION regress_testsub SET (max_conflict_retention_duration = 0); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index 36aeb14c563..c1b8ede81cb 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -214,6 +214,10 @@ ok( $node_B->poll_query_until( ), "the xmin value of slot 'pg_conflict_detection' is valid on Node B"); +my $result = $node_B->safe_psql('postgres', + "SELECT dead_tuple_retention_active FROM pg_stat_subscription WHERE subname='$subname_BA';"); +is($result, qq(t), 'worker on node B retains conflict information'); + ################################################## # Check that the retain_dead_tuples option can be enabled only for disabled # subscriptions. Validate the NOTICE message during the subscription DDL, and @@ -254,6 +258,10 @@ ok( $node_A->poll_query_until( ), "the xmin value of slot 'pg_conflict_detection' is valid on Node A"); +$result = $node_A->safe_psql('postgres', + "SELECT dead_tuple_retention_active FROM pg_stat_subscription WHERE subname='$subname_AB';"); +is($result, qq(t), 'worker on node A retains conflict information'); + ################################################## # Check the WARNING when changing the origin to ANY, if retain_dead_tuples is # enabled. This warns of the possibility of receiving changes from origins @@ -281,7 +289,7 @@ $node_A->psql('postgres', $node_A->safe_psql('postgres', "INSERT INTO tab VALUES (1, 1), (2, 2);"); $node_A->wait_for_catchup($subname_BA); -my $result = $node_B->safe_psql('postgres', "SELECT * FROM tab;"); +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab;"); is($result, qq(1|1 2|2), 'check replicated insert on node B'); -- 2.50.1.windows.1