From 0676b9c1b425ea069a6c38816636034fc090662f Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Tue, 16 Dec 2025 09:16:26 +0530 Subject: [PATCH v13 2/2] pg_dump: dump conflict log table configuration for subscriptions Allow pg_dump to preserve the conflict_log_table setting of logical replication subscriptions. --- src/backend/commands/subscriptioncmds.c | 245 ++++++++++++++------- src/bin/pg_dump/pg_dump.c | 49 ++++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_dump/pg_dump_sort.c | 32 +++ src/bin/pg_dump/t/002_pg_dump.pl | 5 +- src/test/regress/expected/subscription.out | 15 +- src/test/regress/sql/subscription.sql | 8 + 7 files changed, 265 insertions(+), 90 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index b044ed70a2a..569c1a5a76b 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1242,6 +1242,148 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, table_close(rel, NoLock); } +/* + * AlterSubscriptionConflictLogTable + * + * Set, change, or remove the conflict log table associated with a + * subscription. + * + * If a conflict log table name is provided, this function validates the + * specified relation (or creates it if it does not exist) and records it + * as an internal dependency of the subscription. The table must be a + * permanent relation in a non temporary schema and must match the expected + * conflict log table definition. + * + * If the subscription already uses the specified conflict log table and the + * table still exists, no change is made and a NOTICE is emitted. + * + * Any previously associated conflict log table is removed by dropping the + * subscription's internal dependencies before associating a new table. + * + * Returns: + * NULL when the association has been removed. + * else conflict log table associated with the subscription. + */ +static char * +AlterSubscriptionConflictLogTable(Oid subid, char *conflictlogtable, + Oid *relnamespaceid) +{ + Oid nspid = InvalidOid; + Oid old_nspid = InvalidOid; + char *old_relname = NULL; + char *relname = NULL; + List *names = NIL; + char *nspname; + ObjectAddress object; + + if (conflictlogtable != NULL) + { + /* Explicitly check for empty string before any processing. */ + if (conflictlogtable[0] == '\0') + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("conflict log table name cannot be empty"), + errhint("Provide a valid table name or omit the parameter."))); + + names = stringToQualifiedNameList(conflictlogtable, NULL); + nspid = QualifiedNameGetCreationNamespace(names, &relname); + nspname = get_namespace_name(nspid); + } + + /* Fetch the existing conflict table information. */ + old_relname = get_subscription_conflict_log_table(subid, &old_nspid); + + /* + * If the subscription already uses this conflict log table and it exists, + * just issue a notice. + */ + if (old_relname != NULL && relname != NULL + && (strcmp(old_relname, relname) == 0) && + old_nspid == nspid && + OidIsValid(get_relname_relid(old_relname, old_nspid))) + { + ereport(NOTICE, + errmsg("\"%s.%s\" is already in use as the conflict log table for this subscription", + nspname, relname)); + } + else + { + /* + * Conflict log tables are recorded as internal dependencies of the + * subscription. Before associating a new table, drop the existing + * table to avoid stale or orphaned relations. + * + * XXX: At present, only conflict log tables are managed this way. In + * future if we introduce additional internal dependencies, we may + * need a targeted deletion to avoid deletion of any other objects. + */ + ObjectAddressSet(object, SubscriptionRelationId, subid); + performDeletion(&object, DROP_CASCADE, + PERFORM_DELETION_INTERNAL | + PERFORM_DELETION_SKIP_ORIGINAL); + + /* Need to create a new table if a new name was provided. */ + if (relname != NULL) + { + Oid conflictlogrelid = get_relname_relid(relname, nspid); + + if (OidIsValid(conflictlogrelid)) + { + Relation conflictlogrel; + + /* + * Conflict log tables must be permanent relations. Disallow + * in temporary namespaces to ensure the same. + */ + if (isTempNamespace(nspid)) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot use conflict log table \"%s.%s\" of a temporary namespace", + nspname, relname), + errhint("Specify table from a permanent schema.")); + + conflictlogrel = table_open(conflictlogrelid, + RowExclusiveLock); + + if (conflictlogrel->rd_rel->relpersistence != RELPERSISTENCE_PERMANENT) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("conflict log table \"%s.%s\" must be a permanent table", + nspname, relname), + errhint("Specify a permanent table as the conflict log table.")); + + if (IsConflictLogTable(conflictlogrelid)) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("conflict log table \"%s.%s\" cannot be used", + nspname, relname), + errdetail("The specified table is already registered for a different subscription."), + errhint("Specify a different conflict log table.")); + + if (!ValidateConflictLogTable(conflictlogrel)) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("conflict log table \"%s.%s\" has an incompatible definition", + nspname, relname), + errdetail("The table does not match the required conflict log table structure."), + errhint("Create the conflict log table with the expected definition or specify a different table.")); + + table_close(conflictlogrel, NoLock); + + } + else + create_conflict_log_table(nspid, relname, subid); + } + } + + pfree(nspname); + if (old_relname != NULL) + pfree(old_relname); + + *relnamespaceid = nspid; + return relname; +} + /* * Marks all sequences with INIT state. */ @@ -1727,92 +1869,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_TABLE)) { - Oid nspid = InvalidOid; - Oid old_nspid = InvalidOid; - char *old_relname = NULL; - char *relname = NULL; - List *names = NIL; - - if (opts.conflictlogtable != NULL) - { - /* Explicitly check for empty string before any processing. */ - if (opts.conflictlogtable[0] == '\0') - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("conflict log table name cannot be empty"), - errhint("Provide a valid table name or omit the parameter."))); - - names = stringToQualifiedNameList(opts.conflictlogtable, - NULL); - nspid = QualifiedNameGetCreationNamespace(names, &relname); - } - - /* Fetch the existing conflict table information. */ - old_relname = - get_subscription_conflict_log_table(subid, &old_nspid); - - /* - * If the subscription already uses this conflict log table - * and it exists, just issue a notice. - */ - if (old_relname != NULL && relname != NULL - && (strcmp(old_relname, relname) == 0) && - old_nspid == nspid && - OidIsValid(get_relname_relid(old_relname, old_nspid))) - { - char *nspname = get_namespace_name(nspid); - - ereport(NOTICE, - (errmsg("\"%s.%s\" is already in use as the conflict log table for this subscription", - nspname, relname))); - pfree(nspname); - } + char *relname; + Oid nspid; + char *conftable = opts.conflictlogtable; + + relname = AlterSubscriptionConflictLogTable(subid, + conftable, + &nspid); + values[Anum_pg_subscription_subconflictlognspid - 1] = + ObjectIdGetDatum(nspid); + + if (relname != NULL) + values[Anum_pg_subscription_subconflictlogtable - 1] = + CStringGetTextDatum(relname); else - { - ObjectAddress object; - - /* - * Conflict log tables are recorded as internal - * dependencies of the subscription. Before - * associating a new table, drop the existing table to - * avoid stale or orphaned relations. - * - * XXX: At present, only conflict log tables are - * managed this way. In future if we introduce - * additional internal dependencies, we may need - * a targeted deletion to avoid deletion of any - * other objects. - */ - ObjectAddressSet(object, SubscriptionRelationId, subid); - performDeletion(&object, DROP_CASCADE, - PERFORM_DELETION_INTERNAL | - PERFORM_DELETION_SKIP_ORIGINAL); - - /* - * Need to create a new table if a new name was - * provided. - */ - if (relname != NULL) - create_conflict_log_table(nspid, relname, subid); - - values[Anum_pg_subscription_subconflictlognspid - 1] = - ObjectIdGetDatum(nspid); - - if (relname != NULL) - values[Anum_pg_subscription_subconflictlogtable - 1] = - CStringGetTextDatum(relname); - else - nulls[Anum_pg_subscription_subconflictlogtable - 1] = - true; - - replaces[Anum_pg_subscription_subconflictlognspid - 1] = - true; - replaces[Anum_pg_subscription_subconflictlogtable - 1] = - true; - } + nulls[Anum_pg_subscription_subconflictlogtable - 1] = + true; - if (old_relname != NULL) - pfree(old_relname); + replaces[Anum_pg_subscription_subconflictlognspid - 1] = + true; + replaces[Anum_pg_subscription_subconflictlogtable - 1] = + true; } update_tuple = true; diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 27f6be3f0f8..f85263b8e12 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -5130,6 +5130,7 @@ getSubscriptions(Archive *fout) int i_subfailover; int i_subretaindeadtuples; int i_submaxretention; + int i_subconflictlogrelid; int i, ntups; @@ -5216,10 +5217,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 190000) appendPQExpBufferStr(query, - " s.submaxretention\n"); + " s.submaxretention,\n"); else appendPQExpBuffer(query, - " 0 AS submaxretention\n"); + " 0 AS submaxretention,\n"); + + if (fout->remoteVersion >= 190000) + appendPQExpBufferStr(query, + " c.oid AS subconflictlogrelid\n"); + else + appendPQExpBufferStr(query, + " 0::oid AS subconflictlogrelid\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -5229,6 +5237,12 @@ getSubscriptions(Archive *fout) "LEFT JOIN pg_catalog.pg_replication_origin_status o \n" " ON o.external_id = 'pg_' || s.oid::text \n"); + if (fout->remoteVersion >= 190000) + appendPQExpBufferStr(query, + "LEFT JOIN pg_class c ON c.relname = s.subconflictlogtable\n" + "LEFT JOIN pg_namespace n \n" + " ON n.oid = c.relnamespace AND n.oid = s.subconflictlognspid\n"); + appendPQExpBufferStr(query, "WHERE s.subdbid = (SELECT oid FROM pg_database\n" " WHERE datname = current_database())"); @@ -5255,6 +5269,7 @@ getSubscriptions(Archive *fout) i_subfailover = PQfnumber(res, "subfailover"); i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples"); i_submaxretention = PQfnumber(res, "submaxretention"); + i_subconflictlogrelid = PQfnumber(res, "subconflictlogrelid"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); @@ -5292,6 +5307,19 @@ getSubscriptions(Archive *fout) (strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0); subinfo[i].submaxretention = atoi(PQgetvalue(res, i, i_submaxretention)); + subinfo[i].subconflictlogrelid = + atooid(PQgetvalue(res, i, i_subconflictlogrelid)); + + if (subinfo[i].subconflictlogrelid != InvalidOid) + { + TableInfo *tableInfo = findTableByOid(subinfo[i].subconflictlogrelid); + + if (!tableInfo) + pg_fatal("could not find conflict log table with OID %u", + subinfo[i].subconflictlogrelid); + + addObjectDependency(&subinfo[i].dobj, tableInfo->dobj.dumpId); + } subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); if (PQgetisnull(res, i, i_subslotname)) @@ -5564,6 +5592,23 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) appendPQExpBufferStr(query, ");\n"); + if (subinfo->subconflictlogrelid != InvalidOid) + { + PQExpBuffer conflictlogbuf = createPQExpBuffer(); + TableInfo *tbinfo = findTableByOid(subinfo->subconflictlogrelid); + + appendStringLiteralAH(conflictlogbuf, + fmtQualifiedDumpable(tbinfo), + fout); + + appendPQExpBuffer(query, + "\n\nALTER SUBSCRIPTION %s SET (conflict_log_table = %s);\n", + qsubname, + conflictlogbuf->data); + + destroyPQExpBuffer(conflictlogbuf); + } + /* * In binary-upgrade mode, we allow the replication to continue after the * upgrade. diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 72a00e1bc20..20ffae491eb 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -719,6 +719,7 @@ typedef struct _SubscriptionInfo bool subfailover; bool subretaindeadtuples; int submaxretention; + Oid subconflictlogrelid; char *subconninfo; char *subslotname; char *subsynccommit; diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index e2a4df4cf4b..f0bda51b993 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -1131,6 +1131,19 @@ repairTableAttrDefMultiLoop(DumpableObject *tableobj, addObjectDependency(attrdefobj, tableobj->dumpId); } +/* + * Because we make subscriptions depend on their conflict log tables, while + * there is an automatic dependency in the other direction, we need to break + * the loop. Remove the automatic dependency, allowing the table to be created + * first. + */ +static void +repairSubscriptionTableLoop(DumpableObject *subobj, DumpableObject *tableobj) +{ + /* Remove table's dependency on subscription */ + removeObjectDependency(tableobj, subobj->dumpId); +} + /* * CHECK, NOT NULL constraints on domains work just like those on tables ... */ @@ -1361,6 +1374,25 @@ repairDependencyLoop(DumpableObject **loop, return; } + /* + * Subscription and its Conflict Log Table + */ + if (nLoop == 2 && + loop[0]->objType == DO_TABLE && + loop[1]->objType == DO_SUBSCRIPTION) + { + repairSubscriptionTableLoop(loop[1], loop[0]); + return; + } + + if (nLoop == 2 && + loop[0]->objType == DO_SUBSCRIPTION && + loop[1]->objType == DO_TABLE) + { + repairSubscriptionTableLoop(loop[0], loop[1]); + return; + } + /* index on partitioned table and corresponding index on partition */ if (nLoop == 2 && loop[0]->objType == DO_INDEX && diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index e33aa95f6ff..ef11db6b8ee 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -3204,9 +3204,10 @@ my %tests = ( create_order => 50, create_sql => 'CREATE SUBSCRIPTION sub3 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1 - WITH (connect = false, origin = any, streaming = on);', + WITH (connect = false, origin = any, streaming = on, conflict_log_table = \'conflict\');', regexp => qr/^ - \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E + \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E\n\n\n + \QALTER SUBSCRIPTION sub3 SET (conflict_log_table = 'public.conflict');\E /xm, like => { %full_runs, section_post_data => 1, }, unlike => { diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index f96687e107c..665a8fcc0aa 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -630,8 +630,19 @@ SELECT * FROM pg_publication_tables WHERE pubname = 'pub'; DROP PUBLICATION pub; -- fail - set conflict_log_table to one already used by a different subscription ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1'); -ERROR: cannot create conflict log table "public.regress_conflict_log1" because a table with that name already exists -HINT: Use a different name for the conflict log table or drop the existing table. +ERROR: conflict log table "public.regress_conflict_log1" cannot be used +DETAIL: The specified table is already registered for a different subscription. +HINT: Specify a different conflict log table. +-- fail - conflict log table must be a permanent relation (UNLOGGED not allowed) +CREATE UNLOGGED TABLE public.regress_conflict_log_unlogged (id int); +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log_unlogged'); +ERROR: conflict log table "public.regress_conflict_log_unlogged" must be a permanent table +HINT: Specify a permanent table as the conflict log table. +DROP TABLE public.regress_conflict_log_unlogged; +-- fail - conflict log table must not be in a temporary schema +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'pg_temp.regress_conflict_log1'); +ERROR: cannot create conflict log table "regress_conflict_log1" in a temporary namespace +HINT: Use a permanent schema. -- ok - dropping subscription also drops the log table ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 6b6f1503145..5dd31b5ed12 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -428,6 +428,14 @@ DROP PUBLICATION pub; -- fail - set conflict_log_table to one already used by a different subscription ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1'); +-- fail - conflict log table must be a permanent relation (UNLOGGED not allowed) +CREATE UNLOGGED TABLE public.regress_conflict_log_unlogged (id int); +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log_unlogged'); +DROP TABLE public.regress_conflict_log_unlogged; + +-- fail - conflict log table must not be in a temporary schema +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'pg_temp.regress_conflict_log1'); + -- ok - dropping subscription also drops the log table ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); -- 2.43.0