From 839b2934fe3d73b78965198d86dba3b97e3696e2 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara
Date: Tue, 26 Aug 2025 10:09:01 -0300
Subject: [PATCH v2] Consider LISTEN/NOTIFY min xid during VACUUM FREEZE

Previously, if a listener backend was slow to consume notifications (for
example because it was idle in transaction) and VACUUM FREEZE ran during
that period and dropped the clog files containing the status of the
transactions that sent the queued notifications, the listener could lose
those notifications when it eventually committed:

ERROR:  could not access status of transaction 756
DETAIL:  Could not open file "pg_xact/0000": No such file or directory.

This commit fixes the issue by scanning the notification queue for the
oldest transaction ID that belongs to the current database and taking
that value into account when VACUUM advances datfrozenxid.
---
 src/backend/commands/async.c                  | 223 ++++++++++++++++++
 src/backend/commands/vacuum.c                 |  20 ++
 src/include/commands/async.h                  |   3 +
 src/test/modules/Makefile                     |   1 +
 src/test/modules/meson.build                  |   1 +
 src/test/modules/test_listen_notify/Makefile  |  19 ++
 .../modules/test_listen_notify/meson.build    |  13 +
 .../test_listen_notify/t/001_xid_freeze.pl    |  74 ++++++
 src/tools/pgindent/typedefs.list              |   1 +
 9 files changed, 355 insertions(+)
 create mode 100644 src/test/modules/test_listen_notify/Makefile
 create mode 100644 src/test/modules/test_listen_notify/meson.build
 create mode 100644 src/test/modules/test_listen_notify/t/001_xid_freeze.pl

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..e986a27eb4e 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -401,6 +401,42 @@ struct NotificationHash
 	Notification *event;		/* => the actual Notification struct */
 };
 
