From bf991e458df1f03708450ddd76a75a9476b5809a Mon Sep 17 00:00:00 2001 From: Dave Pirotte Date: Tue, 17 Nov 2020 04:01:34 +0000 Subject: [PATCH 2/6] Add xid to messages when streaming --- src/backend/replication/logical/proto.c | 16 ++++++++++++++-- src/backend/replication/pgoutput/pgoutput.c | 2 ++ src/include/replication/logicalproto.h | 3 ++- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index deba2a321c..d5eeee4784 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -366,17 +366,29 @@ logicalrep_read_truncate(StringInfo in, * Write MESSAGE to stream */ void -logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn, - bool transactional, const char *prefix, Size sz, +logicalrep_write_message(StringInfo out, + bool in_streaming, + ReorderBufferTXN *txn, + XLogRecPtr lsn, + bool transactional, + const char *prefix, + Size sz, const char *message) { uint8 flags = 0; + TransactionId xid = InvalidTransactionId; pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE); /* encode and send message flags */ if (transactional) + { flags |= MESSAGE_TRANSACTIONAL; + xid = txn->xid; + } + + if (in_streaming) + pq_sendint32(out, xid); pq_sendint8(out, flags); pq_sendint64(out, lsn); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index cd849c10a4..bd3c2a3b99 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -707,11 +707,13 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, const char *message) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + if (!data->messages) return; OutputPluginPrepareWrite(ctx, true); logicalrep_write_message(ctx->out, + in_streaming, txn, message_lsn, transactional, diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index f3c8f95e2c..f22c9436e0 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -152,7 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, bool cascade, bool restart_seqs); extern List *logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs); -extern void logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn, +extern void logicalrep_write_message(StringInfo out, bool in_streaming, + ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel); -- 2.20.1