From b1c9387871c8eb765c412124f767feecce39f547 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Thu, 31 Jul 2025 06:18:08 -0400 Subject: [PATCH v8] Fix a deadlock during ALTER SUBSCRIPTION... DROP PUBLICATION When user drops a publication from a subscription, this will result in a publication refresh on the subscriber which will try and drop any pending origins. Meanwhile the apply worker could also be trying to cleanup origins. There could be a deadlock if the order of locking of SubscriptionRelationId, SubscriptionRelRelationId and ReplicationOriginRelationId are not aligned between functions process_syncing_tables_for_apply() and AlterSubscription_refresh(). The fix is to get locks on SubscriptionRelationId, SubscriptionRelRelationId and ReplicationOriginRelationId in that order when dropping tracking origins. --- src/backend/catalog/pg_subscription.c | 33 +++++++++++++++++++++++++---- src/backend/replication/logical/tablesync.c | 27 ++++++++++++++++++++--- src/include/catalog/pg_subscription_rel.h | 2 ++ 3 files changed, 55 insertions(+), 7 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 9efc915..68e32c4 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -287,8 +287,8 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, * Update the state of a subscription table. */ void -UpdateSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn) +UpdateSubscriptionRelStateEx(Oid subid, Oid relid, char state, + XLogRecPtr sublsn, bool already_locked) { Relation rel; HeapTuple tup; @@ -296,9 +296,24 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, Datum values[Natts_pg_subscription_rel]; bool replaces[Natts_pg_subscription_rel]; - LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + if (already_locked) + { +#ifdef USE_ASSERT_CHECKING + LOCKTAG tag; - rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId, + RowExclusiveLock, true)); + SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0); + Assert(LockHeldByMe(&tag, AccessShareLock, true)); +#endif + + rel = table_open(SubscriptionRelRelationId, NoLock); + } + else + { + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + } /* Try finding existing mapping. */ tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, @@ -333,6 +348,16 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, } /* + * Update the state of a subscription table. + */ +void +UpdateSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn) +{ + UpdateSubscriptionRelStateEx(subid, relid, state, sublsn, false); +} + +/* * Get state of subscription table. * * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription. diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index c8893ff..1da5a7e 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -426,6 +426,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) ListCell *lc; bool started_tx = false; bool should_exit = false; + Relation rel = NULL; Assert(!IsTransactionState()); @@ -493,7 +494,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * worker to remove the origin tracking as if there is any * error while dropping we won't restart it to drop the * origin. So passing missing_ok = true. + * + * Lock the subscription and origin in the same order as we + * are doing during DDL commands to avoid deadlocks. See + * AlterSubscription_refresh. */ + LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, + 0, AccessShareLock); + if (!rel) + rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, rstate->relid, originname, @@ -503,9 +513,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) /* * Update the state to READY only after the origin cleanup. */ - UpdateSubscriptionRelState(MyLogicalRepWorker->subid, - rstate->relid, rstate->state, - rstate->lsn); + UpdateSubscriptionRelStateEx(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn, true); } } else @@ -556,7 +566,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * This is required to avoid any undetected deadlocks * due to any existing lock as deadlock detector won't * be able to detect the waits on the latch. + * + * Also close any tables prior to the commit. */ + if (rel) + { + table_close(rel, NoLock); + rel = NULL; + } CommitTransactionCommand(); pgstat_report_stat(false); } @@ -623,6 +640,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } } + /* Close table if opened */ + if (rel) + table_close(rel, NoLock); + if (started_tx) { /* diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 8244ad5..0afda88 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -86,6 +86,8 @@ extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn); +extern void UpdateSubscriptionRelStateEx(Oid subid, Oid relid, char state, + XLogRecPtr sublsn, bool already_locked); extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); -- 1.8.3.1