From 615f51df20cb5c56ad7b2894d3e4e8b0d899abe8 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Mon, 21 Jul 2025 09:16:59 -0400 Subject: [PATCH v3] Fix a deadlock during ALTER SUBSCRIPTION... DROP PUBLICATION When a user drops a publication from a subscription, this results in a publication refresh on the subscriber, which will try to drop any pending origins. Meanwhile, the apply worker could also be trying to clean up origins. There could be a deadlock if the order of locking of SubscriptionRelationId, SubscriptionRelRelationId and ReplicationOriginRelationId is not aligned between the functions process_syncing_tables_for_apply() and AlterSubscription_refresh(). The fix is to acquire locks on SubscriptionRelationId, SubscriptionRelRelationId and ReplicationOriginRelationId, in that order, when dropping tracking origins. --- src/backend/catalog/pg_subscription.c | 5 ++-- src/backend/commands/subscriptioncmds.c | 11 ++++++++- src/backend/replication/logical/tablesync.c | 37 +++++++++++++++++++++++++---- src/include/catalog/pg_subscription_rel.h | 2 +- 4 files changed, 46 insertions(+), 9 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index add51ca..518a25e 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -324,7 +324,7 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, */ void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn) + XLogRecPtr sublsn, bool lock_needed) { Relation rel; HeapTuple tup; @@ -332,7 +332,8 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, Datum values[Natts_pg_subscription_rel]; bool replaces[Natts_pg_subscription_rel]; - LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + if (lock_needed) + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); diff --git a/src/backend/commands/subscriptioncmds.c 
b/src/backend/commands/subscriptioncmds.c index 334717c..c0df1e5 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1323,7 +1323,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) { - Relation rel; + Relation rel, sub_rel; ObjectAddress myself; HeapTuple tup; Oid subid; @@ -1460,7 +1460,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * Note that the state can't change because we have already stopped both * the apply and tablesync workers and they can't restart because of * exclusive lock on the subscription. + * + * Lock pg_subscription_rel with AccessExclusiveLock to prevent any + * deadlock with apply workers of other subscriptions trying + * to drop tracking origin. */ + sub_rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock); + rstates = GetSubscriptionNotReadyRelations(subid); foreach(lc, rstates) { @@ -1493,6 +1499,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) snprintf(originname, sizeof(originname), "pg_%u", subid); replorigin_drop_by_name(originname, true, false); + /* Once the origin tracking has been dropped, we can release lock */ + table_close(sub_rel, AccessExclusiveLock); + /* * Tell the cumulative stats system that the subscription is getting * dropped. 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e6159ac..9aee210 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -314,7 +314,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + true); /* * End streaming so that LogRepWorkerWalRcvConn can be used to drop @@ -340,7 +341,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * is dropped. So passing missing_ok = false. */ ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false); - finish_sync_worker(); } else @@ -379,6 +379,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) static HTAB *last_start_times = NULL; ListCell *lc; bool started_tx = false; + Relation rel = NULL; Assert(!IsTransactionState()); @@ -470,7 +471,17 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * refresh for the subscription where we remove the table * state and its origin and by this time the origin might be * already removed. So passing missing_ok = true. + * + * Lock SubscriptionRelationId with AccessShareLock and + * take AccessExclusiveLock on SubscriptionRelRelationId to + * prevent any deadlocks with user concurrently performing + * refresh on the subscription. 
*/ + LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, + 0, AccessShareLock); + if (!rel) + rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock); + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, rstate->relid, originname, @@ -482,7 +493,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, - rstate->lsn); + rstate->lsn, false); + } } else @@ -533,7 +545,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * This is required to avoid any undetected deadlocks * due to any existing lock as deadlock detector won't * be able to detect the waits on the latch. + * + * Also close any tables prior to the commit. */ + if (rel) + { + table_close(rel, AccessExclusiveLock); + rel = NULL; + } CommitTransactionCommand(); pgstat_report_stat(false); } @@ -593,6 +612,12 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } } + /* close and unlock table if opened*/ + if (rel) + { + table_close(rel, AccessExclusiveLock); + } + if (started_tx) { CommitTransactionCommand(); @@ -1310,7 +1335,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + true); CommitTransactionCommand(); pgstat_report_stat(true); @@ -1431,7 +1457,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, SUBREL_STATE_FINISHEDCOPY, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + true); CommitTransactionCommand(); diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 9df99c3..a1c5bb1 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -83,7 +83,7 @@ typedef struct 
SubscriptionRelState extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn); + XLogRecPtr sublsn, bool lock_needed); extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); -- 1.8.3.1