From 41a16ede7010247dad821710d9c39f0d7a471edf Mon Sep 17 00:00:00 2001 From: "okbob@github.com" Date: Sat, 6 Dec 2025 07:35:30 +0100 Subject: [PATCH 10/11] transactional DDL - CREATE VARIABLE, DROP VARIABLE --- src/backend/access/transam/xact.c | 5 + src/backend/commands/session_variable.c | 217 ++++++++++++++++-- src/include/commands/session_variable.h | 2 + .../expected/session_variables_ddl.out | 38 +++ .../regress/sql/session_variables_ddl.sql | 21 ++ 5 files changed, 258 insertions(+), 25 deletions(-) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 092e197eba3..8a80e9c00af 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -39,6 +39,7 @@ #include "commands/async.h" #include "commands/tablecmds.h" #include "commands/trigger.h" +#include "commands/session_variable.h" #include "common/pg_prng.h" #include "executor/spi.h" #include "libpq/be-fsstubs.h" @@ -2333,6 +2334,9 @@ CommitTransaction(void) /* close large objects before lower-level cleanup */ AtEOXact_LargeObject(true); + /* remove stacked session variables */ + AtPreEOXact_SessionVariables(true); + /* * Insert notifications sent by NOTIFY commands into the queue. This * should be late in the pre-commit sequence to minimize time spent @@ -2937,6 +2941,7 @@ AbortTransaction(void) AtAbort_Portals(); smgrDoPendingSyncs(false, is_parallel_worker); AtEOXact_LargeObject(false); + AtPreEOXact_SessionVariables(false); AtAbort_Notify(); AtEOXact_RelationMap(false, is_parallel_worker); AtAbort_Twophase(); diff --git a/src/backend/commands/session_variable.c b/src/backend/commands/session_variable.c index 21345217863..36e243b3ed5 100644 --- a/src/backend/commands/session_variable.c +++ b/src/backend/commands/session_variable.c @@ -59,6 +59,11 @@ typedef struct SVariableData int16 typlen; bool typbyval; + + struct SVariableData *prev; + bool stacked; + LocalTransactionId created_lxid; + LocalTransactionId dropped_lxid; } SVariableData; typedef SVariableData *SVariable; @@ -67,6 +72,14 @@ static HTAB *sessionvars = NULL; /* hash table for session variables */ static MemoryContext SVariableMemoryContext = NULL; +/* + * When we to remove committed dropped variables or uncommitted + * created variables from sessionvars tab. created_or_dropped_lxid + * is transaction id of transaction when some of DROP or CREATE variable + * was executed. + */ +static LocalTransactionId created_or_dropped_lxid = InvalidLocalTransactionId; + /* * Create the hash table for storing session variables. */ @@ -108,6 +121,14 @@ search_variable(char *varname, bool missing_ok) svar = (SVariable) hash_search(sessionvars, varname, HASH_FIND, NULL); + /* Session variable can be dropped inside current transaction */ + if (svar && svar->dropped_lxid != InvalidLocalTransactionId) + { + Assert(created_or_dropped_lxid == MyProc->vxid.lxid); + Assert(svar->dropped_lxid == MyProc->vxid.lxid); + svar = NULL; + } + if (!svar && !missing_ok) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), @@ -240,6 +261,7 @@ CreateVariable(ParseState *pstate, CreateSessionVarStmt *stmt) Oid typcollation; Oid varowner = GetUserId(); SVariable svar; + SVariable prev_svar = NULL; bool found; HeapTuple proctup; Form_pg_proc proc; @@ -302,19 +324,37 @@ CreateVariable(ParseState *pstate, CreateSessionVarStmt *stmt) if (found) { - if (stmt->if_not_exists) + if (svar->dropped_lxid == InvalidLocalTransactionId) { - ereport(NOTICE, - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("session variable \"%s\" already exists, skipping", - stmt->name))); - return; + if (stmt->if_not_exists) + { + ereport(NOTICE, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("session variable \"%s\" already exists, skipping", + stmt->name))); + return; + } + else + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("session variable \"%s\" already exists", + stmt->name))); } else - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("session variable \"%s\" already exists", - stmt->name))); + { + MemoryContext oldcxt; + + Assert(created_or_dropped_lxid == MyProc->vxid.lxid); + Assert(svar->dropped_lxid == MyProc->vxid.lxid); + + oldcxt = MemoryContextSwitchTo(SVariableMemoryContext); + prev_svar = palloc(sizeof(SVariableData)); + memcpy(prev_svar, svar, sizeof(SVariableData)); + prev_svar->stacked = true; + memset(svar, 0, sizeof(SVariableData)); + + MemoryContextSwitchTo(oldcxt); + } } namestrcpy(&svar->varname, stmt->name); @@ -327,6 +367,12 @@ CreateVariable(ParseState *pstate, CreateSessionVarStmt *stmt) svar->value = (Datum) 0; svar->isnull = true; + + svar->prev = prev_svar; + svar->stacked = false; + svar->dropped_lxid = InvalidLocalTransactionId; + svar->created_lxid = MyProc->vxid.lxid; + created_or_dropped_lxid = MyProc->vxid.lxid; } /* @@ -361,14 +407,129 @@ DropVariableByName(DropSessionVarStmt *stmt) errmsg("must be owner of session variable %s", stmt->name))); - if (!svar->typbyval && !svar->isnull) + svar->dropped_lxid = MyProc->vxid.lxid; + created_or_dropped_lxid = MyProc->vxid.lxid; +} + +static void +free_svar_value(SVariable svar) +{ + if (!svar->isnull && !svar->typbyval) pfree(DatumGetPointer(svar->value)); +} + +static void +free_stacked_svars(SVariable svar) +{ + while (svar) + { + SVariable current = svar; + + free_svar_value(current); + svar = current->prev; + pfree(current); + } +} + +/* + * remove dropped committed entries or created uncommitted entries + * from hash table. + */ +void +AtPreEOXact_SessionVariables(bool isCommit) +{ + if (created_or_dropped_lxid != InvalidLocalTransactionId) + { + HASH_SEQ_STATUS status; + SVariable svar; - if (hash_search(sessionvars, - stmt->name, - HASH_REMOVE, - NULL) == NULL) - elog(ERROR, "hash table corrupted"); + Assert(created_or_dropped_lxid == MyProc->vxid.lxid); + Assert(sessionvars); + + hash_seq_init(&status, sessionvars); + + while ((svar = (SVariable) hash_seq_search(&status)) != NULL) + { + if ((svar->dropped_lxid != InvalidLocalTransactionId) || + (svar->created_lxid != InvalidLocalTransactionId)) + { + Assert((svar->dropped_lxid == InvalidLocalTransactionId) || + (svar->dropped_lxid == MyProc->vxid.lxid)); + + Assert((svar->created_lxid == InvalidLocalTransactionId) || + (svar->created_lxid == MyProc->vxid.lxid)); + + if (isCommit) + { + if (svar->dropped_lxid == MyProc->vxid.lxid) + { + free_stacked_svars(svar->prev); + free_svar_value(svar); + + (void) hash_search(sessionvars, + NameStr(svar->varname), + HASH_REMOVE, + NULL); + svar = NULL; + } + else + { + free_stacked_svars(svar->prev); + svar->prev = NULL; + svar->created_lxid = InvalidLocalTransactionId; + } + } + else + { + SVariable iter; + + /* + * We have to search value the oldest svar in the stack. If it is just dropped, + * then we revert dropped flag. If it is created in current transaction, then + * we remove this svar too. + */ + iter = svar; + while (iter->prev) + { + SVariable current = iter; + + free_svar_value(current); + + iter = current->prev; + + if (current->stacked) + pfree(current); + } + + if (iter->created_lxid == MyProc->vxid.lxid) + { + free_svar_value(iter); + if (iter->stacked) + pfree(iter); + + (void) hash_search(sessionvars, + NameStr(svar->varname), + HASH_REMOVE, + NULL); + } + else + { + if (iter->stacked) + { + memcpy(svar, iter, sizeof(SVariableData)); + svar->stacked = false; + pfree(iter); + } + + /* revert dropped flag */ + svar->dropped_lxid = InvalidLocalTransactionId; + } + } + } + } + + created_or_dropped_lxid = InvalidLocalTransactionId; + } } /* @@ -453,23 +614,29 @@ ExecuteLetStmt(ParseState *pstate, } /* - * Fast drop of the complete content of the session variables hash table, and - * cleanup of any list that wouldn't be relevant anymore. * This is used by the DISCARD TEMP. */ void ResetSessionVariables(void) { - /* destroy hash table and reset related memory context */ + /* mark all session variables as dropped */ if (sessionvars) { - hash_destroy(sessionvars); - sessionvars = NULL; - } + HASH_SEQ_STATUS status; + SVariable svar; + bool found = false; - /* release memory allocated by session variables */ - if (SVariableMemoryContext != NULL) - MemoryContextReset(SVariableMemoryContext); + hash_seq_init(&status, sessionvars); + + while ((svar = (SVariable) hash_seq_search(&status)) != NULL) + { + svar->dropped_lxid = MyProc->vxid.lxid; + found = true; + } + + if (found) + created_or_dropped_lxid = MyProc->vxid.lxid; + } } /* diff --git a/src/include/commands/session_variable.h b/src/include/commands/session_variable.h index 3f07ae55aac..1218c566767 100644 --- a/src/include/commands/session_variable.h +++ b/src/include/commands/session_variable.h @@ -39,4 +39,6 @@ extern void ExecuteLetStmt(ParseState *pstate, LetStmt *stmt, ParamListInfo para extern void ResetSessionVariables(void); +extern void AtPreEOXact_SessionVariables(bool isCommit); + #endif diff --git a/src/test/regress/expected/session_variables_ddl.out b/src/test/regress/expected/session_variables_ddl.out index b8d6f629596..c10680512ce 100644 --- a/src/test/regress/expected/session_variables_ddl.out +++ b/src/test/regress/expected/session_variables_ddl.out @@ -70,3 +70,41 @@ SELECT * FROM pg_get_temporary_session_variables_names(); ------------------------------------------ (0 rows) +CREATE TEMP VARIABLE x AS varchar; +LET x = 'Hi'; +BEGIN; +DROP VARIABLE x; +CREATE TEMP VARIABLE x AS varchar; +LET x = 'Hello'; +SELECT VARIABLE(x); + x +------- + Hello +(1 row) + +COMMIT; +SELECT VARIABLE(x); + x +------- + Hello +(1 row) + +LET x = 'Hi'; +BEGIN; +DROP VARIABLE x; +CREATE TEMP VARIABLE x AS varchar; +LET x = 'Hello'; +SELECT VARIABLE(x); + x +------- + Hello +(1 row) + +ROLLBACK; +SELECT VARIABLE(x); + x +---- + Hi +(1 row) + +DROP VARIABLE x; diff --git a/src/test/regress/sql/session_variables_ddl.sql b/src/test/regress/sql/session_variables_ddl.sql index 02af365338f..9e79feed92b 100644 --- a/src/test/regress/sql/session_variables_ddl.sql +++ b/src/test/regress/sql/session_variables_ddl.sql @@ -78,3 +78,24 @@ SELECT * FROM pg_get_temporary_session_variables_names(); DROP VARIABLE x; DROP VARIABLE y; SELECT * FROM pg_get_temporary_session_variables_names(); + +CREATE TEMP VARIABLE x AS varchar; +LET x = 'Hi'; +BEGIN; +DROP VARIABLE x; +CREATE TEMP VARIABLE x AS varchar; +LET x = 'Hello'; +SELECT VARIABLE(x); +COMMIT; +SELECT VARIABLE(x); + +LET x = 'Hi'; +BEGIN; +DROP VARIABLE x; +CREATE TEMP VARIABLE x AS varchar; +LET x = 'Hello'; +SELECT VARIABLE(x); +ROLLBACK; +SELECT VARIABLE(x); + +DROP VARIABLE x; -- 2.52.0