From 976226800da1564edcb1ec32bd0e96b723b9c4a5 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Wed, 14 Jan 2026 17:55:21 -0500
Subject: [PATCH v35 2/2] Optimize LISTEN/NOTIFY via shared channel map and
 direct advancement.

This patch reworks LISTEN/NOTIFY to avoid waking backends that have
no need to process the notification messages we just sent.

The primary change is to create a shared hash table that tracks
which processes are listening to which channels (where a "channel" is
defined by a database OID and channel name).  This allows a notifying
process to accurately determine which listeners are interested,
replacing the previous weak approximation, which could only rule out
listeners connected to other databases.

Secondly, if a listener is known not to be interested and is
currently stopped at the old queue head, we avoid waking it at all
and just directly advance its queue pointer past the notifications
we inserted.

These changes permit very significant improvements (integer multiples)
in NOTIFY throughput, as well as a noticeable reduction in latency,
when there are many listeners but only a few are interested in any
specific message.  There is no improvement for the simplest case where
every listener reads every message, but any loss seems below the noise
level.

Author: Joel Jacobson <joel@compiler.org>
Reviewed-by: Tom Lane <tgl@sss.pgh.pa.us>
Discussion: https://postgr.es/m/6899c044-4a82-49be-8117-e6f669765f7e@app.fastmail.com
---
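Illustrative note (text in this area is ignored by git am): the
direct-advancement rule described in item 8 of the header comment can be
modeled in isolation roughly as below.  This is a minimal standalone sketch,
not code from async.c.  Listener, WakeDecision, decide_wakeup() and the
QUEUE_CLEANUP_DELAY value of 4 are assumptions made for illustration; locking,
the listener-list walk, page wraparound and the "already caught up" check are
all omitted.  Only QueuePosition and QUEUE_POS_PRECEDES mirror names used in
the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-in for the QueuePosition struct in async.c. */
typedef struct QueuePosition
{
	int64_t		page;
	int			offset;
} QueuePosition;

/* Same ordering test as the patch's macro, minus wraparound handling. */
#define QUEUE_POS_PRECEDES(x,y) \
	((x).page < (y).page || \
	 ((x).page == (y).page && (x).offset < (y).offset))

/* Assumed value, for illustration only. */
#define QUEUE_CLEANUP_DELAY 4

/* Hypothetical per-listener state; flag names follow QueueBackendStatus. */
typedef struct Listener
{
	QueuePosition pos;			/* listener has read the queue up to here */
	bool		interested;		/* listens on a channel we just notified */
	bool		wakeupPending;	/* signal sent, not yet processed */
	bool		isAdvancing;	/* listener is moving its own position */
} Listener;

typedef enum
{
	WAKE_NONE,					/* leave the listener alone */
	WAKE_SIGNAL,				/* would send PROCSIG_NOTIFY_INTERRUPT */
	WAKE_ADVANCED				/* position moved to new head, no signal */
} WakeDecision;

static WakeDecision
decide_wakeup(Listener *l, QueuePosition before, QueuePosition after,
			  QueuePosition head)
{
	/* The written range must not run backwards. */
	assert(!QUEUE_POS_PRECEDES(after, before));

	if (l->interested)
	{
		if (l->wakeupPending)
			return WAKE_NONE;	/* a signal is already in flight */
		l->wakeupPending = true;
		return WAKE_SIGNAL;
	}

	/* Not interested: skip our entries if it sits in [before, after). */
	if (!l->isAdvancing &&
		!QUEUE_POS_PRECEDES(l->pos, before) &&
		QUEUE_POS_PRECEDES(l->pos, after))
	{
		l->pos = after;
		return WAKE_ADVANCED;
	}

	/* Otherwise signal only if it is far enough behind to hold the tail. */
	if (head.page - l->pos.page >= QUEUE_CLEANUP_DELAY && !l->wakeupPending)
	{
		l->wakeupPending = true;
		return WAKE_SIGNAL;
	}
	return WAKE_NONE;
}
```

In this model an uninterested listener parked at the old head is moved to the
new head without a signal, while one that is several pages behind is still
signaled so that the queue tail can eventually advance.
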
 src/backend/commands/async.c                  | 1141 ++++++++++++++---
 .../utils/activity/wait_event_names.txt       |    1 +
 src/include/storage/lwlocklist.h              |    1 +
 src/tools/pgindent/typedefs.list              |    6 +
 4 files changed, 966 insertions(+), 183 deletions(-)
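Illustrative note (also ignored by git am): the reduction of an ordered
LISTEN/UNLISTEN action list to one final intent per channel, which the patch
stores in pendingListenActions, might be pictured as follows.  This is a
hedged sketch under stated assumptions: PendingTable, pending_find_or_add(),
stage_action() and final_intent() are hypothetical names, a small fixed array
stands in for the dynahash table, and the real PrepareTableEntriesFor*
functions (not shown in this excerpt) also touch the shared table, which is
not modeled here.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define NAMEDATALEN 64			/* PostgreSQL's default; assumed here */
#define MAX_CHANNELS 16			/* arbitrary capacity for the sketch */

typedef enum
{
	LISTEN_LISTEN,
	LISTEN_UNLISTEN,
	LISTEN_UNLISTEN_ALL
} ListenActionKind;

typedef enum
{
	PENDING_LISTEN,
	PENDING_UNLISTEN
} PendingListenAction;

typedef struct PendingListenEntry
{
	char		channel[NAMEDATALEN];
	PendingListenAction action;
} PendingListenEntry;

/* Hypothetical array-backed replacement for the pendingListenActions hash. */
typedef struct PendingTable
{
	PendingListenEntry entries[MAX_CHANNELS];
	int			n;
} PendingTable;

/* Return the entry for channel, creating it if needed; caller sets action. */
static PendingListenEntry *
pending_find_or_add(PendingTable *t, const char *channel)
{
	PendingListenEntry *e;

	for (int i = 0; i < t->n; i++)
		if (strcmp(t->entries[i].channel, channel) == 0)
			return &t->entries[i];
	assert(t->n < MAX_CHANNELS);
	e = &t->entries[t->n++];
	strncpy(e->channel, channel, NAMEDATALEN - 1);
	e->channel[NAMEDATALEN - 1] = '\0';
	return e;
}

/*
 * Fold one action into the table.  "listened" is the set of channels the
 * backend was already listening on before this transaction.
 */
static void
stage_action(PendingTable *t, ListenActionKind kind, const char *channel,
			 const char **listened, int nlistened)
{
	switch (kind)
	{
		case LISTEN_LISTEN:
			pending_find_or_add(t, channel)->action = PENDING_LISTEN;
			break;
		case LISTEN_UNLISTEN:
			pending_find_or_add(t, channel)->action = PENDING_UNLISTEN;
			break;
		case LISTEN_UNLISTEN_ALL:
			/* Cover existing listens, then flip every known channel. */
			for (int i = 0; i < nlistened; i++)
				pending_find_or_add(t, listened[i]);
			for (int i = 0; i < t->n; i++)
				t->entries[i].action = PENDING_UNLISTEN;
			break;
	}
}

/* Final intent for channel, or -1 if the transaction never touched it. */
static int
final_intent(const PendingTable *t, const char *channel)
{
	for (int i = 0; i < t->n; i++)
		if (strcmp(t->entries[i].channel, channel) == 0)
			return (int) t->entries[i].action;
	return -1;
}
```

Because later actions overwrite earlier ones, a sequence such as LISTEN b,
UNLISTEN ALL, LISTEN c leaves b marked for unlisten and c marked for listen,
so commit or abort processing needs only one step per channel.
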

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 40c42f572ed..657c591618d 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -13,19 +13,16 @@
  */
 
 /*-------------------------------------------------------------------------
- * Async Notification Model as of 9.0:
+ * Async Notification Model as of v19:
  *
- * 1. Multiple backends on same machine. Multiple backends listening on
- *	  several channels. (Channels are also called "conditions" in other
- *	  parts of the code.)
+ * 1. Multiple backends on same machine.  Multiple backends may be listening
+ *	  on each of several channels.
  *
  * 2. There is one central queue in disk-based storage (directory pg_notify/),
  *	  with actively-used pages mapped into shared memory by the slru.c module.
  *	  All notification messages are placed in the queue and later read out
- *	  by listening backends.
- *
- *	  There is no central knowledge of which backend listens on which channel;
- *	  every backend has its own list of interesting channels.
+ *	  by listening backends.  The single queue allows us to guarantee that
+ *	  notifications are received in commit order.
  *
  *	  Although there is only one queue, notifications are treated as being
  *	  database-local; this is done by including the sender's database OID
@@ -62,22 +59,17 @@
  *	  page number and the offset in that page. This is done before marking the
  *	  transaction as committed in clog. If we run into problems writing the
  *	  notifications, we can still call elog(ERROR, ...) and the transaction
- *	  will roll back.
+ *	  will roll back safely.
  *
  *	  Once we have put all of the notifications into the queue, we return to
  *	  CommitTransaction() which will then do the actual transaction commit.
  *
  *	  After commit we are called another time (AtCommit_Notify()). Here we
- *	  make any actual updates to the effective listen state (listenChannels).
+ *	  make any required updates to the effective listen state (see below).
  *	  Then we signal any backends that may be interested in our messages
  *	  (including our own backend, if listening).  This is done by
- *	  SignalBackends(), which scans the list of listening backends and sends a
- *	  PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
- *	  know which backend is listening on which channel so we must signal them
- *	  all).  We can exclude backends that are already up to date, though, and
- *	  we can also exclude backends that are in other databases (unless they
- *	  are way behind and should be kicked to make them advance their
- *	  pointers).
+ *	  SignalBackends(), which sends a PROCSIG_NOTIFY_INTERRUPT signal to
+ *	  each relevant backend, as described below.
  *
  *	  Finally, after we are out of the transaction altogether and about to go
  *	  idle, we scan the queue for messages that need to be sent to our
@@ -109,6 +101,47 @@
  *	  often. We make sending backends do this work if they advanced the queue
  *	  head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
  *
+ * 7. So far we have not discussed how backends change their listening state,
+ *	  nor how notification senders know which backends to awaken.  To handle
+ *	  the latter, we maintain a global channel table (implemented as a dynamic
+ *	  shared hash table, or dshash) that maps channel names to the set of
+ *	  backends listening on each channel.  This table is created lazily by
+ *	  PreCommit_Notify(), and grows dynamically as needed.  There is also a
+ *	  local channel table (a plain dynahash table) in each listening backend,
+ *	  tracking which channels that backend is listening to.  The local table
+ *	  serves to reduce the number of accesses needed to the shared table.
+ *
+ *	  If the current transaction has executed any LISTEN/UNLISTEN actions,
+ *	  PreCommit_Notify() prepares to commit those.  For LISTEN, it
+ *	  pre-allocates entries in both the per-backend localChannelTable and the
+ *	  shared globalChannelTable (with listening=false so that these entries
+ *	  are no-ops for the moment).  It also records the final per-channel
+ *	  intent in pendingListenActions, so post-commit/abort processing can
+ *	  apply that in a single step.  Since all these allocations happen before
+ *	  committing to clog, we can safely abort the transaction on failure.
+ *
+ *	  After commit, AtCommit_Notify() runs through pendingListenActions and
+ *	  updates the backend's per-channel listening flags to activate or
+ *	  deactivate listening.  This happens before sending signals.
+ *
+ *	  SignalBackends() consults the shared global channel table to identify
+ *	  listeners for the channels that the current transaction sent
+ *	  notification(s) to.  Each selected backend is marked as having a wakeup
+ *	  pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
+ *	  signal is sent to it.
+ *
+ * 8. While writing notifications, PreCommit_Notify() records the queue head
+ *	  position both before and after the write.  Because all writers serialize
+ *	  on a cluster-wide heavyweight lock, no other backend can insert entries
+ *	  between these two points.  SignalBackends() uses this fact to directly
+ *	  advance the queue pointer for any backend that is still positioned at
+ *	  the old head, or within the range written, but is not interested in any
+ *	  of our notifications.  This avoids unnecessary wakeups for idle
+ *	  listeners that have nothing to read.  Backends that are not interested
+ *	  in our notifications, but cannot be directly advanced, are signaled only
+ *	  if they are far behind the current queue head; that is to ensure that
+ *	  we can advance the queue tail without undue delay.
+ *
  * An application that listens on the same channel it notifies will get
  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
  * by comparing be_pid in the NOTIFY message to the application's own backend's
@@ -119,7 +152,7 @@
  * The amount of shared memory used for notify management (notify_buffers)
  * can be varied without affecting anything but performance.  The maximum
  * amount of notification data that can be queued at one time is determined
- * by max_notify_queue_pages GUC.
+ * by the max_notify_queue_pages GUC.
  *-------------------------------------------------------------------------
  */
 
@@ -137,14 +170,17 @@
 #include "commands/async.h"
 #include "common/hashfn.h"
 #include "funcapi.h"
+#include "lib/dshash.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
+#include "storage/dsm_registry.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/procsignal.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
+#include "utils/dsa.h"
 #include "utils/guc_hooks.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -224,11 +260,17 @@ typedef struct QueuePosition
 	 (x).page != (y).page ? (x) : \
 	 (x).offset > (y).offset ? (x) : (y))
 
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) || \
+	 ((x).page == (y).page && (x).offset < (y).offset))
+
 /*
  * Parameter determining how often we try to advance the tail pointer:
  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
- * also the distance by which a backend in another database needs to be
- * behind before we'll decide we need to wake it up to advance its pointer.
+ * also the distance by which a backend that's not interested in our
+ * notifications needs to be behind before we'll decide we need to wake it
+ * up so it can advance its pointer.
  *
  * Resist the temptation to make this really large.  While that would save
  * work in some places, it would add cost in others.  In particular, this
@@ -246,6 +288,8 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	bool		wakeupPending;	/* signal sent to backend, not yet processed */
+	bool		isAdvancing;	/* backend is advancing its position */
 } QueueBackendStatus;
 
 /*
@@ -260,14 +304,18 @@ typedef struct QueueBackendStatus
  * (since no other backend will inspect it).
  *
  * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
- * entries of other backends and also change the head pointer. When holding
- * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
- * can change the tail pointers.
+ * entries of other backends and also change the head pointer. They can
+ * also advance other backends' queue positions, unless the other backend
+ * has isAdvancing set (i.e., is in the process of doing that itself).
+ *
+ * When holding both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE
+ * mode, backends can change the tail pointers.
  *
  * SLRU buffer pool is divided in banks and bank wise SLRU lock is used as
  * the control lock for the pg_notify SLRU buffers.
  * In order to avoid deadlocks, whenever we need multiple locks, we first get
- * NotifyQueueTailLock, then NotifyQueueLock, and lastly SLRU bank lock.
+ * NotifyQueueTailLock, then NotifyQueueLock, then SLRU bank lock, and lastly
+ * globalChannelTable partition locks.
  *
  * Each backend uses the backend[] array entry with index equal to its
  * ProcNumber.  We rely on this to make SendProcSignal fast.
@@ -288,6 +336,9 @@ typedef struct AsyncQueueControl
 	ProcNumber	firstListener;	/* id of first listener, or
 								 * INVALID_PROC_NUMBER */
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
+	dsa_handle	globalChannelTableDSA;	/* global channel table's DSA handle */
+	dshash_table_handle globalChannelTableDSH;	/* and its dshash handle */
+	/* Array with room for MaxBackends entries: */
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 } AsyncQueueControl;
 
@@ -301,6 +352,8 @@ static AsyncQueueControl *asyncQueueControl;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
+#define QUEUE_BACKEND_IS_ADVANCING(i)	(asyncQueueControl->backend[i].isAdvancing)
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -313,16 +366,54 @@ static SlruCtlData NotifyCtlData;
 #define QUEUE_FULL_WARN_INTERVAL	5000	/* warn at most once every 5s */
 
 /*
- * listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on).  It is a simple list of channel names,
- * allocated in TopMemoryContext.
+ * Global channel table definitions
+ *
+ * This hash table maps (database OID, channel name) keys to arrays of
+ * ProcNumbers representing the backends listening or about to listen
+ * on each channel.  The "listening" flags allow us to create hash table
+ * entries pre-commit and not have to assume that creating them post-commit
+ * will succeed.
+ */
+#define INITIAL_LISTENERS_ARRAY_SIZE 4
+
+typedef struct GlobalChannelKey
+{
+	Oid			dboid;
+	char		channel[NAMEDATALEN];
+} GlobalChannelKey;
+
+typedef struct ListenerEntry
+{
+	ProcNumber	procNo;			/* listener's ProcNumber */
+	bool		listening;		/* true if committed listener */
+} ListenerEntry;
+
+typedef struct GlobalChannelEntry
+{
+	GlobalChannelKey key;		/* hash key */
+	dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */
+	int			numListeners;	/* Number of listeners currently stored */
+	int			allocatedListeners; /* Allocated size of array */
+} GlobalChannelEntry;
+
+static dshash_table *globalChannelTable = NULL;
+static dsa_area *globalChannelDSA = NULL;
+
+/*
+ * localChannelTable caches the channel names this backend is listening on
+ * (including those we have staged to be listened on, but not yet committed).
+ * Used by IsListeningOn() for fast lookups when reading notifications.
  */
-static List *listenChannels = NIL;	/* list of C strings */
+static HTAB *localChannelTable = NULL;
+
+/* We test this condition to detect that we're not listening at all */
+#define LocalChannelTableIsEmpty() \
+	(localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
 
 /*
  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
  * all actions requested in the current transaction.  As explained above,
- * we don't actually change listenChannels until we reach transaction commit.
+ * we don't actually change listen state until we reach transaction commit.
  *
  * The list is kept in CurTransactionContext.  In subtransactions, each
  * subtransaction has its own list in its own CurTransactionContext, but
@@ -351,6 +442,28 @@ typedef struct ActionList
 
 static ActionList *pendingActions = NULL;
 
+/*
+ * Hash table recording the final listen/unlisten intent per channel for
+ * the current transaction.  Key is channel name, value is PENDING_LISTEN or
+ * PENDING_UNLISTEN.  This keeps critical commit/abort processing to one step
+ * per channel instead of replaying every action.  This is built from the
+ * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
+ * AtAbort_Notify.
+ */
+typedef enum
+{
+	PENDING_LISTEN,
+	PENDING_UNLISTEN,
+} PendingListenAction;
+
+typedef struct PendingListenEntry
+{
+	char		channel[NAMEDATALEN];	/* hash key */
+	PendingListenAction action; /* which action should we perform? */
+} PendingListenEntry;
+
+static HTAB *pendingListenActions = NULL;
+
 /*
  * State for outbound notifies consists of a list of all channels+payloads
  * NOTIFYed in the current transaction.  We do not actually perform a NOTIFY
@@ -391,6 +504,8 @@ typedef struct NotificationList
 	int			nestingLevel;	/* current transaction nesting depth */
 	List	   *events;			/* list of Notification structs */
 	HTAB	   *hashtab;		/* hash of NotificationHash structs, or NULL */
+	List	   *uniqueChannelNames; /* unique channel names being notified */
+	HTAB	   *uniqueChannelHash;	/* hash of unique channel names, or NULL */
 	struct NotificationList *upper; /* details for upper transaction levels */
 } NotificationList;
 
@@ -403,6 +518,15 @@ struct NotificationHash
 
 static NotificationList *pendingNotifies = NULL;
 
+/*
+ * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
+ * (both just carry the channel name, with no payload).
+ */
+typedef struct ChannelName
+{
+	char		channel[NAMEDATALEN];	/* hash key */
+} ChannelName;
+
 /*
  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
  * called from inside a signal handler. That just sets the
@@ -418,6 +542,23 @@ static bool unlistenExitRegistered = false;
 /* True if we're currently registered as a listener in asyncQueueControl */
 static bool amRegisteredListener = false;
 
+/*
+ * Queue head positions for direct advancement.
+ * These are captured during PreCommit_Notify while holding the heavyweight
+ * lock on database 0, ensuring no other backend can insert notifications
+ * between them.  SignalBackends uses these to advance idle backends.
+ */
+static QueuePosition queueHeadBeforeWrite;
+static QueuePosition queueHeadAfterWrite;
+
+/*
+ * Workspace arrays for SignalBackends.  These are preallocated in
+ * PreCommit_Notify to avoid needing memory allocation after committing to
+ * clog.
+ */
+static int32 *signalPids = NULL;
+static ProcNumber *signalProcnos = NULL;
+
 /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
 static bool tryAdvanceTail = false;
 
@@ -430,12 +571,23 @@ int			max_notify_queue_pages = 1048576;
 /* local function prototypes */
 static inline int64 asyncQueuePageDiff(int64 p, int64 q);
 static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
+static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
+										const char *channel);
+static dshash_hash globalChannelTableHash(const void *key, size_t size,
+										  void *arg);
+static void initGlobalChannelTable(void);
+static void initLocalChannelTable(void);
 static void queue_listen(ListenActionKind action, const char *channel);
 static void Async_UnlistenOnExit(int code, Datum arg);
-static void Exec_ListenPreCommit(void);
-static void Exec_ListenCommit(const char *channel);
-static void Exec_UnlistenCommit(const char *channel);
-static void Exec_UnlistenAllCommit(void);
+static void BecomeRegisteredListener(void);
+static void PrepareTableEntriesForListen(const char *channel);
+static void PrepareTableEntriesForUnlisten(const char *channel);
+static void PrepareTableEntriesForUnlistenAll(void);
+static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
+									  ListenerEntry *listeners,
+									  int idx);
+static void ApplyPendingListenActions(bool isCommit);
+static void CleanupListenersOnExit(void);
 static bool IsListeningOn(const char *channel);
 static void asyncQueueUnregister(void);
 static bool asyncQueueIsFull(void);
@@ -477,6 +629,145 @@ asyncQueuePagePrecedes(int64 p, int64 q)
 	return p < q;
 }
 
+/*
+ * GlobalChannelKeyInit
+ *		Prepare a global channel table key for hashing.
+ */
+static inline void
+GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
+{
+	memset(key, 0, sizeof(GlobalChannelKey));
+	key->dboid = dboid;
+	strlcpy(key->channel, channel, NAMEDATALEN);
+}
+
+/*
+ * globalChannelTableHash
+ *		Hash function for global channel table keys.
+ */
+static dshash_hash
+globalChannelTableHash(const void *key, size_t size, void *arg)
+{
+	const GlobalChannelKey *k = (const GlobalChannelKey *) key;
+	dshash_hash h;
+
+	h = DatumGetUInt32(hash_uint32(k->dboid));
+	h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+								 strnlen(k->channel, NAMEDATALEN)));
+
+	return h;
+}
+
+/* parameters for the global channel table */
+static const dshash_parameters globalChannelTableDSHParams = {
+	sizeof(GlobalChannelKey),
+	sizeof(GlobalChannelEntry),
+	dshash_memcmp,
+	globalChannelTableHash,
+	dshash_memcpy,
+	LWTRANCHE_NOTIFY_CHANNEL_HASH
+};
+
+/*
+ * initGlobalChannelTable
+ *		Lazy initialization of the global channel table.
+ */
+static void
+initGlobalChannelTable(void)
+{
+	MemoryContext oldcontext;
+
+	/* Quick exit if we already did this */
+	if (asyncQueueControl->globalChannelTableDSH != DSHASH_HANDLE_INVALID &&
+		globalChannelTable != NULL)
+		return;
+
+	/* Otherwise, use a lock to ensure only one process creates the table */
+	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+	/* Be sure any local memory allocated by DSA routines is persistent */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+	if (asyncQueueControl->globalChannelTableDSH == DSHASH_HANDLE_INVALID)
+	{
+		/* Initialize dynamic shared hash table for global channels */
+		globalChannelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH);
+		dsa_pin(globalChannelDSA);
+		dsa_pin_mapping(globalChannelDSA);
+		globalChannelTable = dshash_create(globalChannelDSA,
+										   &globalChannelTableDSHParams,
+										   NULL);
+
+		/* Store handles in shared memory for other backends to use */
+		asyncQueueControl->globalChannelTableDSA = dsa_get_handle(globalChannelDSA);
+		asyncQueueControl->globalChannelTableDSH =
+			dshash_get_hash_table_handle(globalChannelTable);
+	}
+	else if (!globalChannelTable)
+	{
+		/* Attach to existing dynamic shared hash table */
+		globalChannelDSA = dsa_attach(asyncQueueControl->globalChannelTableDSA);
+		dsa_pin_mapping(globalChannelDSA);
+		globalChannelTable = dshash_attach(globalChannelDSA,
+										   &globalChannelTableDSHParams,
+										   asyncQueueControl->globalChannelTableDSH,
+										   NULL);
+	}
+
+	MemoryContextSwitchTo(oldcontext);
+	LWLockRelease(NotifyQueueLock);
+}
+
+/*
+ * initLocalChannelTable
+ *		Lazy initialization of the local channel table.
+ *		Once created, this table lasts for the life of the session.
+ */
+static void
+initLocalChannelTable(void)
+{
+	HASHCTL		hash_ctl;
+
+	/* Quick exit if we already did this */
+	if (localChannelTable != NULL)
+		return;
+
+	/* Initialize local hash table for this backend's listened channels */
+	hash_ctl.keysize = NAMEDATALEN;
+	hash_ctl.entrysize = sizeof(ChannelName);
+
+	localChannelTable =
+		hash_create("Local Listen Channels",
+					64,
+					&hash_ctl,
+					HASH_ELEM | HASH_STRINGS);
+}
+
+/*
+ * initPendingListenActions
+ *		Lazy initialization of the pending listen actions hash table.
+ *		This is allocated in CurTransactionContext during PreCommit_Notify,
+ *		and destroyed at transaction end.
+ */
+static void
+initPendingListenActions(void)
+{
+	HASHCTL		hash_ctl;
+
+	if (pendingListenActions != NULL)
+		return;
+
+	hash_ctl.keysize = NAMEDATALEN;
+	hash_ctl.entrysize = sizeof(PendingListenEntry);
+	hash_ctl.hcxt = CurTransactionContext;
+
+	pendingListenActions =
+		hash_create("Pending Listen Actions",
+					list_length(pendingActions->actions),
+					&hash_ctl,
+					HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
+}
+
 /*
  * Report space needed for our shared memory area
  */
@@ -520,12 +811,16 @@ AsyncShmemInit(void)
 		QUEUE_STOP_PAGE = 0;
 		QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
 		asyncQueueControl->lastQueueFillWarn = 0;
+		asyncQueueControl->globalChannelTableDSA = DSA_HANDLE_INVALID;
+		asyncQueueControl->globalChannelTableDSH = DSHASH_HANDLE_INVALID;
 		for (int i = 0; i < MaxBackends; i++)
 		{
 			QUEUE_BACKEND_PID(i) = InvalidPid;
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
+			QUEUE_BACKEND_IS_ADVANCING(i) = false;
 		}
 	}
 
@@ -656,6 +951,9 @@ Async_Notify(const char *channel, const char *payload)
 		notifies->events = list_make1(n);
 		/* We certainly don't need a hashtable yet */
 		notifies->hashtab = NULL;
+		/* We won't build uniqueChannelNames/Hash till later, either */
+		notifies->uniqueChannelNames = NIL;
+		notifies->uniqueChannelHash = NULL;
 		notifies->upper = pendingNotifies;
 		pendingNotifies = notifies;
 	}
@@ -682,8 +980,8 @@ Async_Notify(const char *channel, const char *payload)
  *		Common code for listen, unlisten, unlisten all commands.
  *
  *		Adds the request to the list of pending actions.
- *		Actual update of the listenChannels list happens during transaction
- *		commit.
+ *		Actual update of localChannelTable and globalChannelTable happens during
+ *		PreCommit_Notify, with staged changes committed in AtCommit_Notify.
  */
 static void
 queue_listen(ListenActionKind action, const char *channel)
@@ -693,10 +991,9 @@ queue_listen(ListenActionKind action, const char *channel)
 	int			my_level = GetCurrentTransactionNestLevel();
 
 	/*
-	 * Unlike Async_Notify, we don't try to collapse out duplicates. It would
-	 * be too complicated to ensure we get the right interactions of
-	 * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
-	 * would be any performance benefit anyway in sane applications.
+	 * Unlike Async_Notify, we don't try to collapse out duplicates here. We
+	 * keep the ordered list to preserve interactions like UNLISTEN ALL; the
+	 * final per-channel intent is computed during PreCommit_Notify.
 	 */
 	oldcontext = MemoryContextSwitchTo(CurTransactionContext);
 
@@ -782,30 +1079,49 @@ Async_UnlistenAll(void)
  * SQL function: return a set of the channel names this backend is actively
  * listening to.
  *
- * Note: this coding relies on the fact that the listenChannels list cannot
+ * Note: this coding relies on the fact that the localChannelTable cannot
  * change within a transaction.
  */
 Datum
 pg_listening_channels(PG_FUNCTION_ARGS)
 {
 	FuncCallContext *funcctx;
+	HASH_SEQ_STATUS *status;
 
 	/* stuff done only on the first call of the function */
 	if (SRF_IS_FIRSTCALL())
 	{
 		/* create a function context for cross-call persistence */
 		funcctx = SRF_FIRSTCALL_INIT();
+
+		/* Initialize hash table iteration if we have any channels */
+		if (localChannelTable != NULL)
+		{
+			MemoryContext oldcontext;
+
+			oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+			status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
+			hash_seq_init(status, localChannelTable);
+			funcctx->user_fctx = status;
+			MemoryContextSwitchTo(oldcontext);
+		}
+		else
+		{
+			funcctx->user_fctx = NULL;
+		}
 	}
 
 	/* stuff done on every call of the function */
 	funcctx = SRF_PERCALL_SETUP();
+	status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
 
-	if (funcctx->call_cntr < list_length(listenChannels))
+	if (status != NULL)
 	{
-		char	   *channel = (char *) list_nth(listenChannels,
-												funcctx->call_cntr);
+		ChannelName *entry;
 
-		SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
+		entry = (ChannelName *) hash_seq_search(status);
+		if (entry != NULL)
+			SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel));
 	}
 
 	SRF_RETURN_DONE(funcctx);
@@ -821,7 +1137,7 @@ pg_listening_channels(PG_FUNCTION_ARGS)
 static void
 Async_UnlistenOnExit(int code, Datum arg)
 {
-	Exec_UnlistenAllCommit();
+	CleanupListenersOnExit();
 	asyncQueueUnregister();
 }
 
@@ -868,8 +1184,16 @@ PreCommit_Notify(void)
 		elog(DEBUG1, "PreCommit_Notify");
 
 	/* Preflight for any pending listen/unlisten actions */
+	initGlobalChannelTable();
+
 	if (pendingActions != NULL)
 	{
+		/* Ensure we have a local channel table */
+		initLocalChannelTable();
+		/* Create pendingListenActions hash table for this transaction */
+		initPendingListenActions();
+
+		/* Stage all the actions this transaction wants to perform */
 		foreach(p, pendingActions->actions)
 		{
 			ListenAction *actrec = (ListenAction *) lfirst(p);
@@ -877,13 +1201,14 @@ PreCommit_Notify(void)
 			switch (actrec->action)
 			{
 				case LISTEN_LISTEN:
-					Exec_ListenPreCommit();
+					BecomeRegisteredListener();
+					PrepareTableEntriesForListen(actrec->channel);
 					break;
 				case LISTEN_UNLISTEN:
-					/* there is no Exec_UnlistenPreCommit() */
+					PrepareTableEntriesForUnlisten(actrec->channel);
 					break;
 				case LISTEN_UNLISTEN_ALL:
-					/* there is no Exec_UnlistenAllPreCommit() */
+					PrepareTableEntriesForUnlistenAll();
 					break;
 			}
 		}
@@ -893,6 +1218,60 @@ PreCommit_Notify(void)
 	if (pendingNotifies)
 	{
 		ListCell   *nextNotify;
+		bool		firstIteration = true;
+
+		/*
+		 * Build list of unique channel names being notified for use by
+		 * SignalBackends().
+		 *
+		 * If uniqueChannelHash is available, use it to efficiently get the
+		 * unique channels.  Otherwise, fall back to the O(N^2) approach.
+		 */
+		pendingNotifies->uniqueChannelNames = NIL;
+		if (pendingNotifies->uniqueChannelHash != NULL)
+		{
+			HASH_SEQ_STATUS status;
+			ChannelName *channelEntry;
+
+			hash_seq_init(&status, pendingNotifies->uniqueChannelHash);
+			while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
+				pendingNotifies->uniqueChannelNames =
+					lappend(pendingNotifies->uniqueChannelNames,
+							channelEntry->channel);
+		}
+		else
+		{
+			/* O(N^2) approach is better for small number of notifications */
+			foreach_ptr(Notification, n, pendingNotifies->events)
+			{
+				char	   *channel = n->data;
+				bool		found = false;
+
+				/* Name present in list? */
+				foreach_ptr(char, oldchan, pendingNotifies->uniqueChannelNames)
+				{
+					if (strcmp(oldchan, channel) == 0)
+					{
+						found = true;
+						break;
+					}
+				}
+				/* Add if not already in list */
+				if (!found)
+					pendingNotifies->uniqueChannelNames =
+						lappend(pendingNotifies->uniqueChannelNames,
+								channel);
+			}
+		}
+
+		/* Preallocate workspace that will be needed by SignalBackends() */
+		if (signalPids == NULL)
+			signalPids = MemoryContextAlloc(TopMemoryContext,
+											MaxBackends * sizeof(int32));
+
+		if (signalProcnos == NULL)
+			signalProcnos = MemoryContextAlloc(TopMemoryContext,
+											   MaxBackends * sizeof(ProcNumber));
 
 		/*
 		 * Make sure that we have an XID assigned to the current transaction.
@@ -921,6 +1300,23 @@ PreCommit_Notify(void)
 		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
 						 AccessExclusiveLock);
 
+		/*
+		 * For the direct advancement optimization in SignalBackends(), we
+		 * need to ensure that no other backend can insert queue entries
+		 * between queueHeadBeforeWrite and queueHeadAfterWrite.  The
+		 * heavyweight lock above provides this guarantee, since it serializes
+		 * all writers.
+		 *
+		 * Note: if the heavyweight lock were ever removed for scalability
+		 * reasons, we could achieve the same guarantee by holding
+		 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
+		 * than releasing and reacquiring it for each page as we do below.
+		 */
+
+		/* Initialize values to a safe default in case list is empty */
+		SET_QUEUE_POS(queueHeadBeforeWrite, 0, 0);
+		SET_QUEUE_POS(queueHeadAfterWrite, 0, 0);
+
 		/* Now push the notifications into the queue */
 		nextNotify = list_head(pendingNotifies->events);
 		while (nextNotify != NULL)
@@ -938,12 +1334,18 @@ PreCommit_Notify(void)
 			 * point in time we can still roll the transaction back.
 			 */
 			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+			if (firstIteration)
+			{
+				queueHeadBeforeWrite = QUEUE_HEAD;
+				firstIteration = false;
+			}
 			asyncQueueFillWarning();
 			if (asyncQueueIsFull())
 				ereport(ERROR,
 						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 						 errmsg("too many notifications in the NOTIFY queue")));
 			nextNotify = asyncQueueAddEntries(nextNotify);
+			queueHeadAfterWrite = QUEUE_HEAD;
 			LWLockRelease(NotifyQueueLock);
 		}
 
@@ -956,7 +1358,7 @@ PreCommit_Notify(void)
  *
  *		This is called at transaction commit, after committing to clog.
  *
- *		Update listenChannels and clear transaction-local state.
+ *		Apply pending listen/unlisten changes and clear transaction-local state.
  *
  *		If we issued any notifications in the transaction, send signals to
  *		listening backends (possibly including ourselves) to process them.
@@ -966,8 +1368,6 @@ PreCommit_Notify(void)
 void
 AtCommit_Notify(void)
 {
-	ListCell   *p;
-
 	/*
 	 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
 	 * return as soon as possible
@@ -978,30 +1378,11 @@ AtCommit_Notify(void)
 	if (Trace_notify)
 		elog(DEBUG1, "AtCommit_Notify");
 
-	/* Perform any pending listen/unlisten actions */
-	if (pendingActions != NULL)
-	{
-		foreach(p, pendingActions->actions)
-		{
-			ListenAction *actrec = (ListenAction *) lfirst(p);
-
-			switch (actrec->action)
-			{
-				case LISTEN_LISTEN:
-					Exec_ListenCommit(actrec->channel);
-					break;
-				case LISTEN_UNLISTEN:
-					Exec_UnlistenCommit(actrec->channel);
-					break;
-				case LISTEN_UNLISTEN_ALL:
-					Exec_UnlistenAllCommit();
-					break;
-			}
-		}
-	}
+	/* Apply staged listen/unlisten changes */
+	ApplyPendingListenActions(true);
 
 	/* If no longer listening to anything, get out of listener array */
-	if (amRegisteredListener && listenChannels == NIL)
+	if (amRegisteredListener && LocalChannelTableIsEmpty())
 		asyncQueueUnregister();
 
 	/*
@@ -1032,12 +1413,12 @@ AtCommit_Notify(void)
 }
 
 /*
- * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
+ * BecomeRegisteredListener --- subroutine for PreCommit_Notify
  *
  * This function must make sure we are ready to catch any incoming messages.
  */
 static void
-Exec_ListenPreCommit(void)
+BecomeRegisteredListener(void)
 {
 	QueuePosition head;
 	QueuePosition max;
@@ -1051,7 +1432,7 @@ Exec_ListenPreCommit(void)
 		return;
 
 	if (Trace_notify)
-		elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
+		elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
 
 	/*
 	 * Before registering, make sure we will unlisten before dying. (Note:
@@ -1098,6 +1479,8 @@ Exec_ListenPreCommit(void)
 	QUEUE_BACKEND_POS(MyProcNumber) = max;
 	QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
+	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
+	QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
 	/* Insert backend into list of listeners at correct position */
 	if (prevListener != INVALID_PROC_NUMBER)
 	{
@@ -1127,99 +1510,393 @@ Exec_ListenPreCommit(void)
 }
 
 /*
- * Exec_ListenCommit --- subroutine for AtCommit_Notify
+ * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
  *
- * Add the channel to the list of channels we are listening on.
+ * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
+ * an entry in localChannelTable, and pre-allocating an entry in the shared
+ * globalChannelTable with listening=false.  The listening flag will be set
+ * to true in AtCommit_Notify.  If we abort later, unwanted table entries
+ * will be removed.
  */
 static void
-Exec_ListenCommit(const char *channel)
+PrepareTableEntriesForListen(const char *channel)
 {
-	MemoryContext oldcontext;
+	GlobalChannelKey key;
+	GlobalChannelEntry *entry;
+	bool		found;
+	ListenerEntry *listeners;
+	PendingListenEntry *pending;
+
+	/*
+	 * Record in local pending hash that we want to LISTEN, overwriting any
+	 * earlier attempt to UNLISTEN.
+	 */
+	pending = (PendingListenEntry *)
+		hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
+	pending->action = PENDING_LISTEN;
+
+	/*
+	 * Ensure that there is an entry for the channel in localChannelTable.
+	 * (Should this fail, we can just roll back.)  If the transaction fails
+	 * after this point, we will remove the entry if appropriate during
+	 * ApplyPendingListenActions.  Note that this entry allows IsListeningOn()
+	 * to return TRUE; we assume nothing is going to consult that before
+	 * AtCommit_Notify/AtAbort_Notify.  However, if later actions attempt to
+	 * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
+	 * present to ensure they do the right things; see
+	 * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
+	 */
+	(void) hash_search(localChannelTable, channel, HASH_ENTER, NULL);
+
+	/* Pre-allocate entry in shared globalChannelTable with listening=false */
+	GlobalChannelKeyInit(&key, MyDatabaseId, channel);
+	entry = dshash_find_or_insert(globalChannelTable, &key, &found);
+
+	if (!found)
+	{
+		/* New channel entry, so initialize it to a safe state */
+		entry->listenersArray = InvalidDsaPointer;
+		entry->numListeners = 0;
+		entry->allocatedListeners = 0;
+	}
+
+	/*
+	 * Create listenersArray if entry doesn't have one.  It's tempting to fold
+	 * this into the !found case, but this coding allows us to cope in case
+	 * dsa_allocate() failed in an earlier attempt.
+	 */
+	if (!DsaPointerIsValid(entry->listenersArray))
+	{
+		entry->listenersArray = dsa_allocate(globalChannelDSA,
+											 sizeof(ListenerEntry) * INITIAL_LISTENERS_ARRAY_SIZE);
+		entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE;
+	}
 
-	/* Do nothing if we are already listening on this channel */
-	if (IsListeningOn(channel))
+	listeners = (ListenerEntry *)
+		dsa_get_address(globalChannelDSA, entry->listenersArray);
+
+	/*
+	 * Check if we already have a ListenerEntry (possibly from earlier in this
+	 * transaction)
+	 */
+	for (int i = 0; i < entry->numListeners; i++)
+	{
+		if (listeners[i].procNo == MyProcNumber)
+		{
+			/* Already have an entry; listening flag stays as-is until commit */
+			dshash_release_lock(globalChannelTable, entry);
+			return;
+		}
+	}
+
+	/* Need to add a new entry; grow array if necessary */
+	if (entry->numListeners >= entry->allocatedListeners)
+	{
+		int			new_size = entry->allocatedListeners * 2;
+		dsa_pointer old_array = entry->listenersArray;
+		dsa_pointer new_array = dsa_allocate(globalChannelDSA,
+											 sizeof(ListenerEntry) * new_size);
+		ListenerEntry *new_listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA, new_array);
+
+		memcpy(new_listeners, listeners, sizeof(ListenerEntry) * entry->numListeners);
+		entry->listenersArray = new_array;
+		entry->allocatedListeners = new_size;
+		dsa_free(globalChannelDSA, old_array);
+		listeners = new_listeners;
+	}
+
+	listeners[entry->numListeners].procNo = MyProcNumber;
+	listeners[entry->numListeners].listening = false;	/* staged, not yet
+														 * committed */
+	entry->numListeners++;
+
+	dshash_release_lock(globalChannelTable, entry);
+}
+
+/*
+ * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
+ *
+ * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
+ * we're currently listening (committed or staged).  We don't touch
+ * globalChannelTable yet - the listener keeps receiving signals until
+ * commit, when the entry is removed.
+ */
+static void
+PrepareTableEntriesForUnlisten(const char *channel)
+{
+	PendingListenEntry *pending;
+
+	/*
+	 * If the channel name is not in localChannelTable, then we are neither
+	 * listening on it nor preparing to listen on it, so we don't need to
+	 * record an UNLISTEN action.
+	 */
+	Assert(localChannelTable != NULL);
+	if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
 		return;
 
 	/*
-	 * Add the new channel name to listenChannels.
-	 *
-	 * XXX It is theoretically possible to get an out-of-memory failure here,
-	 * which would be bad because we already committed.  For the moment it
-	 * doesn't seem worth trying to guard against that, but maybe improve this
-	 * later.
+	 * Record in local pending hash that we want to UNLISTEN, overwriting any
+	 * earlier attempt to LISTEN.  Don't touch localChannelTable or
+	 * globalChannelTable yet - we keep receiving signals until commit.
 	 */
-	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
-	listenChannels = lappend(listenChannels, pstrdup(channel));
-	MemoryContextSwitchTo(oldcontext);
+	pending = (PendingListenEntry *)
+		hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
+	pending->action = PENDING_UNLISTEN;
 }
 
 /*
- * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
+ * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
  *
- * Remove the specified channel name from listenChannels.
+ * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
+ * about-to-be-listened channels in pendingListenActions.
  */
 static void
-Exec_UnlistenCommit(const char *channel)
+PrepareTableEntriesForUnlistenAll(void)
 {
-	ListCell   *q;
+	HASH_SEQ_STATUS seq;
+	ChannelName *channelEntry;
+	PendingListenEntry *pending;
 
-	if (Trace_notify)
-		elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
+	/*
+	 * Scan localChannelTable, which will have the names of all channels that
+	 * we are listening on or have prepared to listen on.  Record an UNLISTEN
+	 * action for each one, overwriting any earlier attempt to LISTEN.
+	 */
+	hash_seq_init(&seq, localChannelTable);
+	while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
+	{
+		pending = (PendingListenEntry *)
+			hash_search(pendingListenActions, channelEntry->channel, HASH_ENTER, NULL);
+		pending->action = PENDING_UNLISTEN;
+	}
+}
+
+/*
+ * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
+ *
+ * Decrements numListeners, compacts the array, and frees the entry if empty.
+ * Sets *entry_ptr to NULL if the entry was deleted.
+ *
+ * We could get the listeners pointer from the entry, but all callers
+ * already have it at hand.
+ */
+static void
+RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
+						  ListenerEntry *listeners,
+						  int idx)
+{
+	GlobalChannelEntry *entry = *entry_ptr;
+
+	entry->numListeners--;
+	if (idx < entry->numListeners)
+		memmove(&listeners[idx], &listeners[idx + 1],
+				sizeof(ListenerEntry) * (entry->numListeners - idx));
 
-	foreach(q, listenChannels)
+	if (entry->numListeners == 0)
 	{
-		char	   *lchan = (char *) lfirst(q);
+		dsa_free(globalChannelDSA, entry->listenersArray);
+		dshash_delete_entry(globalChannelTable, entry);
+		/* tells caller not to release the entry's lock: */
+		*entry_ptr = NULL;
+	}
+}
+
+/*
+ * ApplyPendingListenActions
+ *
+ * Apply, or revert, staged listen/unlisten changes to the local and global
+ * hash tables.
+ */
+static void
+ApplyPendingListenActions(bool isCommit)
+{
+	HASH_SEQ_STATUS seq;
+	PendingListenEntry *pending;
+
+	/* Quick exit if nothing to do */
+	if (pendingListenActions == NULL)
+		return;
 
-		if (strcmp(lchan, channel) == 0)
+	/* We made a globalChannelTable before building pendingListenActions */
+	if (globalChannelTable == NULL)
+		elog(PANIC, "global channel table missing post-commit/abort");
+
+	/* For each staged action ... */
+	hash_seq_init(&seq, pendingListenActions);
+	while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
+	{
+		GlobalChannelKey key;
+		GlobalChannelEntry *entry;
+		bool		removeLocal = true;
+		bool		foundListener = false;
+
+		/*
+		 * Find the global entry for this channel.  If isCommit, it had better
+		 * exist (it was created in PreCommit).  In an abort, it might not
+		 * exist, in which case we are not listening and should discard any
+		 * local entry that PreCommit may have managed to create.
+		 */
+		GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
+		entry = dshash_find(globalChannelTable, &key, true);
+		if (entry != NULL)
 		{
-			listenChannels = foreach_delete_current(listenChannels, q);
-			pfree(lchan);
-			break;
+			/* Scan entry to find the ListenerEntry for this backend */
+			ListenerEntry *listeners;
+
+			listeners = (ListenerEntry *)
+				dsa_get_address(globalChannelDSA, entry->listenersArray);
+
+			for (int i = 0; i < entry->numListeners; i++)
+			{
+				if (listeners[i].procNo != MyProcNumber)
+					continue;
+				foundListener = true;
+				if (isCommit)
+				{
+					if (pending->action == PENDING_LISTEN)
+					{
+						/*
+						 * LISTEN being committed: set listening=true.
+						 * localChannelTable entry was created during
+						 * PreCommit and should be kept.
+						 */
+						listeners[i].listening = true;
+						removeLocal = false;
+					}
+					else
+					{
+						/*
+						 * UNLISTEN being committed: remove pre-allocated
+						 * entries from both tables.
+						 */
+						RemoveListenerFromChannel(&entry, listeners, i);
+					}
+				}
+				else
+				{
+					/*
+					 * Note: this part is reachable only if the transaction
+					 * aborts after PreCommit_Notify() has made some
+					 * pendingListenActions entries, so it's pretty hard to
+					 * test.
+					 */
+					if (!listeners[i].listening)
+					{
+						/*
+						 * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
+						 * and we weren't listening before, so remove
+						 * pre-allocated entries from both tables.
+						 */
+						RemoveListenerFromChannel(&entry, listeners, i);
+					}
+					else
+					{
+						/*
+						 * We're aborting, but the previous state was that
+						 * we're listening, so keep localChannelTable entry.
+						 */
+						removeLocal = false;
+					}
+				}
+				break;			/* there shouldn't be another match */
+			}
+
+			/* We might have already released the entry by removing it */
+			if (entry != NULL)
+				dshash_release_lock(globalChannelTable, entry);
 		}
-	}
 
-	/*
-	 * We do not complain about unlistening something not being listened;
-	 * should we?
-	 */
+		/*
+		 * If we're committing a LISTEN action, we should have found a
+		 * matching ListenerEntry, but otherwise it's okay if we didn't.
+		 */
+		if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
+			elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
+				 pending->channel, MyProcNumber);
+
+		/*
+		 * If we did not find a globalChannelTable entry for our backend, or
+		 * if we are unlistening, remove any localChannelTable entry that may
+		 * exist.  (Note in particular that this cleans up if we created a
+		 * localChannelTable entry and then failed while trying to create a
+		 * globalChannelTable entry.)
+		 */
+		if (removeLocal && localChannelTable != NULL)
+			(void) hash_search(localChannelTable, pending->channel,
+							   HASH_REMOVE, NULL);
+	}
 }
 
 /*
- * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
+ * CleanupListenersOnExit --- called from Async_UnlistenOnExit
  *
- *		Unlisten on all channels for this backend.
+ *		Remove this backend from all channels in the shared global table.
  */
 static void
-Exec_UnlistenAllCommit(void)
+CleanupListenersOnExit(void)
 {
+	dshash_seq_status status;
+	GlobalChannelEntry *entry;
+
 	if (Trace_notify)
-		elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
+		elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
+
+	/* Clear our local cache (not really necessary, but be consistent) */
+	if (localChannelTable != NULL)
+	{
+		hash_destroy(localChannelTable);
+		localChannelTable = NULL;
+	}
+
+	/* Now remove our entries from the shared globalChannelTable */
+	if (globalChannelTable == NULL)
+		return;
+
+	dshash_seq_init(&status, globalChannelTable, true);
+	while ((entry = dshash_seq_next(&status)) != NULL)
+	{
+		ListenerEntry *listeners;
+
+		if (entry->key.dboid != MyDatabaseId)
+			continue;			/* not relevant */
+
+		listeners = (ListenerEntry *)
+			dsa_get_address(globalChannelDSA, entry->listenersArray);
 
-	list_free_deep(listenChannels);
-	listenChannels = NIL;
+		for (int i = 0; i < entry->numListeners; i++)
+		{
+			if (listeners[i].procNo == MyProcNumber)
+			{
+				entry->numListeners--;
+				if (i < entry->numListeners)
+					memmove(&listeners[i], &listeners[i + 1],
+							sizeof(ListenerEntry) * (entry->numListeners - i));
+
+				if (entry->numListeners == 0)
+				{
+					dsa_free(globalChannelDSA, entry->listenersArray);
+					dshash_delete_current(&status);
+				}
+				break;
+			}
+		}
+	}
+	dshash_seq_term(&status);
 }
 
 /*
  * Test whether we are actively listening on the given channel name.
  *
  * Note: this function is executed for every notification found in the queue.
- * Perhaps it is worth further optimization, eg convert the list to a sorted
- * array so we can binary-search it.  In practice the list is likely to be
- * fairly short, though.
  */
 static bool
 IsListeningOn(const char *channel)
 {
-	ListCell   *p;
-
-	foreach(p, listenChannels)
-	{
-		char	   *lchan = (char *) lfirst(p);
+	if (localChannelTable == NULL)
+		return false;
 
-		if (strcmp(lchan, channel) == 0)
-			return true;
-	}
-	return false;
+	return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
 }
 
 /*
@@ -1229,7 +1906,7 @@ IsListeningOn(const char *channel)
 static void
 asyncQueueUnregister(void)
 {
-	Assert(listenChannels == NIL);	/* else caller error */
+	Assert(LocalChannelTableIsEmpty()); /* else caller error */
 
 	if (!amRegisteredListener)	/* nothing to do */
 		return;
@@ -1241,6 +1918,8 @@ asyncQueueUnregister(void)
 	/* Mark our entry as invalid */
 	QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
+	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
+	QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
 	/* and remove it from the list */
 	if (QUEUE_FIRST_LISTENER == MyProcNumber)
 		QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
@@ -1565,11 +2244,9 @@ asyncQueueFillWarning(void)
 /*
  * Send signals to listening backends.
  *
- * Normally we signal only backends in our own database, since only those
- * backends could be interested in notifies we send.  However, if there's
- * notify traffic in our database but no traffic in another database that
- * does have listener(s), those listeners will fall further and further
- * behind.  Waken them anyway if they're far enough behind, so that they'll
+ * Normally we signal only backends that are interested in the notifies that
+ * we just sent.  However, that will leave idle listeners falling further and
+ * further behind.  Waken them anyway if they're far enough behind, so they'll
  * advance their queue position pointers, allowing the global tail to advance.
  *
  * Since we know the ProcNumber and the Pid the signaling is quite cheap.
@@ -1580,60 +2257,124 @@ asyncQueueFillWarning(void)
 static void
 SignalBackends(void)
 {
-	int32	   *pids;
-	ProcNumber *procnos;
 	int			count;
 
+	/* Can't get here without PreCommit_Notify having made the global table */
+	Assert(globalChannelTable != NULL);
+
+	/* It should have set up these arrays, too */
+	Assert(signalPids != NULL && signalProcnos != NULL);
+
 	/*
 	 * Identify backends that we need to signal.  We don't want to send
-	 * signals while holding the NotifyQueueLock, so this loop just builds a
-	 * list of target PIDs.
-	 *
-	 * XXX in principle these pallocs could fail, which would be bad. Maybe
-	 * preallocate the arrays?  They're not that large, though.
+	 * signals while holding the NotifyQueueLock, so this part just builds a
+	 * list of target PIDs in signalPids[] and signalProcnos[].
 	 */
-	pids = (int32 *) palloc(MaxBackends * sizeof(int32));
-	procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
 	count = 0;
 
 	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+	/* Scan each channel name that we notified in this transaction */
+	foreach_ptr(char, channel, pendingNotifies->uniqueChannelNames)
+	{
+		GlobalChannelKey key;
+		GlobalChannelEntry *entry;
+		ListenerEntry *listeners;
+
+		GlobalChannelKeyInit(&key, MyDatabaseId, channel);
+		entry = dshash_find(globalChannelTable, &key, false);
+		if (entry == NULL)
+			continue;			/* nobody is listening */
+
+		listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA,
+													  entry->listenersArray);
+
+		/* Identify listeners that now need waking, add them to arrays */
+		for (int j = 0; j < entry->numListeners; j++)
+		{
+			ProcNumber	i;
+			int32		pid;
+			QueuePosition pos;
+
+			if (!listeners[j].listening)
+				continue;		/* ignore not-yet-committed listeners */
+
+			i = listeners[j].procNo;
+
+			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+				continue;		/* already signaled, no need to repeat */
+
+			pid = QUEUE_BACKEND_PID(i);
+			pos = QUEUE_BACKEND_POS(i);
+
+			if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+				continue;		/* it's fully caught up already */
+
+			Assert(pid != InvalidPid);
+
+			QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+			signalPids[count] = pid;
+			signalProcnos[count] = i;
+			count++;
+		}
+
+		dshash_release_lock(globalChannelTable, entry);
+	}
+
+	/*
+	 * Scan all listeners.  Any that are not already pending wakeup must not
+	 * be interested in our notifications (else we'd have set their wakeup
+	 * flags above).  Check to see if we can directly advance their queue
+	 * pointers to save a wakeup.  Otherwise, if they are far behind, wake
+	 * them anyway so they will catch up.
+	 */
 	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
 	{
-		int32		pid = QUEUE_BACKEND_PID(i);
+		int32		pid;
 		QueuePosition pos;
 
-		Assert(pid != InvalidPid);
+		if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+			continue;
+
+		/* If it's currently advancing, we should not touch it */
+		if (QUEUE_BACKEND_IS_ADVANCING(i))
+			continue;
+
+		pid = QUEUE_BACKEND_PID(i);
 		pos = QUEUE_BACKEND_POS(i);
-		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+
+		/*
+		 * We can directly advance the other backend's queue pointer if it's
+		 * not currently advancing (else there are race conditions), and its
+		 * current pointer is not behind queueHeadBeforeWrite (else we'd make
+		 * it miss some older messages), and we'd not be moving the pointer
+		 * backward.
+		 */
+		if (!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite) &&
+			QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))
 		{
-			/*
-			 * Always signal listeners in our own database, unless they're
-			 * already caught up (unlikely, but possible).
-			 */
-			if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
-				continue;
+			/* We can directly advance its pointer past what we wrote */
+			QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
 		}
-		else
+		else if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
+									QUEUE_POS_PAGE(pos)) >= QUEUE_CLEANUP_DELAY)
 		{
-			/*
-			 * Listeners in other databases should be signaled only if they
-			 * are far behind.
-			 */
-			if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
-								   QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
-				continue;
+			/* It's idle and far behind, so wake it up */
+			Assert(pid != InvalidPid);
+
+			QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+			signalPids[count] = pid;
+			signalProcnos[count] = i;
+			count++;
 		}
-		/* OK, need to signal this one */
-		pids[count] = pid;
-		procnos[count] = i;
-		count++;
 	}
+
 	LWLockRelease(NotifyQueueLock);
 
 	/* Now send signals */
 	for (int i = 0; i < count; i++)
 	{
-		int32		pid = pids[i];
+		int32		pid = signalPids[i];
 
 		/*
 		 * If we are signaling our own process, no need to involve the kernel;
@@ -1651,12 +2392,9 @@ SignalBackends(void)
 		 * NotifyQueueLock; which is unlikely but certainly possible. So we
 		 * just log a low-level debug message if it happens.
 		 */
-		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
+		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, signalProcnos[i]) < 0)
 			elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
 	}
-
-	pfree(pids);
-	pfree(procnos);
 }
 
 /*
@@ -1664,18 +2402,18 @@ SignalBackends(void)
  *
  *	This is called at transaction abort.
  *
- *	Gets rid of pending actions and outbound notifies that we would have
- *	executed if the transaction got committed.
+ *	Revert any staged listen/unlisten changes and clean up transaction state.
+ *	This only does anything if we abort after PreCommit_Notify has staged
+ *	some entries.
  */
 void
 AtAbort_Notify(void)
 {
-	/*
-	 * If we LISTEN but then roll back the transaction after PreCommit_Notify,
-	 * we have registered as a listener but have not made any entry in
-	 * listenChannels.  In that case, deregister again.
-	 */
-	if (amRegisteredListener && listenChannels == NIL)
+	/* Revert staged listen/unlisten changes */
+	ApplyPendingListenActions(false);
+
+	/* If we're no longer listening on anything, unregister */
+	if (amRegisteredListener && LocalChannelTableIsEmpty())
 		asyncQueueUnregister();
 
 	/* And clean up */
@@ -1854,20 +2592,27 @@ asyncQueueReadAllNotifications(void)
 	QueuePosition head;
 	Snapshot	snapshot;
 
-	/* Fetch current state */
+	/*
+	 * Fetch current state, indicate to others that we have woken up, and that
+	 * we are in the process of advancing our position.
+	 */
 	LWLockAcquire(NotifyQueueLock, LW_SHARED);
 	/* Assert checks that we have a valid state entry */
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
+	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
 	head = QUEUE_HEAD;
-	LWLockRelease(NotifyQueueLock);
 
 	if (QUEUE_POS_EQUAL(pos, head))
 	{
 		/* Nothing to do, we have read all notifications already. */
+		LWLockRelease(NotifyQueueLock);
 		return;
 	}
 
+	QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = true;
+	LWLockRelease(NotifyQueueLock);
+
 	/*----------
 	 * Get snapshot we'll use to decide which xacts are still in progress.
 	 * This is trickier than it might seem, because of race conditions.
@@ -1902,7 +2647,7 @@ asyncQueueReadAllNotifications(void)
 	 *
 	 * What we do guarantee is that we'll see all notifications from
 	 * transactions committing after the snapshot we take here.
-	 * Exec_ListenPreCommit has already added us to the listener array,
+	 * BecomeRegisteredListener has already added us to the listener array,
 	 * so no not-yet-committed messages can be removed from the queue
 	 * before we see them.
 	 *----------
@@ -1955,6 +2700,7 @@ asyncQueueReadAllNotifications(void)
 		/* Update shared state */
 		LWLockAcquire(NotifyQueueLock, LW_SHARED);
 		QUEUE_BACKEND_POS(MyProcNumber) = pos;
+		QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
 		LWLockRelease(NotifyQueueLock);
 
 		ExitOnAnyError = save_ExitOnAnyError;
@@ -2049,9 +2795,11 @@ asyncQueueProcessPageEntries(QueuePosition *current,
 			 * that if there's a bad entry in the queue for which
 			 * TransactionIdDidCommit() fails for some reason, we can skip
 			 * over it on the first LISTEN in a session, and not get stuck on
-			 * it indefinitely.
+			 * it indefinitely.  (This is a little trickier than it looks: it
+			 * works because BecomeRegisteredListener runs this code before we
+			 * have made the first entry in localChannelTable.)
 			 */
-			if (listenChannels == NIL)
+			if (LocalChannelTableIsEmpty())
 				continue;
 
 			if (TransactionIdDidCommit(qe->xid))
@@ -2306,7 +3054,7 @@ ProcessIncomingNotify(bool flush)
 	notifyInterruptPending = false;
 
 	/* Do nothing else if we aren't actively listening */
-	if (listenChannels == NIL)
+	if (LocalChannelTableIsEmpty())
 		return;
 
 	if (Trace_notify)
@@ -2410,7 +3158,7 @@ AddEventToPendingNotifies(Notification *n)
 {
 	Assert(pendingNotifies->events != NIL);
 
-	/* Create the hash table if it's time to */
+	/* Create the hash tables if it's time to */
 	if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
 		pendingNotifies->hashtab == NULL)
 	{
@@ -2429,10 +3177,22 @@ AddEventToPendingNotifies(Notification *n)
 						&hash_ctl,
 						HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
 
+		/* Create the unique channel name table */
+		Assert(pendingNotifies->uniqueChannelHash == NULL);
+		hash_ctl.keysize = NAMEDATALEN;
+		hash_ctl.entrysize = sizeof(ChannelName);
+		hash_ctl.hcxt = CurTransactionContext;
+		pendingNotifies->uniqueChannelHash =
+			hash_create("Pending Notify Channel Names",
+						64L,
+						&hash_ctl,
+						HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
+
 		/* Insert all the already-existing events */
 		foreach(l, pendingNotifies->events)
 		{
 			Notification *oldn = (Notification *) lfirst(l);
+			char	   *channel = oldn->data;
 			bool		found;
 
 			(void) hash_search(pendingNotifies->hashtab,
@@ -2440,15 +3200,22 @@ AddEventToPendingNotifies(Notification *n)
 							   HASH_ENTER,
 							   &found);
 			Assert(!found);
+
+			/* Add channel name to uniqueChannelHash; might be there already */
+			(void) hash_search(pendingNotifies->uniqueChannelHash,
+							   channel,
+							   HASH_ENTER,
+							   NULL);
 		}
 	}
 
 	/* Add new event to the list, in order */
 	pendingNotifies->events = lappend(pendingNotifies->events, n);
 
-	/* Add event to the hash table if needed */
+	/* Add event to the hash tables if needed */
 	if (pendingNotifies->hashtab != NULL)
 	{
+		char	   *channel = n->data;
 		bool		found;
 
 		(void) hash_search(pendingNotifies->hashtab,
@@ -2456,6 +3223,12 @@ AddEventToPendingNotifies(Notification *n)
 						   HASH_ENTER,
 						   &found);
 		Assert(!found);
+
+		/* Add channel name to uniqueChannelHash; might be there already */
+		(void) hash_search(pendingNotifies->uniqueChannelHash,
+						   channel,
+						   HASH_ENTER,
+						   NULL);
 	}
 }
 
@@ -2505,6 +3278,8 @@ ClearPendingActionsAndNotifies(void)
 	 */
 	pendingActions = NULL;
 	pendingNotifies = NULL;
+	/* Also clear pendingListenActions, which is derived from pendingActions */
+	pendingListenActions = NULL;
 }
 
 /*
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 3299de23bb3..7194aee3532 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -372,6 +372,7 @@ SubtransBuffer	"Waiting for I/O on a sub-transaction SLRU buffer."
 MultiXactOffsetBuffer	"Waiting for I/O on a multixact offset SLRU buffer."
 MultiXactMemberBuffer	"Waiting for I/O on a multixact member SLRU buffer."
 NotifyBuffer	"Waiting for I/O on a <command>NOTIFY</command> message SLRU buffer."
+NotifyChannelHash	"Waiting to access the <command>NOTIFY</command> channel hash table."
 SerialBuffer	"Waiting for I/O on a serializable transaction conflict SLRU buffer."
 WALInsert	"Waiting to insert WAL data into a memory buffer."
 BufferContent	"Waiting to access a data page in memory."
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 94f818b9f10..e6b32daff99 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -102,6 +102,7 @@ PG_LWLOCKTRANCHE(SUBTRANS_BUFFER, SubtransBuffer)
 PG_LWLOCKTRANCHE(MULTIXACTOFFSET_BUFFER, MultiXactOffsetBuffer)
 PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer)
 PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer)
+PG_LWLOCKTRANCHE(NOTIFY_CHANNEL_HASH, NotifyChannelHash)
 PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer)
 PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert)
 PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 14dec2d49c1..3f3a888fd0e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -422,6 +422,7 @@ CatalogIdMapEntry
 CatalogIndexState
 ChangeVarNodes_callback
 ChangeVarNodes_context
+ChannelName
 CheckPoint
 CheckPointStmt
 CheckpointStatsData
@@ -1111,6 +1112,8 @@ GistSplitUnion
 GistSplitVector
 GistTsVectorOptions
 GistVacState
+GlobalChannelEntry
+GlobalChannelKey
 GlobalTransaction
 GlobalTransactionData
 GlobalVisHorizonKind
@@ -1580,6 +1583,7 @@ ListParsedLex
 ListenAction
 ListenActionKind
 ListenStmt
+ListenerEntry
 LoInfo
 LoadStmt
 LocalBufferLookupEnt
@@ -2176,6 +2180,8 @@ PatternInfoArray
 Pattern_Prefix_Status
 Pattern_Type
 PendingFsyncEntry
+PendingListenAction
+PendingListenEntry
 PendingRelDelete
 PendingRelSync
 PendingUnlinkEntry
-- 
2.43.7

