From 9c40e8474622208fec7a6ccc94da2d82fbb2999c Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 30 Jul 2025 02:17:44 -0400 Subject: [PATCH v7] Fix a deadlock during ALTER SUBSCRIPTION... DROP PUBLICATION When a user drops a publication from a subscription, this results in a publication refresh on the subscriber, which will try to drop any pending origins. Meanwhile the apply worker could also be trying to clean up origins. There could be a deadlock if the order of locking of SubscriptionRelationId, SubscriptionRelRelationId and ReplicationOriginRelationId is not aligned between the functions process_syncing_tables_for_apply() and AlterSubscription_refresh(). The fix is to get locks on SubscriptionRelationId, SubscriptionRelRelationId and ReplicationOriginRelationId in that order when dropping tracking origins. --- src/backend/catalog/pg_subscription.c | 37 +++++++++++++++++++++++++---- src/backend/replication/logical/tablesync.c | 27 ++++++++++++++++++--- src/include/catalog/pg_subscription_rel.h | 2 ++ 3 files changed, 58 insertions(+), 8 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 29fc421..288f970 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -281,8 +281,8 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, * Update the state of a subscription table. 
*/ void -UpdateSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn) +UpdateSubscriptionRelStateEx(Oid subid, Oid relid, char state, + XLogRecPtr sublsn, bool already_locked) { Relation rel; HeapTuple tup; @@ -290,9 +290,26 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, Datum values[Natts_pg_subscription_rel]; bool replaces[Natts_pg_subscription_rel]; - LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); - - rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + if (already_locked) + { +#ifdef USE_ASSERT_CHECKING + LOCKTAG tag; +#endif + + Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId, + RowExclusiveLock, true)); + + rel = table_open(SubscriptionRelRelationId, NoLock); +#ifdef USE_ASSERT_CHECKING + SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0); + Assert(LockHeldByMe(&tag, AccessShareLock)); +#endif + } + else + { + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + } /* Try finding existing mapping. */ tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, @@ -327,6 +344,16 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, } /* + * Update the state of a subscription table. + */ +void +UpdateSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn) +{ + UpdateSubscriptionRelStateEx(subid, relid, state, sublsn, false); +} + +/* * Get state of subscription table. * * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription. 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 26b71de..3e9de50 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -366,6 +366,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) static HTAB *last_start_times = NULL; ListCell *lc; bool started_tx = false; + Relation rel = NULL; Assert(!IsTransactionState()); @@ -463,7 +464,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * refresh for the subscription where we remove the table * state and its origin and by this time the origin might be * already removed. So passing missing_ok = true. + * + * Lock the subscription and origin in the same order as we + * are doing during DDL commands to avoid deadlocks. See + * AlterSubscription_refresh. */ + LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, + 0, AccessShareLock); + if (!rel) + rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, rstate->relid, originname, @@ -473,9 +483,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) /* * Update the state to READY only after the origin cleanup. */ - UpdateSubscriptionRelState(MyLogicalRepWorker->subid, - rstate->relid, rstate->state, - rstate->lsn); + UpdateSubscriptionRelStateEx(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn, true); } } else @@ -526,7 +536,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * This is required to avoid any undetected deadlocks * due to any existing lock as deadlock detector won't * be able to detect the waits on the latch. + * + * Also close any tables prior to the commit. 
*/ + if (rel) + { + table_close(rel, NoLock); + rel = NULL; + } CommitTransactionCommand(); pgstat_report_stat(false); } @@ -586,6 +603,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } } + /* Close table if opened */ + if (rel) + table_close(rel, NoLock); + if (started_tx) { CommitTransactionCommand(); diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index ed94f57..f68fa2d 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -85,6 +85,8 @@ extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn); +extern void UpdateSubscriptionRelStateEx(Oid subid, Oid relid, char state, + XLogRecPtr sublsn, bool already_locked); extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); -- 1.8.3.1