Re: Optimize LISTEN/NOTIFY - Mailing list pgsql-hackers

From Joel Jacobson
Subject Re: Optimize LISTEN/NOTIFY
Msg-id 2eeea4f1-1b4f-430c-8571-544da04f08dc@app.fastmail.com
In response to Re: Optimize LISTEN/NOTIFY  ("Joel Jacobson" <joel@compiler.org>)
List pgsql-hackers
On Thu, Nov 13, 2025, at 08:13, Joel Jacobson wrote:
> Attached, please find a new version rebased on top of the bug fix
> patches that just got committed in 0bdc777, 797e9ea, 8eeb4a0, and
> 1b46990.

To help reviewers, here is a new write-up of the patch:

PROBLEM
=======

The current implementation has no central knowledge of which backend
listens on which channel. When a backend commits a transaction that
issued NOTIFY, SignalBackends() iterates over all registered listeners
in the same database and sends each one a PROCSIG_NOTIFY_INTERRUPT
signal, regardless of whether they are listening on the notified
channel.

This behavior is fine when all listeners are on the same channel, but
when many backends are listening on different channels, each NOTIFY
triggers unnecessary wakeups and context switches. As the number of idle
listeners grows, this often becomes the bottleneck and throughput drops
sharply.

Benchmarks show throughput falling from ~9,000 TPS with few listeners
to ~200 TPS with 1,000 idle listeners on unrelated channels, a 45x
slowdown caused purely by waking backends that have no notifications
to process.


SOLUTION OVERVIEW
=================

This patch introduces two optimizations:

1. Targeted Signaling
   A lazily-created dynamic shared hash table (dshash), allocated in a
   dynamic shared memory area (DSA), maps (database OID, channel name)
   to arrays of listening backends (ProcNumbers). This allows the
   notifier to signal only those backends actually listening on the
   channels being notified.

2. Direct Advancement
   Even with targeted signaling, idle backends might still need to be
   woken to advance their queue read positions past notifications they
   don't care about. This patch avoids those unnecessary wakeups by
   directly advancing the queue positions of idle backends that are not
   listening on the channels being notified.

   This is possible because all NOTIFY writers are serialized by a
   heavyweight lock, allowing the notifier to identify precisely which
   queue entries belong to the current transaction. The notifier can
   then determine which idle backends are positioned within that range
   and safely advance their positions without waking them, since we know
   from the shared channel hash that they are not listening on any of
   the notified channels.


IMPLEMENTATION DETAILS
=======================

Shared Channel Hash
-------------------

The patch adds a dshash table that maps (dboid, channel) keys to
ChannelEntry structures.

Each ChannelEntry's listenersArray starts with capacity for 4 listeners
and doubles when full. The array is allocated from a DSA area and freed
when the channel has zero listeners.
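To make the growth policy concrete, here is a minimal C sketch of appending a listener to a channel's array. It is not the patch's code: `ChannelListeners`, `channel_add_listener` and `INITIAL_LISTENERS_CAPACITY` are hypothetical names, `ProcNumber` is redefined as a plain `int`, and `malloc`/`realloc` stand in for allocation from the DSA area. The sketch also ignores a duplicate LISTEN, which the patch may handle elsewhere.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Stand-in for PostgreSQL's ProcNumber (an integer backend index). */
typedef int ProcNumber;

/* Hypothetical constant for the initial capacity described above. */
#define INITIAL_LISTENERS_CAPACITY 4

/*
 * Hypothetical listener-array part of a ChannelEntry.  The patch allocates
 * this array from a DSA area; malloc/realloc stand in for that here.
 */
typedef struct ChannelListeners
{
    int         numListeners;   /* slots in use */
    int         capacity;       /* slots allocated */
    ProcNumber *listeners;      /* NULL until the first listener is added */
} ChannelListeners;

/*
 * Append procno, doubling the array when it is full.  A duplicate LISTEN is
 * ignored.  Returns false only if the allocation fails.
 */
static bool
channel_add_listener(ChannelListeners *entry, ProcNumber procno)
{
    for (int i = 0; i < entry->numListeners; i++)
    {
        if (entry->listeners[i] == procno)
            return true;        /* already listening: nothing to do */
    }

    if (entry->numListeners == entry->capacity)
    {
        int         newcap = (entry->capacity == 0)
            ? INITIAL_LISTENERS_CAPACITY
            : entry->capacity * 2;
        ProcNumber *grown = realloc(entry->listeners,
                                    (size_t) newcap * sizeof(ProcNumber));

        if (grown == NULL)
            return false;       /* old array is still valid */
        entry->listeners = grown;
        entry->capacity = newcap;
    }

    assert(entry->numListeners < entry->capacity);
    entry->listeners[entry->numListeners++] = procno;
    return true;
}
```

Adding five listeners to an empty entry allocates 4 slots on the first insert and doubles to 8 on the fifth. Doubling keeps the amortized cost of each insert constant, at the price of up to half the slots sitting unused.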

The table is created lazily on the first LISTEN command. The DSA handle
and dshash handle are stored in AsyncQueueControl so that other backends
can attach to them.


Dual Data Structures
--------------------

The implementation maintains two complementary data structures:

1. Shared channelHash: Used during commit to determine which backends
   need to be signaled. Updated during Exec_ListenCommit/UnlistenCommit/
   UnlistenAllCommit.

2. Local listenChannelsHash: Changed from a List to an HTAB for fast
   lookups, used by IsListeningOn().

This separation avoids contention on the shared hash during the frequent
IsListeningOn() checks that occur for every notification read from the
queue.
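For comparison, the backend-local side can be as simple as a private hash set of channel names. The sketch below is illustrative only: the patch uses PostgreSQL's dynahash (an `HTAB` queried with `hash_search()`), whereas this uses a hypothetical fixed-size chained table (`ListenSet`, `is_listening_on`, `listen_set_add`) with an FNV-1a hash. Because the set is private to the backend, a lookup takes no locks and touches no shared memory.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define NAMEDATALEN 64              /* PostgreSQL's default name length */
#define LISTEN_NBUCKETS 64          /* hypothetical fixed bucket count */

typedef struct ListenNode
{
    char        channel[NAMEDATALEN];
    struct ListenNode *next;
} ListenNode;

/* Backend-local set of channel names: no shared memory, no locking. */
typedef struct ListenSet
{
    ListenNode *buckets[LISTEN_NBUCKETS];
} ListenSet;

/* FNV-1a hash of the channel name, reduced to a bucket index. */
static unsigned int
listen_hash(const char *s)
{
    unsigned int h = 2166136261u;

    for (; *s; s++)
    {
        h ^= (unsigned char) *s;
        h *= 16777619u;
    }
    return h % LISTEN_NBUCKETS;
}

/* Expected O(1) membership test, replacing a walk over a List. */
static bool
is_listening_on(const ListenSet *set, const char *channel)
{
    for (const ListenNode *n = set->buckets[listen_hash(channel)];
         n != NULL; n = n->next)
    {
        if (strcmp(n->channel, channel) == 0)
            return true;
    }
    return false;
}

/* Add a channel if absent; returns false only on allocation failure. */
static bool
listen_set_add(ListenSet *set, const char *channel)
{
    unsigned int b = listen_hash(channel);
    ListenNode *node;

    assert(strlen(channel) < NAMEDATALEN);
    if (is_listening_on(set, channel))
        return true;
    node = malloc(sizeof(ListenNode));
    if (node == NULL)
        return false;
    strcpy(node->channel, channel);
    node->next = set->buckets[b];
    set->buckets[b] = node;
    return true;
}
```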


Direct Advancement Algorithm
-----------------------------

In PreCommit_Notify(), while holding the heavyweight lock on "database
0" that serializes all NOTIFY writers:

1. Before writing the first notification, capture queueHeadBeforeWrite.
2. Write all notifications for the transaction to the queue.
3. After writing the last notification, capture queueHeadAfterWrite.

The heavyweight lock guarantees that the range [queueHeadBeforeWrite,
queueHeadAfterWrite) contains only notifications written by this commit;
no other backend could have inserted entries in this range.

SignalBackends() then processes each backend:

  - If the backend has wakeupPending: skip (already signaled)

  - If the backend is advancing (reading the queue):
      If advancingPos < queueHeadAfterWrite: signal it
      (without a signal, it would stop before reaching our new entries)

  - If the backend is idle:
      If pos < queueHeadBeforeWrite: signal it
      (it might be interested in older messages)

      If pos >= queueHeadBeforeWrite AND pos < queueHeadAfterWrite:
      Directly advance pos to queueHeadAfterWrite
      (skip our messages entirely, no signal needed)
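The per-backend decision above can be written as one function. This is a simplified sketch, not the patch's SignalBackends(): queue positions are modelled as (page, offset) pairs, `BackendStatus`, `NotifyAction`, `pos_precedes` and `decide_backend` are hypothetical names, and the `listensOnNotifiedChannel` argument is an assumption about how targeted signaling (a listener of a notified channel is always signaled) combines with the rules above. It is assumed to run while holding the queue lock, with the actual signals sent afterwards.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified queue position: (SLRU page, byte offset within the page). */
typedef struct QueuePosition
{
    int64_t     page;
    int         offset;
} QueuePosition;

/* Simplified per-backend slot: the three new fields plus the read position. */
typedef struct BackendStatus
{
    bool        wakeupPending;  /* signal sent but not yet processed */
    bool        isAdvancing;    /* backend is currently reading the queue */
    QueuePosition pos;          /* current read position */
    QueuePosition advancingPos; /* position it is advancing to */
} BackendStatus;

typedef enum NotifyAction
{
    NOTIFY_ACTION_NONE,         /* leave the backend alone */
    NOTIFY_ACTION_SIGNAL,       /* backend must be woken */
    NOTIFY_ACTION_ADVANCED      /* moved past our entries, no signal */
} NotifyAction;

/* True if a is strictly before b in the queue. */
static bool
pos_precedes(QueuePosition a, QueuePosition b)
{
    assert(a.offset >= 0 && b.offset >= 0);
    return a.page < b.page || (a.page == b.page && a.offset < b.offset);
}

static NotifyAction
mark_for_signal(BackendStatus *b)
{
    b->wakeupPending = true;    /* suppresses duplicate signals */
    return NOTIFY_ACTION_SIGNAL;
}

/* Decide what to do with one backend after this commit wrote [before, after). */
static NotifyAction
decide_backend(BackendStatus *b, bool listensOnNotifiedChannel,
               QueuePosition before, QueuePosition after)
{
    if (b->wakeupPending)
        return NOTIFY_ACTION_NONE;          /* already signaled */

    if (listensOnNotifiedChannel)
        return mark_for_signal(b);          /* targeted signaling */

    if (b->isAdvancing)
    {
        /* Without a signal it would stop short of our entries. */
        if (pos_precedes(b->advancingPos, after))
            return mark_for_signal(b);
        return NOTIFY_ACTION_NONE;
    }

    if (pos_precedes(b->pos, before))
        return mark_for_signal(b);          /* may want older messages */

    if (pos_precedes(b->pos, after))
    {
        b->pos = after;                     /* direct advancement */
        return NOTIFY_ACTION_ADVANCED;
    }
    return NOTIFY_ACTION_NONE;
}
```

For example, with our entries occupying [(10,100), (10,300)), an idle non-listening backend at (10,200) is moved to (10,300) without a signal, while one at (9,0) is signaled because it may still have older notifications to read.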


New QueueBackendStatus Fields
-----------------------------

Each backend's entry in AsyncQueueControl now includes:

  wakeupPending:  signal sent but not yet processed

  isAdvancing:    backend is reading the queue and advancing its position

  advancingPos:   position the backend is advancing to

These flags ensure correct interaction between direct advancement and
backends that are concurrently processing their queue.
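On the reader side, a backend publishes isAdvancing and advancingPos before it reads, so a notifier never moves a position that the reader is about to overwrite. A minimal sketch under stated assumptions: a `pthread_mutex_t` stands in for the queue lock, `QueueControl`, `begin_reading` and `finish_reading` are hypothetical names, and clearing wakeupPending when reading starts is an assumption about where the patch resets that flag.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct QueuePosition
{
    int64_t     page;
    int         offset;
} QueuePosition;

typedef struct BackendStatus
{
    bool        wakeupPending;
    bool        isAdvancing;
    QueuePosition pos;
    QueuePosition advancingPos;
} BackendStatus;

/* Hypothetical shared control block; the mutex stands in for the queue lock. */
typedef struct QueueControl
{
    pthread_mutex_t lock;
    QueuePosition head;
} QueueControl;

/*
 * Publish that we are reading, and up to where; returns the target position.
 * Clearing wakeupPending here (an assumption) means a signal sent while we
 * read sets it again, so that signal is not lost.
 */
static QueuePosition
begin_reading(QueueControl *ctl, BackendStatus *me)
{
    QueuePosition target;

    pthread_mutex_lock(&ctl->lock);
    me->wakeupPending = false;
    me->isAdvancing = true;
    me->advancingPos = ctl->head;
    target = ctl->head;
    pthread_mutex_unlock(&ctl->lock);
    return target;
}

/* After reading [pos, advancingPos) without the lock, publish the new pos. */
static void
finish_reading(QueueControl *ctl, BackendStatus *me)
{
    pthread_mutex_lock(&ctl->lock);
    assert(me->isAdvancing);
    me->pos = me->advancingPos;
    me->isAdvancing = false;
    pthread_mutex_unlock(&ctl->lock);
}
```

Between the two calls the backend reads entries without holding the lock. A notifier that runs in that window sees isAdvancing and signals the backend instead of advancing it, so the final write of pos cannot undo a direct advancement.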


Transaction-Local State
------------------------

PreCommit_Notify() builds a list of unique channels
(pendingNotifyChannels) from the transaction's notifications. This list
is used by SignalBackends() to look up listeners in the shared hash
efficiently, avoiding duplicate lookups when multiple notifications are
sent to the same channel.
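Deduplication can be sketched as follows. This hypothetical helper (`collect_unique_channels`) uses a linear scan over the channels collected so far, which is adequate for a handful of channels; per the AddEventToPendingNotifies entry, the patch relies on channelHashtab once the notification count exceeds MIN_HASHABLE_NOTIFIES.

```c
#include <assert.h>
#include <string.h>

/*
 * Copy each distinct channel name from notified[0..n) into out, keeping
 * first-seen order; returns the number of distinct channels.  out must
 * have room for n entries.
 */
static int
collect_unique_channels(const char *const *notified, int n, const char **out)
{
    int         nunique = 0;

    assert(n >= 0);
    for (int i = 0; i < n; i++)
    {
        int         j;

        /* Look for this channel among those already collected. */
        for (j = 0; j < nunique; j++)
        {
            if (strcmp(out[j], notified[i]) == 0)
                break;
        }
        if (j == nunique)
            out[nunique++] = notified[i];   /* first occurrence: keep it */
    }
    return nunique;
}
```

Each distinct channel then costs one shared-hash lookup in SignalBackends(), however many notifications were sent to it.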

Functions Modified
------------------

AsyncShmemInit
  Initialize channelHashDSA/DSH handles (InvalidHandle) and new
  per-backend fields: wakeupPending, isAdvancing, advancingPos.

Async_Notify
  Initialize channelHashtab.

pg_listening_channels
  Rewritten to iterate over listenChannelsHash using HASH_SEQ_STATUS
  instead of traversing the old listenChannels list.

PreCommit_Notify
  Build the pendingNotifyChannels list of unique channels from the
  transaction's notifications. Capture queueHeadBeforeWrite before
  writing the first notification, and queueHeadAfterWrite after each
  write, to enable the direct advancement optimization.

AtCommit_Notify
  Check hash table entry count instead of list emptiness when deciding
  whether to unregister from listener array.

Exec_ListenCommit
  Complete rewrite to maintain both the local listenChannelsHash and the
  shared channelHash. Insert the backend's ProcNumber into the
  DSA-allocated listeners array, doubling the array's capacity when full.

Exec_UnlistenCommit
  Remove from both local and shared hashes. Compact the listeners array
  with memmove; when the last listener is removed, free the DSA memory
  and delete the hash entry.

Exec_UnlistenAllCommit
  Iterate over the shared channelHash with dshash_seq_*, removing this
  backend from all channel entries in the current database; free the
  DSA memory and delete entries that become empty.

IsListeningOn
  Simplified to a single hash_search() call on listenChannelsHash.

asyncQueueUnregister
  Clear QUEUE_BACKEND_WAKEUP_PENDING flag and update assertion to check
  hash table instead of list.

SignalBackends
  Rewrite to use targeted signaling instead of broadcast. Iterate
  pendingNotifyChannels, look up listeners per channel in shared
  channelHash. Implement direct advancement: advance idle backends
  positioned in [queueHeadBeforeWrite, queueHeadAfterWrite) without
  signaling. Use wakeupPending flag to prevent duplicate signals and
  respect isAdvancing flag to avoid interfering with concurrent position
  updates.

AtAbort_Notify
  Use listenChannelsHash instead of listenChannels.

asyncQueueReadAllNotifications
  Set isAdvancing flag and advancingPos before reading, clear
  isAdvancing after advancing position.

asyncQueueProcessPageEntries
  Use listenChannelsHash instead of listenChannels.

ProcessIncomingNotify
  Use listenChannelsHash instead of listenChannels.

AddEventToPendingNotifies
  Build channelHashtab when notification count exceeds
  MIN_HASHABLE_NOTIFIES, enabling efficient extraction of unique channel
  names in PreCommit_Notify.

ClearPendingActionsAndNotifies
  Also free pendingNotifyChannels.
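The array compaction described for Exec_UnlistenCommit can be sketched like this. The names (`ChannelListeners`, `channel_remove_listener`) are hypothetical, `ProcNumber` is a plain `int`, and `free` stands in for releasing DSA memory; the return value tells the caller when to delete the hash entry.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for PostgreSQL's ProcNumber. */
typedef int ProcNumber;

typedef struct ChannelListeners
{
    int         numListeners;   /* slots in use */
    int         capacity;       /* slots allocated */
    ProcNumber *listeners;
} ChannelListeners;

/*
 * Remove procno, if present, shifting later entries left to keep the array
 * dense.  Returns true when no listeners remain; the array has then been
 * freed and the caller should delete the channel's hash entry.
 */
static bool
channel_remove_listener(ChannelListeners *entry, ProcNumber procno)
{
    assert(entry->numListeners <= entry->capacity);
    for (int i = 0; i < entry->numListeners; i++)
    {
        if (entry->listeners[i] == procno)
        {
            memmove(&entry->listeners[i], &entry->listeners[i + 1],
                    (size_t) (entry->numListeners - i - 1) * sizeof(ProcNumber));
            entry->numListeners--;
            break;
        }
    }

    if (entry->numListeners > 0)
        return false;

    free(entry->listeners);     /* stands in for dsa_free() */
    entry->listeners = NULL;
    entry->capacity = 0;
    return true;
}
```

Removing 7 from [3, 7, 9] shifts 9 left to give [3, 9]. Keeping the array dense means signaling only needs to scan the first numListeners slots.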

Functions Added
---------------

asyncQueuePagePrecedes
  Inline function returning true if page p precedes page q (p < q).

channelHashFunc
  Hash function for ChannelHashKey, combining hash of dboid and channel
  name using XOR. Required callback for dshash operations.

initChannelHash
  Lazy initialization of shared dshash table mapping (dboid, channel) to
  listener arrays. First caller creates DSA area and dshash, stores
  handles in asyncQueueControl; subsequent callers attach using stored
  handles.

initListenChannelsHash
  Lazy initialization of backend-local hash table (listenChannelsHash)
  for faster IsListeningOn() checks.

ChannelHashPrepareKey
  Inline helper to construct ChannelHashKey.
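A sketch of key preparation and hashing, under assumptions: the key layout (`dboid` plus a `NAMEDATALEN` channel buffer), `channel_hash_prepare_key`, `channel_hash_func` and the stand-in hash functions are illustrative, not the patch's definitions. The integer mixer uses the same steps as PostgreSQL's murmurhash32, FNV-1a stands in for string hashing, and the real dshash callback receives the key through a generic pointer rather than a typed one. Zeroing the whole key first matters if keys are compared byte-wise (as `dshash_memcmp` does), since bytes after the terminating NUL would otherwise be garbage.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define NAMEDATALEN 64              /* PostgreSQL's default */
typedef uint32_t Oid;               /* stand-in for PostgreSQL's Oid */

/* Hypothetical key layout: database OID plus NUL-padded channel name. */
typedef struct ChannelHashKey
{
    Oid         dboid;
    char        channel[NAMEDATALEN];
} ChannelHashKey;

/* Zero the whole key first so byte-wise comparison sees identical padding. */
static void
channel_hash_prepare_key(ChannelHashKey *key, Oid dboid, const char *channel)
{
    size_t      len = strlen(channel);

    assert(len < NAMEDATALEN);
    memset(key, 0, sizeof(*key));
    key->dboid = dboid;
    memcpy(key->channel, channel, len);
}

/* 32-bit finalizer: same steps as murmurhash32 (MurmurHash3 fmix32). */
static uint32_t
mix32(uint32_t h)
{
    h ^= h >> 16;
    h *= 0x85ebca6bu;
    h ^= h >> 13;
    h *= 0xc2b2ae35u;
    h ^= h >> 16;
    return h;
}

/* FNV-1a over the channel name; stands in for PostgreSQL's string hashing. */
static uint32_t
string_hash(const char *s)
{
    uint32_t    h = 2166136261u;

    for (; *s; s++)
    {
        h ^= (unsigned char) *s;
        h *= 16777619u;
    }
    return h;
}

/* Combine the two component hashes with XOR, as described above. */
static uint32_t
channel_hash_func(const ChannelHashKey *key)
{
    return mix32(key->dboid) ^ string_hash(key->channel);
}
```

XOR is cheap and keeps both components in play; because entries are matched on the full key, an occasional collision only costs an extra comparison.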

TESTING
=======

The patch adds comprehensive isolation tests covering:

1. Subtransaction handling:
   - LISTEN in a subtransaction with SAVEPOINT/RELEASE
   - LISTEN merge path (both outer and inner transactions)
   - ROLLBACK TO SAVEPOINT discarding pending actions

2. Notification deduplication:
   - Hash table duplicate detection with 17 notifications + 1 duplicate

3. Listener array growth:
   - Multiple listeners triggering ChannelEntry array expansion

4. Cross-session delivery:
   - Notifications from non-listening backend to listener in another
   session

Total test sessions expanded from 3 to 7 to cover these scenarios.

/Joel


