From 928cc032706ac154153279adbdfba95f6af2fae4 Mon Sep 17 00:00:00 2001 From: Joel Jacobson Date: Sun, 19 Oct 2025 08:12:47 +0200 Subject: [PATCH] async: track listener donePos separately from read position Implements idea #2 (donePos). Direct advancement in SignalBackends can move an idle listener's read position (pos) forward on its behalf, so pos no longer reliably says what that backend has itself processed. Add a per-backend donePos field, written only for MyProcNumber (apart from shared-memory initialization), and use it instead of pos both for the QUEUE_TAIL computation in asyncQueueAdvanceTail and for the lag check in SignalBackends that decides whether to wake a backend. asyncQueueReadAllNotifications now syncs donePos to pos on its early-return path, so a directly-advanced idle backend stops holding back queue truncation once it is woken, and in its PG_FINALLY block it never moves the shared pos backwards. --- NOTE(review): the PG_FINALLY block in asyncQueueReadAllNotifications does a read-modify-write of QUEUE_BACKEND_POS(MyProcNumber) while holding NotifyQueueLock only in LW_SHARED mode. That is race-free only if every other writer of a backend's pos (direct advancement in SignalBackends) holds NotifyQueueLock in LW_EXCLUSIVE mode; the lock mode SignalBackends uses is not visible in this diff -- please confirm. src/backend/commands/async.c | 57 +++++++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4e6556fb8d1..c81807107d1 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -285,7 +285,8 @@ typedef struct QueueBackendStatus int32 pid; /* either a PID or InvalidPid */ Oid dboid; /* backend's database OID, or InvalidOid */ ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */ - QueuePosition pos; /* backend has read queue up to here */ + QueuePosition pos; /* next position to read from */ + QueuePosition donePos; /* backend has definitively processed up to here */ bool wakeupPending; /* signal sent but not yet processed */ } QueueBackendStatus; @@ -347,6 +348,7 @@ static dshash_table *channelHash = NULL; #define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid) #define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener) #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) +#define QUEUE_BACKEND_DONEPOS(i) (asyncQueueControl->backend[i].donePos) #define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending) /* @@ -674,6 +676,7 @@ AsyncShmemInit(void) QUEUE_BACKEND_DBOID(i) = InvalidOid; QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER; SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); + SET_QUEUE_POS(QUEUE_BACKEND_DONEPOS(i), 0, 0); QUEUE_BACKEND_WAKEUP_PENDING(i) = false; } } @@ -1312,6 +1315,7 @@ Exec_ListenPreCommit(void) prevListener = i; } QUEUE_BACKEND_POS(MyProcNumber) = max; + QUEUE_BACKEND_DONEPOS(MyProcNumber) = max; QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid; QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId; /* Insert backend into list of listeners at correct position */ @@ -2048,6 
+2052,7 @@ SignalBackends(void) i = QUEUE_NEXT_LISTENER(i)) { QueuePosition pos; + QueuePosition donePos; int64 lag; int32 pid; @@ -2055,6 +2060,7 @@ SignalBackends(void) continue; pos = QUEUE_BACKEND_POS(i); + donePos = QUEUE_BACKEND_DONEPOS(i); /* Direct advancement for idle backends at the old head */ if (pendingNotifies != NULL && @@ -2064,9 +2070,17 @@ SignalBackends(void) pos = queueHeadAfterWrite; } - /* Signal backends that have fallen too far behind */ + /* + * Signal backends that have fallen too far behind. + * + * We use donePos rather than pos for the lag check because donePos + * is what matters for queue truncation (see asyncQueueAdvanceTail). + * A backend may have been directly advanced (pos is recent) while + * donePos is still far behind, holding back the tail. We need to + * wake such backends so they can advance their donePos. + */ lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), - QUEUE_POS_PAGE(pos)); + QUEUE_POS_PAGE(donePos)); if (lag >= QUEUE_CLEANUP_DELAY) { @@ -2319,14 +2333,25 @@ asyncQueueReadAllNotifications(void) QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false; pos = QUEUE_BACKEND_POS(MyProcNumber); head = QUEUE_HEAD; - LWLockRelease(NotifyQueueLock); if (QUEUE_POS_EQUAL(pos, head)) { - /* Nothing to do, we have read all notifications already. */ + /* + * Nothing to do, we have read all notifications already. + * + * Update donePos to match pos before returning. This is important + * when our position was advanced via direct advancement: we need to + * update donePos so the queue tail can advance. Without this, + * backends that have been directly advanced would hold back queue + * truncation indefinitely. + */ + QUEUE_BACKEND_DONEPOS(MyProcNumber) = pos; + LWLockRelease(NotifyQueueLock); return; } + LWLockRelease(NotifyQueueLock); + /*---------- * Get snapshot we'll use to decide which xacts are still in progress. * This is trickier than it might seem, because of race conditions. 
@@ -2437,9 +2462,19 @@ asyncQueueReadAllNotifications(void) } PG_FINALLY(); { - /* Update shared state */ + /* + * Update shared state. + * + * We update donePos to what we actually read (the local pos variable), + * as this is used for truncation safety. For the read position (pos), + * we use the maximum of our local position and the current shared + * position, in case another backend used direct advancement to skip us + * ahead while we were reading. This prevents us from going backwards + * and potentially pointing to a truncated page. + */ LWLockAcquire(NotifyQueueLock, LW_SHARED); - QUEUE_BACKEND_POS(MyProcNumber) = pos; + QUEUE_BACKEND_DONEPOS(MyProcNumber) = pos; + QUEUE_BACKEND_POS(MyProcNumber) = QUEUE_POS_MAX(pos, QUEUE_BACKEND_POS(MyProcNumber)); LWLockRelease(NotifyQueueLock); } PG_END_TRY(); @@ -2589,7 +2624,13 @@ asyncQueueAdvanceTail(void) for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) { Assert(QUEUE_BACKEND_PID(i) != InvalidPid); - min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); + /* + * Use donePos rather than pos for truncation safety. The donePos + * field represents what the backend has definitively processed, while + * pos can be advanced by other backends via direct advancement. This + * prevents truncating pages that a backend is still reading from. + */ + min = QUEUE_POS_MIN(min, QUEUE_BACKEND_DONEPOS(i)); } QUEUE_TAIL = min; oldtailpage = QUEUE_STOP_PAGE; -- 2.50.1