From bde71c8af4228f58179b5966e60c74ffbb086896 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Thu, 24 Apr 2025 20:38:20 +0800 Subject: [PATCH v30 3/7] Add a retain_conflict_info option to subscriptions This patch adds a subscription option allowing users to specify whether information on the subscriber, which is useful for detecting update_deleted conflicts, should be retained. The default setting is false. If set to true, the detection of update_deleted will be enabled, and an additional replication slot named pg_conflict_detection will be created on the subscriber to prevent conflict information from being removed. Note that if multiple subscriptions on one node enable this option, only one replication slot will be created. The logical launcher will create and maintain a replication slot named pg_conflict_detection only if any local subscription has the retain_conflict_info option enabled. Enabling retain_conflict_info is prohibited if the publisher is currently in recovery mode (operating as a standby server). Bump catalog version --- doc/src/sgml/catalogs.sgml | 12 ++ doc/src/sgml/logical-replication.sgml | 3 +- doc/src/sgml/ref/alter_subscription.sgml | 12 +- doc/src/sgml/ref/create_subscription.sgml | 29 ++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 151 +++++++++++++++-- src/backend/replication/logical/launcher.c | 51 ++++-- src/backend/replication/logical/worker.c | 16 +- src/bin/pg_dump/pg_dump.c | 18 ++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_upgrade/check.c | 52 +++++- src/bin/pg_upgrade/info.c | 25 ++- src/bin/pg_upgrade/pg_upgrade.h | 4 +- src/bin/pg_upgrade/t/004_subscription.pl | 48 ++++++ src/bin/psql/describe.c | 6 +- src/bin/psql/tab-complete.in.c | 10 +- src/include/catalog/pg_subscription.h | 5 + src/test/regress/expected/subscription.out | 178 ++++++++++++--------- src/test/regress/sql/subscription.sql | 16 ++ 20 files changed, 512 insertions(+), 129 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index cbd4e40a320..0fb3894feb6 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8088,6 +8088,18 @@ SCRAM-SHA-256$<iteration count>:&l + + + subretainconflictinfo bool + + + If true, the detection of is + enabled and the information (e.g., dead tuples, commit timestamps, and + origins) on the subscriber that is still useful for conflict detection + is retained. + + + subconninfo text diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index f288c049a5c..9fcb7c0ff73 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -2408,7 +2408,8 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER max_replication_slots must be set to at least the number of subscriptions expected to connect, - plus some reserve for table synchronization. + plus some reserve for table synchronization and one if + retain_conflict_info is enabled. diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index fdc648d007f..e5415c3150d 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -235,8 +235,9 @@ ALTER SUBSCRIPTION name RENAME TO < password_required, run_as_owner, origin, - failover, and - two_phase. + failover, + two_phase, and + retain_conflict_info. Only a superuser can set password_required = false. @@ -285,6 +286,13 @@ ALTER SUBSCRIPTION name RENAME TO < option is changed from true to false, the publisher will replicate the transactions again when they are committed. + + + If the retain_conflict_info + option is altered to false and no other subscription + has this option enabled, the additional replication slot that was created + to retain conflict information will be dropped. + diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index eec85cde880..b7ec696eb3c 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -437,6 +437,35 @@ CREATE SUBSCRIPTION subscription_name + + + retain_conflict_info (boolean) + + + Specifies whether the information (e.g., dead tuples, commit + timestamps, and origins) on the subscriber that is still useful for + conflict detection is retained. The default is + false. If set to true, the detection of + is enabled, and an + additional replication slot named + pg_conflict_detection will be + created on the subscriber to prevent the conflict information from + being removed. + + + + Note that the information useful for conflict detection is retained + only after the creation of the additional slot. You can verify the + existence of this slot by querying pg_replication_slots.conflicting + And even if multiple subscriptions on one node enable this option, + only one replication slot will be created. + + + + This option cannot be enabled if the publisher is also a physical standby. + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 1395032413e..39cfae43d6f 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -103,6 +103,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; + sub->retainconflictinfo = subform->subretainconflictinfo; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 15efb02badb..637a312e198 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1383,7 +1383,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subslotname, subsynccommit, subpublications, suborigin) + subretainconflictinfo, subslotname, subsynccommit, + subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 46d4e65da97..47ba22d28b8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/htup_details.h" #include "access/table.h" #include "access/twophase.h" @@ -71,8 +72,9 @@ #define SUBOPT_PASSWORD_REQUIRED 0x00000800 #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 -#define SUBOPT_LSN 0x00004000 -#define SUBOPT_ORIGIN 0x00008000 +#define SUBOPT_RETAIN_CONFLICT_INFO 0x00004000 +#define SUBOPT_LSN 0x00008000 +#define SUBOPT_ORIGIN 0x00010000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -98,6 +100,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; bool failover; + bool retainconflictinfo; char *origin; XLogRecPtr lsn; } SubOpts; @@ -107,6 +110,8 @@ static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname); +static void check_conflict_info_retaintion(WalReceiverConn *wrconn, + bool retain_conflict_info); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -162,6 +167,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_FAILOVER)) opts->failover = false; + if (IsSet(supported_opts, SUBOPT_RETAIN_CONFLICT_INFO)) + opts->retainconflictinfo = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -307,6 +314,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_FAILOVER; opts->failover = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_RETAIN_CONFLICT_INFO) && + strcmp(defel->defname, "retain_conflict_info") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_RETAIN_CONFLICT_INFO)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_RETAIN_CONFLICT_INFO; + opts->retainconflictinfo = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -563,7 +579,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | + SUBOPT_RETAIN_CONFLICT_INFO | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -608,6 +625,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, errmsg("password_required=false is superuser-only"), errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser."))); + if (opts.retainconflictinfo && !track_commit_timestamp) + ereport(WARNING, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("information for detecting conflicts cannot be fully retained when \"%s\" is disabled", + "track_commit_timestamp")); + /* * If built with appropriate switch, whine when regression-testing * conventions for subscription names are violated. @@ -670,6 +693,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); + values[Anum_pg_subscription_subretainconflictinfo - 1] = + BoolGetDatum(opts.retainconflictinfo); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -724,6 +749,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, check_publications_origin(wrconn, publications, opts.copy_data, opts.origin, NULL, 0, stmt->subname); + check_conflict_info_retaintion(wrconn, opts.retainconflictinfo); + /* * Set sync state based on if we were asked to do data copy or * not. @@ -1110,6 +1137,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool update_tuple = false; bool update_failover = false; bool update_two_phase = false; + bool retain_conflict_info = false; Subscription *sub; Form_pg_subscription form; bits32 supported_opts; @@ -1165,7 +1193,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_ORIGIN); + SUBOPT_RETAIN_CONFLICT_INFO | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1325,6 +1353,29 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subfailover - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_RETAIN_CONFLICT_INFO)) + { + values[Anum_pg_subscription_subretainconflictinfo - 1] = + BoolGetDatum(opts.retainconflictinfo); + replaces[Anum_pg_subscription_subretainconflictinfo - 1] = true; + + if (opts.retainconflictinfo && !track_commit_timestamp) + ereport(WARNING, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("information for detecting conflicts cannot be fully retained when \"%s\" is disabled", + "track_commit_timestamp")); + + /* + * Notify the launcher to manage the replication slot for + * conflict detection. This ensures that replication slot + * is efficiently handled (created, updated, or dropped) + * in response to any configuration changes. + */ + ApplyLauncherWakeupAtCommit(); + + retain_conflict_info = opts.retainconflictinfo; + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = @@ -1355,6 +1406,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, ApplyLauncherWakeupAtCommit(); update_tuple = true; + + /* + * The subscription might be initially created with + * connect=false and retain_conflict_info=true, meaning the + * remote server's status may not be checked. Ensure this + * check is conducted now. + */ + retain_conflict_info = sub->retainconflictinfo; break; } @@ -1369,6 +1428,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, CStringGetTextDatum(stmt->conninfo); replaces[Anum_pg_subscription_subconninfo - 1] = true; update_tuple = true; + + /* + * Since the remote server configuration might have changed, + * perform a check to ensure it permits enabling + * retain_conflict_info. + */ + retain_conflict_info = sub->retainconflictinfo; break; case ALTER_SUBSCRIPTION_SET_PUBLICATION: @@ -1568,14 +1634,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, } /* - * Try to acquire the connection necessary for altering the slot, if - * needed. + * Try to acquire the connection necessary either for modifying the slot + * or for checking if the remote server permits enabling + * retain_conflict_info. * * This has to be at the end because otherwise if there is an error while * doing the database operations we won't be able to rollback altered * slot. */ - if (update_failover || update_two_phase) + if (update_failover || update_two_phase || retain_conflict_info) { bool must_use_password; char *err; @@ -1584,10 +1651,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); - /* Try to connect to the publisher. */ + /* + * Try to connect to the publisher, using the new connection string if + * available. + */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, - sub->name, &err); + wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo, + true, true, must_use_password, sub->name, + &err); if (!wrconn) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), @@ -1596,9 +1667,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, - update_failover ? &opts.failover : NULL, - update_two_phase ? &opts.twophase : NULL); + check_conflict_info_retaintion(wrconn, retain_conflict_info); + + if (update_failover || update_two_phase) + walrcv_alter_slot(wrconn, sub->slotname, + update_failover ? &opts.failover : NULL, + update_two_phase ? &opts.twophase : NULL); } PG_FINALLY(); { @@ -2196,6 +2270,57 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, walrcv_clear_result(res); } +/* + * Check if the publisher's status permits enabling retain_conflict_info. + * + * Enabling retain_conflict_info is not allowed if the publisher's version is + * prior to PG18 or if the publisher is in recovery (operating as a standby + * server). + * + * Refer to the comments atop maybe_advance_nonremovable_xid() for detailed + * reasons. + */ +static void +check_conflict_info_retaintion(WalReceiverConn *wrconn, bool retain_conflict_info) +{ + WalRcvExecResult *res; + Oid RecoveryRow[1] = {BOOLOID}; + TupleTableSlot *slot; + bool isnull; + bool remote_in_recovery; + + if (!retain_conflict_info) + return; + + if (walrcv_server_version(wrconn) < 18000) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot enable retain_conflict_info if the publisher is running a version earlier than PostgreSQL 18.")); + + res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not obtain recovery progress from the publisher: %s", + res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + elog(ERROR, "failed to fetch tuple for the recovery progress"); + + remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull)); + + if (remote_in_recovery) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot enable retain_conflict_info if the publisher is in recovery.")); + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + /* * Get the list of tables which belong to specified publications on the * publisher connection. diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 8cef4460848..07d686c6d8c 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -151,6 +151,7 @@ get_subscription_list(void) sub->owner = subform->subowner; sub->enabled = subform->subenabled; sub->name = pstrdup(NameStr(subform->subname)); + sub->retainconflictinfo = subform->subretainconflictinfo; /* We don't fill fields we are not interested in. */ res = lappend(res, sub); @@ -1156,6 +1157,7 @@ ApplyLauncherMain(Datum main_arg) MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; bool can_advance_xmin = true; + bool retain_conflict_info = false; FullTransactionId xmin = InvalidFullTransactionId; CHECK_FOR_INTERRUPTS(); @@ -1177,16 +1179,26 @@ ApplyLauncherMain(Datum main_arg) long elapsed; /* - * Create the conflict slot before starting the worker to prevent - * it from unnecessarily maintaining its oldest_nonremovable_xid. + * Create a replication slot to retain information (e.g., dead + * tuples, commit timestamps, and origins) useful for conflict + * detection if any subscription requests it. Only advance xmin + * when all such subscriptions are enabled. */ - create_conflict_slot_if_not_exists(); + if (sub->retainconflictinfo) + { + retain_conflict_info = true; + can_advance_xmin &= sub->enabled; + + /* + * Create the conflict slot before starting the worker to + * prevent it from unnecessarily maintaining its + * oldest_nonremovable_xid. + */ + create_conflict_slot_if_not_exists(); + } if (!sub->enabled) - { - can_advance_xmin = false; continue; - } LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); w = logicalrep_worker_find(sub->oid, InvalidOid, false); @@ -1196,10 +1208,11 @@ ApplyLauncherMain(Datum main_arg) { /* * Collect non-removable transaction IDs from all apply - * workers to determine the xmin for advancing the replication - * slot used in conflict detection. + * workers that enabled retain_conflict_info. This determines + * the new xmin for advancing the replication slot used in + * conflict detection. */ - if (can_advance_xmin) + if (sub->retainconflictinfo && can_advance_xmin) { FullTransactionId nonremovable_xid; @@ -1225,7 +1238,8 @@ ApplyLauncherMain(Datum main_arg) * The worker has not yet started, so there is no valid * non-removable transaction ID available for advancement. */ - can_advance_xmin = false; + if (sub->retainconflictinfo) + can_advance_xmin = false; /* * If the worker is eligible to start now, launch it. Otherwise, @@ -1262,7 +1276,7 @@ ApplyLauncherMain(Datum main_arg) * Maintain the xmin value of the replication slot for conflict * detection if needed. */ - if (sublist) + if (retain_conflict_info) { if (can_advance_xmin) advance_conflict_slot_xmin(xmin); @@ -1271,7 +1285,8 @@ ApplyLauncherMain(Datum main_arg) } /* - * Drop the slot if we're no longer retaining dead tuples. + * Drop the slot if we're no longer retaining information useful for + * conflict detection */ else if (slot_maybe_exist) { @@ -1307,7 +1322,7 @@ ApplyLauncherMain(Datum main_arg) } /* - * Create and acquire the replication slot used to retain dead tuples for + * Create and acquire the replication slot used to retain information for * conflict detection, if not yet. */ static void @@ -1349,7 +1364,7 @@ create_conflict_slot_if_not_exists(void) /* * Attempt to advance the xmin value of the replication slot used to retain - * dead tuples for conflict detection. + * information useful for conflict detection. */ static void advance_conflict_slot_xmin(FullTransactionId new_xmin) @@ -1358,7 +1373,9 @@ advance_conflict_slot_xmin(FullTransactionId new_xmin) FullTransactionId next_full_xid; Assert(MyReplicationSlot); - Assert(FullTransactionIdIsValid(new_xmin)); + + if (!FullTransactionIdIsValid(new_xmin)) + return; next_full_xid = ReadNextFullTransactionId(); @@ -1398,8 +1415,8 @@ advance_conflict_slot_xmin(FullTransactionId new_xmin) } /* - * Drop the replication slot used to retain dead tuples for conflict detection, - * if it exists. + * Drop the replication slot used to retain information useful for conflict + * detection, if it exists. */ static void drop_conflict_slot_if_exists(void) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4922104b018..471a95da3fb 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4109,6 +4109,10 @@ static void maybe_advance_nonremovable_xid(RetainConflictInfoData *data, bool status_received) { + /* Exit early if retaining conflict information is not required */ + if (!MySubscription->retainconflictinfo) + return; + /* * It is sufficient to manage non-removable transaction ID for a * subscription by the main apply worker to detect update_deleted conflict @@ -4518,6 +4522,15 @@ maybe_reread_subscription(void) * worker won't restart if the streaming option's value is changed from * 'parallel' to any other value or the server decides not to stream the * in-progress transaction. + * + * Additionally, exit if the retain_conflict_info option is disabled. This + * is necessary to reset the oldest non-removable transaction ID and the + * state of advancement. Direct resetting could not work without a + * restart, as the worker might be in an intermediate state (e.g., waiting + * publisher status). If the option is re-enabled before the old publisher + * status is received, it could incorrectly use the old status in a new + * transaction ID advancement cycle, leading to premature advancement of + * the non-removable transaction ID. */ if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 || strcmp(newsub->name, MySubscription->name) != 0 || @@ -4527,7 +4540,8 @@ maybe_reread_subscription(void) newsub->passwordrequired != MySubscription->passwordrequired || strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || - !equal(newsub->publications, MySubscription->publications)) + !equal(newsub->publications, MySubscription->publications) || + (!newsub->retainconflictinfo && MySubscription->retainconflictinfo)) { if (am_parallel_apply_worker()) ereport(LOG, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 105e917aa7b..0a6b43ccda5 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4960,6 +4960,7 @@ getSubscriptions(Archive *fout) int i_suboriginremotelsn; int i_subenabled; int i_subfailover; + int i_subretainconflictinfo; int i, ntups; @@ -5032,10 +5033,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 170000) appendPQExpBufferStr(query, - " s.subfailover\n"); + " s.subfailover,\n"); else appendPQExpBufferStr(query, - " false AS subfailover\n"); + " false AS subfailover,\n"); + + if (fout->remoteVersion >= 180000) + appendPQExpBufferStr(query, + " s.subretainconflictinfo\n"); + else + appendPQExpBufferStr(query, + " false AS subretainconflictinfo\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -5069,6 +5077,7 @@ getSubscriptions(Archive *fout) i_subpasswordrequired = PQfnumber(res, "subpasswordrequired"); i_subrunasowner = PQfnumber(res, "subrunasowner"); i_subfailover = PQfnumber(res, "subfailover"); + i_subretainconflictinfo = PQfnumber(res, "subretainconflictinfo"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); @@ -5102,6 +5111,8 @@ getSubscriptions(Archive *fout) (strcmp(PQgetvalue(res, i, i_subrunasowner), "t") == 0); subinfo[i].subfailover = (strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0); + subinfo[i].subretainconflictinfo = + (strcmp(PQgetvalue(res, i, i_subretainconflictinfo), "t") == 0); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); if (PQgetisnull(res, i, i_subslotname)) @@ -5360,6 +5371,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (subinfo->subfailover) appendPQExpBufferStr(query, ", failover = true"); + if (subinfo->subretainconflictinfo) + appendPQExpBufferStr(query, ", retain_conflict_info = true"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index b426b5e4736..e9c9aa2f2f9 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -707,6 +707,7 @@ typedef struct _SubscriptionInfo bool subpasswordrequired; bool subrunasowner; bool subfailover; + bool subretainconflictinfo; char *subconninfo; char *subslotname; char *subsynccommit; diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 18c2d652bb6..611bc14c1a2 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -629,7 +629,7 @@ check_and_dump_old_cluster(void) * Before that the logical slots are not upgraded, so we will not be * able to upgrade the logical replication clusters completely. */ - get_subscription_count(&old_cluster); + get_subscription_info(&old_cluster); check_old_cluster_subscription_state(); } @@ -2014,9 +2014,10 @@ check_new_cluster_logical_replication_slots(void) /* * check_new_cluster_subscription_configuration() * - * Verify that the max_active_replication_origins configuration specified is - * enough for creating the subscriptions. This is required to create the - * replication origin for each subscription. + * Verify that the max_active_replication_origins and max_replication_slots + * configurations specified are enough for creating the subscriptions. This is + * required to create the replication origin and the conflict detection slot + * for each subscription. */ static void check_new_cluster_subscription_configuration(void) @@ -2024,6 +2025,8 @@ check_new_cluster_subscription_configuration(void) PGresult *res; PGconn *conn; int max_active_replication_origins; + int max_replication_slots; + int nslots_on_old; /* Subscriptions and their dependencies can be migrated since PG17. */ if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700) @@ -2049,6 +2052,31 @@ check_new_cluster_subscription_configuration(void) "subscriptions (%d) on the old cluster", max_active_replication_origins, old_cluster.nsubs); + PQclear(res); + + /* Return if no subscriptions enabled the retain_conflict_info option. */ + if (!old_cluster.sub_retain_conflict_info) + { + PQfinish(conn); + check_ok(); + return; + } + + res = executeQueryOrDie(conn, "SELECT setting FROM pg_settings " + "WHERE name = 'max_replication_slots';"); + + if (PQntuples(res) != 1) + pg_fatal("could not determine parameter settings on new cluster"); + + nslots_on_old = count_old_cluster_logical_slots(); + + max_replication_slots = atoi(PQgetvalue(res, 0, 0)); + if (nslots_on_old + 1 > max_replication_slots) + pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of " + "logical replication slots on the old cluster plus one additional slot required " + "for retaining conflict detection information (%d)", + max_replication_slots, nslots_on_old + 1); + PQclear(res); PQfinish(conn); @@ -2111,6 +2139,22 @@ check_old_cluster_for_valid_slots(void) "The slot \"%s\" has not consumed the WAL yet\n", slot->slotname); } + + /* + * The name "pg_conflict_detection" (defined as + * CONFLICT_DETECTION_SLOT) has been reserved for logical + * replication conflict detection since PG18. + */ + if (strcmp(slot->slotname, "pg_conflict_detection") == 0) + { + if (script == NULL && + (script = fopen_priv(output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", output_path); + + fprintf(script, + "The slot name \"%s\" is reserved\n", + slot->slotname); + } } } diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 4b7a56f5b3b..69658595e0b 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -752,20 +752,33 @@ count_old_cluster_logical_slots(void) } /* - * get_subscription_count() + * get_subscription_info() * - * Gets the number of subscriptions in the cluster. + * Gets the information of subscriptions in the cluster. */ void -get_subscription_count(ClusterInfo *cluster) +get_subscription_info(ClusterInfo *cluster) { PGconn *conn; PGresult *res; + int i_nsub; + int i_retain_conflict_info; conn = connectToServer(cluster, "template1"); - res = executeQueryOrDie(conn, "SELECT count(*) " - "FROM pg_catalog.pg_subscription"); - cluster->nsubs = atoi(PQgetvalue(res, 0, 0)); + if (GET_MAJOR_VERSION(cluster->major_version) >= 1800) + res = executeQueryOrDie(conn, "SELECT count(*) AS nsub," + "COUNT(CASE WHEN subretainconflictinfo THEN 1 END) AS retain_conflict_info " + "FROM pg_catalog.pg_subscription"); + else + res = executeQueryOrDie(conn, "SELECT count(*) AS nsub," + "'f' AS retain_conflict_info " + "FROM pg_catalog.pg_subscription"); + + i_nsub = PQfnumber(res, "nsub"); + i_retain_conflict_info = PQfnumber(res, "retain_conflict_info"); + + cluster->nsubs = atoi(PQgetvalue(res, 0, i_nsub)); + cluster->sub_retain_conflict_info = (strcmp(PQgetvalue(res, 0, i_retain_conflict_info), "1") == 0); PQclear(res); PQfinish(conn); diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index 69c965bb7d0..352c8b6f376 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -302,6 +302,8 @@ typedef struct uint32 bin_version; /* version returned from pg_ctl */ const char *tablespace_suffix; /* directory specification */ int nsubs; /* number of subscriptions */ + bool sub_retain_conflict_info; /* whether a subscription enables + * retain_conflict_info. */ } ClusterInfo; @@ -441,7 +443,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db, const char *new_pgdata); void get_db_rel_and_slot_infos(ClusterInfo *cluster); int count_old_cluster_logical_slots(void); -void get_subscription_count(ClusterInfo *cluster); +void get_subscription_info(ClusterInfo *cluster); /* option.c */ diff --git a/src/bin/pg_upgrade/t/004_subscription.pl b/src/bin/pg_upgrade/t/004_subscription.pl index c545abf6581..dc6deed5557 100644 --- a/src/bin/pg_upgrade/t/004_subscription.pl +++ b/src/bin/pg_upgrade/t/004_subscription.pl @@ -87,6 +87,54 @@ $publisher->safe_psql('postgres', "DROP PUBLICATION regress_pub1"); $old_sub->start; $old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub1;"); + +# ------------------------------------------------------ +# Check that pg_upgrade fails when max_replication_slots configured in the new +# cluster is less than the number of logical slots in the old cluster + 1 when +# subscription's retain_conflict_info option is enabled. +# ------------------------------------------------------ +# It is sufficient to use disabled subscription to test upgrade failure. +$publisher->safe_psql('postgres', "CREATE PUBLICATION regress_pub1"); +$old_sub->safe_psql('postgres', + "CREATE SUBSCRIPTION regress_sub1 CONNECTION '$connstr' PUBLICATION regress_pub1 WITH (enabled = false, retain_conflict_info = true)" +); + +$old_sub->stop; + +$new_sub->append_conf('postgresql.conf', "max_replication_slots = 0"); + +# pg_upgrade will fail because the new cluster has insufficient +# max_replication_slots. +command_checks_all( + [ + 'pg_upgrade', + '--no-sync', + '--old-datadir' => $old_sub->data_dir, + '--new-datadir' => $new_sub->data_dir, + '--old-bindir' => $oldbindir, + '--new-bindir' => $newbindir, + '--socketdir' => $new_sub->host, + '--old-port' => $old_sub->port, + '--new-port' => $new_sub->port, + $mode, + '--check', + ], + 1, + [ + qr/"max_replication_slots" \(0\) must be greater than or equal to the number of logical replication slots on the old cluster plus one additional slot required for retaining conflict detection information \(1\)/ + ], + [qr//], + 'run of pg_upgrade where the new cluster has insufficient max_replication_slots' +); + +# Reset max_replication_slots +$new_sub->append_conf('postgresql.conf', "max_replication_slots = 10"); + +# Cleanup +$publisher->safe_psql('postgres', "DROP PUBLICATION regress_pub1"); +$old_sub->start; +$old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub1;"); + # ------------------------------------------------------ # Check that pg_upgrade refuses to run if: # a) there's a subscription with tables in a state other than 'r' (ready) or diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 1d08268393e..43a6682a131 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6745,7 +6745,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false}; + false, false}; if (pset.sversion < 100000) { @@ -6813,6 +6813,10 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", subfailover AS \"%s\"\n", gettext_noop("Failover")); + if (pset.sversion >= 180000) + appendPQExpBuffer(&buf, + ", subretainconflictinfo AS \"%s\"\n", + gettext_noop("Retain conflict info")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index c916b9299a8..692a7d311cd 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2298,8 +2298,9 @@ match_previous_words(int pattern_id, /* ALTER SUBSCRIPTION SET ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "(")) COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "password_required", "retain_conflict_info", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* ALTER SUBSCRIPTION SKIP ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3722,8 +3723,9 @@ match_previous_words(int pattern_id, else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", "disable_on_error", "enabled", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "password_required", "retain_conflict_info", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 20fc329992d..0ac7c0b120c 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -78,6 +78,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool subretainconflictinfo; /* True if information useful for + * conflict detection is retained */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -131,6 +134,8 @@ typedef struct Subscription * (i.e. the main slot and the table sync * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool retainconflictinfo; /* True if information useful for conflict + * detection is retained */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 1443e1d9292..bff4cc051db 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -409,18 +409,44 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail - retain_conflict_info must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_conflict_info = foo); +ERROR: retain_conflict_info requires a Boolean value +-- ok - but a warning will occur because track_commit_timestamp is not enabled +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_conflict_info = true); +WARNING: information for detecting conflicts cannot be fully retained when "track_commit_timestamp" is disabled +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | t | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +-- ok +ALTER SUBSCRIPTION regress_testsub SET (retain_conflict_info = false); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain conflict info | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+----------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 007c9e70374..c65397e5ac6 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -287,6 +287,22 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - retain_conflict_info must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_conflict_info = foo); + +-- ok - but a warning will occur because track_commit_timestamp is not enabled +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_conflict_info = true); + +\dRs+ + +-- ok +ALTER SUBSCRIPTION regress_testsub SET (retain_conflict_info = false); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; -- 2.30.0.windows.2