From e966dcf860b4ea0becd66ce159398a4615b3b3d2 Mon Sep 17 00:00:00 2001
From: "houzj.fnst"
Date: Mon, 6 Jun 2022 20:52:12 +0800
Subject: [PATCH] support CREATE TABLE AS/SELECT INTO

The main idea of replicating CREATE TABLE AS is that we deparse the
CREATE TABLE AS into a simple CREATE TABLE (without subquery) command,
WAL log it after creating the table and before writing data into the
table, and replicate the incoming writes later as normal INSERTs. In this
approach, we don't execute the subquery on the subscriber, so we don't
need to make sure that all the objects referenced in the subquery also
exist on the subscriber. This approach works for all kinds of commands
(e.g. CREATE TABLE AS [SELECT][EXECUTE][VALUES]).

Introduce a new type of event trigger, "table_init_write", which is fired
for CREATE TABLE AS/SELECT INTO after creating the table and before any
other modification. We deparse the command in the table_init_write event
trigger and WAL log the deparsed JSON string. The walsender will send the
string to the subscriber, and incoming INSERTs will also be replicated.
---
 src/backend/commands/createas.c        |  10 +++
 src/backend/commands/ddl_deparse.c     |  22 +++++
 src/backend/commands/event_trigger.c   | 161 ++++++++++++++++++++++++++++++++-
 src/backend/commands/publicationcmds.c |   9 ++
 src/backend/tcop/utility.c             |   2 +
 src/backend/utils/cache/evtcache.c     |   2 +
 src/include/catalog/pg_proc.dat        |   3 +
 src/include/commands/event_trigger.h   |   4 +
 src/include/tcop/deparse_utility.h     |   9 +-
 src/include/utils/evtcache.h           |   3 +-
 10 files changed, 219 insertions(+), 6 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 9abbb6b..989e894 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -34,6 +34,7 @@
 #include "catalog/namespace.h"
 #include "catalog/toasting.h"
 #include "commands/createas.h"
+#include "commands/event_trigger.h"
 #include "commands/matview.h"
 #include "commands/prepare.h"
 #include "commands/tablecmds.h"
@@ -143,6 +144,15 @@ create_ctas_internal(List *attrList, IntoClause *into)
 		StoreViewQuery(intoRelationAddr.objectId, query, false);
 		CommandCounterIncrement();
 	}
+	else
+	{
+		/*
+		 * Fire the trigger for table_init_write after creating the table so
+		 * that we can access the catalog info about the newly created table in
+		 * the trigger function.
+		 */
+		EventTriggerTableInitWrite((Node *) create, intoRelationAddr);
+	}
 
 	return intoRelationAddr;
 }
diff --git a/src/backend/commands/ddl_deparse.c b/src/backend/commands/ddl_deparse.c
index 7e20044..fcba4bb 100644
--- a/src/backend/commands/ddl_deparse.c
+++ b/src/backend/commands/ddl_deparse.c
@@ -2035,6 +2035,25 @@ removed feature
 	return alterTableStmt;
 }
 
