From 0964e4eeab145d1375818b2389b49c083ee396a2 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Thu, 23 Feb 2023 05:58:26 -0500 Subject: [PATCH v74 5/8] DDL messaging infrastructure for DDL replication. DDL messaging infrastructure for DDL replication. --- contrib/test_decoding/expected/ddl.out | 26 ++++ contrib/test_decoding/sql/ddl.sql | 5 + contrib/test_decoding/test_decoding.c | 48 +++++++ src/backend/access/rmgrdesc/Makefile | 1 + .../access/rmgrdesc/logicalddlmsgdesc.c | 52 +++++++ src/backend/access/rmgrdesc/meson.build | 1 + src/backend/access/transam/rmgr.c | 1 + src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/ddlmessage.c | 84 ++++++++++++ src/backend/replication/logical/decode.c | 41 ++++++ src/backend/replication/logical/logical.c | 44 ++++++ .../replication/logical/logicalfuncs.c | 24 ++++ src/backend/replication/logical/meson.build | 1 + .../replication/logical/reorderbuffer.c | 127 ++++++++++++++++++ src/bin/pg_waldump/rmgrdesc.c | 1 + src/include/access/rmgrlist.h | 1 + src/include/catalog/pg_proc.dat | 8 ++ src/include/replication/ddlmessage.h | 60 +++++++++ src/include/replication/decode.h | 1 + src/include/replication/output_plugin.h | 14 ++ src/include/replication/reorderbuffer.h | 27 ++++ 21 files changed, 568 insertions(+) create mode 100644 src/backend/access/rmgrdesc/logicalddlmsgdesc.c create mode 100644 src/backend/replication/logical/ddlmessage.c create mode 100644 src/include/replication/ddlmessage.h diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index 9a28b5ddc5..0f51f2b41a 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -831,6 +831,32 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc ------ (0 rows) +SELECT 'ddl msg1' FROM pg_logical_emit_ddl_message('ddl msg1', 16394, 1, 'msg1'); + ?column? +---------- + ddl msg1 +(1 row) + +SELECT 'ddl msg2' FROM pg_logical_emit_ddl_message('ddl msg2', 16394, 1, '{"fmt": "CREATE SCHEMA %{if_not_exists}s %{name}I %{authorization}s", "name": "foo", "authorization": {"fmt": "AUTHORIZATION %{authorization_role}I", "present": false, "authorization_role": null}, "if_not_exists": ""}'); + ?column? +---------- + ddl msg2 +(1 row) + +SELECT 'ddl msg3' FROM pg_logical_emit_ddl_message('ddl msg3', 16394, 1, '{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s %{access_method}s %{with_clause}s", "identity": {"objname": "foo", "schemaname": "element_test"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": false, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{collation}s %{not_null}s %{default}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}}'); + ?column? +---------- + ddl msg3 +(1 row) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + message: prefix: ddl msg1, relid: 16394, cmdtype: Drop start, sz: 4 content:msg1 + message: prefix: ddl msg2, relid: 16394, cmdtype: Drop start, sz: 217 content:{"fmt": "CREATE SCHEMA %{if_not_exists}s %{name}I %{authorization}s", "name": "foo", "authorization": {"fmt": "AUTHORIZATION %{authorization_role}I", "present": false, "authorization_role": null}, "if_not_exists": ""} + message: prefix: ddl msg3, relid: 16394, cmdtype: Drop start, sz: 1396 content:{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s %{access_method}s %{with_clause}s", "identity": {"objname": "foo", "schemaname": "element_test"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": false, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{collation}s %{not_null}s %{default}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}} +(3 rows) + SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot -------------------------- diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql index 4f76bed72c..1ea5a4b271 100644 --- a/contrib/test_decoding/sql/ddl.sql +++ b/contrib/test_decoding/sql/ddl.sql @@ -431,6 +431,11 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc -- done, free logical replication slot SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT 'ddl msg1' FROM pg_logical_emit_ddl_message('ddl msg1', 16394, 1, 'msg1'); +SELECT 'ddl msg2' FROM pg_logical_emit_ddl_message('ddl msg2', 16394, 1, '{"fmt": "CREATE SCHEMA %{if_not_exists}s %{name}I %{authorization}s", "name": "foo", "authorization": {"fmt": "AUTHORIZATION %{authorization_role}I", "present": false, "authorization_role": null}, "if_not_exists": ""}'); +SELECT 'ddl msg3' FROM pg_logical_emit_ddl_message('ddl msg3', 16394, 1, '{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s %{access_method}s %{with_clause}s", "identity": {"objname": "foo", "schemaname": "element_test"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": false, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{collation}s %{not_null}s %{default}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}}'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + SELECT pg_drop_replication_slot('regression_slot'); /* check that the slot is gone */ diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index b7e6048647..e967e3bfa5 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -14,9 +14,11 @@ #include "catalog/pg_type.h" +#include "replication/ddlmessage.h" #include "replication/logical.h" #include "replication/origin.h" +#include "tcop/ddl_deparse.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -72,6 +74,12 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pg_decode_ddl_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + const char *prefix, Oid relid, + DeparsedCommandType cmdtype, + Size sz, const char *message); static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid); @@ -135,6 +143,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; + cb->ddl_cb = pg_decode_ddl_message; cb->filter_prepare_cb = pg_decode_filter_prepare; cb->begin_prepare_cb = pg_decode_begin_prepare_txn; cb->prepare_cb = pg_decode_prepare_txn; @@ -750,6 +759,45 @@ pg_decode_message(LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); } +static void +pg_decode_ddl_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, const char *prefix, Oid relid, + DeparsedCommandType cmdtype, Size sz, const char *message) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "message: prefix: %s, relid: %u, ", + prefix, relid); + + switch(cmdtype) + { + case DCT_SimpleCmd: + appendStringInfo(ctx->out, "cmdtype: Simple, "); + break; + case DCT_TableDropStart: + appendStringInfo(ctx->out, "cmdtype: Drop start, "); + break; + case DCT_TableDropEnd: + appendStringInfo(ctx->out, "cmdtype: Drop end, "); + break; + case DCT_TableAlter: + appendStringInfo(ctx->out, "cmdtype: Alter table, "); + break; + case DCT_ObjectCreate: + appendStringInfo(ctx->out, "cmdtype: Create, "); + break; + case DCT_ObjectDrop: + appendStringInfo(ctx->out, "cmdtype: Drop, "); + break; + default: + appendStringInfo(ctx->out, "cmdtype: Invalid, "); + break; + } + + appendStringInfo(ctx->out, "sz: %zu content:", sz); + appendBinaryStringInfo(ctx->out, message, sz); + OutputPluginWrite(ctx, true); +} + static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index f88d72fd86..2ff01e69bf 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -18,6 +18,7 @@ OBJS = \ gistdesc.o \ hashdesc.o \ heapdesc.o \ + logicalddlmsgdesc.o \ logicalmsgdesc.o \ mxactdesc.o \ nbtdesc.o \ diff --git a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c new file mode 100644 index 0000000000..05e930c90c --- /dev/null +++ b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c @@ -0,0 +1,52 @@ +/*------------------------------------------------------------------------- + * + * logicalddlmsgdesc.c + * rmgr descriptor routines for replication/logical/ddlmessage.c + * + * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/logicalddlmsgdesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "replication/ddlmessage.h" + +void +logicalddlmsg_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_LOGICAL_DDL_MESSAGE) + { + xl_logical_ddl_message *xlrec = (xl_logical_ddl_message *) rec; + char *prefix = xlrec->message; + char *message = xlrec->message + xlrec->prefix_size; + char *sep = ""; + + Assert(prefix[xlrec->prefix_size] != '\0'); + + appendStringInfo(buf, "prefix \"%s\"; payload (%zu bytes): ", + prefix, xlrec->message_size); + appendStringInfo(buf, "relid %u cmdtype %u", xlrec->relid, xlrec->cmdtype); + /* Write message payload as a series of hex bytes */ + for (int cnt = 0; cnt < xlrec->message_size; cnt++) + { + appendStringInfo(buf, "%s%02X", sep, (unsigned char) message[cnt]); + sep = " "; + } + } +} + +const char * +logicalddlmsg_identify(uint8 info) +{ + if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_DDL_MESSAGE) + return "DDL"; + + return NULL; +} diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build index 166cee67b6..781e2d7713 100644 --- a/src/backend/access/rmgrdesc/meson.build +++ b/src/backend/access/rmgrdesc/meson.build @@ -11,6 +11,7 @@ rmgr_desc_sources = files( 'gistdesc.c', 'hashdesc.c', 'heapdesc.c', + 'logicalddlmsgdesc.c', 'logicalmsgdesc.c', 'mxactdesc.c', 'nbtdesc.c', diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 7d67eda5f7..678e81ae01 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -27,6 +27,7 @@ #include "fmgr.h" #include "funcapi.h" #include "miscadmin.h" +#include "replication/ddlmessage.h" #include "replication/decode.h" #include "replication/message.h" #include "replication/origin.h" diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 2dc25e37bb..d3680e9bb5 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) OBJS = \ applyparallelworker.o \ + ddlmessage.o \ decode.o \ launcher.o \ logical.o \ diff --git a/src/backend/replication/logical/ddlmessage.c b/src/backend/replication/logical/ddlmessage.c new file mode 100644 index 0000000000..e11e56e5a3 --- /dev/null +++ b/src/backend/replication/logical/ddlmessage.c @@ -0,0 +1,84 @@ +/*------------------------------------------------------------------------- + * + * ddlmessage.c + * Logical DDL messages. + * + * Copyright (c) 2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/ddlmessage.c + * + * NOTES + * + * Logical DDL messages allow XLOG logging of DDL command strings that + * get passed to the logical decoding plugin. In normal XLOG processing they + * are same as NOOP. + * + * Unlike generic logical messages, these DDL messages have only transactional + * mode. Note by default DDLs in PostgreSQL are transactional. + * + * These messages are part of current transaction and will be sent to + * decoding plugin using in a same way as DML operations. + * + * Every message carries prefix to avoid conflicts between different decoding + * plugins. The plugin authors must take extra care to use unique prefix, + * good options seems to be for example to use the name of the extension. + * + * --------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "access/xloginsert.h" +#include "catalog/namespace.h" +#include "miscadmin.h" +#include "nodes/execnodes.h" +#include "replication/logical.h" +#include "replication/ddlmessage.h" +#include "utils/memutils.h" + +/* + * Write logical decoding DDL message into XLog. + */ +XLogRecPtr +LogLogicalDDLMessage(const char *prefix, Oid relid, DeparsedCommandType cmdtype, + const char *message, size_t size) +{ + xl_logical_ddl_message xlrec; + + /* Ensure we have a valid transaction id. */ + Assert(IsTransactionState()); + GetCurrentTransactionId(); + + xlrec.dbId = MyDatabaseId; + /* Trailing zero is critical; see logicalddlmsg_desc */ + xlrec.prefix_size = strlen(prefix) + 1; + xlrec.message_size = size; + xlrec.relid = relid; + xlrec.cmdtype = cmdtype; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfLogicalDDLMessage); + XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size); + XLogRegisterData(unconstify(char *, message), size); + + /* Allow origin filtering */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + return XLogInsert(RM_LOGICALDDLMSG_ID, XLOG_LOGICAL_DDL_MESSAGE); +} + +/* + * Redo is basically just noop for logical decoding ddl messages. + */ +void +logicalddlmsg_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info != XLOG_LOGICAL_DDL_MESSAGE) + elog(PANIC, "logicalddlmsg_redo: unknown op code %u", info); + + /* This is only interesting for logical decoding, see decode.c. */ +} diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 8fe7bb65f1..c615a23fe5 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -36,6 +36,7 @@ #include "access/xlogutils.h" #include "catalog/pg_control.h" #include "replication/decode.h" +#include "replication/ddlmessage.h" #include "replication/logical.h" #include "replication/message.h" #include "replication/origin.h" @@ -613,6 +614,46 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) message->message + message->prefix_size); } +/* + * Handle rmgr LOGICALDDLMSG_ID records for DecodeRecordIntoReorderBuffer(). + */ +void +logicalddl_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + SnapBuild *builder = ctx->snapshot_builder; + XLogReaderState *r = buf->record; + TransactionId xid = XLogRecGetXid(r); + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + RepOriginId origin_id = XLogRecGetOrigin(r); + xl_logical_ddl_message *message; + + if (info != XLOG_LOGICAL_DDL_MESSAGE) + elog(ERROR, "unexpected RM_LOGICALDDLMSG_ID record type: %u", info); + + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + + /* + * If we don't have snapshot or we are just fast-forwarding, there is no + * point in decoding ddl messages. + */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + ctx->fast_forward) + return; + + message = (xl_logical_ddl_message *) XLogRecGetData(r); + + if (message->dbId != ctx->slot->data.database || + FilterByOrigin(ctx, origin_id)) + return; + + if (SnapBuildProcessChange(builder, xid, buf->origptr)) + ReorderBufferQueueDDLMessage(ctx->reorder, xid, buf->endptr, + message->message, /* first part of message is prefix */ + message->message_size, + message->message + message->prefix_size, + message->relid, message->cmdtype); +} + /* * Consolidated commit record handling between the different form of commit * records. diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index c3ec97a0a6..39e686122f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -33,6 +33,7 @@ #include "fmgr.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/ddlmessage.h" #include "replication/decode.h" #include "replication/logical.h" #include "replication/origin.h" @@ -73,6 +74,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +static void ddl_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, const char *prefix, + Oid relid, DeparsedCommandType cmdtype, + Size message_size, const char *message); /* streaming callbacks */ static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -223,6 +228,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + ctx->reorder->ddl = ddl_cb_wrapper; /* * To support streaming, we require start/stop/abort/commit/change @@ -1233,6 +1239,44 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +ddl_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + const char *prefix, Oid relid, DeparsedCommandType cmdtype, + Size message_size, + const char *message) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + if (ctx->callbacks.ddl_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "ddl"; + state.report_location = message_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; + ctx->write_location = message_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.ddl_cb(ctx, txn, message_lsn, prefix, relid, cmdtype, + message_size, message); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr first_lsn) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index fa1b641a2b..92f5f9357e 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -29,6 +29,7 @@ #include "nodes/makefuncs.h" #include "replication/decode.h" #include "replication/logical.h" +#include "replication/ddlmessage.h" #include "replication/message.h" #include "storage/fd.h" #include "utils/array.h" @@ -388,3 +389,26 @@ pg_logical_emit_message_text(PG_FUNCTION_ARGS) /* bytea and text are compatible */ return pg_logical_emit_message_bytea(fcinfo); } + +/* + * SQL function for writing logical decoding DDL message into WAL. + */ +Datum +pg_logical_emit_ddl_message_bytea(PG_FUNCTION_ARGS) +{ + char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(0)); + Oid relid = PG_GETARG_OID(1); + DeparsedCommandType cmdtype = PG_GETARG_INT16(2); + char *data = text_to_cstring(PG_GETARG_TEXT_PP(3)); + XLogRecPtr lsn; + + lsn = LogLogicalDDLMessage(prefix, relid, cmdtype, data, strlen(data)); + PG_RETURN_LSN(lsn); +} + +Datum +pg_logical_emit_ddl_message_text(PG_FUNCTION_ARGS) +{ + /* bytea and text are compatible */ + return pg_logical_emit_ddl_message_bytea(fcinfo); +} diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index d48cd4c590..99c608d03f 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -2,6 +2,7 @@ backend_sources += files( 'applyparallelworker.c', + 'ddlmessage.c', 'decode.c', 'launcher.c', 'logical.c', diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 2d17c551a8..811739b59f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -94,6 +94,7 @@ #include "lib/binaryheap.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/ddlmessage.h" #include "replication/logical.h" #include "replication/reorderbuffer.h" #include "replication/slot.h" @@ -516,6 +517,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, pfree(change->data.msg.message); change->data.msg.message = NULL; break; + case REORDER_BUFFER_CHANGE_DDL: + if (change->data.ddl.prefix != NULL) + pfree(change->data.ddl.prefix); + change->data.ddl.prefix = NULL; + if (change->data.ddl.message != NULL) + pfree(change->data.ddl.message); + change->data.ddl.message = NULL; + break; case REORDER_BUFFER_CHANGE_INVALIDATION: if (change->data.inval.invalidations) pfree(change->data.inval.invalidations); @@ -904,6 +913,36 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, } } +/* + * A transactional DDL message is queued to be processed upon commit. + */ +void +ReorderBufferQueueDDLMessage(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, const char *prefix, + Size message_size, const char *message, + Oid relid, DeparsedCommandType cmdtype) +{ + MemoryContext oldcontext; + ReorderBufferChange *change; + + Assert(TransactionIdIsValid(xid)); + + oldcontext = MemoryContextSwitchTo(rb->context); + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_DDL; + change->data.ddl.prefix = pstrdup(prefix); + change->data.ddl.relid = relid; + change->data.ddl.cmdtype = cmdtype; + change->data.ddl.message_size = message_size; + change->data.ddl.message = palloc(message_size); + memcpy(change->data.ddl.message, message, message_size); + + ReorderBufferQueueChange(rb, xid, lsn, change, false); + + MemoryContextSwitchTo(oldcontext); +} + /* * AssertTXNLsnOrder * Verify LSN ordering of transaction lists in the reorderbuffer @@ -2006,6 +2045,21 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message); } +/* + * Helper function for ReorderBufferProcessTXN for applying the DDL message. + */ +static inline void +ReorderBufferApplyDDLMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferChange *change, bool streaming) +{ + rb->ddl(rb, txn, change->lsn, + change->data.ddl.prefix, + change->data.ddl.relid, + change->data.ddl.cmdtype, + change->data.ddl.message_size, + change->data.ddl.message); +} + /* * Function to store the command id and snapshot at the end of the current * stream so that we can reuse the same while sending the next stream. @@ -2388,6 +2442,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferApplyMessage(rb, txn, change, streaming); break; + case REORDER_BUFFER_CHANGE_DDL: + ReorderBufferApplyDDLMessage(rb, txn, change, streaming); + break; + case REORDER_BUFFER_CHANGE_INVALIDATION: /* Execute the invalidation messages locally */ ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations, @@ -3845,6 +3903,39 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message_size); data += change->data.msg.message_size; + break; + } + case REORDER_BUFFER_CHANGE_DDL: + { + char *data; + Size prefix_size = strlen(change->data.ddl.prefix) + 1; + + sz += prefix_size + change->data.ddl.message_size + + sizeof(Size) + sizeof(Oid) + sizeof(int) + sizeof(Size); + ReorderBufferSerializeReserve(rb, sz); + + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + + /* write the prefix, relid and cmdtype including the size */ + memcpy(data, &prefix_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, &change->data.ddl.relid, sizeof(Oid)); + data += sizeof(Oid); + memcpy(data, &change->data.ddl.cmdtype, sizeof(DeparsedCommandType)); + data += sizeof(int); + memcpy(data, change->data.ddl.prefix, prefix_size); + data += prefix_size; + + /* write the message including the size */ + memcpy(data, &change->data.ddl.message_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.ddl.message, + change->data.ddl.message_size); + data += change->data.ddl.message_size; + break; } case REORDER_BUFFER_CHANGE_INVALIDATION: @@ -4159,6 +4250,15 @@ ReorderBufferChangeSize(ReorderBufferChange *change) sz += prefix_size + change->data.msg.message_size + sizeof(Size) + sizeof(Size); + break; + } + case REORDER_BUFFER_CHANGE_DDL: + { + Size prefix_size = strlen(change->data.ddl.prefix) + 1; + + sz += prefix_size + change->data.ddl.message_size + + sizeof(Size) + sizeof(Size) + sizeof(Oid) + sizeof(DeparsedCommandType); + break; } case REORDER_BUFFER_CHANGE_INVALIDATION: @@ -4436,6 +4536,33 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message_size); data += change->data.msg.message_size; + break; + } + case REORDER_BUFFER_CHANGE_DDL: + { + Size prefix_size; + + /* read prefix */ + memcpy(&prefix_size, data, sizeof(Size)); + data += sizeof(Size); + memcpy(&change->data.ddl.relid, data, sizeof(Oid)); + data += sizeof(Oid); + memcpy(&change->data.ddl.cmdtype, data, sizeof(DeparsedCommandType)); + data += sizeof(int); + change->data.ddl.prefix = MemoryContextAlloc(rb->context, prefix_size); + memcpy(change->data.ddl.prefix, data, prefix_size); + Assert(change->data.ddl.prefix[prefix_size - 1] == '\0'); + data += prefix_size; + + /* read the message */ + memcpy(&change->data.msg.message_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.msg.message = MemoryContextAlloc(rb->context, + change->data.msg.message_size); + memcpy(change->data.msg.message, data, + change->data.msg.message_size); + data += change->data.msg.message_size; + break; } case REORDER_BUFFER_CHANGE_INVALIDATION: diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 6b8c17bb4c..daf1730252 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -26,6 +26,7 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" +#include "replication/ddlmessage.h" #include "replication/message.h" #include "replication/origin.h" #include "rmgrdesc.h" diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 463bcb67c5..abcbe97593 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) +PG_RMGR(RM_LOGICALDDLMSG_ID, "LogicalDDLMessage", logicalddlmsg_redo, logicalddlmsg_desc, logicalddlmsg_identify, NULL, NULL, NULL, logicalddl_decode) diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 8951716f0a..b899b27e1e 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11040,6 +11040,14 @@ proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u', prorettype => 'pg_lsn', proargtypes => 'bool text bytea', prosrc => 'pg_logical_emit_message_bytea' }, +{ oid => '3813', descr => 'emit a textual logical decoding message', + proname => 'pg_logical_emit_ddl_message', provolatile => 'v', proparallel => 'u', + prorettype => 'pg_lsn', proargtypes => 'bool text text', + prosrc => 'pg_logical_emit_ddl_message_text' }, +{ oid => '3814', descr => 'emit a binary logical decoding message', + proname => 'pg_logical_emit_ddl_message', provolatile => 'v', proparallel => 'u', + prorettype => 'pg_lsn', proargtypes => 'text regclass int4 text', + prosrc => 'pg_logical_emit_ddl_message_bytea' }, # event triggers { oid => '3566', descr => 'list objects dropped by the current command', diff --git a/src/include/replication/ddlmessage.h b/src/include/replication/ddlmessage.h new file mode 100644 index 0000000000..77df6ea11a --- /dev/null +++ b/src/include/replication/ddlmessage.h @@ -0,0 +1,60 @@ +/*------------------------------------------------------------------------- + * ddlmessage.h + * Exports from replication/logical/ddlmessage.c + * + * Copyright (c) 2022, PostgreSQL Global Development Group + * + * src/include/replication/ddlmessage.h + *------------------------------------------------------------------------- + */ +#ifndef PG_LOGICAL_DDL_MESSAGE_H +#define PG_LOGICAL_DDL_MESSAGE_H + +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "access/xlogreader.h" +#include "nodes/nodes.h" + + +/* + * Support for keeping track of deparsed commands. + */ +typedef enum DeparsedCommandType +{ + DCT_SimpleCmd, + DCT_TableDropStart, + DCT_TableDropEnd, + DCT_TableAlter, + DCT_ObjectCreate, + DCT_ObjectDrop +} DeparsedCommandType; + +/* + * Generic logical decoding DDL message wal record. + */ +typedef struct xl_logical_ddl_message +{ + Oid dbId; /* database Oid emitted from */ + Size prefix_size; /* length of prefix including null terminator */ + Oid relid; /* id of the table */ + DeparsedCommandType cmdtype; /* type of sql command */ + Size message_size; /* size of the message */ + + /* + * payload, including null-terminated prefix of length prefix_size + */ + char message[FLEXIBLE_ARRAY_MEMBER]; +} xl_logical_ddl_message; + +#define SizeOfLogicalDDLMessage (offsetof(xl_logical_ddl_message, message)) + +extern XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid relid, DeparsedCommandType cmdtype, + const char *ddl_message, size_t size); + +/* RMGR API*/ +#define XLOG_LOGICAL_DDL_MESSAGE 0x00 +void logicalddlmsg_redo(XLogReaderState *record); +void logicalddlmsg_desc(StringInfo buf, XLogReaderState *record); +const char *logicalddlmsg_identify(uint8 info); + +#endif diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index 14fa921ab4..c9ac708d32 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -27,6 +27,7 @@ extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void logicalddl_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 2d89d26586..5ed5e6a7bc 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -9,6 +9,7 @@ #ifndef OUTPUT_PLUGIN_H #define OUTPUT_PLUGIN_H +#include "replication/ddlmessage.h" #include "replication/reorderbuffer.h" struct LogicalDecodingContext; @@ -90,6 +91,18 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, Size message_size, const char *message); +/* + * Called for the logical decoding DDL messages. + */ +typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + const char *prefix, + Oid relid, + DeparsedCommandType cmdtype, + Size message_size, + const char *message); + /* * Filter changes by origin. */ @@ -221,6 +234,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeDDLMessageCB ddl_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 215d1494e9..e92b816e20 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -11,6 +11,8 @@ #include "access/htup_details.h" #include "lib/ilist.h" +#include "nodes/nodes.h" +#include "replication/ddlmessage.h" #include "storage/sinval.h" #include "utils/hsearch.h" #include "utils/relcache.h" @@ -65,6 +67,7 @@ typedef enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, + REORDER_BUFFER_CHANGE_DDL, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, @@ -139,6 +142,16 @@ typedef struct ReorderBufferChange char *message; } msg; + /* DDL */ + struct + { + char *prefix; + Size message_size; + char *message; + Oid relid; + DeparsedCommandType cmdtype; + } ddl; + /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */ Snapshot snapshot; @@ -452,6 +465,16 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* DDL message callback signature */ +typedef void (*ReorderBufferDDLMessageCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + const char *prefix, + Oid relid, + DeparsedCommandType cmdtype, + Size sz, + const char *message); + /* begin prepare callback signature */ typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -574,6 +597,7 @@ struct ReorderBuffer ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; ReorderBufferMessageCB message; + ReorderBufferDDLMessageCB ddl; /* * Callbacks to be called when streaming a transaction at prepare time. @@ -678,6 +702,9 @@ extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); +extern void ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + const char *prefix, Size message_size, + const char *message, Oid relid, DeparsedCommandType cmdtype); extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); -- 2.39.1.windows.1