From 7ab1fd432489f3dbd056fa897786cc5315c7d8ac Mon Sep 17 00:00:00 2001 From: "Zheng (Zane) Li" Date: Fri, 18 Mar 2022 16:57:23 +0000 Subject: [PATCH 02/12] Support logical logging and decoding of DDL command string. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A new WAL record type xl_logical_ddl_message is introduced to support logical logging of DDL command. xl_logical_ddl_message is similar to the existing xl_logical_message for generic message logging. The reason for not using xl_logical_message directly as proposed initially is I found out we need to log more information (such as user role, search path and potentially more in the future) than just one string, and we don’t want to make too much changes to the existing xl_logical_message which may break its current consumers. The logging of DDL command string is processed in function LogLogicalDDLCommand. We categorize DDL command types into three categories in this function: 1. replicated in database level replication only (such as CREATE TABLE, CREATE FUNCTION). 2. replicated in database or table level replication depending on the configuration (such as ALTER TABLE). 3. not supported for replication or pending investigation. Support logical decoding of the new WAL record xl_logical_ddl_message. This is similar to the logical decoding of xl_logical_message. Tests for this change are added in the test_decoding plugin. --- contrib/test_decoding/Makefile | 4 +- .../test_decoding/expected/ddlmessages.out | 42 ++++ contrib/test_decoding/sql/ddlmessages.sql | 28 +++ contrib/test_decoding/test_decoding.c | 61 +++++- src/backend/access/rmgrdesc/Makefile | 1 + .../access/rmgrdesc/logicalddlmsgdesc.c | 54 +++++ src/backend/access/transam/rmgr.c | 1 + src/backend/catalog/pg_publication.c | 52 +++++ src/backend/commands/tablecmds.c | 43 +++- src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/ddlmessage.c | 99 +++++++++ src/backend/replication/logical/decode.c | 56 +++++ src/backend/replication/logical/logical.c | 91 ++++++++ .../replication/logical/reorderbuffer.c | 204 +++++++++++++++++- src/backend/tcop/utility.c | 201 ++++++++++++++++- src/bin/pg_waldump/rmgrdesc.c | 1 + src/include/access/rmgrlist.h | 1 + src/include/commands/tablecmds.h | 3 +- src/include/replication/ddlmessage.h | 47 ++++ src/include/replication/decode.h | 1 + src/include/replication/output_plugin.h | 29 +++ src/include/replication/reorderbuffer.h | 39 ++++ src/test/regress/expected/publication.out | 16 +- 23 files changed, 1054 insertions(+), 21 deletions(-) create mode 100644 contrib/test_decoding/expected/ddlmessages.out create mode 100644 contrib/test_decoding/sql/ddlmessages.sql create mode 100644 src/backend/access/rmgrdesc/logicalddlmsgdesc.c create mode 100644 src/backend/replication/logical/ddlmessage.c create mode 100644 src/include/replication/ddlmessage.h diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index b220906479..e58a76d3f1 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -5,7 +5,9 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot truncate stream stats twophase twophase_stream + spill slot truncate stream stats twophase twophase_stream \ + ddlmessages + ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ twophase_snapshot slot_creation_error diff --git a/contrib/test_decoding/expected/ddlmessages.out b/contrib/test_decoding/expected/ddlmessages.out new file mode 100644 index 0000000000..79284f9def --- /dev/null +++ b/contrib/test_decoding/expected/ddlmessages.out @@ -0,0 +1,42 @@ +-- predictability +SET synchronous_commit = on; +-- turn on logical ddl message logging +CREATE publication mypub FOR ALL TABLES with (ddl = 'database'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE TABLE test_ddlmessage (id serial unique, data int); +ALTER TABLE test_ddlmessage add c3 varchar; +ALTER TABLE test_ddlmessage drop c3; +DROP TABLE test_ddlmessage; +BEGIN; +CREATE TABLE test_ddlmessage (id serial unique, data int); +ALTER TABLE test_ddlmessage add c3 varchar; +ROLLBACK; +BEGIN; +CREATE TABLE test_ddlmessage (id serial unique, data int); +ALTER TABLE test_ddlmessage add c3 varchar; +COMMIT; +\o | sed 's/role.*search_path/role: redacted, search_path/g' +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT pg_drop_replication_slot('regression_slot'); +DROP TABLE test_ddlmessage; +DROP publication mypub; + data +-------------------------------------------------------------------------------------------------------------------------------------------------------------- + DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int); + DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar; + DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 36 content:ALTER TABLE test_ddlmessage drop c3; + DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 27 content:DROP TABLE test_ddlmessage; + DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int); + DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar; +(6 rows) + + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/ddlmessages.sql b/contrib/test_decoding/sql/ddlmessages.sql new file mode 100644 index 0000000000..211497ee22 --- /dev/null +++ b/contrib/test_decoding/sql/ddlmessages.sql @@ -0,0 +1,28 @@ +-- predictability +SET synchronous_commit = on; +-- turn on logical ddl message logging +CREATE publication mypub FOR ALL TABLES with (ddl = 'database'); + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE TABLE test_ddlmessage (id serial unique, data int); +ALTER TABLE test_ddlmessage add c3 varchar; +ALTER TABLE test_ddlmessage drop c3; +DROP TABLE test_ddlmessage; + +BEGIN; +CREATE TABLE test_ddlmessage (id serial unique, data int); +ALTER TABLE test_ddlmessage add c3 varchar; +ROLLBACK; + +BEGIN; +CREATE TABLE test_ddlmessage (id serial unique, data int); +ALTER TABLE test_ddlmessage add c3 varchar; +COMMIT; + +\o | sed 's/role.*search_path/role: redacted, search_path/g' +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT pg_drop_replication_slot('regression_slot'); +DROP TABLE test_ddlmessage; +DROP publication mypub; + diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 3736da6784..a44e1f79e3 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -76,6 +76,11 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pg_decode_ddlmessage(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + const char *role, const char *search_path, + Size sz, const char *message); static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid); @@ -116,6 +121,11 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pg_decode_stream_ddlmessage(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + const char *role, const char *search_path, + Size sz, const char *message); static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], @@ -141,6 +151,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; + cb->ddlmessage_cb = pg_decode_ddlmessage; cb->filter_prepare_cb = pg_decode_filter_prepare; cb->begin_prepare_cb = pg_decode_begin_prepare_txn; cb->prepare_cb = pg_decode_prepare_txn; @@ -153,6 +164,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_commit_cb = pg_decode_stream_commit; cb->stream_change_cb = pg_decode_stream_change; cb->stream_message_cb = pg_decode_stream_message; + cb->stream_ddlmessage_cb = pg_decode_stream_ddlmessage; cb->stream_truncate_cb = pg_decode_stream_truncate; } @@ -747,7 +759,8 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, - const char *prefix, Size sz, const char *message) + const char *prefix, Size sz, + const char *message) { OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:", @@ -756,6 +769,19 @@ pg_decode_message(LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); } +static void +pg_decode_ddlmessage(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + const char *prefix, const char *role, const char *search_path, + Size sz, const char *message) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu content:", + transactional, prefix, role, search_path, sz); + appendBinaryStringInfo(ctx->out, message, sz); + OutputPluginWrite(ctx, true); +} + static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) @@ -936,7 +962,8 @@ pg_decode_stream_change(LogicalDecodingContext *ctx, static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, - const char *prefix, Size sz, const char *message) + const char *prefix, Size sz, + const char *message) { OutputPluginPrepareWrite(ctx, true); @@ -948,7 +975,35 @@ pg_decode_stream_message(LogicalDecodingContext *ctx, else { appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:", - transactional, prefix, sz); + transactional, prefix, sz); + appendBinaryStringInfo(ctx->out, message, sz); + } + + OutputPluginWrite(ctx, true); +} + +/* + * In streaming mode, we don't display the contents for transactional messages + * as the transaction can abort at a later point in time. We don't want users to + * see the message contents until the transaction is committed. + */ +static void +pg_decode_stream_ddlmessage(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + const char *prefix, const char * role, const char * search_path, + Size sz, const char *message) +{ + OutputPluginPrepareWrite(ctx, true); + + if (transactional) + { + appendStringInfo(ctx->out, "streaming DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu", + transactional, prefix, role, search_path, sz); + } + else + { + appendStringInfo(ctx->out, "streaming DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu content:", + transactional, prefix, role, search_path, sz); appendBinaryStringInfo(ctx->out, message, sz); } diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index f88d72fd86..b8e29e8df3 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -19,6 +19,7 @@ OBJS = \ hashdesc.o \ heapdesc.o \ logicalmsgdesc.o \ + logicalddlmsgdesc.o \ mxactdesc.o \ nbtdesc.o \ relmapdesc.o \ diff --git a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c new file mode 100644 index 0000000000..7a352d540a --- /dev/null +++ b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c @@ -0,0 +1,54 @@ +/*------------------------------------------------------------------------- + * + * logicalddlmsgdesc.c + * rmgr descriptor routines for replication/logical/ddlmessage.c + * + * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/logicalddlmsgdesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "replication/ddlmessage.h" + +void +logicalddlmsg_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_LOGICAL_DDL_MESSAGE) + { + xl_logical_ddl_message *xlrec = (xl_logical_ddl_message *) rec; + char *prefix = xlrec->message; + char *role = xlrec->message + xlrec->prefix_size; + char *search_path = xlrec->message + xlrec->prefix_size + xlrec->role_size; + char *message = xlrec->message + xlrec->prefix_size + xlrec->role_size + xlrec->search_path_size; + char *sep = ""; + + Assert(prefix[xlrec->prefix_size] != '\0'); + + appendStringInfo(buf, "%s, prefix \"%s\"; role \"%s\"; search_path \"%s\"; payload (%zu bytes): ", + xlrec->transactional ? "transactional" : "non-transactional", + prefix, role, search_path, xlrec->message_size); + /* Write message payload as a series of hex bytes */ + for (int cnt = 0; cnt < xlrec->message_size; cnt++) + { + appendStringInfo(buf, "%s%02X", sep, (unsigned char) message[cnt]); + sep = " "; + } + } +} + +const char * +logicalddlmsg_identify(uint8 info) +{ + if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_DDL_MESSAGE) + return "DDL MESSAGE"; + + return NULL; +} diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 8ed69244e3..6db9a593b8 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -29,6 +29,7 @@ #include "miscadmin.h" #include "replication/decode.h" #include "replication/message.h" +#include "replication/ddlmessage.h" #include "replication/origin.h" #include "storage/standby.h" #include "utils/builtins.h" diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index b8ab1d8141..fad21a31d0 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -1206,3 +1206,55 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } + +/* + * Checks if DDL on relation (relid) need xlog for logical replication + */ +bool +ddl_need_xlog(Oid relid, bool forAllTabPubOnly, bool isTopLevel) +{ + List *allTablePubs = NIL; + List *tablePubs = NIL; + ListCell *lc; + + /* Only replicate toplevel DDL command */ + if (!isTopLevel) + return false; + if (relid == InvalidOid && !forAllTabPubOnly) + return false; + + /* + * Log the DDL command if + * there is any FOR ALL TABLES publication with pubddl_database on + * or + * this TABLE belongs to any non FOR ALL publications with pubddl_table on + */ + allTablePubs = GetAllTablesPublications(); + foreach(lc, allTablePubs) + { + Oid pubid = lfirst_oid(lc); + Publication *pub = GetPublication(pubid); + + if (pub->pubactions.pubddl_database) + return true; + } + + /* + * If forAllTabPubOnly is true (i.e. database level replication is required for the DDL + * to be logged), we can bail now since no publication has been found with pubddl_database on + */ + if (forAllTabPubOnly) + return false; + + tablePubs = GetRelationPublications(relid); + foreach(lc, tablePubs) + { + Oid pubid = lfirst_oid(lc); + Publication *pub = GetPublication(pubid); + + if (pub->pubactions.pubddl_table) + return true; + } + + return false; +} diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 2de0ebacec..6d1487951f 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -80,6 +80,7 @@ #include "partitioning/partbounds.h" #include "partitioning/partdesc.h" #include "pgstat.h" +#include "replication/ddlmessage.h" #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteHandler.h" #include "rewrite/rewriteManip.h" @@ -1332,13 +1333,14 @@ DropErrorMsgWrongType(const char *relname, char wrongkind, char rightkind) * DROP MATERIALIZED VIEW, DROP FOREIGN TABLE */ void -RemoveRelations(DropStmt *drop) +RemoveRelations(ParseState *pstate, DropStmt *drop, bool isTopLevel) { ObjectAddresses *objects; char relkind; ListCell *cell; int flags = 0; LOCKMODE lockmode = AccessExclusiveLock; + bool ddlxlog = XLogLogicalInfoActive(); /* DROP CONCURRENTLY uses a weaker lock, and has some restrictions */ if (drop->concurrent) @@ -1437,10 +1439,37 @@ RemoveRelations(DropStmt *drop) /* Not there? */ if (!OidIsValid(relOid)) { + ddlxlog = false; DropErrorMsgNonExistent(rel, relkind, drop->missing_ok); continue; } + /* + * Only log DROP RELATION cmd for logical replication if + * there is any FOR ALL TABLES publication with pubddl_database on or + * every relation to be dropped belongs to any non FOR ALL publications with pubddl_table on + */ + if (ddlxlog) + { + Oid tableOid = InvalidOid; + + if (relkind == RELKIND_RELATION) + tableOid = relOid; + else if (relkind == RELKIND_INDEX) + tableOid = IndexGetRelation(relOid, true); + /* + * Other relation types require database level ddl replication and are + * already logged in LogLogicalDDLCommand() if needed. + */ + else + ddlxlog = false; + + /* DROP RELATION or INDEX are allowed in table level DDL replication */ + if (tableOid != InvalidOid && + !ddl_need_xlog(tableOid, false, isTopLevel)) + ddlxlog = false; + } + /* * Decide if concurrent mode needs to be used here or not. The * callback retrieved the rel's persistence for us. @@ -1484,6 +1513,18 @@ RemoveRelations(DropStmt *drop) add_exact_object_address(&obj, objects); } + /* Log the Drop command for logical replication */ + if (ddlxlog) + { + bool transactional = true; + const char* prefix = ""; + LogLogicalDDLMessage(prefix, + GetUserId(), + pstate->p_sourcetext, + strlen(pstate->p_sourcetext), + transactional); + } + performMultipleDeletions(objects, drop->behavior, flags); free_object_addresses(objects); diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c4e2fdeb71..f3eeb67312 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) OBJS = \ decode.o \ + ddlmessage.o\ launcher.o \ logical.o \ logicalfuncs.o \ diff --git a/src/backend/replication/logical/ddlmessage.c b/src/backend/replication/logical/ddlmessage.c new file mode 100644 index 0000000000..f93573079a --- /dev/null +++ b/src/backend/replication/logical/ddlmessage.c @@ -0,0 +1,99 @@ +/*------------------------------------------------------------------------- + * + * ddlmessage.c + * Logical DDL messages. + * + * Copyright (c) 2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/ddlmessage.c + * + * NOTES + * + * Logical DDL messages allow XLOG logging of DDL command strings that + * get passed to the logical decoding plugin. In normal XLOG processing they + * are same as NOOP. + * + * Simiarl to the generic logical messages, These DDL messages can be either + * transactional or non-transactional. Note by default DDLs in PostgreSQL are + * transactional. + * Transactional messages are part of current transaction and will be sent to + * decoding plugin using in a same way as DML operations. + * Non-transactional messages are sent to the plugin at the time when the + * logical decoding reads them from XLOG. This also means that transactional + * messages won't be delivered if the transaction was rolled back but the + * non-transactional one will always be delivered. + * + * Every message carries prefix to avoid conflicts between different decoding + * plugins. The plugin authors must take extra care to use unique prefix, + * good options seems to be for example to use the name of the extension. + * + * --------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "access/xloginsert.h" +#include "catalog/namespace.h" +#include "miscadmin.h" +#include "nodes/execnodes.h" +#include "replication/logical.h" +#include "replication/ddlmessage.h" +#include "utils/memutils.h" + +/* + * Write logical decoding DDL message into XLog. + */ +XLogRecPtr +LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *message, + size_t size, bool transactional) +{ + xl_logical_ddl_message xlrec; + const char *role; + + role = GetUserNameFromId(roleoid, false); + + /* + * Force xid to be allocated if we're emitting a transactional message. + */ + if (transactional) + { + Assert(IsTransactionState()); + GetCurrentTransactionId(); + } + + xlrec.dbId = MyDatabaseId; + xlrec.transactional = transactional; + /* trailing zero is critical; see logicalddlmsg_desc */ + xlrec.prefix_size = strlen(prefix) + 1; + xlrec.role_size = strlen(role) + 1; + xlrec.search_path_size = strlen(namespace_search_path) + 1; + xlrec.message_size = size; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfLogicalDDLMessage); + XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size); + XLogRegisterData(unconstify(char *, role), xlrec.role_size); + XLogRegisterData(namespace_search_path, xlrec.search_path_size); + XLogRegisterData(unconstify(char *, message), size); + + /* allow origin filtering */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + return XLogInsert(RM_LOGICALDDLMSG_ID, XLOG_LOGICAL_DDL_MESSAGE); +} + +/* + * Redo is basically just noop for logical decoding ddl messages. + */ +void +logicalddlmsg_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info != XLOG_LOGICAL_DDL_MESSAGE) + elog(PANIC, "logicalddlmsg_redo: unknown op code %u", info); + + /* This is only interesting for logical decoding, see decode.c. */ +} diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index aa2427ba73..034c7f2413 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -36,6 +36,7 @@ #include "access/xlogutils.h" #include "catalog/pg_control.h" #include "replication/decode.h" +#include "replication/ddlmessage.h" #include "replication/logical.h" #include "replication/message.h" #include "replication/origin.h" @@ -603,6 +604,61 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) message->message + message->prefix_size); } +/* + * Handle rmgr LOGICALDDLMSG_ID records for DecodeRecordIntoReorderBuffer(). + */ +void +logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + SnapBuild *builder = ctx->snapshot_builder; + XLogReaderState *r = buf->record; + TransactionId xid = XLogRecGetXid(r); + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + RepOriginId origin_id = XLogRecGetOrigin(r); + Snapshot snapshot; + xl_logical_ddl_message *message; + + if (info != XLOG_LOGICAL_DDL_MESSAGE) + elog(ERROR, "unexpected RM_LOGICALDDLMSG_ID record type: %u", info); + + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + + /* + * If we don't have snapshot or we are just fast-forwarding, there is no + * point in decoding ddl messages. + */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + ctx->fast_forward) + return; + + message = (xl_logical_ddl_message *) XLogRecGetData(r); + + if (message->dbId != ctx->slot->data.database || + FilterByOrigin(ctx, origin_id)) + return; + + if (message->transactional && + !SnapBuildProcessChange(builder, xid, buf->origptr)) + return; + else if (!message->transactional && + (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || + SnapBuildXactNeedsSkip(builder, buf->origptr))) + return; + + snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); + ReorderBufferQueueDDLMessage(ctx->reorder, xid, snapshot, buf->endptr, + message->transactional, + message->message, + /* first part of message is prefix */ + message->message + message->prefix_size, + /* Second part of message is role*/ + message->message + message->prefix_size + message->role_size, + /* Third part of message is search_path */ + message->message_size, + message->message + message->prefix_size + + message->role_size + message->search_path_size); +} + /* * Consolidated commit record handling between the different form of commit * records. diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 625a7f4273..3004f02433 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -73,6 +73,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +static void ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, const char *role, const char *search_path, + Size message_size, const char *message); /* streaming callbacks */ static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -90,6 +94,10 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +static void stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, const char *role, const char *search_path, + Size message_size, const char *message); static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); @@ -218,6 +226,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + ctx->reorder->ddlmessage = ddlmessage_cb_wrapper; /* * To support streaming, we require start/stop/abort/commit/change @@ -234,6 +243,7 @@ StartupDecodingContext(List *output_plugin_options, (ctx->callbacks.stream_commit_cb != NULL) || (ctx->callbacks.stream_change_cb != NULL) || (ctx->callbacks.stream_message_cb != NULL) || + (ctx->callbacks.stream_ddlmessage_cb != NULL) || (ctx->callbacks.stream_truncate_cb != NULL); /* @@ -251,6 +261,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->stream_commit = stream_commit_cb_wrapper; ctx->reorder->stream_change = stream_change_cb_wrapper; ctx->reorder->stream_message = stream_message_cb_wrapper; + ctx->reorder->stream_ddlmessage = stream_ddlmessage_cb_wrapper; ctx->reorder->stream_truncate = stream_truncate_cb_wrapper; @@ -1220,6 +1231,44 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, const char *role, + const char *search_path, Size message_size, + const char *message) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + if (ctx->callbacks.ddlmessage_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "ddlmessage"; + state.report_location = message_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; + ctx->write_location = message_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.ddlmessage_cb(ctx, txn, message_lsn, transactional, prefix, + role, search_path, message_size, message); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr first_lsn) @@ -1535,6 +1584,48 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, const char *role, + const char* search_path, Size message_size, + const char *message) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* this callback is optional */ + if (ctx->callbacks.stream_ddlmessage_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_ddlmessage"; + state.report_location = message_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; + ctx->write_location = message_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.stream_ddlmessage_cb(ctx, txn, message_lsn, transactional, prefix, + role, search_path, message_size, message); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 8da5f9089c..ca01336604 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -512,6 +512,20 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, pfree(change->data.msg.message); change->data.msg.message = NULL; break; + case REORDER_BUFFER_CHANGE_DDLMESSAGE: + if (change->data.ddlmsg.prefix != NULL) + pfree(change->data.ddlmsg.prefix); + change->data.ddlmsg.prefix = NULL; + if (change->data.ddlmsg.role != NULL) + pfree(change->data.ddlmsg.role); + change->data.ddlmsg.role = NULL; + if (change->data.ddlmsg.search_path != NULL) + pfree(change->data.ddlmsg.search_path); + change->data.ddlmsg.search_path = NULL; + if (change->data.ddlmsg.message != NULL) + pfree(change->data.ddlmsg.message); + change->data.ddlmsg.message = NULL; + break; case REORDER_BUFFER_CHANGE_INVALIDATION: if (change->data.inval.invalidations) pfree(change->data.inval.invalidations); @@ -866,6 +880,64 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, } } +/* + * A transactional DDL message is queued to be processed upon commit and a + * non-transactional DDL message gets processed immediately. + */ +void +ReorderBufferQueueDDLMessage(ReorderBuffer *rb, TransactionId xid, + Snapshot snapshot, XLogRecPtr lsn, + bool transactional, const char *prefix, + const char *role, const char *search_path, + Size message_size, const char *message) +{ + if (transactional) + { + MemoryContext oldcontext; + ReorderBufferChange *change; + + Assert(xid != InvalidTransactionId); + + oldcontext = MemoryContextSwitchTo(rb->context); + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_DDLMESSAGE; + change->data.ddlmsg.prefix = pstrdup(prefix); + change->data.ddlmsg.role = pstrdup(role); + change->data.ddlmsg.search_path = pstrdup(search_path); + change->data.ddlmsg.message_size = message_size; + change->data.ddlmsg.message = palloc(message_size); + memcpy(change->data.ddlmsg.message, message, message_size); + + ReorderBufferQueueChange(rb, xid, lsn, change, false); + + MemoryContextSwitchTo(oldcontext); + } + else + { + ReorderBufferTXN *txn = NULL; + volatile Snapshot snapshot_now = snapshot; + + if (xid != InvalidTransactionId) + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + /* setup snapshot to allow catalog access */ + SetupHistoricSnapshot(snapshot_now, NULL); + PG_TRY(); + { + rb->ddlmessage(rb, txn, lsn, false, prefix, role, search_path, message_size, message); + + TeardownHistoricSnapshot(false); + } + PG_CATCH(); + { + TeardownHistoricSnapshot(true); + PG_RE_THROW(); + } + PG_END_TRY(); + } +} + /* * AssertTXNLsnOrder * Verify LSN ordering of transaction lists in the reorderbuffer @@ -1957,6 +2029,29 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message); } +/* + * Helper function for ReorderBufferProcessTXN for applying the DDL message. + */ +static inline void +ReorderBufferApplyDDLMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferChange *change, bool streaming) +{ + if (streaming) + rb->stream_ddlmessage(rb, txn, change->lsn, true, + change->data.ddlmsg.prefix, + change->data.ddlmsg.role, + change->data.ddlmsg.search_path, + change->data.ddlmsg.message_size, + change->data.ddlmsg.message); + else + rb->ddlmessage(rb, txn, change->lsn, true, + change->data.ddlmsg.prefix, + change->data.ddlmsg.role, + change->data.ddlmsg.search_path, + change->data.ddlmsg.message_size, + change->data.ddlmsg.message); +} + /* * Function to store the command id and snapshot at the end of the current * stream so that we can reuse the same while sending the next stream. @@ -2335,6 +2430,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferApplyMessage(rb, txn, change, streaming); break; + case REORDER_BUFFER_CHANGE_DDLMESSAGE: + ReorderBufferApplyDDLMessage(rb, txn, change, streaming); + break; + case REORDER_BUFFER_CHANGE_INVALIDATION: /* Execute the invalidation messages locally */ ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations, @@ -3708,6 +3807,53 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message_size); data += change->data.msg.message_size; + break; + } + case REORDER_BUFFER_CHANGE_DDLMESSAGE: + { + char *data; + Size prefix_size = strlen(change->data.ddlmsg.prefix) + 1; + Size role_size = strlen(change->data.ddlmsg.role) + 1; + Size search_path_size = strlen(change->data.ddlmsg.search_path) + 1; + + sz += prefix_size + role_size + search_path_size + + change->data.ddlmsg.message_size + + sizeof(Size) + sizeof(Size) + sizeof(Size) + sizeof(Size); + ReorderBufferSerializeReserve(rb, sz); + + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + + /* write the prefix including the size */ + memcpy(data, &prefix_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.ddlmsg.prefix, + prefix_size); + data += prefix_size; + + /* write the role including the size */ + memcpy(data, &role_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.ddlmsg.role, + role_size); + data += role_size; + + /* write the search_path including the size */ + memcpy(data, &search_path_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.ddlmsg.search_path, + search_path_size); + data += search_path_size; + + /* write the message including the size */ + memcpy(data, &change->data.ddlmsg.message_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.ddlmsg.message, + change->data.ddlmsg.message_size); + data += change->data.ddlmsg.message_size; + break; } case REORDER_BUFFER_CHANGE_INVALIDATION: @@ -4022,6 +4168,18 @@ ReorderBufferChangeSize(ReorderBufferChange *change) sz += prefix_size + change->data.msg.message_size + sizeof(Size) + sizeof(Size); + break; + } + case REORDER_BUFFER_CHANGE_DDLMESSAGE: + { + Size prefix_size = strlen(change->data.ddlmsg.prefix) + 1; + Size role_size = strlen(change->data.ddlmsg.role) + 1; + Size search_path_size = strlen(change->data.ddlmsg.search_path) + 1; + + sz += prefix_size + role_size + search_path_size + + change->data.ddlmsg.message_size + + sizeof(Size) + sizeof(Size) + sizeof(Size) + sizeof(Size); + break; } case REORDER_BUFFER_CHANGE_INVALIDATION: @@ -4282,8 +4440,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* read prefix */ memcpy(&prefix_size, data, sizeof(Size)); data += sizeof(Size); - change->data.msg.prefix = MemoryContextAlloc(rb->context, - prefix_size); + change->data.msg.prefix = MemoryContextAlloc(rb->context, prefix_size); memcpy(change->data.msg.prefix, data, prefix_size); Assert(change->data.msg.prefix[prefix_size - 1] == '\0'); data += prefix_size; @@ -4297,6 +4454,49 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message_size); data += change->data.msg.message_size; + break; + } + case REORDER_BUFFER_CHANGE_DDLMESSAGE: + { + Size prefix_size; + Size role_size; + Size search_path_size; + + /* read prefix */ + memcpy(&prefix_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.ddlmsg.prefix = MemoryContextAlloc(rb->context, prefix_size); + memcpy(change->data.ddlmsg.prefix, data, prefix_size); + Assert(change->data.ddlmsg.prefix[prefix_size - 1] == '\0'); + data += prefix_size; + + /* read role */ + memcpy(&role_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.ddlmsg.role = MemoryContextAlloc(rb->context, + role_size); + memcpy(change->data.ddlmsg.role, data, role_size); + Assert(change->data.ddlmsg.role[role_size - 1] == '\0'); + data += role_size; + + /* read search_path */ + memcpy(&search_path_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.ddlmsg.search_path = MemoryContextAlloc(rb->context, + search_path_size); + memcpy(change->data.ddlmsg.search_path, data, search_path_size); + Assert(change->data.ddlmsg.search_path[search_path_size - 1] == '\0'); + data += search_path_size; + + /* read the message */ + memcpy(&change->data.msg.message_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.msg.message = MemoryContextAlloc(rb->context, + change->data.msg.message_size); + memcpy(change->data.msg.message, data, + change->data.msg.message_size); + data += change->data.msg.message_size; + break; } case REORDER_BUFFER_CHANGE_INVALIDATION: diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 6a5bcded55..5fe54f742a 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -62,6 +62,7 @@ #include "miscadmin.h" #include "parser/parse_utilcmd.h" #include "postmaster/bgwriter.h" +#include "replication/ddlmessage.h" #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteRemove.h" #include "storage/fd.h" @@ -86,7 +87,7 @@ static void ProcessUtilitySlow(ParseState *pstate, QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *qc); -static void ExecDropStmt(DropStmt *stmt, bool isTopLevel); +static void ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel); /* * CommandIsReadOnly: is an executable query read-only? @@ -987,7 +988,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, context, params, queryEnv, dest, qc); else - ExecDropStmt(stmt, isTopLevel); + ExecDropStmt(pstate, stmt, isTopLevel); } break; @@ -1087,6 +1088,154 @@ standard_ProcessUtility(PlannedStmt *pstmt, CommandCounterIncrement(); } +/* + * Log a DDL command for logical replication + * Some DDLs are only replicated in Database Level DDL replication + * Some can be replicated in Table Level DDL replication. + * + * Currently we focus on supporting Database Level DDL replication + */ +static void +LogLogicalDDLCommand(Node *parsetree, const char *queryString) +{ + switch (nodeTag(parsetree)) + { + /* Fisrtly, commands that are only supported in Database level DDL replication */ + case T_CreateSchemaStmt: + case T_CreateStmt: + case T_CreateForeignTableStmt: + case T_AlterDomainStmt: + case T_DefineStmt: + case T_CompositeTypeStmt: + case T_CreateEnumStmt: + case T_CreateRangeStmt: + case T_AlterEnumStmt: + case T_ViewStmt: + case T_CreateFunctionStmt: + case T_AlterFunctionStmt: + case T_CreateTrigStmt: + case T_CreateDomainStmt: + case T_CreateCastStmt: + case T_CreateOpClassStmt: + case T_CreateOpFamilyStmt: + case T_AlterOpFamilyStmt: + case T_AlterOperatorStmt: + case T_AlterTypeStmt: + case T_GrantStmt: + case T_AlterCollationStmt: + /* + * Log these stmt for logical replication if + * there is any FOR ALL TABLES publication with pubddl_database on. + * i.e. Database level DDL replication is on for some publication. + */ + if (ddl_need_xlog(InvalidOid, true, true)) + { + bool transactional = true; + const char* prefix = ""; + LogLogicalDDLMessage(prefix, + GetUserId(), + queryString, + strlen(queryString), + transactional); + } + break; + + /* + * Secondly, commands that may be allowed in Table level DDL replication. + * These are currently handled in the later execution path of the command. + * Because we need to get the relation id which readily available in later + * code path. + */ + case T_AlterTableStmt: + case T_IndexStmt: + case T_RenameStmt: /* TODO */ + case T_AlterOwnerStmt: /* TODO */ + break; + + /* DropStmt depends on the removeType */ + case T_DropStmt: + { + DropStmt* stmt = (DropStmt*) parsetree; + switch (stmt->removeType) + { + /* Maybe allowed in Table level DDL replication, handled in later code path */ + case OBJECT_INDEX: + case OBJECT_TABLE: + break; + /* Drop of sequence is by logical replication of sequences separately */ + case OBJECT_SEQUENCE: + break; + /* Drop of other objects are allowed in Database level DDL replication only */ + case OBJECT_VIEW: + case OBJECT_MATVIEW: + case OBJECT_FOREIGN_TABLE: + default: + /* + * Log these DropStmt for logical replication if + * there is any FOR ALL TABLES publication with pubddl_database on. + * i.e. Database level DDL replication is on for some publication. + */ + if (ddl_need_xlog(InvalidOid, true, true)) + { + bool transactional = true; + const char* prefix = ""; + LogLogicalDDLMessage(prefix, + GetUserId(), + queryString, + strlen(queryString), + transactional); + } + break; + } + } + /* + * Lastly, rule out DDLs we don't replicate yet in DDL replication + * Some of these can be supported, we just need to investigate and run tests. + */ + case T_CreateExtensionStmt: + case T_AlterExtensionStmt: + case T_AlterExtensionContentsStmt: + case T_CreateFdwStmt: + case T_AlterFdwStmt: + case T_CreateForeignServerStmt: + case T_AlterForeignServerStmt: + case T_CreateUserMappingStmt: + case T_AlterUserMappingStmt: + case T_DropUserMappingStmt: + case T_ImportForeignSchemaStmt: + case T_RuleStmt: + case T_CreateSeqStmt: + case T_AlterSeqStmt: + case T_CreateTableAsStmt: + case T_RefreshMatViewStmt: + case T_CreatePLangStmt: + case T_CreateConversionStmt: + case T_CreateTransformStmt: + case T_AlterTSDictionaryStmt: + case T_AlterTSConfigurationStmt: + case T_AlterTableMoveAllStmt: + case T_AlterObjectDependsStmt: + case T_AlterObjectSchemaStmt: + case T_CommentStmt: + case T_DropOwnedStmt: + case T_AlterDefaultPrivilegesStmt: + case T_CreatePolicyStmt: + case T_AlterPolicyStmt: + case T_SecLabelStmt: + case T_CreateAmStmt: + case T_CreatePublicationStmt: + case T_AlterPublicationStmt: + case T_CreateSubscriptionStmt: + case T_AlterSubscriptionStmt: + case T_DropSubscriptionStmt: + case T_CreateStatsStmt: + case T_AlterStatsStmt: + break; + default: + break; + } +} + /* * The "Slow" variant of ProcessUtility should only receive statements * supported by the event triggers facility. Therefore, we always @@ -1119,6 +1268,13 @@ ProcessUtilitySlow(ParseState *pstate, if (isCompleteQuery) EventTriggerDDLCommandStart(parsetree); + /* + * Consider logging the DDL command if logical logging is enabled and this is + * a top level query. + */ + if (XLogLogicalInfoActive() && isTopLevel) + LogLogicalDDLCommand(parsetree, queryString); + switch (nodeTag(parsetree)) { /* @@ -1321,6 +1477,23 @@ ProcessUtilitySlow(ParseState *pstate, EventTriggerAlterTableStart(parsetree); EventTriggerAlterTableRelid(relid); + /* + * Log the ALTER TABLE command if + * There is any publication with database level ddl on or + * this TABLE belongs to any publication with table level ddl on + */ + if (XLogLogicalInfoActive() && + ddl_need_xlog(relid, false, isTopLevel)) + { + bool transactional = true; + const char* prefix = ""; + LogLogicalDDLMessage(prefix, + GetUserId(), + queryString, + strlen(queryString), + transactional); + } + /* ... and do it */ AlterTable(atstmt, lockmode, &atcontext); @@ -1539,6 +1712,24 @@ ProcessUtilitySlow(ParseState *pstate, /* ... and do it */ EventTriggerAlterTableStart(parsetree); + + /* + * Log CREATE INDEX cmd for logical replication if + * there is any publication with database level ddl on or + * this TABLE belongs to any publication with table level ddl on. + */ + if (XLogLogicalInfoActive() && + ddl_need_xlog(relid, false, isTopLevel)) + { + bool transactional = true; + const char* prefix = ""; + LogLogicalDDLMessage(prefix, + GetUserId(), + queryString, + strlen(queryString), + transactional); + } + address = DefineIndex(relid, /* OID of heap relation */ stmt, @@ -1761,7 +1952,7 @@ ProcessUtilitySlow(ParseState *pstate, break; case T_DropStmt: - ExecDropStmt((DropStmt *) parsetree, isTopLevel); + ExecDropStmt(pstate, (DropStmt *) parsetree, isTopLevel); /* no commands stashed for DROP */ commandCollected = true; break; @@ -1982,7 +2173,7 @@ ProcessUtilityForAlterTable(Node *stmt, AlterTableUtilityContext *context) * Dispatch function for DropStmt */ static void -ExecDropStmt(DropStmt *stmt, bool isTopLevel) +ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel) { switch (stmt->removeType) { @@ -1997,7 +2188,7 @@ ExecDropStmt(DropStmt *stmt, bool isTopLevel) case OBJECT_VIEW: case OBJECT_MATVIEW: case OBJECT_FOREIGN_TABLE: - RemoveRelations(stmt); + RemoveRelations(pstate, stmt, isTopLevel); break; default: RemoveObjects(stmt); diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 6b8c17bb4c..792f438959 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -27,6 +27,7 @@ #include "commands/sequence.h" #include "commands/tablespace.h" #include "replication/message.h" +#include "replication/ddlmessage.h" #include "replication/origin.h" #include "rmgrdesc.h" #include "storage/standbydefs.h" diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 9a74721c97..9de3b8f2eb 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) +PG_RMGR(RM_LOGICALDDLMSG_ID, "LogicalDDLMessage", logicalddlmsg_redo, logicalddlmsg_desc, logicalddlmsg_identify, NULL, NULL, NULL, logicalddlmsg_decode) diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index 5d4037f26e..68781365de 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -18,6 +18,7 @@ #include "catalog/dependency.h" #include "catalog/objectaddress.h" #include "nodes/parsenodes.h" +#include "parser/parse_node.h" #include "storage/lock.h" #include "utils/relcache.h" @@ -27,7 +28,7 @@ struct AlterTableUtilityContext; /* avoid including tcop/utility.h here */ extern ObjectAddress DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, ObjectAddress *typaddress, const char *queryString); -extern void RemoveRelations(DropStmt *drop); +extern void RemoveRelations(ParseState *pstate, DropStmt *drop, bool isTopLevel); extern Oid AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode); diff --git a/src/include/replication/ddlmessage.h b/src/include/replication/ddlmessage.h new file mode 100644 index 0000000000..1e8ef22296 --- /dev/null +++ b/src/include/replication/ddlmessage.h @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * ddlmessage.h + * Exports from replication/logical/ddlmessage.c + * + * Copyright (c) 2022, PostgreSQL Global Development Group + * + * src/include/replication/ddlmessage.h + *------------------------------------------------------------------------- + */ +#ifndef PG_LOGICAL_DDL_MESSAGE_H +#define PG_LOGICAL_DDL_MESSAGE_H + +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "access/xlogreader.h" + +/* + * Generic logical decoding DDL message wal record. + */ +typedef struct xl_logical_ddl_message +{ + Oid dbId; /* database Oid emitted from */ + bool transactional; /* is message transactional? */ + Size prefix_size; /* length of prefix */ + Size role_size; /* length of the role that executes the DDL command */ + Size search_path_size; /* length of the search path */ + Size message_size; /* size of the message */ + /* + * payload, including null-terminated prefix of length prefix_size + * and null-terminated role of length role_size + * and null-terminated search_path of length search_path_size + */ + char message[FLEXIBLE_ARRAY_MEMBER]; +} xl_logical_ddl_message; + +#define SizeOfLogicalDDLMessage (offsetof(xl_logical_ddl_message, message)) + +extern XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *ddl_message, + size_t size, bool transactional); + +/* RMGR API*/ +#define XLOG_LOGICAL_DDL_MESSAGE 0x00 +void logicalddlmsg_redo(XLogReaderState *record); +void logicalddlmsg_desc(StringInfo buf, XLogReaderState *record); +const char *logicalddlmsg_identify(uint8 info); + +#endif /* PG_LOGICAL_DDL_MESSAGE_H */ diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index 741bf65cf7..427a7b997d 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -27,6 +27,7 @@ extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 539dc8e697..5b1c245b72 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -88,6 +88,19 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, Size message_size, const char *message); +/* + * Called for the logical decoding DDL messages. + */ +typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + const char *role, + const char *search_path, + Size message_size, + const char *message); + /* * Filter changes by origin. */ @@ -199,6 +212,20 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx Size message_size, const char *message); +/* + * Callback for streaming logical decoding DDL messages from in-progress + * transactions. + */ +typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + const char *role, + const char *search_path, + Size message_size, + const char *message); + /* * Callback for streaming truncates from in-progress transactions. */ @@ -219,6 +246,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeDDLMessageCB ddlmessage_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; @@ -237,6 +265,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamDDLMessageCB stream_ddlmessage_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 4a01f877e5..dd89e08efc 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -56,6 +56,7 @@ typedef enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, + REORDER_BUFFER_CHANGE_DDLMESSAGE, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, @@ -130,6 +131,16 @@ typedef struct ReorderBufferChange char *message; } msg; + /* DDL Message. */ + struct + { + char *prefix; + char *role; + char *search_path; + Size message_size; + char *message; + } ddlmsg; + /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */ Snapshot snapshot; @@ -430,6 +441,17 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* DDL message callback signature */ +typedef void (*ReorderBufferDDLMessageCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + const char *role, + const char *search_path, + Size sz, + const char *message); + /* begin prepare callback signature */ typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -496,6 +518,18 @@ typedef void (*ReorderBufferStreamMessageCB) ( const char *prefix, Size sz, const char *message); +/* stream DDL message callback signature */ +typedef void (*ReorderBufferStreamDDLMessageCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + const char *role, + const char *search_path, + Size sz, + const char *message); + /* stream truncate callback signature */ typedef void (*ReorderBufferStreamTruncateCB) ( ReorderBuffer *rb, @@ -541,6 +575,7 @@ struct ReorderBuffer ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; ReorderBufferMessageCB message; + ReorderBufferDDLMessageCB ddlmessage; /* * Callbacks to be called when streaming a transaction at prepare time. @@ -560,6 +595,7 @@ struct ReorderBuffer ReorderBufferStreamCommitCB stream_commit; ReorderBufferStreamChangeCB stream_change; ReorderBufferStreamMessageCB stream_message; + ReorderBufferStreamDDLMessageCB stream_ddlmessage; ReorderBufferStreamTruncateCB stream_truncate; /* @@ -635,6 +671,9 @@ extern void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, extern void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); +extern void ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, + bool transactional, const char *prefix, const char *role, + const char *search_path, Size message_size, const char *message); extern void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index e9cea11a5b..5b4bee74cb 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -676,10 +676,10 @@ CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate'); RESET client_min_messages; ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a); -- ok \dRp+ testpub_table_ins - Publication testpub_table_ins - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | f | f | t | f + Publication testpub_table_ins + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL +--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+----------------- + regress_publication_user | f | t | f | f | t | f | f | f Tables: "public.testpub_tbl5" (a) @@ -821,10 +821,10 @@ CREATE TABLE testpub_tbl_both_filters (a int, b int, c int, PRIMARY KEY (a,c)); ALTER TABLE testpub_tbl_both_filters REPLICA IDENTITY USING INDEX testpub_tbl_both_filters_pkey; ALTER PUBLICATION testpub_both_filters ADD TABLE testpub_tbl_both_filters (a,c) WHERE (c != 1); \dRp+ testpub_both_filters - Publication testpub_both_filters - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_both_filters + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL +--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+----------------- + regress_publication_user | f | t | t | t | t | f | f | f Tables: "public.testpub_tbl_both_filters" (a, c) WHERE (c <> 1) -- 2.32.0