From b2603d3e8836c8aa52fdcf342f9e43bef7c90b70 Mon Sep 17 00:00:00 2001 From: Sami Imseih Date: Thu, 14 May 2026 12:27:43 -0500 Subject: [PATCH v1 2/4] pg_stat_statements: modernize entry storage with pgstat kind Replace the private shared-memory hash table with the pgstat subsystem's dshash, move counter updates to backend-local pending entries that flush periodically, and introduce admission control with timestamp-throttled inline eviction: when entry count reaches pgss_max, a backend attempts eviction using a conditional lock and a shared timestamp that ensures at most one eviction cycle per 10 seconds. Other backends skip entry creation without blocking. Variance/stddev computation uses Welford's online algorithm in per-backend pending accumulation, merged into shared memory via Chan's parallel variance algorithm during flush. See pg_stat_statements.max is now PGC_SIGHUP (changeable without restart). --- .../pg_stat_statements/pg_stat_statements.c | 1794 +++++++++-------- doc/src/sgml/pgstatstatements.sgml | 18 +- src/tools/pgindent/typedefs.list | 1 + 3 files changed, 998 insertions(+), 815 deletions(-) diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c index 92315627916..0e6e65e3e51 100644 --- a/contrib/pg_stat_statements/pg_stat_statements.c +++ b/contrib/pg_stat_statements/pg_stat_statements.c @@ -5,8 +5,10 @@ * usage across a whole database cluster. * * Execution costs are totaled for each distinct source query, and kept in - * a shared hashtable. (We track only as many distinct queries as will fit - * in the designated amount of shared memory.) + * a dshash table managed by the pgstat subsystem (custom stats kind + * PGSTAT_KIND_PGSS). Counter updates accumulate in backend-local pending + * entries and are flushed to shared memory periodically or on demand via + * pgstat_report_anytime_stat(). * * Starting in Postgres 9.2, this module normalized query entries. As of * Postgres 14, the normalization is done by the core if compute_query_id is @@ -14,24 +16,15 @@ * * To facilitate presenting entries to users, we create "representative" query * strings in which constants are replaced with parameter symbols ($n), to - * make it clearer what a normalized entry can represent. To save on shared - * memory, and to avoid having to truncate oversized query strings, we store - * these strings in a temporary external query-texts file. Offsets into this - * file are kept in shared memory. + * make it clearer what a normalized entry can represent. * - * Note about locking issues: to create or delete an entry in the shared - * hashtable, one must hold pgss->lock exclusively. Modifying any field - * in an entry except the counters requires the same. To look up an entry, - * one must hold the lock shared. To read or update the counters within - * an entry, one must hold the lock shared or exclusive (so the entry doesn't - * disappear!) and also take the entry's mutex spinlock. - * The shared state variable pgss->extent (the next free spot in the external - * query-text file) should be accessed only while holding either the - * pgss->mutex spinlock, or exclusive lock on pgss->lock. We use the mutex to - * allow reserving file space while holding only shared lock on pgss->lock. - * Rewriting the entire external query-text file, eg for garbage collection, - * requires holding pgss->lock exclusively; this allows individual entries - * in the file to be read or written while holding only shared lock. + * Each shared pgstat entry carries its own query text, stored in an + * external file (PGSS_TEXT_FILE). + * + * Eviction of least-used entries is throttled to run at most once every + * EVICTION_INTERVAL_MS milliseconds. When eviction is needed, a single + * backend performs it inline using a conditional lock; other backends simply + * skip entry creation until space is freed. * * * Copyright (c) 2008-2026, PostgreSQL Global Development Group @@ -49,6 +42,7 @@ #include "access/htup_details.h" #include "access/parallel.h" +#include "access/xact.h" #include "catalog/pg_authid.h" #include "executor/instrument.h" #include "funcapi.h" @@ -58,7 +52,9 @@ #include "nodes/queryjumble.h" #include "optimizer/planner.h" #include "parser/analyze.h" +#include "common/hashfn.h" #include "pgstat.h" +#include "utils/pgstat_internal.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lwlock.h" @@ -76,9 +72,6 @@ PG_MODULE_MAGIC_EXT( .version = PG_VERSION ); -/* Location of permanent stats file (valid when database is shut down) */ -#define PGSS_DUMP_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pg_stat_statements.stat" - /* * Location of external query text file. */ @@ -87,18 +80,16 @@ PG_MODULE_MAGIC_EXT( /* Magic number identifying the stats file format */ static const uint32 PGSS_FILE_HEADER = 0x20250731; -/* PostgreSQL major version number, changes in which invalidate all entries */ -static const uint32 PGSS_PG_MAJOR_VERSION = PG_VERSION_NUM / 100; +/* Custom pgstat kind ID for pg_stat_statements entries */ +#define PGSTAT_KIND_PGSS PGSTAT_KIND_EXPERIMENTAL /* XXX: Should USAGE_EXEC reflect execution time and/or buffer usage? */ #define USAGE_EXEC(duration) (1.0) #define USAGE_INIT (1.0) /* including initial planning */ -#define ASSUMED_MEDIAN_INIT (10.0) /* initial assumed median usage */ #define ASSUMED_LENGTH_INIT 1024 /* initial assumed mean query length */ #define USAGE_DECREASE_FACTOR (0.99) /* decreased every entry_dealloc */ -#define STICKY_DECREASE_FACTOR (0.50) /* factor for sticky entries */ #define USAGE_DEALLOC_PERCENT 5 /* free this % of entries at once */ -#define IS_STICKY(c) ((c.calls[PGSS_PLAN] + c.calls[PGSS_EXEC]) == 0) +#define EVICTION_INTERVAL_MS 10000 /* min ms between eviction cycles */ /* * Extension version number, for supporting older extension versions' objects @@ -140,18 +131,27 @@ typedef enum pgssStoreKind * zero the padding bytes. Otherwise, things will break, because pgss_hash is * created using HASH_BLOBS, and thus tag_hash is used to hash this. */ -typedef struct pgssHashKey +typedef struct pgssHashKey pgssHashKey; +typedef struct Counters Counters; +typedef struct pgssGlobalStats pgssGlobalStats; +typedef struct pgssSharedState pgssSharedState; +typedef struct PgStatShared_Pgss PgStatShared_Pgss; +typedef struct PgStat_PendingPgss PgStat_PendingPgss; +typedef struct UsageEntry UsageEntry; +typedef struct PendingDrop PendingDrop; + +struct pgssHashKey { Oid userid; /* user OID */ Oid dbid; /* database OID */ int64 queryid; /* query identifier */ bool toplevel; /* query executed at top level */ -} pgssHashKey; +}; /* * The actual stats counters kept within pgssEntry. */ -typedef struct Counters +struct Counters { int64 calls[PGSS_NUMKIND]; /* # of times planned/executed */ double total_time[PGSS_NUMKIND]; /* total planning/execution time, @@ -212,54 +212,76 @@ typedef struct Counters * launched */ int64 generic_plan_calls; /* number of calls using a generic plan */ int64 custom_plan_calls; /* number of calls using a custom plan */ -} Counters; +}; /* * Global statistics for pg_stat_statements */ -typedef struct pgssGlobalStats +struct pgssGlobalStats { int64 dealloc; /* # of times entries were deallocated */ TimestampTz stats_reset; /* timestamp with all stats reset */ -} pgssGlobalStats; - -/* - * Statistics per statement - * - * Note: in event of a failure in garbage collection of the query text file, - * we reset query_offset to zero and query_len to -1. This will be seen as - * an invalid state by qtext_fetch(). - */ -typedef struct pgssEntry -{ - pgssHashKey key; /* hash key of entry - MUST BE FIRST */ - Counters counters; /* the statistics for this query */ - Size query_offset; /* query text offset in external file */ - int query_len; /* # of valid bytes in query string, or -1 */ - int encoding; /* query text encoding */ - TimestampTz stats_since; /* timestamp of entry allocation */ - TimestampTz minmax_stats_since; /* timestamp of last min/max values reset */ - slock_t mutex; /* protects the counters only */ -} pgssEntry; +}; /* * Global shared state */ -typedef struct pgssSharedState +struct pgssSharedState { - LWLockPadded lock; /* protects hashtable search/modification */ - double cur_median_usage; /* current median usage in hashtable */ + LWLockPadded lock; /* protects query text file operations */ Size mean_query_len; /* current mean entry text length */ slock_t mutex; /* protects following fields only: */ Size extent; /* current extent of query file */ int n_writers; /* number of active writers to query file */ int gc_count; /* query file garbage collection cycle count */ + TimestampTz last_eviction_time; /* throttle: last time entry_dealloc ran */ pgssGlobalStats stats; /* global statistics for pgss */ -} pgssSharedState; +}; + +/* + * Shared memory entry for pgstat custom kind. + * This is what lives in the pgstat shared hash table. + */ +struct PgStatShared_Pgss +{ + PgStatShared_Common header; + pgssHashKey key; /* full original key for reconstruction */ + Counters counters; /* the statistics */ + TimestampTz stats_since; /* timestamp of entry allocation */ + TimestampTz minmax_stats_since; /* timestamp of last min/max reset */ + int query_len; /* length of query text, or -1 if invalid */ + int encoding; /* encoding of query text */ + + Size query_offset; /* offset in external query text file */ +}; + +/* + * Pending (backend-local) stats entry, accumulated before flush. + */ +struct PgStat_PendingPgss +{ + Counters counters; +}; + +/* + * Used during entry reset to collect keys for deferred drop. + */ +struct PendingDrop +{ + Oid dbid; + uint64 objid; +}; /* Links to shared memory state */ static pgssSharedState *pgss; -static HTAB *pgss_hash; + +/* Buffer used during serialization to avoid reloading text file per entry */ +static char *pgss_qtext_write_buffer = NULL; +static Size pgss_qtext_write_buffer_size = 0; + +/* File handle used during deserialization to rebuild query text file */ +static FILE *pgss_qtext_rebuild_file = NULL; +static Size pgss_qtext_rebuild_extent = 0; static void pgss_shmem_request(void *arg); static void pgss_shmem_init(void *arg); @@ -269,6 +291,32 @@ static const ShmemCallbacks pgss_shmem_callbacks = { .init_fn = pgss_shmem_init, }; +/* pgstat custom kind callbacks */ +static bool pgss_flush_pending_cb(PgStat_EntryRef *entry_ref, bool nowait); +static void pgss_to_serialized_data(const PgStat_HashKey *key, + const PgStatShared_Common *header, + FILE *statfile); +static bool pgss_from_serialized_data(const PgStat_HashKey *key, + PgStatShared_Common *header, + FILE *statfile); +static void pgss_finish(PgStat_StatsFileOp status); + +static const PgStat_KindInfo pgss_kind_info = { + .name = "pg_stat_statements", + .fixed_amount = false, + .write_to_file = true, + .track_entry_count = true, + .accessed_across_databases = true, + .shared_size = sizeof(PgStatShared_Pgss), + .shared_data_off = offsetof(PgStatShared_Pgss, key), + .shared_data_len = sizeof(PgStatShared_Pgss) - offsetof(PgStatShared_Pgss, key), + .pending_size = sizeof(PgStat_PendingPgss), + .flush_pending_cb = pgss_flush_pending_cb, + .to_serialized_data = pgss_to_serialized_data, + .from_serialized_data = pgss_from_serialized_data, + .finish = pgss_finish, +}; + /*---- Local variables ----*/ /* Current nesting depth of planner/ExecutorRun/ProcessUtility calls */ @@ -306,7 +354,6 @@ static bool pgss_track_utility = true; /* whether to track utility commands */ static bool pgss_track_planning = false; /* whether to track planning * duration */ static bool pgss_save = true; /* whether to save stats across shutdown */ - #define pgss_enabled(level) \ (!IsParallelWorker() && \ (pgss_track == PGSS_TRACK_ALL || \ @@ -335,7 +382,6 @@ PG_FUNCTION_INFO_V1(pg_stat_statements_1_13); PG_FUNCTION_INFO_V1(pg_stat_statements); PG_FUNCTION_INFO_V1(pg_stat_statements_info); -static void pgss_shmem_shutdown(int code, Datum arg); static void pgss_post_parse_analyze(ParseState *pstate, Query *query, const JumbleState *jstate); static PlannedStmt *pgss_planner(Query *parse, @@ -368,8 +414,6 @@ static void pgss_store(const char *query, int64 queryId, static void pg_stat_statements_internal(FunctionCallInfo fcinfo, pgssVersion api_version, bool showtext); -static pgssEntry *entry_alloc(pgssHashKey *key, Size query_offset, int query_len, - int encoding, bool sticky); static void entry_dealloc(void); static bool qtext_store(const char *query, int query_len, Size *query_offset, int *gc_count); @@ -382,6 +426,35 @@ static TimestampTz entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_ static char *generate_normalized_query(const JumbleState *jstate, const char *query, int query_loc, int *query_len_p); +static void pgss_entry_init(PgStatShared_Pgss *shared_entry, + const pgssHashKey *key, int encoding); +static void pgss_store_query_text(PgStatShared_Pgss *shared_entry, + const char *query, int query_len, + int encoding); + +struct UsageEntry +{ + pgssHashKey key; + double usage; +}; + +static int entry_cmp(const void *a, const void *b); +static void pgss_maybe_evict(void); + +/* + * Compute a uint64 objid from a pgssHashKey for use in PgStat_HashKey. + * We hash (userid, queryid, toplevel) together since dbid goes into dboid. + */ +static inline uint64 +pgss_objid(const pgssHashKey *key) +{ + uint64 hashval; + + hashval = murmurhash64((uint64) key->userid); + hashval = hash_combine64(hashval, murmurhash64((uint64) key->queryid)); + hashval = hash_combine64(hashval, murmurhash64((uint64) key->toplevel)); + return hashval; +} /* * Module load callback @@ -416,7 +489,7 @@ _PG_init(void) 5000, 100, INT_MAX / 2, - PGC_POSTMASTER, + PGC_SIGHUP, 0, NULL, NULL, @@ -474,6 +547,11 @@ _PG_init(void) */ RegisterShmemCallbacks(&pgss_shmem_callbacks); + /* + * Register custom statistics kind for pg_stat_statements entries. + */ + pgstat_register_kind(PGSTAT_KIND_PGSS, &pgss_kind_info); + /* * Install hooks. */ @@ -505,13 +583,6 @@ _PG_init(void) static void pgss_shmem_request(void *arg) { - ShmemRequestHash(.name = "pg_stat_statements hash", - .nelems = pgss_max, - .hash_info.keysize = sizeof(pgssHashKey), - .hash_info.entrysize = sizeof(pgssEntry), - .hash_flags = HASH_ELEM | HASH_BLOBS, - .ptr = &pgss_hash, - ); ShmemRequestStruct(.name = "pg_stat_statements", .size = sizeof(pgssSharedState), .ptr = (void **) &pgss, @@ -530,14 +601,7 @@ static void pgss_shmem_init(void *arg) { int tranche_id; - FILE *file = NULL; FILE *qfile = NULL; - uint32 header; - int32 num; - int32 pgver; - int32 i; - int buffer_size; - char *buffer = NULL; /* * We already checked that we're loaded from shared_preload_libraries in @@ -546,285 +610,33 @@ pgss_shmem_init(void *arg) Assert(!IsUnderPostmaster); /* - * Initialize the shmem area with no statistics. + * Initialize the shmem area. */ tranche_id = LWLockNewTrancheId("pg_stat_statements"); LWLockInitialize(&pgss->lock.lock, tranche_id); - pgss->cur_median_usage = ASSUMED_MEDIAN_INIT; pgss->mean_query_len = ASSUMED_LENGTH_INIT; SpinLockInit(&pgss->mutex); pgss->extent = 0; pgss->n_writers = 0; pgss->gc_count = 0; + pgss->last_eviction_time = 0; pgss->stats.dealloc = 0; pgss->stats.stats_reset = GetCurrentTimestamp(); - /* The hash table must've also been initialized by now */ - Assert(pgss_hash != NULL); - - /* - * Set up a shmem exit hook to dump the statistics to disk on postmaster - * (or standalone backend) exit. - */ - on_shmem_exit(pgss_shmem_shutdown, (Datum) 0); - - /* - * Load any pre-existing statistics from file. - * - * Note: we don't bother with locks here, because there should be no other - * processes running when this code is reached. - */ - /* Unlink query text file possibly left over from crash */ unlink(PGSS_TEXT_FILE); /* Allocate new query text temp file */ qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W); if (qfile == NULL) - goto write_error; - - /* - * If we were told not to load old statistics, we're done. (Note we do - * not try to unlink any old dump file in this case. This seems a bit - * questionable but it's the historical behavior.) - */ - if (!pgss_save) - { - FreeFile(qfile); - return; - } - - /* - * Attempt to load old statistics from the dump file. - */ - file = AllocateFile(PGSS_DUMP_FILE, PG_BINARY_R); - if (file == NULL) { - if (errno != ENOENT) - goto read_error; - /* No existing persisted stats file, so we're done */ - FreeFile(qfile); + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", + PGSS_TEXT_FILE))); return; } - - buffer_size = 2048; - buffer = (char *) palloc(buffer_size); - - if (fread(&header, sizeof(uint32), 1, file) != 1 || - fread(&pgver, sizeof(uint32), 1, file) != 1 || - fread(&num, sizeof(int32), 1, file) != 1) - goto read_error; - - if (header != PGSS_FILE_HEADER || - pgver != PGSS_PG_MAJOR_VERSION) - goto data_error; - - for (i = 0; i < num; i++) - { - pgssEntry temp; - pgssEntry *entry; - Size query_offset; - - if (fread(&temp, sizeof(pgssEntry), 1, file) != 1) - goto read_error; - - /* Encoding is the only field we can easily sanity-check */ - if (!PG_VALID_BE_ENCODING(temp.encoding)) - goto data_error; - - /* Resize buffer as needed */ - if (temp.query_len >= buffer_size) - { - buffer_size = Max(buffer_size * 2, temp.query_len + 1); - buffer = repalloc(buffer, buffer_size); - } - - if (fread(buffer, 1, temp.query_len + 1, file) != temp.query_len + 1) - goto read_error; - - /* Should have a trailing null, but let's make sure */ - buffer[temp.query_len] = '\0'; - - /* Skip loading "sticky" entries */ - if (IS_STICKY(temp.counters)) - continue; - - /* Store the query text */ - query_offset = pgss->extent; - if (fwrite(buffer, 1, temp.query_len + 1, qfile) != temp.query_len + 1) - goto write_error; - pgss->extent += temp.query_len + 1; - - /* make the hashtable entry (discards old entries if too many) */ - entry = entry_alloc(&temp.key, query_offset, temp.query_len, - temp.encoding, - false); - - /* copy in the actual stats */ - entry->counters = temp.counters; - entry->stats_since = temp.stats_since; - entry->minmax_stats_since = temp.minmax_stats_since; - } - - /* Read global statistics for pg_stat_statements */ - if (fread(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1) - goto read_error; - - pfree(buffer); - FreeFile(file); FreeFile(qfile); - - /* - * Remove the persisted stats file so it's not included in - * backups/replication standbys, etc. A new file will be written on next - * shutdown. - * - * Note: it's okay if the PGSS_TEXT_FILE is included in a basebackup, - * because we remove that file on startup; it acts inversely to - * PGSS_DUMP_FILE, in that it is only supposed to be around when the - * server is running, whereas PGSS_DUMP_FILE is only supposed to be around - * when the server is not running. Leaving the file creates no danger of - * a newly restored database having a spurious record of execution costs, - * which is what we're really concerned about here. - */ - unlink(PGSS_DUMP_FILE); - - return; - -read_error: - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", - PGSS_DUMP_FILE))); - goto fail; -data_error: - ereport(LOG, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("ignoring invalid data in file \"%s\"", - PGSS_DUMP_FILE))); - goto fail; -write_error: - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write file \"%s\": %m", - PGSS_TEXT_FILE))); -fail: - if (buffer) - pfree(buffer); - if (file) - FreeFile(file); - if (qfile) - FreeFile(qfile); - /* If possible, throw away the bogus file; ignore any error */ - unlink(PGSS_DUMP_FILE); - - /* - * Don't unlink PGSS_TEXT_FILE here; it should always be around while the - * server is running with pg_stat_statements enabled - */ -} - -/* - * shmem_shutdown hook: Dump statistics into file. - * - * Note: we don't bother with acquiring lock, because there should be no - * other processes running when this is called. - */ -static void -pgss_shmem_shutdown(int code, Datum arg) -{ - FILE *file; - char *qbuffer = NULL; - Size qbuffer_size = 0; - HASH_SEQ_STATUS hash_seq; - int32 num_entries; - pgssEntry *entry; - - /* Don't try to dump during a crash. */ - if (code) - return; - - /* Safety check ... shouldn't get here unless shmem is set up. */ - if (!pgss || !pgss_hash) - return; - - /* Don't dump if told not to. */ - if (!pgss_save) - return; - - file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W); - if (file == NULL) - goto error; - - if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1) - goto error; - if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1) - goto error; - num_entries = hash_get_num_entries(pgss_hash); - if (fwrite(&num_entries, sizeof(int32), 1, file) != 1) - goto error; - - qbuffer = qtext_load_file(&qbuffer_size); - if (qbuffer == NULL) - goto error; - - /* - * When serializing to disk, we store query texts immediately after their - * entry data. Any orphaned query texts are thereby excluded. - */ - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) - { - int len = entry->query_len; - char *qstr = qtext_fetch(entry->query_offset, len, - qbuffer, qbuffer_size); - - if (qstr == NULL) - continue; /* Ignore any entries with bogus texts */ - - if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 || - fwrite(qstr, 1, len + 1, file) != len + 1) - { - /* note: we assume hash_seq_term won't change errno */ - hash_seq_term(&hash_seq); - goto error; - } - } - - /* Dump global statistics for pg_stat_statements */ - if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1) - goto error; - - pfree(qbuffer); - qbuffer = NULL; - - if (FreeFile(file)) - { - file = NULL; - goto error; - } - - /* - * Rename file into place, so we atomically replace any old one. - */ - (void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG); - - /* Unlink query-texts file; it's not needed while shutdown */ - unlink(PGSS_TEXT_FILE); - - return; - -error: - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write file \"%s\": %m", - PGSS_DUMP_FILE ".tmp"))); - if (qbuffer) - pfree(qbuffer); - if (file) - FreeFile(file); - unlink(PGSS_DUMP_FILE ".tmp"); - unlink(PGSS_TEXT_FILE); } /* @@ -837,7 +649,7 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query, const JumbleState *jst prev_post_parse_analyze_hook(pstate, query, jstate); /* Safety check... */ - if (!pgss || !pgss_hash || !pgss_enabled(nesting_level)) + if (!pgss || !pgss_enabled(nesting_level)) return; /* @@ -1254,9 +1066,62 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, } } +/* + * Initialize a freshly-created shared entry. + * + * Caller must hold the entry lock. The entry is considered "new" when + * key.queryid is still zero (as left by pgstat entry creation). + */ +static void +pgss_entry_init(PgStatShared_Pgss *shared_entry, + const pgssHashKey *key, int encoding) +{ + if (shared_entry->key.queryid != INT64CONST(0)) + return; + + shared_entry->key = *key; + memset(&shared_entry->counters, 0, sizeof(Counters)); + shared_entry->counters.usage = USAGE_INIT; + shared_entry->stats_since = GetCurrentTimestamp(); + shared_entry->minmax_stats_since = shared_entry->stats_since; + shared_entry->query_len = -1; + shared_entry->encoding = encoding; + shared_entry->query_offset = 0; +} + +/* + * Store query text into a shared entry via the external text file. + * + * Caller must hold the entry lock. Does nothing if text is already present. + */ +static void +pgss_store_query_text(PgStatShared_Pgss *shared_entry, + const char *query, int query_len, int encoding) +{ + Size query_offset; + int gc_count; + + if (shared_entry->query_len >= 0) + return; + + LWLockAcquire(&pgss->lock.lock, LW_SHARED); + if (qtext_store(query, query_len, &query_offset, &gc_count)) + { + shared_entry->query_offset = query_offset; + shared_entry->query_len = query_len; + shared_entry->encoding = encoding; + } + LWLockRelease(&pgss->lock.lock); +} + /* * Store some statistics for a statement. * + * Shared entry creation and query text storage are written directly to + * shared memory, making entries immediately visible to other backends. + * Counter accumulation is done in backend-local pending entries, flushed + * periodically by pgss_flush_pending_cb. + * * If jstate is not NULL then we're trying to create an entry for which * we have no statistics as yet; we just want to record the normalized * query string. total_time, rows, bufusage and walusage are ignored in this @@ -1279,14 +1144,16 @@ pgss_store(const char *query, int64 queryId, PlannedStmtOrigin planOrigin) { pgssHashKey key; - pgssEntry *entry; + PgStat_EntryRef *entry_ref; + PgStatShared_Pgss *shared_entry; + PgStat_PendingPgss *pending; char *norm_query = NULL; int encoding = GetDatabaseEncoding(); Assert(query != NULL); /* Safety check... */ - if (!pgss || !pgss_hash) + if (!pgss) return; /* @@ -1313,192 +1180,171 @@ pgss_store(const char *query, int64 queryId, key.queryid = queryId; key.toplevel = (nesting_level == 0); - /* Lookup the hash table entry with shared lock. */ - LWLockAcquire(&pgss->lock.lock, LW_SHARED); - - entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL); - - /* Create new entry, if not present */ - if (!entry) + /* + * If jstate is set, create the shared entry and store normalized query + * text. Don't increment counters; entries with zero calls are not + * returned by pg_stat_statements_internal(). + */ + if (jstate) { - Size query_offset; - int gc_count; - bool stored; - bool do_gc; + const char *store_query; - /* - * Create a new, normalized query string if caller asked. We don't - * need to hold the lock while doing this work. (Note: in any case, - * it's possible that someone else creates a duplicate hashtable entry - * in the interval where we don't hold the lock below. That case is - * handled by entry_alloc.) - */ - if (jstate) + if (pgstat_get_entry_count(PGSTAT_KIND_PGSS) >= pgss_max) { - LWLockRelease(&pgss->lock.lock); - norm_query = generate_normalized_query(jstate, query, - query_location, - &query_len); - LWLockAcquire(&pgss->lock.lock, LW_SHARED); + pgss_maybe_evict(); + return; } - /* Append new query text to file with only shared lock held */ - stored = qtext_store(norm_query ? norm_query : query, query_len, - &query_offset, &gc_count); - - /* - * Determine whether we need to garbage collect external query texts - * while the shared lock is still held. This micro-optimization - * avoids taking the time to decide this while holding exclusive lock. - */ - do_gc = need_gc_qtexts(); - - /* Need exclusive lock to make a new hashtable entry - promote */ - LWLockRelease(&pgss->lock.lock); - LWLockAcquire(&pgss->lock.lock, LW_EXCLUSIVE); + norm_query = generate_normalized_query(jstate, query, + query_location, + &query_len); + store_query = norm_query ? norm_query : query; - /* - * A garbage collection may have occurred while we weren't holding the - * lock. In the unlikely event that this happens, the query text we - * stored above will have been garbage collected, so write it again. - * This should be infrequent enough that doing it while holding - * exclusive lock isn't a performance problem. - */ - if (!stored || pgss->gc_count != gc_count) - stored = qtext_store(norm_query ? norm_query : query, query_len, - &query_offset, NULL); + entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_PGSS, + key.dbid, + pgss_objid(&key), + true); + if (!entry_ref) + { + if (norm_query) + pfree(norm_query); + return; + } - /* If we failed to write to the text file, give up */ - if (!stored) - goto done; + shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats; + pgss_entry_init(shared_entry, &key, encoding); + pgss_store_query_text(shared_entry, store_query, query_len, encoding); - /* OK to create a new hashtable entry */ - entry = entry_alloc(&key, query_offset, query_len, encoding, - jstate != NULL); + pgstat_unlock_entry(entry_ref); - /* If needed, perform garbage collection while exclusive lock held */ - if (do_gc) - gc_qtexts(); + if (norm_query) + pfree(norm_query); + return; } - /* Increment the counts, except when jstate is not NULL */ - if (!jstate) + /* + * Normal case: accumulate stats in a pending entry. The pending entry + * will be flushed to shared memory by pgss_flush_pending_cb. + * + * But first, ensure the shared entry exists with query text. + */ + entry_ref = pgstat_get_entry_ref(PGSTAT_KIND_PGSS, key.dbid, + pgss_objid(&key), false, NULL); + if (!entry_ref) { - Assert(kind == PGSS_PLAN || kind == PGSS_EXEC); - /* - * Grab the spinlock while updating the counters (see comment about - * locking rules at the head of the file) + * Entry doesn't exist yet. Don't create a new one if we've already + * hit the configured maximum; eviction will free space eventually. */ - SpinLockAcquire(&entry->mutex); + if (pgstat_get_entry_count(PGSTAT_KIND_PGSS) >= pgss_max) + { + pgss_maybe_evict(); + return; + } - /* "Unstick" entry if it was previously sticky */ - if (IS_STICKY(entry->counters)) - entry->counters.usage = USAGE_INIT; + entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_PGSS, + key.dbid, + pgss_objid(&key), + true); + if (!entry_ref) + return; - entry->counters.calls[kind] += 1; - entry->counters.total_time[kind] += total_time; + shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats; + pgss_entry_init(shared_entry, &key, encoding); + pgss_store_query_text(shared_entry, query, query_len, encoding); - if (entry->counters.calls[kind] == 1) - { - entry->counters.min_time[kind] = total_time; - entry->counters.max_time[kind] = total_time; - entry->counters.mean_time[kind] = total_time; - } - else - { - /* - * Welford's method for accurately computing variance. See - * - */ - double old_mean = entry->counters.mean_time[kind]; + pgstat_unlock_entry(entry_ref); + } - entry->counters.mean_time[kind] += - (total_time - old_mean) / entry->counters.calls[kind]; - entry->counters.sum_var_time[kind] += - (total_time - old_mean) * (total_time - entry->counters.mean_time[kind]); + /* + * Now accumulate stats in the pending entry. + */ + Assert(kind == PGSS_PLAN || kind == PGSS_EXEC); - /* - * Calculate min and max time. min = 0 and max = 0 means that the - * min/max statistics were reset - */ - if (entry->counters.min_time[kind] == 0 - && entry->counters.max_time[kind] == 0) - { - entry->counters.min_time[kind] = total_time; - entry->counters.max_time[kind] = total_time; - } - else - { - if (entry->counters.min_time[kind] > total_time) - entry->counters.min_time[kind] = total_time; - if (entry->counters.max_time[kind] < total_time) - entry->counters.max_time[kind] = total_time; - } - } - entry->counters.rows += rows; - entry->counters.shared_blks_hit += bufusage->shared_blks_hit; - entry->counters.shared_blks_read += bufusage->shared_blks_read; - entry->counters.shared_blks_dirtied += bufusage->shared_blks_dirtied; - entry->counters.shared_blks_written += bufusage->shared_blks_written; - entry->counters.local_blks_hit += bufusage->local_blks_hit; - entry->counters.local_blks_read += bufusage->local_blks_read; - entry->counters.local_blks_dirtied += bufusage->local_blks_dirtied; - entry->counters.local_blks_written += bufusage->local_blks_written; - entry->counters.temp_blks_read += bufusage->temp_blks_read; - entry->counters.temp_blks_written += bufusage->temp_blks_written; - entry->counters.shared_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_read_time); - entry->counters.shared_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_write_time); - entry->counters.local_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_read_time); - entry->counters.local_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_write_time); - entry->counters.temp_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_read_time); - entry->counters.temp_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_write_time); - entry->counters.usage += USAGE_EXEC(total_time); - entry->counters.wal_records += walusage->wal_records; - entry->counters.wal_fpi += walusage->wal_fpi; - entry->counters.wal_bytes += walusage->wal_bytes; - entry->counters.wal_buffers_full += walusage->wal_buffers_full; - if (jitusage) - { - entry->counters.jit_functions += jitusage->created_functions; - entry->counters.jit_generation_time += INSTR_TIME_GET_MILLISEC(jitusage->generation_counter); + entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_PGSS, key.dbid, + pgss_objid(&key), NULL); + pending = (PgStat_PendingPgss *) entry_ref->pending; - if (INSTR_TIME_GET_MILLISEC(jitusage->deform_counter)) - entry->counters.jit_deform_count++; - entry->counters.jit_deform_time += INSTR_TIME_GET_MILLISEC(jitusage->deform_counter); + pending->counters.calls[kind] += 1; + pending->counters.total_time[kind] += total_time; - if (INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter)) - entry->counters.jit_inlining_count++; - entry->counters.jit_inlining_time += INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter); + /* + * Compute mean and sum of squared deviations using Welford's online + * algorithm. These per-backend values are later merged into shared + * memory using Chan's parallel variance algorithm in the flush callback. + * See + */ + if (pending->counters.calls[kind] == 1) + { + pending->counters.min_time[kind] = total_time; + pending->counters.max_time[kind] = total_time; + pending->counters.mean_time[kind] = total_time; + } + else + { + double old_mean = pending->counters.mean_time[kind]; + + if (pending->counters.min_time[kind] > total_time) + pending->counters.min_time[kind] = total_time; + if (pending->counters.max_time[kind] < total_time) + pending->counters.max_time[kind] = total_time; + pending->counters.mean_time[kind] += + (total_time - old_mean) / pending->counters.calls[kind]; + pending->counters.sum_var_time[kind] += + (total_time - old_mean) * (total_time - pending->counters.mean_time[kind]); + } - if (INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter)) - entry->counters.jit_optimization_count++; - entry->counters.jit_optimization_time += INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter); + pending->counters.rows += rows; + pending->counters.shared_blks_hit += bufusage->shared_blks_hit; + pending->counters.shared_blks_read += bufusage->shared_blks_read; + pending->counters.shared_blks_dirtied += bufusage->shared_blks_dirtied; + pending->counters.shared_blks_written += bufusage->shared_blks_written; + pending->counters.local_blks_hit += bufusage->local_blks_hit; + pending->counters.local_blks_read += bufusage->local_blks_read; + pending->counters.local_blks_dirtied += bufusage->local_blks_dirtied; + pending->counters.local_blks_written += bufusage->local_blks_written; + pending->counters.temp_blks_read += bufusage->temp_blks_read; + pending->counters.temp_blks_written += bufusage->temp_blks_written; + pending->counters.shared_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_read_time); + pending->counters.shared_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_write_time); + pending->counters.local_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_read_time); + pending->counters.local_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_write_time); + pending->counters.temp_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_read_time); + pending->counters.temp_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_write_time); + pending->counters.usage += USAGE_EXEC(total_time); + pending->counters.wal_records += walusage->wal_records; + pending->counters.wal_fpi += walusage->wal_fpi; + pending->counters.wal_bytes += walusage->wal_bytes; + pending->counters.wal_buffers_full += walusage->wal_buffers_full; + if (jitusage) + { + pending->counters.jit_functions += jitusage->created_functions; + pending->counters.jit_generation_time += INSTR_TIME_GET_MILLISEC(jitusage->generation_counter); - if (INSTR_TIME_GET_MILLISEC(jitusage->emission_counter)) - entry->counters.jit_emission_count++; - entry->counters.jit_emission_time += INSTR_TIME_GET_MILLISEC(jitusage->emission_counter); - } + if (INSTR_TIME_GET_MILLISEC(jitusage->deform_counter)) + pending->counters.jit_deform_count++; + pending->counters.jit_deform_time += INSTR_TIME_GET_MILLISEC(jitusage->deform_counter); - /* parallel worker counters */ - entry->counters.parallel_workers_to_launch += parallel_workers_to_launch; - entry->counters.parallel_workers_launched += parallel_workers_launched; + if (INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter)) + pending->counters.jit_inlining_count++; + pending->counters.jit_inlining_time += INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter); - /* plan cache counters */ - if (planOrigin == PLAN_STMT_CACHE_GENERIC) - entry->counters.generic_plan_calls++; - else if (planOrigin == PLAN_STMT_CACHE_CUSTOM) - entry->counters.custom_plan_calls++; + if (INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter)) + pending->counters.jit_optimization_count++; + pending->counters.jit_optimization_time += INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter); - SpinLockRelease(&entry->mutex); + if (INSTR_TIME_GET_MILLISEC(jitusage->emission_counter)) + pending->counters.jit_emission_count++; + pending->counters.jit_emission_time += INSTR_TIME_GET_MILLISEC(jitusage->emission_counter); } -done: - LWLockRelease(&pgss->lock.lock); + pending->counters.parallel_workers_to_launch += parallel_workers_to_launch; + pending->counters.parallel_workers_launched += parallel_workers_launched; - /* We postpone this clean-up until we're out of the lock */ - if (norm_query) - pfree(norm_query); + if (planOrigin == PLAN_STMT_CACHE_GENERIC) + pending->counters.generic_plan_calls++; + else if (planOrigin == PLAN_STMT_CACHE_CUSTOM) + pending->counters.custom_plan_calls++; } /* @@ -1676,8 +1522,8 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo, Size qbuffer_size = 0; Size extent = 0; int gc_count = 0; - HASH_SEQ_STATUS hash_seq; - pgssEntry *entry; + dshash_seq_status hstat; + PgStatShared_HashEntry *p; /* * Superusers or roles with the privileges of pg_read_all_stats members @@ -1685,8 +1531,8 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo, */ is_allowed_role = has_privs_of_role(userid, ROLE_PG_READ_ALL_STATS); - /* hash table must exist already */ - if (!pgss || !pgss_hash) + /* shared state must exist already */ + if (!pgss) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("pg_stat_statements must be loaded via \"shared_preload_libraries\""))); @@ -1773,30 +1619,13 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo, } /* - * Get shared lock, load or reload the query text file if we must, and - * iterate over the hashtable entries. - * - * With a large hash table, we might be holding the lock rather longer - * than one could wish. However, this only blocks creation of new hash - * table entries, and the larger the hash table the less likely that is to - * be needed. So we can hope this is okay. Perhaps someday we'll decide - * we need to partition the hash table to limit the time spent holding any - * one lock. + * Get shared lock on the query text file, load or reload if needed, and + * iterate over the pgstat shared hash entries. */ LWLockAcquire(&pgss->lock.lock, LW_SHARED); if (showtext) { - /* - * Here it is safe to examine extent and gc_count without taking the - * mutex. Note that although other processes might change - * pgss->extent just after we look at it, the strings they then write - * into the file cannot yet be referenced in the hashtable, so we - * don't care whether we see them or not. - * - * If qtext_load_file fails, we just press on; we'll return NULL for - * every query text. - */ if (qbuffer == NULL || pgss->extent != extent || pgss->gc_count != gc_count) @@ -1807,45 +1636,75 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo, } } - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) + /* Flush any pending stats for this backend so they're visible */ + pgstat_report_anytime_stat(); + + dshash_seq_init(&hstat, pgStatLocal.shared_hash, false); + while ((p = dshash_seq_next(&hstat)) != NULL) { Datum values[PG_STAT_STATEMENTS_COLS]; bool nulls[PG_STAT_STATEMENTS_COLS]; int i = 0; Counters tmp; double stddev; - int64 queryid = entry->key.queryid; + PgStatShared_Pgss *shared_entry; + int64 queryid; TimestampTz stats_since; TimestampTz minmax_stats_since; - memset(values, 0, sizeof(values)); - memset(nulls, 0, sizeof(nulls)); + /* Only process our kind */ + if (p->key.kind != PGSTAT_KIND_PGSS) + continue; + if (p->dropped) + continue; + + shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body); + Assert(shared_entry); + + /* Read entry data under the entry's LWLock */ + LWLockAcquire(&shared_entry->header.lock, LW_SHARED); + tmp = shared_entry->counters; + queryid = shared_entry->key.queryid; + stats_since = shared_entry->stats_since; + minmax_stats_since = shared_entry->minmax_stats_since; + LWLockRelease(&shared_entry->header.lock); + + /* Skip entries created at parse time but never executed */ + if (tmp.calls[PGSS_PLAN] + tmp.calls[PGSS_EXEC] == 0) + continue; - values[i++] = ObjectIdGetDatum(entry->key.userid); - values[i++] = ObjectIdGetDatum(entry->key.dbid); + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[i++] = ObjectIdGetDatum(shared_entry->key.userid); + values[i++] = ObjectIdGetDatum(shared_entry->key.dbid); if (api_version >= PGSS_V1_9) - values[i++] = BoolGetDatum(entry->key.toplevel); + values[i++] = BoolGetDatum(shared_entry->key.toplevel); - if (is_allowed_role || entry->key.userid == userid) + if (is_allowed_role || shared_entry->key.userid == userid) { if (api_version >= PGSS_V1_2) values[i++] = Int64GetDatumFast(queryid); if (showtext) { - char *qstr = qtext_fetch(entry->query_offset, - entry->query_len, - qbuffer, - qbuffer_size); + char *qstr = NULL; + + if (shared_entry->query_len >= 0) + { + qstr = qtext_fetch(shared_entry->query_offset, + shared_entry->query_len, + qbuffer, + qbuffer_size); + } if (qstr) { char *enc; enc = pg_any_to_server(qstr, - entry->query_len, - entry->encoding); + shared_entry->query_len, + shared_entry->encoding); values[i++] = CStringGetTextDatum(enc); @@ -1880,22 +1739,6 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo, nulls[i++] = true; } - /* copy counters to a local variable to keep locking time short */ - SpinLockAcquire(&entry->mutex); - tmp = entry->counters; - SpinLockRelease(&entry->mutex); - - /* - * The spinlock is not required when reading these two as they are - * always updated when holding pgss->lock exclusively. - */ - stats_since = entry->stats_since; - minmax_stats_since = entry->minmax_stats_since; - - /* Skip entry if unexecuted (ie, it's a pending "sticky" entry) */ - if (IS_STICKY(tmp)) - continue; - /* Note that we rely on PGSS_PLAN being 0 and PGSS_EXEC being 1. */ for (int kind = 0; kind < PGSS_NUMKIND; kind++) { @@ -2020,6 +1863,7 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo, tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); } + dshash_seq_term(&hstat); LWLockRelease(&pgss->lock.lock); @@ -2040,8 +1884,9 @@ pg_stat_statements_info(PG_FUNCTION_ARGS) TupleDesc tupdesc; Datum values[PG_STAT_STATEMENTS_INFO_COLS] = {0}; bool nulls[PG_STAT_STATEMENTS_INFO_COLS] = {0}; + int i = 0; - if (!pgss || !pgss_hash) + if (!pgss) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("pg_stat_statements must be loaded via \"shared_preload_libraries\""))); @@ -2055,162 +1900,12 @@ pg_stat_statements_info(PG_FUNCTION_ARGS) stats = pgss->stats; SpinLockRelease(&pgss->mutex); - values[0] = Int64GetDatum(stats.dealloc); - values[1] = TimestampTzGetDatum(stats.stats_reset); + values[i++] = Int64GetDatum(stats.dealloc); + values[i++] = TimestampTzGetDatum(stats.stats_reset); PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } -/* - * Allocate a new hashtable entry. - * caller must hold an exclusive lock on pgss->lock - * - * "query" need not be null-terminated; we rely on query_len instead - * - * If "sticky" is true, make the new entry artificially sticky so that it will - * probably still be there when the query finishes execution. We do this by - * giving it a median usage value rather than the normal value. (Strictly - * speaking, query strings are normalized on a best effort basis, though it - * would be difficult to demonstrate this even under artificial conditions.) - * - * Note: despite needing exclusive lock, it's not an error for the target - * entry to already exist. This is because pgss_store releases and - * reacquires lock after failing to find a match; so someone else could - * have made the entry while we waited to get exclusive lock. - */ -static pgssEntry * -entry_alloc(pgssHashKey *key, Size query_offset, int query_len, int encoding, - bool sticky) -{ - pgssEntry *entry; - bool found; - - /* Make space if needed */ - while (hash_get_num_entries(pgss_hash) >= pgss_max) - entry_dealloc(); - - /* Find or create an entry with desired hash code */ - entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER, &found); - - if (!found) - { - /* New entry, initialize it */ - - /* reset the statistics */ - memset(&entry->counters, 0, sizeof(Counters)); - /* set the appropriate initial usage count */ - entry->counters.usage = sticky ? pgss->cur_median_usage : USAGE_INIT; - /* re-initialize the mutex each time ... we assume no one using it */ - SpinLockInit(&entry->mutex); - /* ... and don't forget the query text metadata */ - Assert(query_len >= 0); - entry->query_offset = query_offset; - entry->query_len = query_len; - entry->encoding = encoding; - entry->stats_since = GetCurrentTimestamp(); - entry->minmax_stats_since = entry->stats_since; - } - - return entry; -} - -/* - * qsort comparator for sorting into increasing usage order - */ -static int -entry_cmp(const void *lhs, const void *rhs) -{ - double l_usage = (*(pgssEntry *const *) lhs)->counters.usage; - double r_usage = (*(pgssEntry *const *) rhs)->counters.usage; - - if (l_usage < r_usage) - return -1; - else if (l_usage > r_usage) - return +1; - else - return 0; -} - -/* - * Deallocate least-used entries. - * - * Caller must hold an exclusive lock on pgss->lock. - */ -static void -entry_dealloc(void) -{ - HASH_SEQ_STATUS hash_seq; - pgssEntry **entries; - pgssEntry *entry; - int nvictims; - int i; - Size tottextlen; - int nvalidtexts; - - /* - * Sort entries by usage and deallocate USAGE_DEALLOC_PERCENT of them. - * While we're scanning the table, apply the decay factor to the usage - * values, and update the mean query length. - * - * Note that the mean query length is almost immediately obsolete, since - * we compute it before not after discarding the least-used entries. - * Hopefully, that doesn't affect the mean too much; it doesn't seem worth - * making two passes to get a more current result. Likewise, the new - * cur_median_usage includes the entries we're about to zap. - */ - - entries = palloc(hash_get_num_entries(pgss_hash) * sizeof(pgssEntry *)); - - i = 0; - tottextlen = 0; - nvalidtexts = 0; - - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) - { - entries[i++] = entry; - /* "Sticky" entries get a different usage decay rate. */ - if (IS_STICKY(entry->counters)) - entry->counters.usage *= STICKY_DECREASE_FACTOR; - else - entry->counters.usage *= USAGE_DECREASE_FACTOR; - /* In the mean length computation, ignore dropped texts. */ - if (entry->query_len >= 0) - { - tottextlen += entry->query_len + 1; - nvalidtexts++; - } - } - - /* Sort into increasing order by usage */ - qsort(entries, i, sizeof(pgssEntry *), entry_cmp); - - /* Record the (approximate) median usage */ - if (i > 0) - pgss->cur_median_usage = entries[i / 2]->counters.usage; - /* Record the mean query length */ - if (nvalidtexts > 0) - pgss->mean_query_len = tottextlen / nvalidtexts; - else - pgss->mean_query_len = ASSUMED_LENGTH_INIT; - - /* Now zap an appropriate fraction of lowest-usage entries */ - nvictims = Max(10, i * USAGE_DEALLOC_PERCENT / 100); - nvictims = Min(nvictims, i); - - for (i = 0; i < nvictims; i++) - { - hash_search(pgss_hash, &entries[i]->key, HASH_REMOVE, NULL); - } - - pfree(entries); - - /* Increment the number of times entries are deallocated */ - SpinLockAcquire(&pgss->mutex); - pgss->stats.dealloc += 1; - SpinLockRelease(&pgss->mutex); -} - /* * Given a query string (not necessarily null-terminated), allocate a new * entry in the external query text file and store the string there. @@ -2234,6 +1929,8 @@ qtext_store(const char *query, int query_len, Size off; int fd; + *query_offset = 0; + /* * We use a spinlock to protect extent/n_writers/gc_count, so that * multiple processes may execute this function concurrently. @@ -2366,8 +2063,8 @@ qtext_load_file(Size *buffer_size) /* * If we get a short read and errno doesn't get set, the reason is * probably that garbage collection truncated the file since we did - * the fstat(), so we don't log a complaint --- but we don't return - * the data, either, since it's most likely corrupt due to concurrent + * the fstat(), so we don't log a complaint; but we don't return the + * data, either, since it's most likely corrupt due to concurrent * writes from garbage collection. */ errno = 0; @@ -2420,8 +2117,6 @@ qtext_fetch(Size query_offset, int query_len, /* * Do we need to garbage-collect the external query text file? - * - * Caller should hold at least a shared lock on pgss->lock. */ static bool need_gc_qtexts(void) @@ -2478,13 +2173,13 @@ gc_qtexts(void) char *qbuffer; Size qbuffer_size; FILE *qfile = NULL; - HASH_SEQ_STATUS hash_seq; - pgssEntry *entry; + dshash_seq_status hstat; + PgStatShared_HashEntry *p; Size extent; int nentries; /* - * When called from pgss_store, some other session might have proceeded + * When called from the bg worker, some other session might have proceeded * with garbage collection in the no-lock-held interim of lock strength * escalation. Check once more that this is actually necessary. */ @@ -2493,21 +2188,12 @@ gc_qtexts(void) /* * Load the old texts file. If we fail (out of memory, for instance), - * invalidate query texts. Hopefully this is rare. It might seem better - * to leave things alone on an OOM failure, but the problem is that the - * file is only going to get bigger; hoping for a future non-OOM result is - * risky and can easily lead to complete denial of service. + * invalidate query texts. Hopefully this is rare. */ qbuffer = qtext_load_file(&qbuffer_size); if (qbuffer == NULL) goto gc_fail; - /* - * We overwrite the query texts file in place, so as to reduce the risk of - * an out-of-disk-space failure. Since the file is guaranteed not to get - * larger, this should always work on traditional filesystems; though we - * could still lose on copy-on-write filesystems. - */ qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W); if (qfile == NULL) { @@ -2521,21 +2207,32 @@ gc_qtexts(void) extent = 0; nentries = 0; - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) + dshash_seq_init(&hstat, pgStatLocal.shared_hash, false); + while ((p = dshash_seq_next(&hstat)) != NULL) { - int query_len = entry->query_len; - char *qry = qtext_fetch(entry->query_offset, - query_len, - qbuffer, - qbuffer_size); + PgStatShared_Pgss *shared_entry; + int query_len; + char *qry; + + if (p->key.kind != PGSTAT_KIND_PGSS) + continue; + if (p->dropped) + continue; + + shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body); + + query_len = shared_entry->query_len; + if (query_len < 0) + continue; + + qry = qtext_fetch(shared_entry->query_offset, query_len, + qbuffer, qbuffer_size); if (qry == NULL) { - /* Trouble ... drop the text */ - entry->query_offset = 0; - entry->query_len = -1; - /* entry will not be counted in mean query length computation */ + /* Trouble ... mark text invalid */ + shared_entry->query_offset = 0; + shared_entry->query_len = -1; continue; } @@ -2545,19 +2242,16 @@ gc_qtexts(void) (errcode_for_file_access(), errmsg("could not write file \"%s\": %m", PGSS_TEXT_FILE))); - hash_seq_term(&hash_seq); + dshash_seq_term(&hstat); goto gc_fail; } - entry->query_offset = extent; + shared_entry->query_offset = extent; extent += query_len + 1; nentries++; } + dshash_seq_term(&hstat); - /* - * Truncate away any now-unused space. If this fails for some odd reason, - * we log it, but there's no need to fail. - */ if (ftruncate(fileno(qfile), extent) != 0) ereport(LOG, (errcode_for_file_access(), @@ -2577,13 +2271,8 @@ gc_qtexts(void) elog(DEBUG1, "pgss gc of queries file shrunk size from %zu to %zu", pgss->extent, extent); - /* Reset the shared extent pointer */ pgss->extent = extent; - /* - * Also update the mean query length, to be sure that need_gc_qtexts() - * won't still think we have a problem. - */ if (nentries > 0) pgss->mean_query_len = extent / nentries; else @@ -2591,19 +2280,11 @@ gc_qtexts(void) pfree(qbuffer); - /* - * OK, count a garbage collection cycle. (Note: even though we have - * exclusive lock on pgss->lock, we must take pgss->mutex for this, since - * other processes may examine gc_count while holding only the mutex. - * Also, we have to advance the count *after* we've rewritten the file, - * else other processes might not realize they read a stale file.) - */ record_gc_qtexts(); return; gc_fail: - /* clean up resources */ if (qfile) FreeFile(qfile); if (qbuffer) @@ -2611,18 +2292,24 @@ gc_fail: /* * Since the contents of the external file are now uncertain, mark all - * hashtable entries as having invalid texts. + * entries as having invalid texts. */ - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) + dshash_seq_init(&hstat, pgStatLocal.shared_hash, false); + while ((p = dshash_seq_next(&hstat)) != NULL) { - entry->query_offset = 0; - entry->query_len = -1; + PgStatShared_Pgss *shared_entry; + + if (p->key.kind != PGSTAT_KIND_PGSS) + continue; + if (p->dropped) + continue; + + shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body); + shared_entry->query_offset = 0; + shared_entry->query_len = -1; } + dshash_seq_term(&hstat); - /* - * Destroy the query text file and create a new, empty one - */ (void) unlink(PGSS_TEXT_FILE); qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W); if (qfile == NULL) @@ -2633,41 +2320,27 @@ gc_fail: else FreeFile(qfile); - /* Reset the shared extent pointer */ pgss->extent = 0; - - /* Reset mean_query_len to match the new state */ pgss->mean_query_len = ASSUMED_LENGTH_INIT; - /* - * Bump the GC count even though we failed. - * - * This is needed to make concurrent readers of file without any lock on - * pgss->lock notice existence of new version of file. Once readers - * subsequently observe a change in GC count with pgss->lock held, that - * forces a safe reopen of file. Writers also require that we bump here, - * of course. (As required by locking protocol, readers and writers don't - * trust earlier file contents until gc_count is found unchanged after - * pgss->lock acquired in shared or exclusive mode respectively.) - */ record_gc_qtexts(); } -#define SINGLE_ENTRY_RESET(e) \ -if (e) { \ +#define SINGLE_ENTRY_RESET(shared, key_ptr, minmax_only, stats_reset, num_remove) \ +if (shared) { \ if (minmax_only) { \ - /* When requested reset only min/max statistics of an entry */ \ for (int kind = 0; kind < PGSS_NUMKIND; kind++) \ { \ - e->counters.max_time[kind] = 0; \ - e->counters.min_time[kind] = 0; \ + (shared)->counters.max_time[kind] = 0; \ + (shared)->counters.min_time[kind] = 0; \ } \ - e->minmax_stats_since = stats_reset; \ + (shared)->minmax_stats_since = stats_reset; \ } \ else \ { \ - /* Remove the key otherwise */ \ - hash_search(pgss_hash, &e->key, HASH_REMOVE, NULL); \ + (shared)->query_len = -1; \ + pgstat_drop_entry(PGSTAT_KIND_PGSS, (key_ptr)->dbid, \ + pgss_objid(key_ptr)); \ num_remove++; \ } \ } @@ -2678,69 +2351,122 @@ if (e) { \ static TimestampTz entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_only) { - HASH_SEQ_STATUS hash_seq; - pgssEntry *entry; + dshash_seq_status hstat; + PgStatShared_HashEntry *p; FILE *qfile; int64 num_entries; int64 num_remove = 0; - pgssHashKey key; TimestampTz stats_reset; - if (!pgss || !pgss_hash) + if (!pgss) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("pg_stat_statements must be loaded via \"shared_preload_libraries\""))); LWLockAcquire(&pgss->lock.lock, LW_EXCLUSIVE); - num_entries = hash_get_num_entries(pgss_hash); + num_entries = pgstat_get_entry_count(PGSTAT_KIND_PGSS); stats_reset = GetCurrentTimestamp(); if (userid != 0 && dbid != 0 && queryid != INT64CONST(0)) { /* If all the parameters are available, use the fast path. */ + pgssHashKey key; + PgStat_EntryRef *entry_ref; + PgStatShared_Pgss *shared_entry; + memset(&key, 0, sizeof(pgssHashKey)); key.userid = userid; key.dbid = dbid; key.queryid = queryid; - /* - * Reset the entry if it exists, starting with the non-top-level - * entry. - */ + /* Reset non-top-level entry */ key.toplevel = false; - entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL); - - SINGLE_ENTRY_RESET(entry); + entry_ref = pgstat_get_entry_ref(PGSTAT_KIND_PGSS, key.dbid, + pgss_objid(&key), false, NULL); + if (entry_ref) + { + shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats; + SINGLE_ENTRY_RESET(shared_entry, &key, minmax_only, stats_reset, num_remove); + } - /* Also reset the top-level entry if it exists. */ + /* Reset top-level entry */ key.toplevel = true; - entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL); - - SINGLE_ENTRY_RESET(entry); + entry_ref = pgstat_get_entry_ref(PGSTAT_KIND_PGSS, key.dbid, + pgss_objid(&key), false, NULL); + if (entry_ref) + { + shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats; + SINGLE_ENTRY_RESET(shared_entry, &key, minmax_only, stats_reset, num_remove); + } } - else if (userid != 0 || dbid != 0 || queryid != INT64CONST(0)) + else { - /* Reset entries corresponding to valid parameters. */ - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) + /* + * Iterate all entries and reset matching ones. We cannot call + * pgstat_drop_entry() while iterating the dshash (it internally + * acquires partition locks), so collect keys to drop and do it after. + */ + PendingDrop *to_drop = NULL; + int num_to_drop = 0; + int max_to_drop = 0; + + dshash_seq_init(&hstat, pgStatLocal.shared_hash, false); + while ((p = dshash_seq_next(&hstat)) != NULL) { - if ((!userid || entry->key.userid == userid) && - (!dbid || entry->key.dbid == dbid) && - (!queryid || entry->key.queryid == queryid)) + PgStatShared_Pgss *shared_entry; + + if (p->key.kind != PGSTAT_KIND_PGSS) + continue; + if (p->dropped) + continue; + + shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body); + + if ((!userid || shared_entry->key.userid == userid) && + (!dbid || shared_entry->key.dbid == dbid) && + (!queryid || shared_entry->key.queryid == queryid)) { - SINGLE_ENTRY_RESET(entry); + if (minmax_only) + { + for (int kind = 0; kind < PGSS_NUMKIND; kind++) + { + shared_entry->counters.max_time[kind] = 0; + shared_entry->counters.min_time[kind] = 0; + } + shared_entry->minmax_stats_since = stats_reset; + } + else + { + shared_entry->query_len = -1; + + /* Collect for deferred drop */ + if (num_to_drop >= max_to_drop) + { + max_to_drop = Max(max_to_drop * 2, 128); + if (to_drop == NULL) + to_drop = palloc_array(PendingDrop, max_to_drop); + else + to_drop = repalloc_array(to_drop, PendingDrop, max_to_drop); + } + to_drop[num_to_drop].dbid = shared_entry->key.dbid; + to_drop[num_to_drop].objid = pgss_objid(&shared_entry->key); + num_to_drop++; + } } } - } - else - { - /* Reset all entries. */ - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) + dshash_seq_term(&hstat); + + /* Now drop entries outside the iteration */ + for (int i = 0; i < num_to_drop; i++) { - SINGLE_ENTRY_RESET(entry); + pgstat_drop_entry(PGSTAT_KIND_PGSS, + to_drop[i].dbid, to_drop[i].objid); } + num_remove = num_to_drop; + + if (to_drop) + pfree(to_drop); } /* All entries are removed? */ @@ -2790,6 +2516,456 @@ release_lock: return stats_reset; } +/* + * pgstat flush callback: merge pending stats into shared memory. + * + * This is called by the pgstat infrastructure to flush accumulated + * backend-local statistics into the shared entry. + */ +static bool +pgss_flush_pending_cb(PgStat_EntryRef *entry_ref, bool nowait) +{ + PgStat_PendingPgss *pending; + PgStatShared_Pgss *shared_entry; + Counters *shared; + Counters *p; + + pending = (PgStat_PendingPgss *) entry_ref->pending; + shared_entry = (PgStatShared_Pgss *) entry_ref->shared_stats; + + if (!pgstat_lock_entry(entry_ref, nowait)) + return false; + + shared = &shared_entry->counters; + p = &pending->counters; + + for (int kind = 0; kind < PGSS_NUMKIND; kind++) + { + if (p->calls[kind] == 0) + continue; + + /* + * Merge variance using Chan's parallel variance algorithm to combine + * per-backend sum_var_time (computed via Welford's method) with the + * shared aggregate. This must be done before updating calls/totals. + * See + * + */ + if (shared->calls[kind] > 0) + { + double old_mean_a = shared->mean_time[kind]; + double old_mean_b = p->mean_time[kind]; + double delta = old_mean_a - old_mean_b; + double n_a = shared->calls[kind]; + double n_b = p->calls[kind]; + + shared->sum_var_time[kind] += p->sum_var_time[kind] + + delta * delta * n_a * n_b / (n_a + n_b); + } + else + { + shared->sum_var_time[kind] = p->sum_var_time[kind]; + } + + shared->calls[kind] += p->calls[kind]; + shared->total_time[kind] += p->total_time[kind]; + + /* + * Update min/max. If both are 0 in shared, it means a reset + * happened, so treat the pending values as the new baseline. + */ + if (shared->min_time[kind] == 0 && shared->max_time[kind] == 0) + { + shared->min_time[kind] = p->min_time[kind]; + shared->max_time[kind] = p->max_time[kind]; + } + else + { + if (shared->min_time[kind] > p->min_time[kind]) + shared->min_time[kind] = p->min_time[kind]; + if (shared->max_time[kind] < p->max_time[kind]) + shared->max_time[kind] = p->max_time[kind]; + } + + /* Recompute mean from totals */ + shared->mean_time[kind] = + shared->total_time[kind] / shared->calls[kind]; + } + + shared->rows += p->rows; + shared->shared_blks_hit += p->shared_blks_hit; + shared->shared_blks_read += p->shared_blks_read; + shared->shared_blks_dirtied += p->shared_blks_dirtied; + shared->shared_blks_written += p->shared_blks_written; + shared->local_blks_hit += p->local_blks_hit; + shared->local_blks_read += p->local_blks_read; + shared->local_blks_dirtied += p->local_blks_dirtied; + shared->local_blks_written += p->local_blks_written; + shared->temp_blks_read += p->temp_blks_read; + shared->temp_blks_written += p->temp_blks_written; + shared->shared_blk_read_time += p->shared_blk_read_time; + shared->shared_blk_write_time += p->shared_blk_write_time; + shared->local_blk_read_time += p->local_blk_read_time; + shared->local_blk_write_time += p->local_blk_write_time; + shared->temp_blk_read_time += p->temp_blk_read_time; + shared->temp_blk_write_time += p->temp_blk_write_time; + shared->usage += p->usage; + shared->wal_records += p->wal_records; + shared->wal_fpi += p->wal_fpi; + shared->wal_bytes += p->wal_bytes; + shared->wal_buffers_full += p->wal_buffers_full; + shared->jit_functions += p->jit_functions; + shared->jit_generation_time += p->jit_generation_time; + shared->jit_deform_count += p->jit_deform_count; + shared->jit_deform_time += p->jit_deform_time; + shared->jit_inlining_count += p->jit_inlining_count; + shared->jit_inlining_time += p->jit_inlining_time; + shared->jit_optimization_count += p->jit_optimization_count; + shared->jit_optimization_time += p->jit_optimization_time; + shared->jit_emission_count += p->jit_emission_count; + shared->jit_emission_time += p->jit_emission_time; + shared->parallel_workers_to_launch += p->parallel_workers_to_launch; + shared->parallel_workers_launched += p->parallel_workers_launched; + shared->generic_plan_calls += p->generic_plan_calls; + shared->custom_plan_calls += p->custom_plan_calls; + + pgstat_unlock_entry(entry_ref); + + return true; +} + +/* + * pgstat serialization callback: write query text info for an entry. + * + * We write the query text offset, length, encoding, and the full pgssHashKey + * to the stats file so we can reconstruct the entry on reload. + */ +static void +pgss_to_serialized_data(const PgStat_HashKey *key, + const PgStatShared_Common *header, + FILE *statfile) +{ + const PgStatShared_Pgss *entry = (const PgStatShared_Pgss *) header; + uint32 magic = PGSS_FILE_HEADER; + pgssHashKey hkey = entry->key; + TimestampTz stats_since = entry->stats_since; + TimestampTz minmax_stats_since = entry->minmax_stats_since; + int query_len = entry->query_len; + int encoding = entry->encoding; + + pgstat_write_chunk_s(statfile, &magic); + pgstat_write_chunk_s(statfile, &hkey); + pgstat_write_chunk_s(statfile, &query_len); + pgstat_write_chunk_s(statfile, &encoding); + pgstat_write_chunk_s(statfile, &stats_since); + pgstat_write_chunk_s(statfile, &minmax_stats_since); + + /* + * Write the query text itself into the stats file so it survives restarts + * (PGSS_TEXT_FILE lives in a tmpfs that gets wiped). + */ + if (query_len >= 0) + { + char *qstr = NULL; + + if (!pgss_qtext_write_buffer && pgss) + pgss_qtext_write_buffer = qtext_load_file(&pgss_qtext_write_buffer_size); + + if (pgss_qtext_write_buffer) + qstr = qtext_fetch(entry->query_offset, query_len, + pgss_qtext_write_buffer, + pgss_qtext_write_buffer_size); + + if (qstr) + pgstat_write_chunk(statfile, qstr, query_len + 1); + else + { + char nul = '\0'; + + pgstat_write_chunk(statfile, &nul, 1); + } + } +} + +/* + * pgstat deserialization callback: read query text info for an entry. + */ +static bool +pgss_from_serialized_data(const PgStat_HashKey *key, + PgStatShared_Common *header, + FILE *statfile) +{ + PgStatShared_Pgss *entry = (PgStatShared_Pgss *) header; + int query_len; + int encoding; + uint32 magic; + + if (!pgstat_read_chunk_s(statfile, &magic)) + return false; + if (magic != PGSS_FILE_HEADER) + { + elog(WARNING, "pg_stat_statements: discarding stats with mismatched format (got 0x%08X, expected 0x%08X)", + magic, PGSS_FILE_HEADER); + return false; + } + + if (!pgstat_read_chunk_s(statfile, &entry->key)) + return false; + if (!pgstat_read_chunk_s(statfile, &query_len)) + return false; + if (!pgstat_read_chunk_s(statfile, &encoding)) + return false; + if (!pgstat_read_chunk_s(statfile, &entry->stats_since)) + return false; + if (!pgstat_read_chunk_s(statfile, &entry->minmax_stats_since)) + return false; + + /* Initialize text fields */ + entry->query_len = -1; + entry->encoding = encoding; + entry->query_offset = 0; + + /* + * Read the query text and store it in the external file. + */ + if (query_len >= 0) + { + char *buf = palloc(query_len + 1); + + if (!pgstat_read_chunk(statfile, buf, query_len + 1)) + { + pfree(buf); + return false; + } + + if (!pgss_qtext_rebuild_file) + { + pgss_qtext_rebuild_file = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W); + if (!pgss_qtext_rebuild_file) + { + pfree(buf); + return false; + } + pgss_qtext_rebuild_extent = 0; + } + + entry->query_offset = pgss_qtext_rebuild_extent; + + if (fwrite(buf, 1, query_len + 1, pgss_qtext_rebuild_file) != (size_t) (query_len + 1)) + { + pfree(buf); + return false; + } + pgss_qtext_rebuild_extent += query_len + 1; + + entry->query_len = query_len; + entry->encoding = encoding; + pfree(buf); + } + + return true; +} + +/* + * pgstat finish callback: handle end of stats file operations. + * + * For pg_stat_statements, we manage the query text file lifecycle here. + */ +static void +pgss_finish(PgStat_StatsFileOp status) +{ + switch (status) + { + case STATS_WRITE: + /* Free the cached query text buffer used during serialization */ + if (pgss_qtext_write_buffer) + { + pfree(pgss_qtext_write_buffer); + pgss_qtext_write_buffer = NULL; + pgss_qtext_write_buffer_size = 0; + } + break; + + case STATS_READ: + /* Close the rebuild file and update shared extent */ + if (pgss_qtext_rebuild_file) + { + FreeFile(pgss_qtext_rebuild_file); + pgss_qtext_rebuild_file = NULL; + if (pgss) + { + pgss->extent = pgss_qtext_rebuild_extent; + } + pgss_qtext_rebuild_extent = 0; + } + + /* + * If pg_stat_statements.save is disabled, discard all entries + * that were just loaded from the stats file. + */ + if (!pgss_save) + { + entry_reset(0, 0, 0, false); + } + break; + + case STATS_DISCARD: + unlink(PGSS_TEXT_FILE); + break; + } +} + +/* + * Evict least-used entries when the entry count exceeds pgss_max. + * + * Sorts all entries by usage, applies a decay factor, then drops the + * bottom USAGE_DEALLOC_PERCENT of entries. + */ +static void +entry_dealloc(void) +{ + dshash_seq_status hstat; + PgStatShared_HashEntry *p; + UsageEntry *entries; + int nentries = 0; + int allocated = 1024; + int nvictims; + int i; + Size tottextlen = 0; + int nvalidtexts = 0; + + entries = palloc(allocated * sizeof(UsageEntry)); + + /* Scan all entries, collect usage info and apply decay */ + dshash_seq_init(&hstat, pgStatLocal.shared_hash, false); + while ((p = dshash_seq_next(&hstat)) != NULL) + { + PgStatShared_Pgss *shared_entry; + + if (p->key.kind != PGSTAT_KIND_PGSS) + continue; + if (p->dropped) + continue; + + shared_entry = (PgStatShared_Pgss *) dsa_get_address(pgStatLocal.dsa, p->body); + + /* Skip entries not yet executed; protect parse-time entries. */ + if (shared_entry->counters.calls[PGSS_PLAN] + + shared_entry->counters.calls[PGSS_EXEC] == 0) + continue; + + if (nentries >= allocated) + { + allocated *= 2; + entries = repalloc(entries, allocated * sizeof(UsageEntry)); + } + + entries[nentries].key = shared_entry->key; + entries[nentries].usage = shared_entry->counters.usage; + nentries++; + + /* Apply decay */ + shared_entry->counters.usage *= USAGE_DECREASE_FACTOR; + + if (shared_entry->query_len >= 0) + { + tottextlen += shared_entry->query_len + 1; + nvalidtexts++; + } + } + dshash_seq_term(&hstat); + + /* Sort by usage ascending */ + qsort(entries, nentries, sizeof(UsageEntry), + entry_cmp); + + /* Update mean query length */ + if (nvalidtexts > 0) + pgss->mean_query_len = tottextlen / nvalidtexts; + else + pgss->mean_query_len = ASSUMED_LENGTH_INIT; + + /* Drop the bottom fraction */ + nvictims = Max(10, nentries * USAGE_DEALLOC_PERCENT / 100); + nvictims = Min(nvictims, nentries); + + for (i = 0; i < nvictims; i++) + { + pgstat_drop_entry(PGSTAT_KIND_PGSS, + entries[i].key.dbid, + pgss_objid(&entries[i].key)); + } + + pfree(entries); + + /* + * Signal other backends to invalidate their cached references to the + * dropped entries. Without this, backends keep stale refs and never + * re-create evicted entries. + */ + pgstat_request_entry_refs_gc(); + + /* Increment dealloc counter */ + SpinLockAcquire(&pgss->mutex); + pgss->stats.dealloc += 1; + SpinLockRelease(&pgss->mutex); +} + +/* + * qsort comparator for eviction: sort by usage ascending. + */ +static int +entry_cmp(const void *a, const void *b) +{ + double l = ((const UsageEntry *) a)->usage; + double r = ((const UsageEntry *) b)->usage; + + if (l < r) + return -1; + else if (l > r) + return +1; + else + return 0; +} + +/* + * Attempt eviction if enough time has passed since the last cycle. + * + * Uses a conditional lock so that at most one backend performs eviction at a + * time; others simply return without blocking. The time check ensures we + * don't evict more often than EVICTION_INTERVAL_MS milliseconds. + */ +static void +pgss_maybe_evict(void) +{ + /* + * Use the statement start timestamp since this is always called from + * pgss_store() at the start of query execution. + */ + TimestampTz now = GetCurrentStatementStartTimestamp(); + + if (!TimestampDifferenceExceeds(pgss->last_eviction_time, now, + EVICTION_INTERVAL_MS)) + return; + + if (!LWLockConditionalAcquire(&pgss->lock.lock, LW_EXCLUSIVE)) + return; + + /* Re-check after acquiring lock */ + if (TimestampDifferenceExceeds(pgss->last_eviction_time, now, + EVICTION_INTERVAL_MS)) + { + entry_dealloc(); + pgss->last_eviction_time = now; + + /* Also handle query text GC while we hold the lock */ + if (need_gc_qtexts()) + gc_qtexts(); + } + + LWLockRelease(&pgss->lock.lock); +} + /* * Generate a normalized version of the query string that will be used to * represent all similar queries. diff --git a/doc/src/sgml/pgstatstatements.sgml b/doc/src/sgml/pgstatstatements.sgml index d753de5836e..19b1dab74c7 100644 --- a/doc/src/sgml/pgstatstatements.sgml +++ b/doc/src/sgml/pgstatstatements.sgml @@ -809,7 +809,6 @@ calls | 2 pg_stat_statements view were last reset. - @@ -911,11 +910,16 @@ calls | 2 statements tracked by the module (i.e., the maximum number of rows in the pg_stat_statements view). If more distinct statements than that are observed, information about the least-executed - statements is discarded. The number of times such information was + statements is discarded. Eviction is throttled to occur at most once + every 10 seconds; until then, new entries are simply not created once + the limit is reached while existing entries continue to accumulate + statistics normally. + The number of times such information was discarded can be seen in the pg_stat_statements_info view. The default value is 5000. - This parameter can only be set at server start. + This parameter can be changed at any time by reloading the server + configuration. @@ -1008,13 +1012,15 @@ calls | 2 The module requires additional shared memory proportional to - pg_stat_statements.max. Note that this - memory is consumed whenever the module is loaded, even if + pg_stat_statements.max. Note that this memory is + consumed whenever the module is loaded, even if pg_stat_statements.track is set to none. - These parameters must be set in postgresql.conf. + These parameters are typically set in postgresql.conf. + Note that pg_stat_statements.max can be changed + without a server restart by reloading the configuration. Typical usage might be: diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 8cf40c87043..62351ab09cd 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2322,6 +2322,7 @@ PgStatShared_Function PgStatShared_HashEntry PgStatShared_IO PgStatShared_Lock +PgStatShared_Pgss PgStatShared_Relation PgStatShared_ReplSlot PgStatShared_SLRU -- 2.50.1 (Apple Git-155)