From 13f1992172fb5bad19bfae83aee3df7c8d3cfa5d Mon Sep 17 00:00:00 2001 From: "Zheng (Zane) Li" Date: Fri, 15 Apr 2022 02:02:10 +0000 Subject: [PATCH 07/12] Enable logging and replication of DDLs executed inside function and procedures. Also fixed a bug where DDL with SUBCOMMAND gets logged twice. --- .../test_decoding/expected/ddlmessages.out | 47 +++++++++++++++++-- contrib/test_decoding/sql/ddlmessages.sql | 19 +++++++- src/backend/catalog/pg_publication.c | 5 +- src/backend/commands/tablecmds.c | 6 +-- src/backend/tcop/utility.c | 26 +++++----- src/include/catalog/pg_publication.h | 2 +- src/include/commands/tablecmds.h | 3 +- src/test/subscription/t/030_rep_ddls.pl | 38 +++++++++++++++ 8 files changed, 121 insertions(+), 25 deletions(-) diff --git a/contrib/test_decoding/expected/ddlmessages.out b/contrib/test_decoding/expected/ddlmessages.out index 0376f36c24..823029d03d 100644 --- a/contrib/test_decoding/expected/ddlmessages.out +++ b/contrib/test_decoding/expected/ddlmessages.out @@ -11,7 +11,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d init (1 row) -CREATE TABLE test_ddlmessage (id serial unique, data int); +CREATE TABLE test_ddlmessage (id serial unique primary key, data int); ALTER TABLE test_ddlmessage add c3 varchar; ALTER TABLE test_ddlmessage drop c3; DROP TABLE test_ddlmessage; @@ -24,9 +24,9 @@ CREATE TABLE test_ddlmessage (id serial unique, data int); ALTER TABLE test_ddlmessage add c3 varchar; COMMIT; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); - data ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int); + data +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 70 content:CREATE TABLE test_ddlmessage (id serial unique primary key, data int); DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar; DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE test_ddlmessage drop c3; DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 27 content:DROP TABLE test_ddlmessage; @@ -34,6 +34,44 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar; (6 rows) +-- Test logging DDL in function +CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20)) +RETURNS VOID AS $$ +BEGIN + execute format('CREATE TABLE %I(id int primary key, name varchar);', tname); + execute format('ALTER TABLE %I ADD c3 int', tname); + execute format('INSERT INTO %I VALUES (1, ''a'');', tname); + execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname); +END; +$$ +LANGUAGE plpgsql; +SELECT func_ddl ('tab_from_func'); + func_ddl +---------- + +(1 row) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 375 content:CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20)) + + RETURNS VOID AS $$ + + BEGIN + + execute format('CREATE TABLE %I(id int primary key, name varchar);', tname); + + execute format('ALTER TABLE %I ADD c3 int', tname); + + execute format('INSERT INTO %I VALUES (1, ''a'');', tname); + + execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname); + + END; + + $$ + + LANGUAGE plpgsql; + DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 61 content:CREATE TABLE tab_from_func(id int primary key, name varchar); + DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE tab_from_func ADD c3 int + BEGIN + table public.tab_from_func: INSERT: id[integer]:1 name[character varying]:'a' c3[integer]:null + table public.tab_from_func: INSERT: id[integer]:2 name[character varying]:'b' c3[integer]:22 + COMMIT +(7 rows) + SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot -------------------------- @@ -41,4 +79,5 @@ SELECT pg_drop_replication_slot('regression_slot'); (1 row) DROP TABLE test_ddlmessage; +DROP TABLE tab_from_func; DROP publication mypub; diff --git a/contrib/test_decoding/sql/ddlmessages.sql b/contrib/test_decoding/sql/ddlmessages.sql index c23610f9b4..3082671488 100644 --- a/contrib/test_decoding/sql/ddlmessages.sql +++ b/contrib/test_decoding/sql/ddlmessages.sql @@ -9,7 +9,7 @@ SET SESSION AUTHORIZATION 'ddl_replication_user'; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); -CREATE TABLE test_ddlmessage (id serial unique, data int); +CREATE TABLE test_ddlmessage (id serial unique primary key, data int); ALTER TABLE test_ddlmessage add c3 varchar; ALTER TABLE test_ddlmessage drop c3; DROP TABLE test_ddlmessage; @@ -25,7 +25,24 @@ ALTER TABLE test_ddlmessage add c3 varchar; COMMIT; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Test logging DDL in function +CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20)) +RETURNS VOID AS $$ +BEGIN + execute format('CREATE TABLE %I(id int primary key, name varchar);', tname); + execute format('ALTER TABLE %I ADD c3 int', tname); + execute format('INSERT INTO %I VALUES (1, ''a'');', tname); + execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname); +END; +$$ +LANGUAGE plpgsql; + +SELECT func_ddl ('tab_from_func'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + SELECT pg_drop_replication_slot('regression_slot'); DROP TABLE test_ddlmessage; +DROP TABLE tab_from_func; DROP publication mypub; diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index fad21a31d0..d5e4f3713d 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -1211,15 +1211,12 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * Checks if DDL on relation (relid) need xlog for logical replication */ bool -ddl_need_xlog(Oid relid, bool forAllTabPubOnly, bool isTopLevel) +ddl_need_xlog(Oid relid, bool forAllTabPubOnly) { List *allTablePubs = NIL; List *tablePubs = NIL; ListCell *lc; - /* Only replicate toplevel DDL command */ - if (!isTopLevel) - return false; if (relid == InvalidOid && !forAllTabPubOnly) return false; diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 6d1487951f..3c9b6409ca 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -1333,14 +1333,14 @@ DropErrorMsgWrongType(const char *relname, char wrongkind, char rightkind) * DROP MATERIALIZED VIEW, DROP FOREIGN TABLE */ void -RemoveRelations(ParseState *pstate, DropStmt *drop, bool isTopLevel) +RemoveRelations(ParseState *pstate, DropStmt *drop, bool isCompleteQuery) { ObjectAddresses *objects; char relkind; ListCell *cell; int flags = 0; LOCKMODE lockmode = AccessExclusiveLock; - bool ddlxlog = XLogLogicalInfoActive(); + bool ddlxlog = XLogLogicalInfoActive() && isCompleteQuery; /* DROP CONCURRENTLY uses a weaker lock, and has some restrictions */ if (drop->concurrent) @@ -1466,7 +1466,7 @@ RemoveRelations(ParseState *pstate, DropStmt *drop, bool isTopLevel) /* DROP RELATION or INDEX are allowed in table level DDL replication */ if (tableOid != InvalidOid && - !ddl_need_xlog(tableOid, false, isTopLevel)) + !ddl_need_xlog(tableOid, false)) ddlxlog = false; } diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 29381b53b6..e9e7567209 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -87,7 +87,8 @@ static void ProcessUtilitySlow(ParseState *pstate, QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *qc); -static void ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel); +static void ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel, + bool isCompleteQuery); /* * CommandIsReadOnly: is an executable query read-only? @@ -988,7 +989,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, context, params, queryEnv, dest, qc); else - ExecDropStmt(pstate, stmt, isTopLevel); + ExecDropStmt(pstate, stmt, isTopLevel, context != PROCESS_UTILITY_SUBCOMMAND); } break; @@ -1128,7 +1129,7 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) * there is any FOR ALL TABLES publication with pubddl_database on. * i.e. Database level DDL replication is on for some publication. */ - if (ddl_need_xlog(InvalidOid, true, true)) + if (ddl_need_xlog(InvalidOid, true)) { bool transactional = true; const char* prefix = ""; @@ -1170,7 +1171,7 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) * there is any FOR ALL TABLES publication with pubddl_database on. * i.e. Database level DDL replication is on for some publication. */ - if (ddl_need_xlog(InvalidOid, true, true)) + if (ddl_need_xlog(InvalidOid, true)) { bool transactional = true; const char* prefix = ""; @@ -1221,7 +1222,7 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString) * there is any FOR ALL TABLES publication with pubddl_database on. * i.e. Database level DDL replication is on for some publication. */ - if (ddl_need_xlog(InvalidOid, true, true)) + if (ddl_need_xlog(InvalidOid, true)) { bool transactional = true; const char* prefix = ""; @@ -1318,7 +1319,7 @@ ProcessUtilitySlow(ParseState *pstate, * Consider logging the DDL command if logical logging is enabled and this is * a complete top level query. */ - if (XLogLogicalInfoActive() && isTopLevel) + if (XLogLogicalInfoActive()) LogLogicalDDLCommand(parsetree, queryString); } @@ -1530,7 +1531,8 @@ ProcessUtilitySlow(ParseState *pstate, * this TABLE belongs to any publication with table level ddl on */ if (XLogLogicalInfoActive() && - ddl_need_xlog(relid, false, isTopLevel)) + isCompleteQuery && + ddl_need_xlog(relid, false)) { bool transactional = true; const char* prefix = ""; @@ -1766,7 +1768,8 @@ ProcessUtilitySlow(ParseState *pstate, * this TABLE belongs to any publication with table level ddl on. */ if (XLogLogicalInfoActive() && - ddl_need_xlog(relid, false, isTopLevel)) + isCompleteQuery && + ddl_need_xlog(relid, false)) { bool transactional = true; const char* prefix = ""; @@ -1999,7 +2002,7 @@ ProcessUtilitySlow(ParseState *pstate, break; case T_DropStmt: - ExecDropStmt(pstate, (DropStmt *) parsetree, isTopLevel); + ExecDropStmt(pstate, (DropStmt *) parsetree, isTopLevel, isCompleteQuery); /* no commands stashed for DROP */ commandCollected = true; break; @@ -2220,7 +2223,8 @@ ProcessUtilityForAlterTable(Node *stmt, AlterTableUtilityContext *context) * Dispatch function for DropStmt */ static void -ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel) +ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel, + bool isCompleteQuery) { switch (stmt->removeType) { @@ -2235,7 +2239,7 @@ ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel) case OBJECT_VIEW: case OBJECT_MATVIEW: case OBJECT_FOREIGN_TABLE: - RemoveRelations(pstate, stmt, isTopLevel); + RemoveRelations(pstate, stmt, isCompleteQuery); break; default: RemoveObjects(stmt); diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index a94e152256..8c114b2447 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -166,6 +166,6 @@ extern Bitmapset *pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, extern Oid get_publication_oid(const char *pubname, bool missing_ok); extern char *get_publication_name(Oid pubid, bool missing_ok); -extern bool ddl_need_xlog(Oid relid, bool forAllTabPubOnly, bool isTopLevel); +extern bool ddl_need_xlog(Oid relid, bool forAllTabPubOnly); #endif /* PG_PUBLICATION_H */ diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index 68781365de..24106de2e5 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -28,7 +28,8 @@ struct AlterTableUtilityContext; /* avoid including tcop/utility.h here */ extern ObjectAddress DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, ObjectAddress *typaddress, const char *queryString); -extern void RemoveRelations(ParseState *pstate, DropStmt *drop, bool isTopLevel); +extern void RemoveRelations(ParseState *pstate, DropStmt *drop, + bool isCompleteQuery); extern Oid AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode); diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl index 4e05212068..34b9d51eb1 100644 --- a/src/test/subscription/t/030_rep_ddls.pl +++ b/src/test/subscription/t/030_rep_ddls.pl @@ -348,6 +348,44 @@ $node_publisher->wait_for_catchup('mysub'); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_catalog.pg_cast c, pg_catalog.pg_proc p WHERE p.proname='add' AND c.castfunc=p.oid;"); is($result, qq(1), 'CreateCast Stmt is replicated'); +#TEST DDL in function +$node_publisher->safe_psql('postgres', qq{ +CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20)) +RETURNS VOID AS \$\$ +BEGIN + execute format('CREATE TABLE %I(id int primary key, name varchar);', tname); + execute format('ALTER TABLE %I ADD c3 int', tname); + execute format('INSERT INTO %I VALUES (1, ''a'');', tname); + execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname); +END; +\$\$ +LANGUAGE plpgsql;}); + +$node_publisher->safe_psql('postgres', "SELECT func_ddl('func_table');"); + +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM s1.func_table where c3 = 22;"); +is($result, qq(1), 'DDLs in function are replicated'); + +#TEST DDL in procedure +$node_publisher->safe_psql('postgres', qq{ +CREATE OR REPLACE procedure proc_ddl (tname varchar(20)) +LANGUAGE plpgsql AS \$\$ +BEGIN + execute format('CREATE TABLE %I(id int primary key, name varchar);', tname); + execute format('ALTER TABLE %I ADD c3 int', tname); + execute format('INSERT INTO %I VALUES (1, ''a'');', tname); + execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname); +END \$\$;}); + +$node_publisher->safe_psql('postgres', "CALL proc_ddl('proc_table');"); + +$node_publisher->wait_for_catchup('mysub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM s1.proc_table where c3 = 22;"); +is($result, qq(1), 'DDLs in procedure are replicated'); + #TODO TEST certain DDLs are not replicated pass "DDL replication tests passed!"; -- 2.32.0