From 7fc2c6dfeb9eaf963aeb7db46fc9ae31db84cdf2 Mon Sep 17 00:00:00 2001 From: Joel Jacobson Date: Wed, 27 May 2026 12:30:22 +0200 Subject: [PATCH] Fix NOTIFY wakeups for pre-commit LISTEN entries SignalBackends() used to ignore ListenerEntry entries whose flag said that the listener was not yet committed. That can be true for a LISTEN that has already registered its queue position, but has not yet reached AtCommit_Notify(). If another backend notifies the same channel in that window, advancing the listener queue pointer could make the LISTEN miss the notification after commit. Fix this by treating all channel entries as possible wakeup targets. Rename the flag to removeOnAbort to reflect its remaining purpose: identifying preallocated LISTEN entries that abort cleanup must remove. --- src/backend/commands/async.c | 54 +++++++++++++++--------------------- 1 file changed, 23 insertions(+), 31 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index db6a9a6561b..1aae70303d0 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -114,15 +114,15 @@ * If the current transaction has executed any LISTEN/UNLISTEN actions, * PreCommit_Notify() prepares to commit those. For LISTEN, it * pre-allocates entries in both the per-backend localChannelTable and the - * shared globalChannelTable (with listening=false so that these entries - * are no-ops for the moment). It also records the final per-channel - * intent in pendingListenActions, so post-commit/abort processing can - * apply that in a single step. Since all these allocations happen before - * committing to clog, we can safely abort the transaction on failure. + * shared globalChannelTable, marking new shared entries removeOnAbort. + * It also records the final per-channel intent in pendingListenActions, + * so post-commit/abort processing can apply that in a single step. + * Since all these allocations happen before committing to clog, we can + * safely abort the transaction on failure. * * After commit, AtCommit_Notify() runs through pendingListenActions and - * updates the backend's per-channel listening flags to activate or - * deactivate listening. This happens before sending signals. + * applies the final per-channel listen/unlisten state. This happens + * before sending signals. * * SignalBackends() consults the shared global channel table to identify * listeners for the channels that the current transaction sent @@ -384,10 +384,9 @@ static SlruDesc NotifySlruDesc; * Global channel table definitions * * This hash table maps (database OID, channel name) keys to arrays of - * ProcNumbers representing the backends listening or about to listen - * on each channel. The "listening" flags allow us to create hash table - * entries pre-commit and not have to assume that creating them post-commit - * will succeed. + * ProcNumbers representing the backends listening or about to listen on each + * channel. The removeOnAbort flags allow us to create hash table entries + * pre-commit and discard them later if the transaction aborts. */ #define INITIAL_LISTENERS_ARRAY_SIZE 4 @@ -400,7 +399,7 @@ typedef struct GlobalChannelKey typedef struct ListenerEntry { ProcNumber procNo; /* listener's ProcNumber */ - bool listening; /* true if committed listener */ + bool removeOnAbort; /* remove entry if current xact aborts */ } ListenerEntry; typedef struct GlobalChannelEntry @@ -1523,9 +1522,8 @@ BecomeRegisteredListener(void) * * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating * an entry in localChannelTable, and pre-allocating an entry in the shared - * globalChannelTable with listening=false. The listening flag will be set - * to true in AtCommit_Notify. If we abort later, unwanted table entries - * will be removed. + * globalChannelTable with removeOnAbort set. AtCommit_Notify clears + * removeOnAbort; abort processing removes entries still marked so. */ static void PrepareTableEntriesForListen(const char *channel) @@ -1557,7 +1555,7 @@ PrepareTableEntriesForListen(const char *channel) */ (void) hash_search(localChannelTable, channel, HASH_ENTER, NULL); - /* Pre-allocate entry in shared globalChannelTable with listening=false */ + /* Pre-allocate entry in shared globalChannelTable */ GlobalChannelKeyInit(&key, MyDatabaseId, channel); entry = dshash_find_or_insert(globalChannelTable, &key, &found); @@ -1592,7 +1590,7 @@ PrepareTableEntriesForListen(const char *channel) { if (listeners[i].procNo == MyProcNumber) { - /* Already have an entry; listening flag stays as-is until commit */ + /* Already have an entry; leave removeOnAbort as-is */ dshash_release_lock(globalChannelTable, entry); return; } @@ -1615,8 +1613,7 @@ PrepareTableEntriesForListen(const char *channel) } listeners[entry->numListeners].procNo = MyProcNumber; - listeners[entry->numListeners].listening = false; /* staged, not yet - * committed */ + listeners[entry->numListeners].removeOnAbort = true; entry->numListeners++; dshash_release_lock(globalChannelTable, entry); @@ -1766,11 +1763,10 @@ ApplyPendingListenActions(bool isCommit) if (pending->action == PENDING_LISTEN) { /* - * LISTEN being committed: set listening=true. - * localChannelTable entry was created during - * PreCommit and should be kept. + * LISTEN being committed; localChannelTable entry + * was created during PreCommit and should be kept. */ - listeners[i].listening = true; + listeners[i].removeOnAbort = false; removeLocal = false; } else @@ -1790,20 +1786,19 @@ ApplyPendingListenActions(bool isCommit) * pendingListenActions entries, so it's pretty hard to * test. */ - if (!listeners[i].listening) + if (listeners[i].removeOnAbort) { /* * Staged LISTEN (or LISTEN+UNLISTEN) being aborted, - * and we weren't listening before, so remove - * pre-allocated entries from both tables. + * so remove pre-allocated entries from both tables. */ RemoveListenerFromChannel(&entry, listeners, i); } else { /* - * We're aborting, but the previous state was that - * we're listening, so keep localChannelTable entry. + * Entry pre-existed this transaction, so keep the + * localChannelTable entry. */ removeLocal = false; } @@ -2304,9 +2299,6 @@ SignalBackends(void) int32 pid; QueuePosition pos; - if (!listeners[j].listening) - continue; /* ignore not-yet-committed listeners */ - i = listeners[j].procNo; if (QUEUE_BACKEND_WAKEUP_PENDING(i)) -- 2.52.0