From 37675f6feff30a365de3b26719002b65a81dabb3 Mon Sep 17 00:00:00 2001
From: Joel Jacobson
Date: Sun, 19 Oct 2025 18:55:25 +0200
Subject: [PATCH v11 1/2] Prevent VACUUM from truncating XIDs still present in
 notification queue

VACUUM's computation of datfrozenxid did not account for transaction IDs
in the LISTEN/NOTIFY queue.  This allowed VACUUM to truncate clog entries
for XIDs that were still referenced by queued notifications, causing
backends to fail in TransactionIdDidCommit when later processing those
notifications.

Fix by adding GetOldestNotifyTransactionId to find the oldest XID in
queued notifications for the current database, and constraining
datfrozenxid to not pass that.  The function scans from QUEUE_TAIL, since
notifications may have been written before any listeners existed.

To avoid code duplication, refactor SLRU page-reading code into a new
helper function asyncQueueReadPageToBuffer.

Co-authored-by: Matheus Alcantara
---
 src/backend/commands/async.c                  | 150 ++++++++++++++----
 src/backend/commands/vacuum.c                 |  12 ++
 src/include/commands/async.h                  |   3 +
 src/test/modules/Makefile                     |   1 +
 src/test/modules/meson.build                  |   1 +
 .../modules/test_listen_notify/.gitignore     |   4 +
 src/test/modules/test_listen_notify/Makefile  |  16 +++
 .../modules/test_listen_notify/meson.build    |  12 ++
 .../test_listen_notify/t/001_xid_freeze.pl    | 107 ++++++++++++
 src/tools/pgindent/typedefs.list              |   1 +
 10 files changed, 276 insertions(+), 31 deletions(-)
 create mode 100644 src/test/modules/test_listen_notify/.gitignore
 create mode 100644 src/test/modules/test_listen_notify/Makefile
 create mode 100644 src/test/modules/test_listen_notify/meson.build
 create mode 100644 src/test/modules/test_listen_notify/t/001_xid_freeze.pl

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..8fc1bbba7a2 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -401,6 +401,17 @@ struct NotificationHash
 	Notification *event;		/* => the actual Notification struct */
 };
 
+/*
+ * Page buffers used to read from SLRU cache must be adequately aligned,
+ * so use a union.
+ */
+typedef union
+{
+	char		buf[QUEUE_PAGESIZE];
+	AsyncQueueEntry align;
+} AlignedQueueEntryPage;
+
+
 static NotificationList *pendingNotifies = NULL;
 
 /*
@@ -1841,6 +1852,44 @@ ProcessNotifyInterrupt(bool flush)
 		ProcessIncomingNotify(flush);
 }
 
+/*
+ * Read a page from the SLRU queue into a local buffer.
+ *
+ * Reads the page containing 'pos', copying the data from the current offset
+ * either to the end of the page or up to 'head' (whichever comes first)
+ * into page_buffer.
+ */
+static void
+asyncQueueReadPageToBuffer(QueuePosition pos, QueuePosition head,
+						   char *page_buffer)
+{
+	int64		curpage = QUEUE_POS_PAGE(pos);
+	int			curoffset = QUEUE_POS_OFFSET(pos);
+	int			slotno;
+	int			copysize;
+
+	slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+										InvalidTransactionId);
+
+	if (curpage == QUEUE_POS_PAGE(head))
+	{
+		/* we only want to read as far as head */
+		copysize = QUEUE_POS_OFFSET(head) - curoffset;
+		if (copysize < 0)
+			copysize = 0;		/* just for safety */
+	}
+	else
+	{
+		/* fetch all the rest of the page */
+		copysize = QUEUE_PAGESIZE - curoffset;
+	}
+
+	memcpy(page_buffer + curoffset,
+		   NotifyCtl->shared->page_buffer[slotno] + curoffset,
+		   copysize);
+	/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+	LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+}
 
 /*
  * Read all pending notifications from the queue, and deliver appropriate
@@ -1853,13 +1902,7 @@ asyncQueueReadAllNotifications(void)
 	volatile QueuePosition pos;
 	QueuePosition head;
 	Snapshot	snapshot;
-
-	/* page_buffer must be adequately aligned, so use a union */
-	union
-	{
-		char		buf[QUEUE_PAGESIZE];
-		AsyncQueueEntry align;
-	}			page_buffer;
+	AlignedQueueEntryPage page_buffer;
 
 	/* Fetch current state */
 	LWLockAcquire(NotifyQueueLock, LW_SHARED);
