From cd7c441e0c07e3a1f8428fed3111ef3a5aa50f75 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 14 Oct 2020 07:48:50 -0400 Subject: [PATCH v8] Support two phase commits in streaming mode in logical decoding Add APIs to the streaming APIS for PREPARE, COMMIT PREPARED and ROLLBACK PREPARED --- contrib/test_decoding/test_decoding.c | 84 +++++++++++++++ doc/src/sgml/logicaldecoding.sgml | 62 +++++++++-- src/backend/replication/logical/logical.c | 132 +++++++++++++++++++++++- src/backend/replication/logical/reorderbuffer.c | 29 ++++-- src/include/replication/output_plugin.h | 27 +++++ src/include/replication/reorderbuffer.h | 21 ++++ 6 files changed, 339 insertions(+), 16 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 6b8e502..9f0cf10 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -78,6 +78,15 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx, static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn); +static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void pg_decode_stream_commit_prepared(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void pg_decode_stream_rollback_prepared(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr rollback_lsn); static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); @@ -129,6 +138,9 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_start_cb = pg_decode_stream_start; cb->stream_stop_cb = pg_decode_stream_stop; cb->stream_abort_cb = pg_decode_stream_abort; + cb->stream_prepare_cb = pg_decode_stream_prepare; + cb->stream_commit_prepared_cb = pg_decode_stream_commit_prepared; + cb->stream_rollback_prepared_cb = pg_decode_stream_rollback_prepared; cb->stream_commit_cb = pg_decode_stream_commit; cb->stream_change_cb = pg_decode_stream_change; cb->stream_message_cb = pg_decode_stream_message; @@ -805,6 +817,78 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, } static void +pg_decode_stream_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + if (data->include_xids) + appendStringInfo(ctx->out, "preparing streamed transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "preparing streamed transaction"); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +static void +pg_decode_stream_commit_prepared(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + if (data->include_xids) + appendStringInfo(ctx->out, "commit prepared streamed transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "commit prepared streamed transaction"); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +static void +pg_decode_stream_rollback_prepared(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr rollback_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + if (data->include_xids) + appendStringInfo(ctx->out, "abort prepared streamed transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "abort prepared streamed transaction"); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 4ad5dca..824c55a 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -396,6 +396,9 @@ typedef struct OutputPluginCallbacks LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamPrepareCB stream_prepare_cb; + LogicalDecodeStreamCommitPreparedCB stream_commit_prepared_cb; + LogicalDecodeStreamRollbackPreparedCB stream_rollback_prepared_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; @@ -418,14 +421,16 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); in-progress transactions. The stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb and stream_change_cb - are required, while stream_message_cb and + are required, while stream_message_cb, + stream_prepare_cb, stream_commit_prepared_cb, + stream_rollback_prepared_cb and stream_truncate_cb are optional. An output plugin may also define functions to support two-phase commits, which are decoded on PREPARE TRANSACTION. The prepare_cb, - commit_prepared_cb and abort_prepared_cb + commit_prepared_cb and rollback_prepared_cb callbacks are required, while filter_prepare_cb is optional. @@ -638,8 +643,8 @@ typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ct callback. typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, - ReorderBufferTXN *txn, - XLogRecPtr abort_lsn); + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); @@ -846,6 +851,45 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, + + Stream Prepare Callback + + The stream_prepare_cb callback is called to prepare + a previously streamed transaction as part of a two-phase commit. + +typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + + + + + + Stream Commit Prepared Callback + + The stream_commit_prepared_cb callback is called to commit + a previously streamed transaction as part of a two-phase commit. + +typedef void (*LogicalDecodeStreamCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + + + + + + Stream Rollback Prepared Callback + + The stream_rollback_prepared_cb callback is called to abort + a previously streamed transaction as part of a two-phase commit. + +typedef void (*LogicalDecodeStreamRollbackPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr rollback_lsn); + + + + Stream Commit Callback @@ -1024,9 +1068,13 @@ OutputPluginWrite(ctx, true); When streaming an in-progress transaction, the changes (and messages) are streamed in blocks demarcated by stream_start_cb and stream_stop_cb callbacks. Once all the decoded - changes are transmitted, the transaction is committed using the - stream_commit_cb callback (or possibly aborted using - the stream_abort_cb callback). + changes are transmitted, the transaction can be committed using the + the stream_commit_cb callback + (or possibly aborted using the stream_abort_cb callback). + If two-phase commits are supported, the transaction can be prepared using the + stream_prepare_cb callback, commit prepared using the + stream_commit_prepared_cb callback or aborted using the + stream_rollback_prepared_cb. diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1c0f70d..84a4751 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -82,6 +82,12 @@ static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr last_lsn); static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr abort_lsn); +static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void stream_commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void stream_rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr rollback_lsn); static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -233,6 +239,9 @@ StartupDecodingContext(List *output_plugin_options, ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) || (ctx->callbacks.stream_stop_cb != NULL) || (ctx->callbacks.stream_abort_cb != NULL) || + (ctx->callbacks.stream_prepare_cb != NULL) || + (ctx->callbacks.stream_commit_prepared_cb != NULL) || + (ctx->callbacks.stream_rollback_prepared_cb != NULL) || (ctx->callbacks.stream_commit_cb != NULL) || (ctx->callbacks.stream_change_cb != NULL) || (ctx->callbacks.stream_message_cb != NULL) || @@ -262,6 +271,9 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->stream_start = stream_start_cb_wrapper; ctx->reorder->stream_stop = stream_stop_cb_wrapper; ctx->reorder->stream_abort = stream_abort_cb_wrapper; + ctx->reorder->stream_prepare = stream_prepare_cb_wrapper; + ctx->reorder->stream_commit_prepared = stream_commit_prepared_cb_wrapper; + ctx->reorder->stream_rollback_prepared = stream_rollback_prepared_cb_wrapper; ctx->reorder->stream_commit = stream_commit_cb_wrapper; ctx->reorder->stream_change = stream_change_cb_wrapper; ctx->reorder->stream_message = stream_message_cb_wrapper; @@ -885,7 +897,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr abort_lsn) + XLogRecPtr abort_lsn) { LogicalDecodingContext *ctx = cache->private_data; LogicalErrorCallbackState state; @@ -1241,6 +1253,124 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void +stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming and two-phase commits are supported. */ + Assert(ctx->streaming); + Assert(ctx->twophase); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_prepare"; + state.report_location = txn->final_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; + + /* in streaming mode with two-phase commits, stream_prepare_cb is required */ + if (ctx->callbacks.stream_prepare_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming commits requires a stream_prepare_cb callback"))); + + ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_commit_prepared"; + state.report_location = txn->final_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; + + /* in streaming mode with two-phase commits, stream_commit_prepared_cb is required */ + if (ctx->callbacks.stream_prepare_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming requires a stream_commit_prepared_cb callback"))); + + ctx->callbacks.stream_commit_prepared_cb(ctx, txn, commit_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr rollback_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_rollback_prepared"; + state.report_location = txn->final_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; + + /* in streaming mode with two-phase commits, stream_rollback_prepared_cb is required */ + if (ctx->callbacks.stream_prepare_cb == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical streaming requires a stream_rollback_prepared_cb callback"))); + + ctx->callbacks.stream_rollback_prepared_cb(ctx, txn, rollback_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 8fc1301..d569d46 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1793,9 +1793,18 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferStreamTXN(rb, txn); - rb->stream_commit(rb, txn, txn->final_lsn); - - ReorderBufferCleanupTXN(rb, txn); + if (rbtxn_prepared(txn)) + { + rb->stream_prepare(rb, txn, txn->final_lsn); + ReorderBufferTruncateTXN(rb, txn, true); + /* Reset the CheckXidAlive */ + CheckXidAlive = InvalidTransactionId; + } + else + { + rb->stream_commit(rb, txn, txn->final_lsn); + ReorderBufferCleanupTXN(rb, txn); + } } /* @@ -2633,15 +2642,19 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, strcpy(txn->gid, gid); if (is_commit) - { txn->txn_flags |= RBTXN_COMMIT_PREPARED; - rb->commit_prepared(rb, txn, commit_lsn); - } else - { txn->txn_flags |= RBTXN_ROLLBACK_PREPARED; + + if (rbtxn_is_streamed(txn) && rbtxn_commit_prepared(txn)) + rb->stream_commit_prepared(rb, txn, commit_lsn); + else if (rbtxn_is_streamed(txn) && rbtxn_rollback_prepared(txn)) + rb->stream_rollback_prepared(rb, txn, commit_lsn); + else if (rbtxn_commit_prepared(txn)) + rb->commit_prepared(rb, txn, commit_lsn); + else if (rbtxn_rollback_prepared(txn)) rb->rollback_prepared(rb, txn, commit_lsn); - } + /* cleanup: make sure there's no cache pollution */ ReorderBufferExecuteInvalidations(rb, txn); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 96acd01..dfcb577 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -157,6 +157,30 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, XLogRecPtr abort_lsn); /* + * Called to prepare changes streamed to remote node from in-progress + * transaction. This is called as part of a two-phase commit. + */ +typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* + * Called to commit prepared changes streamed to remote node from in-progress + * transaction. This is called as part of a two-phase commit. + */ +typedef void (*LogicalDecodeStreamCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called to abort/rollback prepared changes streamed to remote node from in-progress + * transaction. This is called as part of a two-phase commit. + */ +typedef void (*LogicalDecodeStreamRollbackPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr rollback_lsn); + +/* * Called to apply changes streamed to remote node from in-progress * transaction. */ @@ -214,6 +238,9 @@ typedef struct OutputPluginCallbacks LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamPrepareCB stream_prepare_cb; + LogicalDecodeStreamCommitPreparedCB stream_commit_prepared_cb; + LogicalDecodeStreamRollbackPreparedCB stream_rollback_prepared_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index ca823e3..84e4840 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -478,6 +478,24 @@ typedef void (*ReorderBufferStreamAbortCB) ( ReorderBufferTXN *txn, XLogRecPtr abort_lsn); +/* prepare streamed transaction callback signature */ +typedef void (*ReorderBufferStreamPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* prepare streamed transaction callback signature */ +typedef void (*ReorderBufferStreamCommitPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* prepare streamed transaction callback signature */ +typedef void (*ReorderBufferStreamRollbackPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr rollback_lsn); + /* commit streamed transaction callback signature */ typedef void (*ReorderBufferStreamCommitCB) ( ReorderBuffer *rb, @@ -557,6 +575,9 @@ struct ReorderBuffer ReorderBufferStreamStartCB stream_start; ReorderBufferStreamStopCB stream_stop; ReorderBufferStreamAbortCB stream_abort; + ReorderBufferStreamPrepareCB stream_prepare; + ReorderBufferStreamCommitPreparedCB stream_commit_prepared; + ReorderBufferStreamRollbackPreparedCB stream_rollback_prepared; ReorderBufferStreamCommitCB stream_commit; ReorderBufferStreamChangeCB stream_change; ReorderBufferStreamMessageCB stream_message; -- 1.8.3.1