From d620d8bc13ad609ed7f04b2efb5a0737754dc799 Mon Sep 17 00:00:00 2001 From: Wang Wei Date: Mon, 24 Apr 2023 17:02:30 +0800 Subject: [PATCH 2/8] Enhance the event trigger to support DDL deparsing 1) ALTER TABLE can have multiple subcommands which might include DROP COLUMN command and ALTER TYPE referring the drop column in USING expression. As the dropped column cannot be accessed after the execution of DROP COLUMN, a special trigger is added to handle this case before the drop column is executed. 2) For CREATE TABLE AS, to avoid the need to ensure all the objects referenced in the subquery also exists in subscriber, deprase the CREATE TABLE AS into a simple CREATE TABLE(without subquery) command and WAL log it after creating the table and before writing data into the table and replicate the incoming writes later as normal INSERTs. This approach works for all kind of commands(e.g. CRAETE TABLE AS [SELECT][EXECUTE][VALUES]). To achieve this, introduce a new type of event trigger "table_init_write". which would be fired for CREATE TABLE/CREATE TABLE AS/SELECT INTO after creating the table and before any other modification. we deparse the command in the table_init_write event trigger and WAL log the deparsed json string. The walsender will send the string to subscriber. And incoming INSERTs will also be replicated. --- src/backend/commands/createas.c | 10 + src/backend/commands/ddldeparse.c | 37 +- src/backend/commands/event_trigger.c | 350 +++++++++++++++--- src/backend/commands/tablecmds.c | 10 +- src/backend/tcop/utility.c | 3 + src/backend/utils/cache/evtcache.c | 2 + src/include/commands/event_trigger.h | 50 ++- src/include/tcop/deparse_utility.h | 12 +- src/include/utils/evtcache.h | 3 +- .../t/000_stream_subxact_abort.pl | 263 +++++++++++++ 10 files changed, 678 insertions(+), 62 deletions(-) create mode 100644 src/test/subscription/t/000_stream_subxact_abort.pl diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index e91920ca14..a7b22cb5db 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -34,6 +34,7 @@ #include "catalog/namespace.h" #include "catalog/toasting.h" #include "commands/createas.h" +#include "commands/event_trigger.h" #include "commands/matview.h" #include "commands/prepare.h" #include "commands/tablecmds.h" @@ -143,6 +144,15 @@ create_ctas_internal(List *attrList, IntoClause *into) StoreViewQuery(intoRelationAddr.objectId, query, false); CommandCounterIncrement(); } + else + { + /* + * Fire the trigger for table_init_write after creating the table so + * that we can access the catalog info about the newly created table + * in the trigger function. + */ + EventTriggerTableInitWrite((Node *) create, intoRelationAddr); + } return intoRelationAddr; } diff --git a/src/backend/commands/ddldeparse.c b/src/backend/commands/ddldeparse.c index 40cea91d95..05c2b22d7e 100644 --- a/src/backend/commands/ddldeparse.c +++ b/src/backend/commands/ddldeparse.c @@ -1761,6 +1761,26 @@ deparse_CreateStmt(Oid objectId, Node *parsetree) return ret; } +/* + * Deparse CREATE TABLE AS command. + * + * deparse_CreateStmt do the actual work as we deparse the final CreateStmt for + * CREATE TABLE AS command. + */ +static ObjTree * +deparse_CreateTableAsStmt(CollectedCommand *cmd) +{ + Oid objectId; + Node *parsetree; + + Assert(cmd->type == SCT_CreateTableAs); + + parsetree = cmd->d.ctas.real_create; + objectId = cmd->d.ctas.address.objectId; + + return deparse_CreateStmt(objectId, parsetree); +} + /* * Deparse all the collected subcommands and return an ObjTree representing the * alter command. @@ -2188,20 +2208,8 @@ deparse_AlterRelation(CollectedCommand *cmd) */ tmp_obj2 = new_objtree("USING %{expression}s"); if (def->raw_default) - { - Datum deparsed; - char *defexpr; - List *exprs = NIL; - - exprs = lappend(exprs, def->cooked_default); - defexpr = nodeToString(def->cooked_default); - deparsed = DirectFunctionCall2(pg_get_expr, - CStringGetTextDatum(defexpr), - RelationGetRelid(rel)); - append_string_object(tmp_obj2, "expression", - TextDatumGetCString(deparsed)); - } + sub->usingexpr); else append_not_present(tmp_obj2); @@ -2933,6 +2941,9 @@ deparse_utility_command(CollectedCommand *cmd) case SCT_AlterTable: tree = deparse_AlterRelation(cmd); break; + case SCT_CreateTableAs: + tree = deparse_CreateTableAsStmt(cmd); + break; default: elog(ERROR, "unexpected deparse node type %d", cmd->type); } diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 4d48e490ed..8561c6fad0 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -36,6 +36,7 @@ #include "lib/ilist.h" #include "miscadmin.h" #include "parser/parse_func.h" +#include "parser/parser.h" #include "pgstat.h" #include "tcop/ddldeparse.h" #include "tcop/deparse_utility.h" @@ -49,45 +50,7 @@ #include "utils/rel.h" #include "utils/syscache.h" -typedef struct EventTriggerQueryState -{ - /* memory context for this state's objects */ - MemoryContext cxt; - - /* sql_drop */ - slist_head SQLDropList; - bool in_sql_drop; - - /* table_rewrite */ - Oid table_rewrite_oid; /* InvalidOid, or set for table_rewrite - * event */ - int table_rewrite_reason; /* AT_REWRITE reason */ - - /* Support for command collection */ - bool commandCollectionInhibited; - CollectedCommand *currentCommand; - List *commandList; /* list of CollectedCommand; see - * deparse_utility.h */ - struct EventTriggerQueryState *previous; -} EventTriggerQueryState; - -static EventTriggerQueryState *currentEventTriggerState = NULL; - -/* Support for dropped objects */ -typedef struct SQLDropObject -{ - ObjectAddress address; - const char *schemaname; - const char *objname; - const char *objidentity; - const char *objecttype; - List *addrnames; - List *addrargs; - bool original; - bool normal; - bool istemp; - slist_node next; -} SQLDropObject; +EventTriggerQueryState *currentEventTriggerState = NULL; static void AlterEventTriggerOwner_internal(Relation rel, HeapTuple tup, @@ -131,7 +94,8 @@ CreateEventTrigger(CreateEventTrigStmt *stmt) if (strcmp(stmt->eventname, "ddl_command_start") != 0 && strcmp(stmt->eventname, "ddl_command_end") != 0 && strcmp(stmt->eventname, "sql_drop") != 0 && - strcmp(stmt->eventname, "table_rewrite") != 0) + strcmp(stmt->eventname, "table_rewrite") != 0 && + strcmp(stmt->eventname, "table_init_write") != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized event name \"%s\"", @@ -157,7 +121,8 @@ CreateEventTrigger(CreateEventTrigStmt *stmt) /* Validate tag list, if any. */ if ((strcmp(stmt->eventname, "ddl_command_start") == 0 || strcmp(stmt->eventname, "ddl_command_end") == 0 || - strcmp(stmt->eventname, "sql_drop") == 0) + strcmp(stmt->eventname, "sql_drop") == 0 || + strcmp(stmt->eventname, "table_init_write") == 0) && tags != NULL) validate_ddl_tags("tag", tags); else if (strcmp(stmt->eventname, "table_rewrite") == 0 @@ -583,7 +548,8 @@ EventTriggerCommonSetup(Node *parsetree, dbgtag = CreateCommandTag(parsetree); if (event == EVT_DDLCommandStart || event == EVT_DDLCommandEnd || - event == EVT_SQLDrop) + event == EVT_SQLDrop || + event == EVT_TableInitWrite) { if (!command_tag_event_trigger_ok(dbgtag)) elog(ERROR, "unexpected command tag \"%s\"", GetCommandTagName(dbgtag)); @@ -866,6 +832,153 @@ EventTriggerTableRewrite(Node *parsetree, Oid tableOid, int reason) CommandCounterIncrement(); } +/* + * EventTriggerTableInitWriteStart + * Prepare to receive data on an CREATE TABLE AS/SELECT INTO command about + * to be executed. + */ +void +EventTriggerTableInitWriteStart(Node *parsetree) +{ + MemoryContext oldcxt; + CollectedCommand *command; + CreateTableAsStmt *stmt = (CreateTableAsStmt *)parsetree; + + /* ignore if event trigger context not set, or collection disabled */ + if (!currentEventTriggerState || + currentEventTriggerState->commandCollectionInhibited) + return; + + oldcxt = MemoryContextSwitchTo(currentEventTriggerState->cxt); + + command = palloc(sizeof(CollectedCommand)); + + command->type = (stmt->objtype == OBJECT_TABLE) ? SCT_CreateTableAs : SCT_Simple; + command->in_extension = creating_extension; + command->d.ctas.address = InvalidObjectAddress; + command->d.ctas.real_create = NULL; + command->parsetree = copyObject(parsetree); + + command->parent = currentEventTriggerState->currentCommand; + currentEventTriggerState->currentCommand = command; + + MemoryContextSwitchTo(oldcxt); +} + +/* + * EventTriggerTableInitWriteEnd + * Finish up saving an CREATE TABLE AS/SELECT INTO command. + * + * FIXME this API isn't considering the possibility that an xact/subxact is + * aborted partway through. Probably it's best to add an + * AtEOSubXact_EventTriggers() to fix this. + */ +void +EventTriggerTableInitWriteEnd(ObjectAddress address) +{ + CollectedCommand *parent; + CreateTableAsStmt *stmt; + + /* ignore if event trigger context not set, or collection disabled */ + if (!currentEventTriggerState || + currentEventTriggerState->commandCollectionInhibited) + return; + + Assert(currentEventTriggerState->currentCommand != NULL); + + stmt = (CreateTableAsStmt *)currentEventTriggerState->currentCommand->parsetree; + + if (stmt->objtype == OBJECT_TABLE) + { + MemoryContext oldcxt; + + parent = currentEventTriggerState->currentCommand->parent; + + oldcxt = MemoryContextSwitchTo(currentEventTriggerState->cxt); + + currentEventTriggerState->commandList = + lappend(currentEventTriggerState->commandList, + currentEventTriggerState->currentCommand); + + MemoryContextSwitchTo(oldcxt); + + currentEventTriggerState->currentCommand = parent; + } + else + { + MemoryContext oldcxt; + + oldcxt = MemoryContextSwitchTo(currentEventTriggerState->cxt); + + currentEventTriggerState->currentCommand->d.simple.address = address; + currentEventTriggerState->commandList = + lappend(currentEventTriggerState->commandList, + currentEventTriggerState->currentCommand); + + MemoryContextSwitchTo(oldcxt); + } +} + +/* + * Fire table_init_rewrite triggers. + */ +void +EventTriggerTableInitWrite(Node *real_create, ObjectAddress address) +{ + List *runlist; + EventTriggerData trigdata; + CollectedCommand *command; + MemoryContext oldcxt; + + /* + * See EventTriggerDDLCommandStart for a discussion about why event + * triggers are disabled in single user mode. + */ + if (!IsUnderPostmaster) + return; + + /* + * Also do nothing if our state isn't set up, which it won't be if there + * weren't any relevant event triggers at the start of the current DDL + * command. This test might therefore seem optional, but it's + * *necessary*, because EventTriggerCommonSetup might find triggers that + * didn't exist at the time the command started. + */ + if (!currentEventTriggerState) + return; + + /* Do nothing if no command was collected. */ + if (!currentEventTriggerState->currentCommand) + return; + + command = currentEventTriggerState->currentCommand; + + /* Set the real CreateTable statment and object address */ + oldcxt = MemoryContextSwitchTo(currentEventTriggerState->cxt); + command->d.ctas.address = address; + command->d.ctas.real_create = copyObject(real_create); + MemoryContextSwitchTo(oldcxt); + + runlist = EventTriggerCommonSetup(command->parsetree, + EVT_TableInitWrite, + "table_init_write", + &trigdata); + if (runlist == NIL) + return; + + /* Run the triggers. */ + EventTriggerInvoke(runlist, &trigdata); + + /* Cleanup. */ + list_free(runlist); + + /* + * Make sure anything the event triggers did will be visible to the main + * command. + */ + CommandCounterIncrement(); +} + /* * Invoke each event trigger in a list of event triggers. */ @@ -1147,7 +1260,8 @@ trackDroppedObjectsNeeded(void) */ return (EventCacheLookup(EVT_SQLDrop) != NIL) || (EventCacheLookup(EVT_TableRewrite) != NIL) || - (EventCacheLookup(EVT_DDLCommandEnd) != NIL); + (EventCacheLookup(EVT_DDLCommandEnd) != NIL) || + (EventCacheLookup(EVT_TableInitWrite) != NIL); } /* @@ -1538,6 +1652,7 @@ EventTriggerAlterTableStart(Node *parsetree) command->d.alterTable.classId = RelationRelationId; command->d.alterTable.objectId = InvalidOid; + command->d.alterTable.rewrite = false; command->d.alterTable.subcmds = NIL; command->parsetree = copyObject(parsetree); @@ -1571,7 +1686,7 @@ EventTriggerAlterTableRelid(Oid objectId) * internally, so that's all that this code needs to handle at the moment. */ void -EventTriggerCollectAlterTableSubcmd(Node *subcmd, ObjectAddress address) +EventTriggerCollectAlterTableSubcmd(Node *subcmd, ObjectAddress address, bool rewrite) { MemoryContext oldcxt; CollectedATSubcmd *newsub; @@ -1591,12 +1706,156 @@ EventTriggerCollectAlterTableSubcmd(Node *subcmd, ObjectAddress address) newsub->address = address; newsub->parsetree = copyObject(subcmd); + currentEventTriggerState->currentCommand->d.alterTable.rewrite |= rewrite; + currentEventTriggerState->currentCommand->d.alterTable.subcmds = + lappend(currentEventTriggerState->currentCommand->d.alterTable.subcmds, newsub); + + MemoryContextSwitchTo(oldcxt); +} + +/* + * EventTriggerAlterTypeStart + * Save data about a single part of an ALTER TYPE. + * + * ALTER TABLE can have multiple subcommands which might include DROP COLUMN + * command and ALTER TYPE referring the drop column in USING expression. + * As the dropped column cannot be accessed after the execution of DROP COLUMN, + * a special trigger is required to handle this case before the drop column is + * executed. + */ +void +EventTriggerAlterTypeStart(AlterTableCmd *subcmd, Relation rel) +{ + MemoryContext oldcxt; + CollectedATSubcmd *newsub; + ColumnDef *def; + Relation attrelation; + HeapTuple heapTup; + Form_pg_attribute attTup; + AttrNumber attnum; + ObjectAddress address; + + /* ignore if event trigger context not set, or collection disabled */ + if (!currentEventTriggerState || + currentEventTriggerState->commandCollectionInhibited) + return; + + Assert(IsA(subcmd, AlterTableCmd)); + Assert(subcmd->subtype == AT_AlterColumnType); + Assert(currentEventTriggerState->currentCommand != NULL); + Assert(OidIsValid(currentEventTriggerState->currentCommand->d.alterTable.objectId)); + + def = (ColumnDef *) subcmd->def; + Assert(IsA(def, ColumnDef)); + + oldcxt = MemoryContextSwitchTo(currentEventTriggerState->cxt); + + newsub = palloc(sizeof(CollectedATSubcmd)); + newsub->parsetree = (Node *)copyObject(subcmd); + + attrelation = table_open(AttributeRelationId, RowExclusiveLock); + + /* Look up the target column */ + heapTup = SearchSysCacheCopyAttName(RelationGetRelid(rel), subcmd->name); + if (!HeapTupleIsValid(heapTup)) /* shouldn't happen */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + subcmd->name, RelationGetRelationName(rel))); + attTup = (Form_pg_attribute) GETSTRUCT(heapTup); + attnum = attTup->attnum; + + ObjectAddressSubSet(address, RelationRelationId, + RelationGetRelid(rel), attnum); + heap_freetuple(heapTup); + table_close(attrelation, RowExclusiveLock); + newsub->address = address; + + if (def->raw_default) + { + OverrideSearchPath *overridePath; + char *defexpr; + + /* + * We want all object names to be qualified when deparsing the + * expression, so that results are "portable" to environments with + * different search_path settings. Rather than inject what would be + * repetitive calls to override search path all over the place, we do + * it centrally here. + */ + overridePath = GetOverrideSearchPath(CurrentMemoryContext); + overridePath->schemas = NIL; + overridePath->addCatalog = false; + overridePath->addTemp = true; + PushOverrideSearchPath(overridePath); + + defexpr = nodeToString(def->cooked_default); + newsub->usingexpr = TextDatumGetCString(DirectFunctionCall2(pg_get_expr, + CStringGetTextDatum(defexpr), + RelationGetRelid(rel))); + + PopOverrideSearchPath(); + } + else + newsub->usingexpr = NULL; + currentEventTriggerState->currentCommand->d.alterTable.subcmds = lappend(currentEventTriggerState->currentCommand->d.alterTable.subcmds, newsub); MemoryContextSwitchTo(oldcxt); } +/* + * EventTriggerAlterTypeEnd + * Finish up saving an ALTER TYPE command, and add it to command list. + */ +void +EventTriggerAlterTypeEnd(Node *subcmd, ObjectAddress address, bool rewrite) +{ + MemoryContext oldcxt; + CollectedATSubcmd *newsub; + ListCell *cell; + CollectedCommand *cmd; + AlterTableCmd *altsubcmd = (AlterTableCmd *)subcmd; + + /* ignore if event trigger context not set, or collection disabled */ + if (!currentEventTriggerState || + currentEventTriggerState->commandCollectionInhibited) + return; + + cmd = currentEventTriggerState->currentCommand; + + Assert(IsA(subcmd, AlterTableCmd)); + Assert(cmd != NULL); + Assert(OidIsValid(cmd->d.alterTable.objectId)); + + foreach(cell, cmd->d.alterTable.subcmds) + { + CollectedATSubcmd *sub = (CollectedATSubcmd *) lfirst(cell); + AlterTableCmd *collcmd = (AlterTableCmd *) sub->parsetree; + + if (collcmd->subtype == altsubcmd->subtype && + address.classId == sub->address.classId && + address.objectId == sub->address.objectId && + address.objectSubId == sub->address.objectSubId) + { + cmd->d.alterTable.rewrite |= rewrite; + return; + } + } + + oldcxt = MemoryContextSwitchTo(currentEventTriggerState->cxt); + + newsub = palloc(sizeof(CollectedATSubcmd)); + newsub->address = address; + newsub->parsetree = copyObject(subcmd); + + cmd->d.alterTable.rewrite |= rewrite; + cmd->d.alterTable.subcmds = lappend(cmd->d.alterTable.subcmds, newsub); + + MemoryContextSwitchTo(oldcxt); +} + /* * EventTriggerAlterTableEnd * Finish up saving an ALTER TABLE command, and add it to command list. @@ -1864,6 +2123,7 @@ pg_event_trigger_ddl_commands(PG_FUNCTION_ARGS) case SCT_AlterOpFamily: case SCT_CreateOpClass: case SCT_AlterTSConfig: + case SCT_CreateTableAs: { char *identity; char *type; @@ -1881,6 +2141,8 @@ pg_event_trigger_ddl_commands(PG_FUNCTION_ARGS) addr = cmd->d.createopc.address; else if (cmd->type == SCT_AlterTSConfig) addr = cmd->d.atscfg.address; + else if (cmd->type == SCT_CreateTableAs) + addr = cmd->d.ctas.address; /* * If an object was dropped in the same command we may end diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index a7d1a09bb5..30de9fa5de 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -4742,6 +4742,9 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, cmd = ATParseTransformCmd(wqueue, tab, rel, cmd, recurse, lockmode, AT_PASS_UNSET, context); Assert(cmd != NULL); + + EventTriggerAlterTypeStart(cmd, rel); + /* Performs own recursion */ ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd, lockmode, context); @@ -5013,6 +5016,7 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, { ObjectAddress address = InvalidObjectAddress; Relation rel = tab->rel; + bool commandCollected = false; switch (cmd->subtype) { @@ -5136,6 +5140,8 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, case AT_AlterColumnType: /* ALTER COLUMN TYPE */ /* parse transformation was done earlier */ address = ATExecAlterColumnType(tab, rel, cmd, lockmode); + EventTriggerAlterTypeEnd((Node *) cmd, address, tab->rewrite); + commandCollected = true; break; case AT_AlterColumnGenericOptions: /* ALTER COLUMN OPTIONS */ address = @@ -5308,8 +5314,8 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, /* * Report the subcommand to interested event triggers. */ - if (cmd) - EventTriggerCollectAlterTableSubcmd((Node *) cmd, address); + if (cmd && !commandCollected) + EventTriggerCollectAlterTableSubcmd((Node *) cmd, address, tab->rewrite); /* * Bump the command counter to ensure the next subcommand in the sequence diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 77c2f38e15..0a5b43cd5f 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -1675,8 +1675,11 @@ ProcessUtilitySlow(ParseState *pstate, break; case T_CreateTableAsStmt: + EventTriggerTableInitWriteStart(parsetree); address = ExecCreateTableAs(pstate, (CreateTableAsStmt *) parsetree, params, queryEnv, qc); + EventTriggerTableInitWriteEnd(address); + commandCollected = true; break; case T_RefreshMatViewStmt: diff --git a/src/backend/utils/cache/evtcache.c b/src/backend/utils/cache/evtcache.c index 1f5e7eb4c6..f2a9f5dcc2 100644 --- a/src/backend/utils/cache/evtcache.c +++ b/src/backend/utils/cache/evtcache.c @@ -167,6 +167,8 @@ BuildEventTriggerCache(void) event = EVT_SQLDrop; else if (strcmp(evtevent, "table_rewrite") == 0) event = EVT_TableRewrite; + else if (strcmp(evtevent, "table_init_write") == 0) + event = EVT_TableInitWrite; else continue; diff --git a/src/include/commands/event_trigger.h b/src/include/commands/event_trigger.h index 5ed6ece555..cba4e72455 100644 --- a/src/include/commands/event_trigger.h +++ b/src/include/commands/event_trigger.h @@ -16,6 +16,7 @@ #include "catalog/dependency.h" #include "catalog/objectaddress.h" #include "catalog/pg_event_trigger.h" +#include "lib/ilist.h" #include "nodes/parsenodes.h" #include "tcop/cmdtag.h" #include "tcop/deparse_utility.h" @@ -29,6 +30,44 @@ typedef struct EventTriggerData CommandTag tag; } EventTriggerData; +typedef struct EventTriggerQueryState +{ + /* memory context for this state's objects */ + MemoryContext cxt; + + /* sql_drop */ + slist_head SQLDropList; + bool in_sql_drop; + + /* table_rewrite */ + Oid table_rewrite_oid; /* InvalidOid, or set for table_rewrite + * event */ + int table_rewrite_reason; /* AT_REWRITE reason */ + + /* Support for command collection */ + bool commandCollectionInhibited; + CollectedCommand *currentCommand; + List *commandList; /* list of CollectedCommand; see + * deparse_utility.h */ + struct EventTriggerQueryState *previous; +} EventTriggerQueryState; + +/* Support for dropped objects */ +typedef struct SQLDropObject +{ + ObjectAddress address; + const char *schemaname; + const char *objname; + const char *objidentity; + const char *objecttype; + List *addrnames; + List *addrargs; + bool original; + bool normal; + bool istemp; + slist_node next; +} SQLDropObject; + #define AT_REWRITE_ALTER_PERSISTENCE 0x01 #define AT_REWRITE_DEFAULT_VAL 0x02 #define AT_REWRITE_COLUMN_REWRITE 0x04 @@ -55,6 +94,10 @@ extern void EventTriggerDDLCommandEnd(Node *parsetree); extern void EventTriggerSQLDrop(Node *parsetree); extern void EventTriggerTableRewrite(Node *parsetree, Oid tableOid, int reason); +extern void EventTriggerTableInitWriteStart(Node *parsetree); +extern void EventTriggerTableInitWrite(Node *parsetree, ObjectAddress address); +extern void EventTriggerTableInitWriteEnd(ObjectAddress address); + extern bool EventTriggerBeginCompleteQuery(void); extern void EventTriggerEndCompleteQuery(void); extern bool trackDroppedObjectsNeeded(void); @@ -71,7 +114,12 @@ extern void EventTriggerCollectSimpleCommand(ObjectAddress address, extern void EventTriggerAlterTableStart(Node *parsetree); extern void EventTriggerAlterTableRelid(Oid objectId); extern void EventTriggerCollectAlterTableSubcmd(Node *subcmd, - ObjectAddress address); + ObjectAddress address, + bool rewrite); + +extern void EventTriggerAlterTypeStart(AlterTableCmd *subcmd, Relation rel); +extern void EventTriggerAlterTypeEnd(Node *subcmd, ObjectAddress address, + bool rewrite); extern void EventTriggerAlterTableEnd(void); extern void EventTriggerCollectGrant(InternalGrant *istmt); diff --git a/src/include/tcop/deparse_utility.h b/src/include/tcop/deparse_utility.h index b585810b9a..a4a12377b8 100644 --- a/src/include/tcop/deparse_utility.h +++ b/src/include/tcop/deparse_utility.h @@ -29,7 +29,8 @@ typedef enum CollectedCommandType SCT_AlterOpFamily, SCT_AlterDefaultPrivileges, SCT_CreateOpClass, - SCT_AlterTSConfig + SCT_AlterTSConfig, + SCT_CreateTableAs } CollectedCommandType; /* @@ -39,6 +40,7 @@ typedef struct CollectedATSubcmd { ObjectAddress address; /* affected column, constraint, index, ... */ Node *parsetree; + char *usingexpr; } CollectedATSubcmd; typedef struct CollectedCommand @@ -62,6 +64,7 @@ typedef struct CollectedCommand { Oid objectId; Oid classId; + bool rewrite; List *subcmds; } alterTable; @@ -100,6 +103,13 @@ typedef struct CollectedCommand { ObjectType objtype; } defprivs; + + /* CREATE TABLE AS */ + struct + { + ObjectAddress address; + Node *real_create; + } ctas; } d; struct CollectedCommand *parent; /* when nested */ diff --git a/src/include/utils/evtcache.h b/src/include/utils/evtcache.h index d340026518..91d4bdd6b3 100644 --- a/src/include/utils/evtcache.h +++ b/src/include/utils/evtcache.h @@ -22,7 +22,8 @@ typedef enum EVT_DDLCommandStart, EVT_DDLCommandEnd, EVT_SQLDrop, - EVT_TableRewrite + EVT_TableRewrite, + EVT_TableInitWrite } EventTriggerEvent; typedef struct diff --git a/src/test/subscription/t/000_stream_subxact_abort.pl b/src/test/subscription/t/000_stream_subxact_abort.pl new file mode 100644 index 0000000000..2b67ae1e0a --- /dev/null +++ b/src/test/subscription/t/000_stream_subxact_abort.pl @@ -0,0 +1,263 @@ + +# Copyright (c) 2021-2023, PostgreSQL Global Development Group + +# Test streaming of transaction containing multiple subtransactions and rollbacks +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Check that the parallel apply worker has finished applying the streaming +# transaction. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/, + $offset); + } +} + +# Common test steps for both the streaming=on and streaming=parallel cases. +sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + my $offset = 0; + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # streamed transaction with DDL, DML and ROLLBACKs + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab VALUES (3, md5(3::text)); + SAVEPOINT s1; + INSERT INTO test_tab VALUES (4, md5(4::text)); + SAVEPOINT s2; + INSERT INTO test_tab VALUES (5, md5(5::text)); + SAVEPOINT s3; + INSERT INTO test_tab VALUES (6, md5(6::text)); + ROLLBACK TO s2; + INSERT INTO test_tab VALUES (7, md5(7::text)); + ROLLBACK TO s1; + INSERT INTO test_tab VALUES (8, md5(8::text)); + SAVEPOINT s4; + INSERT INTO test_tab VALUES (9, md5(9::text)); + SAVEPOINT s5; + INSERT INTO test_tab VALUES (10, md5(10::text)); + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); + is($result, qq(6|0), + 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults' + ); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # streamed transaction with subscriber receiving out of order + # subtransaction ROLLBACKs + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab VALUES (11, md5(11::text)); + SAVEPOINT s1; + INSERT INTO test_tab VALUES (12, md5(12::text)); + SAVEPOINT s2; + INSERT INTO test_tab VALUES (13, md5(13::text)); + SAVEPOINT s3; + INSERT INTO test_tab VALUES (14, md5(14::text)); + RELEASE s2; + INSERT INTO test_tab VALUES (15, md5(15::text)); + ROLLBACK TO s1; + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); + is($result, qq(7|0), + 'check rollback to savepoint was reflected on subscriber'); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # streamed transaction with subscriber receiving rollback + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab VALUES (16, md5(16::text)); + SAVEPOINT s1; + INSERT INTO test_tab VALUES (17, md5(17::text)); + SAVEPOINT s2; + INSERT INTO test_tab VALUES (18, md5(18::text)); + ROLLBACK; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'ABORT'); + + $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); + is($result, qq(7|0), 'check rollback was reflected on subscriber'); + + # Cleanup the test data + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE (a > 2)"); + $node_publisher->wait_for_catchup($appname); +} + +# Create publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + 'logical_replication_mode = immediate'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2"); + +my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(2|0), 'check initial data was copied to subscriber'); + +test_streaming($node_publisher, $node_subscriber, $appname, 0); + +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" +); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)"); + +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; + +# We need to check DEBUG logs to ensure that the parallel apply worker has +# applied the transaction. So, bump up the log verbosity. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +test_streaming($node_publisher, $node_subscriber, $appname, 1); + +# Test serializing changes to files and notify the parallel apply worker to +# apply them at the end of the transaction. +$node_subscriber->append_conf('postgresql.conf', + 'logical_replication_mode = immediate'); +# Reset the log_min_messages to default. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +my $offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 values(1); + ROLLBACK; + }); + +# Ensure that the changes are serialized. +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is aborted on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(0), 'check rollback was reflected on subscriber'); + +# Serialize the ABORT sub-transaction. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 values(1); + SAVEPOINT sp; + INSERT INTO test_tab_2 values(1); + ROLLBACK TO sp; + COMMIT; + }); + +# Ensure that the changes are serialized. +$node_subscriber->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that only sub-transaction is aborted on subscriber. +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(1), 'check rollback to savepoint was reflected on subscriber'); + +$node_subscriber->stop; +$node_publisher->stop; + +done_testing(); -- 2.34.1