+/*
+ * A state-based iterator for reading entries (AsyncQueueEntry) from the async
+ * queue.
+ *
+ * The iterator walks the queue from the "current" position up to, but not
+ * including, the "head" position.  For example, to read the entries pending
+ * for a specific backend, start at its QUEUE_BACKEND_POS and stop at
+ * QUEUE_HEAD.  To read every entry still in the queue, start at QUEUE_TAIL
+ * and stop at QUEUE_HEAD.
+ */
+typedef struct AsyncQueueIterator
+{
+	/* Current queue position of iteration. */
+	QueuePosition current;
+
+	/* how far it will read */
+	QueuePosition head;
+
+	/* Current queue entry being read. */
+	AsyncQueueEntry *current_entry;
+
+	/* buffer to read pages from SLRU */
+	union
+	{
+		char		buf[QUEUE_PAGESIZE];
+		AsyncQueueEntry align;
+	}			page_buffer;
+
+	/* Should read a page from SLRU? */
+	bool		read_next_page;
+
+	/* No more entries to read */
+	bool		done;
+} AsyncQueueIterator;
+
+
 static NotificationList *pendingNotifies = NULL;
 
 /*
@@ -458,6 +494,9 @@ static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
 
+static void AsyncQueueIterInit(AsyncQueueIterator *iter, QueuePosition current, QueuePosition head);
+static AsyncQueueEntry *AsyncQueueIterNextEntry(AsyncQueueIterator *iter);
+
 /*
  * Compute the difference between two queue page numbers.
  * Previously this function accounted for a wraparound.
@@ -2395,3 +2434,187 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+
+/*
+ * Initializes an AsyncQueueIterator that reads the queue from "current" up
+ * to, but not including, "head".
+ */
+static void
+AsyncQueueIterInit(AsyncQueueIterator *iter, QueuePosition current, QueuePosition head)
+{
+	iter->current = current;
+	iter->head = head;
+	iter->current_entry = NULL;
+	iter->read_next_page = true;
+	iter->done = false;
+}
+
+/*
+ * Returns the next AsyncQueueEntry of the current database from the async
+ * queue, or NULL once "head" is reached.
+ *
+ * Entries are returned regardless of the state of the transaction that
+ * queued them: no snapshot or pg_xact lookup is done here.  Listeners call
+ * TransactionIdDidCommit() on every entry of their database that is no
+ * longer in progress, committed or not, so callers that want to know which
+ * XIDs must stay resolvable need to see all of them.
+ *
+ * It handles fetching pages from the shared SLRU as needed.  The returned
+ * pointer is to a local buffer, so it's only valid until the next call to
+ * this function.
+ */
+static AsyncQueueEntry *
+AsyncQueueIterNextEntry(AsyncQueueIterator *iter)
+{
+	AsyncQueueEntry *qe;
+
+	/*
+	 * Loop until we find an entry for our database or we reach the end of
+	 * the range being read.
+	 */
+	for (;;)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		/* No more entries to process. */
+		if (iter->done)
+			return NULL;
+
+		if (QUEUE_POS_EQUAL(iter->current, iter->head))
+		{
+			/* Reached "head": there is nothing left to read. */
+			iter->done = true;
+			return NULL;
+		}
+
+		/*
+		 * We need to process a page at a time.  If we haven't read the
+		 * current page yet, or have reached the end of the previous one, read
+		 * the next page.
+		 */
+		if (iter->read_next_page)
+		{
+			int64		curpage = QUEUE_POS_PAGE(iter->current);
+			int			curoffset = QUEUE_POS_OFFSET(iter->current);
+			int			slotno;
+			int			copysize;
+
+			/*
+			 * We copy the data from SLRU into a local buffer, so as to avoid
+			 * holding the SLRU lock while we are examining the entries.  Copy
+			 * only the part of the page we will actually inspect; it is
+			 * stored at the start of the local buffer.
+			 */
+			slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+												InvalidTransactionId);
+
+			/* Determine how much of the page we need to copy */
+			if (curpage == QUEUE_POS_PAGE(iter->head))
+			{
+				/* We only want to read as far as head */
+				copysize = QUEUE_POS_OFFSET(iter->head) - curoffset;
+				if (copysize < 0)
+					copysize = 0;	/* just for safety */
+			}
+			else
+			{
+				/* fetch all the rest of the page */
+				copysize = QUEUE_PAGESIZE - curoffset;
+			}
+
+			memcpy(iter->page_buffer.buf,
+				   NotifyCtl->shared->page_buffer[slotno] + curoffset,
+				   copysize);
+
+			/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+			LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+
+			/*
+			 * The page was read; subsequent calls consume the entries from
+			 * the local copy until its end is reached.
+			 */
+			iter->read_next_page = false;
+			iter->current_entry = (AsyncQueueEntry *) iter->page_buffer.buf;
+		}
+		else
+		{
+			/*
+			 * If we are not reading a new page, advance our local entry
+			 * pointer to the next message.
+			 */
+			iter->current_entry = (AsyncQueueEntry *)
+				((char *) iter->current_entry + iter->current_entry->length);
+		}
+
+		/* Read the AsyncQueueEntry within our local buffer. */
+		qe = iter->current_entry;
+
+		Assert(qe->length > 0);
+
+		/*
+		 * Advance iter->current over this message.  If that reaches the end
+		 * of the page, remember to read the next page on the next call.
+		 */
+		iter->read_next_page = asyncQueueAdvance(&iter->current, qe->length);
+
+		/* Ignore messages destined for other databases */
+		if (qe->dboid == MyDatabaseId)
+			return qe;
+	}
+}
+
+/*
+ * Returns the oldest XID, in wraparound-aware order, among the entries of the
+ * current database that are still in the notification queue, or
+ * InvalidTransactionId if there is none.
+ *
+ * VACUUM uses this to avoid advancing datfrozenxid, and hence truncating
+ * pg_xact, past an XID whose commit status a lagging listener still has to
+ * look up.
+ */
+TransactionId
+AsyncQueueMinXid(void)
+{
+	QueuePosition tail;
+	QueuePosition head;
+	AsyncQueueEntry *qe;
+	AsyncQueueIterator iter;
+	TransactionId minXid = InvalidTransactionId;
+
+	/*
+	 * First advance the global queue tail so we don't need to worry about
+	 * notifications already processed by all backends.
+	 */
+	asyncQueueAdvanceTail();
+
+	/*
+	 * Unlike a listener, we have no queue position of our own holding back
+	 * the tail, so keep NotifyQueueTailLock while scanning to stop a
+	 * concurrent asyncQueueAdvanceTail() from truncating pages we are about
+	 * to read.  NOTE(review): this relies on asyncQueueAdvanceTail() holding
+	 * that lock exclusively while it truncates, and on the usual lock order
+	 * (NotifyQueueTailLock, NotifyQueueLock, SLRU bank lock) -- confirm.
+	 */
+	LWLockAcquire(NotifyQueueTailLock, LW_SHARED);
+
+	/* Fetch current state */
+	LWLockAcquire(NotifyQueueLock, LW_SHARED);
+	tail = QUEUE_TAIL;
+	head = QUEUE_HEAD;
+	LWLockRelease(NotifyQueueLock);
+
+	AsyncQueueIterInit(&iter, tail, head);
+
+	while ((qe = AsyncQueueIterNextEntry(&iter)) != NULL)
+	{
+		/* XIDs wrap around, so they can't be compared with plain "<" */
+		if (!TransactionIdIsValid(minXid) ||
+			TransactionIdPrecedes(qe->xid, minXid))
+			minXid = qe->xid;
+	}
+
+	LWLockRelease(NotifyQueueTailLock);
+
+	return minXid;
+}
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 733ef40ae7c..d35d6fc8e8a 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -37,6 +37,7 @@
 #include "catalog/namespace.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_inherits.h"
