From 31ff0b7c35320afacf30685c006f17d6de179421 Mon Sep 17 00:00:00 2001
From: Joel Jacobson
Date: Sun, 19 Oct 2025 18:55:25 +0200
Subject: [PATCH] Prevent VACUUM from truncating XIDs still present in notification queue

VACUUM's computation of datfrozenxid did not account for transaction IDs
in the LISTEN/NOTIFY queue.  This allowed VACUUM to truncate clog entries
for XIDs that were still referenced by queued notifications, causing
backends to fail in TransactionIdDidCommit when later processing those
notifications.

Fix by adding GetOldestQueuedNotifyXid to find the oldest XID in queued
notifications for the current database, and constraining datfrozenxid to
not pass that.  The function scans from QUEUE_TAIL, since notifications
may have been written before any listeners existed.  It holds
NotifyQueueTailLock in shared mode so that the tail cannot advance and
pg_notify cannot be truncated during the scan, and takes NotifyQueueLock
only briefly, also in shared mode, to read the queue positions; concurrent
NOTIFY is therefore not blocked while the queue is scanned.

To avoid code duplication, refactor SLRU page-reading code into a new
helper function asyncQueueReadPageToBuffer.
---
 src/backend/commands/async.c  | 147 ++++++++++++++++++++++++++++------
 src/backend/commands/vacuum.c |  14 ++++++++++++++
 src/include/commands/async.h  |   3 +++
 3 files changed, 140 insertions(+), 24 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..7c9d7831c9f 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -1841,6 +1841,45 @@ ProcessNotifyInterrupt(bool flush)
 		ProcessIncomingNotify(flush);
 }
 
+/*
+ * Copy part of the SLRU queue page containing 'pos' into a local buffer.
+ *
+ * The bytes from pos's offset up to the end of the page, or up to 'head' if
+ * 'head' is on the same page, are copied to the same offset in page_buffer,
+ * which must therefore be at least QUEUE_PAGESIZE bytes.  The SLRU bank lock
+ * is held only during the copy, so callers examine the data without it.
+ */
+static void
+asyncQueueReadPageToBuffer(QueuePosition pos, QueuePosition head,
+						   char *page_buffer)
+{
+	int64		curpage = QUEUE_POS_PAGE(pos);
+	int			curoffset = QUEUE_POS_OFFSET(pos);
+	int			slotno;
+	int			copysize;
+
+	slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+										InvalidTransactionId);
+
+	if (curpage == QUEUE_POS_PAGE(head))
+	{
+		/* we only want to read as far as head */
+		copysize = QUEUE_POS_OFFSET(head) - curoffset;
+		if (copysize < 0)
+			copysize = 0;	/* just for safety */
+	}
+	else
+	{
+		/* fetch all the rest of the page */
+		copysize = QUEUE_PAGESIZE - curoffset;
+	}
+
+	memcpy(page_buffer + curoffset,
+		   NotifyCtl->shared->page_buffer[slotno] + curoffset,
+		   copysize);
+	/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+	LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+}
 
 /*
  * Read all pending notifications from the queue, and deliver appropriate
@@ -1932,36 +1971,13 @@ asyncQueueReadAllNotifications(void)
 
 	do
 	{
-		int64		curpage = QUEUE_POS_PAGE(pos);
-		int			curoffset = QUEUE_POS_OFFSET(pos);
-		int			slotno;
-		int			copysize;
-
 		/*
 		 * We copy the data from SLRU into a local buffer, so as to avoid
 		 * holding the SLRU lock while we are examining the entries and
 		 * possibly transmitting them to our frontend.  Copy only the part
 		 * of the page we will actually inspect.
 		 */
