From 5673e04db118f723094dfb05a3ee3bff895dd0f2 Mon Sep 17 00:00:00 2001 From: "okbob@github.com" Date: Sun, 11 Sep 2022 20:56:40 +0200 Subject: [PATCH v20220922 03/13] session variables Implementation storage and access routines. Session variables are stored in session memory inside dedicated hash table. Two levels of an access are implemented: API level and SQL level. Both levels are implemented in this commit. The most difficult part is cleaning (reset) of session variable. The content of session variable should be cleaned when related session variable is removed from system catalog. But queue of sinval messages can be truncated, or current transaction state can disallow an access to system catalog, so cleaning can be postponed. --- src/backend/access/transam/xact.c | 2 +- src/backend/catalog/dependency.c | 5 + src/backend/commands/session_variable.c | 1097 ++++++++++++++++++++++- src/backend/executor/execExpr.c | 75 ++ src/backend/executor/execExprInterp.c | 11 + src/backend/executor/execMain.c | 56 ++ src/backend/executor/execParallel.c | 148 ++- src/backend/jit/llvm/llvmjit_expr.c | 6 + src/backend/optimizer/plan/planner.c | 8 + src/backend/optimizer/plan/setrefs.c | 118 ++- src/backend/optimizer/util/clauses.c | 75 +- src/backend/parser/analyze.c | 9 + src/backend/parser/parse_expr.c | 232 ++++- src/backend/tcop/pquery.c | 3 + src/backend/utils/adt/ruleutils.c | 46 + src/backend/utils/cache/plancache.c | 22 +- src/backend/utils/fmgr/fmgr.c | 10 +- src/backend/utils/misc/guc_tables.c | 10 + src/include/catalog/pg_proc.dat | 7 + src/include/commands/session_variable.h | 8 +- src/include/executor/execExpr.h | 10 + src/include/executor/execdesc.h | 4 + src/include/nodes/execnodes.h | 19 + src/include/nodes/parsenodes.h | 1 + src/include/nodes/pathnodes.h | 4 + src/include/nodes/plannodes.h | 2 + src/include/nodes/primnodes.h | 12 +- src/include/optimizer/planmain.h | 2 + src/include/parser/parse_expr.h | 1 + src/include/parser/parse_node.h | 1 + src/include/utils/lsyscache.h | 8 +- src/tools/pgindent/typedefs.list | 3 + 32 files changed, 1934 insertions(+), 81 deletions(-) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 406258955a..e9c9aae5a3 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2214,7 +2214,7 @@ CommitTransaction(void) */ smgrDoPendingSyncs(true, is_parallel_worker); - /* Let ON COMMIT DROP */ + /* Let ON COMMIT DROP or ON TRANSACTION END */ AtPreEOXact_SessionVariable_on_xact_actions(true); /* close large objects before lower-level cleanup */ diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index d8d512973d..8c2fbd1522 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -1909,6 +1909,11 @@ find_expr_references_walker(Node *node, { Param *param = (Param *) node; + /* A variable parameter depends on the session variable */ + if (param->paramkind == PARAM_VARIABLE) + add_object_address(OCLASS_VARIABLE, param->paramvarid, 0, + context->addrs); + /* A parameter must depend on the parameter's datatype */ add_object_address(OCLASS_TYPE, param->paramtype, 0, context->addrs); diff --git a/src/backend/commands/session_variable.c b/src/backend/commands/session_variable.c index 3eafc05f89..d34f193284 100644 --- a/src/backend/commands/session_variable.c +++ b/src/backend/commands/session_variable.c @@ -22,23 +22,91 @@ #include "commands/session_variable.h" #include "funcapi.h" #include "miscadmin.h" +#include "optimizer/optimizer.h" +#include "storage/lmgr.h" +#include "storage/proc.h" #include "utils/builtins.h" +#include "utils/datum.h" +#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" /* - * The life cycle of temporary session variable can be - * limmited by using clause ON COMMIT DROP. + * Values of session variables are stored in the backend local memory, + * in sessionvars hash table in binary format, in a dedicated memory + * context SVariableMemoryContext. A session variable value can stay + * valid for longer than a transaction. To make sure that the + * underlying memory is freed, we need to solve following issues: + * + * - We need to free local memory when the variable is droppedn, + * whether in the current transaction in the current session or by + * other sessions (other users). To protect the content against + * possibly rollbacked DROP VARIABLE commands, the memory shouldn't + * be freed immediately but be postponed until the end of the + * transaction. + * + * - The session variable can be dropped explicitly (by DROP VARIABLE + * command) or implicitly (using ON COMMIT DROP clause), and the + * value can be implicitly removed (using the ON TRANSACTION END + * clause). In all those cases the memory should be freed at the + * transaction end. + * + * To achieve that, we maintain 4 queues (implemented as list + * of variable oid, list of RecheckVariableRequests + * (requests are created in reaction on sinval) and lists + * of actions). List of actions are used whe we need to calculate + * the final transaction state of an entry's creator's + * subtransaction. List of session variables are used, when we + * don't need to do this calculation (we know, so. every values + * of session variables created with clauses ON COMMIT DROP or + * ON TRANSACTION END RESET will be purged from memory), but + * ON COMMIT DROP action or ON COMMIT RESET action can be done only + * when related sub transaction is committed (we don't want to lost + * value after aborted DROP VARIABLE command, we don't want to delete + * record from system catalog after aborted CREATE VARIABLE command, + * or we don't want to drop record from system catalog implicitly + * when TEMP ON COMMIT DROP variable was successfully dropped). + * Recheck of session's variable existence should be executed + * only when some operation over session's variable is executed + * first time in transaction or at the end of transaction. It + * should not be executed more times inside transaction. + * + * Although the variable's reset doesn't need full delete from + * sessionvars hash table, we do. It is the most simple solution, + * and it reduces possible state's space (and reduces sessionvars + * bloating). + * + * There are two different ways to do final access to session + * variables: buffered (indirect) or direct. Buffered access is used + * in queries, where we have to ensure an stability of passed values + * (then the session variable has same behaviour like external query + * parameters, what is consistent with using PL/pgSQL's variables in + * embedded queries in PL/pgSQL). + * + * This is implemented by using an aux buffer (an array) that holds a + * copy of values of used (in query) session variables. In the final + * end, the values from this array are passed as constant (EEOP_CONST). + * + * Direct access is used by simple expression evaluation (PLpgSQL). + * In this case we don't need to ensure the stability of passed + * values, and maintaining the buffer with copies of values of session + * variables can be useless overhead. In this case we just read the + * value of the session variable directly (EEOP_PARAM_VARIABLE). This + * strategy removes the necessity to modify related PL/pgSQL code to + * support session variables (the reading of session variables is + * fully transparent for PL/pgSQL). */ typedef enum SVariableXActAction { SVAR_ON_COMMIT_DROP, /* used for ON COMMIT DROP */ + SVAR_ON_COMMIT_RESET, /* used for DROP VARIABLE */ } SVariableXActAction; typedef struct SVariableXActActionItem { Oid varid; /* varid of session variable */ + SVariableXActAction action; /* * creating_subid is the ID of the creating subxact. If the action was @@ -49,13 +117,729 @@ typedef struct SVariableXActActionItem SubTransactionId deleting_subid; } SVariableXActActionItem; -/* List holds fields of SVariableXActActionItem type */ -static List *xact_drop_actions = NIL; +/* list holds fields of SVariableXActActionItem type */ +static List *xact_on_commit_actions = NIL; + +/* + * the ON COMMIT DROP and ON TRANSACTION END RESET variables + * are purged from memory every time. + */ +static List *xact_reset_varids = NIL; + +/* + * When the session variable is dropped we need to free local memory. The + * session variable can be dropped by current session, but it can be + * dropped by other's sessions too, so we have to watch sinval message. + * But because we don't want to free local memory immediately, we need to + * hold list of possibly dropped session variables and at the end of + * transaction, we check session variables from this list against system + * catalog. This check can be postponed into next transaction if + * current transactions is in aborted state. + */ +static List *xact_recheck_varids = NIL; + +typedef struct SVariableData +{ + Oid varid; /* pg_variable OID of this sequence (hash key) */ + + /* + * The session variable is identified by oid. The oid is unique in + * catalog. Unfortunately, the cleaning memory can be postponed to begin + * of next transaction in session, and theoreticaly, there can be + * different session variable with same oid. So we need another extra + * identifier that helps with identity check. We can use LSN number in + * session variable create time. The value of session variable (in memory) + * is valid, when there is an record in pg_variable with same oid and same + * create_lsn. + */ + XLogRecPtr create_lsn; /* used for session's variable identity check */ + + bool isnull; + bool freeval; + Datum value; + + Oid typid; + int16 typlen; + bool typbyval; + + bool is_domain; + void *domain_check_extra; + LocalTransactionId domain_check_extra_lxid; + + /* + * Top level local transaction id of the last transaction that dropped the + * variable if any. We need this information to avoid freeing memory for + * variable dropped by the local backend that may be rollbacked. + */ + LocalTransactionId drop_lxid; + + bool is_not_null; /* don't allow null values */ + bool is_immutable; /* true when variable is immutable */ + bool has_defexpr; /* true when variable has a default value */ + + bool is_valid; /* true when variable was successfully + * initialized */ + + uint32 hashvalue; /* used for pairing sinval message */ + + bool eox_reset; /* true, when lifecycle is limitted by + * transaction */ +} SVariableData; + +typedef SVariableData *SVariable; + +static HTAB *sessionvars = NULL; /* hash table for session variables */ +static HTAB *sessionvars_types = NULL; /* hash table for type fingerprints of + * session variables */ + +static MemoryContext SVariableMemoryContext = NULL; + +static void init_session_variable(SVariable svar, Variable *var); static void register_session_variable_xact_action(Oid varid, SVariableXActAction action); static void unregister_session_variable_xact_action(Oid varid, SVariableXActAction action); +/* + * Returns human readable name of SVariableXActAction value. + */ +static const char * +SvariableXActActionName(SVariableXActAction action) +{ + switch (action) + { + case SVAR_ON_COMMIT_DROP: + return "ON COMMIT DROP"; + case SVAR_ON_COMMIT_RESET: + return "ON COMMIT RESET"; + default: + elog(ERROR, "unknown SVariableXActAction action %d", + action); + } +} + +/* + * Releases stored data from session variable, but preserve the hash entry + * in sessionvars. When we replace row value by new value with same type + * fingerprint, we can keep field desc data. + */ +static void +free_session_variable_value(SVariable svar) +{ + if (svar->freeval) + { + pfree(DatumGetPointer(svar->value)); + svar->freeval = false; + } + + /* + * We can mark this session variable as valid when it has not default + * expression, and when null is allowed. When it has defexpr, then the + * content will be valid after an assignment or defexp evaluation. + */ + svar->is_valid = !svar->has_defexpr && !svar->is_not_null; + + /* Clean current value */ + svar->value = (Datum) 0; + svar->isnull = true; +} + +/* + * Release the variable defined by varid from sessionvars + * hashtab. + */ +static void +remove_session_variable(SVariable svar) +{ + free_session_variable_value(svar); + + /* + * In this moment, the session variable is not in catalog, so only saved + * oid can be displayed. + */ + elog(DEBUG1, "session variable (oid:%u) is removing from memory", + svar->varid); + + if (hash_search(sessionvars, + (void *) &svar->varid, + HASH_REMOVE, + NULL) == NULL) + elog(DEBUG1, "hash table corrupted"); +} + +/* + * Release the variable defined by varid from sessionvars + * hashtab. + */ +static void +remove_session_variable_by_id(Oid varid) +{ + SVariable svar; + bool found; + + if (!sessionvars) + return; + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_FIND, &found); + if (found) + remove_session_variable(svar); +} + +/* + * Callback function for session variable invalidation. + */ +static void +pg_variable_cache_callback(Datum arg, int cacheid, uint32 hashvalue) +{ + HASH_SEQ_STATUS status; + SVariable svar; + + /* + * There is no guarantee of sessionvars being initialized, even when + * receiving an invalidation callback, as DISCARD [ ALL | VARIABLES ] + * destroys the hash table entirely. + */ + if (!sessionvars) + return; + + elog(DEBUG1, "pg_variable_cache_callback %u %u", cacheid, hashvalue); + + /* + * When the hashvalue is not specified, then we have to recheck all + * currently used session variables. Since we can't guarantee the exact + * session variable from its hashValue, we have to iterate over all items + * of hash table. + */ + hash_seq_init(&status, sessionvars); + + while ((svar = (SVariable) hash_seq_search(&status)) != NULL) + { + if (hashvalue == 0 || svar->hashvalue == hashvalue) + { + MemoryContext oldcxt; + + oldcxt = MemoryContextSwitchTo(SVariableMemoryContext); + + xact_recheck_varids = list_append_unique_oid(xact_recheck_varids, + svar->varid); + + MemoryContextSwitchTo(oldcxt); + + elog(DEBUG1, "session variable (oid:%u) should be rechecked (forced by sinval)", + svar->varid); + } + + /* + * although it there is low probability, we have to iterate over all + * locally set session variables, because hashvalue is not unique + * identifier. + */ + } +} + +/* + * Returns true, when the entry in pg_variable is valid for SVariable + */ +static bool +is_session_variable_valid(SVariable svar) +{ + HeapTuple tp; + bool result = false; + + tp = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(svar->varid)); + + if (HeapTupleIsValid(tp)) + { + /* + * In this case, the only oid cannot be used as unique identifier, + * because after oid's reset the oid can be used for new other session + * variable. We do second check against 64bit unique identifier. + */ + if (svar->create_lsn == ((Form_pg_variable) GETSTRUCT(tp))->create_lsn) + result = true; + + ReleaseSysCache(tp); + } + + return result; +} + +/* + * The possible invalidated variables (in memory) are checked + * against system catalog. This routine is called before + * any read or any write from/to session variable. + */ +static void +sync_sessionvars_all() +{ + SVariable svar; + ListCell *l; + + if (!xact_recheck_varids) + return; + + /* + * sessionvars is null after DISCARD VARIABLES. When we are sure, so there + * are not any active session variable in this session. + */ + if (!sessionvars) + { + list_free(xact_recheck_varids); + xact_recheck_varids = NIL; + return; + } + + elog(DEBUG1, "effective call of sync_sessionvars_all()"); + + /* + * This routine is called before any reading, so the session should be in + * transaction state. This is required for access to system catalog. + */ + Assert(IsTransactionState()); + + foreach(l, xact_recheck_varids) + { + bool found; + Oid varid = lfirst_oid(l); + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_FIND, &found); + + /* + * Remove invalid variables, but don't touch variables that were + * dropped by the current top level local transaction, as there's no + * guarantee that the transaction will be committed. Such variables + * will be removed in the next transaction if needed. + */ + if (found) + { + /* + * If this is a variable dropped by the current transaction, + * ignore it and keep the oid to recheck in the next transaction. + */ + if (svar->drop_lxid == MyProc->lxid) + continue; + + if (!is_session_variable_valid(svar)) + remove_session_variable(svar); + } + + xact_recheck_varids = foreach_delete_current(xact_recheck_varids, l); + } +} + +/* + * Create the hash table for storing session variables + */ +static void +create_sessionvars_hashtables(void) +{ + HASHCTL vars_ctl; + + Assert(!sessionvars); + + /* set callbacks */ + if (!SVariableMemoryContext) + { + /* Read sinval messages */ + CacheRegisterSyscacheCallback(VARIABLEOID, + pg_variable_cache_callback, + (Datum) 0); + + /* We need our own long lived memory context */ + SVariableMemoryContext = + AllocSetContextCreate(TopMemoryContext, + "session variables", + ALLOCSET_START_SMALL_SIZES); + } + + Assert(SVariableMemoryContext); + + memset(&vars_ctl, 0, sizeof(vars_ctl)); + vars_ctl.keysize = sizeof(Oid); + vars_ctl.entrysize = sizeof(SVariableData); + vars_ctl.hcxt = SVariableMemoryContext; + + Assert(sessionvars == NULL); + + sessionvars = hash_create("Session variables", 64, &vars_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); +} + +/* + * Assign some content to the session variable. It's copied to + * SVariableMemoryContext if necessary. + * + * init_mode is true, when the value of session variable is being initialized + * by default expression or by null. Only in this moment we can allow to + * modify immutable variables with default expression. + * + * This routine have to do successfull change or leave memory without + * change. + */ +static void +set_session_variable(SVariable svar, Datum value, bool isnull, bool init_mode) +{ + Datum newval = value; + + Assert(svar && OidIsValid(svar->typid)); + + /* Don't allow assignment of null to NOT NULL variable */ + if (isnull && svar->is_not_null) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("null value is not allowed for NOT NULL session variable \"%s.%s\"", + get_namespace_name(get_session_variable_namespace(svar->varid)), + get_session_variable_name(svar->varid)))); + + /* + * Don't allow updating of immutable session variable that has assigned + * not null value or has default expression (and then the value should be + * result of default expression always). Don't do this check, when + * variable is being initialized. + */ + if (!init_mode && + (svar->is_immutable && + (svar->is_valid || svar->has_defexpr))) + ereport(ERROR, + (errcode(ERRCODE_ERROR_IN_ASSIGNMENT), + errmsg("session variable \"%s.%s\" is declared IMMUTABLE", + get_namespace_name(get_session_variable_namespace(svar->varid)), + get_session_variable_name(svar->varid)))); + + if (!isnull) + { + MemoryContext oldcxt = MemoryContextSwitchTo(SVariableMemoryContext); + + newval = datumCopy(value, svar->typbyval, svar->typlen); + + MemoryContextSwitchTo(oldcxt); + } + + free_session_variable_value(svar); + + svar->value = newval; + + svar->isnull = isnull; + svar->freeval = newval != value; + svar->is_valid = true; + + elog(DEBUG1, "session variable \"%s.%s\" (oid:%u) has new value", + get_namespace_name(get_session_variable_namespace(svar->varid)), + get_session_variable_name(svar->varid), + svar->varid); +} + +/* + * Initialize svar from var + * svar - SVariable - holds value + * var - Variable - holds metadata + */ +static void +init_session_variable(SVariable svar, Variable *var) +{ + MemoryContext oldcxt; + + Assert(OidIsValid(var->oid)); + + svar->varid = var->oid; + svar->create_lsn = var->create_lsn; + + svar->typid = var->typid; + + get_typlenbyval(var->typid, &svar->typlen, &svar->typbyval); + svar->is_domain = (get_typtype(var->typid) == TYPTYPE_DOMAIN); + + svar->domain_check_extra = NULL; + svar->domain_check_extra_lxid = InvalidLocalTransactionId; + + svar->value = (Datum) 0; + svar->isnull = true; + svar->freeval = false; + + svar->is_not_null = var->is_not_null; + svar->is_immutable = var->is_immutable; + svar->has_defexpr = var->has_defexpr; + + svar->hashvalue = GetSysCacheHashValue1(VARIABLEOID, + ObjectIdGetDatum(var->oid)); + + /* the value of variable is not known yet */ + svar->is_valid = false; + + svar->eox_reset = var->eoxaction == VARIABLE_EOX_RESET || + var->eoxaction == VARIABLE_EOX_DROP; + + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + + if (svar->eox_reset) + xact_reset_varids = lappend_oid(xact_reset_varids, var->oid); + + MemoryContextSwitchTo(oldcxt); +} + +/* + * Search the given session variable in the hash table. If it doesn't + * exist, then insert it (and calculate defexpr if it exists). + * + * Caller is responsible for doing permission checks. + * + * As side effect this function acquires AccessShareLock on + * related session variable until the end of the transaction. + */ +static SVariable +prepare_variable_for_reading(Oid varid) +{ + SVariable svar; + Variable var; + bool found; + + var.oid = InvalidOid; + + if (!sessionvars) + create_sessionvars_hashtables(); + + /* Ensure so all entries in sessionvars hash table are valid */ + sync_sessionvars_all(); + + /* Protect used session variable against drop until transaction end */ + LockDatabaseObject(VariableRelationId, varid, 0, AccessShareLock); + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_ENTER, &found); + + /* Return content if it is available and valid */ + if (!found || !svar->is_valid) + { + /* We need to load defexpr. */ + InitVariable(&var, varid, false); + + if (!found) + { + init_session_variable(svar, &var); + + elog(DEBUG1, "session variable \"%s.%s\" (oid:%u) has new entry in memory (emitted by READ)", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid), + varid); + } + + /* Raise an error when we cannot initialize variable correctly */ + if (var.is_not_null && !var.defexpr) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("null value is not allowed for NOT NULL session variable \"%s.%s\"", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid)), + errdetail("The session variable was not initialized yet."))); + + if (svar->has_defexpr) + { + Datum value = (Datum) 0; + bool isnull; + EState *estate = NULL; + Expr *defexpr; + ExprState *defexprs; + MemoryContext oldcxt; + + /* Prepare default expr */ + estate = CreateExecutorState(); + + oldcxt = MemoryContextSwitchTo(estate->es_query_cxt); + + defexpr = expression_planner((Expr *) var.defexpr); + defexprs = ExecInitExpr(defexpr, NULL); + value = ExecEvalExprSwitchContext(defexprs, + GetPerTupleExprContext(estate), + &isnull); + + MemoryContextSwitchTo(oldcxt); + + /* Store result before releasing Executor memory */ + set_session_variable(svar, value, isnull, true); + + FreeExecutorState(estate); + } + else + set_session_variable(svar, (Datum) 0, true, true); + } + + /* + * Although the value of domain type should be valid (it is checked when + * it is assigned to session variable), we have to check related + * constraints anytime. It can be more expensive than in PL/pgSQL. + * PL/pgSQL forces domain checks when value is assigned to the variable or + * when value is returned from function. Fortunately, domain types manage + * cache of constraints by self. + */ + if (svar->is_domain) + { + /* + * Store domain_check extra in TopTransactionContext. When we are in + * other transaction, the domain_check_extra cache is not valid. + */ + if (svar->domain_check_extra_lxid != MyProc->lxid) + svar->domain_check_extra = NULL; + + domain_check(svar->value, svar->isnull, + svar->typid, &svar->domain_check_extra, + TopTransactionContext); + + svar->domain_check_extra_lxid = MyProc->lxid; + } + + return svar; +} + +/* + * Store the given value in an SVariable, and cache it if not already present. + * + * Caller is responsible for doing permission checks. + * We try not to break the previous value, if something is wrong. + * + * As side effect this function acquires AccessShareLock on + * related session variable until the end of the transaction. + */ +void +SetSessionVariable(Oid varid, Datum value, bool isNull) +{ + SVariable svar; + bool found; + + if (!sessionvars) + create_sessionvars_hashtables(); + + /* Ensure so all entries in sessionvars hash table are valid */ + sync_sessionvars_all(); + + /* Protect used session variable against drop until transaction end */ + LockDatabaseObject(VariableRelationId, varid, 0, AccessShareLock); + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_ENTER, &found); + + if (!found) + { + Variable var; + + /* We don't need to know defexpr here */ + InitVariable(&var, varid, true); + init_session_variable(svar, &var); + + elog(DEBUG1, "session variable \"%s.%s\" (oid:%u) has new entry in memory (emitted by WRITE)", + get_namespace_name(get_session_variable_namespace(svar->varid)), + get_session_variable_name(svar->varid), + varid); + } + + set_session_variable(svar, value, isNull, false); +} + +/* + * Wrapper around SetSessionVariable after checking for correct permission. + */ +void +SetSessionVariableWithSecurityCheck(Oid varid, Datum value, bool isNull) +{ + AclResult aclresult; + + /* + * Is possible to write to session variable? + */ + aclresult = pg_variable_aclcheck(varid, GetUserId(), ACL_UPDATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_VARIABLE, get_session_variable_name(varid)); + + SetSessionVariable(varid, value, isNull); +} + +/* + * Returns a copy of value of the session variable specified by varid + * Caller is responsible for doing permission checks. + */ +Datum +CopySessionVariable(Oid varid, bool *isNull, Oid *typid) +{ + SVariable svar; + Datum result; + + svar = prepare_variable_for_reading(varid); + Assert(svar != NULL && svar->is_valid); + + *typid = svar->typid; + + /* force copy of not null value */ + if (!svar->isnull) + { + result = datumCopy(svar->value, svar->typbyval, svar->typlen); + *isNull = false; + } + else + { + result = (Datum) 0; + *isNull = true; + } + + return (Datum) result; +} + +/* + * Returns a copy of value of the session variable specified by varid + * with check of expected type. Like previous function, the caller + * is responsible for doing permission checks. + */ +Datum +CopySessionVariableWithTypeCheck(Oid varid, bool *isNull, Oid expected_typid) +{ + SVariable svar; + Datum result; + + svar = prepare_variable_for_reading(varid); + Assert(svar != NULL && svar->is_valid); + + if (expected_typid != svar->typid) + elog(ERROR, "type of variable \"%s.%s\" is different than expected", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid)); + + if (!svar->isnull) + { + result = datumCopy(svar->value, svar->typbyval, svar->typlen); + *isNull = false; + } + else + { + result = (Datum) 0; + *isNull = true; + } + + return (Datum) result; +} + +/* + * Returns a value of session variable identified by varid with + * check of expected type. Like previous function, the called + * is reposible for doing permission check. + */ +Datum +GetSessionVariableWithTypeCheck(Oid varid, bool *isNull, Oid expected_typid) +{ + SVariable svar; + + svar = prepare_variable_for_reading(varid); + Assert(svar != NULL && svar->is_valid); + + if (expected_typid != svar->typid) + elog(ERROR, "type of variable \"%s.%s\" is different than expected", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid)); + + *isNull = svar->isnull; + + return svar->value; +} + /* * Do the necessary work to setup local memory management of a new * variable. @@ -91,18 +875,88 @@ SessionVariableDropPostprocess(Oid varid) * again at end of xact time. */ unregister_session_variable_xact_action(varid, SVAR_ON_COMMIT_DROP); + + if (sessionvars) + { + bool found; + SVariable svar = (SVariable) hash_search(sessionvars, &varid, + HASH_FIND, &found); + + if (found) + { + /* + * Save the current top level local transaction id to make sure we + * don't automatically remove the local variable storage in + * sync_sessionvars_all, as the DROP VARIABLE will send an + * invalidation message. + */ + Assert(LocalTransactionIdIsValid(MyProc->lxid)); + svar->drop_lxid = MyProc->lxid; + + /* + * For variables that are not purged by default we need to + * register an SVAR_ON_COMMIT_RESET action to free the local + * memory for this variable when this transaction or + * subtransaction is committed (we don't need to wait for sinval + * message). The cleaning action for one session variable can be + * repeated in the action list without causing any problem, so we + * don't need to ensure uniqueness. We need a different action + * from RESET, because RESET is executed on any transaction end, + * but we want to execute cleaning only when the current + * transaction will be committed. This action can be reverted by + * ABORT of DROP VARIABLE command. + */ + if (!svar->eox_reset) + register_session_variable_xact_action(varid, + SVAR_ON_COMMIT_RESET); + } + } } /* - * Registration of actions to be executed on session variables at transaction - * end time. We want to drop temporary session variables with clause ON COMMIT - * DROP, or we want to reset values of session variables with clause ON - * TRANSACTION END RESET or we want to clean (reset) local memory allocated by - * values of dropped session variables. + * Fast drop of the complete content of all session variables hash table. + * This is code for DISCARD VARIABLES command. This command + * cannot be run inside transaction, so we don't need to handle + * end of transaction actions. */ +void +ResetSessionVariables(void) +{ + /* Destroy hash table and reset related memory context */ + if (sessionvars) + { + hash_destroy(sessionvars); + sessionvars = NULL; + + hash_destroy(sessionvars_types); + sessionvars_types = NULL; + } + + /* Release memory allocated by session variables */ + if (SVariableMemoryContext != NULL) + MemoryContextReset(SVariableMemoryContext); + + /* + * There are not any session variables left, so simply trim xact + * action list, and other lists. + */ + list_free_deep(xact_on_commit_actions); + xact_on_commit_actions = NIL; + + /* We should clean xact_reset_varids */ + list_free(xact_reset_varids); + xact_reset_varids = NIL; + + /* we should clean xact_recheck_varids */ + list_free(xact_recheck_varids); + xact_recheck_varids = NIL; +} /* - * Register a session variable xact action. + * Registration of actions to be executed on session variables at transaction + * end time. We want to drop temporary session variables with clause ON COMMIT + * DROP, or we want to clean (reset) local memory allocated by + * values of dropped session variables. */ static void register_session_variable_xact_action(Oid varid, @@ -111,18 +965,21 @@ register_session_variable_xact_action(Oid varid, SVariableXActActionItem *xact_ai; MemoryContext oldcxt; - oldcxt = MemoryContextSwitchTo(CacheMemoryContext); + elog(DEBUG1, "SVariableXActAction \"%s\" is registered for session variable (oid:%u)", + SvariableXActActionName(action), varid); + + oldcxt = MemoryContextSwitchTo(TopTransactionContext); xact_ai = (SVariableXActActionItem *) palloc(sizeof(SVariableXActActionItem)); xact_ai->varid = varid; + xact_ai->action = action; xact_ai->creating_subid = GetCurrentSubTransactionId(); xact_ai->deleting_subid = InvalidSubTransactionId; - Assert(action == SVAR_ON_COMMIT_DROP); - xact_drop_actions = lcons(xact_ai, xact_drop_actions); + xact_on_commit_actions = lcons(xact_ai, xact_on_commit_actions); MemoryContextSwitchTo(oldcxt); } @@ -139,14 +996,15 @@ unregister_session_variable_xact_action(Oid varid, { ListCell *l; - Assert(action == SVAR_ON_COMMIT_DROP); + elog(DEBUG1, "SVariableXActAction \"%s\" is unregistered for session variable (oid:%u)", + SvariableXActActionName(action), varid); - foreach(l, xact_drop_actions) + foreach(l, xact_on_commit_actions) { SVariableXActActionItem *xact_ai = (SVariableXActActionItem *) lfirst(l); - if (xact_ai->varid == varid) + if (xact_ai->action == action && xact_ai->varid == varid) xact_ai->deleting_subid = GetCurrentSubTransactionId(); } } @@ -160,40 +1018,68 @@ AtPreEOXact_SessionVariable_on_xact_actions(bool isCommit) { ListCell *l; - foreach(l, xact_drop_actions) + /* + * Clean memory for all eox_reset variables. Do it first, it reduces + * enhancing action lists about RECHECK action. + */ + foreach(l, xact_reset_varids) { - SVariableXActActionItem *xact_ai = - (SVariableXActActionItem *) lfirst(l); + remove_session_variable_by_id(lfirst_oid(l)); + } - /* Iterate only over entries that are still pending */ - if (xact_ai->deleting_subid == InvalidSubTransactionId) + /* We can clean xact_reset_varids */ + list_free(xact_reset_varids); + xact_reset_varids = NIL; + + if (isCommit && xact_on_commit_actions) + { + foreach(l, xact_on_commit_actions) { + SVariableXActActionItem *xact_ai = + (SVariableXActActionItem *) lfirst(l); - /* - * ON COMMIT DROP is allowed only for temp session variables. So - * we should explicitly delete only when current transaction was - * committed. When it's rollback, then session variable is removed - * automatically. - */ - if (isCommit) + /* Iterate only over entries that are still pending */ + if (xact_ai->deleting_subid == InvalidSubTransactionId) { - ObjectAddress object; + if (xact_ai->action == SVAR_ON_COMMIT_DROP) + { + ObjectAddress object; - object.classId = VariableRelationId; - object.objectId = xact_ai->varid; - object.objectSubId = 0; + /* + * ON COMMIT DROP is allowed only for temp session variables. + * So we should explicitly delete only when current + * transaction was committed. When it's rollback, then session + * variable is removed automatically. + */ - /* - * Since this is an automatic drop, rather than one directly - * initiated by the user, we pass the - * PERFORM_DELETION_INTERNAL flag. - */ - elog(DEBUG1, "session variable (oid:%u) will be deleted (forced by SVAR_ON_COMMIT_DROP action)", - xact_ai->varid); + object.classId = VariableRelationId; + object.objectId = xact_ai->varid; + object.objectSubId = 0; - performDeletion(&object, DROP_CASCADE, - PERFORM_DELETION_INTERNAL | - PERFORM_DELETION_QUIETLY); + /* + * Since this is an automatic drop, rather than one directly + * initiated by the user, we pass the + * PERFORM_DELETION_INTERNAL flag. + */ + elog(DEBUG1, "session variable (oid:%u) will be deleted (forced by SVAR_ON_COMMIT_DROP action)", + xact_ai->varid); + + performDeletion(&object, DROP_CASCADE, + PERFORM_DELETION_INTERNAL | + PERFORM_DELETION_QUIETLY); + } + else + { + /* + * When we process DROP VARIABLE statement, we create + * SVAR_ON_COMMIT_RESET xact action. We want to process this + * action only when related transaction is commited (when DROP + * VARIABLE statement sucessfully processed). We want to preserve + * variable content, when the transaction with DROP VARAIBLE + * statement was reverted. + */ + remove_session_variable_by_id(xact_ai->varid); + } } } } @@ -202,8 +1088,32 @@ AtPreEOXact_SessionVariable_on_xact_actions(bool isCommit) * Any drop action left is an entry that was unregistered and not * rollbacked, so we can simply remove them. */ - list_free_deep(xact_drop_actions); - xact_drop_actions = NIL; + list_free_deep(xact_on_commit_actions); + xact_on_commit_actions = NIL; + + if (isCommit && xact_recheck_varids) + { + Assert(sessionvars); + + foreach(l, xact_recheck_varids) + { + SVariable svar; + bool found; + Oid varid = lfirst_oid(l); + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_FIND, &found); + + if (found) + { + if (!is_session_variable_valid(svar)) + remove_session_variable(svar); + } + } + + list_free(xact_recheck_varids); + xact_recheck_varids = NIL; + } } /* @@ -219,7 +1129,7 @@ AtEOSubXact_SessionVariable_on_xact_actions(bool isCommit, SubTransactionId mySu { ListCell *cur_item; - foreach(cur_item, xact_drop_actions) + foreach(cur_item, xact_on_commit_actions) { SVariableXActActionItem *xact_ai = (SVariableXActActionItem *) lfirst(cur_item); @@ -227,7 +1137,7 @@ AtEOSubXact_SessionVariable_on_xact_actions(bool isCommit, SubTransactionId mySu if (!isCommit && xact_ai->creating_subid == mySubid) { /* cur_item must be removed */ - xact_drop_actions = foreach_delete_current(xact_drop_actions, cur_item); + xact_on_commit_actions = foreach_delete_current(xact_on_commit_actions, cur_item); pfree(xact_ai); } else @@ -240,3 +1150,98 @@ AtEOSubXact_SessionVariable_on_xact_actions(bool isCommit, SubTransactionId mySu } } } + +/* + * pg_debug_show_used_session_variables - designed for testing + * + * returns content of session vars + */ +Datum +pg_debug_show_used_session_variables(PG_FUNCTION_ARGS) +{ +#define NUM_PG_DEBUG_SHOW_USED_SESSION_VARIABLES_ATTS 10 + + elog(DEBUG1, "pg_debug_show_used_session_variables start"); + + SetSingleFuncCall(fcinfo, 0); + + if (sessionvars) + { + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + HASH_SEQ_STATUS status; + SVariable svar; + + /* Ensure so all entries in sessionvars hash table are valid */ + sync_sessionvars_all(); + + hash_seq_init(&status, sessionvars); + + while ((svar = (SVariable) hash_seq_search(&status)) != NULL) + { + Datum values[NUM_PG_DEBUG_SHOW_USED_SESSION_VARIABLES_ATTS]; + bool nulls[NUM_PG_DEBUG_SHOW_USED_SESSION_VARIABLES_ATTS]; + HeapTuple tp; + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = ObjectIdGetDatum(svar->varid); + values[3] = ObjectIdGetDatum(svar->typid); + + /* check if session variable is visible in system catalog */ + tp = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(svar->varid)); + + /* + * Sessionvars can hold data of variables removed from catalog, + * (and not purged) and then namespacename and name cannot be read + * from catalog. + */ + if (HeapTupleIsValid(tp)) + { + Form_pg_variable varform = (Form_pg_variable) GETSTRUCT(tp); + + /* When we see data in catalog */ + values[1] = PointerGetDatum( + cstring_to_text( + get_namespace_name(varform->varnamespace))); + + values[2] = PointerGetDatum( + cstring_to_text(NameStr(varform->varname))); + + values[4] = PointerGetDatum( + cstring_to_text(format_type_be(svar->typid))); + values[5] = BoolGetDatum(false); + values[6] = BoolGetDatum(svar->is_valid); + + values[8] = BoolGetDatum( + pg_variable_aclcheck(svar->varid, GetUserId(), ACL_SELECT) == ACLCHECK_OK); + + values[9] = BoolGetDatum( + pg_variable_aclcheck(svar->varid, GetUserId(), ACL_UPDATE) == ACLCHECK_OK); + + ReleaseSysCache(tp); + } + else + { + /* + * When session variable was removed from catalog, but still + * it in memory. The memory was not purged yet. + */ + nulls[1] = true; + nulls[2] = true; + nulls[4] = true; + values[5] = BoolGetDatum(true); + nulls[6] = true; + nulls[7] = true; + nulls[8] = true; + } + + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + } + } + + elog(DEBUG1, "pg_debug_show_used_session_variables end"); + + return (Datum) 0; +} diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c index 25a94bbaaa..953b139058 100644 --- a/src/backend/executor/execExpr.c +++ b/src/backend/executor/execExpr.c @@ -33,6 +33,7 @@ #include "access/nbtree.h" #include "catalog/objectaccess.h" #include "catalog/pg_type.h" +#include "commands/session_variable.h" #include "executor/execExpr.h" #include "executor/nodeSubplan.h" #include "funcapi.h" @@ -994,6 +995,80 @@ ExecInitExprRec(Expr *node, ExprState *state, scratch.d.param.paramtype = param->paramtype; ExprEvalPushStep(state, &scratch); break; + + case PARAM_VARIABLE: + { + int es_num_session_variables = 0; + SessionVariableValue *es_session_variables = NULL; + + if (state->parent && state->parent->state) + { + es_session_variables = state->parent->state->es_session_variables; + es_num_session_variables = state->parent->state->es_num_session_variables; + } + + if (es_session_variables) + { + SessionVariableValue *var; + + /* + * Use buffered session variables when the + * buffer with copied values is avaiable + * (standard query executor mode) + */ + + /* Parameter sanity checks. */ + if (param->paramid >= es_num_session_variables) + elog(ERROR, "paramid of PARAM_VARIABLE param is out of range"); + + var = &es_session_variables[param->paramid]; + + if (var->typid != param->paramtype) + elog(ERROR, "type of buffered value is different than PARAM_VARIABLE type"); + + /* + * In this case, pass the value like + * a constant. + */ + scratch.opcode = EEOP_CONST; + scratch.d.constval.value = var->value; + scratch.d.constval.isnull = var->isnull; + ExprEvalPushStep(state, &scratch); + } + else + { + AclResult aclresult; + Oid varid = param->paramvarid; + Oid vartype = param->paramtype; + + /* + * When the expression is evaluated directly + * without query executor start (plpgsql simple + * expr evaluation), then the array es_session_variables + * is null. In this case we need to use direct + * access to session variables. The values are + * not protected by using copy, but it is not + * problem (we don't need to emulate stability + * of the value). + * + * In this case we should to do aclcheck, because + * usual aclcheck from standard_ExecutorStart + * is not executed in this case. Fortunately + * it is just once per transaction. + */ + aclresult = pg_variable_aclcheck(varid, GetUserId(), ACL_SELECT); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_VARIABLE, + get_session_variable_name(varid)); + + scratch.opcode = EEOP_PARAM_VARIABLE; + scratch.d.vparam.varid = varid; + scratch.d.vparam.vartype = vartype; + ExprEvalPushStep(state, &scratch); + } + } + break; + case PARAM_EXTERN: /* diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c index 9b9bbf00a9..8f01ebfe65 100644 --- a/src/backend/executor/execExprInterp.c +++ b/src/backend/executor/execExprInterp.c @@ -59,6 +59,7 @@ #include "access/heaptoast.h" #include "catalog/pg_type.h" #include "commands/sequence.h" +#include "commands/session_variable.h" #include "executor/execExpr.h" #include "executor/nodeSubplan.h" #include "funcapi.h" @@ -446,6 +447,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) &&CASE_EEOP_PARAM_EXEC, &&CASE_EEOP_PARAM_EXTERN, &&CASE_EEOP_PARAM_CALLBACK, + &&CASE_EEOP_PARAM_VARIABLE, &&CASE_EEOP_CASE_TESTVAL, &&CASE_EEOP_MAKE_READONLY, &&CASE_EEOP_IOCOERCE, @@ -1082,6 +1084,15 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) EEO_NEXT(); } + EEO_CASE(EEOP_PARAM_VARIABLE) + { + /* direct access to session variable (without buffering) */ + *op->resvalue = GetSessionVariableWithTypeCheck(op->d.vparam.varid, + op->resnull, + op->d.vparam.vartype); + EEO_NEXT(); + } + EEO_CASE(EEOP_CASE_TESTVAL) { /* diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index d78862e660..4b79566444 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -48,6 +48,7 @@ #include "catalog/pg_publication.h" #include "commands/matview.h" #include "commands/trigger.h" +#include "commands/session_variable.h" #include "executor/execdebug.h" #include "executor/nodeSubplan.h" #include "foreign/fdwapi.h" @@ -200,6 +201,61 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) Assert(queryDesc->sourceText != NULL); estate->es_sourceText = queryDesc->sourceText; + /* + * The executor doesn't work with session variables directly. Values of + * related session variables are copied to dedicated array, and this array + * is passed to executor. + */ + if (queryDesc->num_session_variables > 0) + { + /* + * When paralel access to query parameters (including related session + * variables) is required, then related session variables are restored + * (deserilized) in queryDesc already. So just push pointer of this + * array to executor's estate. + */ + estate->es_session_variables = queryDesc->session_variables; + estate->es_num_session_variables = queryDesc->num_session_variables; + } + else if (queryDesc->plannedstmt->sessionVariables) + { + ListCell *lc; + int nSessionVariables; + int i = 0; + + /* + * In this case, the query uses session variables, but we have to + * prepare the array with passed values (of used session variables) + * first. + */ + nSessionVariables = list_length(queryDesc->plannedstmt->sessionVariables); + + /* Create the array used for passing values of used session variables */ + estate->es_session_variables = (SessionVariableValue *) + palloc(nSessionVariables * sizeof(SessionVariableValue)); + + /* Fill the array */ + foreach(lc, queryDesc->plannedstmt->sessionVariables) + { + AclResult aclresult; + Oid varid = lfirst_oid(lc); + + aclresult = pg_variable_aclcheck(varid, GetUserId(), ACL_SELECT); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_VARIABLE, + get_session_variable_name(varid)); + + estate->es_session_variables[i].varid = varid; + estate->es_session_variables[i].value = CopySessionVariable(varid, + &estate->es_session_variables[i].isnull, + &estate->es_session_variables[i].typid); + + i++; + } + + estate->es_num_session_variables = nSessionVariables; + } + /* * Fill in the query environment, if any, from queryDesc. */ diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 99512826c5..a07400e5ea 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -12,8 +12,9 @@ * workers and ensuring that their state generally matches that of the * leader; see src/backend/access/transam/README.parallel for details. * However, we must save and restore relevant executor state, such as - * any ParamListInfo associated with the query, buffer/WAL usage info, and - * the actual plan to be passed down to the worker. + * any ParamListInfo associated with the query, buffer/WAL usage info, + * session variables buffer, and the actual plan to be passed down to + * the worker. * * IDENTIFICATION * src/backend/executor/execParallel.c @@ -66,6 +67,7 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_SESSION_VARIABLES UINT64CONST(0xE00000000000000B) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -140,6 +142,12 @@ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, /* Helper function that runs in the parallel worker. */ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); +/* Helper functions that can pass values of session variables */ +static Size EstimateSessionVariables(EState *estate); +static void SerializeSessionVariables(EState *estate, char **start_address); +static SessionVariableValue *RestoreSessionVariables(char **start_address, + int *num_session_variables); + /* * Create a serialized representation of the plan to be sent to each worker. */ @@ -597,6 +605,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *pstmt_data; char *pstmt_space; char *paramlistinfo_space; + char *session_variables_space; BufferUsage *bufusage_space; WalUsage *walusage_space; SharedExecutorInstrumentation *instrumentation = NULL; @@ -606,6 +615,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int instrumentation_len = 0; int jit_instrumentation_len = 0; int instrument_offset = 0; + int session_variables_len = 0; Size dsa_minsize = dsa_minimum_size(); char *query_string; int query_len; @@ -661,6 +671,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for serialized session variables. */ + session_variables_len = EstimateSessionVariables(estate); + shm_toc_estimate_chunk(&pcxt->estimator, session_variables_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* * Estimate space for BufferUsage. * @@ -755,6 +770,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space); + /* Store serialized session variables. */ + session_variables_space = shm_toc_allocate(pcxt->toc, session_variables_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_VARIABLES, session_variables_space); + SerializeSessionVariables(estate, &session_variables_space); + /* Allocate space for each worker's BufferUsage; no need to initialize. */ bufusage_space = shm_toc_allocate(pcxt->toc, mul_size(sizeof(BufferUsage), pcxt->nworkers)); @@ -1402,6 +1422,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) SharedJitInstrumentation *jit_instrumentation; int instrument_options = 0; void *area_space; + char *sessionvariable_space; dsa_area *area; ParallelWorkerContext pwcxt; @@ -1427,6 +1448,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false); area = dsa_attach_in_place(area_space, seg); + /* Reconstruct session variables. */ + sessionvariable_space = shm_toc_lookup(toc, + PARALLEL_KEY_SESSION_VARIABLES, + false); + queryDesc->session_variables = + RestoreSessionVariables(&sessionvariable_space, + &queryDesc->num_session_variables); + /* Start up the executor */ queryDesc->plannedstmt->jitFlags = fpes->jit_flags; ExecutorStart(queryDesc, fpes->eflags); @@ -1495,3 +1524,118 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) FreeQueryDesc(queryDesc); receiver->rDestroy(receiver); } + +/* + * Estimate the amount of space required to serialize a + * session variable. + */ +static Size +EstimateSessionVariables(EState *estate) +{ + int i; + Size sz = sizeof(int); + + if (estate->es_session_variables == NULL) + return sz; + + for (i = 0; i < estate->es_num_session_variables; i++) + { + SessionVariableValue *svarval; + Oid typeOid; + int16 typLen; + bool typByVal; + + svarval = &estate->es_session_variables[i]; + + typeOid = svarval->typid; + + sz = add_size(sz, sizeof(Oid)); /* space for type OID */ + + /* space for datum/isnull */ + Assert(OidIsValid(typeOid)); + get_typlenbyval(typeOid, &typLen, &typByVal); + + sz = add_size(sz, + datumEstimateSpace(svarval->value, svarval->isnull, typByVal, typLen)); + } + + return sz; +} + +/* + * Serialize a session variables buffer into caller-provided storage. + * + * We write the number of parameters first, as a 4-byte integer, and then + * write details for each parameter in turn. The details for each parameter + * consist of a 4-byte type OID, and then the datum as serialized by + * datumSerialize(). The caller is responsible for ensuring that there is + * enough storage to store the number of bytes that will be written; use + * EstimateSessionVariables to find out how many will be needed. + * *start_address is updated to point to the byte immediately following those + * written. + * + * RestoreSessionVariables can be used to recreate a session variable buffer + * based on the serialized representation; + */ +static void +SerializeSessionVariables(EState *estate, char **start_address) +{ + int nparams; + int i; + + /* Write number of parameters. */ + nparams = estate->es_num_session_variables; + memcpy(*start_address, &nparams, sizeof(int)); + *start_address += sizeof(int); + + /* Write each parameter in turn. */ + for (i = 0; i < nparams; i++) + { + SessionVariableValue *svarval; + Oid typeOid; + int16 typLen; + bool typByVal; + + svarval = &estate->es_session_variables[i]; + typeOid = svarval->typid; + + /* Write type OID. */ + memcpy(*start_address, &typeOid, sizeof(Oid)); + *start_address += sizeof(Oid); + + Assert(OidIsValid(typeOid)); + get_typlenbyval(typeOid, &typLen, &typByVal); + + datumSerialize(svarval->value, svarval->isnull, typByVal, typLen, + start_address); + } +} + +static SessionVariableValue * +RestoreSessionVariables(char **start_address, int *num_session_variables) +{ + SessionVariableValue *session_variables; + int i; + int nparams; + + memcpy(&nparams, *start_address, sizeof(int)); + *start_address += sizeof(int); + + *num_session_variables = nparams; + session_variables = (SessionVariableValue *) + palloc(nparams * sizeof(SessionVariableValue)); + + for (i = 0; i < nparams; i++) + { + SessionVariableValue *svarval = &session_variables[i]; + + /* Read type OID. */ + memcpy(&svarval->typid, *start_address, sizeof(Oid)); + *start_address += sizeof(Oid); + + /* Read datum/isnull. */ + svarval->value = datumRestore(start_address, &svarval->isnull); + } + + return session_variables; +} diff --git a/src/backend/jit/llvm/llvmjit_expr.c b/src/backend/jit/llvm/llvmjit_expr.c index 95d0807bdd..c5f824c0bd 100644 --- a/src/backend/jit/llvm/llvmjit_expr.c +++ b/src/backend/jit/llvm/llvmjit_expr.c @@ -1073,6 +1073,12 @@ llvm_compile_expr(ExprState *state) LLVMBuildBr(b, opblocks[opno + 1]); break; + case EEOP_PARAM_VARIABLE: + build_EvalXFunc(b, mod, "ExecEvalParamVariable", + v_state, op, v_econtext); + LLVMBuildBr(b, opblocks[opno + 1]); + break; + case EEOP_PARAM_CALLBACK: { LLVMTypeRef v_functype; diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 8014d1fd25..4da28f76fc 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -317,6 +317,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, glob->lastPlanNodeId = 0; glob->transientPlan = false; glob->dependsOnRole = false; + glob->sessionVariables = NIL; /* * Assess whether it's feasible to use parallel mode for this query. We @@ -530,6 +531,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, result->paramExecTypes = glob->paramExecTypes; /* utilityStmt should be null, but we might as well copy it */ result->utilityStmt = parse->utilityStmt; + result->sessionVariables = glob->sessionVariables; result->stmt_location = parse->stmt_location; result->stmt_len = parse->stmt_len; @@ -685,6 +687,12 @@ subquery_planner(PlannerGlobal *glob, Query *parse, */ pull_up_subqueries(root); + /* + * Check if some subquery uses session variable. Flag hasSessionVariables + * should be true if query or some subquery uses any session variable. + */ + pull_up_has_session_variables(root); + /* * If this is a simple UNION ALL query, flatten it into an appendrel. We * do this now because it requires applying pull_up_subqueries to the leaf diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 1cb0abdbc1..12c0364f32 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -181,6 +181,8 @@ static List *set_returning_clause_references(PlannerInfo *root, static List *set_windowagg_runcondition_references(PlannerInfo *root, List *runcondition, Plan *plan); +static bool pull_up_has_session_variables_walker(Node *node, + PlannerInfo *root); /***************************************************************************** @@ -1218,6 +1220,50 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) return plan; } +/* + * Search usage of session variables in subqueries + */ +void +pull_up_has_session_variables(PlannerInfo *root) +{ + Query *query = root->parse; + + if (query->hasSessionVariables) + { + root->hasSessionVariables = true; + } + else + { + (void) query_tree_walker(query, + pull_up_has_session_variables_walker, + (void *) root, 0); + } +} + +static bool +pull_up_has_session_variables_walker(Node *node, PlannerInfo *root) +{ + if (node == NULL) + return false; + if (IsA(node, Query)) + { + Query *query = (Query *) node; + + if (query->hasSessionVariables) + { + root->hasSessionVariables = true; + return false; + } + + /* Recurse into subselects */ + return query_tree_walker((Query *) node, + pull_up_has_session_variables_walker, + (void *) root, 0); + } + return expression_tree_walker(node, pull_up_has_session_variables_walker, + (void *) root); +} + /* * set_indexonlyscan_references * Do set_plan_references processing on an IndexOnlyScan @@ -1831,8 +1877,9 @@ copyVar(Var *var) * This is code that is common to all variants of expression-fixing. * We must look up operator opcode info for OpExpr and related nodes, * add OIDs from regclass Const nodes into root->glob->relationOids, and - * add PlanInvalItems for user-defined functions into root->glob->invalItems. - * We also fill in column index lists for GROUPING() expressions. + * add PlanInvalItems for user-defined functions and session variables into + * root->glob->invalItems. We also fill in column index lists for GROUPING() + * expressions. * * We assume it's okay to update opcode info in-place. So this could possibly * scribble on the planner's input data structures, but it's OK. @@ -1922,15 +1969,39 @@ fix_expr_common(PlannerInfo *root, Node *node) g->cols = cols; } } + else if (IsA(node, Param)) + { + Param *p = (Param *) node; + + if (p->paramkind == PARAM_VARIABLE) + { + PlanInvalItem *inval_item = makeNode(PlanInvalItem); + + /* paramid is still session variable id */ + inval_item->cacheId = VARIABLEOID; + inval_item->hashValue = GetSysCacheHashValue1(VARIABLEOID, + ObjectIdGetDatum(p->paramvarid)); + + /* Append this variable to global, register dependency */ + root->glob->invalItems = lappend(root->glob->invalItems, + inval_item); + } + } } /* * fix_param_node * Do set_plan_references processing on a Param + * Collect session variables list and replace variable oid by + * index to collected list. * * If it's a PARAM_MULTIEXPR, replace it with the appropriate Param from * root->multiexpr_params; otherwise no change is needed. * Just for paranoia's sake, we make a copy of the node in either case. + * + * If it's a PARAM_VARIABLE, then we collect used session variables in + * list root->glob->sessionVariable. We should to assign Param paramvarid + * too, and it is position of related session variable in mentioned list. */ static Node * fix_param_node(PlannerInfo *root, Param *p) @@ -1949,6 +2020,41 @@ fix_param_node(PlannerInfo *root, Param *p) elog(ERROR, "unexpected PARAM_MULTIEXPR ID: %d", p->paramid); return copyObject(list_nth(params, colno - 1)); } + + if (p->paramkind == PARAM_VARIABLE) + { + ListCell *lc; + int n = 0; + bool found = false; + + /* We will modify object */ + p = (Param *) copyObject(p); + + /* + * Now, we can actualize list of session variables, and we can + * complete paramid parameter. + */ + foreach(lc, root->glob->sessionVariables) + { + if (lfirst_oid(lc) == p->paramvarid) + { + p->paramid = n; + found = true; + break; + } + n += 1; + } + + if (!found) + { + root->glob->sessionVariables = lappend_oid(root->glob->sessionVariables, + p->paramvarid); + p->paramid = n; + } + + return (Node *) p; + } + return (Node *) copyObject(p); } @@ -2010,7 +2116,10 @@ fix_alternative_subplan(PlannerInfo *root, AlternativeSubPlan *asplan, * replacing Aggref nodes that should be replaced by initplan output Params, * choosing the best implementation for AlternativeSubPlans, * looking up operator opcode info for OpExpr and related nodes, - * and adding OIDs from regclass Const nodes into root->glob->relationOids. + * adding OIDs from regclass Const nodes into root->glob->relationOids, + * and assigning paramvarid to PARAM_VARIABLE params, and collecting + * of OIDs of session variables in root->glob->sessionVariables list + * (paramvarid is an position of related session variable in this list). * * 'node': the expression to be modified * 'rtoffset': how much to increment varnos by @@ -2032,7 +2141,8 @@ fix_scan_expr(PlannerInfo *root, Node *node, int rtoffset, double num_exec) root->multiexpr_params != NIL || root->glob->lastPHId != 0 || root->minmax_aggs != NIL || - root->hasAlternativeSubPlans) + root->hasAlternativeSubPlans || + root->hasSessionVariables) { return fix_scan_expr_mutator(node, &context); } diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index bf3a7cae60..c7e2711b37 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -26,6 +26,7 @@ #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" +#include "commands/session_variable.h" #include "executor/executor.h" #include "executor/functions.h" #include "funcapi.h" @@ -810,16 +811,17 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) /* * We can't pass Params to workers at the moment either, so they are also - * parallel-restricted, unless they are PARAM_EXTERN Params or are - * PARAM_EXEC Params listed in safe_param_ids, meaning they could be - * either generated within workers or can be computed by the leader and - * then their value can be passed to workers. + * parallel-restricted, unless they are PARAM_EXTERN or PARAM_VARIABLE + * Params or are PARAM_EXEC Params listed in safe_param_ids, meaning they + * could be either generated within workers or can be computed by the + * leader and then their value can be passed to workers. */ else if (IsA(node, Param)) { Param *param = (Param *) node; - if (param->paramkind == PARAM_EXTERN) + if (param->paramkind == PARAM_EXTERN || + param->paramkind == PARAM_VARIABLE) return false; if (param->paramkind != PARAM_EXEC || @@ -2230,6 +2232,7 @@ convert_saop_to_hashed_saop_walker(Node *node, void *context) * value of the Param. * 2. Fold stable, as well as immutable, functions to constants. * 3. Reduce PlaceHolderVar nodes to their contained expressions. + * 4. Current value of session variable can be used for estimation too. *-------------------- */ Node * @@ -2352,6 +2355,29 @@ eval_const_expressions_mutator(Node *node, } } } + else if (param->paramkind == PARAM_VARIABLE && + context->estimate) + { + int16 typLen; + bool typByVal; + Datum pval; + bool isnull; + + get_typlenbyval(param->paramtype, + &typLen, &typByVal); + + pval = CopySessionVariableWithTypeCheck(param->paramvarid, + &isnull, + param->paramtype); + + return (Node *) makeConst(param->paramtype, + param->paramtypmod, + param->paramcollid, + (int) typLen, + pval, + isnull, + typByVal); + } /* * Not replaceable, so just copy the Param (no need to @@ -4736,22 +4762,45 @@ substitute_actual_parameters_mutator(Node *node, { if (node == NULL) return NULL; + + /* + * SQL functions can contain two different kind of params. The nodes with + * paramkind PARAM_EXTERN are related to function's arguments (and should + * be replaced in this step), because this is how we apply the function's + * arguments for an expression. + * + * The nodes with paramkind PARAM_VARIABLE are related to usage of session + * variables. The values of session variables are not passed to expression + * by expression arguments, so it should not be replaced here by + * function's arguments. Although we could substitute params related to + * immutable session variables with default expression by this default + * expression, it is safer to not do it. This way we don't have to run + * security checks here. There can be some performance loss, but an access + * to session variable is fast (and the result of default expression is + * immediately materialized and can be reused). + */ if (IsA(node, Param)) { Param *param = (Param *) node; - if (param->paramkind != PARAM_EXTERN) + if (param->paramkind != PARAM_EXTERN && + param->paramkind != PARAM_VARIABLE) elog(ERROR, "unexpected paramkind: %d", (int) param->paramkind); - if (param->paramid <= 0 || param->paramid > context->nargs) - elog(ERROR, "invalid paramid: %d", param->paramid); - /* Count usage of parameter */ - context->usecounts[param->paramid - 1]++; + if (param->paramkind == PARAM_EXTERN) + { + if (param->paramid <= 0 || param->paramid > context->nargs) + elog(ERROR, "invalid paramid: %d", param->paramid); - /* Select the appropriate actual arg and replace the Param with it */ - /* We don't need to copy at this time (it'll get done later) */ - return list_nth(context->args, param->paramid - 1); + /* Count usage of parameter */ + context->usecounts[param->paramid - 1]++; + + /* Select the appropriate actual arg and replace the Param with it */ + /* We don't need to copy at this time (it'll get done later) */ + return list_nth(context->args, param->paramid - 1); + } } + return expression_tree_mutator(node, substitute_actual_parameters_mutator, (void *) context); } diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 6688c2a865..702d875d0d 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -525,6 +525,8 @@ transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt) qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasAggs = pstate->p_hasAggs; + qry->hasSessionVariables = pstate->p_hasSessionVariables; + assign_query_collations(pstate, qry); /* this must be done after collations, for reliable comparison of exprs */ @@ -942,6 +944,7 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasSubLinks = pstate->p_hasSubLinks; + qry->hasSessionVariables = pstate->p_hasSessionVariables; assign_query_collations(pstate, qry); @@ -1404,6 +1407,8 @@ transformSelectStmt(ParseState *pstate, SelectStmt *stmt) (LockingClause *) lfirst(l), false); } + qry->hasSessionVariables = pstate->p_hasSessionVariables; + assign_query_collations(pstate, qry); /* this must be done after collations, for reliable comparison of exprs */ @@ -1622,6 +1627,7 @@ transformValuesClause(ParseState *pstate, SelectStmt *stmt) qry->jointree = makeFromExpr(pstate->p_joinlist, NULL); qry->hasSubLinks = pstate->p_hasSubLinks; + qry->hasSessionVariables = pstate->p_hasSessionVariables; assign_query_collations(pstate, qry); @@ -1878,6 +1884,8 @@ transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt) (LockingClause *) lfirst(l), false); } + qry->hasSessionVariables = pstate->p_hasSessionVariables; + assign_query_collations(pstate, qry); /* this must be done after collations, for reliable comparison of exprs */ @@ -2409,6 +2417,7 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasSubLinks = pstate->p_hasSubLinks; + qry->hasSessionVariables = pstate->p_hasSessionVariables; assign_query_collations(pstate, qry); diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c index 3527b78695..58d27ca965 100644 --- a/src/backend/parser/parse_expr.c +++ b/src/backend/parser/parse_expr.c @@ -37,11 +37,12 @@ #include "utils/date.h" #include "utils/lsyscache.h" #include "utils/timestamp.h" +#include "utils/typcache.h" #include "utils/xml.h" /* GUC parameters */ bool Transform_null_equals = false; - +bool session_variables_ambiguity_warning = false; static Node *transformExprRecurse(ParseState *pstate, Node *expr); static Node *transformParamRef(ParseState *pstate, ParamRef *pref); @@ -83,6 +84,9 @@ static Expr *make_distinct_op(ParseState *pstate, List *opname, Node *ltree, Node *rtree, int location); static Node *make_nulltest_from_distinct(ParseState *pstate, A_Expr *distincta, Node *arg); +static Node *makeParamSessionVariable(ParseState *pstate, + Oid varid, Oid typid, int32 typmod, Oid collid, + char *attrname, int location); /* @@ -431,6 +435,32 @@ transformIndirection(ParseState *pstate, A_Indirection *ind) return result; } +/* + * Returns true, when expression of kind allows using of + * session variables. + */ +static bool +expr_kind_allows_session_variables(ParseExprKind p_expr_kind) +{ + switch (p_expr_kind) + { + case EXPR_KIND_JOIN_USING: + case EXPR_KIND_CHECK_CONSTRAINT: + case EXPR_KIND_DOMAIN_CHECK: + case EXPR_KIND_INDEX_EXPRESSION: + case EXPR_KIND_INDEX_PREDICATE: + case EXPR_KIND_COLUMN_DEFAULT: + case EXPR_KIND_TRIGGER_WHEN: + case EXPR_KIND_PARTITION_BOUND: + case EXPR_KIND_PARTITION_EXPRESSION: + case EXPR_KIND_GENERATED_COLUMN: + return false; + + default: + return true; + } +} + /* * Transform a ColumnRef. * @@ -779,6 +809,148 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) parser_errposition(pstate, cref->location))); } + /* + * There are contexts where session's variables are not allowed. The + * question is if we want to identify session's variables in these + * contexts? The code can be more simple, when we don't do it, but then we + * cannot to raise maybe useful message like "you cannot to use session + * variables here". On second hand, in this case the warnings about + * session's variable shadowing can be messy. + */ + if (expr_kind_allows_session_variables(pstate->p_expr_kind)) + { + Oid varid = InvalidOid; + Oid typid; + Oid collid; + int32 typmod; + char *attrname = NULL; + bool not_unique; + + /* + * Session variables are shadowed by columns, routine's variables or + * routine's arguments ever. We don't want to use session variable + * when it is not exactly shadowed, but RTE is valid like: + * + * CREATE TYPE T AS (c int); CREATE VARIABLE foo AS T; CREATE TABLE + * foo(a int, b int); + * + * SELECT foo.a, foo.b, foo.c FROM foo; + * + * This case can be messy and then we disallow it. When we know, so + * possible variable will be shadowed, we try to identify variable + * only when session_variables_ambiguity_warning is requested. + */ + if (node || + (!node && relname && crerr == CRERR_NO_COLUMN)) + { + /* + * In this path we just try (if it is wanted) detect if session + * variable is shadowed. + */ + if (session_variables_ambiguity_warning) + { + varid = IdentifyVariable(cref->fields, &attrname, ¬_unique); + + /* + * Some cases with ambiguous references can be solved without + * raising a warning. When there is a collision between column + * name (or label) and some session variable name, and when we + * know attribute name, then we can ignore the collision when: + * + * a) variable is of scalar type (then indirection cannot be + * applied on this session variable. + * + * b) when related variable has no field with the given + * attrname, then indirection cannot be applied on this + * session variable. + */ + if (OidIsValid(varid) && attrname && node) + { + get_session_variable_type_typmod_collid(varid, + &typid, &typmod, + &collid); + + if (type_is_rowtype(typid)) + { + TupleDesc tupdesc; + bool found = false; + int i; + + /* slow part, I hope it will not be to often */ + tupdesc = lookup_rowtype_tupdesc(typid, typmod); + for (i = 0; i < tupdesc->natts; i++) + { + if (namestrcmp(&(TupleDescAttr(tupdesc, i)->attname), attrname) == 0 && + !TupleDescAttr(tupdesc, i)->attisdropped) + { + found = true; + break; + } + } + + ReleaseTupleDesc(tupdesc); + + /* There is no composite variable with this field. */ + if (!found) + varid = InvalidOid; + } + else + /* There is no composite variable with this name. */ + varid = InvalidOid; + } + + /* + * Raise warning when session variable reference is still + * visible. + */ + if (OidIsValid(varid)) + { + if (node) + ereport(WARNING, + (errcode(ERRCODE_AMBIGUOUS_COLUMN), + errmsg("session variable \"%s\" is shadowed", + NameListToString(cref->fields)), + errdetail("Session variables can be shadowed by columns, routine's variables and routine's arguments with the same name."), + parser_errposition(pstate, cref->location))); + else + /* session variable is shadowed by RTE */ + ereport(WARNING, + (errcode(ERRCODE_AMBIGUOUS_COLUMN), + errmsg("session variable \"%s.%s\" is shadowed", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid)), + errdetail("Session variables can be shadowed by tables or table's aliases with the same name."), + parser_errposition(pstate, cref->location))); + } + } + } + else + { + varid = IdentifyVariable(cref->fields, &attrname, ¬_unique); + + if (OidIsValid(varid)) + { + Oid typid; + int32 typmod; + Oid collid; + + if (not_unique) + ereport(ERROR, + (errcode(ERRCODE_AMBIGUOUS_PARAMETER), + errmsg("session variable reference \"%s\" is ambiguous", + NameListToString(cref->fields)), + parser_errposition(pstate, cref->location))); + + get_session_variable_type_typmod_collid(varid, &typid, &typmod, + &collid); + + node = makeParamSessionVariable(pstate, + varid, typid, typmod, collid, + attrname, cref->location); + } + } + } + /* * Throw error if no translation found. */ @@ -813,6 +985,64 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) return node; } +/* + * Generate param variable for reference to session variable + */ +static Node * +makeParamSessionVariable(ParseState *pstate, + Oid varid, Oid typid, int32 typmod, Oid collid, + char *attrname, int location) +{ + Param *param; + + param = makeNode(Param); + + param->paramkind = PARAM_VARIABLE; + param->paramvarid = varid; + param->paramtype = typid; + param->paramtypmod = typmod; + param->paramcollid = collid; + + pstate->p_hasSessionVariables = true; + + if (attrname != NULL) + { + TupleDesc tupdesc; + int i; + + tupdesc = lookup_rowtype_tupdesc(typid, typmod); + + for (i = 0; i < tupdesc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, i); + + if (strcmp(attrname, NameStr(att->attname)) == 0 && + !att->attisdropped) + { + /* Success, so generate a FieldSelect expression */ + FieldSelect *fselect = makeNode(FieldSelect); + + fselect->arg = (Expr *) param; + fselect->fieldnum = i + 1; + fselect->resulttype = att->atttypid; + fselect->resulttypmod = att->atttypmod; + /* save attribute's collation for parse_collate.c */ + fselect->resultcollid = att->attcollation; + + ReleaseTupleDesc(tupdesc); + return (Node *) fselect; + } + } + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("could not identify column \"%s\" in variable", attrname), + parser_errposition(pstate, location))); + } + + return (Node *) param; +} + static Node * transformParamRef(ParseState *pstate, ParamRef *pref) { diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 5aa5a350f3..c5cf50687c 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -86,6 +86,9 @@ CreateQueryDesc(PlannedStmt *plannedstmt, qd->queryEnv = queryEnv; qd->instrument_options = instrument_options; /* instrumentation wanted? */ + qd->num_session_variables = 0; + qd->session_variables = NULL; + /* null these fields until set by ExecutorStart */ qd->tupDesc = NULL; qd->estate = NULL; diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 2b7b1b0c0f..e3beae0bea 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -38,6 +38,7 @@ #include "catalog/pg_statistic_ext.h" #include "catalog/pg_trigger.h" #include "catalog/pg_type.h" +#include "catalog/pg_variable.h" #include "commands/defrem.h" #include "commands/tablespace.h" #include "common/keywords.h" @@ -496,6 +497,7 @@ static char *generate_function_name(Oid funcid, int nargs, static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2); static void add_cast_to(StringInfo buf, Oid typid); static char *generate_qualified_type_name(Oid typid); +static char *generate_session_variable_name(Oid varid); static text *string_to_text(char *str); static char *flatten_reloptions(Oid relid); static void get_reloptions(StringInfo buf, Datum reloptions); @@ -8073,6 +8075,14 @@ get_parameter(Param *param, deparse_context *context) return; } + /* translate paramvarid to session variable name */ + if (param->paramkind == PARAM_VARIABLE) + { + appendStringInfo(context->buf, "%s", + generate_session_variable_name(param->paramvarid)); + return; + } + /* * If it's an external parameter, see if the outermost namespace provides * function argument names. @@ -12087,6 +12097,42 @@ generate_collation_name(Oid collid) return result; } +/* + * generate_session_variable_name + * Compute the name to display for a session variable specified by OID + * + * The result includes all necessary quoting and schema-prefixing. + */ +static char * +generate_session_variable_name(Oid varid) +{ + HeapTuple tup; + Form_pg_variable varform; + char *varname; + char *nspname; + char *result; + + tup = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(varid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for variable %u", varid); + + varform = (Form_pg_variable) GETSTRUCT(tup); + + varname = NameStr(varform->varname); + + if (!VariableIsVisible(varid)) + nspname = get_namespace_name_or_temp(varform->varnamespace); + else + nspname = NULL; + + result = quote_qualified_identifier(nspname, varname); + + ReleaseSysCache(tup); + + return result; +} + /* * Given a C string, produce a TEXT datum. * diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c index 0d6a295674..fc71d27adb 100644 --- a/src/backend/utils/cache/plancache.c +++ b/src/backend/utils/cache/plancache.c @@ -58,6 +58,7 @@ #include "access/transam.h" #include "catalog/namespace.h" +#include "catalog/pg_variable.h" #include "executor/executor.h" #include "miscadmin.h" #include "nodes/nodeFuncs.h" @@ -1867,7 +1868,8 @@ ScanQueryForLocks(Query *parsetree, bool acquire) * Recurse into sublink subqueries, too. But we already did the ones in * the rtable and cteList. */ - if (parsetree->hasSubLinks) + if (parsetree->hasSubLinks || + parsetree->hasSessionVariables) { query_tree_walker(parsetree, ScanQueryWalker, (void *) &acquire, @@ -1891,6 +1893,20 @@ ScanQueryWalker(Node *node, bool *acquire) ScanQueryForLocks(castNode(Query, sub->subselect), *acquire); /* Fall through to process lefthand args of SubLink */ } + else if (IsA(node, Param)) + { + Param *p = (Param *) node; + + if (p->paramkind == PARAM_VARIABLE) + { + if (acquire) + LockDatabaseObject(VariableRelationId, p->paramvarid, + 0, AccessShareLock); + else + UnlockDatabaseObject(VariableRelationId, p->paramvarid, + 0, AccessShareLock); + } + } /* * Do NOT recurse into Query nodes, because ScanQueryForLocks already @@ -2022,7 +2038,9 @@ PlanCacheRelCallback(Datum arg, Oid relid) /* * PlanCacheObjectCallback - * Syscache inval callback function for PROCOID and TYPEOID caches + * Syscache inval callback function for TYPEOID, PROCOID, NAMESPACEOID, + * OPEROID, AMOPOPID, FOREIGNSERVEROID, FOREIGNDATAWRAPPEROID and VARIABLEOID + * caches. * * Invalidate all plans mentioning the object with the specified hash value, * or all plans mentioning any member of this cache if hashvalue == 0. diff --git a/src/backend/utils/fmgr/fmgr.c b/src/backend/utils/fmgr/fmgr.c index af74fe345e..8d61fc082c 100644 --- a/src/backend/utils/fmgr/fmgr.c +++ b/src/backend/utils/fmgr/fmgr.c @@ -1902,9 +1902,13 @@ get_call_expr_arg_stable(Node *expr, int argnum) */ if (IsA(arg, Const)) return true; - if (IsA(arg, Param) && - ((Param *) arg)->paramkind == PARAM_EXTERN) - return true; + if (IsA(arg, Param)) + { + Param *p = (Param *) arg; + + if (p->paramkind == PARAM_EXTERN || p->paramkind == PARAM_VARIABLE) + return true; + } return false; } diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index ab3140ff3a..420ffade7e 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -1482,6 +1482,16 @@ struct config_bool ConfigureNamesBool[] = false, NULL, NULL, NULL }, + { + {"session_variables_ambiguity_warning", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("Raise warning when reference to session variable is ambiguous."), + NULL, + GUC_NOT_IN_SAMPLE + }, + &session_variables_ambiguity_warning, + false, + NULL, NULL, NULL + }, { {"db_user_namespace", PGC_SIGHUP, CONN_AUTH_AUTH, gettext_noop("Enables per-database user names."), diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index a757ade8bc..e3c1af3082 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11823,4 +11823,11 @@ prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary', prosrc => 'brin_minmax_multi_summary_send' }, +{ oid => '8488', descr => 'debug list of used session variables', + proname => 'pg_debug_show_used_session_variables', prorows => '1000', proretset => 't', + provolatile => 's', prorettype => 'record', proargtypes => '', + proallargtypes => '{oid,text,text,oid,text,bool,bool,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o}', + proargnames => '{varid,schema,name,typid,typname,removed,has_value,can_read,can_write}', + prosrc => 'pg_debug_show_used_session_variables' }, ] diff --git a/src/include/commands/session_variable.h b/src/include/commands/session_variable.h index 51934e8d1a..3912ce39ef 100644 --- a/src/include/commands/session_variable.h +++ b/src/include/commands/session_variable.h @@ -26,16 +26,18 @@ extern void ResetSessionVariables(void); extern void SessionVariableCreatePostprocess(Oid varid, char eoxaction); extern void SessionVariableDropPostprocess(Oid varid); +extern void SessionVariableCreatePostprocess(Oid varid, char eoxaction); extern Datum CopySessionVariable(Oid varid, bool *isNull, Oid *typid); extern Datum CopySessionVariableWithTypeCheck(Oid varid, bool *isNull, Oid expected_typid); extern Datum GetSessionVariableWithTypeCheck(Oid varid, bool *isNull, Oid expected_typid); -extern void SetSessionVariable(Oid varid, Datum value, bool isNull, Oid typid); -extern void SetSessionVariableWithSecurityCheck(Oid varid, Datum value, bool isNull, Oid typid); +extern void SetSessionVariable(Oid varid, Datum value, bool isNull); +extern void SetSessionVariableWithSecurityCheck(Oid varid, Datum value, bool isNull); extern void AtPreEOXact_SessionVariable_on_xact_actions(bool isCommit); -extern void AtEOSubXact_SessionVariable_on_xact_actions(bool isCommit, SubTransactionId mySubid, +extern void AtEOSubXact_SessionVariable_on_xact_actions(bool isCommit, + SubTransactionId mySubid, SubTransactionId parentSubid); #endif diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h index e14f15d435..c7137a9b12 100644 --- a/src/include/executor/execExpr.h +++ b/src/include/executor/execExpr.h @@ -158,6 +158,7 @@ typedef enum ExprEvalOp EEOP_PARAM_EXEC, EEOP_PARAM_EXTERN, EEOP_PARAM_CALLBACK, + EEOP_PARAM_VARIABLE, /* return CaseTestExpr value */ EEOP_CASE_TESTVAL, @@ -382,6 +383,13 @@ typedef struct ExprEvalStep Oid paramtype; /* OID of parameter's datatype */ } param; + /* for EEOP_PARAM_VARIABLE */ + struct + { + Oid varid; /* OID of assigned variable */ + Oid vartype; /* OID of parameter's datatype */ + } vparam; + /* for EEOP_PARAM_CALLBACK */ struct { @@ -741,6 +749,8 @@ extern void ExecEvalParamExec(ExprState *state, ExprEvalStep *op, ExprContext *econtext); extern void ExecEvalParamExtern(ExprState *state, ExprEvalStep *op, ExprContext *econtext); +extern void ExecEvalParamVariable(ExprState *state, ExprEvalStep *op, + ExprContext *econtext); extern void ExecEvalSQLValueFunction(ExprState *state, ExprEvalStep *op); extern void ExecEvalCurrentOfExpr(ExprState *state, ExprEvalStep *op); extern void ExecEvalNextValueExpr(ExprState *state, ExprEvalStep *op); diff --git a/src/include/executor/execdesc.h b/src/include/executor/execdesc.h index e79e2c001f..dbf4dc7ea0 100644 --- a/src/include/executor/execdesc.h +++ b/src/include/executor/execdesc.h @@ -48,6 +48,10 @@ typedef struct QueryDesc EState *estate; /* executor's query-wide state */ PlanState *planstate; /* tree of per-plan-node state */ + /* reference to session variables buffer */ + int num_session_variables; + SessionVariableValue *session_variables; + /* This field is set by ExecutorRun */ bool already_executed; /* true if previously executed */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 01b1727fc0..493b9dda65 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -590,6 +590,18 @@ typedef struct AsyncRequest * tuples) */ } AsyncRequest; +/* ---------------- + * SessionVariableValue + * ---------------- + */ +typedef struct SessionVariableValue +{ + Oid varid; + Oid typid; + bool isnull; + Datum value; +} SessionVariableValue; + /* ---------------- * EState information * @@ -641,6 +653,13 @@ typedef struct EState ParamListInfo es_param_list_info; /* values of external params */ ParamExecData *es_param_exec_vals; /* values of internal params */ + /* Variables info: */ + /* number of used session variables */ + int es_num_session_variables; + + /* array of copied values of session variables */ + SessionVariableValue *es_session_variables; + QueryEnvironment *es_queryEnv; /* query environment */ /* Other working state: */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 52bfd673c3..69639be467 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -145,6 +145,7 @@ typedef struct Query bool hasModifyingCTE; /* has INSERT/UPDATE/DELETE in WITH */ bool hasForUpdate; /* FOR [KEY] UPDATE/SHARE was specified */ bool hasRowSecurity; /* rewriter has applied some RLS policy */ + bool hasSessionVariables; /* uses session variables */ bool isReturn; /* is a RETURN statement */ diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 294cfe9c47..5a79a4d591 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -157,6 +157,8 @@ typedef struct PlannerGlobal /* partition descriptors */ PartitionDirectory partition_directory pg_node_attr(read_write_ignore); + + List *sessionVariables; /* list of used session variables */ } PlannerGlobal; /* macro for fetching the Plan associated with a SubPlan node */ @@ -459,6 +461,8 @@ struct PlannerInfo /* true if planning a recursive WITH item */ bool hasRecursion; + /* true if session variables were used */ + bool hasSessionVariables; /* * Information about aggregates. Filled by preprocess_aggrefs(). */ diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index dca2a21e7a..e03727e19f 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -91,6 +91,8 @@ typedef struct PlannedStmt Node *utilityStmt; /* non-null if this is utility stmt */ + List *sessionVariables; /* list of OIDs for PARAM_VARIABLE Params */ + /* statement location in source string (copied from Query) */ int stmt_location; /* start location, or -1 if unknown */ int stmt_len; /* length in bytes; 0 means "rest of string" */ diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index 40661334bb..75d8daf267 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -43,7 +43,10 @@ typedef struct Alias List *colnames; /* optional list of column aliases */ } Alias; -/* What to do at commit time for temporary relations */ +/* + * What to do at commit time for temporary relations or + * persistent/temporary variable. + */ typedef enum OnCommitAction { ONCOMMIT_NOOP, /* No ON COMMIT clause (do nothing) */ @@ -296,13 +299,17 @@ typedef struct Const * of the `paramid' field contain the SubLink's subLinkId, and * the low-order 16 bits contain the column number. (This type * of Param is also converted to PARAM_EXEC during planning.) + * + * PARAM_VARIABLE: The parameter is an access to session variable + * paramid holds varid. */ typedef enum ParamKind { PARAM_EXTERN, PARAM_EXEC, PARAM_SUBLINK, - PARAM_MULTIEXPR + PARAM_MULTIEXPR, + PARAM_VARIABLE } ParamKind; typedef struct Param @@ -313,6 +320,7 @@ typedef struct Param Oid paramtype; /* pg_type OID of parameter's datatype */ int32 paramtypmod; /* typmod value, if known */ Oid paramcollid; /* OID of collation, or InvalidOid if none */ + Oid paramvarid; /* OID of session variable if it is used */ int location; /* token location, or -1 if unknown */ } Param; diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index 9dffdcfd1e..ebb14e250f 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -117,4 +117,6 @@ extern void record_plan_function_dependency(PlannerInfo *root, Oid funcid); extern void record_plan_type_dependency(PlannerInfo *root, Oid typid); extern bool extract_query_dependencies_walker(Node *node, PlannerInfo *context); +extern void pull_up_has_session_variables(PlannerInfo *root); + #endif /* PLANMAIN_H */ diff --git a/src/include/parser/parse_expr.h b/src/include/parser/parse_expr.h index c8e5c57b43..14b0adb948 100644 --- a/src/include/parser/parse_expr.h +++ b/src/include/parser/parse_expr.h @@ -17,6 +17,7 @@ /* GUC parameters */ extern PGDLLIMPORT bool Transform_null_equals; +extern PGDLLIMPORT bool session_variables_ambiguity_warning; extern Node *transformExpr(ParseState *pstate, Node *expr, ParseExprKind exprKind); diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h index 4d65a9e9e8..d07f5b0584 100644 --- a/src/include/parser/parse_node.h +++ b/src/include/parser/parse_node.h @@ -212,6 +212,7 @@ struct ParseState bool p_hasTargetSRFs; bool p_hasSubLinks; bool p_hasModifyingCTE; + bool p_hasSessionVariables; Node *p_last_srf; /* most recent set-returning func/op found */ diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h index 8e2b733bb2..387fbcb52c 100644 --- a/src/include/utils/lsyscache.h +++ b/src/include/utils/lsyscache.h @@ -205,11 +205,11 @@ extern Oid get_subscription_oid(const char *subname, bool missing_ok); extern char *get_subscription_name(Oid subid, bool missing_ok); extern char *get_session_variable_name(Oid varid); -extern Oid get_session_variable_namespace(Oid varid); +extern Oid get_session_variable_namespace(Oid varid); extern void get_session_variable_type_typmod_collid(Oid varid, - Oid *typid, - int32 *typmod, - Oid *collid); + Oid *typid, + int32 *typmod, + Oid *collid); #define type_is_array(typid) (get_element_type(typid) != InvalidOid) /* type_is_array_domain accepts both plain arrays and domains over arrays */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 73dae3719b..f452d7e6d4 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2453,6 +2453,7 @@ SerializedTransactionState Session SessionBackupState SessionEndType +SessionVariableValue SetConstraintState SetConstraintStateData SetConstraintTriggerData @@ -2637,6 +2638,8 @@ SupportRequestRows SupportRequestSelectivity SupportRequestSimplify SupportRequestWFuncMonotonic +SVariable +SVariableData SVariableXActAction SVariableXActActionItem Syn -- 2.37.0