From a1d71b15ea385087ea21530e4262434d6c2f35ef Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Fri, 26 Dec 2025 20:18:13 +0530 Subject: [PATCH v17 5/6] Preserve conflict log destination for subscriptions Support pg_dump to dump and restore the conflict_log_destination setting for subscriptions. During a normal CREATE SUBSCRIPTION, a conflict log table is created automatically when required. However, during dump/restore or binary upgrade, the conflict log table may already exist and must be reused rather than recreated. To ensure correct behavior, pg_dump now emits an ALTER SUBSCRIPTION command after subscription creation to restore the conflict_log_destination setting. While dumping, pg_dump temporarily sets the search_path to the schema in which the conflict log table was created, ensuring that the conflict log table is resolved with the appropriate schema. --- src/backend/commands/subscriptioncmds.c | 143 ++++++++++++++++++------ src/bin/pg_dump/pg_dump.c | 47 +++++++- src/bin/pg_dump/pg_dump.h | 2 + src/bin/pg_dump/pg_dump_sort.c | 31 +++++ src/bin/pg_dump/t/002_pg_dump.pl | 5 +- 5 files changed, 188 insertions(+), 40 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index dcbf192be24..fb48d64801b 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1378,6 +1378,109 @@ CheckAlterSubOption(Subscription *sub, const char *option, } } +/* + * AlterSubscriptionConflictLogDestination + * + * Update the conflict log table associated with a subscription when its + * conflict log destination is changed. + * + * If the new destination requires a conflict log table and none was previously + * required, this function validates an existing conflict log table identified + * by the subscription specific naming convention or creates a new one. + * + * If the new destination no longer requires a conflict log table, the existing + * conflict log table associated with the subscription is removed via internal + * dependency cleanup to prevent orphaned relations. + * + * The function enforces that any conflict log table used is a permanent + * relation in a permanent schema, matches the expected structure, and is not + * already associated with another subscription. + * + * On success, *conflicttablerelid is set to the OID of the conflict log table + * that was created or validated, or to InvalidOid if no table is required. + * + * Returns true if the subscription's conflict log table reference must be + * updated as a result of the destination change; false otherwise. + */ +static bool +AlterSubscriptionConflictLogDestination(Subscription *sub, + ConflictLogDest logdest, + Oid *conflicttablerelid) +{ + ConflictLogDest old_dest = GetLogDestination(sub->conflictlogdest); + bool want_table; + bool has_oldtable; + bool update_relid = false; + Oid relid = InvalidOid; + + want_table = IsSet(logdest, CONFLICT_LOG_DEST_TABLE); + has_oldtable = IsSet(old_dest, CONFLICT_LOG_DEST_TABLE); + + if (want_table && !has_oldtable) + { + char relname[NAMEDATALEN]; + + snprintf(relname, NAMEDATALEN, "conflict_log_table_%u", sub->oid); + + relid = get_relname_relid(relname, PG_CONFLICT_NAMESPACE); + if (OidIsValid(relid)) + { + Relation conflictlogrel; + char *nspname = get_namespace_name(PG_CONFLICT_NAMESPACE); + + conflictlogrel = table_open(relid, RowExclusiveLock); + if (conflictlogrel->rd_rel->relpersistence != RELPERSISTENCE_PERMANENT) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("conflict log table \"%s.%s\" must be a permanent table", + nspname, relname), + errhint("Specify a permanent table as the conflict log table.")); + + if (IsConflictLogTable(relid)) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("conflict log table \"%s.%s\" cannot be used", + nspname, relname), + errdetail("The specified table is already registered for a different subscription."), + errhint("Specify a different conflict log table.")); + if (!ValidateConflictLogTable(conflictlogrel)) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("conflict log table \"%s.%s\" has an incompatible definition", + nspname, relname), + errdetail("The table does not match the required conflict log table structure."), + errhint("Create the conflict log table with the expected definition or specify a different table.")); + + table_close(conflictlogrel, NoLock); + } + else + relid = create_conflict_log_table(sub->oid, sub->name); + update_relid = true; + } + else if (!want_table && has_oldtable) + { + ObjectAddress object; + + /* + * Conflict log tables are recorded as internal dependencies of the + * subscription. Drop the table if it is not required anymore to + * avoid stale or orphaned relations. + * + * XXX: At present, only conflict log tables are managed this way. In + * future if we introduce additional internal dependencies, we may + * need a targeted deletion to avoid deletion of any other objects. + */ + ObjectAddressSet(object, SubscriptionRelationId, sub->oid); + performDeletion(&object, DROP_CASCADE, + PERFORM_DELETION_INTERNAL | + PERFORM_DELETION_SKIP_ORIGINAL); + update_relid = true; + } + + *conflicttablerelid = relid; + return update_relid; +} + /* * Alter the existing subscription. */ @@ -1725,53 +1828,21 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (opts.logdest != old_dest) { - bool want_table = - IsSet(opts.logdest, CONFLICT_LOG_DEST_TABLE); - bool has_oldtable = - IsSet(old_dest, CONFLICT_LOG_DEST_TABLE); + bool update_relid; + Oid relid = InvalidOid; values[Anum_pg_subscription_subconflictlogdest - 1] = CStringGetTextDatum(ConflictLogDestNames[opts.logdest]); replaces[Anum_pg_subscription_subconflictlogdest - 1] = true; - if (want_table && !has_oldtable) + update_relid = AlterSubscriptionConflictLogDestination(sub, opts.logdest, &relid); + if (update_relid) { - Oid relid; - - relid = create_conflict_log_table(subid, sub->name); - values[Anum_pg_subscription_subconflictlogrelid - 1] = ObjectIdGetDatum(relid); replaces[Anum_pg_subscription_subconflictlogrelid - 1] = true; } - else if (!want_table && has_oldtable) - { - ObjectAddress object; - - /* - * Conflict log tables are recorded as internal - * dependencies of the subscription. Drop the - * table if it is not required anymore to avoid - * stale or orphaned relations. - * - * XXX: At present, only conflict log tables are - * managed this way. In future if we introduce - * additional internal dependencies, we may need a - * targeted deletion to avoid deletion of any - * other objects. - */ - ObjectAddressSet(object, SubscriptionRelationId, - subid); - performDeletion(&object, DROP_CASCADE, - PERFORM_DELETION_INTERNAL | - PERFORM_DELETION_SKIP_ORIGINAL); - - values[Anum_pg_subscription_subconflictlogrelid - 1] = - ObjectIdGetDatum(InvalidOid); - replaces[Anum_pg_subscription_subconflictlogrelid - 1] = - true; - } } } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 27f6be3f0f8..3bd2eef66e6 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -5130,6 +5130,8 @@ getSubscriptions(Archive *fout) int i_subfailover; int i_subretaindeadtuples; int i_submaxretention; + int i_subconflictlogrelid; + int i_sublogdestination; int i, ntups; @@ -5216,10 +5218,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 190000) appendPQExpBufferStr(query, - " s.submaxretention\n"); + " s.submaxretention,\n"); else appendPQExpBuffer(query, - " 0 AS submaxretention\n"); + " 0 AS submaxretention,\n"); + + if (fout->remoteVersion >= 190000) + appendPQExpBufferStr(query, + " s.subconflictlogrelid, s.subconflictlogdest\n"); + else + appendPQExpBufferStr(query, + " NULL AS subconflictlogrelid, NULL AS subconflictlogdest\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -5261,6 +5270,8 @@ getSubscriptions(Archive *fout) i_subpublications = PQfnumber(res, "subpublications"); i_suborigin = PQfnumber(res, "suborigin"); i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn"); + i_subconflictlogrelid = PQfnumber(res, "subconflictlogrelid"); + i_sublogdestination = PQfnumber(res, "subconflictlogdest"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -5309,6 +5320,33 @@ getSubscriptions(Archive *fout) else subinfo[i].suboriginremotelsn = pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn)); + if (PQgetisnull(res, i, i_subconflictlogrelid)) + subinfo[i].subconflictlogrelid = InvalidOid; + else + { + TableInfo *tableInfo; + + subinfo[i].subconflictlogrelid = + atooid(PQgetvalue(res, i, i_subconflictlogrelid)); + + if (subinfo[i].subconflictlogrelid) + { + tableInfo = findTableByOid(subinfo[i].subconflictlogrelid); + if (!tableInfo) + pg_fatal("could not find conflict log table with OID %u", + subinfo[i].subconflictlogrelid); + + addObjectDependency(&subinfo[i].dobj, tableInfo->dobj.dumpId); + + if (!dopt->binary_upgrade) + tableInfo->dobj.dump = DUMP_COMPONENT_NONE; + } + } + if (PQgetisnull(res, i, i_sublogdestination)) + subinfo[i].subconflictlogdest = NULL; + else + subinfo[i].subconflictlogdest = + pg_strdup(PQgetvalue(res, i, i_sublogdestination)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -5564,6 +5602,11 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) appendPQExpBufferStr(query, ");\n"); + appendPQExpBuffer(query, + "\n\nALTER SUBSCRIPTION %s SET (conflict_log_destination = %s);\n", + qsubname, + subinfo->subconflictlogdest); + /* * In binary-upgrade mode, we allow the replication to continue after the * upgrade. diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 72a00e1bc20..c6273049f63 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -719,12 +719,14 @@ typedef struct _SubscriptionInfo bool subfailover; bool subretaindeadtuples; int submaxretention; + Oid subconflictlogrelid; char *subconninfo; char *subslotname; char *subsynccommit; char *subpublications; char *suborigin; char *suboriginremotelsn; + char *subconflictlogdest; } SubscriptionInfo; /* diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index e2a4df4cf4b..2f170cae70f 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -1131,6 +1131,19 @@ repairTableAttrDefMultiLoop(DumpableObject *tableobj, addObjectDependency(attrdefobj, tableobj->dumpId); } +/* + * Because we make subscriptions depend on their conflict log tables, while + * there is an automatic dependency in the other direction, we need to break + * the loop. Remove the automatic dependency, allowing the table to be created + * first. + */ +static void +repairSubscriptionTableLoop(DumpableObject *subobj, DumpableObject *tableobj) +{ + /* Remove table's dependency on subscription */ + removeObjectDependency(tableobj, subobj->dumpId); +} + /* * CHECK, NOT NULL constraints on domains work just like those on tables ... */ @@ -1361,6 +1374,24 @@ repairDependencyLoop(DumpableObject **loop, return; } + /* + * Subscription and its Conflict Log Table + */ + if (nLoop == 2 && + loop[0]->objType == DO_TABLE && + loop[1]->objType == DO_SUBSCRIPTION) + { + repairSubscriptionTableLoop(loop[1], loop[0]); + return; + } + if (nLoop == 2 && + loop[0]->objType == DO_SUBSCRIPTION && + loop[1]->objType == DO_TABLE) + { + repairSubscriptionTableLoop(loop[0], loop[1]); + return; + } + /* index on partitioned table and corresponding index on partition */ if (nLoop == 2 && loop[0]->objType == DO_INDEX && diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index e33aa95f6ff..1023bbf2d1d 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -3204,9 +3204,10 @@ my %tests = ( create_order => 50, create_sql => 'CREATE SUBSCRIPTION sub3 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1 - WITH (connect = false, origin = any, streaming = on);', + WITH (connect = false, origin = any, streaming = on, conflict_log_destination= table);', regexp => qr/^ - \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E + \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E\n\n\n + \QALTER SUBSCRIPTION sub3 SET (conflict_log_destination = table);\E /xm, like => { %full_runs, section_post_data => 1, }, unlike => { -- 2.43.0