+/*
+ * Deparse a CREATE TABLE AS/SELECT INTO command as a plain CREATE TABLE,
+ * using the create statement and object address that
+ * EventTriggerTableInitWrite stored in the collected command.
+ */
+static ObjTree *
+deparse_CreateTableAsStmt(CollectedCommand *cmd)
+{
+	Oid			objectId;
+	Node	   *parsetree;
+
+	Assert(cmd->type == SCT_CreateTableAs);
+
+	parsetree = cmd->d.ctas.real_create;
+	objectId = cmd->d.ctas.address.objectId;
+
+	return deparse_CreateStmt(objectId, parsetree);
+}
+
 /*
  * Handle deparsing of simple commands.
  *
@@ -2135,6 +2154,9 @@ deparse_utility_command(CollectedCommand *cmd)
 		case SCT_AlterTable:
 			tree = deparse_AlterTableStmt(cmd);
 			break;
+		case SCT_CreateTableAs:
+			tree = deparse_CreateTableAsStmt(cmd);
+			break;
 		default:
 			elog(ERROR, "unexpected deparse node type %d", cmd->type);
 	}
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index e5894c1..5256e1a 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -133,7 +133,8 @@ CreateEventTrigger(CreateEventTrigStmt *stmt)
 	if (strcmp(stmt->eventname, "ddl_command_start") != 0 &&
 		strcmp(stmt->eventname, "ddl_command_end") != 0 &&
 		strcmp(stmt->eventname, "sql_drop") != 0 &&
-		strcmp(stmt->eventname, "table_rewrite") != 0)
+		strcmp(stmt->eventname, "table_rewrite") != 0 &&
+		strcmp(stmt->eventname, "table_init_write") != 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_SYNTAX_ERROR),
 				 errmsg("unrecognized event name \"%s\"",
@@ -159,7 +160,8 @@ CreateEventTrigger(CreateEventTrigStmt *stmt)
 	/* Validate tag list, if any. */
 	if ((strcmp(stmt->eventname, "ddl_command_start") == 0 ||
 		 strcmp(stmt->eventname, "ddl_command_end") == 0 ||
-		 strcmp(stmt->eventname, "sql_drop") == 0)
+		 strcmp(stmt->eventname, "sql_drop") == 0 ||
+		 strcmp(stmt->eventname, "table_init_write") == 0)
 		&& tags != NULL)
 		validate_ddl_tags("tag", tags);
 	else if (strcmp(stmt->eventname, "table_rewrite") == 0
@@ -586,7 +588,8 @@ EventTriggerCommonSetup(Node *parsetree,
 		dbgtag = CreateCommandTag(parsetree);
 		if (event == EVT_DDLCommandStart ||
 			event == EVT_DDLCommandEnd ||
-			event == EVT_SQLDrop)
+			event == EVT_SQLDrop ||
+			event == EVT_TableInitWrite)
 		{
 			if (!command_tag_event_trigger_ok(dbgtag))
 				elog(ERROR, "unexpected command tag \"%s\"", GetCommandTagName(dbgtag));
@@ -869,6 +872,152 @@ EventTriggerTableRewrite(Node *parsetree, Oid tableOid, int reason)
 	CommandCounterIncrement();
 }
 
+
+/*
+ * EventTriggerTableInitWriteStart
+ *		Prepare to receive data on a CREATE TABLE AS/SELECT INTO command about
+ *		to be executed.
+ */
+void
+EventTriggerTableInitWriteStart(Node *parsetree)
+{
+	MemoryContext oldcxt;
+	CollectedCommand *command;
+
+	/* ignore if event trigger context not set, or collection disabled */
+	if (!currentEventTriggerState ||
+		currentEventTriggerState->commandCollectionInhibited)
+		return;
+
+	oldcxt = MemoryContextSwitchTo(currentEventTriggerState->cxt);
+
+	command = palloc(sizeof(CollectedCommand));
+
+	command->type = SCT_CreateTableAs;
+	command->in_extension = creating_extension;
+	command->d.ctas.address = InvalidObjectAddress;
+	command->d.ctas.real_create = NULL;
+	command->parsetree = copyObject(parsetree);
+
+	command->parent = currentEventTriggerState->currentCommand;
+	currentEventTriggerState->currentCommand = command;
+
+	MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * EventTriggerTableInitWriteEnd
+ *		Finish up saving a CREATE TABLE AS/SELECT INTO command.
+ *
+ * FIXME this API isn't considering the possibility that an xact/subxact is
+ * aborted partway through.  Probably it's best to add an
+ * AtEOSubXact_EventTriggers() to fix this.
+ */
+void
+EventTriggerTableInitWriteEnd(void)
+{
+	CollectedCommand *parent;
+
+	/* ignore if event trigger context not set, or collection disabled */
+	if (!currentEventTriggerState ||
+		currentEventTriggerState->commandCollectionInhibited)
+		return;
+
+	parent = currentEventTriggerState->currentCommand->parent;
+
+	pfree(currentEventTriggerState->currentCommand);
+
+	currentEventTriggerState->currentCommand = parent;
+}
+
+/*
+ * publication_deparse_table_init_write
+ *
+ * Deparse the ddl table create command and log it.
+ */
+Datum
+publication_deparse_table_init_write(PG_FUNCTION_ARGS)
+{
+	CollectedCommand *cmd;
+	char	   *json_string;
+
+	if (!CALLED_AS_EVENT_TRIGGER(fcinfo))
+		elog(ERROR, "not fired by event trigger manager");
+
+	cmd = currentEventTriggerState->currentCommand;
+	Assert(cmd);
+
+	/* Deparse the DDL command and WAL log it to allow decoding of the same. */
+	json_string = deparse_utility_command(cmd);
+
+	if (json_string != NULL)
+		LogLogicalDDLMessage("deparse", cmd->d.ctas.address.objectId, DCT_Create,
+							 json_string, strlen(json_string) + 1);
+
+	return PointerGetDatum(NULL);
+}
+
+/*
+ * Fire table_init_write triggers.
+ */
+void
+EventTriggerTableInitWrite(Node *real_create, ObjectAddress address)
+{
+	List	   *runlist;
+	EventTriggerData trigdata;
+	CollectedCommand *command;
+
+	/*
+	 * See EventTriggerDDLCommandStart for a discussion about why event
+	 * triggers are disabled in single user mode.
+	 */
+	if (!IsUnderPostmaster)
+		return;
+
+	/*
+	 * Also do nothing if our state isn't set up, which it won't be if there
+	 * weren't any relevant event triggers at the start of the current DDL
+	 * command.  This test might therefore seem optional, but it's
+	 * *necessary*, because EventTriggerCommonSetup might find triggers that
+	 * didn't exist at the time the command started.
+	 */
+	if (!currentEventTriggerState)
+		return;
+
+	/*
+	 * Only proceed if the innermost collected command is the CREATE TABLE
+	 * AS/SELECT INTO entry pushed by EventTriggerTableInitWriteStart.  None
+	 * is pushed when command collection is inhibited, and dereferencing or
+	 * overwriting some other command here would be wrong.
+	 */
+	command = currentEventTriggerState->currentCommand;
+	if (command == NULL || command->type != SCT_CreateTableAs)
+		return;
+
+	runlist = EventTriggerCommonSetup(command->parsetree,
+									  EVT_TableInitWrite,
+									  "table_init_write",
+									  &trigdata);
+	if (runlist == NIL)
+		return;
+
+	/* Set the real CreateTable statement and object address */
+	command->d.ctas.real_create = real_create;
+	command->d.ctas.address = address;
+
+	/* Run the triggers. */
+	EventTriggerInvoke(runlist, &trigdata);
+
+	/* Cleanup. */
+	list_free(runlist);
+
+	/*
+	 * Make sure anything the event triggers did will be visible to the main
+	 * command.
+	 */
+	CommandCounterIncrement();
+}
+
 /*
  * Invoke each event trigger in a list of event triggers.
  */
@@ -1149,7 +1298,8 @@ trackDroppedObjectsNeeded(void)
 	 */
 	return list_length(EventCacheLookup(EVT_SQLDrop)) > 0 ||
 		list_length(EventCacheLookup(EVT_TableRewrite)) > 0 ||
-		list_length(EventCacheLookup(EVT_DDLCommandEnd)) > 0;
+		list_length(EventCacheLookup(EVT_DDLCommandEnd)) > 0 ||
+		list_length(EventCacheLookup(EVT_TableInitWrite)) > 0;
 }
 
 /*
@@ -1873,6 +2023,7 @@ pg_event_trigger_ddl_commands(PG_FUNCTION_ARGS)
 			case SCT_AlterOpFamily:
 			case SCT_CreateOpClass:
 			case SCT_AlterTSConfig:
+			case SCT_CreateTableAs:
 				{
 					char	   *identity;
 					char	   *type;
@@ -1890,6 +2041,8 @@ pg_event_trigger_ddl_commands(PG_FUNCTION_ARGS)
 						addr = cmd->d.createopc.address;
 					else if (cmd->type == SCT_AlterTSConfig)
 						addr = cmd->d.atscfg.address;
+					else if (cmd->type == SCT_CreateTableAs)
+						addr = cmd->d.ctas.address;
 
 					/*
 					 * If an object was dropped in the same command we may end
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 4a596c7..3ac20ce 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -957,6 +957,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 			CMDTAG_ALTER_TABLE
 		};
 
+		CommandTag	init_commands[] = {
+			CMDTAG_CREATE_TABLE_AS,
+			CMDTAG_SELECT_INTO
+		};
+
 		/* Create the ddl_command_end event trigger */
 		CreateDDLReplicaEventTrigger("ddl_command_end", end_commands,
 									 lengthof(end_commands), myself, puboid);
@@ -968,6 +973,10 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 		/* Create the table_rewrite event trigger */
 		CreateDDLReplicaEventTrigger("table_rewrite", rewrite_commands,
 									 lengthof(rewrite_commands), myself, puboid);
+
+		/* Create the table_init_write event trigger */
+		CreateDDLReplicaEventTrigger("table_init_write", init_commands,
+									 lengthof(init_commands), myself, puboid);
 	}
 
 	table_close(rel, RowExclusiveLock);
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 6a5bcde..4d003de 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1665,8 +1665,10 @@ ProcessUtilitySlow(ParseState *pstate,
 				break;
 
 			case T_CreateTableAsStmt:
+				EventTriggerTableInitWriteStart(parsetree);
 				address = ExecCreateTableAs(pstate, (CreateTableAsStmt *) parsetree,
 											params, queryEnv, qc);
+				EventTriggerTableInitWriteEnd();
 				break;
 
 			case T_RefreshMatViewStmt:
diff --git a/src/backend/utils/cache/evtcache.c b/src/backend/utils/cache/evtcache.c
index 3a9c9f0..76b2315 100644
--- a/src/backend/utils/cache/evtcache.c
+++ b/src/backend/utils/cache/evtcache.c
@@ -167,6 +167,8 @@ BuildEventTriggerCache(void)
 			event = EVT_SQLDrop;
 		else if (strcmp(evtevent, "table_rewrite") == 0)
 			event = EVT_TableRewrite;
+		else if (strcmp(evtevent, "table_init_write") == 0)
+			event = EVT_TableInitWrite;
 		else
 			continue;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bb98c17..b82a5a4 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11899,4 +11899,7 @@
 { oid => '4646', descr => 'trigger for ddl command deparse table rewrite',
   proname => 'publication_deparse_table_rewrite', prorettype => 'event_trigger',
   proargtypes => '', prosrc => 'publication_deparse_table_rewrite' },
+{ oid => '4647', descr => 'trigger for ddl command deparse table init',
+  proname => 'publication_deparse_table_init_write', prorettype => 'event_trigger',
+  proargtypes => '', prosrc => 'publication_deparse_table_init_write' },
 ]