+#include "commands/async.h"
 #include "commands/cluster.h"
 #include "commands/defrem.h"
 #include "commands/progress.h"
@@ -1739,6 +1740,25 @@ vac_update_datfrozenxid(void)
 	if (bogus)
 		return;
 
+	/*
+	 * Listening backends look up the commit status of the transaction behind
+	 * each queued notification before delivering it.  If a listener lags
+	 * behind, pg_xact must therefore be kept for every XID still in the
+	 * notification queue, so don't advance datfrozenxid past the oldest one.
+	 *
+	 * XXX(matheus): Maybe add a GUC to prevent lazy listeners or
+	 * notifications that were added without listeners to block the VACUUM
+	 * FREEZE newFrozenXid advance.
+	 */
+	{
+		TransactionId notifyMinXid = AsyncQueueMinXid();
+
+		/* XIDs wrap around, so compare with TransactionIdPrecedes() */
+		if (TransactionIdIsValid(notifyMinXid) &&
+			TransactionIdPrecedes(notifyMinXid, newFrozenXid))
+			newFrozenXid = notifyMinXid;
+	}
+
 	Assert(TransactionIdIsNormal(newFrozenXid));
 	Assert(MultiXactIdIsValid(newMinMulti));
 
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..3592103a0da 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -46,4 +46,7 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+/* oldest XID in the notification queue for this database, for VACUUM */
+extern TransactionId AsyncQueueMinXid(void);
+
 #endif							/* ASYNC_H */
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 903a8ac151a..4c0160df341 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -28,6 +28,7 @@ SUBDIRS = \
 		  test_int128 \
 		  test_integerset \
 		  test_json_parser \
 		  test_lfind \
+		  test_listen_notify \
 		  test_misc \
 		  test_oat_hooks \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 93be0f57289..144379b619b 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -27,6 +27,7 @@ subdir('test_ginpostinglist')
 subdir('test_int128')
 subdir('test_integerset')
 subdir('test_json_parser')
 subdir('test_lfind')
