diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 8fc1bbba7a..088215acdc 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -455,13 +455,11 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); static ListCell *asyncQueueAddEntries(ListCell *nextNotify); static double asyncQueueUsage(void); static void asyncQueueFillWarning(void); -static void SignalBackends(void); static void asyncQueueReadAllNotifications(void); static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot); -static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(bool flush); static bool AsyncExistsPendingNotify(Notification *n); static void AddEventToPendingNotifies(Notification *n); @@ -1022,7 +1020,7 @@ AtCommit_Notify(void) * PreCommit_Notify(). */ if (pendingNotifies != NULL) - SignalBackends(); + SignalBackends(false); /* * If it's time to try to advance the global tail pointer, do that. @@ -1036,7 +1034,7 @@ AtCommit_Notify(void) if (tryAdvanceTail) { tryAdvanceTail = false; - asyncQueueAdvanceTail(); + asyncQueueAdvanceTail(false); } /* And clean up */ @@ -1494,7 +1492,7 @@ pg_notification_queue_usage(PG_FUNCTION_ARGS) double usage; /* Advance the queue tail so we don't report a too-large result */ - asyncQueueAdvanceTail(); + asyncQueueAdvanceTail(false); LWLockAcquire(NotifyQueueLock, LW_SHARED); usage = asyncQueueUsage(); @@ -1587,9 +1585,16 @@ asyncQueueFillWarning(void) * * This is called during CommitTransaction(), so it's important for it * to have very low probability of failure. + * + * If all_databases is false (normal NOTIFY), we signal listeners in our own + * database unless they're caught up, and listeners in other databases only + * if they are far behind (QUEUE_CLEANUP_DELAY pages). + * + * If all_databases is true (VACUUM cleanup), we signal all listeners across + * all databases that aren't already caught up, with no distance filtering. */ -static void -SignalBackends(void) +void +SignalBackends(bool all_databases) { int32 *pids; ProcNumber *procnos; @@ -1615,25 +1620,21 @@ SignalBackends(void) Assert(pid != InvalidPid); pos = QUEUE_BACKEND_POS(i); - if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) - { - /* - * Always signal listeners in our own database, unless they're - * already caught up (unlikely, but possible). - */ - if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) - continue; - } - else - { - /* - * Listeners in other databases should be signaled only if they - * are far behind. - */ - if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), - QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY) - continue; - } + + /* + * Always skip backends that are already caught up. + */ + if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) + continue; + + /* + * Skip if we're not signaling all databases AND this is a different + * database AND the listener is not far behind. + */ + if (!all_databases && QUEUE_BACKEND_DBOID(i) != MyDatabaseId && + asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), + QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY) + continue; /* OK, need to signal this one */ pids[count] = pid; procnos[count] = i; @@ -2134,7 +2135,7 @@ GetOldestNotifyTransactionId(void) TransactionId oldestXid = InvalidTransactionId; /* First advance the shared queue tail pointer */ - asyncQueueAdvanceTail(); + asyncQueueAdvanceTail(false); /* * We must start at QUEUE_TAIL since notification data might have been @@ -2191,15 +2192,38 @@ GetOldestNotifyTransactionId(void) return oldestXid; } +/* + * Check if there are any active listeners in the notification queue. + * + * Returns true if at least one backend is registered as a listener, + * false otherwise. + */ +bool +asyncQueueHasListeners(void) +{ + bool hasListeners; + + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + hasListeners = (QUEUE_FIRST_LISTENER != INVALID_PROC_NUMBER); + LWLockRelease(NotifyQueueLock); + + return hasListeners; +} + /* * Advance the shared queue tail variable to the minimum of all the * per-backend tail pointers. Truncate pg_notify space if possible. * - * This is (usually) called during CommitTransaction(), so it's important for - * it to have very low probability of failure. + * If force_to_head is false (normal case), we compute the new tail as the + * minimum of all listener positions. This is (usually) called during + * CommitTransaction(), so it's important for it to have very low probability + * of failure. + * + * If force_to_head is true (VACUUM cleanup), we advance the tail directly to + * the head, discarding all notifications, but only if there are no listeners. */ -static void -asyncQueueAdvanceTail(void) +void +asyncQueueAdvanceTail(bool force_to_head) { QueuePosition min; int64 oldtailpage; @@ -2227,12 +2251,38 @@ asyncQueueAdvanceTail(void) * to access the pages we are in the midst of truncating. */ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); - min = QUEUE_HEAD; - for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) + + if (force_to_head) { - Assert(QUEUE_BACKEND_PID(i) != InvalidPid); - min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); + /* + * Verify that there are still no listeners. It's possible + * that a listener appeared since VACUUM checked. + */ + if (QUEUE_FIRST_LISTENER != INVALID_PROC_NUMBER) + { + LWLockRelease(NotifyQueueLock); + LWLockRelease(NotifyQueueTailLock); + return; + } + + /* + * Advance the logical tail to the head, discarding all notifications. + */ + min = QUEUE_HEAD; } + else + { + /* + * Normal case: compute minimum position from all listeners. + */ + min = QUEUE_HEAD; + for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) + { + Assert(QUEUE_BACKEND_PID(i) != InvalidPid); + min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); + } + } + QUEUE_TAIL = min; oldtailpage = QUEUE_STOP_PAGE; LWLockRelease(NotifyQueueLock); diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index e5fedfb323..4ee06f233b 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -1739,11 +1739,24 @@ vac_update_datfrozenxid(void) * Also consider the oldest XID in the notification queue, since backends * will need to call TransactionIdDidCommit() on those XIDs when * processing the notifications. + * + * If the queue is blocking datfrozenxid advancement, attempt to clean it + * up. If listeners exist, wake them to process their pending + * notifications. If no listeners exist, discard all notifications. + * Either way, we back off datfrozenxid for this VACUUM cycle; the next + * VACUUM will benefit from the cleanup we've triggered. */ oldestNotifyXid = GetOldestNotifyTransactionId(); if (TransactionIdIsValid(oldestNotifyXid) && TransactionIdPrecedes(oldestNotifyXid, newFrozenXid)) + { + if (asyncQueueHasListeners()) + SignalBackends(true); + else + asyncQueueAdvanceTail(true); + newFrozenXid = oldestNotifyXid; + } Assert(TransactionIdIsNormal(newFrozenXid)); Assert(MultiXactIdIsValid(newMinMulti)); diff --git a/src/include/commands/async.h b/src/include/commands/async.h index 0f8f17ad22..8ae9c4dfae 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -29,6 +29,11 @@ extern void NotifyMyFrontEnd(const char *channel, /* get oldest XID in the notification queue for vacuum freeze */ extern TransactionId GetOldestNotifyTransactionId(void); +/* functions for vacuum to manage notification queue */ +extern bool asyncQueueHasListeners(void); +extern void SignalBackends(bool all_databases); +extern void asyncQueueAdvanceTail(bool force_to_head); + /* notify-related SQL statements */ extern void Async_Notify(const char *channel, const char *payload); extern void Async_Listen(const char *channel);