*** a/src/backend/commands/async.c --- b/src/backend/commands/async.c *************** *** 294,299 **** static SlruCtlData AsyncCtlData; --- 294,304 ---- static List *listenChannels = NIL; /* list of C strings */ /* + * If listenWildcard is set, we're listening on all channels. + */ + static bool listenWildcard = false; + + /* * State for pending LISTEN/UNLISTEN actions consists of an ordered list of * all actions requested in the current transaction. As explained above, * we don't actually change listenChannels until we reach transaction commit. *************** *** 306,311 **** static List *listenChannels = NIL; /* list of C strings */ --- 311,317 ---- typedef enum { LISTEN_LISTEN, + LISTEN_LISTEN_ALL, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL } ListenActionKind; *************** *** 373,378 **** static void queue_listen(ListenActionKind action, const char *channel); --- 379,385 ---- static void Async_UnlistenOnExit(int code, Datum arg); static void Exec_ListenPreCommit(void); static void Exec_ListenCommit(const char *channel); + static void Exec_ListenAllCommit(void); static void Exec_UnlistenCommit(const char *channel); static void Exec_UnlistenAllCommit(void); static bool IsListeningOn(const char *channel); *************** *** 598,604 **** Async_Notify(const char *channel, const char *payload) /* * queue_listen ! * Common code for listen, unlisten, unlisten all commands. * * Adds the request to the list of pending actions. * Actual update of the listenChannels list happens during transaction --- 605,611 ---- /* * queue_listen ! * Common code for listen, listen all, unlisten, unlisten all commands. * * Adds the request to the list of pending actions. * Actual update of the listenChannels list happens during transaction *************** *** 613,620 **** queue_listen(ListenActionKind action, const char *channel) /* * Unlike Async_Notify, we don't try to collapse out duplicates. It would * be too complicated to ensure we get the right interactions of ! * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there ! * would be any performance benefit anyway in sane applications. */ oldcontext = MemoryContextSwitchTo(CurTransactionContext); --- 620,627 ---- /* * Unlike Async_Notify, we don't try to collapse out duplicates. It would * be too complicated to ensure we get the right interactions of ! * conflicting LISTEN/LISTEN_ALL/UNLISTEN/UNLISTEN_ALL, and it's unlikely ! * that there would be any performance benefit anyway in sane applications. */ oldcontext = MemoryContextSwitchTo(CurTransactionContext); *************** *** 644,649 **** Async_Listen(const char *channel) --- 651,671 ---- } /* + * Async_ListenAll + * + * This is executed by the LISTEN * command. + */ + void + Async_ListenAll(void) + { + if (Trace_notify) + elog(DEBUG1, "Async_ListenAll(%d)", MyProcPid); + + queue_listen(LISTEN_LISTEN_ALL, ""); + } + + + /* * Async_Unlisten * * This is executed by the SQL unlisten command. *************** *** 790,795 **** PreCommit_Notify(void) --- 812,818 ---- switch (actrec->action) { case LISTEN_LISTEN: + case LISTEN_LISTEN_ALL: Exec_ListenPreCommit(); break; case LISTEN_UNLISTEN: *************** *** 895,900 **** AtCommit_Notify(void) --- 918,926 ---- case LISTEN_LISTEN: Exec_ListenCommit(actrec->channel); break; + case LISTEN_LISTEN_ALL: + Exec_ListenAllCommit(); + break; case LISTEN_UNLISTEN: Exec_UnlistenCommit(actrec->channel); break; *************** *** 905,911 **** AtCommit_Notify(void) } /* If no longer listening to anything, get out of listener array */ ! if (amRegisteredListener && listenChannels == NIL) asyncQueueUnregister(); /* And clean up */ --- 931,937 ---- } /* If no longer listening to anything, get out of listener array */ ! if (amRegisteredListener && listenChannels == NIL && !listenWildcard) asyncQueueUnregister(); /* And clean up */ *************** *** 1026,1031 **** Exec_ListenCommit(const char *channel) --- 1052,1076 ---- } /* + * Exec_ListenAllCommit --- subroutine for AtCommit_Notify + * + * Start listening on all notification channels. + */ + static void + Exec_ListenAllCommit(void) + { + listenWildcard = true; + + /* + * We can forget all notification channels right away. The only way to + * undo a "LISTEN *" is to "UNLISTEN *", and in that case we'd just release + * the list of channels anyway. + */ + list_free_deep(listenChannels); + listenChannels = NIL; + } + + /* * Exec_UnlistenCommit --- subroutine for AtCommit_Notify * * Remove the specified channel name from listenChannels. *************** *** 1072,1077 **** Exec_UnlistenAllCommit(void) --- 1117,1123 ---- list_free_deep(listenChannels); listenChannels = NIL; + listenWildcard = false; } /* *************** *** 1130,1136 **** ProcessCompletedNotifies(void) /* Send signals to other backends */ signalled = SignalBackends(); ! if (listenChannels != NIL) { /* Read the queue ourselves, and send relevant stuff to the frontend */ asyncQueueReadAllNotifications(); --- 1176,1182 ---- /* Send signals to other backends */ signalled = SignalBackends(); ! if (listenChannels != NIL || listenWildcard) { /* Read the queue ourselves, and send relevant stuff to the frontend */ asyncQueueReadAllNotifications(); *************** *** 1956,1962 **** asyncQueueProcessPageEntries(volatile QueuePosition *current, /* qe->data is the null-terminated channel name */ char *channel = qe->data; ! if (IsListeningOn(channel)) { /* payload follows channel name */ char *payload = qe->data + strlen(channel) + 1; --- 2002,2008 ---- /* qe->data is the null-terminated channel name */ char *channel = qe->data; ! if (listenWildcard || IsListeningOn(channel)) { /* payload follows channel name */ char *payload = qe->data + strlen(channel) + 1; *************** *** 2044,2050 **** ProcessIncomingNotify(void) notifyInterruptPending = false; /* Do nothing else if we aren't actively listening */ ! if (listenChannels == NIL) return; if (Trace_notify) --- 2090,2096 ---- notifyInterruptPending = false; /* Do nothing else if we aren't actively listening */ ! if (listenChannels == NIL && !listenWildcard) return; if (Trace_notify) *** a/src/backend/parser/gram.y --- b/src/backend/parser/gram.y *************** *** 8543,8548 **** ListenStmt: LISTEN ColId --- 8543,8554 ---- n->conditionname = $2; $$ = (Node *)n; } + | LISTEN '*' + { + ListenStmt *n = makeNode(ListenStmt); + n->conditionname = NULL; + $$ = (Node *)n; + } ; UnlistenStmt: *** a/src/backend/tcop/utility.c --- b/src/backend/tcop/utility.c *************** *** 609,615 **** standard_ProcessUtility(Node *parsetree, PreventCommandDuringRecovery("LISTEN"); CheckRestrictedOperation("LISTEN"); ! Async_Listen(stmt->conditionname); } break; --- 609,618 ---- PreventCommandDuringRecovery("LISTEN"); CheckRestrictedOperation("LISTEN"); ! if (stmt->conditionname) ! Async_Listen(stmt->conditionname); ! else ! Async_ListenAll(); } break; *** a/src/include/commands/async.h --- b/src/include/commands/async.h *************** *** 31,36 **** extern void AsyncShmemInit(void); --- 31,37 ---- /* notify-related SQL statements */ extern void Async_Notify(const char *channel, const char *payload); extern void Async_Listen(const char *channel); + extern void Async_ListenAll(void); extern void Async_Unlisten(const char *channel); extern void Async_UnlistenAll(void);