@@ -1932,36 +1975,13 @@ asyncQueueReadAllNotifications(void)
 
 		do
 		{
-			int64		curpage = QUEUE_POS_PAGE(pos);
-			int			curoffset = QUEUE_POS_OFFSET(pos);
-			int			slotno;
-			int			copysize;
-
 			/*
 			 * We copy the data from SLRU into a local buffer, so as to avoid
 			 * holding the SLRU lock while we are examining the entries and
 			 * possibly transmitting them to our frontend.  Copy only the part
 			 * of the page we will actually inspect.
 			 */
-			slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
-												InvalidTransactionId);
-			if (curpage == QUEUE_POS_PAGE(head))
-			{
-				/* we only want to read as far as head */
-				copysize = QUEUE_POS_OFFSET(head) - curoffset;
-				if (copysize < 0)
-					copysize = 0;	/* just for safety */
-			}
-			else
-			{
-				/* fetch all the rest of the page */
-				copysize = QUEUE_PAGESIZE - curoffset;
-			}
-			memcpy(page_buffer.buf + curoffset,
-				   NotifyCtl->shared->page_buffer[slotno] + curoffset,
-				   copysize);
-			/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
-			LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+			asyncQueueReadPageToBuffer(pos, head, page_buffer.buf);
 
 			/*
 			 * Process messages up to the stop position, end of page, or an
@@ -2097,6 +2117,74 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 	return reachedStop;
 }
 
+/*
+ * Get the oldest XID in the notification queue that has not yet been
+ * processed by all listening backends.
+ *
+ * Only entries belonging to MyDatabaseId are considered, since those are
+ * the only ones that matter for this database's datfrozenxid.
+ *
+ * Returns InvalidTransactionId if the queue is empty, or if all queued
+ * notifications belong to other databases.
+ *
+ * NOTE(review): QUEUE_TAIL is sampled under NotifyQueueLock, but the pages
+ * are then read without holding it.  It looks like a concurrent
+ * asyncQueueAdvanceTail() could truncate a page before we get to read it --
+ * confirm whether the scan needs a lock that blocks queue truncation.
+ */
+TransactionId
+GetOldestNotifyTransactionId(void)
+{
+	QueuePosition pos;
+	QueuePosition head;
+	AlignedQueueEntryPage page_buffer;
+	TransactionId oldestXid = InvalidTransactionId;
+
+	/* First advance the shared queue tail pointer */
+	asyncQueueAdvanceTail();
+
+	/*
+	 * We must start at QUEUE_TAIL since notification data might have been
+	 * written before there were any listening backends.
+	 */
+	LWLockAcquire(NotifyQueueLock, LW_SHARED);
+	pos = QUEUE_TAIL;
+	head = QUEUE_HEAD;
+	LWLockRelease(NotifyQueueLock);
+
+	/* Scan one page per iteration; an empty queue skips the loop entirely */
+	while (!QUEUE_POS_EQUAL(pos, head))
+	{
+		bool		reachedEndOfPage;
+
+		/* Read the current page from SLRU into our local buffer */
+		asyncQueueReadPageToBuffer(pos, head, page_buffer.buf);
+
+		/* Process all entries on this page up to head */
+		do
+		{
+			AsyncQueueEntry *qe;
+
+			qe = (AsyncQueueEntry *) (page_buffer.buf + QUEUE_POS_OFFSET(pos));
+
+			/*
+			 * Remember this entry's XID if it is for our database, has a
+			 * valid XID and is older than anything seen so far.
+			 */
+			if (qe->dboid == MyDatabaseId &&
+				TransactionIdIsValid(qe->xid) &&
+				(!TransactionIdIsValid(oldestXid) ||
+				 TransactionIdPrecedes(qe->xid, oldestXid)))
+				oldestXid = qe->xid;
+
+			/* Advance to next entry */
+			reachedEndOfPage = asyncQueueAdvance(&pos, qe->length);
+		} while (!reachedEndOfPage && !QUEUE_POS_EQUAL(pos, head));
+	}
+
+	return oldestXid;
+}
+
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers.  Truncate pg_notify space if possible.
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index ed03e3bd50d..e5fedfb3238 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -37,6 +37,7 @@
 #include "catalog/namespace.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_inherits.h"