+subdir('test_listen_notify')
 subdir('test_misc')
 subdir('test_oat_hooks')
diff --git a/src/test/modules/test_listen_notify/Makefile b/src/test/modules/test_listen_notify/Makefile
new file mode 100644
index 00000000000..da1bf5bb1b7
--- /dev/null
+++ b/src/test/modules/test_listen_notify/Makefile
@@ -0,0 +1,19 @@
+# src/test/modules/test_listen_notify/Makefile
+
+MODULE = test_listen_notify
+PGFILEDESC = "test_listen_notify - regression testing for LISTEN/NOTIFY support"
+
+TAP_TESTS = 1
+
+EXTRA_INSTALL = src/test/modules/xid_wraparound
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_listen_notify
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_listen_notify/meson.build b/src/test/modules/test_listen_notify/meson.build
new file mode 100644
index 00000000000..8119e6c761f
--- /dev/null
+++ b/src/test/modules/test_listen_notify/meson.build
@@ -0,0 +1,13 @@
+# Copyright (c) 2022-2025, PostgreSQL Global Development Group
+
+tests += {
+  'name': 'test_listen_notify',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_xid_freeze.pl',
+    ],
+  },
+}
+
diff --git a/src/test/modules/test_listen_notify/t/001_xid_freeze.pl b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
new file mode 100644
index 00000000000..79dcd73ed65
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
@@ -0,0 +1,74 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Setup
+$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
+$node->safe_psql('postgres',
+	'CREATE TABLE t AS SELECT g AS a, g+2 AS b from generate_series(1,100000) g;'
+);
+$node->safe_psql('postgres',
+	'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
+
+# --- Start Session 1 and leave it idle in transaction
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen s;', "Session 1 listens to 's'");
+$psql_session1->query_safe('begin;', "Session 1 starts a transaction");
+
+# --- Session 2, multiple notify's, and commit ---
+for my $i (1 .. 10)
+{
+	$node->safe_psql(
+		'postgres', "
+		BEGIN;
+		NOTIFY s, '$i';
+		COMMIT;");
+}
+
+# Consume enough XIDs to trigger truncation
+$node->safe_psql('postgres', 'select consume_xids(10000000);');
+
+# Execute update so the frozen xid of "t" table is updated to a xid greater
+# than consume_xids() result
+$node->safe_psql('postgres', 'UPDATE t SET a = a+b;');
+
+# Remember current datfrozenxid before vacuum freeze to ensure that it is advanced.
+my $datfrozenxid = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+
+# Execute vacuum freeze on all databases
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+	"vacuumdb --all --freeze");
+
+# Get the new datfrozenxid after vacuum freeze to ensure that it is advanced,
+# but only so far that the status of the queued notifications is still available
+my $datfrozenxid_partial_freeze = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+ok($datfrozenxid_partial_freeze > $datfrozenxid, 'datfrozenxid is partially advanced');
+
+# On Session 1, commit and ensure that all the notifications are received
+my $res = $psql_session1->query_safe('commit;', "commit listen s;");
+my $notifications_count = 0;
+foreach my $i (split('\n', $res))
+{
+	$notifications_count++;
+	like($i, qr/Asynchronous notification "s" with payload "$notifications_count" received/);
+}
+is($notifications_count, 10, 'received all committed notifications');
+
+# Execute vacuum freeze on all databases again and ensure that datfrozenxid is fully advanced.
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+	"vacuumdb --all --freeze");
+
+my $datfrozenxid_freeze = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+ok($datfrozenxid_freeze > $datfrozenxid_partial_freeze, 'datfrozenxid is advanced after notification is consumed');
+
+$psql_session1->quit;
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a13e8162890..14684584cff 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -158,6 +158,7 @@ ArrayToken
 ArrayType
 AsyncQueueControl
 AsyncQueueEntry
+AsyncQueueIterator
 AsyncRequest
 AttInMetadata
 AttStatsSlot
-- 
2.39.5 (Apple Git-154)