From 919359849b6df0e3e8d18316f26bc41b0580840f Mon Sep 17 00:00:00 2001 From: alterego665 <824662526@qq.com> Date: Wed, 30 Jul 2025 23:07:03 +0800 Subject: [PATCH v8] Optimize transaction waiting during logical decoding on hot standby servers. On hot standby servers, XactLockTableWait falls back to polling TransactionIdIsInProgress() with fixed 1ms sleeps when transactions from the primary have no local lock table entries, causing excessive CPU usage. Introduce a new waiting waiting path for hot-standby that uses the existing KnownAssignedXids infrastructure combined with a condition variable to replace the use of XactLockTableWait in standby, eliminating the polling loop. --- src/backend/replication/logical/snapbuild.c | 10 +- src/backend/storage/ipc/procarray.c | 208 +++++++++++++++++- src/backend/storage/lmgr/lmgr.c | 6 +- .../utils/activity/wait_event_names.txt | 1 + src/include/storage/procarray.h | 2 + 5 files changed, 218 insertions(+), 9 deletions(-) diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 8532bfd27e5..87e9f4725ff 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1451,7 +1451,15 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff) if (TransactionIdFollows(xid, cutoff)) continue; - XactLockTableWait(xid, NULL, NULL, XLTW_None); + /* + * In primary, we use XactLockTableWait to wait for the transaction to finish. + * In standby, since xact lock table is not maintained, we use XidWaitInStandby + * to accomplish the same. + */ + if (!RecoveryInProgress()) + XactLockTableWait(xid, NULL, NULL, XLTW_None); + else + XidWaitInStandby(xid); } /* diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index bf987aed8d3..26174e9c31a 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -62,6 +62,7 @@ #include "storage/procarray.h" #include "utils/acl.h" #include "utils/builtins.h" +#include "utils/hsearch.h" #include "utils/rel.h" #include "utils/snapmgr.h" @@ -282,6 +283,11 @@ static TransactionId *KnownAssignedXids; static bool *KnownAssignedXidsValid; static TransactionId latestObservedXid = InvalidTransactionId; +/* + * Array of XIDs to wake up on standby + */ +static TransactionId *KnownAssignedXidsToWakeup; + /* * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is * the highest xid that might still be running that we don't have in @@ -306,6 +312,31 @@ static GlobalVisState GlobalVisTempRels; */ static TransactionId ComputeXidHorizonsResultLastXmin; +/* + * XID waiter hash table partition count + */ +#define NUM_XID_WAIT_PARTITIONS 16 + +/* + * Hash table entry for per-XID waiting on standby servers. + */ +typedef struct XidWaitEntry +{ + TransactionId xid; /* transaction ID being waited for */ + ConditionVariable cv; /* condition variable for this XID */ + pg_atomic_uint32 waiter_count; /* number of backends waiting */ +} XidWaitEntry; + +/* + * Global hash table for XID waiting. + * + * This hash table maps transaction IDs to XidWaitEntry structures, + * enabling efficient per-XID waiting during hot standby recovery. + * The table is partitioned to reduce lock contention and uses the + * same infrastructure as PostgreSQL's lock manager. + */ +static HTAB *XidWaitHash = NULL; + #ifdef XIDCACHE_DEBUG /* counters for XidCache measurement */ @@ -352,7 +383,7 @@ static bool KnownAssignedXidExists(TransactionId xid); static void KnownAssignedXidsRemove(TransactionId xid); static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids, TransactionId *subxids); -static void KnownAssignedXidsRemovePreceding(TransactionId removeXid); +static int KnownAssignedXidsRemovePreceding(TransactionId removeXid); static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax); static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin, @@ -369,6 +400,9 @@ static inline FullTransactionId FullXidRelativeTo(FullTransactionId rel, TransactionId xid); static void GlobalVisUpdateApply(ComputeXidHorizonsResult *horizons); +static void WakeXidWaiters(TransactionId xid); +static void WakeAllXidWaiters(void); + /* * Report shared-memory space needed by ProcArrayShmemInit */ @@ -403,9 +437,11 @@ ProcArrayShmemSize(void) { size = add_size(size, mul_size(sizeof(TransactionId), - TOTAL_MAX_CACHED_SUBXIDS)); + 2 * TOTAL_MAX_CACHED_SUBXIDS)); size = add_size(size, mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS)); + size = add_size(size, + hash_estimate_size(MaxBackends * 2, sizeof(XidWaitEntry))); } return size; @@ -458,6 +494,26 @@ ProcArrayShmemInit(void) ShmemInitStruct("KnownAssignedXidsValid", mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS), &found); + KnownAssignedXidsToWakeup = (TransactionId *) + ShmemInitStruct("KnownAssignedXidsToWakeup", + mul_size(sizeof(TransactionId), + TOTAL_MAX_CACHED_SUBXIDS), + &found); + + /* Initialize XID waiter hash table for standby XID waiting */ + { + HASHCTL info; + + info.keysize = sizeof(TransactionId); + info.entrysize = sizeof(XidWaitEntry); + info.num_partitions = NUM_XID_WAIT_PARTITIONS; + + XidWaitHash = ShmemInitHash("XID Wait Hash", + MaxBackends, /* init_size */ + MaxBackends * 2, /* max_size */ + &info, + HASH_ELEM | HASH_BLOBS | HASH_PARTITION); + } } } @@ -1113,6 +1169,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running) * throw them away before we apply the recovery snapshot. */ KnownAssignedXidsReset(); + WakeAllXidWaiters(); standbyState = STANDBY_INITIALIZED; } else @@ -1370,6 +1427,10 @@ ProcArrayApplyXidAssignment(TransactionId topxid, procArray->lastOverflowedXid = max_xid; LWLockRelease(ProcArrayLock); + + /* Wake up waiters for expired subtransactions */ + for (i = 0; i < nsubxids; i++) + WakeXidWaiters(subxids[i]); } /* @@ -4450,6 +4511,11 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids, TransamVariables->xactCompletionCount++; LWLockRelease(ProcArrayLock); + + /* Wake up per-XID waiters */ + WakeXidWaiters(xid); + for (int i = 0; i < nsubxids; i++) + WakeXidWaiters(subxids[i]); } /* @@ -4462,7 +4528,7 @@ ExpireAllKnownAssignedTransactionIds(void) FullTransactionId latestXid; LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - KnownAssignedXidsRemovePreceding(InvalidTransactionId); + (void) KnownAssignedXidsRemovePreceding(InvalidTransactionId); /* Reset latestCompletedXid to nextXid - 1 */ Assert(FullTransactionIdIsValid(TransamVariables->nextXid)); @@ -4483,6 +4549,9 @@ ExpireAllKnownAssignedTransactionIds(void) */ procArray->lastOverflowedXid = InvalidTransactionId; LWLockRelease(ProcArrayLock); + + /* Wake all XID waiters since all transactions are being expired */ + WakeAllXidWaiters(); } /* @@ -4494,6 +4563,8 @@ void ExpireOldKnownAssignedTransactionIds(TransactionId xid) { TransactionId latestXid; + int i; + int count; LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); @@ -4513,8 +4584,12 @@ ExpireOldKnownAssignedTransactionIds(TransactionId xid) */ if (TransactionIdPrecedes(procArray->lastOverflowedXid, xid)) procArray->lastOverflowedXid = InvalidTransactionId; - KnownAssignedXidsRemovePreceding(xid); + count = KnownAssignedXidsRemovePreceding(xid); LWLockRelease(ProcArrayLock); + + /* Wake XID waiters that have expired transactions they're waiting for */ + for (i = 0; i < count; i++) + WakeXidWaiters(KnownAssignedXidsToWakeup[i]); } /* @@ -4991,9 +5066,11 @@ KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids, * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid * then clear the whole table. * + * Returns the number of XIDs removed. + * * Caller must hold ProcArrayLock in exclusive mode. */ -static void +static int KnownAssignedXidsRemovePreceding(TransactionId removeXid) { ProcArrayStruct *pArray = procArray; @@ -5005,9 +5082,10 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid) if (!TransactionIdIsValid(removeXid)) { elog(DEBUG4, "removing all KnownAssignedXids"); + count = pArray->numKnownAssignedXids; pArray->numKnownAssignedXids = 0; pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0; - return; + return count; } elog(DEBUG4, "prune KnownAssignedXids to %u", removeXid); @@ -5031,6 +5109,7 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid) if (!StandbyTransactionIdIsPrepared(knownXid)) { KnownAssignedXidsValid[i] = false; + KnownAssignedXidsToWakeup[count] = knownXid; count++; } } @@ -5060,6 +5139,8 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid) /* Opportunistically compress the array */ KnownAssignedXidsCompress(KAX_PRUNE, true); + + return count; } /* @@ -5227,3 +5308,118 @@ KnownAssignedXidsReset(void) LWLockRelease(ProcArrayLock); } + +/* + * Wait for XID completion using condition variables. + * + * This function implements efficient waiting for transaction completion + * on standby servers by using a hash table of condition variables keyed + * by transaction ID. + * + * The function handles the complete lifecycle of waiting: finding or + * creating the hash entry, managing waiter counts, and cleaning up + * when the last waiter finishes. + * + * Note: This function is only meaningful during hot standby recovery. + * Primary servers should use the lock-based waiting mechanisms. + */ +void +XidWaitInStandby(TransactionId xid) +{ + XidWaitEntry *entry; + bool found; + uint32 hashcode; + TransactionId wait_xid; + + Assert(XidWaitHash); + Assert(TransactionIdIsValid(xid)); + Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny())); + + /* Quick exit if transaction already complete */ + if (!TransactionIdIsInProgress(xid)) + return; + + /* Always wait on the topmost transaction to avoid lost wake-ups */ + wait_xid = SubTransGetTopmostTransaction(xid); + + hashcode = get_hash_value(XidWaitHash, &wait_xid); + + entry = hash_search_with_hash_value(XidWaitHash, &wait_xid, hashcode, + HASH_ENTER, &found); + + if (!found) + { + /* Initialize new entry */ + entry->xid = wait_xid; + ConditionVariableInit(&entry->cv); + pg_atomic_init_u32(&entry->waiter_count, 0); + } + + pg_atomic_fetch_add_u32(&entry->waiter_count, 1); + + ConditionVariablePrepareToSleep(&entry->cv); + + /* Wait loop with condition re-checking */ + while (TransactionIdIsInProgress(xid)) + { + ConditionVariableSleep(&entry->cv, WAIT_EVENT_XACT_COMPLETE); + CHECK_FOR_INTERRUPTS(); + } + + ConditionVariableCancelSleep(); + + /* Decrement waiter count and cleanup if last waiter */ + if (pg_atomic_fetch_sub_u32(&entry->waiter_count, 1) == 1) + { + hash_search_with_hash_value(XidWaitHash, &wait_xid, hashcode, + HASH_REMOVE, NULL); + } +} + +/* + * Wake waiters for a specific XID. + * + * This function is called when a transaction completes on the primary + * server and we need to wake up any standby processes that were waiting + * for that specific transaction ID. + * + * Uses the hash table to locate waiters for the specified XID and + * broadcasts on the associated condition variable to wake all waiting + * backends simultaneously. + */ +static void +WakeXidWaiters(TransactionId xid) +{ + XidWaitEntry *entry; + uint32 hashcode; + + Assert(XidWaitHash); + + hashcode = get_hash_value(XidWaitHash, &xid); + + entry = hash_search_with_hash_value(XidWaitHash, &xid, hashcode, + HASH_FIND, NULL); + if (entry) + ConditionVariableBroadcast(&entry->cv); +} + +/* + * Wake all XID waiters. + * + * This function wakes up all backends waiting on any transaction ID. + * It is primarily used during standby promotion when the server is + * transitioning from recovery mode to normal operation, at which point + * all XID-based waiting becomes invalid. + */ +static void +WakeAllXidWaiters(void) +{ + HASH_SEQ_STATUS status; + XidWaitEntry *entry; + + Assert(XidWaitHash); + + hash_seq_init(&status, XidWaitHash); + while ((entry = (XidWaitEntry *) hash_seq_search(&status)) != NULL) + ConditionVariableBroadcast(&entry->cv); +} \ No newline at end of file diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index 3f6bf70bd3c..536b1542a09 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -668,6 +668,8 @@ XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, ErrorContextCallback callback; bool first = true; + Assert(!RecoveryInProgress()); + /* * If an operation is specified, set up our verbose error context * callback. @@ -718,7 +720,6 @@ XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, */ if (!first) { - CHECK_FOR_INTERRUPTS(); pg_usleep(1000L); } first = false; @@ -741,6 +742,8 @@ ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure) LOCKTAG tag; bool first = true; + Assert(!RecoveryInProgress()); + for (;;) { Assert(TransactionIdIsValid(xid)); @@ -761,7 +764,6 @@ ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure) /* See XactLockTableWait about this case */ if (!first) { - CHECK_FOR_INTERRUPTS(); pg_usleep(1000L); } first = false; diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 0be307d2ca0..775cc7313ad 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -160,6 +160,7 @@ WAL_BUFFER_INIT "Waiting on WAL buffer to be initialized." WAL_RECEIVER_EXIT "Waiting for the WAL receiver to exit." WAL_RECEIVER_WAIT_START "Waiting for startup process to send initial data for streaming replication." WAL_SUMMARY_READY "Waiting for a new WAL summary to be generated." +XACT_COMPLETE "Waiting for a transaction to complete." XACT_GROUP_UPDATE "Waiting for the group leader to update transaction status at transaction end." ABI_compatibility: diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 2f4ae06c279..0f8354173b2 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -100,4 +100,6 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin); +extern void XidWaitInStandby(TransactionId xid); + #endif /* PROCARRAY_H */ -- 2.49.0