From b3067d6c7ea17688fd9ded25ce5b96cef6efb383 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Thu, 12 Nov 2020 14:59:58 +0530 Subject: [PATCH v2 1/2] Bug fix skip-empty-xacts in streaming mode In streaming mode the transaction can be decoded in multiple streams and those streams can be interleaved. Due to that we can not remember the transaction's write status in the logical decoding context because those might get changed due to some other transactions so we need to keep that in the reorder buffer txn. --- contrib/test_decoding/test_decoding.c | 80 ++++++++++++++++++++++++++------- src/include/replication/reorderbuffer.h | 5 +++ 2 files changed, 70 insertions(+), 15 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 8e33614..50d8e24 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -34,10 +34,15 @@ typedef struct bool include_xids; bool include_timestamp; bool skip_empty_xacts; - bool xact_wrote_changes; bool only_local; } TestDecodingData; +typedef struct +{ + bool xact_wrote_changes; + bool stream_wrote_changes; +} TestDecodingTxnData; + static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init); static void pg_decode_shutdown(LogicalDecodingContext *ctx); @@ -255,8 +260,12 @@ static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = + MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData)); + + txndata->xact_wrote_changes = false; + txn->output_plugin_private = txndata; - data->xact_wrote_changes = false; if (data->skip_empty_xacts) return; @@ -280,8 +289,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; + bool xact_wrote_changes = txndata->xact_wrote_changes; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + pfree(txndata); + txn->output_plugin_private = false; + + if (data->skip_empty_xacts && !xact_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -442,18 +456,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { TestDecodingData *data; + TestDecodingTxnData *txndata; Form_pg_class class_form; TupleDesc tupdesc; MemoryContext old; data = ctx->output_plugin_private; + txndata = txn->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->xact_wrote_changes) { pg_output_begin(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = true; class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); @@ -527,17 +543,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change) { TestDecodingData *data; + TestDecodingTxnData *txndata; MemoryContext old; int i; data = ctx->output_plugin_private; + txndata = txn->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->xact_wrote_changes) { pg_output_begin(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = true; /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -592,10 +610,24 @@ pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; - data->xact_wrote_changes = false; + /* + * If this is the first stream for the txn then allocate the txn plugin + * data and set the xact_wrote_changes to false. + */ + if (txndata == NULL) + { + txndata = + MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData)); + txndata->xact_wrote_changes = false; + txn->output_plugin_private = txndata; + } + + txndata->stream_wrote_changes = false; if (data->skip_empty_xacts) return; + pg_output_stream_start(ctx, data, txn, true); } @@ -615,8 +647,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->stream_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -633,8 +666,18 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, XLogRecPtr abort_lsn) { TestDecodingData *data = ctx->output_plugin_private; + ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn; + TestDecodingTxnData *txndata = toptxn->output_plugin_private; + bool xact_wrote_changes = txndata->xact_wrote_changes; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (txn->toptxn == NULL) + { + Assert(txn->output_plugin_private != NULL); + pfree(txndata); + txn->output_plugin_private = false; + } + + if (data->skip_empty_xacts && !xact_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -651,8 +694,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx, XLogRecPtr commit_lsn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; + bool xact_wrote_changes = txndata->xact_wrote_changes; + + pfree(txndata); + txn->output_plugin_private = false; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !xact_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -681,13 +729,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferChange *change) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; /* output stream start if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->stream_wrote_changes) { pg_output_stream_start(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; OutputPluginPrepareWrite(ctx, true); if (data->include_xids) @@ -734,12 +783,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ReorderBufferChange *change) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->stream_wrote_changes) { pg_output_stream_start(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; OutputPluginPrepareWrite(ctx, true); if (data->include_xids) diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index dfdda93..bd9dd7e 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -378,6 +378,11 @@ typedef struct ReorderBufferTXN /* If we have detected concurrent abort then ignore future changes. */ bool concurrent_abort; + + /* + * Private data pointer of the output plugin. + */ + void *output_plugin_private; } ReorderBufferTXN; /* so we can define the callbacks used inside struct ReorderBuffer itself */ -- 1.8.3.1