From bfb86896df3b6da65fb51828a91d81f30ab9f3df Mon Sep 17 00:00:00 2001 From: "okbob@github.com" Date: Mon, 11 Apr 2022 20:32:05 +0200 Subject: [PATCH 02/12] session variables Implementation storage and access routines. Session variables are stored in session memory inside dedicated hash table. Two levels of an access are implemented: API level and SQL level. Both levels are implemented in this commit. The most difficult part is cleaning (reset) of session variable. The content of session variable should be cleaned when related session variable is removed from system catalog. But queue of sinval messages can be truncated, or current transaction state can disallow an access to system catalog, so cleaning can be postponed. --- src/backend/access/transam/xact.c | 2 +- src/backend/catalog/dependency.c | 5 + src/backend/commands/sessionvariable.c | 1145 +++++++++++++++++++++++- src/backend/executor/execExpr.c | 75 ++ src/backend/executor/execExprInterp.c | 11 + src/backend/executor/execMain.c | 56 ++ src/backend/executor/execParallel.c | 148 ++- src/backend/jit/llvm/llvmjit_expr.c | 6 + src/backend/optimizer/plan/planner.c | 8 + src/backend/optimizer/plan/setrefs.c | 118 ++- src/backend/optimizer/util/clauses.c | 75 +- src/backend/parser/analyze.c | 13 +- src/backend/parser/parse_expr.c | 186 +++- src/backend/tcop/pquery.c | 3 + src/backend/utils/adt/ruleutils.c | 46 + src/backend/utils/cache/plancache.c | 19 +- src/backend/utils/fmgr/fmgr.c | 10 +- src/backend/utils/misc/guc.c | 12 + src/include/catalog/pg_proc.dat | 7 + src/include/executor/execExpr.h | 10 + src/include/executor/execdesc.h | 4 + src/include/nodes/execnodes.h | 19 + src/include/nodes/parsenodes.h | 2 + src/include/nodes/pathnodes.h | 4 + src/include/nodes/plannodes.h | 2 + src/include/nodes/primnodes.h | 12 +- src/include/optimizer/planmain.h | 2 + src/include/parser/parse_expr.h | 1 + src/include/parser/parse_node.h | 1 + 29 files changed, 1940 insertions(+), 62 deletions(-) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index e72fd74145..355f62ae0f 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2214,7 +2214,7 @@ CommitTransaction(void) */ smgrDoPendingSyncs(true, is_parallel_worker); - /* Let ON COMMIT DROP */ + /* Let ON COMMIT DROP or ON TRANSACTION END */ AtPreEOXact_SessionVariable_on_xact_actions(true); /* close large objects before lower-level cleanup */ diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 6255c2c72f..2b70faa173 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -1909,6 +1909,11 @@ find_expr_references_walker(Node *node, { Param *param = (Param *) node; + /* A variable parameter depends on the session variable */ + if (param->paramkind == PARAM_VARIABLE) + add_object_address(OCLASS_VARIABLE, param->paramvarid, 0, + context->addrs); + /* A parameter must depend on the parameter's datatype */ add_object_address(OCLASS_TYPE, param->paramtype, 0, context->addrs); diff --git a/src/backend/commands/sessionvariable.c b/src/backend/commands/sessionvariable.c index 3094bab82a..204582ecbc 100644 --- a/src/backend/commands/sessionvariable.c +++ b/src/backend/commands/sessionvariable.c @@ -23,22 +23,138 @@ #include "catalog/pg_variable.h" #include "commands/session_variable.h" #include "executor/executor.h" +#include "executor/svariableReceiver.h" +#include "funcapi.h" #include "optimizer/optimizer.h" #include "parser/parse_coerce.h" #include "parser/parse_collate.h" #include "parser/parse_expr.h" #include "parser/parse_type.h" +#include "rewrite/rewriteHandler.h" +#include "storage/lmgr.h" +#include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/datum.h" +#include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" /* - * The life cycle of temporary session variable can be - * limmited by using clause ON COMMIT DROP. + * Values of session variables are stored in local memory, in + * sessionvars hash table in binary format and the life cycle of + * session variable can be longer than transaction. Then we + * need to solve following issues: + * + * - We need to purge memory when variable is dropped (in current + * transaction in current session or by other sessions (other + * users). There is a request to protect content against + * possibly aborted the DROP VARIABLE command. So the purging should + * not be executed immediately. It should be postponed until the end + * of the transaction. + * + * - The session variable can be dropped explicitly (by DROP VARIABLE) + * command or implicitly (by using ON COMMIT DROP clause). The + * purging memory at transaction end can be requested implicitly + * too (by using the ON TRANSACTION END clause). + * + * - We need to ensure that the returned value is valid. The stored + * value can be invalid from different reasons: + * + * a) format change - we doesn't support change of format for + * session variable, but format can be changed by ALTER TYPE + * operation or by owner's extension update. We store fingerprint + * of any type used by session variables, and before any + * read we check if this fingerprint is still valid (see + * SVariableType related operations). Built-in non composite + * types are trusty every time. + * + * For other cases we can check xmin, and if xmin is different, + * then we check the name. For types from extension we + * can check xmin of pg_extension related record (and if it + * is different we can check extension's name and extension's + * version). For domains we should to force cast to domain type + * to reply all constraints before returning the value. + * + * b) the variable can be dropped - we don't purge memory + * when the variable is dropped immediately, so theoretically + * we can return dropped value. The protection against this + * issue is based on invisibility of dropped session variables + * in system catalogue. The DROP commands invalidates related + * cached plans too, so it should work with prepared statements. + * Theoretically there is possible access to value of dropped + * variable by using known oid, but we don't try to solve this + * issue now (it is possible only when session variables API + * is used from extensions). The implementation watches sinval + * messages, and in message processing check the consistency + * of stored values in hash table against system catalogue. + * This check should be done after end of any transaction, but + * because for this check we need not aborted state, the check + * can be postponed to next transaction (when the variable + * will be used). Unfortunately, this check can be postponed + * to the future and between there can be oid overflow. We + * cannot to detect oid overflow, and then theoretically there is + * the possibility to find in catalogue a variable with same + * oid like oid of dropped value. But as protection against + * returning invalid value we can (and we do) check the used + * type, and when the type is not valid, then we don't allow + * returns any value. We cannot to check name or schema, because + * we suppport ALTER RENAME and ALTER SET SCHEMA. So, although + * there is an risk to return value of dropped variable (instead + * of new calculated default expr or NULL), there should not + * be risk of crash due to invalid format. + * + * The values of session variables are stored in hash table + * sessionvars, the fingerprints of used types are stored in + * hash table sessionvars_types. We maintain four queues + * (implemented as list of variable oid, list of RecheckVariableRequests + * (requests are created in reaction on sinval) and lists + * of actions). List of actions are used whe we need to calculate + * the final transaction state of an entry's creator's + * subtransaction. List of session variables are used, when we + * don't need to do this calculation (we know, so. every values + * of session variables created with clauses ON COMMIT DROP or + * ON TRANSACTION END RESET will be purged from memory), but + * ON COMMIT DROP action or ON COMMIT RESET action can be done only + * when related sub transaction is committed (we don't want to lost + * value after aborted DROP VARIABLE command, we don't want to delete + * record from system catalogue after aborted CREATE VARIABLE command, + * or we don't want to drop record from system catalog implicitly + * when TEMP ON COMMIT DROP variable was successfully dropped). + * Recheck of session's variable existence should be executed + * only when some operation over session's variable is executed + * first time in transaction or at the end of transaction. It + * should not be executed more times inside transaction. + * + * Although the variable's reset doesn't need full delete from + * sessionvars hash table, we do. It is the most simple solution, + * and it reduces possible state's space (and reduces sessionvars + * bloating). + * + * There are two different ways to do final access to session + * variables: buffered (indirect) or direct. Buffered access is used + * in queries, where we have to ensure an stability of passed values + * (then the session variable has same behaviour like external query + * parameters, what is consistent with using PL/pgSQL's variables in + * embedded queries in PL/pgSQL). + * + * This is implemented by using an aux buffer (an array) that holds a + * copy of values of used (in query) session variables. In the final + * end, the values from this array are passed as constant (EEOP_CONST). + * + * Direct access is used by simple expression evaluation (PLpgSQL). + * In this case we don't need to ensure the stability of passed + * values, and maintaining the buffer with copies of values of session + * variables can be useless overhead. In this case we just read the + * value of the session variable directly (EEOP_PARAM_VARIABLE). This + * strategy removes the necessity to modify related PL/pgSQL code to + * support session variables (the reading of session variables is + * fully transparent for PL/pgSQL). */ typedef enum SVariableXActAction { SVAR_ON_COMMIT_DROP, /* used for ON COMMIT DROP */ + SVAR_ON_COMMIT_RESET, /* used for DROP VARIABLE */ } SVariableXActAction; typedef struct SVariableXActActionItem @@ -54,12 +170,706 @@ typedef struct SVariableXActActionItem SubTransactionId deleting_subid; } SVariableXActActionItem; -/* List holds fields of SVariableXActActionItem type */ -static List *xact_drop_actions = NIL; +/* Both lists hold fields of SVariableXActActionItem type */ +static List *xact_on_commit_drop_actions = NIL; +static List *xact_on_commit_reset_actions = NIL; + +/* + * the ON COMMIT DROP and ON TRANSACTION END RESET variables + * are purged from memory every time. + */ +static List *xact_reset_varids = NIL; + +/* + * When the session variable is dropped we need to purge memory. The + * session variable can be dropped by current session, but it can be + * dropped by other's sessions too, so we have to watc sinval message. + * But because we don't want to purge memory immediately, we need to + * hold list of possibly dropped session variables and at the end of + * transaction, we check session variables from this list against system + * catalogue. This check can be postponed into next transaction if + * current transactions is in aborted state. When the check is postponed + * to the next transaction, then have to be executed just once time, and + * the fields of list created in current transaction have to be ignored + * (the sinval message can come first before sync_sessionvars_all call, + * and then we have to skip items created by current transaction, because + * we want to variables when the related operation (that emmits sinval) + * is in final state (at the end of related transaction). + * Saved lxid in synced_lxid is safeguard against repeated useles + * recheck inside one transaction. + */ +typedef struct RecheckVariableRequest +{ + Oid varid; + LocalTransactionId lxid; +} RecheckVariableRequest; + +static List *xact_recheck_requests = NIL; + +static LocalTransactionId synced_lxid = InvalidLocalTransactionId; + +typedef struct SVariableData +{ + Oid varid; /* pg_variable OID of this sequence (hash key) */ + + char *name; /* for debug purposes */ + char *nsname; /* for debug purposes */ + + SVariableType svartype; + int64 gennum; + + bool isnull; + bool freeval; + Datum value; + + bool is_rowtype; /* true when variable is composite */ + bool is_not_null; /* don't allow null values */ + bool is_immutable; /* true when variable is immutable */ + bool has_defexpr; /* true when variable has a default value */ + + bool is_valid; /* true when variable was successfully + * initialized */ + + uint32 hashvalue; + + bool eox_reset; /* true, when lifecycle is limitted by transaction */ +} SVariableData; + +typedef SVariableData * SVariable; + +static HTAB *sessionvars = NULL; /* hash table for session variables */ +static MemoryContext SVariableMemoryContext = NULL; + +static bool first_time = true; + static void register_session_variable_xact_action(Oid varid, SVariableXActAction action); static void unregister_session_variable_xact_action(Oid varid, SVariableXActAction action); +/* + * Returns human readable name of SVariableXActAction value. + */ +static const char * +SvariableXActActionName(SVariableXActAction action) +{ + switch (action) + { + case SVAR_ON_COMMIT_DROP: + return "ON COMMIT DROP"; + case SVAR_ON_COMMIT_RESET: + return "ON COMMIT RESET"; + default: + elog(ERROR, "unknown SVariableXActAction action %d", + action); + } +} + +/* + * Releases stored data from session variable, but preserve the hash entry + * in sessionvars. + */ +static void +free_session_variable_value(SVariable svar) +{ + if (svar->freeval) + pfree(DatumGetPointer(svar->value)); + + /* Clean current value */ + svar->value = (Datum) 0; + svar->isnull = true; + svar->freeval = false; + + /* + * We can mark this session variable as valid when + * it has not default expression, and when null is + * allowed. When it has defexpr, then the content + * will be valid after an assignment or defexp evaluation. + */ + svar->is_valid = !svar->has_defexpr && !svar->is_not_null; +} + +/* + * Release the variable defined by varid from sessionvars + * hashtab. + */ +static void +remove_session_variable(SVariable svar) +{ + free_session_variable_value(svar); + + elog(DEBUG1, "session variable \"%s.%s\" (oid:%u) is removing from memory", + svar->nsname, svar->name, + svar->varid); + + if (hash_search(sessionvars, + (void *) &svar->varid, + HASH_REMOVE, + NULL) == NULL) + elog(DEBUG1, "hash table corrupted"); +} + +/* + * Release the variable defined by varid from sessionvars + * hashtab. + */ +static void +remove_session_variable_by_id(Oid varid) +{ + SVariable svar; + bool found; + + if (!sessionvars) + return; + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_FIND, &found); + if (found) + remove_session_variable(svar); +} + +/* + * Callback function for session variable invalidation. + */ +static void +pg_variable_cache_callback(Datum arg, int cacheid, uint32 hashvalue) +{ + HASH_SEQ_STATUS status; + SVariable svar; + + /* + * There is no guarantee of sessionvars being initialized, even when + * receiving an invalidation callback, as DISCARD [ ALL | VARIABLES ] + * destroys the hash table entirely. + */ + if (!sessionvars) + return; + + /* + * When the hashvalue is not specified, then we have to recheck all + * currently used session variables. Since we can't guarantee the exact + * session variable from its hashValue, we have to iterate over all + * items of hash table. + */ + hash_seq_init(&status, sessionvars); + + while ((svar = (SVariable) hash_seq_search(&status)) != NULL) + { + if (hashvalue == 0 || svar->hashvalue == hashvalue) + { + /* + * We don't need to execute recheck for variables, + * that will be surelly purged from memory at eox + * time. + */ + if (!svar->eox_reset) + { + RecheckVariableRequest *req = NULL; + ListCell *l; + + /* + * The recheck request should be postponed to the end of current + * transaction if there are request recheck from previous + * transaction. This ensure so we don't purge memory too + * early, when we process sinval from previous aborted + * transaction. Possible test case: + * + * LET var = X; + * BEGIN DROP VAR var; ROOLBACK; + * BEGIN DROP VAR var ROLLBACK; + * SELECT var; -- should be X + * + * Without postponing, the call of sync_sessionvars_all + * in second transaction purge (badly) memory, because + * sinval message from prrevious aborted transaction is + * processed and in this time related record from pg_variable + * is removed. So when we find request recheck from prev + * transaction, we just update lxid. + */ + foreach (l, xact_recheck_requests) + { + req = (RecheckVariableRequest *) lfirst(l); + if (req->varid == svar->varid) + { + if (req->lxid != MyProc->lxid) + { + req->lxid = MyProc->lxid; + + elog(DEBUG1, "session variable \"%s.%s\" (oid:%u) should be rechecked (forced by sinval)" + " (found recheck request from previous transaction)", + svar->nsname, svar->name, + svar->varid); + + break; + } + } + } + + if (!req) + { + MemoryContext oldcxt; + + oldcxt = MemoryContextSwitchTo(CacheMemoryContext); + + req = (RecheckVariableRequest *) palloc(sizeof(RecheckVariableRequest)); + + req->varid = svar->varid; + req->lxid = MyProc->lxid; + + elog(DEBUG1, "session variable \"%s.%s\" (oid:%u) should be rechecked (forced by sinval)", + svar->nsname, svar->name, + svar->varid); + + xact_recheck_requests = lappend(xact_recheck_requests, req); + + MemoryContextSwitchTo(oldcxt); + } + } + } + + /* + * although it there is low probability, we have to iterate + * over all actively used session variables, because hashvalue + * is not unique identifier. + */ + } +} + +/* + * When we need to recheck all session variables, then + * the most effective method is seq scan over hash tab. + * We need to check synchronization request before any + * read to be sure so returned data are valid. + */ +static void +sync_sessionvars_all() +{ + SVariable svar; + ListCell *l; + + if (synced_lxid == MyProc->lxid) + return; + + if (!xact_recheck_requests) + return; + + /* + * sessionvars is null after DISCARD VARIABLES. + * When we are sure, so there are not any + * active session variable in this session. + */ + if (!sessionvars) + { + list_free_deep(xact_recheck_requests); + xact_recheck_requests = NIL; + return; + } + + /* + * This routine is called before any reading. So the + * session should be in transaction state. This is required + * for access to system catalog. + */ + Assert(IsTransactionState()); + + foreach(l, xact_recheck_requests) + { + RecheckVariableRequest *req; + bool found; + + req = (RecheckVariableRequest *) lfirst(l); + + if (req->lxid != MyProc->lxid) + { + svar = (SVariable) hash_search(sessionvars, &req->varid, + HASH_FIND, &found); + + if (found) + { + HeapTuple tp; + + tp = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(req->varid)); + + if (HeapTupleIsValid(tp)) + ReleaseSysCache(tp); + else + remove_session_variable(svar); + } + + xact_recheck_requests = foreach_delete_current(xact_recheck_requests, l); + pfree(req); + } + } + + /* Don't repeat this check in this transaction more time */ + synced_lxid = MyProc->lxid; +} + +/* + * Create the hash table for storing session variables + */ +static void +create_sessionvars_hashtable(void) +{ + HASHCTL ctl; + + /* set callbacks */ + if (first_time) + { + /* Read sinval messages */ + CacheRegisterSyscacheCallback(VARIABLEOID, + pg_variable_cache_callback, + (Datum) 0); + /* needs its own long lived memory context */ + SVariableMemoryContext = + AllocSetContextCreate(TopMemoryContext, + "session variables", + ALLOCSET_START_SMALL_SIZES); + + first_time = false; + } + + Assert(SVariableMemoryContext); + + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(SVariableData); + ctl.hcxt = SVariableMemoryContext; + + Assert(sessionvars == NULL); + + sessionvars = hash_create("Session variables", 64, &ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); +} + +/* + * Assign some content to the session variable. It's copied to + * SVariableMemoryContext if necessary. + * + * init_mode is true, when the value of session variable is being initialized + * by default expression or by null. Only in this moment we can allow to + * modify immutable variables with default expression. + */ +static void +set_session_variable(SVariable svar, Datum value, + bool isnull, Oid typid, + bool init_mode) +{ + MemoryContext oldcxt; + Datum newval = value; + + /* Don't allow assignment of null to NOT NULL variable */ + if (isnull && svar->is_not_null) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("null value is not allowed for NOT NULL session variable \"%s.%s\"", + get_namespace_name(get_session_variable_namespace(svar->varid)), + get_session_variable_name(svar->varid)))); + + if (svar->typid != typid) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("type \"%s\" of assigned value is different than type \"%s\" of session variable \"%s.%s\"", + format_type_be(typid), + format_type_be(svar->typid), + get_namespace_name(get_session_variable_namespace(svar->varid)), + get_session_variable_name(svar->varid)))); + + /* + * Don't allow updating of immutable session variable that has assigned + * not null value or has default expression (and then the value should be + * result of default expression always). Don't do this check, when variable + * is being initialized. + */ + if (!init_mode && + (svar->is_immutable && (svar->is_valid || svar->has_defexpr))) + ereport(ERROR, + (errcode(ERRCODE_ERROR_IN_ASSIGNMENT), + errmsg("session variable \"%s.%s\" is declared IMMUTABLE", + get_namespace_name(get_session_variable_namespace(svar->varid)), + get_session_variable_name(svar->varid)))); + + /* copy value to session persistent context */ + oldcxt = MemoryContextSwitchTo(SVariableMemoryContext); + if (!isnull) + newval = datumCopy(value, svar->typbyval, svar->typlen); + MemoryContextSwitchTo(oldcxt); + + free_session_variable_value(svar); + + svar->value = newval; + svar->isnull = isnull; + svar->freeval = newval != value; + svar->is_valid = true; +} + +/* + * Initialize svar from var + * svar - SVariable - holds value + * var - Variable - holds metadata + */ +static void +init_session_variable(SVariable svar, Variable *var) +{ + MemoryContext oldcxt = MemoryContextSwitchTo(CacheMemoryContext); + + Assert(OidIsValid(var->oid)); + + svar->varid = var->oid; + + svar->name = pstrdup(var->varname); + svar->nsname = get_namespace_name(var->varnamespace); + + svar->svartype = get_svariabletype(var->typid); + svar->gennum = get_svariable_valid_type_gennum(svar->svartype); + + svar->isnull = true; + svar->freeval = false; + svar->value = (Datum) 0; + + svar->is_rowtype = type_is_rowtype(var->typid); + svar->is_not_null = var->is_not_null; + svar->is_immutable = var->is_immutable; + svar->has_defexpr = var->has_defexpr; + + svar->hashvalue = GetSysCacheHashValue1(VARIABLEOID, + ObjectIdGetDatum(var->oid)); + + /* the value of variable is not known yet */ + svar->is_valid = false; + + svar->eox_reset = var->eoxaction == VARIABLE_EOX_RESET || + var->eoxaction == VARIABLE_EOX_DROP; + + if (svar->eox_reset) + { + + MemoryContext oldcxt = MemoryContextSwitchTo(CacheMemoryContext); + + xact_reset_varids = lappend_oid(xact_reset_varids, var->oid); + + MemoryContextSwitchTo(oldcxt); + } + + MemoryContextSwitchTo(oldcxt); +} + +/* + * Search the given session variable in the hash table. If it doesn't + * exist, then insert it (and calculate defexpr if it exists). + * + * Caller is responsible for doing permission checks. + * + * As side effect this function acquires AccessShareLock on + * related session variable until the end of the transaction. + */ +static SVariable +prepare_variable_for_reading(Oid varid) +{ + SVariable svar; + Variable var; + bool found; + + var.oid = InvalidOid; + + if (!sessionvars) + create_sessionvars_hashtable(); + + /* Protect used session variable against drop until transaction end */ + LockDatabaseObject(VariableRelationId, varid, 0, AccessShareLock); + + /* Ensure so all entries in sessionvars hash table are valid */ + sync_sessionvars_all(); + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_ENTER, &found); + + /* Return content if it is available and valid */ + if (found && svar->is_valid) + return svar; + + /* We need to load defexpr. */ + initVariable(&var, varid, false); + + if (!found) + init_session_variable(svar, &var); + + /* Raise an error when we cannot initialize variable correctly */ + if (var.is_not_null && !var.defexpr) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("null value is not allowed for NOT NULL session variable \"%s.%s\"", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid)), + errdetail("The session variable was not initialized yet."))); + + if (svar->has_defexpr) + { + Datum value = (Datum) 0; + bool isnull; + EState *estate = NULL; + Expr *defexpr; + ExprState *defexprs; + MemoryContext oldcxt; + + /* Prepare default expr */ + estate = CreateExecutorState(); + + oldcxt = MemoryContextSwitchTo(estate->es_query_cxt); + + defexpr = expression_planner((Expr *) var.defexpr); + defexprs = ExecInitExpr(defexpr, NULL); + value = ExecEvalExprSwitchContext(defexprs, + GetPerTupleExprContext(estate), + &isnull); + + + /* Store result before releasing Executor memory */ + set_session_variable(svar, value, isnull, svar->typid, true); + + MemoryContextSwitchTo(oldcxt); + + FreeExecutorState(estate); + } + else + set_session_variable(svar, (Datum) 0, true, svar->typid, true); + + return svar; +} + +/* + * Store the given value in an SVariable, and cache it if not already present. + * + * Caller is responsible for doing permission checks. + * We try not to break the previous value, if something is wrong. + * + * As side effect this function acquires AccessShareLock on + * related session variable until the end of the transaction. + */ +void +SetSessionVariable(Oid varid, Datum value, bool isNull, Oid typid) +{ + SVariable svar; + bool found; + + /* Protect used session variable against drop until transaction end */ + LockDatabaseObject(VariableRelationId, varid, 0, AccessShareLock); + + if (!sessionvars) + create_sessionvars_hashtable(); + + /* Ensure so all entries in sessionvars hash table are valid */ + sync_sessionvars_all(); + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_ENTER, &found); + + if (!found) + { + Variable var; + + /* We don't need to know defexpr here */ + initVariable(&var, varid, true); + init_session_variable(svar, &var); + + elog(DEBUG1, "session variable \"%s.%s\" (oid:%u) has new entry in memory (emitted by WRITE)", + svar->nsname, svar->name, + varid); + } + + set_session_variable(svar, value, isNull, typid, false); +} + +/* + * Wrapper around SetSessionVariable after checking for correct permission. + */ +void +SetSessionVariableWithSecurityCheck(Oid varid, Datum value, bool isNull, Oid typid) +{ + AclResult aclresult; + + /* + * Is possible to write to session variable? + */ + aclresult = pg_variable_aclcheck(varid, GetUserId(), ACL_WRITE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_VARIABLE, get_session_variable_name(varid)); + + SetSessionVariable(varid, value, isNull, typid); +} + +/* + * Returns a copy of value of the session variable specified by varid + * Caller is responsible for doing permission checks. + */ +Datum +CopySessionVariable(Oid varid, bool *isNull, Oid *typid) +{ + SVariable svar; + + svar = prepare_variable_for_reading(varid); + Assert(svar != NULL && svar->is_valid); + + *isNull = svar->isnull; + *typid = svar->typid; + + if (!svar->isnull) + return datumCopy(svar->value, svar->typbyval, svar->typlen); + + return (Datum) 0; +} + +/* + * Returns a copy of value of the session variable specified by varid + * with check of expected type. Like previous function, the caller + * is responsible for doing permission checks. + */ +Datum +CopySessionVariableWithTypeCheck(Oid varid, bool *isNull, Oid expected_typid) +{ + SVariable svar; + + svar = prepare_variable_for_reading(varid); + Assert(svar != NULL && svar->is_valid); + + if (expected_typid != svar->typid) + elog(ERROR, "type of variable \"%s.%s\" is different than expected", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid)); + + *isNull = svar->isnull; + + if (!svar->isnull) + return datumCopy(svar->value, svar->typbyval, svar->typlen); + + return (Datum) 0; +} + +/* + * Returns a value of session variable identified by varid with + * check of expected type. Like previous function, the called + * is reposible for doing permission check. + */ +Datum +GetSessionVariableWithTypeCheck(Oid varid, bool *isNull, Oid expected_typid) +{ + SVariable svar; + + svar = prepare_variable_for_reading(varid); + Assert(svar != NULL && svar->is_valid); + + if (expected_typid != svar->typid) + elog(ERROR, "type of variable \"%s.%s\" is different than expected", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid)); + + *isNull = svar->isnull; + + if (svar->isnull) + return (Datum) 0; + + return svar->value; +} + /* * Routines used for manipulation with session variables from * SQL level @@ -170,19 +980,24 @@ DefineSessionVariable(ParseState *pstate, CreateSessionVarStmt *stmt) } /* - * Drop variable by OID. This routine doesn't try to remove - * the value of session variable immediately. It will be - * removed on transaction end in sync_sessionvars_xact_callback - * routine. This routine manipulate just with system catalog. + * Drop variable by OID. This routine can be called by directly + * command DROP VARIABLE or indirectly for TEMP ON COMMIT DROP + * variables. + * + * In first case we want to postpone memory purging to commit or + * abort time. In second case, the menory will be purged by + * processing xact_reset_varids list. In second case, the calling + * of unregistration of SVAR_ON_COMMIT_DROP action is useless, but + * we are not able to detect which case is executed. */ void RemoveSessionVariable(Oid varid) { + SVariable svar; + bool found; Relation rel; HeapTuple tup; - elog(DEBUG1, "session variable (oid:%u) is deleted from pg_variable", varid); - rel = table_open(VariableRelationId, RowExclusiveLock); tup = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(varid)); @@ -190,6 +1005,18 @@ RemoveSessionVariable(Oid varid) if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for variable %u", varid); + if (message_level_is_interesting(DEBUG1)) + { + Form_pg_variable varform; + + varform = (Form_pg_variable) GETSTRUCT(tup); + + elog(DEBUG1, "session variable \"%s.%s\" (oid:%u) is deleted from pg_variable", + get_namespace_name(varform->varnamespace), + NameStr(varform->varname), + varid); + } + CatalogTupleDelete(rel, &tup->t_self); ReleaseSysCache(tup); @@ -197,17 +1024,81 @@ RemoveSessionVariable(Oid varid) table_close(rel, RowExclusiveLock); /* - * We removed entry from catalog already, we should not do it - * again at end of xact time. + * We removed entry from catalog already, we must not do it + * again at end of xact time */ unregister_session_variable_xact_action(varid, SVAR_ON_COMMIT_DROP); + + if (sessionvars) + { + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_FIND, &found); + + /* + * For variables that are not purged by default we need to + * register SVAR_ON_COMMIT_RESET action. This action can + * be reverted by ABORT of DROP VARIABLE command. + */ + if (found && !svar->eox_reset) + { + /* + * And we want to enforce variable clearning when this transaction or + * subtransaction will be committed (we don't need to wait for + * sinval message). The cleaning action for one session variable + * can be repeated in the action list without causing any problem, + * so we don't need to ensure uniqueness. We need a different action + * from RESET, because RESET is executed on any transaction end, + * but we want to execute cleaning only when the current transaction + * will be committed. + */ + register_session_variable_xact_action(varid, SVAR_ON_COMMIT_RESET); + } + } +} + +/* + * Fast drop of the complete content of all session variables hash table. + * This is code for DISCARD VARIABLES command. This command + * cannot be run inside transaction, so we don't need to handle + * end of transaction actions. + */ +void +ResetSessionVariables(void) +{ + /* Destroy hash table and reset related memory context */ + if (sessionvars) + { + hash_destroy(sessionvars); + sessionvars = NULL; + } + + /* Release memory allocated by session variables */ + if (SVariableMemoryContext != NULL) + MemoryContextReset(SVariableMemoryContext); + + /* + * There are not any session variables left, so simply trim + * both xact action lists. + */ + list_free_deep(xact_on_commit_drop_actions); + xact_on_commit_drop_actions = NIL; + + list_free_deep(xact_on_commit_reset_actions); + xact_on_commit_reset_actions = NIL; + + /* We should clean xact_reset_varids */ + list_free(xact_reset_varids); + xact_reset_varids = NIL; + + /* we should clean xact_recheck_requests */ + list_free_deep(xact_recheck_requests); + xact_recheck_requests = NIL; } /* * Registration of actions to be executed on session variables at transaction * end time. We want to drop temporary session variables with clause ON COMMIT - * DROP, or we want to reset values of session variables with clause ON - * TRANSACTION END RESET or we want to clean (reset) local memory allocated by + * DROP, or we want to clean (reset) local memory allocated by * values of dropped session variables. */ @@ -221,6 +1112,9 @@ register_session_variable_xact_action(Oid varid, SVariableXActActionItem *xact_ai; MemoryContext oldcxt; + elog(DEBUG1, "SVariableXActAction \"%s\" is registered for session variable (oid:%u)", + SvariableXActActionName(action), varid); + oldcxt = MemoryContextSwitchTo(CacheMemoryContext); xact_ai = (SVariableXActActionItem *) @@ -231,8 +1125,10 @@ register_session_variable_xact_action(Oid varid, xact_ai->creating_subid = GetCurrentSubTransactionId(); xact_ai->deleting_subid = InvalidSubTransactionId; - Assert(action == SVAR_ON_COMMIT_DROP); - xact_drop_actions = lcons(xact_ai, xact_drop_actions); + if (action == SVAR_ON_COMMIT_DROP) + xact_on_commit_drop_actions = lcons(xact_ai, xact_on_commit_drop_actions); + else + xact_on_commit_reset_actions = lcons(xact_ai, xact_on_commit_reset_actions); MemoryContextSwitchTo(oldcxt); } @@ -251,7 +1147,10 @@ unregister_session_variable_xact_action(Oid varid, Assert(action == SVAR_ON_COMMIT_DROP); - foreach(l, xact_drop_actions) + elog(DEBUG1, "SVariableXActAction \"%s\" is unregistered for session variable (oid:%u)", + SvariableXActActionName(action), varid); + + foreach(l, xact_on_commit_drop_actions) { SVariableXActActionItem *xact_ai = (SVariableXActActionItem *) lfirst(l); @@ -270,25 +1169,39 @@ AtPreEOXact_SessionVariable_on_xact_actions(bool isCommit) { ListCell *l; - foreach(l, xact_drop_actions) + /* + * Clean memory for all eox_reset variables. Do it + * first, it reduces enhancing action lists about + * RECHECK action. + */ + foreach(l, xact_reset_varids) { - SVariableXActActionItem *xact_ai = - (SVariableXActActionItem *) lfirst(l); + remove_session_variable_by_id(lfirst_oid(l)); + } + + /* We can clean xact_reset_varids */ + list_free(xact_reset_varids); + xact_reset_varids = NIL; - /* Iterate only over entries that are still pending */ - if (xact_ai->deleting_subid == InvalidSubTransactionId) + if (isCommit && xact_on_commit_drop_actions) + { + foreach(l, xact_on_commit_drop_actions) { + SVariableXActActionItem *xact_ai = + (SVariableXActActionItem *) lfirst(l); - /* - * ON COMMIT DROP is allowed only for temp session - * variables. So we should explicitly delete only when - * current transaction was committed. When it's rollback, - * then session variable is removed automatically. - */ - if (isCommit) + /* Iterate only over entries that are still pending */ + if (xact_ai->deleting_subid == InvalidSubTransactionId) { ObjectAddress object; + /* + * ON COMMIT DROP is allowed only for temp session + * variables. So we should explicitly delete only when + * current transaction was committed. When it's rollback, + * then session variable is removed automatically. + */ + object.classId = VariableRelationId; object.objectId = xact_ai->varid; object.objectSubId = 0; @@ -312,8 +1225,63 @@ AtPreEOXact_SessionVariable_on_xact_actions(bool isCommit) * Any drop action left is an entry that was unregistered and not * rollbacked, so we can simply remove them. */ - list_free_deep(xact_drop_actions); - xact_drop_actions = NIL; + list_free_deep(xact_on_commit_drop_actions); + xact_on_commit_drop_actions = NIL; + + if (isCommit && xact_recheck_requests) + { + Assert(sessionvars); + + foreach(l, xact_recheck_requests) + { + SVariable svar; + bool found; + RecheckVariableRequest *req; + + req = (RecheckVariableRequest *) lfirst(l); + + svar = (SVariable) hash_search(sessionvars, &req->varid, + HASH_FIND, &found); + + if (found) + { + HeapTuple tp; + + tp = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(svar->varid)); + + if (HeapTupleIsValid(tp)) + ReleaseSysCache(tp); + else + remove_session_variable(svar); + } + } + + list_free_deep(xact_recheck_requests); + xact_recheck_requests = NIL; + } + + if (isCommit && xact_on_commit_reset_actions) + { + foreach(l, xact_on_commit_reset_actions) + { + SVariableXActActionItem *xact_ai = + (SVariableXActActionItem *) lfirst(l); + + /* + * When we process DROP VARIABLE statement, we create + * SVAR_ON_COMMIT_RESET xact action. We want to process + * this action only when related transaction is commited + * (when DROP VARIABLE statement sucessfully processed). + * We want to preserve variable content, when the transaction + * with DROP VARAIBLE statement was reverted. + */ + if (xact_ai->deleting_subid == InvalidSubTransactionId) + remove_session_variable_by_id(xact_ai->varid); + } + } + + list_free_deep(xact_on_commit_reset_actions); + xact_on_commit_reset_actions = NIL; } /* @@ -329,7 +1297,33 @@ AtEOSubXact_SessionVariable_on_xact_actions(bool isCommit, SubTransactionId mySu { ListCell *cur_item; - foreach(cur_item, xact_drop_actions) + foreach(cur_item, xact_on_commit_drop_actions) + { + SVariableXActActionItem *xact_ai = + (SVariableXActActionItem *) lfirst(cur_item); + + if (!isCommit && xact_ai->creating_subid == mySubid) + { + /* cur_item must be removed */ + xact_on_commit_drop_actions = foreach_delete_current(xact_on_commit_drop_actions, cur_item); + pfree(xact_ai); + } + else + { + /* cur_item must be preserved */ + if (xact_ai->creating_subid == mySubid) + xact_ai->creating_subid = parentSubid; + if (xact_ai->deleting_subid == mySubid) + xact_ai->deleting_subid = isCommit ? parentSubid : InvalidSubTransactionId; + } + } + + /* + * Reset and recheck actions - cleaning memory should be done every time + * (when the variable with short life cycle was used) and then + * cannot be removed from xact action list. + */ + foreach(cur_item, xact_on_commit_reset_actions) { SVariableXActActionItem *xact_ai = (SVariableXActActionItem *) lfirst(cur_item); @@ -337,7 +1331,8 @@ AtEOSubXact_SessionVariable_on_xact_actions(bool isCommit, SubTransactionId mySu if (!isCommit && xact_ai->creating_subid == mySubid) { /* cur_item must be removed */ - xact_drop_actions = foreach_delete_current(xact_drop_actions, cur_item); + xact_on_commit_reset_actions = + foreach_delete_current(xact_on_commit_reset_actions, cur_item); pfree(xact_ai); } else @@ -350,3 +1345,87 @@ AtEOSubXact_SessionVariable_on_xact_actions(bool isCommit, SubTransactionId mySu } } } + +/* + * pg_debug_show_used_session_variables - designed for testing + * + * returns content of session vars + */ +Datum +pg_debug_show_used_session_variables(PG_FUNCTION_ARGS) +{ +#define NUM_PG_DEBUG_SHOW_USED_SESSION_VARIABLES_ATTS 10 + + SetSingleFuncCall(fcinfo, 0); + + if (sessionvars) + { + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + HASH_SEQ_STATUS status; + SVariable svar; + + /* Ensure so all entries in sessionvars hash table are valid */ + sync_sessionvars_all(); + + hash_seq_init(&status, sessionvars); + + while ((svar = (SVariable) hash_seq_search(&status)) != NULL) + { + Datum values[NUM_PG_DEBUG_SHOW_USED_SESSION_VARIABLES_ATTS]; + bool nulls[NUM_PG_DEBUG_SHOW_USED_SESSION_VARIABLES_ATTS]; + HeapTuple tp; + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = ObjectIdGetDatum(svar->varid); + + values[3] = ObjectIdGetDatum(svar->svartype->typid); + values[4] = PointerGetDatum(cstring_to_text(svar->svartype->typname)); + values[7] = BoolGetDatum(session_variable_use_valid_type(svar)); + + /* check if session variable is visible in system catalogue */ + tp = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(svar->varid)); + + /* + * Sessionvars can hold data of variables removed from catalogue, + * (and not purged) and then namespacename and name cannot be read + * from catalogue. + */ + if (HeapTupleIsValid(tp)) + { + ReleaseSysCache(tp); + + /* When we see data in catalogue */ + nsname = get_namespace_name(get_session_variable_namespace(svar->varid)); + name = get_session_variable_name(svar->varid); + + values[5] = BoolGetDatum(false); + values[6] = BoolGetDatum(svar->is_valid); + values[8] = BoolGetDatum(pg_variable_aclcheck(svar->varid, GetUserId(), ACL_READ) == ACLCHECK_OK); + values[9] = BoolGetDatum(pg_variable_aclcheck(svar->varid, GetUserId(), ACL_WRITE) == ACLCHECK_OK); + } + else + { + /* + * When session variable was removed from catalogue, but still it + * in memory. The memory was not purged yet. + */ + nsname = svar->nsname; + name = svar->name; + + values[5] = BoolGetDatum(true); + nulls[6] = true; + nulls[8] = true; + nulls[9] = true; + } + + values[1] = PointerGetDatum(cstring_to_text(nsname)); + values[2] = PointerGetDatum(cstring_to_text(name)); + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + } + } + + return (Datum) 0; +} diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c index d0a57c7aae..89e6727893 100644 --- a/src/backend/executor/execExpr.c +++ b/src/backend/executor/execExpr.c @@ -33,6 +33,7 @@ #include "access/nbtree.h" #include "catalog/objectaccess.h" #include "catalog/pg_type.h" +#include "commands/session_variable.h" #include "executor/execExpr.h" #include "executor/nodeSubplan.h" #include "funcapi.h" @@ -994,6 +995,80 @@ ExecInitExprRec(Expr *node, ExprState *state, scratch.d.param.paramtype = param->paramtype; ExprEvalPushStep(state, &scratch); break; + + case PARAM_VARIABLE: + { + int es_num_session_variables = 0; + SessionVariableValue *es_session_variables = NULL; + + if (state->parent && state->parent->state) + { + es_session_variables = state->parent->state->es_session_variables; + es_num_session_variables = state->parent->state->es_num_session_variables; + } + + if (es_session_variables) + { + SessionVariableValue *var; + + /* + * Use buffered session variables when the + * buffer with copied values is avaiable + * (standard query executor mode) + */ + + /* Parameter sanity checks. */ + if (param->paramid >= es_num_session_variables) + elog(ERROR, "paramid of PARAM_VARIABLE param is out of range"); + + var = &es_session_variables[param->paramid]; + + if (var->typid != param->paramtype) + elog(ERROR, "type of buffered value is different than PARAM_VARIABLE type"); + + /* + * In this case, pass the value like + * a constant. + */ + scratch.opcode = EEOP_CONST; + scratch.d.constval.value = var->value; + scratch.d.constval.isnull = var->isnull; + ExprEvalPushStep(state, &scratch); + } + else + { + AclResult aclresult; + Oid varid = param->paramvarid; + Oid vartype = param->paramtype; + + /* + * When the expression is evaluated directly + * without query executor start (plpgsql simple + * expr evaluation), then the array es_session_variables + * is null. In this case we need to use direct + * access to session variables. The values are + * not protected by using copy, but it is not + * problem (we don't need to emulate stability + * of the value). + * + * In this case we should to do aclcheck, because + * usual aclcheck from standard_ExecutorStart + * is not executed in this case. Fortunately + * it is just once per transaction. + */ + aclresult = pg_variable_aclcheck(varid, GetUserId(), ACL_READ); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_VARIABLE, + get_session_variable_name(varid)); + + scratch.opcode = EEOP_PARAM_VARIABLE; + scratch.d.vparam.varid = varid; + scratch.d.vparam.vartype = vartype; + ExprEvalPushStep(state, &scratch); + } + } + break; + case PARAM_EXTERN: /* diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c index 636794ca6f..ce32fe0c25 100644 --- a/src/backend/executor/execExprInterp.c +++ b/src/backend/executor/execExprInterp.c @@ -61,6 +61,7 @@ #include "catalog/pg_proc.h" #include "catalog/pg_type.h" #include "commands/sequence.h" +#include "commands/session_variable.h" #include "executor/execExpr.h" #include "executor/nodeSubplan.h" #include "funcapi.h" @@ -455,6 +456,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) &&CASE_EEOP_PARAM_EXEC, &&CASE_EEOP_PARAM_EXTERN, &&CASE_EEOP_PARAM_CALLBACK, + &&CASE_EEOP_PARAM_VARIABLE, &&CASE_EEOP_CASE_TESTVAL, &&CASE_EEOP_MAKE_READONLY, &&CASE_EEOP_IOCOERCE, @@ -1094,6 +1096,15 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) EEO_NEXT(); } + EEO_CASE(EEOP_PARAM_VARIABLE) + { + /* direct access to session variable (without buffering) */ + *op->resvalue = GetSessionVariableWithTypeCheck(op->d.vparam.varid, + op->resnull, + op->d.vparam.vartype); + EEO_NEXT(); + } + EEO_CASE(EEOP_CASE_TESTVAL) { /* diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index ef2fd46092..5e915cb1e7 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -48,6 +48,7 @@ #include "catalog/pg_publication.h" #include "commands/matview.h" #include "commands/trigger.h" +#include "commands/session_variable.h" #include "executor/execdebug.h" #include "executor/nodeSubplan.h" #include "foreign/fdwapi.h" @@ -200,6 +201,61 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) Assert(queryDesc->sourceText != NULL); estate->es_sourceText = queryDesc->sourceText; + /* + * The executor doesn't work with session variables directly. Values + * of related session variables are copied to dedicated array, and + * this array is passed to executor. + */ + if (queryDesc->num_session_variables > 0) + { + /* + * When paralel access to query parameters (including related + * session variables) is required, then related session variables + * are restored (deserilized) in queryDesc already. So just push + * pointer of this array to executor's estate. + */ + estate->es_session_variables = queryDesc->session_variables; + estate->es_num_session_variables = queryDesc->num_session_variables; + } + else if (queryDesc->plannedstmt->sessionVariables) + { + ListCell *lc; + int nSessionVariables; + int i = 0; + + /* + * In this case, the query uses session variables, but we have + * to prepare the array with passed values (of used session + * variables) first. + */ + nSessionVariables = list_length(queryDesc->plannedstmt->sessionVariables); + + /* Create the array used for passing values of used session variables */ + estate->es_session_variables = (SessionVariableValue *) + palloc(nSessionVariables * sizeof(SessionVariableValue)); + + /* Fill the array */ + foreach(lc, queryDesc->plannedstmt->sessionVariables) + { + AclResult aclresult; + Oid varid = lfirst_oid(lc); + + aclresult = pg_variable_aclcheck(varid, GetUserId(), ACL_READ); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_VARIABLE, + get_session_variable_name(varid)); + + estate->es_session_variables[i].varid = varid; + estate->es_session_variables[i].value = CopySessionVariable(varid, + &estate->es_session_variables[i].isnull, + &estate->es_session_variables[i].typid); + + i++; + } + + estate->es_num_session_variables = nSessionVariables; + } + /* * Fill in the query environment, if any, from queryDesc. */ diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index f1fd7f7e8b..1e04923788 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -12,8 +12,9 @@ * workers and ensuring that their state generally matches that of the * leader; see src/backend/access/transam/README.parallel for details. * However, we must save and restore relevant executor state, such as - * any ParamListInfo associated with the query, buffer/WAL usage info, and - * the actual plan to be passed down to the worker. + * any ParamListInfo associated with the query, buffer/WAL usage info, + * session variables buffer, and the actual plan to be passed down to + * the worker. * * IDENTIFICATION * src/backend/executor/execParallel.c @@ -66,6 +67,7 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_SESSION_VARIABLES UINT64CONST(0xE00000000000000B) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -140,6 +142,12 @@ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, /* Helper function that runs in the parallel worker. */ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); +/* Helper functions that can pass values of session variables */ +static Size EstimateSessionVariables(EState *estate); +static void SerializeSessionVariables(EState *estate, char **start_address); +static SessionVariableValue * RestoreSessionVariables(char **start_address, + int *num_session_variables); + /* * Create a serialized representation of the plan to be sent to each worker. */ @@ -597,6 +605,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *pstmt_data; char *pstmt_space; char *paramlistinfo_space; + char *session_variables_space; BufferUsage *bufusage_space; WalUsage *walusage_space; SharedExecutorInstrumentation *instrumentation = NULL; @@ -606,6 +615,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int instrumentation_len = 0; int jit_instrumentation_len = 0; int instrument_offset = 0; + int session_variables_len = 0; Size dsa_minsize = dsa_minimum_size(); char *query_string; int query_len; @@ -661,6 +671,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for serialized session variables. */ + session_variables_len = EstimateSessionVariables(estate); + shm_toc_estimate_chunk(&pcxt->estimator, session_variables_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* * Estimate space for BufferUsage. * @@ -755,6 +770,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space); + /* Store serialized session variables. */ + session_variables_space = shm_toc_allocate(pcxt->toc, session_variables_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_VARIABLES, session_variables_space); + SerializeSessionVariables(estate, &session_variables_space); + /* Allocate space for each worker's BufferUsage; no need to initialize. */ bufusage_space = shm_toc_allocate(pcxt->toc, mul_size(sizeof(BufferUsage), pcxt->nworkers)); @@ -1402,6 +1422,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) SharedJitInstrumentation *jit_instrumentation; int instrument_options = 0; void *area_space; + char *sessionvariable_space; dsa_area *area; ParallelWorkerContext pwcxt; @@ -1427,6 +1448,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false); area = dsa_attach_in_place(area_space, seg); + /* Reconstruct session variables. */ + sessionvariable_space = shm_toc_lookup(toc, + PARALLEL_KEY_SESSION_VARIABLES, + false); + queryDesc->session_variables = + RestoreSessionVariables(&sessionvariable_space, + &queryDesc->num_session_variables); + /* Start up the executor */ queryDesc->plannedstmt->jitFlags = fpes->jit_flags; ExecutorStart(queryDesc, fpes->eflags); @@ -1495,3 +1524,118 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) FreeQueryDesc(queryDesc); receiver->rDestroy(receiver); } + +/* + * Estimate the amount of space required to serialize a + * session variable. + */ +static Size +EstimateSessionVariables(EState *estate) +{ + int i; + Size sz = sizeof(int); + + if (estate->es_session_variables == NULL) + return sz; + + for (i = 0; i < estate->es_num_session_variables; i++) + { + SessionVariableValue *svarval; + Oid typeOid; + int16 typLen; + bool typByVal; + + svarval = &estate->es_session_variables[i]; + + typeOid = svarval->typid; + + sz = add_size(sz, sizeof(Oid)); /* space for type OID */ + + /* space for datum/isnull */ + Assert(OidIsValid(typeOid)); + get_typlenbyval(typeOid, &typLen, &typByVal); + + sz = add_size(sz, + datumEstimateSpace(svarval->value, svarval->isnull, typByVal, typLen)); + } + + return sz; +} + +/* + * Serialize a session variables buffer into caller-provided storage. + * + * We write the number of parameters first, as a 4-byte integer, and then + * write details for each parameter in turn. The details for each parameter + * consist of a 4-byte type OID, and then the datum as serialized by + * datumSerialize(). The caller is responsible for ensuring that there is + * enough storage to store the number of bytes that will be written; use + * EstimateSessionVariables to find out how many will be needed. + * *start_address is updated to point to the byte immediately following those + * written. + * + * RestoreSessionVariables can be used to recreate a session variable buffer + * based on the serialized representation; + */ +static void +SerializeSessionVariables(EState *estate, char **start_address) +{ + int nparams; + int i; + + /* Write number of parameters. */ + nparams = estate->es_num_session_variables; + memcpy(*start_address, &nparams, sizeof(int)); + *start_address += sizeof(int); + + /* Write each parameter in turn. */ + for (i = 0; i < nparams; i++) + { + SessionVariableValue *svarval; + Oid typeOid; + int16 typLen; + bool typByVal; + + svarval = &estate->es_session_variables[i]; + typeOid = svarval->typid; + + /* Write type OID. */ + memcpy(*start_address, &typeOid, sizeof(Oid)); + *start_address += sizeof(Oid); + + Assert(OidIsValid(typeOid)); + get_typlenbyval(typeOid, &typLen, &typByVal); + + datumSerialize(svarval->value, svarval->isnull, typByVal, typLen, + start_address); + } +} + +static SessionVariableValue * +RestoreSessionVariables(char **start_address, int *num_session_variables) +{ + SessionVariableValue *session_variables; + int i; + int nparams; + + memcpy(&nparams, *start_address, sizeof(int)); + *start_address += sizeof(int); + + *num_session_variables = nparams; + session_variables = (SessionVariableValue *) + palloc(nparams * sizeof(SessionVariableValue)); + + for (i = 0; i < nparams; i++) + { + SessionVariableValue *svarval = &session_variables[i]; + + /* Read type OID. */ + memcpy(&svarval->typid, *start_address, sizeof(Oid)); + *start_address += sizeof(Oid); + + /* Read datum/isnull. */ + svarval->value = datumRestore(start_address, &svarval->isnull); + } + + return session_variables; +} diff --git a/src/backend/jit/llvm/llvmjit_expr.c b/src/backend/jit/llvm/llvmjit_expr.c index bd3965143d..dca5942b7a 100644 --- a/src/backend/jit/llvm/llvmjit_expr.c +++ b/src/backend/jit/llvm/llvmjit_expr.c @@ -1073,6 +1073,12 @@ llvm_compile_expr(ExprState *state) LLVMBuildBr(b, opblocks[opno + 1]); break; + case EEOP_PARAM_VARIABLE: + build_EvalXFunc(b, mod, "ExecEvalParamVariable", + v_state, op, v_econtext); + LLVMBuildBr(b, opblocks[opno + 1]); + break; + case EEOP_PARAM_CALLBACK: { LLVMTypeRef v_functype; diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index cf9e0a74db..97e1038656 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -317,6 +317,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, glob->lastPlanNodeId = 0; glob->transientPlan = false; glob->dependsOnRole = false; + glob->sessionVariables = NIL; /* * Assess whether it's feasible to use parallel mode for this query. We @@ -530,6 +531,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, result->paramExecTypes = glob->paramExecTypes; /* utilityStmt should be null, but we might as well copy it */ result->utilityStmt = parse->utilityStmt; + result->sessionVariables = glob->sessionVariables; result->stmt_location = parse->stmt_location; result->stmt_len = parse->stmt_len; @@ -685,6 +687,12 @@ subquery_planner(PlannerGlobal *glob, Query *parse, */ pull_up_subqueries(root); + /* + * Check if some subquery uses session variable. Flag hasSessionVariables + * should be true if query or some subquery uses any session variable. + */ + pull_up_has_session_variables(root); + /* * If this is a simple UNION ALL query, flatten it into an appendrel. We * do this now because it requires applying pull_up_subqueries to the leaf diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 1cb0abdbc1..c7edc4e75a 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -181,6 +181,8 @@ static List *set_returning_clause_references(PlannerInfo *root, static List *set_windowagg_runcondition_references(PlannerInfo *root, List *runcondition, Plan *plan); +static bool pull_up_has_session_variables_walker(Node *node, + PlannerInfo *root); /***************************************************************************** @@ -1218,6 +1220,50 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) return plan; } +/* + * Search usage of session variables in subqueries + */ +void +pull_up_has_session_variables(PlannerInfo *root) +{ + Query *query = root->parse; + + if (query->hasSessionVariables) + { + root->hasSessionVariables = true; + } + else + { + (void) query_tree_walker(query, + pull_up_has_session_variables_walker, + (void *) root, 0); + } +} + +static bool +pull_up_has_session_variables_walker(Node *node, PlannerInfo *root) +{ + if (node == NULL) + return false; + if (IsA(node, Query)) + { + Query *query = (Query *) node; + + if (query->hasSessionVariables) + { + root->hasSessionVariables = true; + return false; + } + + /* Recurse into subselects */ + return query_tree_walker((Query *) node, + pull_up_has_session_variables_walker, + (void *) root, 0); + } + return expression_tree_walker(node, pull_up_has_session_variables_walker, + (void *) root); +} + /* * set_indexonlyscan_references * Do set_plan_references processing on an IndexOnlyScan @@ -1831,8 +1877,9 @@ copyVar(Var *var) * This is code that is common to all variants of expression-fixing. * We must look up operator opcode info for OpExpr and related nodes, * add OIDs from regclass Const nodes into root->glob->relationOids, and - * add PlanInvalItems for user-defined functions into root->glob->invalItems. - * We also fill in column index lists for GROUPING() expressions. + * add PlanInvalItems for user-defined functions and session variables into + * root->glob->invalItems. We also fill in column index lists for GROUPING() + * expressions. * * We assume it's okay to update opcode info in-place. So this could possibly * scribble on the planner's input data structures, but it's OK. @@ -1922,15 +1969,39 @@ fix_expr_common(PlannerInfo *root, Node *node) g->cols = cols; } } + else if (IsA(node, Param)) + { + Param *p = (Param *) node; + + if (p->paramkind == PARAM_VARIABLE) + { + PlanInvalItem *inval_item = makeNode(PlanInvalItem); + + /* paramid is still session variable id */ + inval_item->cacheId = VARIABLEOID; + inval_item->hashValue = GetSysCacheHashValue1(VARIABLEOID, + ObjectIdGetDatum(p->paramvarid)); + + /* Append this variable to global, register dependency */ + root->glob->invalItems = lappend(root->glob->invalItems, + inval_item); + } + } } /* * fix_param_node * Do set_plan_references processing on a Param + * Collect session variables list and replace variable oid by + * index to collected list. * * If it's a PARAM_MULTIEXPR, replace it with the appropriate Param from * root->multiexpr_params; otherwise no change is needed. * Just for paranoia's sake, we make a copy of the node in either case. + * + * If it's a PARAM_VARIABLE, then we collect used session variables in + * list root->glob->sessionVariable. We should to assign Param paramvarid + * too, and it is position of related session variable in mentioned list. */ static Node * fix_param_node(PlannerInfo *root, Param *p) @@ -1949,6 +2020,41 @@ fix_param_node(PlannerInfo *root, Param *p) elog(ERROR, "unexpected PARAM_MULTIEXPR ID: %d", p->paramid); return copyObject(list_nth(params, colno - 1)); } + + if (p->paramkind == PARAM_VARIABLE) + { + ListCell *lc; + int n = 0; + bool found = false; + + /* We will modify object */ + p = (Param *) copyObject(p); + + /* + * Now, we can actualize list of session variables, and we can complete + * paramid parameter. + */ + foreach(lc, root->glob->sessionVariables) + { + if (lfirst_oid(lc) == p->paramvarid) + { + p->paramid = n; + found = true; + break; + } + n += 1; + } + + if (!found) + { + root->glob->sessionVariables = lappend_oid(root->glob->sessionVariables, + p->paramvarid); + p->paramid = n; + } + + return (Node *) p; + } + return (Node *) copyObject(p); } @@ -2010,7 +2116,10 @@ fix_alternative_subplan(PlannerInfo *root, AlternativeSubPlan *asplan, * replacing Aggref nodes that should be replaced by initplan output Params, * choosing the best implementation for AlternativeSubPlans, * looking up operator opcode info for OpExpr and related nodes, - * and adding OIDs from regclass Const nodes into root->glob->relationOids. + * adding OIDs from regclass Const nodes into root->glob->relationOids, + * and assigning paramvarid to PARAM_VARIABLE params, and collecting + * of OIDs of session variables in root->glob->sessionVariables list + * (paramvarid is an position of related session variable in this list). * * 'node': the expression to be modified * 'rtoffset': how much to increment varnos by @@ -2032,7 +2141,8 @@ fix_scan_expr(PlannerInfo *root, Node *node, int rtoffset, double num_exec) root->multiexpr_params != NIL || root->glob->lastPHId != 0 || root->minmax_aggs != NIL || - root->hasAlternativeSubPlans) + root->hasAlternativeSubPlans || + root->hasSessionVariables) { return fix_scan_expr_mutator(node, &context); } diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 533df86ff7..ca48b3b74c 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -26,6 +26,7 @@ #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" +#include "commands/session_variable.h" #include "executor/executor.h" #include "executor/functions.h" #include "executor/execExpr.h" @@ -853,16 +854,17 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) /* * We can't pass Params to workers at the moment either, so they are also - * parallel-restricted, unless they are PARAM_EXTERN Params or are - * PARAM_EXEC Params listed in safe_param_ids, meaning they could be - * either generated within workers or can be computed by the leader and - * then their value can be passed to workers. + * parallel-restricted, unless they are PARAM_EXTERN or PARAM_VARIABLE + * Params or are PARAM_EXEC Params listed in safe_param_ids, meaning they + * could be either generated within workers or can be computed by the + * leader and then their value can be passed to workers. */ else if (IsA(node, Param)) { Param *param = (Param *) node; - if (param->paramkind == PARAM_EXTERN) + if (param->paramkind == PARAM_EXTERN || + param->paramkind == PARAM_VARIABLE) return false; if (param->paramkind != PARAM_EXEC || @@ -2284,6 +2286,7 @@ convert_saop_to_hashed_saop_walker(Node *node, void *context) * value of the Param. * 2. Fold stable, as well as immutable, functions to constants. * 3. Reduce PlaceHolderVar nodes to their contained expressions. + * 4. Current value of session variable can be used for estimation too. *-------------------- */ Node * @@ -2406,6 +2409,29 @@ eval_const_expressions_mutator(Node *node, } } } + else if (param->paramkind == PARAM_VARIABLE && + context->estimate) + { + int16 typLen; + bool typByVal; + Datum pval; + bool isnull; + + get_typlenbyval(param->paramtype, + &typLen, &typByVal); + + pval = CopySessionVariableWithTypeCheck(param->paramvarid, + &isnull, + param->paramtype); + + return (Node *) makeConst(param->paramtype, + param->paramtypmod, + param->paramcollid, + (int) typLen, + pval, + isnull, + typByVal); + } /* * Not replaceable, so just copy the Param (no need to @@ -4813,22 +4839,45 @@ substitute_actual_parameters_mutator(Node *node, { if (node == NULL) return NULL; + + /* + * SQL functions can contain two different kind of params. The nodes with + * paramkind PARAM_EXTERN are related to function's arguments (and should + * be replaced in this step), because this is how we apply the function's + * arguments for an expression. + * + * The nodes with paramkind PARAM_VARIABLE are related to usage of + * session variables. The values of session variables are not passed + * to expression by expression arguments, so it should not be replaced + * here by function's arguments. Although we could substitute params + * related to immutable session variables with default expression by + * this default expression, it is safer to not do it. This way we don't + * have to run security checks here. There can be some performance loss, + * but an access to session variable is fast (and the result of default + * expression is immediately materialized and can be reused). + */ if (IsA(node, Param)) { Param *param = (Param *) node; - if (param->paramkind != PARAM_EXTERN) + if (param->paramkind != PARAM_EXTERN && + param->paramkind != PARAM_VARIABLE) elog(ERROR, "unexpected paramkind: %d", (int) param->paramkind); - if (param->paramid <= 0 || param->paramid > context->nargs) - elog(ERROR, "invalid paramid: %d", param->paramid); - /* Count usage of parameter */ - context->usecounts[param->paramid - 1]++; + if (param->paramkind == PARAM_EXTERN) + { + if (param->paramid <= 0 || param->paramid > context->nargs) + elog(ERROR, "invalid paramid: %d", param->paramid); - /* Select the appropriate actual arg and replace the Param with it */ - /* We don't need to copy at this time (it'll get done later) */ - return list_nth(context->args, param->paramid - 1); + /* Count usage of parameter */ + context->usecounts[param->paramid - 1]++; + + /* Select the appropriate actual arg and replace the Param with it */ + /* We don't need to copy at this time (it'll get done later) */ + return list_nth(context->args, param->paramid - 1); + } } + return expression_tree_mutator(node, substitute_actual_parameters_mutator, (void *) context); } diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 6688c2a865..a013031d22 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -525,6 +525,8 @@ transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt) qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasAggs = pstate->p_hasAggs; + qry->hasSessionVariables = pstate->p_hasSessionVariables; + assign_query_collations(pstate, qry); /* this must be done after collations, for reliable comparison of exprs */ @@ -942,6 +944,7 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasSubLinks = pstate->p_hasSubLinks; + qry->hasSessionVariables = pstate->p_hasSessionVariables; assign_query_collations(pstate, qry); @@ -1312,8 +1315,9 @@ transformSelectStmt(ParseState *pstate, SelectStmt *stmt) /* process the FROM clause */ transformFromClause(pstate, stmt->fromClause); - /* transform targetlist */ - qry->targetList = transformTargetList(pstate, stmt->targetList, + /* Transform targetlist. */ + qry->targetList = transformTargetList(pstate, + stmt->targetList, EXPR_KIND_SELECT_TARGET); /* mark column origins */ @@ -1404,6 +1408,8 @@ transformSelectStmt(ParseState *pstate, SelectStmt *stmt) (LockingClause *) lfirst(l), false); } + qry->hasSessionVariables = pstate->p_hasSessionVariables; + assign_query_collations(pstate, qry); /* this must be done after collations, for reliable comparison of exprs */ @@ -1878,6 +1884,8 @@ transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt) (LockingClause *) lfirst(l), false); } + qry->hasSessionVariables = pstate->p_hasSessionVariables; + assign_query_collations(pstate, qry); /* this must be done after collations, for reliable comparison of exprs */ @@ -2409,6 +2417,7 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasSubLinks = pstate->p_hasSubLinks; + qry->hasSessionVariables = pstate->p_hasSessionVariables; assign_query_collations(pstate, qry); diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c index aa1b1a76dc..abe43015c5 100644 --- a/src/backend/parser/parse_expr.c +++ b/src/backend/parser/parse_expr.c @@ -40,11 +40,12 @@ #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/timestamp.h" +#include "utils/typcache.h" #include "utils/xml.h" /* GUC parameters */ bool Transform_null_equals = false; - +bool session_variables_ambiguity_warning = false; static Node *transformExprRecurse(ParseState *pstate, Node *expr); static Node *transformParamRef(ParseState *pstate, ParamRef *pref); @@ -101,6 +102,9 @@ static Expr *make_distinct_op(ParseState *pstate, List *opname, Node *ltree, Node *rtree, int location); static Node *make_nulltest_from_distinct(ParseState *pstate, A_Expr *distincta, Node *arg); +static Node *makeParamSessionVariable(ParseState *pstate, + Oid varid, Oid typid, int32 typmod, Oid collid, + char *attrname, int location); /* @@ -506,6 +510,10 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) char *relname = NULL; char *colname = NULL; ParseNamespaceItem *nsitem; + Oid varid = InvalidOid; + char *attrname = NULL; + bool not_unique; + bool lockit; int levels_up; enum { @@ -841,6 +849,124 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) parser_errposition(pstate, cref->location))); } + /* Protect session variable by AccessShareLock when it is not shadowed. */ + lockit = (node == NULL); + + /* The col's reference can be a reference to session variable too. */ + varid = IdentifyVariable(cref->fields, &attrname, lockit, ¬_unique); + + /* + * Raise error when reference to session variable is ambiguous and + * and this reference is not shadowed. + */ + if (!node && not_unique) + ereport(ERROR, + (errcode(ERRCODE_AMBIGUOUS_PARAMETER), + errmsg("session variable reference \"%s\" is ambiguous", + NameListToString(cref->fields)), + parser_errposition(pstate, cref->location))); + + if (OidIsValid(varid)) + { + /* + * When Session variables is shadowed by columns or by routine's + * variables or routine's arguments. + */ + if (node != NULL) + { + /* + * In this case we can raise a warning, but only when required. + * We should not raise warnings for contexts where usage of session + * variables has not sense, or when we can detect that variable + * doesn't have the wanted field (and then there is no possible + * identifier's collision). + */ + if (session_variables_ambiguity_warning && + pstate->p_expr_kind == EXPR_KIND_SELECT_TARGET) + { + Oid typid; + int32 typmod; + Oid collid; + + get_session_variable_type_typmod_collid(varid, + &typid, &typmod, + &collid); + + /* + * Some cases with ambiguous references can be solved without + * raising a warning. When there is a collision between column + * name (or label) and some session variable name, and when we + * know attribute name, then we can ignore the collision when: + * + * a) variable is of scalar type (then indirection cannot be + * applied on this session variable. + * + * b) when related variable has no field with the given + * attrname, then indirection cannot be applied on this + * session variable. + */ + if (attrname) + { + if (type_is_rowtype(typid)) + { + TupleDesc tupdesc; + bool found = false; + int i; + + /* slow part, I hope it will not be to often */ + tupdesc = lookup_rowtype_tupdesc(typid, typmod); + for (i = 0; i < tupdesc->natts; i++) + { + if (namestrcmp(&(TupleDescAttr(tupdesc, i)->attname), attrname) == 0 && + !TupleDescAttr(tupdesc, i)->attisdropped) + { + found = true; + break; + } + } + + ReleaseTupleDesc(tupdesc); + + /* There is no composite variable with this field. */ + if (!found) + varid = InvalidOid; + } + else + /* There is no composite variable with this name. */ + varid = InvalidOid; + } + + /* + * Raise warning when session variable reference is still + * visible. + */ + if (OidIsValid(varid)) + ereport(WARNING, + (errcode(ERRCODE_AMBIGUOUS_COLUMN), + errmsg("session variable \"%s\" is shadowed", + NameListToString(cref->fields)), + errdetail("Session variables can be shadowed by columns, routine's variables and routine's arguments with same name."), + parser_errposition(pstate, cref->location))); + } + + /* Session variables are always shadowed by any other node. */ + varid = InvalidOid; + } + else + { + Oid typid; + int32 typmod; + Oid collid; + + get_session_variable_type_typmod_collid(varid, &typid, &typmod, + &collid); + + node = makeParamSessionVariable(pstate, + varid, typid, typmod, collid, + attrname, cref->location); + } + } + /* * Throw error if no translation found. */ @@ -875,6 +1001,64 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) return node; } +/* + * Generate param variable for reference to session variable + */ +static Node * +makeParamSessionVariable(ParseState *pstate, + Oid varid, Oid typid, int32 typmod, Oid collid, + char *attrname, int location) +{ + Param *param; + + param = makeNode(Param); + + param->paramkind = PARAM_VARIABLE; + param->paramvarid = varid; + param->paramtype = typid; + param->paramtypmod = typmod; + param->paramcollid = collid; + + pstate->p_hasSessionVariables = true; + + if (attrname != NULL) + { + TupleDesc tupdesc; + int i; + + tupdesc = lookup_rowtype_tupdesc(typid, typmod); + + for (i = 0; i < tupdesc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, i); + + if (strcmp(attrname, NameStr(att->attname)) == 0 && + !att->attisdropped) + { + /* Success, so generate a FieldSelect expression */ + FieldSelect *fselect = makeNode(FieldSelect); + + fselect->arg = (Expr *) param; + fselect->fieldnum = i + 1; + fselect->resulttype = att->atttypid; + fselect->resulttypmod = att->atttypmod; + /* save attribute's collation for parse_collate.c */ + fselect->resultcollid = att->attcollation; + + ReleaseTupleDesc(tupdesc); + return (Node *) fselect; + } + } + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("could not identify column \"%s\" in variable", attrname), + parser_errposition(pstate, location))); + } + + return (Node *) param; +} + static Node * transformParamRef(ParseState *pstate, ParamRef *pref) { diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 5aa5a350f3..c5cf50687c 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -86,6 +86,9 @@ CreateQueryDesc(PlannedStmt *plannedstmt, qd->queryEnv = queryEnv; qd->instrument_options = instrument_options; /* instrumentation wanted? */ + qd->num_session_variables = 0; + qd->session_variables = NULL; + /* null these fields until set by ExecutorStart */ qd->tupDesc = NULL; qd->estate = NULL; diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 8964f73b92..84b5c8732a 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -38,6 +38,7 @@ #include "catalog/pg_statistic_ext.h" #include "catalog/pg_trigger.h" #include "catalog/pg_type.h" +#include "catalog/pg_variable.h" #include "commands/defrem.h" #include "commands/tablespace.h" #include "common/keywords.h" @@ -502,6 +503,7 @@ static char *generate_function_name(Oid funcid, int nargs, static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2); static void add_cast_to(StringInfo buf, Oid typid); static char *generate_qualified_type_name(Oid typid); +static char *generate_session_variable_name(Oid varid); static text *string_to_text(char *str); static char *flatten_reloptions(Oid relid); static void get_reloptions(StringInfo buf, Datum reloptions); @@ -8086,6 +8088,14 @@ get_parameter(Param *param, deparse_context *context) return; } + /* translate paramvarid to session variable name */ + if (param->paramkind == PARAM_VARIABLE) + { + appendStringInfo(context->buf, "%s", + generate_session_variable_name(param->paramvarid)); + return; + } + /* * If it's an external parameter, see if the outermost namespace provides * function argument names. @@ -12760,6 +12770,42 @@ generate_collation_name(Oid collid) return result; } +/* + * generate_session_variable_name + * Compute the name to display for a session variable specified by OID + * + * The result includes all necessary quoting and schema-prefixing. + */ +static char * +generate_session_variable_name(Oid varid) +{ + HeapTuple tup; + Form_pg_variable varform; + char *varname; + char *nspname; + char *result; + + tup = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(varid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for variable %u", varid); + + varform = (Form_pg_variable) GETSTRUCT(tup); + + varname = NameStr(varform->varname); + + if (!VariableIsVisible(varid)) + nspname = get_namespace_name_or_temp(varform->varnamespace); + else + nspname = NULL; + + result = quote_qualified_identifier(nspname, varname); + + ReleaseSysCache(tup); + + return result; +} + /* * Given a C string, produce a TEXT datum. * diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c index 0d6a295674..74daf13a3f 100644 --- a/src/backend/utils/cache/plancache.c +++ b/src/backend/utils/cache/plancache.c @@ -58,6 +58,7 @@ #include "access/transam.h" #include "catalog/namespace.h" +#include "catalog/pg_variable.h" #include "executor/executor.h" #include "miscadmin.h" #include "nodes/nodeFuncs.h" @@ -1891,6 +1892,20 @@ ScanQueryWalker(Node *node, bool *acquire) ScanQueryForLocks(castNode(Query, sub->subselect), *acquire); /* Fall through to process lefthand args of SubLink */ } + else if (IsA(node, Param)) + { + Param *p = (Param *) node; + + if (p->paramkind == PARAM_VARIABLE) + { + if (acquire) + LockDatabaseObject(VariableRelationId, p->paramvarid, + 0, AccessShareLock); + else + UnlockDatabaseObject(VariableRelationId, p->paramvarid, + 0, AccessShareLock); + } + } /* * Do NOT recurse into Query nodes, because ScanQueryForLocks already @@ -2022,7 +2037,9 @@ PlanCacheRelCallback(Datum arg, Oid relid) /* * PlanCacheObjectCallback - * Syscache inval callback function for PROCOID and TYPEOID caches + * Syscache inval callback function for TYPEOID, PROCOID, NAMESPACEOID, + * OPEROID, AMOPOPID, FOREIGNSERVEROID, FOREIGNDATAWRAPPEROID and VARIABLEOID + * caches. * * Invalidate all plans mentioning the object with the specified hash value, * or all plans mentioning any member of this cache if hashvalue == 0. diff --git a/src/backend/utils/fmgr/fmgr.c b/src/backend/utils/fmgr/fmgr.c index af74fe345e..8d61fc082c 100644 --- a/src/backend/utils/fmgr/fmgr.c +++ b/src/backend/utils/fmgr/fmgr.c @@ -1902,9 +1902,13 @@ get_call_expr_arg_stable(Node *expr, int argnum) */ if (IsA(arg, Const)) return true; - if (IsA(arg, Param) && - ((Param *) arg)->paramkind == PARAM_EXTERN) - return true; + if (IsA(arg, Param)) + { + Param *p = (Param *) arg; + + if (p->paramkind == PARAM_EXTERN || p->paramkind == PARAM_VARIABLE) + return true; + } return false; } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 9fbbfb1be5..f687019037 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1731,6 +1731,18 @@ static struct config_bool ConfigureNamesBool[] = false, NULL, NULL, NULL }, + + { + {"session_variables_ambiguity_warning", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("Raise warning when reference to session variable is ambiguous."), + NULL, + GUC_NOT_IN_SAMPLE + }, + &session_variables_ambiguity_warning, + false, + NULL, NULL, NULL + }, + { {"db_user_namespace", PGC_SIGHUP, CONN_AUTH_AUTH, gettext_noop("Enables per-database user names."), diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 5055904005..e9d26f783b 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11897,4 +11897,11 @@ prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary', prosrc => 'brin_minmax_multi_summary_send' }, +{ oid => '8488', descr => 'debug list of used session variables', + proname => 'pg_debug_show_used_session_variables', prorows => '1000', proretset => 't', + provolatile => 's', prorettype => 'record', proargtypes => '', + proallargtypes => '{oid,text,text,oid,text,bool,bool,bool,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o}', + proargnames => '{varid,schema,name,typid,typname,removed,has_value,has_valid_value,can_read,can_write}', + prosrc => 'pg_debug_show_used_session_variables' }, ] diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h index c8ef917ffe..9e06ca7a3b 100644 --- a/src/include/executor/execExpr.h +++ b/src/include/executor/execExpr.h @@ -161,6 +161,7 @@ typedef enum ExprEvalOp EEOP_PARAM_EXEC, EEOP_PARAM_EXTERN, EEOP_PARAM_CALLBACK, + EEOP_PARAM_VARIABLE, /* return CaseTestExpr value */ EEOP_CASE_TESTVAL, @@ -388,6 +389,13 @@ typedef struct ExprEvalStep Oid paramtype; /* OID of parameter's datatype */ } param; + /* for EEOP_PARAM_VARIABLE */ + struct + { + Oid varid; /* OID of assigned variable */ + Oid vartype; /* OID of parameter's datatype */ + } vparam; + /* for EEOP_PARAM_CALLBACK */ struct { @@ -824,6 +832,8 @@ extern void ExecEvalParamExec(ExprState *state, ExprEvalStep *op, ExprContext *econtext); extern void ExecEvalParamExtern(ExprState *state, ExprEvalStep *op, ExprContext *econtext); +extern void ExecEvalParamVariable(ExprState *state, ExprEvalStep *op, + ExprContext *econtext); extern void ExecEvalSQLValueFunction(ExprState *state, ExprEvalStep *op); extern void ExecEvalCurrentOfExpr(ExprState *state, ExprEvalStep *op); extern void ExecEvalNextValueExpr(ExprState *state, ExprEvalStep *op); diff --git a/src/include/executor/execdesc.h b/src/include/executor/execdesc.h index e79e2c001f..dbf4dc7ea0 100644 --- a/src/include/executor/execdesc.h +++ b/src/include/executor/execdesc.h @@ -48,6 +48,10 @@ typedef struct QueryDesc EState *estate; /* executor's query-wide state */ PlanState *planstate; /* tree of per-plan-node state */ + /* reference to session variables buffer */ + int num_session_variables; + SessionVariableValue *session_variables; + /* This field is set by ExecutorRun */ bool already_executed; /* true if previously executed */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 01b1727fc0..e8509d4c73 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -590,6 +590,18 @@ typedef struct AsyncRequest * tuples) */ } AsyncRequest; +/* ---------------- + * SessionVariableValue + * ---------------- + */ +typedef struct SessionVariableValue +{ + Oid varid; + Oid typid; + bool isnull; + Datum value; +} SessionVariableValue; + /* ---------------- * EState information * @@ -641,6 +653,13 @@ typedef struct EState ParamListInfo es_param_list_info; /* values of external params */ ParamExecData *es_param_exec_vals; /* values of internal params */ + /* Variables info: */ + /* number of used session variables */ + int es_num_session_variables; + + /* array of copied values of session variables */ + SessionVariableValue *es_session_variables; + QueryEnvironment *es_queryEnv; /* query environment */ /* Other working state: */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 978464658d..b24d2bd9f9 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -138,6 +138,7 @@ typedef struct Query int resultRelation; /* rtable index of target relation for * INSERT/UPDATE/DELETE/MERGE; 0 for SELECT */ + Oid resultVariable; /* target variable of LET statement */ bool hasAggs; /* has aggregates in tlist or havingQual */ bool hasWindowFuncs; /* has window functions in tlist */ bool hasTargetSRFs; /* has set-returning functions in tlist */ @@ -147,6 +148,7 @@ typedef struct Query bool hasModifyingCTE; /* has INSERT/UPDATE/DELETE in WITH */ bool hasForUpdate; /* FOR [KEY] UPDATE/SHARE was specified */ bool hasRowSecurity; /* rewriter has applied some RLS policy */ + bool hasSessionVariables; /* uses session variables */ bool isReturn; /* is a RETURN statement */ diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 294cfe9c47..5a79a4d591 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -157,6 +157,8 @@ typedef struct PlannerGlobal /* partition descriptors */ PartitionDirectory partition_directory pg_node_attr(read_write_ignore); + + List *sessionVariables; /* list of used session variables */ } PlannerGlobal; /* macro for fetching the Plan associated with a SubPlan node */ @@ -459,6 +461,8 @@ struct PlannerInfo /* true if planning a recursive WITH item */ bool hasRecursion; + /* true if session variables were used */ + bool hasSessionVariables; /* * Information about aggregates. Filled by preprocess_aggrefs(). */ diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index dca2a21e7a..e03727e19f 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -91,6 +91,8 @@ typedef struct PlannedStmt Node *utilityStmt; /* non-null if this is utility stmt */ + List *sessionVariables; /* list of OIDs for PARAM_VARIABLE Params */ + /* statement location in source string (copied from Query) */ int stmt_location; /* start location, or -1 if unknown */ int stmt_len; /* length in bytes; 0 means "rest of string" */ diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index 3aa96bb685..29c2266c2b 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -43,7 +43,10 @@ typedef struct Alias List *colnames; /* optional list of column aliases */ } Alias; -/* What to do at commit time for temporary relations */ +/* + * What to do at commit time for temporary relations or + * persistent/temporary variable. + */ typedef enum OnCommitAction { ONCOMMIT_NOOP, /* No ON COMMIT clause (do nothing) */ @@ -305,13 +308,17 @@ typedef struct Const * of the `paramid' field contain the SubLink's subLinkId, and * the low-order 16 bits contain the column number. (This type * of Param is also converted to PARAM_EXEC during planning.) + * + * PARAM_VARIABLE: The parameter is an access to session variable + * paramid holds varid. */ typedef enum ParamKind { PARAM_EXTERN, PARAM_EXEC, PARAM_SUBLINK, - PARAM_MULTIEXPR + PARAM_MULTIEXPR, + PARAM_VARIABLE } ParamKind; typedef struct Param @@ -322,6 +329,7 @@ typedef struct Param Oid paramtype; /* pg_type OID of parameter's datatype */ int32 paramtypmod; /* typmod value, if known */ Oid paramcollid; /* OID of collation, or InvalidOid if none */ + Oid paramvarid; /* OID of session variable if it is used */ int location; /* token location, or -1 if unknown */ } Param; diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index 1566f435b3..2608735731 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -117,4 +117,6 @@ extern void record_plan_function_dependency(PlannerInfo *root, Oid funcid); extern void record_plan_type_dependency(PlannerInfo *root, Oid typid); extern bool extract_query_dependencies_walker(Node *node, PlannerInfo *root); +extern void pull_up_has_session_variables(PlannerInfo *root); + #endif /* PLANMAIN_H */ diff --git a/src/include/parser/parse_expr.h b/src/include/parser/parse_expr.h index c8e5c57b43..14b0adb948 100644 --- a/src/include/parser/parse_expr.h +++ b/src/include/parser/parse_expr.h @@ -17,6 +17,7 @@ /* GUC parameters */ extern PGDLLIMPORT bool Transform_null_equals; +extern PGDLLIMPORT bool session_variables_ambiguity_warning; extern Node *transformExpr(ParseState *pstate, Node *expr, ParseExprKind exprKind); diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h index 4d65a9e9e8..d07f5b0584 100644 --- a/src/include/parser/parse_node.h +++ b/src/include/parser/parse_node.h @@ -212,6 +212,7 @@ struct ParseState bool p_hasTargetSRFs; bool p_hasSubLinks; bool p_hasModifyingCTE; + bool p_hasSessionVariables; Node *p_last_srf; /* most recent set-returning func/op found */ -- 2.37.2