From 9f2da84a9c58df155961481aa0802ffb95460811 Mon Sep 17 00:00:00 2001 From: Joel Jacobson Date: Fri, 7 Nov 2025 19:24:37 +0100 Subject: [PATCH 3/3] Add instrumentation for analyzing LISTEN/NOTIFY wakeup behavior This commit adds a set of atomic counters and SQL-accessible functions to help understand how SignalBackends and asyncQueueReadAllNotifications interact under various workloads. The instrumentation is intended only for development and performance analysis and will not be included in the final patch. Specifically: * Added several pg_atomic_uint32 counters in AsyncQueueControl tracking wakeup categories such as signaled backends, advancing vs. idle positions, direct advancements, and unnecessary wakeups. * Incremented these counters in SignalBackends() and asyncQueueReadAllNotifications() to classify wakeup decisions. * Added SQL functions pg_get_async_wakeup_stats() and pg_reset_async_wakeup_stats() for reading and resetting these counters during test runs. * Modified asyncQueueProcessPageEntries() to report whether any notifications were of interest to the backend, allowing differentiation between necessary and unnecessary wakeups. This is purely diagnostic code to help reason about backend wakeup patterns and validate assumptions during optimization. Apart from the two diagnostic SQL functions, it introduces no user-visible or behavioral changes and is not intended for commit to the main tree. 
--- src/backend/commands/async.c | 171 +++++++++++++++++++++++++++----- src/include/catalog/pg_proc.dat | 13 ++- 2 files changed, 159 insertions(+), 25 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 9f7b8a3324a..2ec6b6b9e2b 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -136,6 +136,7 @@ #include #include +#include "access/htup_details.h" #include "access/parallel.h" #include "access/slru.h" #include "access/transam.h" @@ -332,6 +333,14 @@ typedef struct AsyncQueueControl TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ dsa_handle channelHashDSA; dshash_table_handle channelHashDSH; + pg_atomic_uint32 signaledTargeted; /* listening to some of the channels; signal needed */ + pg_atomic_uint32 advancingBehind; /* advancing, position behind queue head before write */ + pg_atomic_uint32 advancingAhead; /* advancing, position ahead of queue head after write */ + pg_atomic_uint32 idleBehind; /* stationary at a position behind queue head before write */ + pg_atomic_uint32 avoidedWakeups; /* directly advanced */ + pg_atomic_uint32 alreadyAhead; /* already caught up or ahead, no action needed */ + pg_atomic_uint32 necessaryWakeups; /* wakeups where at least one message was interesting */ + pg_atomic_uint32 unnecessaryWakeups; /* wakeups where we had no interest in any of the messages */ QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; } AsyncQueueControl; @@ -518,7 +527,8 @@ static void asyncQueueReadAllNotifications(void); static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, - Snapshot snapshot); + Snapshot snapshot, + bool *interested); static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(bool flush); static bool AsyncExistsPendingNotify(Notification *n); @@ -684,6 +694,15 @@ AsyncShmemInit(void) asyncQueueControl->lastQueueFillWarn = 0; asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID; 
asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID; + pg_atomic_init_u32(&asyncQueueControl->signaledTargeted, 0); + pg_atomic_init_u32(&asyncQueueControl->advancingBehind, 0); + pg_atomic_init_u32(&asyncQueueControl->advancingAhead, 0); + pg_atomic_init_u32(&asyncQueueControl->idleBehind, 0); + pg_atomic_init_u32(&asyncQueueControl->avoidedWakeups, 0); + pg_atomic_init_u32(&asyncQueueControl->alreadyAhead, 0); + pg_atomic_init_u32(&asyncQueueControl->necessaryWakeups, 0); + pg_atomic_init_u32(&asyncQueueControl->unnecessaryWakeups, 0); + for (int i = 0; i < MaxBackends; i++) { QUEUE_BACKEND_PID(i) = InvalidPid; @@ -998,6 +1017,85 @@ pg_listening_channels(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } +/* + * SQL function: return NOTIFY wakeup statistics as a single row with: + * - signaled_targeted: signaled, listening on a notified channel + * - advancing_behind: signaled, advancing but behind head after write + * - advancing_ahead: not signaled, advancing to or past head after write + * - idle_behind: signaled, idle behind head before write + * - avoided_wakeups: not signaled, directly advanced to head after write + * - already_ahead: not signaled, already at or past head after write + * - necessary_wakeups: wakeups where at least one message was interesting + * - unnecessary_wakeups: wakeups where no messages were interesting + */ +Datum +pg_get_async_wakeup_stats(PG_FUNCTION_ARGS) +{ + TupleDesc tupdesc; + Datum values[8]; + bool nulls[8]; + HeapTuple tuple; + uint32 signaled_targeted; + uint32 advancing_behind; + uint32 advancing_ahead; + uint32 idle_behind; + uint32 avoided_wakeups; + uint32 already_ahead; + uint32 necessary_wakeups; + uint32 unnecessary_wakeups; + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context that cannot accept type record"))); + + /* Read the atomic counters */ + signaled_targeted = pg_atomic_read_u32(&asyncQueueControl->signaledTargeted); + advancing_behind = 
pg_atomic_read_u32(&asyncQueueControl->advancingBehind); + advancing_ahead = pg_atomic_read_u32(&asyncQueueControl->advancingAhead); + idle_behind = pg_atomic_read_u32(&asyncQueueControl->idleBehind); + avoided_wakeups = pg_atomic_read_u32(&asyncQueueControl->avoidedWakeups); + already_ahead = pg_atomic_read_u32(&asyncQueueControl->alreadyAhead); + necessary_wakeups = pg_atomic_read_u32(&asyncQueueControl->necessaryWakeups); + unnecessary_wakeups = pg_atomic_read_u32(&asyncQueueControl->unnecessaryWakeups); + + /* Fill in the values */ + memset(nulls, 0, sizeof(nulls)); + values[0] = Int64GetDatum((int64) signaled_targeted); + values[1] = Int64GetDatum((int64) advancing_behind); + values[2] = Int64GetDatum((int64) advancing_ahead); + values[3] = Int64GetDatum((int64) idle_behind); + values[4] = Int64GetDatum((int64) avoided_wakeups); + values[5] = Int64GetDatum((int64) already_ahead); + values[6] = Int64GetDatum((int64) necessary_wakeups); + values[7] = Int64GetDatum((int64) unnecessary_wakeups); + + tuple = heap_form_tuple(tupdesc, values, nulls); + PG_RETURN_DATUM(HeapTupleGetDatum(tuple)); +} + +/* + * SQL function: reset NOTIFY wakeup statistics + * + * This function resets all the async wakeup counters to zero. 
+ */ +Datum +pg_reset_async_wakeup_stats(PG_FUNCTION_ARGS) +{ + /* Reset all the atomic counters to zero */ + pg_atomic_write_u32(&asyncQueueControl->signaledTargeted, 0); + pg_atomic_write_u32(&asyncQueueControl->advancingBehind, 0); + pg_atomic_write_u32(&asyncQueueControl->advancingAhead, 0); + pg_atomic_write_u32(&asyncQueueControl->idleBehind, 0); + pg_atomic_write_u32(&asyncQueueControl->avoidedWakeups, 0); + pg_atomic_write_u32(&asyncQueueControl->alreadyAhead, 0); + pg_atomic_write_u32(&asyncQueueControl->necessaryWakeups, 0); + pg_atomic_write_u32(&asyncQueueControl->unnecessaryWakeups, 0); + + PG_RETURN_VOID(); +} + /* * Async_UnlistenOnExit * @@ -2016,6 +2114,7 @@ SignalBackends(void) Assert(pid != InvalidPid); + pg_atomic_fetch_add_u32(&asyncQueueControl->signaledTargeted, 1); QUEUE_BACKEND_WAKEUP_PENDING(i) = true; pids[count] = pid; procnos[count] = i; @@ -2037,6 +2136,7 @@ SignalBackends(void) { QueuePosition pos; int32 pid; + bool need_signal = false; if (QUEUE_BACKEND_WAKEUP_PENDING(i)) continue; @@ -2044,21 +2144,39 @@ SignalBackends(void) pos = QUEUE_BACKEND_POS(i); pid = QUEUE_BACKEND_PID(i); - /* - * We need to signal advancing listening backends that would get - * stuck at a position before the new queue head. We also need to - * signal listening backends that are idle at a position before - * the old queue head since they could be interested in the - * messages in-between. - * - * Listening backends that are not advancing and are stationary at - * a position somewhere in the range we just wrote, can safely be - * direct advanced to the new queue head, since we know that they - * are not interested in our messages. - */ - if (QUEUE_BACKEND_IS_ADVANCING(i) ? 
- QUEUE_POS_PRECEDES(QUEUE_BACKEND_ADVANCING_POS(i), queueHeadAfterWrite) : - QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite)) + if (QUEUE_BACKEND_IS_ADVANCING(i)) + { + if (QUEUE_POS_PRECEDES(QUEUE_BACKEND_ADVANCING_POS(i), queueHeadAfterWrite)) + { + need_signal = true; + pg_atomic_fetch_add_u32(&asyncQueueControl->advancingBehind, 1); + } + else + { + pg_atomic_fetch_add_u32(&asyncQueueControl->advancingAhead, 1); + } + } + else + { + if (QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite)) + { + need_signal = true; + pg_atomic_fetch_add_u32(&asyncQueueControl->idleBehind, 1); + } + else if (QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite)) + { + QUEUE_BACKEND_POS(i) = queueHeadAfterWrite; + pg_atomic_fetch_add_u32(&asyncQueueControl->avoidedWakeups, 1); + } + else + { + Assert(QUEUE_POS_EQUAL(pos, queueHeadAfterWrite) || + QUEUE_POS_PRECEDES(queueHeadAfterWrite, pos)); + pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAhead, 1); + } + } + + if (need_signal) { Assert(pid != InvalidPid); @@ -2067,13 +2185,7 @@ SignalBackends(void) procnos[count] = i; count++; } - else if (!QUEUE_BACKEND_IS_ADVANCING(i) && - QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite)) - { - Assert(!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite)); - QUEUE_BACKEND_POS(i) = queueHeadAfterWrite; - } } } LWLockRelease(NotifyQueueLock); @@ -2302,6 +2414,7 @@ asyncQueueReadAllNotifications(void) volatile QueuePosition pos; QueuePosition head; Snapshot snapshot; + bool interested = false; /* page_buffer must be adequately aligned, so use a union */ union @@ -2438,7 +2551,8 @@ asyncQueueReadAllNotifications(void) */ reachedStop = asyncQueueProcessPageEntries(&pos, head, page_buffer.buf, - snapshot); + snapshot, + &interested); } while (!reachedStop); } PG_FINALLY(); @@ -2452,6 +2566,11 @@ asyncQueueReadAllNotifications(void) } PG_END_TRY(); + if (interested) + pg_atomic_fetch_add_u32(&asyncQueueControl->necessaryWakeups, 1); + else + pg_atomic_fetch_add_u32(&asyncQueueControl->unnecessaryWakeups, 1); + /* Done 
with snapshot */ UnregisterSnapshot(snapshot); } @@ -2476,7 +2595,8 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, - Snapshot snapshot) + Snapshot snapshot, + bool *interested) { bool reachedStop = false; bool reachedEndOfPage; @@ -2537,6 +2657,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, char *payload = qe->data + strlen(channel) + 1; NotifyMyFrontEnd(channel, payload, qe->srcPid); + + /* Mark that we were interested in at least one message */ + *interested = true; } } else diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9121a382f76..b259bccfa4b 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -8571,7 +8571,18 @@ proname => 'pg_notification_queue_usage', provolatile => 'v', proparallel => 'r', prorettype => 'float8', proargtypes => '', prosrc => 'pg_notification_queue_usage' }, - +{ oid => '9315', + descr => 'get statistics about NOTIFY wakeups', + proname => 'pg_get_async_wakeup_stats', provolatile => 'v', + proparallel => 'r', prorettype => 'record', proargtypes => '', + proallargtypes => '{int8,int8,int8,int8,int8,int8,int8,int8}', proargmodes => '{o,o,o,o,o,o,o,o}', + proargnames => '{signaled_targeted,advancing_behind,advancing_ahead,idle_behind,avoided_wakeups,already_ahead,necessary_wakeups,unnecessary_wakeups}', + prosrc => 'pg_get_async_wakeup_stats' }, +{ oid => '9316', + descr => 'reset statistics about NOTIFY wakeups', + proname => 'pg_reset_async_wakeup_stats', provolatile => 'v', + proparallel => 'r', prorettype => 'void', proargtypes => '', + prosrc => 'pg_reset_async_wakeup_stats' }, # shared memory usage { oid => '5052', descr => 'allocations from the main shared memory segment', proname => 'pg_get_shmem_allocations', prorows => '50', proretset => 't', -- 2.50.1