+#include "commands/async.h"
 #include "commands/cluster.h"
 #include "commands/defrem.h"
 #include "commands/progress.h"
@@ -1617,6 +1618,7 @@ vac_update_datfrozenxid(void)
 	bool		dirty = false;
 	ScanKeyData key[1];
 	void	   *inplace_state;
+	TransactionId oldestNotifyXid;
 
 	/*
 	 * Restrict this task to one backend per database.  This avoids race
@@ -1733,6 +1735,16 @@ vac_update_datfrozenxid(void)
 	if (bogus)
 		return;
 
+	/*
+	 * Also consider the oldest XID in the notification queue, since backends
+	 * will need to call TransactionIdDidCommit() on those XIDs when
+	 * processing the notifications.
+	 */
+	oldestNotifyXid = GetOldestNotifyTransactionId();
+	if (TransactionIdIsValid(oldestNotifyXid) &&
+		TransactionIdPrecedes(oldestNotifyXid, newFrozenXid))
+		newFrozenXid = oldestNotifyXid;
+
 	Assert(TransactionIdIsNormal(newFrozenXid));
 	Assert(MultiXactIdIsValid(newMinMulti));
 
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..0f8f17ad22b 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -26,6 +26,9 @@ extern void NotifyMyFrontEnd(const char *channel,
 							 const char *payload,
 							 int32 srcPid);
 
+/* get oldest XID in the notification queue for vacuum freeze */
+extern TransactionId GetOldestNotifyTransactionId(void);
+
 /* notify-related SQL statements */
 extern void Async_Notify(const char *channel, const char *payload);
 extern void Async_Listen(const char *channel);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 902a7954101..a015c961d35 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -30,5 +30,6 @@ SUBDIRS = \
 		  test_integerset \
 		  test_json_parser \
 		  test_lfind \
+		  test_listen_notify \
 		  test_lwlock_tranches \
 		  test_misc \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 14fc761c4cf..6af33448d7b 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -29,5 +29,6 @@ subdir('test_int128')
 subdir('test_integerset')
 subdir('test_json_parser')
 subdir('test_lfind')
+subdir('test_listen_notify')
 subdir('test_lwlock_tranches')
 subdir('test_misc')
