From afff0f3f8b01cfde369c564025313e6acc9a610a Mon Sep 17 00:00:00 2001 From: Joel Jacobson Date: Sun, 19 Oct 2025 08:08:05 +0200 Subject: [PATCH] Implements idea #1: advisoryPos --- src/backend/commands/async.c | 63 +++++++++++++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 5 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4e6556fb8d1..6a02f5e3acc 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -264,6 +264,11 @@ typedef struct QueuePosition (x).page != (y).page ? (x) : \ (x).offset > (y).offset ? (x) : (y)) +/* returns true if x comes before y in queue order */ +#define QUEUE_POS_PRECEDES(x,y) \ + (asyncQueuePagePrecedes((x).page, (y).page) || \ + ((x).page == (y).page && (x).offset < (y).offset)) + /* * Parameter determining how often we try to advance the tail pointer: * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is @@ -286,6 +291,7 @@ typedef struct QueueBackendStatus Oid dboid; /* backend's database OID, or InvalidOid */ ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */ QueuePosition pos; /* backend has read queue up to here */ + QueuePosition advisoryPos; /* backend could skip queue to here */ bool wakeupPending; /* signal sent but not yet processed */ } QueueBackendStatus; @@ -347,6 +353,7 @@ static dshash_table *channelHash = NULL; #define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid) #define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener) #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) +#define QUEUE_BACKEND_ADVISORY_POS(i) (asyncQueueControl->backend[i].advisoryPos) #define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending) /* @@ -674,6 +681,7 @@ AsyncShmemInit(void) QUEUE_BACKEND_DBOID(i) = InvalidOid; QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER; SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); + SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 
0); QUEUE_BACKEND_WAKEUP_PENDING(i) = false; } } @@ -1312,6 +1320,7 @@ Exec_ListenPreCommit(void) prevListener = i; } QUEUE_BACKEND_POS(MyProcNumber) = max; + QUEUE_BACKEND_ADVISORY_POS(MyProcNumber) = max; QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid; QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId; /* Insert backend into list of listeners at correct position */ @@ -2031,9 +2040,13 @@ SignalBackends(void) * Even though we may take and release NotifyQueueLock multiple times * while writing, the heavyweight lock guarantees this region contains * only our messages. Therefore, any backend still positioned at the - * queue head from before our write can be safely advanced to the current + * queue head from before our write can be advised to skip to the current * queue head without waking it. * + * We use the advisoryPos field rather than directly modifying pos. + * The backend controls its own pos field and will check advisoryPos + * when it's safe to do so. + * * False-positive possibility: if a backend was previously signaled but * hasn't yet awoken, we'll skip advancing it (because wakeupPending is * true). This is safe - the backend will advance its pointer when it @@ -2048,6 +2061,7 @@ SignalBackends(void) i = QUEUE_NEXT_LISTENER(i)) { QueuePosition pos; + QueuePosition advisoryPos; int64 lag; int32 pid; @@ -2055,15 +2069,31 @@ SignalBackends(void) continue; pos = QUEUE_BACKEND_POS(i); + advisoryPos = QUEUE_BACKEND_ADVISORY_POS(i); - /* Direct advancement for idle backends at the old head */ + /* + * Advisory advancement for idle backends at the old head. + * + * We check advisoryPos rather than pos to allow accumulating advances + * from multiple consecutive notifying backends. If we checked pos, + * only the first notifier could advance idle backends; subsequent + * notifiers would find pos unchanged (since the backend hasn't woken + * up yet) and fail to advance further. 
+ */ if (pendingNotifies != NULL && - QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite)) + QUEUE_POS_EQUAL(advisoryPos, queueHeadBeforeWrite)) { - QUEUE_BACKEND_POS(i) = queueHeadAfterWrite; - pos = queueHeadAfterWrite; + QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite; + advisoryPos = queueHeadAfterWrite; } + /* + * For lag calculation, use whichever position is further ahead. + * This ensures we don't spuriously wake a backend that has been + * advised to skip ahead via advisoryPos. + */ + pos = QUEUE_POS_MAX(pos, advisoryPos); + /* Signal backends that have fallen too far behind */ lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), QUEUE_POS_PAGE(pos)); @@ -2302,6 +2332,7 @@ static void asyncQueueReadAllNotifications(void) { volatile QueuePosition pos; + QueuePosition advisoryPos; QueuePosition head; Snapshot snapshot; @@ -2319,6 +2350,21 @@ asyncQueueReadAllNotifications(void) QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false; pos = QUEUE_BACKEND_POS(MyProcNumber); head = QUEUE_HEAD; + + /* + * Check if another backend has set an advisory position for us. + * If so, and if we haven't yet read past that point, we can safely + * adopt the advisory position and skip the intervening notifications. + */ + advisoryPos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber); + + if (!QUEUE_POS_EQUAL(advisoryPos, pos) && + QUEUE_POS_PRECEDES(pos, advisoryPos)) + { + pos = advisoryPos; + QUEUE_BACKEND_POS(MyProcNumber) = pos; + } + LWLockRelease(NotifyQueueLock); if (QUEUE_POS_EQUAL(pos, head)) @@ -2440,6 +2486,13 @@ asyncQueueReadAllNotifications(void) /* Update shared state */ LWLockAcquire(NotifyQueueLock, LW_SHARED); QUEUE_BACKEND_POS(MyProcNumber) = pos; + /* + * Advance advisoryPos to our current position if it has fallen behind, + * but preserve any newer advisory position that may have been set by + * another backend while we were processing notifications. 
+ */ + QUEUE_BACKEND_ADVISORY_POS(MyProcNumber) = + QUEUE_POS_MAX(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber)); LWLockRelease(NotifyQueueLock); } PG_END_TRY(); -- 2.50.1