diff --git a/contrib/bloom/bloom.h b/contrib/bloom/bloom.h
index fba3ba7771..383b1fb8e3 100644
--- a/contrib/bloom/bloom.h
+++ b/contrib/bloom/bloom.h
@@ -174,6 +174,13 @@ typedef struct BloomScanOpaqueData
 
 typedef BloomScanOpaqueData *BloomScanOpaque;
 
+typedef struct
+{
+	uint64		matches;
+} BloomUsage;
+
+extern BloomUsage bloomUsage;
+
 /* blutils.c */
 extern void initBloomState(BloomState *state, Relation index);
 extern void BloomFillMetapage(Relation index, Page metaPage);
diff --git a/contrib/bloom/blscan.c b/contrib/bloom/blscan.c
index 0a303a49b2..da29dc7f68 100644
--- a/contrib/bloom/blscan.c
+++ b/contrib/bloom/blscan.c
@@ -166,6 +166,6 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 		CHECK_FOR_INTERRUPTS();
 	}
 	FreeAccessStrategy(bas);
-
+	bloomUsage.matches += ntids;
 	return ntids;
 }
diff --git a/contrib/bloom/blutils.c b/contrib/bloom/blutils.c
index 6836129c90..9120f96083 100644
--- a/contrib/bloom/blutils.c
+++ b/contrib/bloom/blutils.c
@@ -18,6 +18,7 @@
 #include "access/reloptions.h"
 #include "bloom.h"
 #include "catalog/index.h"
+#include "commands/explain.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
@@ -34,6 +35,55 @@
 
 PG_FUNCTION_INFO_V1(blhandler);
 
+BloomUsage bloomUsage;
+
+static void
+bloomUsageAdd(CustomResourceUsage* dst, CustomResourceUsage const* add)
+{
+	((BloomUsage*)dst)->matches += ((BloomUsage*)add)->matches;
+}
+
+static void
+bloomUsageAccum(CustomResourceUsage* acc, CustomResourceUsage const* end, CustomResourceUsage const* start)
+{
+	((BloomUsage*)acc)->matches += ((BloomUsage*)end)->matches - ((BloomUsage*)start)->matches;
+}
+
+static void
+bloomUsageShow(ExplainState* es, CustomResourceUsage const* usage, bool planning)
+{
+	if (es->format == EXPLAIN_FORMAT_TEXT)
+	{
+		if (planning)
+		{
+			ExplainIndentText(es);
+			appendStringInfoString(es->str, "Planning:\n");
+			es->indent++;
+		}
+		ExplainIndentText(es);
+		appendStringInfoString(es->str, "Bloom:");
+		appendStringInfo(es->str, " matches=%lld",
+						 (long long) ((BloomUsage*)usage)->matches);
+		appendStringInfoChar(es->str, '\n');
+		if (planning)
+			es->indent--;
+	}
+	else
+	{
+		ExplainPropertyInteger("Bloom Matches", NULL,
+							   ((BloomUsage*)usage)->matches, es);
+	}
+}
+
+static CustomInstrumentation bloomInstr = {
+	"bloom",
+	sizeof(BloomUsage),
+	&bloomUsage,
+	bloomUsageAdd,
+	bloomUsageAccum,
+	bloomUsageShow
+};
+
 /* Kind of relation options for bloom index */
 static relopt_kind bl_relopt_kind;
 
@@ -78,6 +128,7 @@ _PG_init(void)
 		bl_relopt_tab[i + 1].opttype = RELOPT_TYPE_INT;
 		bl_relopt_tab[i + 1].offset = offsetof(BloomOptions, bitSize[0]) + sizeof(int) * i;
 	}
+	RegisterCustomInstrumentation(&bloomInstr);
 }
 
 /*
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 1087a9011e..67b9d8351e 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -49,6 +49,7 @@
 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xB000000000000003)
 #define PARALLEL_KEY_WAL_USAGE		UINT64CONST(0xB000000000000004)
 #define PARALLEL_KEY_BUFFER_USAGE	UINT64CONST(0xB000000000000005)
+#define PARALLEL_KEY_CUST_USAGE		UINT64CONST(0xB000000000000006)
 
 /*
  * Status for index builds performed in parallel.  This is allocated in a
@@ -143,6 +144,7 @@ typedef struct BrinLeader
 	Snapshot	snapshot;
 	WalUsage   *walusage;
 	BufferUsage *bufferusage;
+	char	   *custusage;
 } BrinLeader;
 
 /*
@@ -2342,6 +2344,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	BrinLeader *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader));
 	WalUsage   *walusage;
 	BufferUsage *bufferusage;
+	char	   *custusage;
 	bool		leaderparticipates = true;
 	int			querylen;
 
@@ -2396,6 +2399,10 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	shm_toc_estimate_chunk(&pcxt->estimator,
 						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(pgCustUsageSize, pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
 	if (debug_query_string)
@@ -2475,6 +2482,9 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	bufferusage = shm_toc_allocate(pcxt->toc,
 								   mul_size(sizeof(BufferUsage), pcxt->nworkers));
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
+	custusage = shm_toc_allocate(pcxt->toc,
+								 mul_size(pgCustUsageSize, pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_CUST_USAGE, custusage);
 
 	/* Launch workers, saving status for leader/caller */
 	LaunchParallelWorkers(pcxt);
@@ -2487,6 +2497,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	brinleader->snapshot = snapshot;
 	brinleader->walusage = walusage;
 	brinleader->bufferusage = bufferusage;
+	brinleader->custusage = custusage;
 
 	/* If no workers were successfully launched, back out (do serial build) */
 	if (pcxt->nworkers_launched == 0)
@@ -2669,7 +2680,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	 * or we might get incomplete data.)
 	 */
 	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
-		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
+		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i], brinleader->custusage + pgCustUsageSize*i);
 
 cleanup:
 
@@ -2793,6 +2804,7 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 	LOCKMODE	indexLockmode;
 	WalUsage   *walusage;
 	BufferUsage *bufferusage;
+	char	   *custusage;
 	int			sortmem;
 
 	/*
@@ -2852,8 +2864,10 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 	/* Report WAL/buffer usage during parallel execution */
 	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
 	walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+	custusage = shm_toc_lookup(toc, PARALLEL_KEY_CUST_USAGE, false);
 	InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
-						  &walusage[ParallelWorkerNumber]);
+						  &walusage[ParallelWorkerNumber],
+						  custusage + ParallelWorkerNumber*pgCustUsageSize);
 
 	index_close(indexRel, indexLockmode);
 	table_close(heapRel, heapLockmode);
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 2011196579..3e7b567d37 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -71,6 +71,7 @@
 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xA000000000000004)
 #define PARALLEL_KEY_WAL_USAGE		UINT64CONST(0xA000000000000005)
 #define PARALLEL_KEY_BUFFER_USAGE	UINT64CONST(0xA000000000000006)
+#define PARALLEL_KEY_CUST_USAGE		UINT64CONST(0xA000000000000007)
 
 /*
  * DISABLE_LEADER_PARTICIPATION disables the leader's participation in
@@ -197,6 +198,7 @@ typedef struct BTLeader
 	Snapshot	snapshot;
 	WalUsage   *walusage;
 	BufferUsage *bufferusage;
+	char	   *custusage;
 } BTLeader;
 
 /*
@@ -1467,6 +1469,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	BTLeader   *btleader = (BTLeader *) palloc0(sizeof(BTLeader));
 	WalUsage   *walusage;
 	BufferUsage *bufferusage;
+	char	   *custusage;
 	bool		leaderparticipates = true;
 	int			querylen;
 
@@ -1532,6 +1535,9 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	shm_toc_estimate_chunk(&pcxt->estimator,
 						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(pgCustUsageSize, pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
 	/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
 	if (debug_query_string)
@@ -1625,6 +1631,10 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	bufferusage = shm_toc_allocate(pcxt->toc,
 								   mul_size(sizeof(BufferUsage), pcxt->nworkers));
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
+	custusage = shm_toc_allocate(pcxt->toc,
+								 mul_size(pgCustUsageSize, pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_CUST_USAGE, custusage);
+
 	/* Launch workers, saving status for leader/caller */
 	LaunchParallelWorkers(pcxt);
 
@@ -1638,6 +1648,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	btleader->snapshot = snapshot;
 	btleader->walusage = walusage;
 	btleader->bufferusage = bufferusage;
+	btleader->custusage = custusage;
 
 	/* If no workers were successfully launched, back out (do serial build) */
 	if (pcxt->nworkers_launched == 0)
@@ -1676,7 +1687,7 @@ _bt_end_parallel(BTLeader *btleader)
 	 * or we might get incomplete data.)
 	 */
 	for (i = 0; i < btleader->pcxt->nworkers_launched; i++)
-		InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]);
+		InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i], btleader->custusage + pgCustUsageSize*i);
 
 	/* Free last reference to MVCC snapshot, if one was used */
 	if (IsMVCCSnapshot(btleader->snapshot))
@@ -1811,6 +1822,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 	LOCKMODE	indexLockmode;
 	WalUsage   *walusage;
 	BufferUsage *bufferusage;
+	char	   *custusage;
 	int			sortmem;
 
 #ifdef BTREE_BUILD_STATS
@@ -1891,8 +1903,10 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 	/* Report WAL/buffer usage during parallel execution */
 	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
 	walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+	custusage = shm_toc_lookup(toc, PARALLEL_KEY_CUST_USAGE, false);
 	InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
-						  &walusage[ParallelWorkerNumber]);
+						  &walusage[ParallelWorkerNumber],
+						  custusage + ParallelWorkerNumber*pgCustUsageSize);
 
 #ifdef BTREE_BUILD_STATS
 	if (log_btree_build_stats)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 3d590a6b9f..d3ebb2a381 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -119,6 +119,8 @@ static void show_instrumentation_count(const char *qlabel, int which,
 static void show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es);
 static void show_eval_params(Bitmapset *bms_params, ExplainState *es);
 static const char *explain_get_index_name(Oid indexId);
+static void show_custom_usage(ExplainState *es, const char *usage,
+							  bool planning);
 static void show_buffer_usage(ExplainState *es, const BufferUsage *usage,
 							  bool planning);
 static void show_wal_usage(ExplainState *es, const WalUsage *usage);
@@ -149,7 +151,6 @@ static void ExplainRestoreGroup(ExplainState *es, int depth, int *state_save);
 static void ExplainDummyGroup(const char *objtype, const char *labelname,
 							  ExplainState *es);
 static void ExplainXMLTag(const char *tagname, int flags, ExplainState *es);
-static void ExplainIndentText(ExplainState *es);
 static void ExplainJSONLineEnding(ExplainState *es);
 static void ExplainYAMLLineStarting(ExplainState *es);
 static void escape_yaml(StringInfo buf, const char *str);
@@ -170,6 +171,7 @@ ExplainQuery(ParseState *pstate, ExplainStmt *stmt,
 	Query	   *query;
 	List	   *rewritten;
 	ListCell   *lc;
+	ListCell   *c;
 	bool		timing_set = false;
 	bool		summary_set = false;
 
@@ -222,11 +224,28 @@ ExplainQuery(ParseState *pstate, ExplainStmt *stmt,
 					 parser_errposition(pstate, opt->location)));
 		}
 		else
-			ereport(ERROR,
+		{
+			bool		found = false;
+			foreach (c, pgCustInstr)
+			{
+				CustomInstrumentation *ci = (CustomInstrumentation*)lfirst(c);
+				if (strcmp(opt->defname, ci->name) == 0)
+				{
+					ci->selected = true;
+					es->custom = true;
+					found = true;
+					break;
+				}
+			}
+			if (!found)
+			{
+				ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("unrecognized EXPLAIN option \"%s\"",
 							opt->defname),
 					 parser_errposition(pstate, opt->location)));
+			}
+		}
 	}
 
 	/* check that WAL is used with EXPLAIN ANALYZE */
@@ -320,12 +339,19 @@ ExplainState *
 NewExplainState(void)
 {
 	ExplainState *es = (ExplainState *) palloc0(sizeof(ExplainState));
+	ListCell   *lc;
 
 	/* Set default options (most fields can be left as zeroes). */
 	es->costs = true;
 
 	/* Prepare output buffer. */
 	es->str = makeStringInfo();
+	/* Reset custom instrumentation selection flags */
+	foreach (lc, pgCustInstr)
+	{
+		CustomInstrumentation *ci = (CustomInstrumentation*)lfirst(lc);
+		ci->selected = false;
+	}
 
 	return es;
 }
 
@@ -397,9 +423,14 @@ ExplainOneQuery(Query *query, int cursorOptions,
 					planduration;
 		BufferUsage bufusage_start,
 					bufusage;
+		CustomInstrumentationData custusage_start, custusage;
 
 		if (es->buffers)
 			bufusage_start = pgBufferUsage;
+
+		if (es->custom)
+			GetCustomInstrumentationState(custusage_start.data);
+
 		INSTR_TIME_SET_CURRENT(planstart);
 
 		/* plan the query */
@@ -415,9 +446,14 @@ ExplainOneQuery(Query *query, int cursorOptions,
 			BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
 		}
 
+		if (es->custom)
+			AccumulateCustomInstrumentationState(custusage.data, custusage_start.data);
+
 		/* run it (if needed) and produce output */
 		ExplainOnePlan(plan, into, es, queryString, params, queryEnv,
-					   &planduration, (es->buffers ? &bufusage : NULL));
+					   &planduration,
+					   (es->buffers ? &bufusage : NULL),
+					   (es->custom ? &custusage : NULL));
 	}
 }
 
@@ -527,7 +563,8 @@ void
 ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 			   const char *queryString, ParamListInfo params,
 			   QueryEnvironment *queryEnv, const instr_time *planduration,
-			   const BufferUsage *bufusage)
+			   const BufferUsage *bufusage,
+			   const CustomInstrumentationData *custusage)
 {
 	DestReceiver *dest;
 	QueryDesc  *queryDesc;
@@ -623,6 +660,13 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 		ExplainCloseGroup("Planning", "Planning", true, es);
 	}
 
+	if (custusage)
+	{
+		ExplainOpenGroup("Planning", "Planning", true, es);
+		show_custom_usage(es, custusage->data, true);
+		ExplainCloseGroup("Planning", "Planning", true, es);
+	}
+
 	if (es->summary && planduration)
 	{
 		double		plantime = INSTR_TIME_GET_DOUBLE(*planduration);
@@ -2110,8 +2154,12 @@ ExplainNode(PlanState *planstate, List *ancestors,
 	if (es->wal && planstate->instrument)
 		show_wal_usage(es, &planstate->instrument->walusage);
 
+	/* Show custom instrumentation */
+	if (es->custom && planstate->instrument)
+		show_custom_usage(es, planstate->instrument->cust_usage.data, false);
+
 	/* Prepare per-worker buffer/WAL usage */
-	if (es->workers_state && (es->buffers || es->wal) && es->verbose)
+	if (es->workers_state && (es->buffers || es->wal || es->custom) && es->verbose)
 	{
 		WorkerInstrumentation *w = planstate->worker_instrument;
 
@@ -2128,6 +2176,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
 				show_buffer_usage(es, &instrument->bufusage, false);
 			if (es->wal)
 				show_wal_usage(es, &instrument->walusage);
+			if (es->custom)
+				show_custom_usage(es, instrument->cust_usage.data, false);
 			ExplainCloseWorker(n, es);
 		}
 	}
@@ -3544,6 +3594,23 @@ explain_get_index_name(Oid indexId)
 	return result;
 }
 
+/*
+ * Show selected custom usage details
+ */
+static void
+show_custom_usage(ExplainState *es, const char *usage, bool planning)
+{
+	ListCell   *lc;
+
+	foreach (lc, pgCustInstr)
+	{
+		CustomInstrumentation *ci = (CustomInstrumentation*)lfirst(lc);
+		if (ci->selected)
+			ci->show(es, usage, planning);
+		usage += ci->size;
+	}
+}
+
 /*
  * Show buffer usage details.
  */
@@ -5017,7 +5084,7 @@ ExplainXMLTag(const char *tagname, int flags, ExplainState *es)
  * data for a parallel worker there might already be data on the current line
  * (cf. ExplainOpenWorker); in that case, don't indent any more.
  */
-static void
+void
 ExplainIndentText(ExplainState *es)
 {
 	Assert(es->format == EXPLAIN_FORMAT_TEXT);
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index 703976f633..806dd1e85c 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -583,9 +583,13 @@ ExplainExecuteQuery(ExecuteStmt *execstmt, IntoClause *into, ExplainState *es,
 	instr_time	planduration;
 	BufferUsage bufusage_start,
 				bufusage;
+	CustomInstrumentationData custusage_start, custusage;
 
 	if (es->buffers)
 		bufusage_start = pgBufferUsage;
+	if (es->custom)
+		GetCustomInstrumentationState(custusage_start.data);
+
 	INSTR_TIME_SET_CURRENT(planstart);
 
 	/* Look it up in the hash table */
@@ -630,6 +634,8 @@ ExplainExecuteQuery(ExecuteStmt *execstmt, IntoClause *into, ExplainState *es,
 		memset(&bufusage, 0, sizeof(BufferUsage));
 		BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
 	}
+	if (es->custom)
+		AccumulateCustomInstrumentationState(custusage.data, custusage_start.data);
 
 	plan_list = cplan->stmt_list;
 
@@ -640,7 +646,9 @@ ExplainExecuteQuery(ExecuteStmt *execstmt, IntoClause *into, ExplainState *es,
 		if (pstmt->commandType != CMD_UTILITY)
 			ExplainOnePlan(pstmt, into, es, query_string, paramLI, queryEnv,
-						   &planduration, (es->buffers ? &bufusage : NULL));
+						   &planduration,
+						   (es->buffers ? &bufusage : NULL),
+						   (es->custom ? &custusage : NULL));
 		else
 			ExplainOneUtility(pstmt->utilityStmt, into, es, query_string,
 							  paramLI, queryEnv);
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index e087dfd72e..9b2add2b65 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -50,6 +50,7 @@
 #define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
 #define PARALLEL_VACUUM_KEY_WAL_USAGE		5
 #define PARALLEL_VACUUM_KEY_INDEX_STATS		6
+#define PARALLEL_VACUUM_KEY_CUSTOM_USAGE	7
 
 /*
  * Shared information among parallel workers. So this is allocated in the DSM
@@ -184,6 +185,9 @@ struct ParallelVacuumState
 	/* Points to WAL usage area in DSM */
 	WalUsage   *wal_usage;
 
+	/* Points to custom usage area in DSM */
+	char	   *custom_usage;
+
 	/*
 	 * False if the index is totally unsuitable target for all parallel
 	 * processing. For example, the index could be <
For example, the index could be < @@ -242,6 +246,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, PVIndStats *indstats; BufferUsage *buffer_usage; WalUsage *wal_usage; + char *custom_usage; bool *will_parallel_vacuum; Size est_indstats_len; Size est_shared_len; @@ -313,6 +318,9 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, shm_toc_estimate_chunk(&pcxt->estimator, mul_size(sizeof(WalUsage), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(pgCustUsageSize, pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ if (debug_query_string) @@ -403,6 +411,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, mul_size(sizeof(WalUsage), pcxt->nworkers)); shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage); pvs->wal_usage = wal_usage; + custom_usage = shm_toc_allocate(pcxt->toc, + mul_size(pgCustUsageSize, pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_CUSTOM_USAGE, custom_usage); + pvs->custom_usage = custom_usage; /* Store query string for workers */ if (debug_query_string) @@ -706,7 +718,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan WaitForParallelWorkersToFinish(pvs->pcxt); for (int i = 0; i < pvs->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]); + InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i], pvs->custom_usage + pgCustUsageSize*i); } /* @@ -964,6 +976,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) VacDeadItems *dead_items; BufferUsage *buffer_usage; WalUsage *wal_usage; + char *custom_usage; int nindexes; char *sharedquery; ErrorContextCallback errcallback; @@ -1053,8 +1066,10 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) /* Report buffer/WAL usage during parallel execution */ buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false); + custom_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_CUSTOM_USAGE, false); InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], - &wal_usage[ParallelWorkerNumber]); + &wal_usage[ParallelWorkerNumber], + custom_usage + pgCustUsageSize*ParallelWorkerNumber); /* Pop the error context stack */ error_context_stack = errcallback.previous; diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 540f8d21fd..f06553e5b6 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -66,6 +66,7 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_CUSTOM_USAGE UINT64CONST(0xE00000000000000B) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -600,6 +601,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *paramlistinfo_space; BufferUsage *bufusage_space; WalUsage *walusage_space; + char *customusage_space; SharedExecutorInstrumentation *instrumentation = NULL; SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; @@ -680,6 +682,13 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, mul_size(sizeof(WalUsage), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* + * Same thing for CustomUsage. 
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(pgCustUsageSize, pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/* Estimate space for tuple queues. */
 	shm_toc_estimate_chunk(&pcxt->estimator,
 						   mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
@@ -768,6 +777,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
 	pei->wal_usage = walusage_space;
 
+	customusage_space = shm_toc_allocate(pcxt->toc,
+										 mul_size(pgCustUsageSize, pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_CUSTOM_USAGE, customusage_space);
+	pei->custom_usage = customusage_space;
+
 	/* Set up the tuple queues that the workers will write into. */
 	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
@@ -1164,7 +1178,7 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 	 * finish, or we might get incomplete data.)
 	 */
 	for (i = 0; i < nworkers; i++)
-		InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
+		InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i], pei->custom_usage + pgCustUsageSize*i);
 
 	pei->finished = true;
 }
@@ -1397,6 +1411,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	FixedParallelExecutorState *fpes;
 	BufferUsage *buffer_usage;
 	WalUsage   *wal_usage;
+	char	   *custom_usage;
 	DestReceiver *receiver;
 	QueryDesc  *queryDesc;
 	SharedExecutorInstrumentation *instrumentation;
@@ -1472,8 +1487,10 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	/* Report buffer/WAL usage during parallel execution. */
 	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
 	wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+	custom_usage = shm_toc_lookup(toc, PARALLEL_KEY_CUSTOM_USAGE, false);
 	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
-						  &wal_usage[ParallelWorkerNumber]);
+						  &wal_usage[ParallelWorkerNumber],
+						  custom_usage + ParallelWorkerNumber*pgCustUsageSize);
 
 	/* Report instrumentation data if any instrumentation options are set. */
 	if (instrumentation != NULL)
diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c
index 268ae8a945..fd1d1c7b7e 100644
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -16,15 +16,62 @@
 #include "postgres.h"
 
 #include "executor/instrument.h"
+#include "utils/memutils.h"
 
 BufferUsage pgBufferUsage;
 static BufferUsage save_pgBufferUsage;
 WalUsage	pgWalUsage;
 static WalUsage save_pgWalUsage;
+List	   *pgCustInstr;		/* description of custom instrumentations */
+Size		pgCustUsageSize;
+static CustomInstrumentationData save_pgCustUsage;	/* saved custom instrumentation state */
+
+
 static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add);
 static void WalUsageAdd(WalUsage *dst, WalUsage *add);
 
+void
+RegisterCustomInstrumentation(CustomInstrumentation* inst)
+{
+	MemoryContext oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	pgCustInstr = lappend(pgCustInstr, inst);
+	pgCustUsageSize += inst->size;
+	MemoryContextSwitchTo(oldcontext);
+	if (pgCustUsageSize > MAX_CUSTOM_INSTR_SIZE)
+		elog(ERROR, "total size of custom instrumentations exceeds limit %d", MAX_CUSTOM_INSTR_SIZE);
+}
+
+void
+GetCustomInstrumentationState(char* dst)
+{
+	ListCell* lc;
+
+	foreach (lc, pgCustInstr)
+	{
+		CustomInstrumentation *ci = (CustomInstrumentation*)lfirst(lc);
+		memcpy(dst, ci->usage, ci->size);
+		dst += ci->size;
+	}
+}
+
+void
+AccumulateCustomInstrumentationState(char* dst, char const* before)
+{
+	ListCell* lc;
+
+	foreach (lc, pgCustInstr)
+	{
+		CustomInstrumentation *ci = (CustomInstrumentation*)lfirst(lc);
+		if (ci->selected)
+		{
+			memset(dst, 0, ci->size);
+			ci->accum(dst, ci->usage, before);
+		}
+		dst += ci->size;
+		before += ci->size;
+	}
+}
 
 /* Allocate new instrumentation structure(s) */
 Instrumentation *
@@ -49,7 +96,6 @@ InstrAlloc(int n, int instrument_options, bool async_mode)
 			instr[i].async_mode = async_mode;
 		}
 	}
-
 	return instr;
 }
 
@@ -67,6 +113,8 @@ InstrInit(Instrumentation *instr, int instrument_options)
 void
 InstrStartNode(Instrumentation *instr)
 {
+	ListCell   *lc;
+	char	   *cust_start = instr->cust_usage_start.data;
+
 	if (instr->need_timer &&
 		!INSTR_TIME_SET_CURRENT_LAZY(instr->starttime))
 		elog(ERROR, "InstrStartNode called twice in a row");
@@ -77,6 +125,13 @@ InstrStartNode(Instrumentation *instr)
 
 	if (instr->need_walusage)
 		instr->walusage_start = pgWalUsage;
+
+	foreach (lc, pgCustInstr)
+	{
+		CustomInstrumentation *ci = (CustomInstrumentation*)lfirst(lc);
+		memcpy(cust_start, ci->usage, ci->size);
+		cust_start += ci->size;
+	}
 }
 
 /* Exit from a plan node */
@@ -85,6 +140,9 @@ InstrStopNode(Instrumentation *instr, double nTuples)
 {
 	double		save_tuplecount = instr->tuplecount;
 	instr_time	endtime;
+	ListCell   *lc;
+	char	   *cust_start = instr->cust_usage_start.data;
+	char	   *cust_usage = instr->cust_usage.data;
 
 	/* count the returned tuples */
 	instr->tuplecount += nTuples;
@@ -110,7 +168,15 @@ InstrStopNode(Instrumentation *instr, double nTuples)
 		WalUsageAccumDiff(&instr->walusage,
 						  &pgWalUsage, &instr->walusage_start);
 
-	/* Is this the first tuple of this cycle? */
+	foreach (lc, pgCustInstr)
+	{
+		CustomInstrumentation *ci = (CustomInstrumentation*)lfirst(lc);
+		ci->accum(cust_usage, ci->usage, cust_start);
+		cust_start += ci->size;
+		cust_usage += ci->size;
+	}
+
+	/* Is this the first tuple of this cycle? */
 	if (!instr->running)
 	{
 		instr->running = true;
@@ -168,6 +234,10 @@ InstrEndLoop(Instrumentation *instr)
 void
 InstrAggNode(Instrumentation *dst, Instrumentation *add)
 {
+	ListCell   *lc;
+	char	   *cust_dst = dst->cust_usage.data;
+	char	   *cust_add = add->cust_usage.data;
+
 	if (!dst->running && add->running)
 	{
 		dst->running = true;
@@ -193,32 +263,69 @@ InstrAggNode(Instrumentation *dst, Instrumentation *add)
 
 	if (dst->need_walusage)
 		WalUsageAdd(&dst->walusage, &add->walusage);
+
+	foreach (lc, pgCustInstr)
+	{
+		CustomInstrumentation *ci = (CustomInstrumentation*)lfirst(lc);
+		ci->add(cust_dst, cust_add);
+		cust_dst += ci->size;
+		cust_add += ci->size;
+	}
 }
 
 /* note current values during parallel executor startup */
 void
 InstrStartParallelQuery(void)
 {
+	ListCell   *lc;
+	char	   *cust_dst = save_pgCustUsage.data;
+
 	save_pgBufferUsage = pgBufferUsage;
 	save_pgWalUsage = pgWalUsage;
+
+	foreach (lc, pgCustInstr)
+	{
+		CustomInstrumentation *ci = (CustomInstrumentation*)lfirst(lc);
+		memcpy(cust_dst, ci->usage, ci->size);
+		cust_dst += ci->size;
+	}
 }
 
 /* report usage after parallel executor shutdown */
 void
-InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
+InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage, char* cust_usage)
 {
+	ListCell   *lc;
+	char	   *cust_save = save_pgCustUsage.data;
+
 	memset(bufusage, 0, sizeof(BufferUsage));
 	BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage);
 	memset(walusage, 0, sizeof(WalUsage));
 	WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage);
+
+	foreach (lc, pgCustInstr)
+	{
+		CustomInstrumentation *ci = (CustomInstrumentation*)lfirst(lc);
+		ci->accum(cust_usage, ci->usage, cust_save);
+		cust_usage += ci->size;
+		cust_save += ci->size;
+	}
 }
 
 /* accumulate work done by workers in leader's stats */
 void
-InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
+InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage, char* cust_usage)
 {
+	ListCell   *lc;
+
 	BufferUsageAdd(&pgBufferUsage, bufusage);
 	WalUsageAdd(&pgWalUsage, walusage);
+
+	foreach (lc, pgCustInstr)
+	{
+		CustomInstrumentation *ci = (CustomInstrumentation*)lfirst(lc);
+		ci->add(ci->usage, cust_usage);
+		cust_usage += ci->size;
+	}
 }
 
 /* dst += add */
diff --git a/src/include/commands/explain.h b/src/include/commands/explain.h
index 1b44d483d6..c60f6ffbbe 100644
--- a/src/include/commands/explain.h
+++ b/src/include/commands/explain.h
@@ -41,6 +41,7 @@ typedef struct ExplainState
 	bool		verbose;		/* be verbose */
 	bool		analyze;		/* print actual times */
 	bool		costs;			/* print estimated costs */
+	bool		custom;			/* print custom usage */
 	bool		buffers;		/* print buffer usage */
 	bool		wal;			/* print WAL usage */
 	bool		timing;			/* print detailed node timing */
@@ -92,7 +93,8 @@ extern void ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into,
 						   ExplainState *es, const char *queryString,
 						   ParamListInfo params, QueryEnvironment *queryEnv,
 						   const instr_time *planduration,
-						   const BufferUsage *bufusage);
+						   const BufferUsage *bufusage,
+						   const CustomInstrumentationData *custusage);
 
 extern void ExplainPrintPlan(ExplainState *es, QueryDesc *queryDesc);
 extern void ExplainPrintTriggers(ExplainState *es, QueryDesc *queryDesc);
@@ -125,5 +127,6 @@ extern void ExplainOpenGroup(const char *objtype, const char *labelname,
 							 bool labeled, ExplainState *es);
 extern void ExplainCloseGroup(const char *objtype, const char *labelname,
 							  bool labeled, ExplainState *es);
+extern void ExplainIndentText(ExplainState *es);
 
 #endif							/* EXPLAIN_H */
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 6b8c00bb0f..8a828c04db 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -27,6 +27,7 @@ typedef struct ParallelExecutorInfo
 	ParallelContext *pcxt;		/* parallel context we're using */
 	BufferUsage *buffer_usage;	/* points to bufusage area in DSM */
 	WalUsage   *wal_usage;		/* walusage area in DSM */
+	char	   *custom_usage;	/* points to custom usage area in DSM */
 	SharedExecutorInstrumentation *instrumentation; /* optional */
 	struct SharedJitInstrumentation *jit_instrumentation;	/* optional */
 	dsa_area   *area;			/* points to DSA area in DSM */
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index bfd7b6d844..99ba91f57d 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -14,7 +14,7 @@
 #define INSTRUMENT_H
 
 #include "portability/instr_time.h"
-
+#include "nodes/pg_list.h"
 
 /*
  * BufferUsage and WalUsage counters keep being incremented infinitely,
@@ -65,6 +65,36 @@ typedef enum InstrumentOption
 	INSTRUMENT_ALL = PG_INT32_MAX
 } InstrumentOption;
 
+/*
+ * Maximum total size of all custom instrumentations
+ */
+#define MAX_CUSTOM_INSTR_SIZE 128
+
+typedef struct {
+	char		data[MAX_CUSTOM_INSTR_SIZE];
+} CustomInstrumentationData;
+
+typedef void CustomResourceUsage;
+typedef struct ExplainState ExplainState;
+typedef void (*cust_instr_add_t)(CustomResourceUsage* dst, CustomResourceUsage const* add);
+typedef void (*cust_instr_accum_t)(CustomResourceUsage* acc, CustomResourceUsage const* end, CustomResourceUsage const* start);
+typedef void (*cust_instr_show_t)(ExplainState* es, CustomResourceUsage const* usage, bool planning);
+
+typedef struct
+{
+	char const* name;			/* instrumentation name (as recognized in EXPLAIN options) */
+	Size		size;
+	CustomResourceUsage* usage;
+	cust_instr_add_t add;
+	cust_instr_accum_t accum;
+	cust_instr_show_t show;
+	bool		selected;		/* selected in EXPLAIN options */
+} CustomInstrumentation;
+
+extern PGDLLIMPORT List* pgCustInstr;	/* description of custom instrumentations */
+extern Size pgCustUsageSize;
+
 
 typedef struct Instrumentation
 {
 	/* Parameters set at node creation: */
@@ -90,6 +120,8 @@ typedef struct Instrumentation
 	double		nfiltered2;		/* # of tuples removed by "other" quals */
 	BufferUsage bufusage;		/* total buffer usage */
 	WalUsage	walusage;		/* total WAL usage */
+	CustomInstrumentationData cust_usage_start;	/* state of custom usage at start */
+	CustomInstrumentationData cust_usage;	/* total custom usage */
 } Instrumentation;
 
 typedef struct WorkerInstrumentation
@@ -101,6 +133,11 @@ typedef struct WorkerInstrumentation
 
 extern PGDLLIMPORT BufferUsage pgBufferUsage;
 extern PGDLLIMPORT WalUsage pgWalUsage;
+
+extern void RegisterCustomInstrumentation(CustomInstrumentation* inst);
+extern void GetCustomInstrumentationState(char* dst);
+extern void AccumulateCustomInstrumentationState(char* dst, char const* before);
+
 extern Instrumentation *InstrAlloc(int n, int instrument_options,
 								   bool async_mode);
 extern void InstrInit(Instrumentation *instr, int instrument_options);
@@ -110,8 +147,8 @@ extern void InstrUpdateTupleCount(Instrumentation *instr, double nTuples);
 extern void InstrEndLoop(Instrumentation *instr);
 extern void InstrAggNode(Instrumentation *dst, Instrumentation *add);
 extern void InstrStartParallelQuery(void);
-extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
-extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
+extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage, char* custusage);
+extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage, char* custusage);
 extern void BufferUsageAccumDiff(BufferUsage *dst,
 								 const BufferUsage *add, const BufferUsage *sub);
 extern void WalUsageAccumDiff(WalUsage *dst, const WalUsage *add,
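
--

For reference, a minimal sketch of the extension side of this API, modeled on the contrib/bloom changes above. The "myext" name, the MyExtUsage struct, and its calls counter are illustrative and not part of the patch; the CustomInstrumentation struct, RegisterCustomInstrumentation(), and the add/accum/show callback signatures are the interface the patch introduces:

/* myext.c -- hypothetical module exercising the custom instrumentation API */
#include "postgres.h"

#include "commands/explain.h"
#include "executor/instrument.h"

PG_MODULE_MAGIC;

/* Illustrative per-backend counter; total registered size must stay within
 * MAX_CUSTOM_INSTR_SIZE. */
typedef struct
{
	uint64		calls;
} MyExtUsage;

static MyExtUsage myExtUsage;

/* dst += add */
static void
myExtUsageAdd(CustomResourceUsage *dst, CustomResourceUsage const *add)
{
	((MyExtUsage *) dst)->calls += ((MyExtUsage *) add)->calls;
}

/* acc += end - start */
static void
myExtUsageAccum(CustomResourceUsage *acc, CustomResourceUsage const *end,
				CustomResourceUsage const *start)
{
	((MyExtUsage *) acc)->calls +=
		((MyExtUsage *) end)->calls - ((MyExtUsage *) start)->calls;
}

/* Print the counter in whatever format this EXPLAIN is using */
static void
myExtUsageShow(ExplainState *es, CustomResourceUsage const *usage, bool planning)
{
	if (es->format == EXPLAIN_FORMAT_TEXT)
	{
		ExplainIndentText(es);
		appendStringInfo(es->str, "MyExt: calls=%lld\n",
						 (long long) ((MyExtUsage *) usage)->calls);
	}
	else
		ExplainPropertyInteger("MyExt Calls", NULL,
							   ((MyExtUsage *) usage)->calls, es);
}

static CustomInstrumentation myExtInstr = {
	"myext",					/* recognized as EXPLAIN (MYEXT) */
	sizeof(MyExtUsage),
	&myExtUsage,
	myExtUsageAdd,
	myExtUsageAccum,
	myExtUsageShow
};

void
_PG_init(void)
{
	RegisterCustomInstrumentation(&myExtInstr);
}

Hot code paths then simply increment myExtUsage.calls. InstrStartNode()/InstrStopNode() snapshot and diff every registered usage area automatically, and the parallel plumbing added in execParallel.c, vacuumparallel.c, brin.c, and nbtsort.c carries worker counters back to the leader, so EXPLAIN (ANALYZE, MYEXT) SELECT ... reports per-node totals the same way contrib/bloom's "matches" counter does.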