From 7ec33dfc14174e9a4424a1738977601a4fd13bf5 Mon Sep 17 00:00:00 2001 From: Matheus Alcantara Date: Sat, 9 Aug 2025 13:51:21 -0300 Subject: [PATCH v0] Consider async queue min xid on VACUUM FREEZE --- src/backend/commands/async.c | 58 +++++++++++++++++++++++++++++++++++ src/backend/commands/vacuum.c | 11 +++++++ src/include/commands/async.h | 3 ++ 3 files changed, 72 insertions(+) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4bd37d5beb5..fff055601a1 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -287,6 +287,7 @@ typedef struct AsyncQueueControl * tail.page */ ProcNumber firstListener; /* id of first listener, or * INVALID_PROC_NUMBER */ + TransactionId minXid; TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; } AsyncQueueControl; @@ -458,6 +459,9 @@ static uint32 notification_hash(const void *key, Size keysize); static int notification_match(const void *key1, const void *key2, Size keysize); static void ClearPendingActionsAndNotifies(void); +static void StoreEntryXid(TransactionId xid); +static void ReleaseEntryXid(TransactionId xid); + /* * Compute the difference between two queue page numbers. * Previously this function accounted for a wraparound. @@ -521,6 +525,7 @@ AsyncShmemInit(void) QUEUE_STOP_PAGE = 0; QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER; asyncQueueControl->lastQueueFillWarn = 0; + asyncQueueControl->minXid = MaxTransactionId; for (int i = 0; i < MaxBackends; i++) { QUEUE_BACKEND_PID(i) = InvalidPid; @@ -1428,6 +1433,12 @@ asyncQueueAddEntries(ListCell *nextNotify) &qe, qe.length); + /* + * Remember the notification xid so that vacuum don't frozen after + * this xid. + */ + StoreEntryXid(qe.xid); + /* Advance queue_head appropriately, and detect if page is full */ if (asyncQueueAdvance(&(queue_head), qe.length)) { @@ -2077,6 +2088,12 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, char *payload = qe->data + strlen(channel) + 1; NotifyMyFrontEnd(channel, payload, qe->srcPid); + + /* + * Notification was sent, so release the notification xid + * so that vacuum can freeze past this notification. + */ + ReleaseEntryXid(qe->xid); } } else @@ -2395,3 +2412,44 @@ check_notify_buffers(int *newval, void **extra, GucSource source) { return check_slru_buffers("notify_buffers", newval); } + +/* + * Return the minimum notification xid on the queue. + * + * TODO(matheus): handle when the queue is empty, so we don't get locked in + * the past with a notification that was already sent. + */ +TransactionId +AsyncQueueMinXid() +{ + return asyncQueueControl->minXid; +} + +/* + * Subroutine of asyncQueueAddEntries + * + * We are holding NotifyQueueLock already from the caller. + */ +static void +StoreEntryXid(TransactionId xid) +{ + if (xid < asyncQueueControl->minXid) + asyncQueueControl->minXid = xid; +} + +/* + * Subroutine of asyncQueueAddEntries + * + * TODO(matheus): may update the code comment on + * asyncQeueuReadAllNotifications() when calling + * asyncQueueProcessPageEntries(). + * + */ +static void +ReleaseEntryXid(TransactionId xid) +{ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + if (xid > asyncQueueControl->minXid) + asyncQueueControl->minXid = xid; + LWLockRelease(NotifyQueueLock); +} diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 733ef40ae7c..f0d6d868e60 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -21,6 +21,7 @@ * *------------------------------------------------------------------------- */ +#include "commands/async.h" #include "postgres.h" #include @@ -1739,6 +1740,16 @@ vac_update_datfrozenxid(void) if (bogus) return; + /* + * We need to check transaction status of notifications before of notify + * the client, if there is lag to consume the notifications we need to + * consider the older xid of notification on the queue so that the + * transaction status can be accessed. + * + * XXX(matheus): Wrap this behavior into a GUC? + */ + newFrozenXid = Min(newFrozenXid, AsyncQueueMinXid()); + Assert(TransactionIdIsNormal(newFrozenXid)); Assert(MultiXactIdIsValid(newMinMulti)); diff --git a/src/include/commands/async.h b/src/include/commands/async.h index f75c3df9556..512a8976126 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -13,6 +13,7 @@ #ifndef ASYNC_H #define ASYNC_H +#include #include extern PGDLLIMPORT bool Trace_notify; @@ -46,4 +47,6 @@ extern void HandleNotifyInterrupt(void); /* process interrupts */ extern void ProcessNotifyInterrupt(bool flush); +extern TransactionId AsyncQueueMinXid(void); + #endif /* ASYNC_H */ -- 2.39.5 (Apple Git-154)