From 828f133d7a4d7b672f210fb70054eb7e43e65e59 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Wed, 17 Dec 2025 11:53:47 +0530 Subject: [PATCH v15 1/6] Add configurable conflict log table for Logical Replication This patch adds a feature to provide a structured, queryable record of all logical replication conflicts. The current approach of logging conflicts as plain text in the server logs makes it difficult to query, analyze, and use for external monitoring and automation. This patch addresses these limitations by introducing a configurable conflict_log_destination=(log/table/all) option in the CREATE SUBSCRIPTION command. If user choose to log into the table the table will automatically created while creating the subscription with internal name i.e. conflict_log_table_$subid$. The table will be created in the current search path and table would be automatically dropped while dropping the subscription. The conflict details, including the original and remote tuples, are stored in JSON columns, providing a flexible format to accommodate different table schemas. The log table captures essential attributes such as local and remote transaction IDs, LSNs, commit timestamps, and conflict type, providing a complete record for post-mortem analysis. This feature will make logical replication conflicts easier to monitor and manage, significantly improving the overall resilience and operability of replication setups. The conflict log tables will not be included in a publication, even if the publication is configured to include ALL TABLES or ALL TABLES IN SCHEMA. --- src/backend/catalog/pg_publication.c | 26 +- src/backend/catalog/pg_subscription.c | 7 + src/backend/commands/subscriptioncmds.c | 337 ++++++++++++++++++++- src/bin/psql/describe.c | 21 +- src/bin/psql/tab-complete.in.c | 8 +- src/include/catalog/pg_subscription.h | 11 + src/include/commands/subscriptioncmds.h | 5 + src/include/replication/conflict.h | 50 +++ src/test/regress/expected/subscription.out | 328 ++++++++++++++------ src/test/regress/sql/subscription.sql | 109 +++++++ src/tools/pgindent/typedefs.list | 1 + 11 files changed, 799 insertions(+), 104 deletions(-) diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 7aa3f179924..9f84e02b7ef 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -31,6 +31,7 @@ #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" #include "commands/publicationcmds.h" +#include "commands/subscriptioncmds.h" #include "funcapi.h" #include "utils/array.h" #include "utils/builtins.h" @@ -85,6 +86,15 @@ check_publication_add_relation(Relation targetrel) errmsg("cannot add relation \"%s\" to publication", RelationGetRelationName(targetrel)), errdetail("This operation is not supported for unlogged tables."))); + + /* Can't be conflict log table */ + if (IsConflictLogTable(RelationGetRelid(targetrel))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add relation \"%s.%s\" to publication", + get_namespace_name(RelationGetNamespace(targetrel)), + RelationGetRelationName(targetrel)), + errdetail("This operation is not supported for conflict log tables."))); } /* @@ -145,6 +155,13 @@ is_publishable_class(Oid relid, Form_pg_class reltuple) /* * Another variant of is_publishable_class(), taking a Relation. + * + * Note: Conflict log tables are not publishable. However, we intentionally + * skip this check here because this function is called for every change and + * performing this check during every change publication is costly. To ensure + * unpublishable entries are ignored without incurring performance overhead, + * tuples inserted into the conflict log table uses the HEAP_INSERT_NO_LOGICAL + * flag. This allows the decoding layer to bypass these entries automatically. */ bool is_publishable_relation(Relation rel) @@ -169,7 +186,10 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS) tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); if (!HeapTupleIsValid(tuple)) PG_RETURN_NULL(); - result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple)); + + /* Subscription conflict log tables are not published */ + result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple)) && + !IsConflictLogTable(relid); ReleaseSysCache(tuple); PG_RETURN_BOOL(result); } @@ -890,7 +910,9 @@ GetAllPublicationRelations(char relkind, bool pubviaroot) Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); Oid relid = relForm->oid; + /* Subscription conflict log tables are not published */ if (is_publishable_class(relid, relForm) && + !IsConflictLogTable(relid) && !(relForm->relispartition && pubviaroot)) result = lappend_oid(result, relid); } @@ -1018,7 +1040,7 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) Oid relid = relForm->oid; char relkind; - if (!is_publishable_class(relid, relForm)) + if (!is_publishable_class(relid, relForm) || IsConflictLogTable(relid)) continue; relkind = get_rel_relkind(relid); diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index ad6fbd77ffd..27a9aee1c56 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -106,6 +106,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->retaindeadtuples = subform->subretaindeadtuples; sub->maxretention = subform->submaxretention; sub->retentionactive = subform->subretentionactive; + sub->conflictrelid = subform->subconflictlogrelid; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, @@ -141,6 +142,12 @@ GetSubscription(Oid subid, bool missing_ok) Anum_pg_subscription_suborigin); sub->origin = TextDatumGetCString(datum); + /* Get conflict log destination */ + datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_sublogdestination); + sub->logdestination = TextDatumGetCString(datum); + /* Is the subscription owner a superuser? */ sub->ownersuperuser = superuser_arg(sub->owner); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index abbcaff0838..05dcb5e3fe4 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -15,25 +15,30 @@ #include "postgres.h" #include "access/commit_ts.h" +#include "access/heapam.h" #include "access/htup_details.h" #include "access/table.h" #include "access/twophase.h" #include "access/xact.h" #include "catalog/catalog.h" #include "catalog/dependency.h" +#include "catalog/heap.h" #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" +#include "catalog/pg_am_d.h" #include "catalog/pg_authid_d.h" #include "catalog/pg_database_d.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" +#include "commands/comment.h" #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" #include "executor/executor.h" +#include "funcapi.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "pgstat.h" @@ -51,6 +56,7 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/pg_lsn.h" +#include "utils/regproc.h" #include "utils/syscache.h" /* @@ -75,6 +81,7 @@ #define SUBOPT_MAX_RETENTION_DURATION 0x00008000 #define SUBOPT_LSN 0x00010000 #define SUBOPT_ORIGIN 0x00020000 +#define SUBOPT_CONFLICT_LOG_DESTINATION 0x00040000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -103,6 +110,7 @@ typedef struct SubOpts bool retaindeadtuples; int32 maxretention; char *origin; + ConflictLogDest logdest; XLogRecPtr lsn; } SubOpts; @@ -135,7 +143,8 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel); - +static Oid create_conflict_log_table(Oid subid, char *subname, Oid namespaceId, + char *conflictrel); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -191,6 +200,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->maxretention = 0; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_DESTINATION)) + opts->logdest = CONFLICT_LOG_DEST_LOG; /* Parse options */ foreach(lc, stmt_options) @@ -402,6 +413,28 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_DESTINATION) && + strcmp(defel->defname, "conflict_log_destination") == 0) + { + char *val; + ConflictLogDest dest; + + if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_LOG_DESTINATION)) + errorConflictingDefElem(defel, pstate); + + val = defGetString(defel); + + dest = GetLogDestination(val); + + if (dest == CONFLICT_LOG_DEST_INVALID) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unrecognized conflict_log_destination value: \"%s\"", val), + errhint("Valid values are \"log\", \"table\", and \"all\"."))); + + opts->logdest = dest; + opts->specified_opts |= SUBOPT_CONFLICT_LOG_DESTINATION; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -612,7 +645,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_RETAIN_DEAD_TUPLES | - SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN); + SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN | + SUBOPT_CONFLICT_LOG_DESTINATION); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -747,6 +781,40 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + /* Always set the destination, default will be log. */ + values[Anum_pg_subscription_sublogdestination - 1] = + CStringGetTextDatum(ConflictLogDestLabels[opts.logdest]); + + /* + * If the conflict log destination includes 'table', generate an internal + * name using the subscription OID and determine the target namespace based + * on the current search path. Store the namespace OID and the conflict log + * format in the pg_subscription catalog tuple., then physically create + * the table. + */ + if (opts.logdest == CONFLICT_LOG_DEST_TABLE || + opts.logdest == CONFLICT_LOG_DEST_ALL) + { + char conflict_table_name[NAMEDATALEN]; + Oid namespaceId, logrelid; + + GetConflictLogTableName(conflict_table_name, subid); + namespaceId = RangeVarGetCreationNamespace( + makeRangeVar(NULL, conflict_table_name, -1)); + + /* Store the Oid returned from creation. */ + logrelid = create_conflict_log_table(subid, stmt->subname, namespaceId, + conflict_table_name); + values[Anum_pg_subscription_subconflictlogrelid - 1] = + ObjectIdGetDatum(logrelid); + } + else + { + /* Destination is "log"; no table is needed. */ + values[Anum_pg_subscription_subconflictlogrelid - 1] = + ObjectIdGetDatum(InvalidOid); + } + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -1410,7 +1478,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_MAX_RETENTION_DURATION | - SUBOPT_ORIGIN); + SUBOPT_ORIGIN | + SUBOPT_CONFLICT_LOG_DESTINATION); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1665,6 +1734,72 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, origin = opts.origin; } + if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_DESTINATION)) + { + ConflictLogDest old_dest = + GetLogDestination(sub->logdestination); + + if (opts.logdest != old_dest) + { + bool want_table = + (opts.logdest == CONFLICT_LOG_DEST_TABLE || + opts.logdest == CONFLICT_LOG_DEST_ALL); + bool has_oldtable = + (old_dest == CONFLICT_LOG_DEST_TABLE || + old_dest == CONFLICT_LOG_DEST_ALL); + + values[Anum_pg_subscription_sublogdestination - 1] = + CStringGetTextDatum(ConflictLogDestLabels[opts.logdest]); + replaces[Anum_pg_subscription_sublogdestination - 1] = true; + + if (want_table && !has_oldtable) + { + char relname[NAMEDATALEN]; + Oid nspid; + Oid relid; + + GetConflictLogTableName(relname, subid); + nspid = RangeVarGetCreationNamespace(makeRangeVar( + NULL, relname, -1)); + + relid = create_conflict_log_table(subid, sub->name, + nspid, relname); + + values[Anum_pg_subscription_subconflictlogrelid - 1] = + ObjectIdGetDatum(relid); + replaces[Anum_pg_subscription_subconflictlogrelid - 1] = + true; + } + else if (!want_table && has_oldtable) + { + ObjectAddress object; + + /* + * Conflict log tables are recorded as internal + * dependencies of the subscription. Drop the + * table if it is not required anymore to avoid + * stale or orphaned relations. + * + * XXX: At present, only conflict log tables are + * managed this way. In future if we introduce + * additional internal dependencies, we may need + * a targeted deletion to avoid deletion of any + * other objects. + */ + ObjectAddressSet(object, SubscriptionRelationId, + subid); + performDeletion(&object, DROP_CASCADE, + PERFORM_DELETION_INTERNAL | + PERFORM_DELETION_SKIP_ORIGINAL); + + values[Anum_pg_subscription_subconflictlogrelid - 1] = + ObjectIdGetDatum(InvalidOid); + replaces[Anum_pg_subscription_subconflictlogrelid - 1] = + true; + } + } + } + update_tuple = true; break; } @@ -2027,6 +2162,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) Form_pg_subscription form; List *rstates; bool must_use_password; + ObjectAddress object; /* * The launcher may concurrently start a new worker for this subscription. @@ -2184,6 +2320,19 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* Clean up dependencies */ deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); + /* + * Conflict log tables are recorded as internal dependencies of the + * subscription. We must drop the dependent objects before the + * subscription itself is removed. By using + * PERFORM_DELETION_SKIP_ORIGINAL, we ensure that only the conflict log + * table is reaped while the subscription remains for the final deletion + * step. + */ + ObjectAddressSet(object, SubscriptionRelationId, subid); + performDeletion(&object, DROP_CASCADE, + PERFORM_DELETION_INTERNAL | + PERFORM_DELETION_SKIP_ORIGINAL); + /* Remove any associated relation synchronization states. */ RemoveSubscriptionRel(subid, InvalidOid); @@ -3188,3 +3337,185 @@ defGetStreamingMode(DefElem *def) def->defname))); return LOGICALREP_STREAM_OFF; /* keep compiler quiet */ } + +/* + * Builds the TupleDesc for the conflict log table. + */ +static TupleDesc +create_conflict_log_table_tupdesc(void) +{ + TupleDesc tupdesc; + + tupdesc = CreateTemplateTupleDesc(MAX_CONFLICT_ATTR_NUM); + + for (int i = 0; i < MAX_CONFLICT_ATTR_NUM; i++) + { + Oid type_oid = ConflictLogSchema[i].atttypid; + + /* + * Special handling for the JSON array type for proper + * TupleDescInitEntry call. + */ + if (type_oid == JSONARRAYOID) + type_oid = get_array_type(JSONOID); + + TupleDescInitEntry(tupdesc, i + 1, + ConflictLogSchema[i].attname, + type_oid, + -1, 0); + } + + return BlessTupleDesc(tupdesc); +} + +/* + * Create conflict log table. + * + * The subscription owner becomes the owner of this table and has all + * privileges on it. + */ +static Oid +create_conflict_log_table(Oid subid, char *subname, Oid namespaceId, + char *conflictrel) +{ + TupleDesc tupdesc; + Oid relid; + char comment[256]; + ObjectAddress myself; + ObjectAddress subaddr; + + /* + * Conflict log tables must be permanent relations. Disallow creation in + * temporary namespaces to ensure the same. + */ + if (isTempNamespace(namespaceId)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not generate conflict log table \"%s\"", + conflictrel), + errdetail("Conflict log tables cannot be created in a temporary namespace."), + errhint("Ensure your 'search_path' is set to permanent schema."))); + + /* Report an error if the specified conflict log table already exists. */ + if (OidIsValid(get_relname_relid(conflictrel, namespaceId))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("could not generate conflict log table \"%s.%s\"", + get_namespace_name(namespaceId), conflictrel), + errdetail("A table with the internally generated name already exists."), + errhint("Drop the existing table or change your 'search_path' to use a different schema."))); + + /* Build the tuple descriptor for the new table. */ + tupdesc = create_conflict_log_table_tupdesc(); + + /* Create conflict log table. */ + relid = heap_create_with_catalog(conflictrel, + namespaceId, + 0, + InvalidOid, + InvalidOid, + InvalidOid, + GetUserId(), + HEAP_TABLE_AM_OID, + tupdesc, + NIL, + RELKIND_RELATION, + RELPERSISTENCE_PERMANENT, + false, + false, + ONCOMMIT_NOOP, + (Datum) 0, + false, + false, + false, + InvalidOid, + NULL); + + /* Add a comments for the conflict log table. */ + snprintf(comment, sizeof(comment), + "Conflict log table for subscription \"%s\"", subname); + CreateComments(relid, RelationRelationId, 0, comment); + + /* + * Establish an internal dependency between the conflict log table and + * the subscription. By using DEPENDENCY_INTERNAL, we ensure the table + * is automatically reaped when the subscription is dropped. This also + * prevents the table from being dropped independently unless the + * subscription itself is removed. + */ + ObjectAddressSet(myself, RelationRelationId, relid); + ObjectAddressSet(subaddr, SubscriptionRelationId, subid); + recordDependencyOn(&myself, &subaddr, DEPENDENCY_INTERNAL); + + /* Release tuple descriptor memory. */ + FreeTupleDesc(tupdesc); + + return relid; +} + +/* + * Format the standardized internal conflict log table name for a subscription + * + * Use the OID to prevent collisions during rename operations. + */ +void +GetConflictLogTableName(char *dest, Oid subid) +{ + snprintf(dest, NAMEDATALEN, "conflict_log_table_%u", subid); +} + +/* + * GetLogDestination + * + * Convert string to enum by comparing against standardized labels. + */ +ConflictLogDest +GetLogDestination(const char *dest) +{ + /* Empty string or NULL defaults to LOG. */ + if (dest == NULL || dest[0] == '\0') + return CONFLICT_LOG_DEST_LOG; + + for (int i = CONFLICT_LOG_DEST_LOG; i <= CONFLICT_LOG_DEST_ALL; i++) + { + if (pg_strcasecmp(dest, ConflictLogDestLabels[i]) == 0) + return (ConflictLogDest) i; + } + + /* Unrecognized string. */ + return CONFLICT_LOG_DEST_INVALID; +} + +/* + * Check if the specified relation is used as a conflict log table by any + * subscription. + */ +bool +IsConflictLogTable(Oid relid) +{ + Relation rel; + TableScanDesc scan; + HeapTuple tup; + bool is_clt = false; + + rel = table_open(SubscriptionRelationId, AccessShareLock); + scan = table_beginscan_catalog(rel, 0, NULL); + + while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) + { + Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup); + + /* Direct Oid comparison from catalog */ + if (OidIsValid(subform->subconflictlogrelid) && + subform->subconflictlogrelid == relid) + { + is_clt = true; + break; + } + } + + table_endscan(scan); + table_close(rel, AccessShareLock); + + return is_clt; +} diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 36f24502842..cc80f0f661c 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6806,7 +6806,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false, false, false, false}; + false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6900,15 +6900,22 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", subskiplsn AS \"%s\"\n", gettext_noop("Skip LSN")); + + /* Conflict log destination is supported in v19 and higher */ + if (pset.sversion >= 190000) + { + appendPQExpBuffer(&buf, + ", sublogdestination AS \"%s\"\n", + gettext_noop("Conflict log destination")); + } } /* Only display subscriptions in current database. */ - appendPQExpBufferStr(&buf, - "FROM pg_catalog.pg_subscription\n" - "WHERE subdbid = (SELECT oid\n" - " FROM pg_catalog.pg_database\n" - " WHERE datname = pg_catalog.current_database())"); - + appendPQExpBuffer(&buf, + "FROM pg_catalog.pg_subscription " + "WHERE subdbid = (SELECT oid\n" + " FROM pg_catalog.pg_database\n" + " WHERE datname = pg_catalog.current_database())"); if (!validateSQLNamePattern(&buf, pattern, true, false, NULL, "subname", NULL, NULL, diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index b1ff6f6cd94..fd279d98b1b 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2344,8 +2344,8 @@ match_previous_words(int pattern_id, COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "failover", - "max_retention_duration", "origin", + COMPLETE_WITH("binary", "conflict_log_destination", "disable_on_error", + "failover", "max_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase"); @@ -3828,8 +3828,8 @@ match_previous_words(int pattern_id, COMPLETE_WITH("WITH ("); /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "(")) - COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "failover", + COMPLETE_WITH("binary", "conflict_log_destination", "connect", "copy_data", + "create_slot", "disable_on_error", "enabled", "failover", "max_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 55cb9b1eefa..55f4bfa0419 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -90,6 +90,7 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * exceeded max_retention_duration, when * defined */ + Oid subconflictlogrelid; /* Relid of the conflict log table. */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -103,6 +104,14 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW /* List of publications subscribed to */ text subpublications[1] BKI_FORCE_NOT_NULL; + /* + * Strategy for logging replication conflicts: + * log - server log only, + * table - internal table only, + * all - both log and table. + */ + text sublogdestination; + /* Only publish data originating from the specified origin */ text suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY); #endif @@ -152,12 +161,14 @@ typedef struct Subscription * and the retention duration has not * exceeded max_retention_duration, when * defined */ + Oid conflictrelid; /* conflict log table Oid */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + char *logdestination; /* Conflict log destination */ } Subscription; #ifdef EXPOSE_TO_CLIENT_CODE diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index fb4e26a51a4..255e1e241b8 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -17,6 +17,7 @@ #include "catalog/objectaddress.h" #include "parser/parse_node.h" +#include "replication/conflict.h" extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel); @@ -36,4 +37,8 @@ extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, bool retention_active, bool max_retention_set); +extern void GetConflictLogTableName(char *dest, Oid subid); +extern ConflictLogDest GetLogDestination(const char *dest); +extern bool IsConflictLogTable(Oid relid); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index c8fbf9e51b8..70f8744b381 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -10,6 +10,7 @@ #define CONFLICT_H #include "access/xlogdefs.h" +#include "catalog/pg_type.h" #include "nodes/pg_list.h" #include "utils/timestamp.h" @@ -79,6 +80,55 @@ typedef struct ConflictTupleInfo * conflicting local row occurred */ } ConflictTupleInfo; +/* + * Conflict log destination types. + * + * Internally we use these enum values for fast comparison, but we store + * the string equivalent in pg_subscription.sublogdestination. + */ +typedef enum ConflictLogDest +{ + CONFLICT_LOG_DEST_INVALID = 0, + CONFLICT_LOG_DEST_LOG, /* "log" (default) */ + CONFLICT_LOG_DEST_TABLE, /* "table" */ + CONFLICT_LOG_DEST_ALL /* "all" */ +} ConflictLogDest; + +/* + * Array mapping for converting internal enum to string. + */ +static const char *const ConflictLogDestLabels[] = { + [CONFLICT_LOG_DEST_LOG] = "log", + [CONFLICT_LOG_DEST_TABLE] = "table", + [CONFLICT_LOG_DEST_ALL] = "all" +}; + +/* Structure to hold metadata for one column of the conflict log table */ +typedef struct ConflictLogColumnDef +{ + const char *attname; /* Column name */ + Oid atttypid; /* Data type OID */ +} ConflictLogColumnDef; + +/* The single source of truth for the conflict log table schema */ +static const ConflictLogColumnDef ConflictLogSchema[] = +{ + { .attname = "relid", .atttypid = OIDOID }, + { .attname = "schemaname", .atttypid = TEXTOID }, + { .attname = "relname", .atttypid = TEXTOID }, + { .attname = "conflict_type", .atttypid = TEXTOID }, + { .attname = "remote_xid", .atttypid = XIDOID }, + { .attname = "remote_commit_lsn",.atttypid = LSNOID }, + { .attname = "remote_commit_ts", .atttypid = TIMESTAMPTZOID }, + { .attname = "remote_origin", .atttypid = TEXTOID }, + { .attname = "replica_identity", .atttypid = JSONOID }, + { .attname = "remote_tuple", .atttypid = JSONOID }, + { .attname = "local_conflicts", .atttypid = JSONARRAYOID } +}; + +/* Define the count using the array size */ +#define MAX_CONFLICT_ATTR_NUM (sizeof(ConflictLogSchema) / sizeof(ConflictLogSchema[0])) + extern bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 327d1e7731f..4ab58d90925 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | log (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 | log (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | log (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 | log (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -409,18 +409,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -433,10 +433,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -450,19 +450,19 @@ NOTICE: max_retention_duration is ineffective when retain_dead_tuples is disabl WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) -- ok ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -517,7 +517,159 @@ COMMIT; -- ok, owning it is enough for this stuff ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- +-- CONFLICT LOG DESTINATION TESTS +-- +SET SESSION AUTHORIZATION 'regress_subscription_user'; +-- fail - unrecognized format value +CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'invalid'); +ERROR: unrecognized conflict_log_destination value: "invalid" +HINT: Valid values are "log", "table", and "all". +-- verify sublogdestination is 'log' and relid is 0 (InvalidOid) for default case +CREATE SUBSCRIPTION regress_conflict_log_default CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +SELECT subname, sublogdestination, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_log_default'; + subname | sublogdestination | subconflictlogrelid +------------------------------+-------------------+--------------------- + regress_conflict_log_default | log | 0 +(1 row) + +-- verify empty string defaults to 'log' +CREATE SUBSCRIPTION regress_conflict_empty_str CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = ''); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +SELECT subname, sublogdestination, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_empty_str'; + subname | sublogdestination | subconflictlogrelid +----------------------------+-------------------+--------------------- + regress_conflict_empty_str | log | 0 +(1 row) + +-- this should generate an internal table named conflict_log_table_$subid$ +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table'); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +-- check metadata in pg_subscription: destination should be 'table' and relid valid +SELECT subname, sublogdestination, subconflictlogrelid > 0 AS has_relid +FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + subname | sublogdestination | has_relid +------------------------+-------------------+----------- + regress_conflict_test1 | table | t +(1 row) + +-- verify the physical table exists and its OID matches subconflictlogrelid +SELECT count(*) +FROM pg_class c +JOIN pg_subscription s ON c.relname = 'conflict_log_table_' || s.oid +WHERE s.subname = 'regress_conflict_test1' AND c.oid = s.subconflictlogrelid; + count +------- + 1 +(1 row) + +-- check if the internal table has the correct schema (11 columns) +SELECT count(*) +FROM pg_attribute a +JOIN pg_class c ON a.attrelid = c.oid +JOIN pg_subscription s ON c.relname = 'conflict_log_table_' || s.oid +WHERE s.subname = 'regress_conflict_test1' AND a.attnum > 0; + count +------- + 11 +(1 row) + +-- ALTER: State transitions +-- transition from 'log' to 'all' +CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log'); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'all'); +-- verify metadata after ALTER (destination should be 'all') +SELECT subname, sublogdestination, subconflictlogrelid > 0 AS has_relid +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subname | sublogdestination | has_relid +------------------------+-------------------+----------- + regress_conflict_test2 | all | t +(1 row) + +-- transition from 'all' to 'table' (should NOT drop the table, only change destination string) +SELECT subconflictlogrelid AS old_relid FROM pg_subscription WHERE subname = 'regress_conflict_test2' \gset +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'table'); +SELECT sublogdestination, subconflictlogrelid = :old_relid AS relid_unchanged +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + sublogdestination | relid_unchanged +-------------------+----------------- + table | t +(1 row) + +-- transition from 'table' to 'log' (should drop the table and clear relid) +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'log'); +SELECT sublogdestination, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + sublogdestination | subconflictlogrelid +-------------------+--------------------- + log | 0 +(1 row) + +-- verify the physical table is gone +SELECT count(*) +FROM pg_class c +JOIN pg_subscription s ON c.relname = 'conflict_log_table_' || s.oid +WHERE s.subname = 'regress_conflict_test2'; + count +------- + 0 +(1 row) + +-- ensure drop table not allowed and DROP SUBSCRIPTION reaps the table +-- re-enable table logging for verification +ALTER SUBSCRIPTION regress_conflict_test1 SET (conflict_log_destination = 'table'); +-- fail - drop table not allowed due to internal dependency +-- use DO block to hide OID in error message +DO $$ +BEGIN + EXECUTE 'DROP TABLE ' || (SELECT 'conflict_log_table_' || oid FROM pg_subscription WHERE subname = 'regress_conflict_test1'); +EXCEPTION WHEN dependent_objects_still_exist THEN + RAISE NOTICE 'captured expected error: dependent_objects_still_exist'; +END $$; +NOTICE: captured expected error: dependent_objects_still_exist +-- PUBLICATION: Verify internal tables are not publishable +-- pg_relation_is_publishable should return false for internal conflict log tables +SELECT pg_relation_is_publishable(subconflictlogrelid) +FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + pg_relation_is_publishable +---------------------------- + f +(1 row) + +-- CLEANUP: Proper drop reaps the table +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +-- Verify the table OID for reap check +SELECT 'conflict_log_table_' || oid AS internal_tablename FROM pg_subscription WHERE subname = 'regress_conflict_test1' \gset +DROP SUBSCRIPTION regress_conflict_test1; +-- should return NULL, meaning the internal table was reaped via dependency +SELECT to_regclass(:'internal_tablename'); + to_regclass +------------- + +(1 row) + +-- Clean up remaining test subscription +ALTER SUBSCRIPTION regress_conflict_log_default DISABLE; +ALTER SUBSCRIPTION regress_conflict_log_default SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_log_default; +ALTER SUBSCRIPTION regress_conflict_empty_str DISABLE; +ALTER SUBSCRIPTION regress_conflict_empty_str SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_empty_str; +ALTER SUBSCRIPTION regress_conflict_test2 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test2; RESET SESSION AUTHORIZATION; +DROP SCHEMA clt; +ERROR: schema "clt" does not exist DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; DROP ROLE regress_subscription_user3; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index ef0c298d2df..3359ff8be5c 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -365,7 +365,116 @@ COMMIT; ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- +-- CONFLICT LOG DESTINATION TESTS +-- + +SET SESSION AUTHORIZATION 'regress_subscription_user'; + +-- fail - unrecognized format value +CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'invalid'); + +-- verify sublogdestination is 'log' and relid is 0 (InvalidOid) for default case +CREATE SUBSCRIPTION regress_conflict_log_default CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false); +SELECT subname, sublogdestination, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_log_default'; + +-- verify empty string defaults to 'log' +CREATE SUBSCRIPTION regress_conflict_empty_str CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = ''); +SELECT subname, sublogdestination, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_empty_str'; + +-- this should generate an internal table named conflict_log_table_$subid$ +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table'); + +-- check metadata in pg_subscription: destination should be 'table' and relid valid +SELECT subname, sublogdestination, subconflictlogrelid > 0 AS has_relid +FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + +-- verify the physical table exists and its OID matches subconflictlogrelid +SELECT count(*) +FROM pg_class c +JOIN pg_subscription s ON c.relname = 'conflict_log_table_' || s.oid +WHERE s.subname = 'regress_conflict_test1' AND c.oid = s.subconflictlogrelid; + +-- check if the internal table has the correct schema (11 columns) +SELECT count(*) +FROM pg_attribute a +JOIN pg_class c ON a.attrelid = c.oid +JOIN pg_subscription s ON c.relname = 'conflict_log_table_' || s.oid +WHERE s.subname = 'regress_conflict_test1' AND a.attnum > 0; + +-- ALTER: State transitions +-- transition from 'log' to 'all' +CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log'); +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'all'); + +-- verify metadata after ALTER (destination should be 'all') +SELECT subname, sublogdestination, subconflictlogrelid > 0 AS has_relid +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +-- transition from 'all' to 'table' (should NOT drop the table, only change destination string) +SELECT subconflictlogrelid AS old_relid FROM pg_subscription WHERE subname = 'regress_conflict_test2' \gset +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'table'); +SELECT sublogdestination, subconflictlogrelid = :old_relid AS relid_unchanged +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +-- transition from 'table' to 'log' (should drop the table and clear relid) +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'log'); +SELECT sublogdestination, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +-- verify the physical table is gone +SELECT count(*) +FROM pg_class c +JOIN pg_subscription s ON c.relname = 'conflict_log_table_' || s.oid +WHERE s.subname = 'regress_conflict_test2'; + +-- ensure drop table not allowed and DROP SUBSCRIPTION reaps the table +-- re-enable table logging for verification +ALTER SUBSCRIPTION regress_conflict_test1 SET (conflict_log_destination = 'table'); + +-- fail - drop table not allowed due to internal dependency +-- use DO block to hide OID in error message +DO $$ +BEGIN + EXECUTE 'DROP TABLE ' || (SELECT 'conflict_log_table_' || oid FROM pg_subscription WHERE subname = 'regress_conflict_test1'); +EXCEPTION WHEN dependent_objects_still_exist THEN + RAISE NOTICE 'captured expected error: dependent_objects_still_exist'; +END $$; + +-- PUBLICATION: Verify internal tables are not publishable +-- pg_relation_is_publishable should return false for internal conflict log tables +SELECT pg_relation_is_publishable(subconflictlogrelid) +FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + +-- CLEANUP: Proper drop reaps the table +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); + +-- Verify the table OID for reap check +SELECT 'conflict_log_table_' || oid AS internal_tablename FROM pg_subscription WHERE subname = 'regress_conflict_test1' \gset + +DROP SUBSCRIPTION regress_conflict_test1; + +-- should return NULL, meaning the internal table was reaped via dependency +SELECT to_regclass(:'internal_tablename'); + +-- Clean up remaining test subscription +ALTER SUBSCRIPTION regress_conflict_log_default DISABLE; +ALTER SUBSCRIPTION regress_conflict_log_default SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_log_default; + +ALTER SUBSCRIPTION regress_conflict_empty_str DISABLE; +ALTER SUBSCRIPTION regress_conflict_empty_str SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_empty_str; + +ALTER SUBSCRIPTION regress_conflict_test2 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test2; + RESET SESSION AUTHORIZATION; +DROP SCHEMA clt; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; DROP ROLE regress_subscription_user3; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 04845d5e680..21826be5bd7 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -500,6 +500,7 @@ ConditionVariableMinimallyPadded ConditionalStack ConfigData ConfigVariable +ConflictLogColumnDef ConflictTupleInfo ConflictType ConnCacheEntry -- 2.43.0