diff --git a/src/include/commands/event_trigger.h b/src/include/commands/event_trigger.h
index fd2ee7f..a9e0f71 100644
--- a/src/include/commands/event_trigger.h
+++ b/src/include/commands/event_trigger.h
@@ -55,6 +55,10 @@ extern void EventTriggerDDLCommandEnd(Node *parsetree);
 extern void EventTriggerSQLDrop(Node *parsetree);
 extern void EventTriggerTableRewrite(Node *parsetree, Oid tableOid, int reason);
 
+extern void EventTriggerTableInitWriteStart(Node *parsetree);
+extern void EventTriggerTableInitWrite(Node *real_create, ObjectAddress address);
+extern void EventTriggerTableInitWriteEnd(void);
+
 extern bool EventTriggerBeginCompleteQuery(void);
 extern void EventTriggerEndCompleteQuery(void);
 extern bool trackDroppedObjectsNeeded(void);
diff --git a/src/include/tcop/deparse_utility.h b/src/include/tcop/deparse_utility.h
index b53294b..3d294a0 100644
--- a/src/include/tcop/deparse_utility.h
+++ b/src/include/tcop/deparse_utility.h
@@ -29,7 +29,8 @@ typedef enum CollectedCommandType
 	SCT_AlterOpFamily,
 	SCT_AlterDefaultPrivileges,
 	SCT_CreateOpClass,
-	SCT_AlterTSConfig
+	SCT_AlterTSConfig,
+	SCT_CreateTableAs
 } CollectedCommandType;
 
 /*
@@ -101,6 +102,12 @@ typedef struct CollectedCommand
 		{
 			ObjectType	objtype;
 		}			defprivs;
+
+		struct
+		{
+			ObjectAddress address;
+			Node	   *real_create;
+		}			ctas;
 	}			d;
 
 	struct CollectedCommand *parent;	/* when nested */
diff --git a/src/include/utils/evtcache.h b/src/include/utils/evtcache.h
index ddb67a6..1e64831 100644
--- a/src/include/utils/evtcache.h
+++ b/src/include/utils/evtcache.h
@@ -22,7 +22,8 @@ typedef enum
 	EVT_DDLCommandStart,
 	EVT_DDLCommandEnd,
 	EVT_SQLDrop,
-	EVT_TableRewrite
+	EVT_TableRewrite,
+	EVT_TableInitWrite
 } EventTriggerEvent;
 
 typedef struct
-- 
2.7.2.windows.1