diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 8dac12f8124..7e8e0b14f42 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -137,6 +137,7 @@ #include #include +#include "access/htup_details.h" #include "access/parallel.h" #include "access/slru.h" #include "access/transam.h" @@ -332,6 +333,13 @@ typedef struct AsyncQueueControl TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ dsa_handle channelHashDSA; dshash_table_handle channelHashDSH; + pg_atomic_uint32 signaledNeeded; /* marked wakeup-pending: listening to some of the channels */ + pg_atomic_uint32 avoidedWakeups; /* position advanced directly instead of waking the backend */ + pg_atomic_uint32 alreadyAdvancing; /* left alone: backend already advancing its position */ + pg_atomic_uint32 signaledUncertain; /* signaled due to uncertain need */ + pg_atomic_uint32 alreadyAhead; /* already ahead, no action needed */ + pg_atomic_uint32 necessaryWakeups; /* wakeups where at least one message was interesting */ + pg_atomic_uint32 unnecessaryWakeups; /* wakeups where no messages were interesting */ QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; } AsyncQueueControl; @@ -517,7 +525,8 @@ static void asyncQueueReadAllNotifications(void); static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, - Snapshot snapshot); + Snapshot snapshot, + bool *interested); static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(bool flush); static bool AsyncExistsPendingNotify(Notification *n); @@ -683,6 +692,13 @@ AsyncShmemInit(void) asyncQueueControl->lastQueueFillWarn = 0; asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID; asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID; + pg_atomic_init_u32(&asyncQueueControl->signaledNeeded, 0); + pg_atomic_init_u32(&asyncQueueControl->avoidedWakeups, 0); + pg_atomic_init_u32(&asyncQueueControl->alreadyAdvancing, 0); + pg_atomic_init_u32(&asyncQueueControl->signaledUncertain, 0); + 
pg_atomic_init_u32(&asyncQueueControl->alreadyAhead, 0); + pg_atomic_init_u32(&asyncQueueControl->necessaryWakeups, 0); + pg_atomic_init_u32(&asyncQueueControl->unnecessaryWakeups, 0); for (int i = 0; i < MaxBackends; i++) { @@ -997,6 +1013,81 @@ pg_listening_channels(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } +/* + * SQL function: return statistics about NOTIFY wakeups as a single row. + * Columns, in result order (names as declared in pg_proc.dat): + * - signaled_needed: listening to some of the channels; signal needed + * - avoided_wakeups: position directly advanced + * - already_advancing: already advancing its position + * - signaled_uncertain: signaled due to uncertain need + * - already_ahead: already ahead, no action needed + * - necessary_wakeups: wakeups where at least one message was interesting + * - unnecessary_wakeups: wakeups where no messages were interesting + */ +Datum +pg_get_async_wakeup_stats(PG_FUNCTION_ARGS) +{ + TupleDesc tupdesc; + Datum values[7]; + bool nulls[7]; + HeapTuple tuple; + uint32 signaled_needed; + uint32 direct_advancements_success; + uint32 already_advancing; + uint32 signaled_uncertain; + uint32 already_ahead; + uint32 necessary_wakeups; + uint32 unnecessary_wakeups; + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context that cannot accept type record"))); + + /* Read each counter separately; the row is not a consistent snapshot */ + signaled_needed = pg_atomic_read_u32(&asyncQueueControl->signaledNeeded); + direct_advancements_success = pg_atomic_read_u32(&asyncQueueControl->avoidedWakeups); + already_advancing = pg_atomic_read_u32(&asyncQueueControl->alreadyAdvancing); + signaled_uncertain = pg_atomic_read_u32(&asyncQueueControl->signaledUncertain); + already_ahead = pg_atomic_read_u32(&asyncQueueControl->alreadyAhead); + necessary_wakeups = pg_atomic_read_u32(&asyncQueueControl->necessaryWakeups); + unnecessary_wakeups = 
pg_atomic_read_u32(&asyncQueueControl->unnecessaryWakeups); + + /* Counters are uint32; widen to int64 to match the int8 output columns */ + memset(nulls, 0, sizeof(nulls)); + values[0] = Int64GetDatum((int64) signaled_needed); + values[1] = Int64GetDatum((int64) direct_advancements_success); + values[2] = Int64GetDatum((int64) already_advancing); + values[3] = Int64GetDatum((int64) signaled_uncertain); + values[4] = Int64GetDatum((int64) already_ahead); + values[5] = Int64GetDatum((int64) necessary_wakeups); + values[6] = Int64GetDatum((int64) unnecessary_wakeups); + + tuple = heap_form_tuple(tupdesc, values, nulls); + PG_RETURN_DATUM(HeapTupleGetDatum(tuple)); +} + +/* + * SQL function: reset all NOTIFY wakeup counters to zero + * + * NOTE(review): no privilege check is visible here; confirm EXECUTE is restricted. + */ +Datum +pg_reset_async_wakeup_stats(PG_FUNCTION_ARGS) +{ + /* Each counter is cleared separately; the reset is not atomic as a whole */ + pg_atomic_write_u32(&asyncQueueControl->signaledNeeded, 0); + pg_atomic_write_u32(&asyncQueueControl->avoidedWakeups, 0); + pg_atomic_write_u32(&asyncQueueControl->alreadyAdvancing, 0); + pg_atomic_write_u32(&asyncQueueControl->signaledUncertain, 0); + pg_atomic_write_u32(&asyncQueueControl->alreadyAhead, 0); + pg_atomic_write_u32(&asyncQueueControl->necessaryWakeups, 0); + pg_atomic_write_u32(&asyncQueueControl->unnecessaryWakeups, 0); + + PG_RETURN_VOID(); +} + /* * Async_UnlistenOnExit * @@ -2014,6 +2105,7 @@ SignalBackends(void) Assert(pid != InvalidPid); + pg_atomic_fetch_add_u32(&asyncQueueControl->signaledNeeded, 1); QUEUE_BACKEND_WAKEUP_PENDING(i) = true; pids[count] = pid; procnos[count] = i; @@ -2049,7 +2141,14 @@ SignalBackends(void) * currently advancing its position. 
*/ if (!QUEUE_BACKEND_ADVANCING_POS(i)) + { QUEUE_BACKEND_POS(i) = queueHeadAfterWrite; + pg_atomic_fetch_add_u32(&asyncQueueControl->avoidedWakeups, 1); + } + else + { + pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAdvancing, 1); + } } else if (QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite)) { @@ -2060,6 +2159,7 @@ SignalBackends(void) */ Assert(pid != InvalidPid); + pg_atomic_fetch_add_u32(&asyncQueueControl->signaledUncertain, 1); QUEUE_BACKEND_WAKEUP_PENDING(i) = true; pids[count] = pid; procnos[count] = i; @@ -2071,6 +2171,7 @@ SignalBackends(void) * The backend is already ahead of the notifications we wrote. * No need to do anything. */ + pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAhead, 1); Assert(QUEUE_POS_PRECEDES(queueHeadBeforeWrite, pos)); } } @@ -2301,6 +2402,7 @@ asyncQueueReadAllNotifications(void) volatile QueuePosition pos; QueuePosition head; Snapshot snapshot; + bool interested = false; /* page_buffer must be adequately aligned, so use a union */ union @@ -2435,7 +2537,8 @@ asyncQueueReadAllNotifications(void) */ reachedStop = asyncQueueProcessPageEntries(&pos, head, page_buffer.buf, - snapshot); + snapshot, + &interested); } while (!reachedStop); } PG_FINALLY(); @@ -2450,6 +2553,11 @@ asyncQueueReadAllNotifications(void) } PG_END_TRY(); + if (interested) + pg_atomic_fetch_add_u32(&asyncQueueControl->necessaryWakeups, 1); + else + pg_atomic_fetch_add_u32(&asyncQueueControl->unnecessaryWakeups, 1); + /* Done with snapshot */ UnregisterSnapshot(snapshot); } @@ -2474,7 +2582,8 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, - Snapshot snapshot) + Snapshot snapshot, + bool *interested) { bool reachedStop = false; bool reachedEndOfPage; @@ -2535,6 +2644,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, char *payload = qe->data + strlen(channel) + 1; NotifyMyFrontEnd(channel, payload, qe->srcPid); + + /* Record that we were interested in at least one message */ + 
*interested = true; } } else diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9121a382f76..0bbd7db39c7 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -8571,7 +8571,18 @@ proname => 'pg_notification_queue_usage', provolatile => 'v', proparallel => 'r', prorettype => 'float8', proargtypes => '', prosrc => 'pg_notification_queue_usage' }, - +{ oid => '9315', + descr => 'get statistics about NOTIFY wakeups', + proname => 'pg_get_async_wakeup_stats', provolatile => 'v', + proparallel => 'r', prorettype => 'record', proargtypes => '', + proallargtypes => '{int8,int8,int8,int8,int8,int8,int8}', proargmodes => '{o,o,o,o,o,o,o}', + proargnames => '{signaled_needed,avoided_wakeups,already_advancing,signaled_uncertain,already_ahead,necessary_wakeups,unnecessary_wakeups}', + prosrc => 'pg_get_async_wakeup_stats' }, +{ oid => '9316', + descr => 'reset statistics about NOTIFY wakeups', + proname => 'pg_reset_async_wakeup_stats', provolatile => 'v', + proparallel => 'r', prorettype => 'void', proargtypes => '', + prosrc => 'pg_reset_async_wakeup_stats' }, # shared memory usage { oid => '5052', descr => 'allocations from the main shared memory segment', proname => 'pg_get_shmem_allocations', prorows => '50', proretset => 't',