-		slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
-											InvalidTransactionId);
-		if (curpage == QUEUE_POS_PAGE(head))
-		{
-			/* we only want to read as far as head */
-			copysize = QUEUE_POS_OFFSET(head) - curoffset;
-			if (copysize < 0)
-				copysize = 0;	/* just for safety */
-		}
-		else
-		{
-			/* fetch all the rest of the page */
-			copysize = QUEUE_PAGESIZE - curoffset;
-		}
-		memcpy(page_buffer.buf + curoffset,
-			   NotifyCtl->shared->page_buffer[slotno] + curoffset,
-			   copysize);
-		/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
-		LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+		asyncQueueReadPageToBuffer(pos, head, page_buffer.buf);
 
 		/*
 		 * Process messages up to the stop position, end of page, or an
@@ -2097,6 +2113,89 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 	return reachedStop;
 }
 
+/*
+ * Get the oldest XID among the notifications for the current database that
+ * are still present in the notification queue.
+ *
+ * The scan covers everything from QUEUE_TAIL to QUEUE_HEAD.  Since the tail
+ * is the minimum of all backends' queue positions, this can include entries
+ * that every listener of our database has already read; that only makes the
+ * result more conservative.  Entries for other databases are skipped, as
+ * they do not constrain our datfrozenxid.
+ *
+ * Returns InvalidTransactionId if no such notification carries a valid XID.
+ *
+ * Note that a notification lingering in the queue holds back our
+ * datfrozenxid for as long as it stays there.
+ */
+TransactionId
+GetOldestQueuedNotifyXid(void)
+{
+	QueuePosition pos;
+	QueuePosition head;
+	TransactionId oldestXid = InvalidTransactionId;
+
+	/* page_buffer must be adequately aligned, so use a union */
+	union
+	{
+		char		buf[QUEUE_PAGESIZE];
+		AsyncQueueEntry align;
+	}			page_buffer;
+
+	/*
+	 * While we hold NotifyQueueTailLock, asyncQueueAdvanceTail() cannot move
+	 * the tail or truncate the pages we are about to read, so NotifyQueueLock
+	 * is needed only while fetching the head and tail positions.  Shared mode
+	 * suffices for both.  Holding NotifyQueueLock exclusively for the whole
+	 * scan, which may cover many pages, would block all NOTIFY traffic.  Lock
+	 * order is NotifyQueueTailLock, NotifyQueueLock, then SLRU bank locks.
+	 */
+	LWLockAcquire(NotifyQueueTailLock, LW_SHARED);
+	LWLockAcquire(NotifyQueueLock, LW_SHARED);
+
+	/*
+	 * We must start at QUEUE_TAIL since notification data might have been
+	 * written before there were any listening backends.
+	 */
+	pos = QUEUE_TAIL;
+	head = QUEUE_HEAD;
+
+	LWLockRelease(NotifyQueueLock);
+
+	while (!QUEUE_POS_EQUAL(pos, head))
+	{
+		bool		reachedPageEnd;
+
+		/* Copy the current page, up to head if head is on this page */
+		asyncQueueReadPageToBuffer(pos, head, page_buffer.buf);
+
+		/*
+		 * Examine entries until we reach head or move on to the next page.
+		 * Each iteration advances pos past one entry, so the loop always
+		 * makes progress.
+		 */
+		do
+		{
+			AsyncQueueEntry *qe;
+
+			qe = (AsyncQueueEntry *) (page_buffer.buf + QUEUE_POS_OFFSET(pos));
+
+			/* Only entries for our database matter for our datfrozenxid */
+			if (qe->dboid == MyDatabaseId &&
+				TransactionIdIsValid(qe->xid) &&
+				(!TransactionIdIsValid(oldestXid) ||
+				 TransactionIdPrecedes(qe->xid, oldestXid)))
+				oldestXid = qe->xid;
+
+			reachedPageEnd = asyncQueueAdvance(&pos, qe->length);
+		} while (!reachedPageEnd && !QUEUE_POS_EQUAL(pos, head));
+	}
+
+	LWLockRelease(NotifyQueueTailLock);
+
+	return oldestXid;
+}
+
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers.  Truncate pg_notify space if possible.
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index ed03e3bd50d..4f278c6b988 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -37,6 +37,7 @@
 #include "catalog/namespace.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_inherits.h"
+#include "commands/async.h"
 #include "commands/cluster.h"
 #include "commands/defrem.h"
 #include "commands/progress.h"
@@ -1733,6 +1734,19 @@ vac_update_datfrozenxid(void)
 	if (bogus)
 		return;
 
+	/*
+	 * Don't advance past the oldest XID of any notification for this
+	 * database still in the queue: listening backends will later call
+	 * TransactionIdDidCommit() on those XIDs, so their clog must be kept.
+	 */
+	{
+		TransactionId oldestNotifyXid = GetOldestQueuedNotifyXid();
+
+		if (TransactionIdIsValid(oldestNotifyXid) &&
+			TransactionIdPrecedes(oldestNotifyXid, newFrozenXid))
+			newFrozenXid = oldestNotifyXid;
+	}
+
 	Assert(TransactionIdIsNormal(newFrozenXid));
 	Assert(MultiXactIdIsValid(newMinMulti));
 
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..d707f516316 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -26,6 +26,9 @@ extern void NotifyMyFrontEnd(const char *channel,
 							 const char *payload,
 							 int32 srcPid);
 
+/* oldest XID of our database's queued notifications, for VACUUM */
+extern TransactionId GetOldestQueuedNotifyXid(void);
+
 /* notify-related SQL statements */
 extern void Async_Notify(const char *channel, const char *payload);
 extern void Async_Listen(const char *channel);
-- 
2.50.1