From 01a08074d02e2d39b1bd0778b1345ebc3f996d7f Mon Sep 17 00:00:00 2001 From: Matheus Alcantara Date: Mon, 8 Jun 2026 14:54:34 -0300 Subject: [PATCH v1] Add pattern matching support for LISTEN/NOTIFY channels This adds glob-style pattern matching for LISTEN channel names, allowing clients to subscribe to multiple channels with a single LISTEN command using wildcards. The supported wildcards are: * - matches zero or more characters ? - matches exactly one character \ - escapes the next character to match literal * or ? Example: LISTEN "user_*" will receive notifications sent to channels like "user_123", "user_alice", etc. To support targeted wakeups for pattern listeners, patterns are stored in two shared memory structures: 1. globalChannelTable - keyed by the pattern string itself, storing the list of backends listening on that pattern. This allows looking up listeners once we know which pattern matched. 2. globalPatterns list - an enumerable array of all registered patterns. SignalBackends() iterates this list, matching each pattern against the notification channel. For matching patterns, it looks up the entry in globalChannelTable to find and wake only those listeners. --- src/backend/commands/async.c | 564 +++++++++++++++++-- src/test/isolation/expected/async-notify.out | 89 ++- src/test/isolation/specs/async-notify.spec | 53 ++ src/test/regress/expected/async.out | 21 + src/test/regress/sql/async.sql | 27 + src/tools/pgindent/typedefs.list | 1 + 6 files changed, 711 insertions(+), 44 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index eee8bc29f38..f92a6a99290 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -111,6 +111,22 @@ * tracking which channels that backend is listening to. The local table * serves to reduce the number of accesses needed to the shared table. * + * Pattern channels (those containing wildcards like * or ?) are handled + * specially. A pattern cannot be looked up by exact name match when a + * notification arrives, since we need to match the notification channel + * against the pattern, not the other way around. To support targeted + * wakeups for pattern listeners, we maintain two structures: + * + * (a) The pattern is stored in globalChannelTable keyed by the pattern + * string itself. This allows us to find the list of backends + * listening on a specific pattern once we know which pattern matched. + * + * (b) A global pattern list (asyncQueueControl->globalPatterns) holds all + * registered patterns. SignalBackends() iterates this list, matching + * each pattern against the notification channel. When a pattern + * matches, we look up its entry in globalChannelTable to find and + * wake the listeners. + * * If the current transaction has executed any LISTEN/UNLISTEN actions, * PreCommit_Notify() prepares to commit those. For LISTEN, it * pre-allocates entries in both the per-backend localChannelTable and the @@ -292,8 +308,36 @@ typedef struct QueueBackendStatus QueuePosition pos; /* backend has read queue up to here */ bool wakeupPending; /* signal sent to backend, not yet processed */ bool isAdvancing; /* backend is advancing its position */ + bool hasPatterns; /* backend is listening on pattern channels */ } QueueBackendStatus; +#define INITIAL_PATTERNS_ARRAY_SIZE 8 + +/* + * Global pattern list definitions + * + * This structure holds all registered pattern channels in shared memory. + * It exists because patterns cannot be looked up by exact match in the + * globalChannelTable when a notification arrives -- we need to iterate + * through all patterns and test each one against the notification channel. + * + * Each pattern is stored as a GlobalChannelKey, which can be used directly + * to look up the corresponding GlobalChannelEntry in globalChannelTable + * (where the pattern string is stored as the "channel" field of the key). + * + * This allows SignalBackends() to: + * 1. Iterate through all patterns in the list + * 2. Match each pattern against the notification channel being notified + * 3. For matching patterns, use the key to find listeners in globalChannelTable + * 4. Wake only those backends whose patterns actually matched + */ +typedef struct GlobalPatternList +{ + dsa_pointer patternsArray; /* DSA pointer to GlobalChannelKey array */ + int numPatterns; /* Number of patterns currently stored */ + int allocatedPatterns; /* Allocated size of array */ +} GlobalPatternList; + /* * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff) * @@ -340,6 +384,7 @@ typedef struct AsyncQueueControl TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ dsa_handle globalChannelTableDSA; /* global channel table's DSA handle */ dshash_table_handle globalChannelTableDSH; /* and its dshash handle */ + GlobalPatternList globalPatterns; /* shared list of pattern channels */ /* Array with room for MaxBackends entries: */ QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; } AsyncQueueControl; @@ -365,6 +410,7 @@ const ShmemCallbacks AsyncShmemCallbacks = { #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) #define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending) #define QUEUE_BACKEND_IS_ADVANCING(i) (asyncQueueControl->backend[i].isAdvancing) +#define QUEUE_BACKEND_HAS_PATTERNS(i) (asyncQueueControl->backend[i].hasPatterns) /* * The SLRU buffer area through which we access the notification queue @@ -423,7 +469,8 @@ static HTAB *localChannelTable = NULL; /* We test this condition to detect that we're not listening at all */ #define LocalChannelTableIsEmpty() \ - (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0) + ((localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0) && \ + localPatternList == NIL) /* * State for pending LISTEN/UNLISTEN actions consists of an ordered list of @@ -474,6 +521,7 @@ typedef enum typedef struct PendingListenEntry { char channel[NAMEDATALEN]; /* hash key */ + bool is_pattern; /* channel contains wildcards? */ PendingListenAction action; /* which action should we perform? */ } PendingListenEntry; @@ -557,6 +605,13 @@ static bool unlistenExitRegistered = false; /* True if we're currently registered as a listener in asyncQueueControl */ static bool amRegisteredListener = false; +/* + * List of pattern channel names. These are stored separately because they + * cannot be matched via hash lookup. The list contains palloc'd strings in + * TopMemoryContext. + */ +static List *localPatternList = NIL; + /* * Queue head positions for direct advancement. * These are captured during PreCommit_Notify while holding the heavyweight @@ -599,9 +654,15 @@ static void PrepareTableEntriesForUnlisten(const char *channel); static void PrepareTableEntriesForUnlistenAll(void); static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr, ListenerEntry *listeners, - int idx); + int idx, + bool is_pattern); static void ApplyPendingListenActions(bool isCommit); static void CleanupListenersOnExit(void); +static bool IsPattern(const char *channel); +static bool MatchPattern(const char *channel, const char *pattern); +static bool IsListeningOnPattern(const char *channel); +static void AddPatternToGlobalList(const char *pattern); +static void RemovePatternFromGlobalList(const char *pattern); static bool IsListeningOn(const char *channel); static void asyncQueueUnregister(void); static bool asyncQueueIsFull(void); @@ -611,6 +672,7 @@ static ListCell *asyncQueueAddEntries(ListCell *nextNotify); static double asyncQueueUsage(void); static void asyncQueueFillWarning(void); static void SignalBackends(void); +static void SignalListenersForEntry(GlobalChannelEntry *entry, int *count); static void asyncQueueReadAllNotifications(void); static bool asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, @@ -843,6 +905,7 @@ AsyncShmemInit(void *arg) SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); QUEUE_BACKEND_WAKEUP_PENDING(i) = false; QUEUE_BACKEND_IS_ADVANCING(i) = false; + QUEUE_BACKEND_HAS_PATTERNS(i) = false; } /* @@ -1525,6 +1588,9 @@ BecomeRegisteredListener(void) * an entry in localChannelTable, and pre-allocating an entry in the shared * globalChannelTable with removeOnAbort set. AtCommit_Notify will clear * removeOnAbort; abort processing will remove entries still marked so. + * + * For pattern channels, we add them to the local pattern list instead of the + * global channel table, since they cannot be looked up by exact name. */ static void PrepareTableEntriesForListen(const char *channel) @@ -1534,6 +1600,7 @@ PrepareTableEntriesForListen(const char *channel) bool found; ListenerEntry *listeners; PendingListenEntry *pending; + bool is_pattern = IsPattern(channel); /* * Record in local pending hash that we want to LISTEN, overwriting any @@ -1542,6 +1609,7 @@ PrepareTableEntriesForListen(const char *channel) pending = (PendingListenEntry *) hash_search(pendingListenActions, channel, HASH_ENTER, NULL); pending->action = PENDING_LISTEN; + pending->is_pattern = is_pattern; /* * Ensure that there is an entry for the channel in localChannelTable. @@ -1556,6 +1624,55 @@ PrepareTableEntriesForListen(const char *channel) */ (void) hash_search(localChannelTable, channel, HASH_ENTER, NULL); + /* + * For pattern channels, add to the local pattern list. We also add + * patterns to globalChannelTable (keyed by the pattern string itself) so + * that once SignalBackends() finds a matching pattern, it can look up the + * listeners for that pattern by using the pattern string as the key. + * + * Patterns are stored in two places in shared memory: + * globalChannelTable: keyed by the pattern string, stores the listener + * list (same as exact channels). + * globalPatterns list: allows SignalBackends() to iterate all patterns and + * match them against notification channels being notified. + */ + if (is_pattern) + { + MemoryContext oldcxt; + char *pattern_copy; + bool exists = false; + + /* Check if we already have this pattern locally */ + foreach_ptr(char, existing, localPatternList) + { + if (strcmp(existing, channel) == 0) + { + exists = true; + break; + } + } + + if (!exists) + { + /* + * Add pattern to the list in TopMemoryContext so it persists + * + * XXX: Should it really use TopMemoryContext? + */ + oldcxt = MemoryContextSwitchTo(TopMemoryContext); + pattern_copy = pstrdup(channel); + localPatternList = lappend(localPatternList, pattern_copy); + MemoryContextSwitchTo(oldcxt); + + /* Set the flag so we know we have patterns */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + QUEUE_BACKEND_HAS_PATTERNS(MyProcNumber) = true; + LWLockRelease(NotifyQueueLock); + } + + /* Fall through to also add to globalChannelTable */ + } + /* Pre-allocate entry in shared globalChannelTable */ GlobalChannelKeyInit(&key, MyDatabaseId, channel); entry = dshash_find_or_insert(globalChannelTable, &key, &found); @@ -1618,6 +1735,27 @@ PrepareTableEntriesForListen(const char *channel) entry->numListeners++; dshash_release_lock(globalChannelTable, entry); + + /* + * If this is a pattern, also add it to the global pattern list. + * + * The pattern is already stored in globalChannelTable (keyed by the + * pattern string) where the listener list is kept. The global pattern + * list provides an enumerable collection of all patterns so that + * SignalBackends() can iterate through them and match against + * notification channels. + */ + if (is_pattern) + { + /* + * XXX: At this point we should only allocate room for the entry on + * the global pattern list, otherwise it will be visible for other + * transactions before the running transaction commit. + */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + AddPatternToGlobalList(channel); + LWLockRelease(NotifyQueueLock); + } } /* @@ -1650,6 +1788,7 @@ PrepareTableEntriesForUnlisten(const char *channel) pending = (PendingListenEntry *) hash_search(pendingListenActions, channel, HASH_ENTER, NULL); pending->action = PENDING_UNLISTEN; + pending->is_pattern = IsPattern(channel); } /* @@ -1676,6 +1815,16 @@ PrepareTableEntriesForUnlistenAll(void) pending = (PendingListenEntry *) hash_search(pendingListenActions, channelEntry->channel, HASH_ENTER, NULL); pending->action = PENDING_UNLISTEN; + pending->is_pattern = IsPattern(channelEntry->channel); + } + + /* Also unlisten from all pattern channels */ + foreach_ptr(char, pattern, localPatternList) + { + pending = (PendingListenEntry *) + hash_search(pendingListenActions, pattern, HASH_ENTER, NULL); + pending->action = PENDING_UNLISTEN; + pending->is_pattern = true; } } @@ -1685,13 +1834,17 @@ PrepareTableEntriesForUnlistenAll(void) * Decrements numListeners, compacts the array, and frees the entry if empty. * Sets *entry_ptr to NULL if the entry was deleted. * + * If is_pattern is true and the entry becomes empty, also removes the pattern + * from the global pattern list. + * * We could get the listeners pointer from the entry, but all callers * already have it at hand. */ static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr, ListenerEntry *listeners, - int idx) + int idx, + bool is_pattern) { GlobalChannelEntry *entry = *entry_ptr; @@ -1702,6 +1855,17 @@ RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr, if (entry->numListeners == 0) { + /* + * If this was a pattern entry, remove it from the global pattern list + * before deleting the channel entry. + */ + if (is_pattern) + { + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + RemovePatternFromGlobalList(entry->key.channel); + LWLockRelease(NotifyQueueLock); + } + dsa_free(globalChannelDSA, entry->listenersArray); dshash_delete_entry(globalChannelTable, entry); /* tells caller not to release the entry's lock: */ @@ -1738,6 +1902,53 @@ ApplyPendingListenActions(bool isCommit) bool removeLocal = true; bool foundListener = false; + /* + * Handle pattern channels' local state. Patterns are also in + * globalChannelTable (keyed by the pattern string), so we still need + * to fall through to the globalChannelTable logic below. + */ + if (pending->is_pattern) + { + bool removeFromLocal = true; + + if (isCommit && pending->action == PENDING_LISTEN) + { + /* Pattern LISTEN committed - keep in localPatternList */ + removeFromLocal = false; + } + + /* + * If removeFromLocal is true, it means we are either aborting or + * committing an UNLISTEN. In both cases, remove from local list. + */ + if (removeFromLocal) + { + ListCell *lc; + + foreach(lc, localPatternList) + { + char *pattern = (char *) lfirst(lc); + + if (strcmp(pattern, pending->channel) == 0) + { + localPatternList = foreach_delete_current(localPatternList, lc); + pfree(pattern); + break; + } + } + + /* Update hasPatterns flag if we have no more patterns */ + if (localPatternList == NIL) + { + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + QUEUE_BACKEND_HAS_PATTERNS(MyProcNumber) = false; + LWLockRelease(NotifyQueueLock); + } + } + + /* Fall through to handle globalChannelTable entry */ + } + /* * Find the global entry for this channel. If isCommit, it had better * exist (it was created in PreCommit). In an abort, it might not @@ -1777,7 +1988,8 @@ ApplyPendingListenActions(bool isCommit) * UNLISTEN being committed: remove pre-allocated * entries from both tables. */ - RemoveListenerFromChannel(&entry, listeners, i); + RemoveListenerFromChannel(&entry, listeners, i, + pending->is_pattern); } } else @@ -1794,7 +2006,8 @@ ApplyPendingListenActions(bool isCommit) * Staged LISTEN (or LISTEN+UNLISTEN) being aborted, * so remove pre-allocated entries from both tables. */ - RemoveListenerFromChannel(&entry, listeners, i); + RemoveListenerFromChannel(&entry, listeners, i, + pending->is_pattern); } else { @@ -1855,18 +2068,31 @@ CleanupListenersOnExit(void) localChannelTable = NULL; } + /* Clear the pattern list */ + if (localPatternList != NIL) + { + list_free_deep(localPatternList); + localPatternList = NIL; + } + /* Now remove our entries from the shared globalChannelTable */ if (globalChannelTable == NULL) return; + /* XXX: Only acquire the lock if is_pattern=true. */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + dshash_seq_init(&status, globalChannelTable, true); while ((entry = dshash_seq_next(&status)) != NULL) { ListenerEntry *listeners; + bool is_pattern; if (entry->key.dboid != MyDatabaseId) continue; /* not relevant */ + is_pattern = IsPattern(entry->key.channel); + listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA, entry->listenersArray); @@ -1881,6 +2107,10 @@ CleanupListenersOnExit(void) if (entry->numListeners == 0) { + /* Remove from global pattern list if this was a pattern */ + if (is_pattern) + RemovePatternFromGlobalList(entry->key.channel); + dsa_free(globalChannelDSA, entry->listenersArray); dshash_delete_current(&status); } @@ -1889,6 +2119,196 @@ CleanupListenersOnExit(void) } } dshash_seq_term(&status); + + LWLockRelease(NotifyQueueLock); +} + +/* + * Check if a channel name contains pattern wildcards (* or ?). + */ +static bool +IsPattern(const char *channel) +{ + return (strchr(channel, '*') != NULL || strchr(channel, '?') != NULL); +} + +/* + * Match a channel name against a glob-style pattern. + * Supports: + * * - matches zero or more characters + * ? - matches exactly one character + * \ - escapes the next character (to match literal * or ?) + * + * Returns true if the channel matches the pattern. + */ +static bool +MatchPattern(const char *channel, const char *pattern) +{ + const char *c = channel; + const char *p = pattern; + + /* Position in channel and pattern after last * match */ + const char *c_star = NULL; + const char *p_star = NULL; + + while (*c != '\0') + { + if (*p == '\\' && *(p + 1) != '\0') + { + /* Escaped character - must match literally */ + p++; + if (*c == *p) + { + c++; + p++; + } + else if (p_star != NULL) + { + /* Backtrack to last * */ + p = p_star + 1; + c = ++c_star; + } + else + return false; + } + else if (*p == '?') + { + /* ? matches any single character */ + c++; + p++; + } + else if (*p == '*') + { + /* Remember position for backtracking */ + p_star = p++; + c_star = c; + } + else if (*c == *p) + { + /* Literal match */ + c++; + p++; + } + else if (p_star != NULL) + { + /* Mismatch, but we have a * to backtrack to */ + p = p_star + 1; + c = ++c_star; + } + else + { + /* Mismatch with no * to backtrack to */ + return false; + } + } + + /* Skip trailing *'s in pattern */ + while (*p == '*') + p++; + + return (*p == '\0'); +} + +/* + * Check if the channel matches any pattern we are listening on. + */ +static bool +IsListeningOnPattern(const char *channel) +{ + foreach_ptr(char, pattern, localPatternList) + { + if (MatchPattern(channel, pattern)) + return true; + } + + return false; +} + +/* + * Add a pattern to the global pattern list in shared memory. + * + * Caller must hold NotifyQueueLock in exclusive mode. + */ +static void +AddPatternToGlobalList(const char *pattern) +{ + GlobalPatternList *gpl = &asyncQueueControl->globalPatterns; + GlobalChannelKey *patterns; + + /* Initialize the array if needed */ + if (!DsaPointerIsValid(gpl->patternsArray)) + { + gpl->patternsArray = dsa_allocate(globalChannelDSA, + sizeof(GlobalChannelKey) * INITIAL_PATTERNS_ARRAY_SIZE); + gpl->allocatedPatterns = INITIAL_PATTERNS_ARRAY_SIZE; + gpl->numPatterns = 0; + } + + patterns = (GlobalChannelKey *) + dsa_get_address(globalChannelDSA, gpl->patternsArray); + + /* Check if this pattern already exists */ + for (int i = 0; i < gpl->numPatterns; i++) + { + if (patterns[i].dboid == MyDatabaseId && + strcmp(patterns[i].channel, pattern) == 0) + return; /* already registered */ + } + + /* Grow array if necessary */ + if (gpl->numPatterns >= gpl->allocatedPatterns) + { + int new_size = gpl->allocatedPatterns * 2; + dsa_pointer old_array = gpl->patternsArray; + dsa_pointer new_array = dsa_allocate(globalChannelDSA, + sizeof(GlobalChannelKey) * new_size); + GlobalChannelKey *new_patterns = (GlobalChannelKey *) + dsa_get_address(globalChannelDSA, new_array); + + memcpy(new_patterns, patterns, sizeof(GlobalChannelKey) * gpl->numPatterns); + gpl->patternsArray = new_array; + gpl->allocatedPatterns = new_size; + dsa_free(globalChannelDSA, old_array); + patterns = new_patterns; + } + + /* Add the new pattern */ + strlcpy(patterns[gpl->numPatterns].channel, pattern, NAMEDATALEN); + patterns[gpl->numPatterns].dboid = MyDatabaseId; + GlobalChannelKeyInit(&patterns[gpl->numPatterns], MyDatabaseId, pattern); + gpl->numPatterns++; +} + +/* + * Remove a pattern from the global pattern list. + * + * Caller must hold NotifyQueueLock in exclusive mode. + */ +static void +RemovePatternFromGlobalList(const char *pattern) +{ + GlobalPatternList *gpl = &asyncQueueControl->globalPatterns; + GlobalChannelKey *patterns; + + if (!DsaPointerIsValid(gpl->patternsArray)) + return; + + patterns = (GlobalChannelKey *) + dsa_get_address(globalChannelDSA, gpl->patternsArray); + + for (int i = 0; i < gpl->numPatterns; i++) + { + if (patterns[i].dboid == MyDatabaseId && + strcmp(patterns[i].channel, pattern) == 0) + { + /* Remove by shifting remaining entries */ + gpl->numPatterns--; + if (i < gpl->numPatterns) + memmove(&patterns[i], &patterns[i + 1], + sizeof(GlobalChannelKey) * (gpl->numPatterns - i)); + return; + } + } } /* @@ -1902,7 +2322,10 @@ IsListeningOn(const char *channel) if (localChannelTable == NULL) return false; - return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL); + if (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL) + return true; + + return IsListeningOnPattern(channel); } /* @@ -1926,6 +2349,7 @@ asyncQueueUnregister(void) QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid; QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false; QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false; + QUEUE_BACKEND_HAS_PATTERNS(MyProcNumber) = false; /* and remove it from the list */ if (QUEUE_FIRST_LISTENER == MyProcNumber) QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber); @@ -2274,6 +2698,15 @@ SignalBackends(void) * Identify backends that we need to signal. We don't want to send * signals while holding the NotifyQueueLock, so this part just builds a * list of target PIDs in signalPids[] and signalProcnos[]. + * + * For each notification channel, we perform two lookups: 1. Exact match: + * look up the channel name directly in globalChannelTable 2. Pattern + * match: iterate the global pattern list and test each pattern against the + * channel; for matches, look up the pattern's listener list in + * globalChannelTable + * + * Both exact and pattern listeners can receive the same notification, so + * we process both independently. */ count = 0; @@ -2284,59 +2717,52 @@ SignalBackends(void) { GlobalChannelKey key; GlobalChannelEntry *entry; - ListenerEntry *listeners; + /* + * First try exact match lookup. This handles backends that issued + * LISTEN 'channel_name' for this specific channel. + */ GlobalChannelKeyInit(&key, MyDatabaseId, channel); entry = dshash_find(globalChannelTable, &key, false); - if (entry == NULL) - continue; /* nobody is listening */ - - listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA, - entry->listenersArray); + if (entry != NULL) + SignalListenersForEntry(entry, &count); /* - * Identify listeners that now need waking, add them to arrays. - * - * Note that we signal listeners regardless of the state of their - * removeOnAbort flags. Hence a new listener that reached PreCommit, - * but then failed before AtCommit_Notify, can receive a signal even - * though it was never really listening. This is okay because it will - * not do anything in response to that signal. If we did not do it - * like this then a new listener might miss some messages due to the - * direct-advance logic below. + * Now check all patterns in the global pattern list. For each pattern + * that matches the notification channel, look up the pattern's entry + * in globalChannelTable (keyed by the pattern string) to find and + * wake its listeners. */ - for (int j = 0; j < entry->numListeners; j++) + if (DsaPointerIsValid(asyncQueueControl->globalPatterns.patternsArray)) { - ProcNumber i = listeners[j].procNo; - int32 pid; - QueuePosition pos; - - if (QUEUE_BACKEND_WAKEUP_PENDING(i)) - continue; /* already signaled, no need to repeat */ + GlobalPatternList *gpl = &asyncQueueControl->globalPatterns; + GlobalChannelKey *patterns; - pid = QUEUE_BACKEND_PID(i); - pos = QUEUE_BACKEND_POS(i); + patterns = (GlobalChannelKey *) + dsa_get_address(globalChannelDSA, gpl->patternsArray); - if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) - continue; /* it's fully caught up already */ - - Assert(pid != InvalidPid); + /* Check all matching patterns */ + for (int i = 0; i < gpl->numPatterns; i++) + { + GlobalChannelKey *pattern = &patterns[i]; - QUEUE_BACKEND_WAKEUP_PENDING(i) = true; - signalPids[count] = pid; - signalProcnos[count] = i; - count++; + if (pattern->dboid == MyDatabaseId && MatchPattern(channel, pattern->channel)) + { + entry = dshash_find(globalChannelTable, pattern, false); + if (entry != NULL) + SignalListenersForEntry(entry, &count); + } + } } - - dshash_release_lock(globalChannelTable, entry); } /* * Scan all listeners. Any that are not already pending wakeup must not * be interested in our notifications (else we'd have set their wakeup - * flags above). Check to see if we can directly advance their queue - * pointers to save a wakeup. Otherwise, if they are far behind, wake - * them anyway so they will catch up. + * flags above, including pattern listeners via the global pattern list). + * Check to see if we can directly advance their queue pointers to save a + * wakeup. Otherwise, if they are far behind, wake them anyway so they + * will catch up. */ for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) { @@ -3297,3 +3723,55 @@ check_notify_buffers(int *newval, void **extra, GucSource source) { return check_slru_buffers("notify_buffers", newval); } + +/* + * SignalListenersForEntry --- helper for SignalBackends + * + * Wake all listeners registered in the given GlobalChannelEntry by adding + * them to the signalPids/signalProcnos arrays. Releases the dshash lock + * on the entry before returning. + * + * This is called for both exact channel matches and pattern matches, + * allowing a single notification to wake listeners from multiple entries. + */ +static void +SignalListenersForEntry(GlobalChannelEntry *entry, int *count) +{ + ListenerEntry *listeners = dsa_get_address(globalChannelDSA, entry->listenersArray); + + /* + * Identify listeners that now need waking, add them to arrays. + * + * Note that we signal listeners regardless of the state of their + * removeOnAbort flags. Hence a new listener that reached PreCommit, but + * then failed before AtCommit_Notify, can receive a signal even though it + * was never really listening. This is okay because it will not do + * anything in response to that signal. If we did not do it like this + * then a new listener might miss some messages due to the direct-advance + * logic below. + */ + for (int j = 0; j < entry->numListeners; j++) + { + ProcNumber i = listeners[j].procNo; + int32 pid; + QueuePosition pos; + + if (QUEUE_BACKEND_WAKEUP_PENDING(i)) + continue; /* already signaled, no need to repeat */ + + pid = QUEUE_BACKEND_PID(i); + pos = QUEUE_BACKEND_POS(i); + + if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) + continue; /* it's fully caught up already */ + + Assert(pid != InvalidPid); + + QUEUE_BACKEND_WAKEUP_PENDING(i) = true; + signalPids[*count] = pid; + signalProcnos[*count] = i; + (*count)++; + } + + dshash_release_lock(globalChannelTable, entry); +} diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out index 55b7cbc6e02..95980ded634 100644 --- a/src/test/isolation/expected/async-notify.out +++ b/src/test/isolation/expected/async-notify.out @@ -1,4 +1,4 @@ -Parsed test spec with 6 sessions +Parsed test spec with 8 sessions starting permutation: listenc notify1 notify2 notify3 notifyf step listenc: LISTEN c1; LISTEN c2; @@ -238,3 +238,90 @@ nonzero t (1 row) + +starting permutation: plisten_star pnotify_user_login pnotify_user_logout pnotify_user_empty pcheck +step plisten_star: LISTEN "user_*"; +step pnotify_user_login: NOTIFY user_login, 'login payload'; +step pnotify_user_logout: NOTIFY user_logout, 'logout payload'; +step pnotify_user_empty: NOTIFY user_, 'empty suffix'; +step pcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +pattern_listener: NOTIFY "user_login" with payload "login payload" from pattern_notifier +pattern_listener: NOTIFY "user_logout" with payload "logout payload" from pattern_notifier +pattern_listener: NOTIFY "user_" with payload "empty suffix" from pattern_notifier + +starting permutation: plisten_question pnotify_msg_1 pnotify_msg_a pnotify_msg_12 pcheck +step plisten_question: LISTEN "msg_?"; +step pnotify_msg_1: NOTIFY msg_1, 'message 1'; +step pnotify_msg_a: NOTIFY msg_a, 'message a'; +step pnotify_msg_12: NOTIFY msg_12, 'should not match'; +step pcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +pattern_listener: NOTIFY "msg_1" with payload "message 1" from pattern_notifier +pattern_listener: NOTIFY "msg_a" with payload "message a" from pattern_notifier + +starting permutation: plisten_complex pnotify_prefix_mid_suffix pnotify_prefix_suffix pnotify_prefix_a_b_suffix pcheck +step plisten_complex: LISTEN "prefix_*_suffix"; +step pnotify_prefix_mid_suffix: NOTIFY prefix_mid_suffix, 'complex match'; +step pnotify_prefix_suffix: NOTIFY prefix_suffix, 'no middle'; +step pnotify_prefix_a_b_suffix: NOTIFY prefix_a_b_suffix, 'multi char middle'; +step pcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +pattern_listener: NOTIFY "prefix_mid_suffix" with payload "complex match" from pattern_notifier +pattern_listener: NOTIFY "prefix_a_b_suffix" with payload "multi char middle" from pattern_notifier + +starting permutation: plisten_multi pnotify_event_created pnotify_alert_x pnotify_nomatch pcheck +step plisten_multi: LISTEN "event_*"; LISTEN "alert_?"; +step pnotify_event_created: NOTIFY event_created, 'event'; +step pnotify_alert_x: NOTIFY alert_x, 'alert'; +step pnotify_nomatch: NOTIFY completely_different, 'no match'; +step pcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +pattern_listener: NOTIFY "event_created" with payload "event" from pattern_notifier +pattern_listener: NOTIFY "alert_x" with payload "alert" from pattern_notifier + +starting permutation: plisten_star pnotify_user_login pcheck +step plisten_star: LISTEN "user_*"; +step pnotify_user_login: NOTIFY user_login, 'login payload'; +step pcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +pattern_listener: NOTIFY "user_login" with payload "login payload" from pattern_notifier + +starting permutation: llisten plisten_star notify1 pnotify_user_login lcheck pcheck +step llisten: LISTEN c1; LISTEN c2; +step plisten_star: LISTEN "user_*"; +step notify1: NOTIFY c1; +step pnotify_user_login: NOTIFY user_login, 'login payload'; +step lcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +step pcheck: SELECT 1 AS x; +x +- +1 +(1 row) + +pattern_listener: NOTIFY "user_login" with payload "login payload" from pattern_notifier diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec index 7aef2e8d180..ab4cc2449dc 100644 --- a/src/test/isolation/specs/async-notify.spec +++ b/src/test/isolation/specs/async-notify.spec @@ -94,6 +94,30 @@ step lsnotify { NOTIFY c1, 'subxact_test'; } step lsnotify_check { NOTIFY c1, 'should_not_receive'; } teardown { UNLISTEN *; } +# Session for pattern listening tests +session pattern_listener +step plisten_star { LISTEN "user_*"; } +step plisten_question { LISTEN "msg_?"; } +step plisten_complex { LISTEN "prefix_*_suffix"; } +step plisten_multi { LISTEN "event_*"; LISTEN "alert_?"; } +step pcheck { SELECT 1 AS x; } +teardown { UNLISTEN *; } + +# Session for sending notifications to pattern listeners +session pattern_notifier +step pnotify_user_login { NOTIFY user_login, 'login payload'; } +step pnotify_user_logout { NOTIFY user_logout, 'logout payload'; } +step pnotify_user_empty { NOTIFY user_, 'empty suffix'; } +step pnotify_msg_1 { NOTIFY msg_1, 'message 1'; } +step pnotify_msg_a { NOTIFY msg_a, 'message a'; } +step pnotify_msg_12 { NOTIFY msg_12, 'should not match'; } +step pnotify_prefix_mid_suffix { NOTIFY prefix_mid_suffix, 'complex match'; } +step pnotify_prefix_suffix { NOTIFY prefix_suffix, 'no middle'; } +step pnotify_prefix_a_b_suffix { NOTIFY prefix_a_b_suffix, 'multi char middle'; } +step pnotify_event_created { NOTIFY event_created, 'event'; } +step pnotify_alert_x { NOTIFY alert_x, 'alert'; } +step pnotify_nomatch { NOTIFY completely_different, 'no match'; } + # Trivial cases. permutation listenc notify1 notify2 notify3 notifyf @@ -145,3 +169,32 @@ permutation lch_listen nch_notify lch_check # Hence, this should be the last test in this script. permutation llisten lbegin usage bignotify usage + +# +# Pattern LISTEN/NOTIFY tests +# + +# Basic pattern with * (zero or more characters) +# Should receive: user_login, user_logout, user_ (empty suffix matches *) +permutation plisten_star pnotify_user_login pnotify_user_logout pnotify_user_empty pcheck + +# Pattern with ? (exactly one character) +# Should receive: msg_1, msg_a +# Should NOT receive: msg_12 (two chars after _) +permutation plisten_question pnotify_msg_1 pnotify_msg_a pnotify_msg_12 pcheck + +# Complex pattern with * in the middle +# Should receive: prefix_mid_suffix, prefix_a_b_suffix +# Should NOT receive: prefix_suffix (requires at least one char for *) +permutation plisten_complex pnotify_prefix_mid_suffix pnotify_prefix_suffix pnotify_prefix_a_b_suffix pcheck + +# Multiple patterns on same session +# Should receive: event_created (matches event_*), alert_x (matches alert_?) +# Should NOT receive: completely_different +permutation plisten_multi pnotify_event_created pnotify_alert_x pnotify_nomatch pcheck + +# Cross-session pattern test: listener listens before notifier sends +permutation plisten_star pnotify_user_login pcheck + +# Pattern listener with regular listener (mixed) +permutation llisten plisten_star notify1 pnotify_user_login lcheck pcheck diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out index 19cbe38e636..c5236c15fb9 100644 --- a/src/test/regress/expected/async.out +++ b/src/test/regress/expected/async.out @@ -40,3 +40,24 @@ SELECT pg_notification_queue_usage(); 0 (1 row) +-- +-- Pattern LISTEN tests +-- +-- Pattern with * (matches zero or more characters) +LISTEN "user_*"; +UNLISTEN "user_*"; +-- Pattern with ? (matches exactly one character) +LISTEN "msg_?"; +UNLISTEN "msg_?"; +-- Pattern with wildcards in the middle +LISTEN "prefix_*_suffix"; +UNLISTEN "prefix_*_suffix"; +-- Escaped wildcards (for literal * and ? matching) +LISTEN "literal\*star"; +LISTEN "literal\?question"; +UNLISTEN "literal\*star"; +UNLISTEN "literal\?question"; +-- Multiple patterns +LISTEN "event_*"; +LISTEN "alert_?"; +UNLISTEN *; diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql index 40f6e015387..ca666c3019e 100644 --- a/src/test/regress/sql/async.sql +++ b/src/test/regress/sql/async.sql @@ -21,3 +21,30 @@ UNLISTEN *; -- Should return zero while there are no pending notifications. -- src/test/isolation/specs/async-notify.spec tests for actual usage. SELECT pg_notification_queue_usage(); + +-- +-- Pattern LISTEN tests +-- + +-- Pattern with * (matches zero or more characters) +LISTEN "user_*"; +UNLISTEN "user_*"; + +-- Pattern with ? (matches exactly one character) +LISTEN "msg_?"; +UNLISTEN "msg_?"; + +-- Pattern with wildcards in the middle +LISTEN "prefix_*_suffix"; +UNLISTEN "prefix_*_suffix"; + +-- Escaped wildcards (for literal * and ? matching) +LISTEN "literal\*star"; +LISTEN "literal\?question"; +UNLISTEN "literal\*star"; +UNLISTEN "literal\?question"; + +-- Multiple patterns +LISTEN "event_*"; +LISTEN "alert_?"; +UNLISTEN *; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 8cf40c87043..13a38f680ab 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1158,6 +1158,7 @@ GistTsVectorOptions GistVacState GlobalChannelEntry GlobalChannelKey +GlobalPatternList GlobalTransaction GlobalTransactionData GlobalVisHorizonKind -- 2.50.1 (Apple Git-155)