diff --git a/src/test/modules/test_listen_notify/.gitignore b/src/test/modules/test_listen_notify/.gitignore
new file mode 100644
index 00000000000..5dcb3ff9723
--- /dev/null
+++ b/src/test/modules/test_listen_notify/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_listen_notify/Makefile b/src/test/modules/test_listen_notify/Makefile
new file mode 100644
index 00000000000..c1eb4fde370
--- /dev/null
+++ b/src/test/modules/test_listen_notify/Makefile
@@ -0,0 +1,16 @@
+# src/test/modules/test_listen_notify/Makefile
+
+TAP_TESTS = 1
+
+EXTRA_INSTALL=src/test/modules/xid_wraparound
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_listen_notify
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_listen_notify/meson.build b/src/test/modules/test_listen_notify/meson.build
new file mode 100644
index 00000000000..ebf9444108a
--- /dev/null
+++ b/src/test/modules/test_listen_notify/meson.build
@@ -0,0 +1,12 @@
+# Copyright (c) 2022-2025, PostgreSQL Global Development Group
+
+tests += {
+  'name': 'test_listen_notify',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_xid_freeze.pl',
+    ],
+  },
+}
diff --git a/src/test/modules/test_listen_notify/t/001_xid_freeze.pl b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
new file mode 100644
index 00000000000..c5553d6c792
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
@@ -0,0 +1,107 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+# Test that VACUUM FREEZE doesn't remove clog files that are still needed to
+# check the transaction status of notifications sitting in the LISTEN/NOTIFY
+# queue.  When computing the new datfrozenxid, VACUUM must take the oldest
+# XID in the queue into account.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Check if the extension xid_wraparound is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node->check_extension('xid_wraparound'))
+{
+	plan skip_all => 'Extension xid_wraparound not installed';
+}
+
+my $datfrozenxid_query =
+  "SELECT datfrozenxid FROM pg_database WHERE datname = 'postgres'";
+
+# Setup
+$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
+$node->safe_psql('postgres',
+	'CREATE TABLE t AS SELECT g AS a, g+2 AS b from generate_series(1,10) g;'
+);
+$node->safe_psql('postgres',
+	'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
+
+# --- Session 1: listen on channel 's', then sit idle in a transaction so
+# that incoming notifications stay unprocessed in the queue.
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen s;');
+$psql_session1->query_safe('begin;');
+
+# --- Session 2: send several notifications, each in its own transaction,
+# remembering the XID of the first (oldest) one.
+my $oldest_notify_xid;
+for my $i (1 .. 10)
+{
+	my $xid = $node->safe_psql(
+		'postgres', "
+		BEGIN;
+		SELECT pg_current_xact_id()::xid;
+		NOTIFY s, '$i';
+		COMMIT;");
+	$oldest_notify_xid //= $xid;
+}
+
+# Consume enough XIDs to trigger truncation
+$node->safe_psql('postgres', 'select consume_xids(1050000);');
+
+# Execute update so the frozen xid of "t" table is updated to a xid greater
+# than consume_xids() result
+$node->safe_psql('postgres', 'UPDATE t SET a = a+b;');
+
+# Remember datfrozenxid before VACUUM FREEZE, to verify that it advances.
+my $datfrozenxid_before = $node->safe_psql('postgres', $datfrozenxid_query);
+
+# Execute vacuum freeze on all databases
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+	"vacuumdb --all --freeze");
+
+# datfrozenxid must have advanced, but not past the oldest XID that is still
+# referenced by the queued notifications.
+my $datfrozenxid_after = $node->safe_psql('postgres', $datfrozenxid_query);
+note("datfrozenxid: $datfrozenxid_before -> $datfrozenxid_after,"
+	  . " oldest notify xid: $oldest_notify_xid");
+cmp_ok($datfrozenxid_after, '>', $datfrozenxid_before,
+	'datfrozenxid is advanced while notifications are queued');
+cmp_ok($datfrozenxid_after, '<=', $oldest_notify_xid,
+	'datfrozenxid is not advanced past the oldest queued notification');
+
+# On Session 1, commit and ensure that all the notifications are received
+my $res = $psql_session1->query_safe('commit;');
+my $notifications_count = 0;
+foreach my $i (split('\n', $res))
+{
+	$notifications_count++;
+	like(
+		$i,
+		qr/Asynchronous notification "s" with payload "$notifications_count" received/,
+		"received notification $notifications_count");
+}
+is($notifications_count, 10, 'received all committed notifications');
+
+# With the queue drained, another VACUUM FREEZE must be able to advance
+# datfrozenxid further.
+$datfrozenxid_before = $node->safe_psql('postgres', $datfrozenxid_query);
+
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+	"vacuumdb --all --freeze");
+
+$datfrozenxid_after = $node->safe_psql('postgres', $datfrozenxid_query);
+
+cmp_ok($datfrozenxid_after, '>', $datfrozenxid_before,
+	'datfrozenxid is advanced after notifications are processed');
+
+$psql_session1->quit;
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 43fe3bcd593..a8aa1365382 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -58,6 +58,7 @@ Aggref
 AggregateInstrumentation
 AlenState
 Alias
+AlignedQueueEntryPage
 AllocBlock
 AllocFreeListLink
 AllocPointer
-- 
2.51.0