From 5cc0ef33ee902706909116aef6c82c8821661fac Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Tue, 6 Jul 2021 07:02:26 -0400 Subject: [PATCH v94] Skip empty streaming in-progress transaction for logical replication. This improves the behaviour of skipping empty transaction to also include empty streamed in-progress transactions. --- src/backend/replication/pgoutput/pgoutput.c | 167 +++++++++++++++++++++++++--- 1 file changed, 149 insertions(+), 18 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 7ebdb4e..86d0c0a 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -65,6 +65,8 @@ static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, TimestampTz prepare_time); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn); +static void pgoutput_send_stream_start(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, @@ -134,9 +136,21 @@ typedef struct RelationSyncEntry TupleConversionMap *map; } RelationSyncEntry; +/* + * Maintain the per-transaction level variables to track whether the + * transaction and or streams have written any changes. + * BEGIN / BEGIN PREPARE is held back until the first + * change needs to be sent. In streaming mode the transaction can + * be decoded in streams, so along with maintaining whether the + * transaction has written any changes, we also need to track whether the + * current stream has written any changes. START STREAM is held back until + * the first change is streamed. This is done so that empty transactions and + * streams which do not have any changes can be dropped. + */ typedef struct PGOutputTxnData { bool sent_begin_txn; /* flag indicating whether begin has been sent */ + bool sent_stream_start; /* flag indicating if stream start has been sent */ } PGOutputTxnData; /* Map used to remember which relation schemas we sent. */ @@ -610,6 +624,8 @@ maybe_send_schema(LogicalDecodingContext *ctx, bool schema_sent; TransactionId xid = InvalidTransactionId; TransactionId topxid = InvalidTransactionId; + PGOutputTxnData *txndata; + ReorderBufferTXN *toptxn; /* * Remember XID of the (sub)transaction for the change. We don't care if @@ -623,9 +639,16 @@ maybe_send_schema(LogicalDecodingContext *ctx, xid = change->txn->xid; if (change->txn->toptxn) + { topxid = change->txn->toptxn->xid; + toptxn = change->txn->toptxn; + } else + { topxid = xid; + toptxn = txn; + } + /* * Do we need to send the schema? We do track streamed transactions @@ -648,6 +671,23 @@ maybe_send_schema(LogicalDecodingContext *ctx, if (schema_sent) return; + /* set up txndata */ + txndata = toptxn->output_plugin_private; + + /* + * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE + * is sent. If not, send now. + */ + if (in_streaming && !txndata->sent_stream_start) + pgoutput_send_stream_start(ctx, toptxn); + else if (!txndata->sent_begin_txn) + { + if (rbtxn_prepared(toptxn)) + pgoutput_begin_prepare(ctx, toptxn); + else + pgoutput_begin(ctx, toptxn); + } + /* * Nope, so send the schema. If the changes will be published using an * ancestor's schema, not the relation's own, send that ancestor's schema @@ -746,9 +786,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TransactionId xid = InvalidTransactionId; Relation ancestor = NULL; - /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */ - if (!in_streaming) - Assert(txndata); + /* should have set up txndata as part of BEGIN/BEGIN PREPARE/START STREAM */ + Assert(txndata); if (!is_publishable_relation(relation)) return; @@ -783,8 +822,11 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + /* If streaming, send STREAM START if we haven't yet */ + if (in_streaming && !txndata->sent_stream_start) + pgoutput_send_stream_start(ctx, txn); /* output BEGIN if we haven't yet */ - if (!in_streaming && !txndata->sent_begin_txn) + else if (!txndata->sent_begin_txn) { if (rbtxn_prepared(txn)) pgoutput_begin_prepare(ctx, txn); @@ -902,9 +944,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Oid *relids; TransactionId xid = InvalidTransactionId; - /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */ - if (!in_streaming) - Assert(txndata); + /* Should have setup txndata as part of BEGIN/BEGIN PREPARE/START STREAM */ + Assert(txndata); /* Remember the xid for the change in streaming mode. See pgoutput_change. */ if (in_streaming) @@ -942,8 +983,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (nrelids > 0) { + /* If streaming, send STREAM START if we haven't yet */ + if (in_streaming && !txndata->sent_stream_start) + pgoutput_send_stream_start(ctx, txn); /* output BEGIN if we haven't yet */ - if (!in_streaming && !txndata->sent_begin_txn) + else if (!txndata->sent_begin_txn) { if (rbtxn_prepared(txn)) pgoutput_begin_prepare(ctx, txn); @@ -984,16 +1028,24 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (in_streaming) xid = txn->xid; - /* output BEGIN if we haven't yet, avoid for streaming and non-transactional messages */ - if (!in_streaming && transactional) + /* Set up txndata for streaming and transactional messages */ + if (in_streaming || transactional) { txndata = (PGOutputTxnData *) txn->output_plugin_private; - if (!txndata->sent_begin_txn) + + /* If streaming, send STREAM START if we haven't yet */ + if (in_streaming && !txndata->sent_stream_start) + pgoutput_send_stream_start(ctx, txn); + /* output BEGIN if we haven't yet, avoid for streaming and non-transactional messages */ + else if (transactional) { - if (rbtxn_prepared(txn)) - pgoutput_begin_prepare(ctx, txn); - else - pgoutput_begin(ctx, txn); + if (!txndata->sent_begin_txn) + { + if (rbtxn_prepared(txn)) + pgoutput_begin_prepare(ctx, txn); + else + pgoutput_begin(ctx, txn); + } } } @@ -1076,12 +1128,37 @@ static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { - bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *txndata = txn->output_plugin_private; /* we can't nest streaming of transactions */ Assert(!in_streaming); /* + * Don't actually send stream start here, instead set a flag that indicates + * that stream start hasn't been sent and wait for the first actual change + * for this stream to be sent and then send stream start. This is done + * to avoid sending empty streams without any changes. + */ + if (txndata == NULL) + { + txndata = + MemoryContextAllocZero(ctx->context, sizeof(PGOutputTxnData)); + txndata->sent_begin_txn = false; + txn->output_plugin_private = txndata; + } + + txndata->sent_stream_start = false; + in_streaming = true; +} + +static void +pgoutput_send_stream_start(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + + /* * If we already sent the first stream for this transaction then don't * send the origin id in the subsequent streams. */ @@ -1096,8 +1173,11 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); - /* we're streaming a chunk of transaction now */ - in_streaming = true; + /* + * Set the flags that indicate that changes were sent as part of + * the transaction and the stream. + */ + txndata->sent_begin_txn = txndata->sent_stream_start = true; } /* @@ -1107,9 +1187,18 @@ static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputTxnData *data = txn->output_plugin_private; + /* we should be streaming a trasanction */ Assert(in_streaming); + if (!data->sent_stream_start) + { + in_streaming = false; + elog(DEBUG1, "Skipping replication of an empty transaction in stream stop"); + return; + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_stop(ctx->out); OutputPluginWrite(ctx, true); @@ -1128,6 +1217,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx, XLogRecPtr abort_lsn) { ReorderBufferTXN *toptxn; + PGOutputTxnData *txndata; + bool sent_begin_txn; /* * The abort should happen outside streaming block, even for streamed @@ -1137,6 +1228,21 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx, /* determine the toplevel transaction */ toptxn = (txn->toptxn) ? txn->toptxn : txn; + txndata = toptxn->output_plugin_private; + sent_begin_txn = txndata->sent_begin_txn; + + if (txn->toptxn == NULL) + { + pfree(txndata); + txn->output_plugin_private = NULL; + } + + if (!sent_begin_txn) + { + elog(DEBUG1, "Skipping replication of an empty transaction in stream abort"); + return; + } + Assert(rbtxn_is_streamed(toptxn)); @@ -1156,6 +1262,9 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + PGOutputTxnData *txndata = txn->output_plugin_private; + bool sent_begin_txn = txndata->sent_begin_txn; + /* * The commit should happen outside streaming block, even for streamed * transactions. The transaction has to be marked as streamed, though. @@ -1163,6 +1272,16 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); + pfree(txndata); + txn->output_plugin_private = NULL; + + /* If no changes were part of this transaction then drop the commit */ + if (!sent_begin_txn) + { + elog(DEBUG1, "Skipping replication of an empty transaction in stream commit"); + return; + } + OutputPluginUpdateProgress(ctx); OutputPluginPrepareWrite(ctx, true); @@ -1182,8 +1301,20 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { + PGOutputTxnData *txndata = txn->output_plugin_private; + bool sent_begin_txn = txndata->sent_begin_txn; + Assert(rbtxn_is_streamed(txn)); + pfree(txndata); + txn->output_plugin_private = NULL; + + if (!sent_begin_txn) + { + elog(DEBUG1, "Skipping replication of an empty transaction in stream prepare"); + return; + } + OutputPluginUpdateProgress(ctx); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); -- 1.8.3.1