diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4bd37d5beb5..ba06234dc8e 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -2168,6 +2168,120 @@ asyncQueueAdvanceTail(void) LWLockRelease(NotifyQueueTailLock); } +/* + * AsyncNotifyFreezeXids + * + * Prepare the async notification queue for CLOG truncation by freezing + * transaction IDs that are about to become inaccessible. + * + * This function is called by VACUUM before advancing datfrozenxid. It scans + * the notification queue and replaces XIDs that would become inaccessible + * after CLOG truncation with special markers: + * - Committed transactions are set to FrozenTransactionId + * - Aborted/crashed transactions are set to InvalidTransactionId + * + * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG + * pages will be truncated. If XID < newFrozenXid, it cannot still be running + * (or it would have held back newFrozenXid through ProcArray). + * Therefore, if TransactionIdDidCommit returns false, we know the transaction + * either aborted explicitly or crashed, and we can safely mark it invalid. + */ +void +AsyncNotifyFreezeXids(TransactionId newFrozenXid) +{ + QueuePosition pos; + QueuePosition head; + int64 curpage = -1; + int slotno = -1; + char *page_buffer = NULL; + bool page_dirty = false; + + /* + * Acquire locks in the correct order to avoid deadlocks. As per the + * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU + * bank locks. + * + * We only need SHARED mode since we're just reading the head/tail + * positions, not modifying them. + */ + LWLockAcquire(NotifyQueueTailLock, LW_SHARED); + LWLockAcquire(NotifyQueueLock, LW_SHARED); + + pos = QUEUE_TAIL; + head = QUEUE_HEAD; + + /* Release NotifyQueueLock early, we only needed to read the positions */ + LWLockRelease(NotifyQueueLock); + + /* + * Scan the queue from tail to head, freezing XIDs as needed. We hold + * NotifyQueueTailLock throughout to ensure the tail doesn't move while + * we're working. + */ + while (!QUEUE_POS_EQUAL(pos, head)) + { + AsyncQueueEntry *qe; + TransactionId xid; + int64 pageno = QUEUE_POS_PAGE(pos); + int offset = QUEUE_POS_OFFSET(pos); + + /* If we need a different page, release old lock and get new one */ + if (pageno != curpage) + { + LWLock *lock; + + /* Release previous page if any */ + if (slotno >= 0) + { + if (page_dirty) + { + NotifyCtl->shared->page_dirty[slotno] = true; + page_dirty = false; + } + LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); + } + + lock = SimpleLruGetBankLock(NotifyCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); + slotno = SimpleLruReadPage(NotifyCtl, pageno, true, + InvalidTransactionId); + page_buffer = NotifyCtl->shared->page_buffer[slotno]; + curpage = pageno; + } + + qe = (AsyncQueueEntry *) (page_buffer + offset); + xid = qe->xid; + + if (TransactionIdIsNormal(xid) && + TransactionIdPrecedes(xid, newFrozenXid)) + { + if (TransactionIdDidCommit(xid)) + { + qe->xid = FrozenTransactionId; + page_dirty = true; + } + else + { + qe->xid = InvalidTransactionId; + page_dirty = true; + } + } + + /* Advance to next entry */ + asyncQueueAdvance(&pos, qe->length); + } + + /* Release final page lock if we acquired one */ + if (slotno >= 0) + { + if (page_dirty) + NotifyCtl->shared->page_dirty[slotno] = true; + LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); + } + + LWLockRelease(NotifyQueueTailLock); +} + /* * ProcessIncomingNotify * diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index ed03e3bd50d..2e8dbf03bd5 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -37,6 +37,7 @@ #include "catalog/namespace.h" #include "catalog/pg_database.h" #include "catalog/pg_inherits.h" +#include "commands/async.h" #include "commands/cluster.h" #include "commands/defrem.h" #include "commands/progress.h" @@ -1758,6 +1759,13 @@ vac_update_datfrozenxid(void) dbform = (Form_pg_database) GETSTRUCT(tuple); + /* + * Before advancing datfrozenxid, freeze any old transaction IDs in the + * async notification queue to prevent "could not access status of + * transaction" errors when LISTEN is called after CLOG truncation. + */ + AsyncNotifyFreezeXids(newFrozenXid); + /* * As in vac_update_relstats(), we ordinarily don't want to let * datfrozenxid go backward; but if it's "in the future" then it must be diff --git a/src/include/commands/async.h b/src/include/commands/async.h index f75c3df9556..aaec7314c10 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -46,4 +46,7 @@ extern void HandleNotifyInterrupt(void); /* process interrupts */ extern void ProcessNotifyInterrupt(bool flush); +/* freeze old transaction IDs in notify queue (called by VACUUM) */ +extern void AsyncNotifyFreezeXids(TransactionId newFrozenXid); + #endif /* ASYNC_H */