From 3ed2c16c7d4b03514d9828c96818a9b0ac6c279b Mon Sep 17 00:00:00 2001 From: Daniil Davidov Date: Mon, 4 Aug 2025 17:49:23 +0700 Subject: [PATCH] Advance tail of async queue before updating datfrozenxid in vacuum --- src/backend/commands/async.c | 9 ++++----- src/backend/commands/vacuum.c | 10 ++++++++++ src/include/commands/async.h | 2 ++ 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4bd37d5beb5..5504132cf1b 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -450,7 +450,6 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot); -static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(bool flush); static bool AsyncExistsPendingNotify(Notification *n); static void AddEventToPendingNotifies(Notification *n); @@ -1025,7 +1024,7 @@ AtCommit_Notify(void) if (tryAdvanceTail) { tryAdvanceTail = false; - asyncQueueAdvanceTail(); + AsyncQueueAdvanceTail(); } /* And clean up */ @@ -1483,7 +1482,7 @@ pg_notification_queue_usage(PG_FUNCTION_ARGS) double usage; /* Advance the queue tail so we don't report a too-large result */ - asyncQueueAdvanceTail(); + AsyncQueueAdvanceTail(); LWLockAcquire(NotifyQueueLock, LW_SHARED); usage = asyncQueueUsage(); @@ -2104,8 +2103,8 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, * This is (usually) called during CommitTransaction(), so it's important for * it to have very low probability of failure. 
*/ -static void -asyncQueueAdvanceTail(void) +void +AsyncQueueAdvanceTail(void) { QueuePosition min; int64 oldtailpage; diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 733ef40ae7c..aedae9bf94d 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -37,6 +37,7 @@ #include "catalog/namespace.h" #include "catalog/pg_database.h" #include "catalog/pg_inherits.h" +#include "commands/async.h" #include "commands/cluster.h" #include "commands/defrem.h" #include "commands/progress.h" @@ -1632,6 +1633,15 @@ vac_update_datfrozenxid(void) */ LockDatabaseFrozenIds(ExclusiveLock); + /* + * Note that messages in the async queue store the transaction IDs of their + * senders. We must explicitly advance the tail of the async queue so that + * new channel listeners don't have to process old messages (during an + * asyncQueueReadAllNotifications call), i.e. don't have to check their + * transaction statuses (which might already be gone after clog truncation). + */ + AsyncQueueAdvanceTail(); + /* * Initialize the "min" calculation with * GetOldestNonRemovableTransactionId(), which is a reasonable diff --git a/src/include/commands/async.h b/src/include/commands/async.h index f75c3df9556..47275f6f959 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -43,6 +43,8 @@ extern void AtPrepare_Notify(void); /* signal handler for inbound notifies (PROCSIG_NOTIFY_INTERRUPT) */ extern void HandleNotifyInterrupt(void); +extern void AsyncQueueAdvanceTail(void); + /* process interrupts */ extern void ProcessNotifyInterrupt(bool flush); -- 2.43.0