From 6f6f3638a6c6a41370c43e77b47f5390f13c453b Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Thu, 12 Nov 2020 14:59:58 +0530 Subject: [PATCH v3 1/2] Bug fix skip-empty-xacts in streaming mode In streaming mode the transaction can be decoded in multiple streams and those streams can be interleaved. Due to that we cannot remember the transaction's write status in the logical decoding context because it might get changed by some other transaction so we need to keep that in the reorder buffer txn. --- contrib/test_decoding/test_decoding.c | 95 +++++++++++++++++++++---- src/backend/replication/logical/reorderbuffer.c | 1 + src/include/replication/reorderbuffer.h | 5 ++ 3 files changed, 86 insertions(+), 15 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 8e33614..62b3855 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -34,10 +34,24 @@ typedef struct bool include_xids; bool include_timestamp; bool skip_empty_xacts; - bool xact_wrote_changes; bool only_local; } TestDecodingData; +/* + * Maintain a per transaction level variable to track whether the transaction + * has written any changes or not to identify whether it is an empty transaction + * or not. In streaming mode the transaction can be decoded in streams so + * along with maintaining whether the transaction has written any changes or + * not we also need to track whether the current stream has written any changes + * or not so that if the user has requested to skip the empty transactions we can + * skip the empty streams even though the transaction has written some changes.
+ */ +typedef struct +{ + bool xact_wrote_changes; + bool stream_wrote_changes; +} TestDecodingTxnData; + static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init); static void pg_decode_shutdown(LogicalDecodingContext *ctx); @@ -255,8 +269,12 @@ static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = + MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData)); + + txndata->xact_wrote_changes = false; + txn->output_plugin_private = txndata; - data->xact_wrote_changes = false; if (data->skip_empty_xacts) return; @@ -280,8 +298,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; + bool xact_wrote_changes = txndata->xact_wrote_changes; + + pfree(txndata); + txn->output_plugin_private = NULL; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !xact_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -442,18 +465,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { TestDecodingData *data; + TestDecodingTxnData *txndata; Form_pg_class class_form; TupleDesc tupdesc; MemoryContext old; data = ctx->output_plugin_private; + txndata = txn->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->xact_wrote_changes) { pg_output_begin(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = true; class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); @@ -527,17 +552,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation 
relations[], ReorderBufferChange *change) { TestDecodingData *data; + TestDecodingTxnData *txndata; MemoryContext old; int i; data = ctx->output_plugin_private; + txndata = txn->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->xact_wrote_changes) { pg_output_begin(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = true; /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -592,10 +619,24 @@ pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; - data->xact_wrote_changes = false; + /* + * If this is the first stream for the txn then allocate the txn plugin + * data and set the xact_wrote_changes to false. + */ + if (txndata == NULL) + { + txndata = + MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData)); + txndata->xact_wrote_changes = false; + txn->output_plugin_private = txndata; + } + + txndata->stream_wrote_changes = false; if (data->skip_empty_xacts) return; + pg_output_stream_start(ctx, data, txn, true); } @@ -615,8 +656,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->stream_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -634,7 +676,23 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + /* + * Stream abort can be sent for an individual subtransaction but we + * maintain the output_plugin_private only under the toptxn so if this
+ * is not the toptxn then fetch the toptxn. + */ + ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn; + TestDecodingTxnData *txndata = toptxn->output_plugin_private; + bool xact_wrote_changes = txndata->xact_wrote_changes; + + if (txn->toptxn == NULL) + { + Assert(txn->output_plugin_private != NULL); + pfree(txndata); + txn->output_plugin_private = NULL; + } + + if (data->skip_empty_xacts && !xact_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -651,8 +709,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx, XLogRecPtr commit_lsn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; + bool xact_wrote_changes = txndata->xact_wrote_changes; + + pfree(txndata); + txn->output_plugin_private = NULL; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !xact_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -681,13 +744,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferChange *change) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; /* output stream start if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->stream_wrote_changes) { pg_output_stream_start(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; OutputPluginPrepareWrite(ctx, true); if (data->include_xids) @@ -734,12 +798,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ReorderBufferChange *change) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->stream_wrote_changes) { pg_output_stream_start(ctx, data, txn, false); } - 
data->xact_wrote_changes = true; + txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; OutputPluginPrepareWrite(ctx, true); if (data->include_xids) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c1bd680..301baff 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -402,6 +402,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb) /* InvalidCommandId is not zero, so set it explicitly */ txn->command_id = InvalidCommandId; + txn->output_plugin_private = NULL; return txn; } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index dfdda93..bd9dd7e 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -378,6 +378,11 @@ typedef struct ReorderBufferTXN /* If we have detected concurrent abort then ignore future changes. */ bool concurrent_abort; + + /* + * Private data pointer of the output plugin. + */ + void *output_plugin_private; } ReorderBufferTXN; /* so we can define the callbacks used inside struct ReorderBuffer itself */ -- 1.8.3.1