From 12dea53d33cadd32550c93ba935307cde176a6fe Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Fri, 26 Dec 2025 20:22:08 +0530 Subject: [PATCH v17 6/6] Allow combined conflict_log_destination settings Extend conflict_log_destination handling to support combined destination specifications. Previously, only log, table, or all were accepted. This change allows combinations of them like log, table and all, log, table etc --- src/backend/catalog/pg_subscription.c | 2 +- src/backend/commands/subscriptioncmds.c | 90 +++++++++++++++------- src/backend/replication/logical/conflict.c | 6 +- src/bin/pg_dump/pg_dump.c | 44 +++++++---- src/bin/pg_dump/t/002_pg_dump.pl | 4 +- src/include/catalog/pg_subscription.h | 4 +- src/include/commands/subscriptioncmds.h | 5 +- src/include/replication/conflict.h | 9 --- src/test/regress/expected/subscription.out | 72 +++++++++-------- src/test/regress/sql/subscription.sql | 11 ++- 10 files changed, 157 insertions(+), 90 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 5a0e5db1c03..c33b8de5943 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -147,7 +147,7 @@ GetSubscription(Oid subid, bool missing_ok) datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup, Anum_pg_subscription_subconflictlogdest); - sub->conflictlogdest = TextDatumGetCString(datum); + sub->conflictlogdest = textarray_to_stringlist(DatumGetArrayTypeP(datum)); /* Is the subscription owner a superuser? */ sub->ownersuperuser = superuser_arg(sub->owner); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index fb48d64801b..502d0814faf 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -60,6 +60,7 @@ #include "utils/pg_lsn.h" #include "utils/regproc.h" #include "utils/syscache.h" +#include "utils/varlena.h" /* * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION @@ -85,9 +86,6 @@ #define SUBOPT_ORIGIN 0x00020000 #define SUBOPT_CONFLICT_LOG_DEST 0x00040000 -/* check if the 'val' has 'bits' set */ -#define IsSet(val, bits) (((val) & (bits)) == (bits)) - /* * Structure to hold a bitmap representing the user-provided CREATE/ALTER * SUBSCRIPTION command options and the parsed/default values of each of them. @@ -418,14 +416,20 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, strcmp(defel->defname, "conflict_log_destination") == 0) { char *val; - ConflictLogDest dest; + List *dest; if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_LOG_DEST)) errorConflictingDefElem(defel, pstate); val = defGetString(defel); - dest = GetLogDestination(val); - opts->logdest = dest; + if (!SplitIdentifierString(val, ',', &dest)) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid list syntax in parameter \"%s\"", + "conflict_log_destination")); + + opts->logdest = GetLogDestination(dest, false); + opts->specified_opts |= SUBOPT_CONFLICT_LOG_DEST; } else @@ -605,6 +609,30 @@ publicationListToArray(List *publist) return PointerGetDatum(arr); } +/* + * Build a text[] array representing the conflict_log_destination flags. + */ +static Datum +ConflictLogDestFlagsToArray(ConflictLogDest logdest) +{ + Datum datums[3]; + int ndatums = 0; + + if (IsSet(logdest, CONFLICT_LOG_DEST_ALL)) + datums[ndatums++] = CStringGetTextDatum("all"); + else + { + if (IsSet(logdest, CONFLICT_LOG_DEST_LOG)) + datums[ndatums++] = CStringGetTextDatum("log"); + + if (IsSet(logdest, CONFLICT_LOG_DEST_TABLE)) + datums[ndatums++] = CStringGetTextDatum("table"); + } + + return PointerGetDatum( + construct_array_builtin(datums, ndatums, TEXTOID)); +} + /* * Create new subscription. */ @@ -776,14 +804,13 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, /* Always set the destination, default will be 'log'. */ values[Anum_pg_subscription_subconflictlogdest - 1] = - CStringGetTextDatum(ConflictLogDestNames[opts.logdest]); + ConflictLogDestFlagsToArray(opts.logdest); /* * If logging to a table is required, physically create the logging * relation and store its OID in the catalog. */ - if (opts.logdest == CONFLICT_LOG_DEST_TABLE || - opts.logdest == CONFLICT_LOG_DEST_ALL) + if (IsSet(opts.logdest, CONFLICT_LOG_DEST_TABLE)) { Oid logrelid; @@ -1407,7 +1434,7 @@ AlterSubscriptionConflictLogDestination(Subscription *sub, ConflictLogDest logdest, Oid *conflicttablerelid) { - ConflictLogDest old_dest = GetLogDestination(sub->conflictlogdest); + ConflictLogDest old_dest = GetLogDestination(sub->conflictlogdest, true); bool want_table; bool has_oldtable; bool update_relid = false; @@ -1824,7 +1851,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_DEST)) { ConflictLogDest old_dest = - GetLogDestination(sub->conflictlogdest); + GetLogDestination(sub->conflictlogdest, true); if (opts.logdest != old_dest) { @@ -1832,7 +1859,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, Oid relid = InvalidOid; values[Anum_pg_subscription_subconflictlogdest - 1] = - CStringGetTextDatum(ConflictLogDestNames[opts.logdest]); + ConflictLogDestFlagsToArray(opts.logdest); replaces[Anum_pg_subscription_subconflictlogdest - 1] = true; update_relid = AlterSubscriptionConflictLogDestination(sub, opts.logdest, &relid); @@ -3488,27 +3515,38 @@ create_conflict_log_table(Oid subid, char *subname) /* * GetLogDestination * - * Convert string to enum by comparing against standardized labels. + * Convert log destination List of strings to enums. */ ConflictLogDest -GetLogDestination(const char *dest) +GetLogDestination(List *destlist, bool strnodelist) { - /* Empty string or NULL defaults to LOG. */ - if (dest == NULL || dest[0] == '\0' || pg_strcasecmp(dest, "log") == 0) + ConflictLogDest logdest = 0; + ListCell *cell; + + if (destlist == NULL) return CONFLICT_LOG_DEST_LOG; - if (pg_strcasecmp(dest, - ConflictLogDestNames[CONFLICT_LOG_DEST_TABLE]) == 0) - return CONFLICT_LOG_DEST_TABLE; + foreach(cell, destlist) + { + char *name; - if (pg_strcasecmp(dest, ConflictLogDestNames[CONFLICT_LOG_DEST_ALL]) == 0) - return CONFLICT_LOG_DEST_ALL; + name = (strnodelist) ? strVal(lfirst(cell)) : (char *) lfirst(cell); - /* Unrecognized string. */ - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("unrecognized conflict_log_destination value: \"%s\"", dest), - errhint("Valid values are \"log\", \"table\", and \"all\"."))); + if (pg_strcasecmp(name, "log") == 0) + logdest |= CONFLICT_LOG_DEST_LOG; + else if (pg_strcasecmp(name, "table") == 0) + logdest |= CONFLICT_LOG_DEST_TABLE; + else if (pg_strcasecmp(name, "all") == 0) + logdest |= CONFLICT_LOG_DEST_ALL; + else + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unrecognized value for subscription parameter \"%s\": \"%s\"", + "conflict_log_destination", name), + errhint("Valid values are \"log\", \"table\", and \"all\".")); + } + + return logdest; } /* diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index d7fe6e40b2f..6bf9cdb5730 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -170,7 +170,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, /* Insert to table if destination is 'table' or 'all' */ if (conflictlogrel) { - Assert((dest & CONFLICT_LOG_DEST_TABLE) != 0); + Assert(IsSet(dest, CONFLICT_LOG_DEST_TABLE)); if (ValidateConflictLogTable(conflictlogrel)) { @@ -197,7 +197,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, pgstat_report_subscription_conflict(MySubscription->oid, type); /* Decide what detail to show in server logs. */ - if (dest == CONFLICT_LOG_DEST_LOG || dest == CONFLICT_LOG_DEST_ALL) + if (IsSet(dest, CONFLICT_LOG_DEST_LOG) || IsSet(dest, CONFLICT_LOG_DEST_ALL)) { /* Standard reporting with full internal details. */ ereport(elevel, @@ -273,7 +273,7 @@ GetConflictLogTableInfo(ConflictLogDest *log_dest) * Convert the text log destination to the internal enum. MySubscription * already contains the data from pg_subscription. */ - *log_dest = GetLogDestination(MySubscription->conflictlogdest); + *log_dest = GetLogDestination(MySubscription->conflictlogdest, true); conflictlogrelid = MySubscription->conflictlogrelid; /* If destination is 'log' only, no table to open. */ diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 3bd2eef66e6..1b7704b8f57 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -5522,10 +5522,10 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) DumpOptions *dopt = fout->dopt; PQExpBuffer delq; PQExpBuffer query; - PQExpBuffer publications; + PQExpBuffer namebuf; char *qsubname; - char **pubnames = NULL; - int npubnames = 0; + char **names = NULL; + int nnames = 0; int i; /* Do nothing if not dumping schema */ @@ -5545,19 +5545,22 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) appendStringLiteralAH(query, subinfo->subconninfo, fout); /* Build list of quoted publications and append them to query. */ - if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames)) + if (!parsePGArray(subinfo->subpublications, &names, &nnames)) pg_fatal("could not parse %s array", "subpublications"); - publications = createPQExpBuffer(); - for (i = 0; i < npubnames; i++) + namebuf = createPQExpBuffer(); + for (i = 0; i < nnames; i++) { if (i > 0) - appendPQExpBufferStr(publications, ", "); + appendPQExpBufferStr(namebuf, ", "); - appendPQExpBufferStr(publications, fmtId(pubnames[i])); + appendPQExpBufferStr(namebuf, fmtId(names[i])); } - appendPQExpBuffer(query, " PUBLICATION %s WITH (connect = false, slot_name = ", publications->data); + appendPQExpBuffer(query, " PUBLICATION %s WITH (connect = false, slot_name = ", namebuf->data); + resetPQExpBuffer(namebuf); + free(names); + if (subinfo->subslotname) appendStringLiteralAH(query, subinfo->subslotname, fout); else @@ -5602,10 +5605,25 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) appendPQExpBufferStr(query, ");\n"); + /* + * Build list of quoted conflict log destinations and append them to + * query. + */ + if (!parsePGArray(subinfo->subconflictlogdest, &names, &nnames)) + pg_fatal("could not parse %s array", "conflict_log_destination"); + + for (i = 0; i < nnames; i++) + { + if (i > 0) + appendPQExpBufferStr(namebuf, ", "); + + appendPQExpBuffer(namebuf, "%s", names[i]); + } + appendPQExpBuffer(query, - "\n\nALTER SUBSCRIPTION %s SET (conflict_log_destination = %s);\n", + "\n\nALTER SUBSCRIPTION %s SET (conflict_log_destination = '%s');\n", qsubname, - subinfo->subconflictlogdest); + namebuf->data); /* * In binary-upgrade mode, we allow the replication to continue after the @@ -5663,8 +5681,8 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) NULL, subinfo->rolname, subinfo->dobj.catId, 0, subinfo->dobj.dumpId); - destroyPQExpBuffer(publications); - free(pubnames); + destroyPQExpBuffer(namebuf); + free(names); destroyPQExpBuffer(delq); destroyPQExpBuffer(query); diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index 1023bbf2d1d..7f841359d9f 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -3204,10 +3204,10 @@ my %tests = ( create_order => 50, create_sql => 'CREATE SUBSCRIPTION sub3 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1 - WITH (connect = false, origin = any, streaming = on, conflict_log_destination= table);', + WITH (connect = false, origin = any, streaming = on, conflict_log_destination= \'log,table\');', regexp => qr/^ \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E\n\n\n - \QALTER SUBSCRIPTION sub3 SET (conflict_log_destination = table);\E + \QALTER SUBSCRIPTION sub3 SET (conflict_log_destination = 'all');\E /xm, like => { %full_runs, section_post_data => 1, }, unlike => { diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 3328ff919f6..3a19fd081a4 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -108,7 +108,7 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * Strategy for logging replication conflicts: 'log' - server log only, * 'table' - internal table only, 'all' - both log and table. */ - text subconflictlogdest; + text subconflictlogdest[1] BKI_FORCE_NULL; /* Only publish data originating from the specified origin */ text suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY); @@ -167,7 +167,7 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ - char *conflictlogdest; /* Conflict log destination */ + List *conflictlogdest; /* Conflict log destination */ } Subscription; #ifdef EXPOSE_TO_CLIENT_CODE diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index fc403cc4c5f..4a07170c827 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -19,6 +19,9 @@ #include "parser/parse_node.h" #include "replication/conflict.h" +/* check if the 'val' has 'bits' set */ +#define IsSet(val, bits) (((val) & (bits)) == (bits)) + extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel); extern ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel); @@ -37,7 +40,7 @@ extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, bool retention_active, bool max_retention_set); -extern ConflictLogDest GetLogDestination(const char *dest); +extern ConflictLogDest GetLogDestination(List *destlist, bool strnodelist); extern bool IsConflictLogTable(Oid relid); #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 694e0ba26ee..5440e3b986f 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -100,15 +100,6 @@ typedef enum ConflictLogDest CONFLICT_LOG_DEST_ALL = (CONFLICT_LOG_DEST_LOG | CONFLICT_LOG_DEST_TABLE) } ConflictLogDest; -/* - * Array mapping for converting internal enum to string. - */ -static const char *const ConflictLogDestNames[] = { - [CONFLICT_LOG_DEST_LOG] = "log", - [CONFLICT_LOG_DEST_TABLE] = "table", - [CONFLICT_LOG_DEST_ALL] = "all" -}; - /* Structure to hold metadata for one column of the conflict log table */ typedef struct ConflictLogColumnDef { diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 934fad364b2..4ca1e7f5546 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -119,7 +119,7 @@ HINT: To initiate replication, you must manually create the replication slot, e List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); @@ -127,7 +127,7 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -148,7 +148,7 @@ ERROR: invalid connection string syntax: missing "=" after "foobar" in connecti List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -160,7 +160,7 @@ ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | {log} (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -179,7 +179,7 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 | log + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 | {log} (1 row) -- ok - with lsn = NONE @@ -191,7 +191,7 @@ ERROR: invalid WAL location (LSN): 0/0 List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | {log} (1 row) BEGIN; @@ -226,7 +226,7 @@ HINT: Available values: local, remote_write, remote_apply, on, off. List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination ---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 | log + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 | {log} (1 row) -- rename back to keep the rest simple @@ -258,7 +258,7 @@ HINT: To initiate replication, you must manually create the replication slot, e List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); @@ -267,7 +267,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) DROP SUBSCRIPTION regress_testsub; @@ -282,7 +282,7 @@ HINT: To initiate replication, you must manually create the replication slot, e List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); @@ -290,7 +290,7 @@ ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); @@ -299,7 +299,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) -- fail - publication already exists @@ -317,7 +317,7 @@ ERROR: publication "testpub1" is already in subscription "regress_testsub" List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) -- fail - publication used more than once @@ -335,7 +335,7 @@ ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (ref List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) DROP SUBSCRIPTION regress_testsub; @@ -374,7 +374,7 @@ HINT: To initiate replication, you must manually create the replication slot, e List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) -- we can alter streaming when two_phase enabled @@ -383,7 +383,7 @@ ALTER SUBSCRIPTION regress_testsub SET (streaming = true); List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -396,7 +396,7 @@ HINT: To initiate replication, you must manually create the replication slot, e List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -412,7 +412,7 @@ HINT: To initiate replication, you must manually create the replication slot, e List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); @@ -420,7 +420,7 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -436,7 +436,7 @@ HINT: To initiate replication, you must manually create the replication slot, e List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -453,7 +453,7 @@ HINT: To initiate replication, you must manually create the replication slot, e List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) -- ok @@ -462,7 +462,7 @@ ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0); List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log} (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -523,7 +523,7 @@ DROP SUBSCRIPTION regress_testsub; SET SESSION AUTHORIZATION 'regress_subscription_user'; -- fail - unrecognized parameter value CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'invalid'); -ERROR: unrecognized conflict_log_destination value: "invalid" +ERROR: unrecognized value for subscription parameter "conflict_log_destination": "invalid" HINT: Valid values are "log", "table", and "all". -- verify subconflictlogdest is 'log' and relid is 0 (InvalidOid) for default case CREATE SUBSCRIPTION regress_conflict_log_default CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false); @@ -533,7 +533,7 @@ SELECT subname, subconflictlogdest, subconflictlogrelid FROM pg_subscription WHERE subname = 'regress_conflict_log_default'; subname | subconflictlogdest | subconflictlogrelid ------------------------------+--------------------+--------------------- - regress_conflict_log_default | log | 0 + regress_conflict_log_default | {log} | 0 (1 row) -- verify empty string defaults to 'log' @@ -544,11 +544,11 @@ SELECT subname, subconflictlogdest, subconflictlogrelid FROM pg_subscription WHERE subname = 'regress_conflict_empty_str'; subname | subconflictlogdest | subconflictlogrelid ----------------------------+--------------------+--------------------- - regress_conflict_empty_str | log | 0 + regress_conflict_empty_str | {log} | 0 (1 row) -- this should generate an internal table named conflict_log_table_$subid$ -CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table'); +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log, table'); WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. -- check metadata in pg_subscription: destination should be 'table' and relid valid @@ -556,7 +556,7 @@ SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid FROM pg_subscription WHERE subname = 'regress_conflict_test1'; subname | subconflictlogdest | has_relid ------------------------+--------------------+----------- - regress_conflict_test1 | table | t + regress_conflict_test1 | {all} | t (1 row) -- verify the physical table exists and its OID matches subconflictlogrelid @@ -586,18 +586,28 @@ WHERE s.subname = 'regress_conflict_test1' AND a.attnum > 0; -- These tests verify the transition logic between different logging -- destinations, ensuring internal tables are created or dropped as expected. -- --- transition from 'log' to 'all' +-- transition from 'log' to 'log, table' -- a new internal conflict log table should be created CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log'); WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'log, table'); +-- verify metadata after ALTER (destination should be 'log, table') +SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subname | subconflictlogdest | has_relid +------------------------+--------------------+----------- + regress_conflict_test2 | {all} | t +(1 row) + +-- transition from 'log, table' to 'all' ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'all'); -- verify metadata after ALTER (destination should be 'all') SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid FROM pg_subscription WHERE subname = 'regress_conflict_test2'; subname | subconflictlogdest | has_relid ------------------------+--------------------+----------- - regress_conflict_test2 | all | t + regress_conflict_test2 | {all} | t (1 row) -- transition from 'all' to 'table' @@ -608,7 +618,7 @@ SELECT subconflictlogdest, subconflictlogrelid = :old_relid AS relid_unchanged FROM pg_subscription WHERE subname = 'regress_conflict_test2'; subconflictlogdest | relid_unchanged --------------------+----------------- - table | t + {table} | t (1 row) -- transition from 'table' to 'log' @@ -618,7 +628,7 @@ SELECT subconflictlogdest, subconflictlogrelid FROM pg_subscription WHERE subname = 'regress_conflict_test2'; subconflictlogdest | subconflictlogrelid --------------------+--------------------- - log | 0 + {log} | 0 (1 row) -- verify the physical table is gone diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index c88fb405711..2491dc16c2a 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -385,7 +385,7 @@ SELECT subname, subconflictlogdest, subconflictlogrelid FROM pg_subscription WHERE subname = 'regress_conflict_empty_str'; -- this should generate an internal table named conflict_log_table_$subid$ -CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table'); +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log, table'); -- check metadata in pg_subscription: destination should be 'table' and relid valid SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid @@ -411,9 +411,16 @@ WHERE s.subname = 'regress_conflict_test1' AND a.attnum > 0; -- destinations, ensuring internal tables are created or dropped as expected. -- --- transition from 'log' to 'all' +-- transition from 'log' to 'log, table' -- a new internal conflict log table should be created CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log'); +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'log, table'); + +-- verify metadata after ALTER (destination should be 'log, table') +SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +-- transition from 'log, table' to 'all' ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'all'); -- verify metadata after ALTER (destination should be 'all') -- 2.43.0