From 18a619617c2cc19d0e304e33fb11e72d23731061 Mon Sep 17 00:00:00 2001 From: Dave Pirotte Date: Thu, 5 Nov 2020 03:14:54 +0000 Subject: [PATCH 1/6] Add logical decoding messages to pgoutput This patch adds a "messages" option to the pgoutput output plugin. When "messages" is true, logical decoding messages (i.e. generated via pg_logical_emit_message) are sent to the slot consumer. --- doc/src/sgml/protocol.sgml | 65 ++++++++ src/backend/replication/logical/proto.c | 24 +++ src/backend/replication/logical/worker.c | 3 + src/backend/replication/pgoutput/pgoutput.c | 40 ++++- src/include/replication/logicalproto.h | 3 + src/include/replication/pgoutput.h | 1 + src/test/subscription/t/020_messages.pl | 158 ++++++++++++++++++++ 7 files changed, 293 insertions(+), 1 deletion(-) create mode 100644 src/test/subscription/t/020_messages.pl diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index cee28889e1..02449bf792 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -6413,6 +6413,71 @@ Begin + + +Message + + + + + + + + Byte1('M') + + + + Identifies the message as a logical decoding message. + + + + + + Int8 + + + + Flags; Either 0 for no flags or 1 if the logical decoding + message is transactional. + + + + + + Int64 + + + + The LSN of the logical decoding message. + + + + + + String + + + + The prefix of the logical decoding message. + + + + + + Byten + + + + The content of the logical decoding message. + + + + + + + + + Commit diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index fdb31182d7..deba2a321c 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -25,6 +25,7 @@ */ #define LOGICALREP_IS_REPLICA_IDENTITY 1 +#define MESSAGE_TRANSACTIONAL (1<<0) #define TRUNCATE_CASCADE (1<<0) #define TRUNCATE_RESTART_SEQS (1<<1) @@ -361,6 +362,29 @@ logicalrep_read_truncate(StringInfo in, return relids; } +/* + * Write MESSAGE to stream + */ +void +logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn, + bool transactional, const char *prefix, Size sz, + const char *message) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE); + + /* encode and send message flags */ + if (transactional) + flags |= MESSAGE_TRANSACTIONAL; + + pq_sendint8(out, flags); + pq_sendint64(out, lsn); + pq_sendstring(out, prefix); + pq_sendint32(out, sz); + pq_sendbytes(out, message, sz); +} + /* * Write relation description to the output stream. */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 04684912de..6890603622 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1936,6 +1936,9 @@ apply_dispatch(StringInfo s) apply_handle_origin(s); return; + case LOGICAL_REP_MSG_MESSAGE: + return; + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); return; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9c997aed83..cd849c10a4 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx, static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); +static void pgoutput_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, @@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->begin_cb = pgoutput_begin_txn; cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; + cb->message_cb = pgoutput_message; cb->commit_cb = pgoutput_commit_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -158,15 +163,17 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names, bool *binary, - bool *enable_streaming) + bool *messages, bool *enable_streaming) { ListCell *lc; bool protocol_version_given = false; bool publication_names_given = false; bool binary_option_given = false; + bool messages_option_given = false; bool streaming_given = false; *binary = false; + *messages = false; foreach(lc, options) { @@ -222,6 +229,16 @@ parse_output_parameters(List *options, uint32 *protocol_version, *binary = defGetBoolean(defel); } + else if (strcmp(defel->defname, "messages") == 0) + { + if (messages_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + messages_option_given = true; + + *messages = defGetBoolean(defel); + } else if (strcmp(defel->defname, "streaming") == 0) { if (streaming_given) @@ -269,6 +286,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, &data->protocol_version, &data->publication_names, &data->binary, + &data->messages, &enable_streaming); /* Check if we support requested protocol */ @@ -683,6 +701,26 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, MemoryContextReset(data->context); } +static void +pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, + const char *message) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + if (!data->messages) + return; + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_message(ctx->out, + txn, + message_lsn, + transactional, + prefix, + sz, + message); + OutputPluginWrite(ctx, true); +} + /* * Currently we always forward. */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 1f2535df80..f3c8f95e2c 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -54,6 +54,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_TRUNCATE = 'T', LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', + LOGICAL_REP_MSG_MESSAGE = 'M', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_END = 'E', LOGICAL_REP_MSG_STREAM_COMMIT = 'c', @@ -151,6 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, bool cascade, bool restart_seqs); extern List *logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs); +extern void logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn, + bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index a8c676ed23..3b7273bd89 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -25,6 +25,7 @@ typedef struct PGOutputData List *publication_names; List *publications; bool binary; + bool messages; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl new file mode 100644 index 0000000000..8e59e324e3 --- /dev/null +++ b/src/test/subscription/t/020_messages.pl @@ -0,0 +1,158 @@ +# Tests that logical decoding messages are emitted and that +# they do not break subscribers +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 7; + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +# +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab (a int PRIMARY KEY)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab (a int PRIMARY KEY)"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub FOR TABLE tab"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub" +); + +# ensure a transactional logical decoding message shows up on the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE"); + +$node_publisher->safe_psql('postgres', + "select pg_logical_emit_message(true, 'a prefix', 'a transactional message')" +); + +my $slot_codes_with_message = $node_publisher->safe_psql( + 'postgres', qq( + select get_byte(data, 0) + from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'pub', + 'messages', 'true') +)); + +# 66 77 67 == B M C == BEGIN MESSAGE COMMIT +is($slot_codes_with_message, "66\n77\n67", + 'messages on slot are B M C with message option'); + +my $transactional_message_flags = $node_publisher->safe_psql( + 'postgres', qq( + select get_byte(data, 1) + from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'pub', + 'messages', 'true') + offset 1 limit 1 +)); + +is($transactional_message_flags, "1", + "transactional message flags are set to 1"); + +my $slot_codes_without_message = $node_publisher->safe_psql( + 'postgres', qq( + select get_byte(data, 0) + from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'pub') +)); + +# 66 67 == B C == BEGIN COMMIT +is($slot_codes_without_message, "66\n67", + 'messages on slot are B C without message option'); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); +$node_publisher->wait_for_catchup('sub'); + +# ensure a non-transactional logical decoding message shows up on the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (3)"); + +my $message_lsn = $node_publisher->safe_psql('postgres', + "select pg_logical_emit_message(false, 'prefix', 'nontransactional')"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (4)"); + +my $slot_message_code = $node_publisher->safe_psql( + 'postgres', qq( + select get_byte(data, 0) + from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'pub', + 'messages', 'true') + where lsn = '$message_lsn' and xid = 0 +)); + +is($slot_message_code, "77", "non-transactional message on slot is M"); + +my $nontransactional_message_flags = $node_publisher->safe_psql( + 'postgres', qq( + select get_byte(data, 1) + from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'pub', + 'messages', 'true') + offset 1 limit 1 +)); + +is($nontransactional_message_flags, + "0", "non-transactional message flags are set to 0"); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); +$node_publisher->wait_for_catchup('sub'); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE"); +$node_subscriber->safe_psql('postgres', "checkpoint;"); + +# wait for the replication connection to drop from the publisher +$node_publisher->poll_query_until('postgres', + 'SELECT count(*) from pg_catalog.pg_stat_replication', 0); + +# ensure a non-transactional logical decoding message shows up on the slot +# when it is emitted within an aborted transaction. the message won't emit +# until something advances the LSN, which we intentionally do here with a +# checkpoint. +$node_publisher->safe_psql( + 'postgres', qq( + BEGIN; + SELECT pg_logical_emit_message(false, 'prefix', + 'nontransactional aborted 1'); + INSERT INTO tab VALUES (5); + SELECT pg_logical_emit_message(false, 'prefix', + 'nontransactional aborted 2'); + ROLLBACK; + CHECKPOINT; +)); + +my $aborted_txn_message_codes = $node_publisher->safe_psql( + 'postgres', qq( + select get_byte(data, 0) + from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'pub', + 'messages', 'true') +)); + +is($aborted_txn_message_codes, "77\n77", + "non-transactional message on slot from aborted transaction is M"); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); +$node_publisher->wait_for_catchup('sub'); + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab"); +is($result, qq(2), 'rows move'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); -- 2.20.1