Re: Optimize LISTEN/NOTIFY

From: "Joel Jacobson" <joel(at)compiler(dot)org>
To: pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Optimize LISTEN/NOTIFY
Date: 2025-11-14 16:01:59
Message-ID: 2eeea4f1-1b4f-430c-8571-544da04f08dc@app.fastmail.com
Lists: pgsql-hackers

On Thu, Nov 13, 2025, at 08:13, Joel Jacobson wrote:
> Attached, please find a new version rebased on top of the bug fix
> patches that just got committed in 0bdc777, 797e9ea, 8eeb4a0, and
> 1b46990.

To help reviewers, here is a new write-up of the patch:

PROBLEM
=======

The current implementation has no central knowledge of which backend
listens on which channel. When a backend commits a transaction that
issued NOTIFY, SignalBackends() iterates over all registered listeners
in the same database and sends each one a PROCSIG_NOTIFY_INTERRUPT
signal, regardless of whether they are listening on the notified
channel.

This behavior is fine when all listeners are on the same channel, but
when many backends are listening on different channels, each NOTIFY
triggers unnecessary wakeups and context switches. As the number of idle
listeners grows, this often becomes the bottleneck and throughput drops
sharply.

Benchmarks show throughput dropping from ~9,000 TPS with few listeners
to ~200 TPS with 1,000 idle listeners on unrelated channels, a 45x
slowdown caused purely by waking backends that have no notifications
to process.

SOLUTION OVERVIEW
=================

This patch introduces two optimizations:

1. Targeted Signaling
A lazily created dynamic shared hash table (dshash), backed by a
dynamic shared memory area (DSA), maps (database OID, channel name) to
arrays of listening backends (ProcNumbers). This allows the notifier to
signal only those backends actually listening on the channels being
notified.

2. Direct Advancement
Even with targeted signaling, idle backends might still need to be
woken to advance their queue read positions past notifications they
don't care about, since the queue tail cannot be truncated past the
oldest listener's read position. This patch avoids those unnecessary
wakeups by directly advancing the queue positions of idle backends that
are not listening on the channels being notified.

This is possible because all NOTIFY writers are serialized by a
heavyweight lock, allowing the notifier to identify precisely which
queue entries belong to the current transaction. The notifier can
then determine which idle backends are positioned within that range
and safely advance their positions without waking them, since we know
from the shared channel hash that they are not listening on any of
the notified channels.

IMPLEMENTATION DETAILS
=======================

Shared Channel Hash
-------------------

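The patch adds a dshash table that maps (dboid, channel) keys to
ChannelEntry structures. Each entry points to a DSA-allocated
listenersArray holding the ProcNumbers of the backends listening on
that channel.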

The listenersArray starts with capacity for 4 listeners and doubles when
full. Memory is allocated from a DSA area and freed when a channel has
zero listeners.
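The doubling growth of listenersArray can be sketched as below. This is a simplified illustration, not the patch's code: plain malloc()/realloc() stand in for DSA allocations, the key uses a fixed NAMEDATALEN-sized channel array, and channel_entry_add_listener is a hypothetical helper name.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define NAMEDATALEN 64          /* PostgreSQL's default identifier length */
#define INITIAL_LISTENERS 4     /* starting capacity, per the description */

typedef uint32_t Oid;
typedef int ProcNumber;

/* Hash key: (database OID, channel name). Layout is an assumption. */
typedef struct ChannelHashKey
{
    Oid         dboid;
    char        channel[NAMEDATALEN];
} ChannelHashKey;

/* Hypothetical entry; the real one would hold a dsa_pointer, not a C pointer. */
typedef struct ChannelEntry
{
    ChannelHashKey key;
    int         numListeners;
    int         allocatedListeners;
    ProcNumber *listenersArray;     /* stand-in for DSA memory */
} ChannelEntry;

/* Append a listener, doubling the array when full. Returns false on OOM. */
static bool
channel_entry_add_listener(ChannelEntry *entry, ProcNumber procno)
{
    if (entry->listenersArray == NULL)
    {
        entry->listenersArray = malloc(INITIAL_LISTENERS * sizeof(ProcNumber));
        if (entry->listenersArray == NULL)
            return false;
        entry->allocatedListeners = INITIAL_LISTENERS;
        entry->numListeners = 0;
    }
    else if (entry->numListeners == entry->allocatedListeners)
    {
        int         newcap = entry->allocatedListeners * 2;
        ProcNumber *grown = realloc(entry->listenersArray,
                                    (size_t) newcap * sizeof(ProcNumber));

        if (grown == NULL)
            return false;
        entry->listenersArray = grown;
        entry->allocatedListeners = newcap;
    }
    entry->listenersArray[entry->numListeners++] = procno;
    return true;
}
```

With DSA memory the growth step would presumably allocate a larger chunk with dsa_allocate(), copy the existing entries, and release the old chunk with dsa_free(). Adding nine listeners to an empty entry grows the capacity 4 → 8 → 16.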

The table is created lazily on the first LISTEN command. The DSA handle
and dshash handle are stored in AsyncQueueControl for other backends to
attach.
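The create-or-attach pattern described above might look roughly like the following. It is only a sketch: a pthread mutex stands in for whatever lock the patch actually uses, fake integer handles replace dsa_handle/dshash_table_handle, and ControlSketch and init_channel_hash_sketch are hypothetical names. Comments mark where the real DSA/dshash calls would go.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Handle;            /* stand-in for dsa/dshash handles */
#define InvalidHandle ((Handle) 0)

/* Stand-in for the two handle fields stored in AsyncQueueControl. */
typedef struct ControlSketch
{
    pthread_mutex_t lock;           /* stand-in for the real lock */
    Handle      dsaHandle;
    Handle      dshHandle;
} ControlSketch;

static Handle next_handle = 1;      /* fake handle generator */

static Handle
fake_create(void)
{
    return next_handle++;
}

/* Returns true if this call created the table, false if it attached. */
static bool
init_channel_hash_sketch(ControlSketch *ctl, Handle *dsa, Handle *dsh)
{
    bool        created = false;

    pthread_mutex_lock(&ctl->lock);
    if (ctl->dsaHandle == InvalidHandle)
    {
        /* real code: dsa_create() + dsa_get_handle() */
        ctl->dsaHandle = fake_create();
        /* real code: dshash_create() + dshash_get_hash_table_handle() */
        ctl->dshHandle = fake_create();
        created = true;
    }
    /* real code: otherwise dsa_attach() and dshash_attach() with these */
    *dsa = ctl->dsaHandle;
    *dsh = ctl->dshHandle;
    pthread_mutex_unlock(&ctl->lock);
    return created;
}
```

Checking and publishing the handles under one lock ensures two backends issuing their first LISTEN concurrently cannot both create a table.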

Dual Data Structures
--------------------

The implementation maintains two complementary data structures:

1. Shared channelHash: Used during commit to determine which backends
need to be signaled. Updated during Exec_ListenCommit/UnlistenCommit/
UnlistenAllCommit.

2. Local listenChannelsHash: Changed from a List to an HTAB for fast
lookups, used by IsListeningOn().

This separation avoids contention on the shared hash during the frequent
IsListeningOn() checks that occur for every notification read from the
queue.

Direct Advancement Algorithm
-----------------------------

In PreCommit_Notify(), while holding the heavyweight lock on "database
0" that serializes all NOTIFY writers:

1. Before writing the first notification, capture queueHeadBeforeWrite
2. Write all notifications for the transaction to the queue
3. After writing the last notification, capture queueHeadAfterWrite

The heavyweight lock guarantee means the range [queueHeadBeforeWrite,
queueHeadAfterWrite) contains only notifications written by this commit,
and no other backend could have inserted entries in this range.

SignalBackends() then processes each backend:

- If the backend has wakeupPending: skip (already signaled)

- If the backend is advancing (reading the queue):
  If advancingPos < queueHeadAfterWrite: signal it
  (without a signal, it would stop before our new entries and stay
  stuck there)

- If the backend is idle:
  If pos < queueHeadBeforeWrite: signal it
  (it might be interested in older messages)

  If pos >= queueHeadBeforeWrite AND pos < queueHeadAfterWrite:
  directly advance pos to queueHeadAfterWrite
  (skip our messages entirely, no signal needed)
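The per-backend rules above can be condensed into one decision function. This is a sketch under stated assumptions: QueuePosition is reduced to a (page, offset) pair compared lexicographically, listensOnNotified is a hypothetical stand-in for the channelHash lookup (such backends are signaled by targeted signaling), and the real SignalBackends() would also hold the queue lock and call SendProcSignal() for backends marked for signaling.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct QueuePosition
{
    int64_t     page;
    int         offset;
} QueuePosition;

/* a < b in queue order */
static bool
pos_precedes(QueuePosition a, QueuePosition b)
{
    return a.page < b.page || (a.page == b.page && a.offset < b.offset);
}

typedef struct BackendSketch
{
    QueuePosition pos;
    bool        wakeupPending;
    bool        isAdvancing;
    QueuePosition advancingPos;
    bool        listensOnNotified;  /* hypothetical: channelHash lookup result */
} BackendSketch;

typedef enum { ACT_NONE, ACT_SIGNAL, ACT_ADVANCE } Action;

static Action
decide_action(BackendSketch *b, QueuePosition before, QueuePosition after)
{
    Action      act = ACT_NONE;

    if (b->wakeupPending)
        return ACT_NONE;            /* already signaled */

    if (b->listensOnNotified)
        act = ACT_SIGNAL;           /* wants our notifications */
    else if (b->isAdvancing)
    {
        /* reader would stop short of our entries */
        if (pos_precedes(b->advancingPos, after))
            act = ACT_SIGNAL;
    }
    else if (pos_precedes(b->pos, before))
        act = ACT_SIGNAL;           /* may care about older entries */
    else if (pos_precedes(b->pos, after))
    {
        b->pos = after;             /* direct advancement, no wakeup */
        act = ACT_ADVANCE;
    }

    if (act == ACT_SIGNAL)
        b->wakeupPending = true;    /* real code: also SendProcSignal() */
    return act;
}
```

Setting wakeupPending when a signal is chosen is what makes a second commit skip that backend until it has processed the first signal.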

New QueueBackendStatus Fields
-----------------------------

Each backend's entry in AsyncQueueControl now includes:

wakeupPending: signal sent but not yet processed

isAdvancing: backend is advancing its position

advancingPos: target position backend is advancing to

These flags ensure correct interaction between direct advancement and
backends that are concurrently processing their queue.
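A hypothetical reader-side sketch shows why the notifier needs isAdvancing and advancingPos. It assumes the reader publishes its target under the queue lock, reads entries without the lock, and then publishes its new position under the lock again; the struct and function names are illustrative, not the patch's.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct QueuePosition
{
    int64_t     page;
    int         offset;
} QueuePosition;

/* Hypothetical mirror of the per-backend fields listed above. */
typedef struct QueueBackendStatusSketch
{
    QueuePosition pos;              /* current read position */
    bool        wakeupPending;      /* signal sent but not yet processed */
    bool        isAdvancing;        /* reader is moving pos forward */
    QueuePosition advancingPos;     /* where the reader will stop */
} QueueBackendStatusSketch;

/* Reader step 1 (assumed under the queue lock): announce the target. */
static void
reader_begin(QueueBackendStatusSketch *st, QueuePosition head)
{
    st->isAdvancing = true;
    st->advancingPos = head;
}

/* Reader step 2 (not shown): read entries up to advancingPos, unlocked. */

/* Reader step 3 (assumed under the queue lock): publish the result. */
static void
reader_finish(QueueBackendStatusSketch *st)
{
    st->pos = st->advancingPos;
    st->isAdvancing = false;
}

/* Notifier: writing pos directly is only safe while the reader is idle. */
static bool
notifier_may_direct_advance(const QueueBackendStatusSketch *st)
{
    return !st->wakeupPending && !st->isAdvancing;
}
```

If a notifier overwrote pos between reader_begin() and reader_finish(), the reader would then replace it with the older advancingPos and sit behind the new entries. That is why an advancing backend whose advancingPos precedes queueHeadAfterWrite is signaled instead of directly advanced.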

Transaction-Local State
------------------------

PreCommit_Notify() builds a list of unique channels
(pendingNotifyChannels) from the transaction's notifications. This list
is used by SignalBackends() to look up listeners in the shared hash
efficiently, avoiding duplicate lookups when multiple notifications are
sent to the same channel.
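The deduplication step can be sketched as follows. For brevity this hypothetical collect_unique_channels uses a linear scan over the channels collected so far, which is quadratic in the worst case; according to the AddEventToPendingNotifies note, the patch instead uses channelHashtab once the notification count exceeds MIN_HASHABLE_NOTIFIES.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Collect the distinct channel names of a transaction's notifications,
 * preserving first-seen order. unique_out must have room for n entries.
 * Returns the number of unique channels.
 */
static size_t
collect_unique_channels(const char *const *notify_channels, size_t n,
                        const char **unique_out)
{
    size_t      nunique = 0;

    for (size_t i = 0; i < n; i++)
    {
        bool        seen = false;

        for (size_t j = 0; j < nunique; j++)
        {
            if (strcmp(notify_channels[i], unique_out[j]) == 0)
            {
                seen = true;
                break;
            }
        }
        if (!seen)
            unique_out[nunique++] = notify_channels[i];
    }
    return nunique;
}
```

SignalBackends() then needs only one shared-hash lookup per entry in the result, regardless of how many notifications targeted each channel.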

Functions Modified
------------------

AsyncShmemInit
Initialize channelHashDSA/DSH handles (InvalidHandle) and new
per-backend fields: wakeupPending, isAdvancing, advancingPos.

Async_Notify
Initialize channelHashtab.

pg_listening_channels
Rewritten to iterate over listenChannelsHash using HASH_SEQ_STATUS
instead of traversing the old listenChannels list.

PreCommit_Notify
Build pendingNotifyChannels list of unique channels from the
transaction's notifications. Capture queueHeadBeforeWrite before writing
the first notification, and queueHeadAfterWrite after each write, to
enable the direct advancement optimization.

AtCommit_Notify
Check hash table entry count instead of list emptiness when deciding
whether to unregister from listener array.

Exec_ListenCommit
Complete rewrite to maintain both local listenChannelsHash and shared
channelHash. Insert backend's ProcNumber into DSA-allocated listeners
array, growing array (doubling strategy) when full.

Exec_UnlistenCommit
Remove from both local and shared hashes. Compact listeners array with
memmove, free DSA memory and delete hash entry when last listener
removed.

Exec_UnlistenAllCommit
Iterate shared channelHash with dshash_seq_*, remove this backend from
all channel entries in current database, clean up DSA memory and
delete entries when empty.

IsListeningOn
Simplified to single hash_search() call on listenChannelsHash.

asyncQueueUnregister
Clear QUEUE_BACKEND_WAKEUP_PENDING flag and update assertion to check
hash table instead of list.

SignalBackends
Rewrite to use targeted signaling instead of broadcast. Iterate
pendingNotifyChannels, look up listeners per channel in shared
channelHash. Implement direct advancement: advance idle backends
positioned in [queueHeadBeforeWrite, queueHeadAfterWrite) without
signaling. Use wakeupPending flag to prevent duplicate signals and
respect isAdvancing flag to avoid interfering with concurrent position
updates.

AtAbort_Notify
Use listenChannelsHash instead of listenChannels.

asyncQueueReadAllNotifications
Set isAdvancing flag and advancingPos before reading, clear
isAdvancing after advancing position.

asyncQueueProcessPageEntries
Use listenChannelsHash instead of listenChannels.

ProcessIncomingNotify
Use listenChannelsHash instead of listenChannels.

AddEventToPendingNotifies
Build channelHashtab when notification count exceeds
MIN_HASHABLE_NOTIFIES, enabling efficient extraction of unique channel
names in PreCommit_Notify.

ClearPendingActionsAndNotifies
Also free pendingNotifyChannels.
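The listener-array compaction mentioned for Exec_UnlistenCommit can be sketched like this. remove_listener is a hypothetical name and the array is ordinary memory here rather than DSA; freeing the array and deleting the hash entry when the count reaches zero is left to the caller.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef int ProcNumber;

/*
 * Remove procno from listeners[0 .. *n), shifting later entries down with
 * memmove so the array stays dense. Returns true if procno was found.
 */
static bool
remove_listener(ProcNumber *listeners, int *n, ProcNumber procno)
{
    for (int i = 0; i < *n; i++)
    {
        if (listeners[i] == procno)
        {
            /* moves zero bytes when removing the last element */
            memmove(&listeners[i], &listeners[i + 1],
                    (size_t) (*n - i - 1) * sizeof(ProcNumber));
            (*n)--;
            return true;
        }
    }
    return false;
}
```

Keeping the array dense lets the notifier iterate exactly numListeners entries without checking for holes.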

Functions Added
---------------

asyncQueuePagePrecedes
Inline function returning true if page p precedes page q (p < q).

channelHashFunc
Hash function for ChannelHashKey, combining hash of dboid and channel
name using XOR. Required callback for dshash operations.

initChannelHash
Lazy initialization of shared dshash table mapping (dboid, channel) to
listener arrays. First caller creates DSA area and dshash, stores
handles in asyncQueueControl; subsequent callers attach using stored
handles.

initListenChannelsHash
Lazy initialization of backend-local hash table (listenChannelsHash)
for faster IsListeningOn() checks.

ChannelHashPrepareKey
Inline helper to construct ChannelHashKey.
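Two of the helpers above are small enough to sketch directly. Assumptions: queue page numbers are int64, and since the actual component hash functions are not named above, a murmur3-style 32-bit finalizer for the OID and FNV-1a for the channel name stand in for them. The real channelHashFunc must also be wrapped in dshash's hash-function callback type.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* True if page p precedes page q (no wraparound with 64-bit page numbers). */
static inline bool
page_precedes(int64_t p, int64_t q)
{
    return p < q;
}

/* murmur3 fmix32 finalizer: a bijective 32-bit mixer */
static inline uint32_t
mix32(uint32_t h)
{
    h ^= h >> 16;
    h *= 0x85ebca6bu;
    h ^= h >> 13;
    h *= 0xc2b2ae35u;
    h ^= h >> 16;
    return h;
}

/* FNV-1a over a NUL-terminated string (stand-in for the real string hash) */
static uint32_t
fnv1a(const char *s)
{
    uint32_t    h = 2166136261u;

    for (; *s; s++)
    {
        h ^= (unsigned char) *s;
        h *= 16777619u;
    }
    return h;
}

/* Combine the two component hashes with XOR, as described above. */
static uint32_t
channel_hash_sketch(uint32_t dboid, const char *channel)
{
    return mix32(dboid) ^ fnv1a(channel);
}
```

Because mix32() is a bijection, the same channel name in two different databases always yields different hash values in this sketch.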

TESTING
=======

The patch adds comprehensive isolation tests covering:

1. Subtransaction handling:
- LISTEN in subtransaction with SAVEPOINT/RELEASE
- LISTEN merge path (both outer and inner transactions)
- ROLLBACK TO SAVEPOINT discarding pending actions

2. Notification deduplication:
- Hash table duplicate detection with 17 notifications + 1 duplicate

3. Listener array growth:
- Multiple listeners triggering ChannelEntry array expansion

4. Cross-session delivery:
- Notifications from non-listening backend to listener in another
session

Total test sessions expanded from 3 to 7 to cover these scenarios.

/Joel
