diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 90a530cfc61..751400b8315 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -285,7 +285,8 @@ typedef struct QueueBackendStatus int32 pid; /* either a PID or InvalidPid */ Oid dboid; /* backend's database OID, or InvalidOid */ ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */ - QueuePosition pos; /* backend has read queue up to here */ + QueuePosition pos; /* next position to read from */ + QueuePosition donePos; /* backend has definitively processed up to here */ bool wakeupPending; /* signal sent but not yet processed */ } QueueBackendStatus; @@ -347,6 +348,7 @@ static dshash_table *channelHash = NULL; #define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid) #define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener) #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) +#define QUEUE_BACKEND_DONEPOS(i) (asyncQueueControl->backend[i].donePos) #define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending) /* @@ -668,6 +670,7 @@ AsyncShmemInit(void) QUEUE_BACKEND_DBOID(i) = InvalidOid; QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER; SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); + SET_QUEUE_POS(QUEUE_BACKEND_DONEPOS(i), 0, 0); QUEUE_BACKEND_WAKEUP_PENDING(i) = false; } } @@ -1290,6 +1293,7 @@ Exec_ListenPreCommit(void) prevListener = i; } QUEUE_BACKEND_POS(MyProcNumber) = max; + QUEUE_BACKEND_DONEPOS(MyProcNumber) = max; QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid; QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId; /* Insert backend into list of listeners at correct position */ @@ -2415,9 +2419,19 @@ asyncQueueReadAllNotifications(void) } PG_FINALLY(); { - /* Update shared state */ + /* + * Update shared state. + * + * We update donePos to what we actually read (the local pos variable), + * as this is used for truncation safety. For the read position (pos), + * we use the maximum of our local position and the current shared + * position, in case another backend used direct advancement to skip us + * ahead while we were reading. This prevents us from going backwards + * and potentially pointing to a truncated page. + */ LWLockAcquire(NotifyQueueLock, LW_SHARED); - QUEUE_BACKEND_POS(MyProcNumber) = pos; + QUEUE_BACKEND_DONEPOS(MyProcNumber) = pos; + QUEUE_BACKEND_POS(MyProcNumber) = QUEUE_POS_MAX(pos, QUEUE_BACKEND_POS(MyProcNumber)); LWLockRelease(NotifyQueueLock); } PG_END_TRY(); @@ -2567,7 +2581,13 @@ asyncQueueAdvanceTail(void) for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) { Assert(QUEUE_BACKEND_PID(i) != InvalidPid); - min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); + /* + * Use donePos rather than pos for truncation safety. The donePos + * field represents what the backend has definitively processed, while + * pos can be advanced by other backends via direct advancement. This + * prevents truncating pages that a backend is still reading from. + */ + min = QUEUE_POS_MIN(min, QUEUE_BACKEND_DONEPOS(i)); } QUEUE_TAIL = min; oldtailpage = QUEUE_STOP_PAGE;