From 74384374a24334616ac427fdcfeed5199ee1fc43 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 31 Oct 2025 18:27:25 +0200 Subject: [PATCH 1/2] Fix bug where we truncated CLOG that was still needed by LISTEN/NOTIFY MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The async notification queue contains the XID of the sender, and when processing notifications we call TransactionIdDidCommit() on the XID. But we had no safeguards to prevent the CLOG segments containing those XIDs from being truncated. As a result, if a backend didn't for some reason process the notifications for a long time, or when a new backend issued LISTEN, you could get an error like: test=# listen c21; ERROR: 58P01: could not access status of transaction 14279685 DETAIL: Could not open file "pg_xact/000D": No such file or directory. LOCATION: SlruReportIOError, slru.c:1087 This was first reported by Sergey Zhuravlev in 2021, with many other people hitting the same issue later. I believe the bug goes back all the way to commit d1e027221d, which introduced the SLRU-based async notification queue. Thanks to: - Alexandra Wang, Daniil Davydov, Andrei Varashen and Jacques Combrink for investigating and providing reproducible test cases, - Matheus Alcantara and Arseniy Mukhin for earlier proposed patches to fix this, - Álvaro Herrera and Masahiko Sawada for reviewing said earlier patches, - Yura Sokolov aka funny-falcon for the idea of marking transactions as committed in the notification queue, and - Joel Jacobson for the final patch version. I hope I didn't forget anyone.
Author: Joel Jacobson, Arseniy Mukhin Discussion: https://www.postgresql.org/message-id/16961-25f29f95b3604a8a@postgresql.org Discussion: https://www.postgresql.org/message-id/18804-bccbbde5e77a68c2@postgresql.org Discussion: https://www.postgresql.org/message-id/CAK98qZ3wZLE-RZJN_Y%2BTFjiTRPPFPBwNBpBi5K5CU8hUHkzDpw@mail.gmail.com --- src/backend/commands/async.c | 114 ++++++++++++++++++ src/backend/commands/vacuum.c | 7 ++ src/include/commands/async.h | 3 + src/test/isolation/expected/async-notify.out | 33 ++++- src/test/isolation/specs/async-notify.spec | 24 ++++ src/test/modules/xid_wraparound/meson.build | 1 + .../xid_wraparound/t/004_notify_freeze.pl | 67 ++++++++++ 7 files changed, 248 insertions(+), 1 deletion(-) create mode 100644 src/test/modules/xid_wraparound/t/004_notify_freeze.pl diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4bd37d5beb5..ba06234dc8e 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -2168,6 +2168,120 @@ asyncQueueAdvanceTail(void) LWLockRelease(NotifyQueueTailLock); } +/* + * AsyncNotifyFreezeXids + * + * Prepare the async notification queue for CLOG truncation by freezing + * transaction IDs that are about to become inaccessible. + * + * This function is called by VACUUM before advancing datfrozenxid. It scans + * the notification queue and replaces XIDs that would become inaccessible + * after CLOG truncation with special markers: + * - Committed transactions are set to FrozenTransactionId + * - Aborted/crashed transactions are set to InvalidTransactionId + * + * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG + * pages will be truncated. If XID < newFrozenXid, it cannot still be running + * (or it would have held back newFrozenXid through ProcArray). + * Therefore, if TransactionIdDidCommit returns false, we know the transaction + * either aborted explicitly or crashed, and we can safely mark it invalid. 
+ */ +void +AsyncNotifyFreezeXids(TransactionId newFrozenXid) +{ + QueuePosition pos; + QueuePosition head; + int64 curpage = -1; + int slotno = -1; + char *page_buffer = NULL; + bool page_dirty = false; + + /* + * Acquire locks in the correct order to avoid deadlocks. As per the + * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU + * bank locks. + * + * We only need SHARED mode since we're just reading the head/tail + * positions, not modifying them. + */ + LWLockAcquire(NotifyQueueTailLock, LW_SHARED); + LWLockAcquire(NotifyQueueLock, LW_SHARED); + + pos = QUEUE_TAIL; + head = QUEUE_HEAD; + + /* Release NotifyQueueLock early, we only needed to read the positions */ + LWLockRelease(NotifyQueueLock); + + /* + * Scan the queue from tail to head, freezing XIDs as needed. We hold + * NotifyQueueTailLock throughout to ensure the tail doesn't move while + * we're working. + */ + while (!QUEUE_POS_EQUAL(pos, head)) + { + AsyncQueueEntry *qe; + TransactionId xid; + int64 pageno = QUEUE_POS_PAGE(pos); + int offset = QUEUE_POS_OFFSET(pos); + + /* If we need a different page, release old lock and get new one */ + if (pageno != curpage) + { + LWLock *lock; + + /* Release previous page if any */ + if (slotno >= 0) + { + if (page_dirty) + { + NotifyCtl->shared->page_dirty[slotno] = true; + page_dirty = false; + } + LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); + } + + lock = SimpleLruGetBankLock(NotifyCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); + slotno = SimpleLruReadPage(NotifyCtl, pageno, true, + InvalidTransactionId); + page_buffer = NotifyCtl->shared->page_buffer[slotno]; + curpage = pageno; + } + + qe = (AsyncQueueEntry *) (page_buffer + offset); + xid = qe->xid; + + if (TransactionIdIsNormal(xid) && + TransactionIdPrecedes(xid, newFrozenXid)) + { + if (TransactionIdDidCommit(xid)) + { + qe->xid = FrozenTransactionId; + page_dirty = true; + } + else + { + qe->xid = InvalidTransactionId; + page_dirty = true; + } + } + + /* Advance 
to next entry */ + asyncQueueAdvance(&pos, qe->length); + } + + /* Release final page lock if we acquired one */ + if (slotno >= 0) + { + if (page_dirty) + NotifyCtl->shared->page_dirty[slotno] = true; + LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); + } + + LWLockRelease(NotifyQueueTailLock); +} + /* * ProcessIncomingNotify * diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index ed03e3bd50d..e785dd55ce5 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -37,6 +37,7 @@ #include "catalog/namespace.h" #include "catalog/pg_database.h" #include "catalog/pg_inherits.h" +#include "commands/async.h" #include "commands/cluster.h" #include "commands/defrem.h" #include "commands/progress.h" @@ -1941,6 +1942,12 @@ vac_truncate_clog(TransactionId frozenXID, return; } + /* + * Freeze any old transaction IDs in the async notification queue before + * CLOG truncation. + */ + AsyncNotifyFreezeXids(frozenXID); + /* * Advance the oldest value for commit timestamps before truncating, so * that if a user requests a timestamp for a transaction we're truncating diff --git a/src/include/commands/async.h b/src/include/commands/async.h index f75c3df9556..aaec7314c10 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -46,4 +46,7 @@ extern void HandleNotifyInterrupt(void); /* process interrupts */ extern void ProcessNotifyInterrupt(bool flush); +/* freeze old transaction IDs in notify queue (called by VACUUM) */ +extern void AsyncNotifyFreezeXids(TransactionId newFrozenXid); + #endif /* ASYNC_H */ diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out index 556e1805893..20d5763f319 100644 --- a/src/test/isolation/expected/async-notify.out +++ b/src/test/isolation/expected/async-notify.out @@ -1,4 +1,4 @@ -Parsed test spec with 3 sessions +Parsed test spec with 4 sessions starting permutation: listenc notify1 notify2 notify3 notifyf step listenc: 
LISTEN c1; LISTEN c2; @@ -104,6 +104,37 @@ step l2commit: COMMIT; listener2: NOTIFY "c1" with payload "" from notifier step l2stop: UNLISTEN *; +starting permutation: llisten n1begins n1select n1insert notify1 n2begins n2select n2insert n2notify1 n1commit n2commit notify1 lcheck +step llisten: LISTEN c1; LISTEN c2; +step n1begins: BEGIN ISOLATION LEVEL SERIALIZABLE; +step n1select: SELECT * FROM t1; +a +- +(0 rows) + +step n1insert: INSERT INTO t1 DEFAULT VALUES; +step notify1: NOTIFY c1; +step n2begins: BEGIN ISOLATION LEVEL SERIALIZABLE; +step n2select: SELECT * FROM t1; +a +- +(0 rows) + +step n2insert: INSERT INTO t1 DEFAULT VALUES; +step n2notify1: NOTIFY c1, 'n2_payload'; +step n1commit: COMMIT; +step n2commit: COMMIT; +ERROR: could not serialize access due to read/write dependencies among transactions +step notify1: NOTIFY c1; +step lcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +listener: NOTIFY "c1" with payload "" from notifier +listener: NOTIFY "c1" with payload "" from notifier + starting permutation: llisten lbegin usage bignotify usage step llisten: LISTEN c1; LISTEN c2; step lbegin: BEGIN; diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec index 0b8cfd91083..51b7ad43849 100644 --- a/src/test/isolation/specs/async-notify.spec +++ b/src/test/isolation/specs/async-notify.spec @@ -5,6 +5,10 @@ # Note we assume that each step is delivered to the backend as a single Query # message so it will run as one transaction. 
+# t1 table is used for serializable conflict +setup { CREATE TABLE t1 (a bigserial); } +teardown { DROP TABLE t1; } + session notifier step listenc { LISTEN c1; LISTEN c2; } step notify1 { NOTIFY c1; } @@ -33,8 +37,21 @@ step notifys1 { } step usage { SELECT pg_notification_queue_usage() > 0 AS nonzero; } step bignotify { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; } +step n1begins { BEGIN ISOLATION LEVEL SERIALIZABLE; } +step n1select { SELECT * FROM t1; } +step n1insert { INSERT INTO t1 DEFAULT VALUES; } +step n1commit { COMMIT; } teardown { UNLISTEN *; } +# notifier2 session is used to reproduce serializable conflict with notifier + +session notifier2 +step n2begins { BEGIN ISOLATION LEVEL SERIALIZABLE; } +step n2select { SELECT * FROM t1; } +step n2insert { INSERT INTO t1 DEFAULT VALUES; } +step n2commit { COMMIT; } +step n2notify1 { NOTIFY c1, 'n2_payload'; } + # The listener session is used for cross-backend notify checks. session listener @@ -73,6 +90,13 @@ permutation listenc llisten notify1 notify2 notify3 notifyf lcheck # and notify queue is not empty permutation l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop +# Test checks that listeners ignore notifications from aborted +# transaction even if notifications have been added to the listen/notify +# queue. To reproduce it we use the fact that serializable conflicts +# are checked after tx adds notifications to the queue. + +permutation llisten n1begins n1select n1insert notify1 n2begins n2select n2insert n2notify1 n1commit n2commit notify1 lcheck + # Verify that pg_notification_queue_usage correctly reports a non-zero result, # after submitting notifications while another connection is listening for # those notifications and waiting inside an active transaction. 
We have to diff --git a/src/test/modules/xid_wraparound/meson.build b/src/test/modules/xid_wraparound/meson.build index f7dada67f67..3aec430df8c 100644 --- a/src/test/modules/xid_wraparound/meson.build +++ b/src/test/modules/xid_wraparound/meson.build @@ -30,6 +30,7 @@ tests += { 't/001_emergency_vacuum.pl', 't/002_limits.pl', 't/003_wraparounds.pl', + 't/004_notify_freeze.pl', ], }, } diff --git a/src/test/modules/xid_wraparound/t/004_notify_freeze.pl b/src/test/modules/xid_wraparound/t/004_notify_freeze.pl new file mode 100644 index 00000000000..50824e7b5d7 --- /dev/null +++ b/src/test/modules/xid_wraparound/t/004_notify_freeze.pl @@ -0,0 +1,67 @@ +# Copyright (c) 2024-2025, PostgreSQL Global Development Group +# +# Test freezing the XIDs in the async notification queue +# + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use Test::More; + +my $node = PostgreSQL::Test::Cluster->new('node'); +$node->init; +$node->start; + +if (!$ENV{PG_TEST_EXTRA} || $ENV{PG_TEST_EXTRA} !~ /\bxid_wraparound\b/) +{ + plan skip_all => "test xid_wraparound not enabled in PG_TEST_EXTRA"; +} + +# Setup +$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound'); +$node->safe_psql('postgres', + 'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true'); + +# --- Start Session 1 and leave it idle in transaction +my $psql_session1 = $node->background_psql('postgres'); +$psql_session1->query_safe('listen s;', "Session 1 listens to 's'"); +$psql_session1->query_safe('begin;', "Session 1 starts a transaction"); + +# --- Send multiple notify's from other sessions --- +for my $i (1 .. 10) +{ + $node->safe_psql( + 'postgres', " + BEGIN; + NOTIFY s, '$i'; + COMMIT;"); +} + +# Consume enough XIDs to trigger truncation, and one more with 'txid_current' to +# bump up the freeze horizon. 
+$node->safe_psql('postgres', 'select consume_xids(10000000);'); +$node->safe_psql('postgres', 'select txid_current()'); + +# Remember current datfrozenxid before vacuum freeze to ensure that it is advanced. +my $datafronzenxid = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'"); + +# Execute vacuum freeze on all databases +$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ], + "vacuumdb --all --freeze"); + +# Get the new datfrozenxid after vacuum freeze to ensure that is advanced. +my $datafronzenxid_freeze = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'"); +ok($datafronzenxid_freeze > $datafronzenxid, 'datfrozenxid is advanced'); + +# On Session 1, commit and ensure that the all the notifications are received. (This depends +# on correctly freezing the XIDs in the pending notification entries.) +my $res = $psql_session1->query_safe('commit;', "commit listen s;"); +my $notifications_count = 0; +foreach my $i (split('\n', $res)) +{ + $notifications_count++; + like($i, qr/Asynchronous notification "s" with payload "$notifications_count" received/); +} +is($notifications_count, 10, 'received all committed notifications'); + +done_testing(); -- 2.47.3