From 829f7bd6ae0b159565b165ccc6facb048cda3552 Mon Sep 17 00:00:00 2001 From: Nikhil Sontakke Date: Mon, 5 Mar 2018 21:51:52 +0530 Subject: [PATCH 4/6] Teach ReorderBuffer to deal with 2PC. --- src/backend/access/transam/twophase.c | 5 + src/backend/replication/logical/decode.c | 135 ++++++++++++-- src/backend/replication/logical/logical.c | 181 +++++++++++++++++++ src/backend/replication/logical/reorderbuffer.c | 226 ++++++++++++++++++++++-- src/include/access/xact.h | 7 + src/include/replication/logical.h | 5 + src/include/replication/output_plugin.h | 45 +++++ src/include/replication/reorderbuffer.h | 54 ++++++ 8 files changed, 632 insertions(+), 26 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index d6e4b7980f..f3091af385 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1506,6 +1506,11 @@ FinishPreparedTransaction(const char *gid, bool isCommit) gid); ProcArrayRemove(proc, latestXid); + /* + * Tell logical decoding backends interested in this XID + * that this is going away + */ + LogicalDecodeRemoveTransaction(proc, isCommit); /* * In case we fail while running the callbacks, mark the gxact invalid so diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 6eb0d5527e..e1b021750f 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -34,6 +34,7 @@ #include "access/xlogutils.h" #include "access/xlogreader.h" #include "access/xlogrecord.h" +#include "access/twophase.h" #include "catalog/pg_control.h" @@ -72,6 +73,8 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid); +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare *parsed); /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); @@ -280,16 +283,33 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; } case XLOG_XACT_PREPARE: + { + xl_xact_parsed_prepare parsed; - /* - * Currently decoding ignores PREPARE TRANSACTION and will just - * decode the transaction when the COMMIT PREPARED is sent or - * throw away the transaction's contents when a ROLLBACK PREPARED - * is received. In the future we could add code to expose prepared - * transactions in the changestream allowing for a kind of - * distributed 2PC. - */ - ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); + /* check that output plugin is capable of twophase decoding */ + if (!ctx->enable_twophase) + { + ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); + break; + } + + /* ok, parse it */ + ParsePrepareRecord(XLogRecGetInfo(buf->record), + XLogRecGetData(buf->record), &parsed); + + /* does output plugin want this particular transaction? */ + if (ctx->callbacks.filter_prepare_cb && + ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid, + parsed.twophase_gid)) + { + ReorderBufferProcessXid(reorder, parsed.twophase_xid, + buf->origptr); + break; + } + + DecodePrepare(ctx, buf, &parsed); + break; + } break; default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); @@ -627,9 +647,78 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, buf->origptr, buf->endptr); } + if (TransactionIdIsValid(parsed->twophase_xid) && + ReorderBufferTxnIsPrepared(ctx->reorder, + parsed->twophase_xid, parsed->twophase_gid)) + { + Assert(xid == parsed->twophase_xid); + /* we are processing COMMIT PREPARED */ + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, parsed->twophase_gid, true); + } + else + { + /* replay actions of all transaction + subtransactions in order */ + ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn); + } +} + +/* + * Decode PREPARE record. Similar logic as in COMMIT + */ +static void +DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare *parsed) +{ + XLogRecPtr origin_lsn = parsed->origin_lsn; + TimestampTz commit_time = parsed->origin_timestamp; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + int i; + TransactionId xid = parsed->twophase_xid; + + /* + * Process invalidation messages, even if we're not interested in the + * transaction's contents, since the various caches need to always be + * consistent. + */ + if (parsed->nmsgs > 0) + { + if (!ctx->fast_forward) + ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, + parsed->nmsgs, parsed->msgs); + ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); + } + + /* + * Tell the reorderbuffer about the surviving subtransactions. We need + * to do this because the main transaction itself has not committed + * since we are in the prepare phase right now. So we need to be sure + * the snapshot is setup correctly for the main transaction in case all + * changes happened in subtransanctions + */ + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], + buf->origptr, buf->endptr); + } + + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || + ctx->fast_forward || FilterByOrigin(ctx, origin_id)) + { + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr); + } + ReorderBufferForget(ctx->reorder, xid, buf->origptr); + + return; + } + /* replay actions of all transaction + subtransactions in order */ - ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - commit_time, origin_id, origin_lsn); + ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, parsed->twophase_gid); } /* @@ -641,6 +730,30 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid) { int i; + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + TimestampTz commit_time = 0; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + origin_lsn = parsed->origin_lsn; + commit_time = parsed->origin_timestamp; + } + + /* + * If it's ROLLBACK PREPARED then handle it via callbacks. + */ + if (TransactionIdIsValid(xid) && + !SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) && + parsed->dbId == ctx->slot->data.database && + !FilterByOrigin(ctx, origin_id) && + ReorderBufferTxnIsPrepared(ctx->reorder, xid, parsed->twophase_gid)) + { + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, false); + return; + } for (i = 0; i < parsed->nsubxacts; i++) { diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index c8ccade241..4e8f77201c 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -60,6 +60,16 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx); static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn); static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); +static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + TransactionId xid, const char *gid); +static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -125,6 +135,7 @@ StartupDecodingContext(List *output_plugin_options, MemoryContext context, old_context; LogicalDecodingContext *ctx; + int twophase_callbacks; /* shorter lines... */ slot = MyReplicationSlot; @@ -184,8 +195,26 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->begin = begin_cb_wrapper; ctx->reorder->apply_change = change_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; + ctx->reorder->abort = abort_cb_wrapper; + ctx->reorder->filter_prepare = filter_prepare_cb_wrapper; + ctx->reorder->prepare = prepare_cb_wrapper; + ctx->reorder->commit_prepared = commit_prepared_cb_wrapper; + ctx->reorder->abort_prepared = abort_prepared_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + /* check that plugin implements all callbacks necessary to perform 2PC */ + twophase_callbacks = (ctx->callbacks.prepare_cb != NULL) + + (ctx->callbacks.commit_prepared_cb != NULL) + + (ctx->callbacks.abort_prepared_cb != NULL); + + ctx->enable_twophase = (twophase_callbacks == 3); + + if (twophase_callbacks != 3 && twophase_callbacks != 0) + ereport(WARNING, + (errmsg("Output plugin registered only %d twophase callbacks. " + "Twophase transactions will be decoded at commit time.", + twophase_callbacks))); + ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -693,6 +722,122 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "abort"; + state.report_location = txn->final_lsn; /* beginning of abort record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.abort_cb(ctx, txn, abort_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + + static void +prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "prepare"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "commit_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "abort_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.abort_prepared_cb(ctx, txn, abort_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) @@ -730,6 +875,42 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static bool +filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + TransactionId xid, const char *gid) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + bool ret; + + /* + * If twophase is not enabled, skip decoding at + * PREPARE time + */ + if (!ctx->enable_twophase) + return true; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "filter_prepare"; + state.report_location = InvalidXLogRecPtr; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + + /* do the actual work: call callback */ + ret = ctx->callbacks.filter_prepare_cb(ctx, txn, xid, gid); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + return ret; +} + bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) { diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index d22e116aa1..573e1aa90a 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1264,25 +1264,18 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) * the top and subtransactions (using a k-way merge) and replay the changes in * lsn order. */ -void -ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, +static void +ReorderBufferCommitInternal(ReorderBufferTXN *txn, + ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn) { - ReorderBufferTXN *txn; volatile Snapshot snapshot_now; volatile CommandId command_id = FirstCommandId; bool using_subtxn; ReorderBufferIterTXNState *volatile iterstate = NULL; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); - - /* unknown transaction, nothing to replay */ - if (txn == NULL) - return; - txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; txn->commit_time = commit_time; @@ -1326,20 +1319,60 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, { ReorderBufferChange *change; ReorderBufferChange *specinsert = NULL; + bool change_cleanup = false; + bool check_txn_status, + apply_started = false; + bool is_prepared = rbtxn_prepared(txn); + + /* + * check for the xid once to see if it's already + * committed. + * + * This is to handle cases of concurrent abort + * happening parallel to the decode activity + */ + check_txn_status = TransactionIdDidCommit(txn->xid)? + false : true; if (using_subtxn) BeginInternalSubTransaction("replay"); else StartTransactionCommand(); - rb->begin(rb, txn); - iterstate = ReorderBufferIterTXNInit(rb, txn); while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) { Relation relation = NULL; Oid reloid; + /* + * While decoding 2PC or while streaming uncommitted + * transactions, check if this transaction needs to + * be still decoded. If the transaction got aborted + * or if we were instructed to stop decoding, then + * bail out early. + */ + if (check_txn_status && TransactionIdDidAbort(txn->xid)) + { + elog(LOG, "%s decoding of %s (%u)", + apply_started? "stopping":"skipping", + is_prepared? txn->gid:"", + txn->xid); + change_cleanup = true; + goto change_cleanuptxn; + } + + /* + * We have decided to apply changes based on the go + * ahead from the above decode filter, BEGIN the + * transaction on the other side + */ + if (apply_started == false) + { + rb->begin(rb, txn); + apply_started = true; + } + switch (change->action) { case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -1375,7 +1408,17 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, relpathperm(change->data.tp.relnode, MAIN_FORKNUM)); + /* Lock transaction before catalog access */ + if (!LogicalLockTransaction(txn)) + { + elog(LOG, "stopping decoding of %s (%u)", + is_prepared? txn->gid:"", + txn->xid); + change_cleanup = true; + goto change_cleanuptxn; + } relation = RelationIdGetRelation(reloid); + LogicalUnlockTransaction(txn); if (relation == NULL) elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")", @@ -1546,6 +1589,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } } +change_cleanuptxn: /* * There's a speculative insertion remaining, just clean in up, it * can't have been successful, otherwise we'd gotten a confirmation @@ -1561,8 +1605,24 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferIterTXNFinish(rb, iterstate); iterstate = NULL; - /* call commit callback */ - rb->commit(rb, txn, commit_lsn); + if (change_cleanup) + { + /* call abort if we have sent any changes */ + if (apply_started) + rb->abort(rb, txn, commit_lsn); + } + else + { + /* call commit or prepare callback */ + if (rbtxn_prepared(txn)) + rb->prepare(rb, txn, commit_lsn); + else + rb->commit(rb, txn, commit_lsn); + } + + /* remove ourself from the decodeGroupLeader */ + if (MyProc->decodeGroupLeader) + RemoveDecodeGroupMember(MyProc->decodeGroupLeader); /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) @@ -1589,7 +1649,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (snapshot_now->copied) ReorderBufferFreeSnap(rb, snapshot_now); - /* remove potential on-disk data, and deallocate */ + /* + * remove potential on-disk data, and deallocate. + * + * We remove it even for prepared transactions. + * This is because the COMMIT PREPARED needs + * no data post the successful PREPARE + */ ReorderBufferCleanupTXN(rb, txn); } PG_CATCH(); @@ -1623,6 +1689,136 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, PG_END_TRY(); } +/* + * Ask output plugin whether we want to skip this PREPARE and send + * this transaction as a regular commit later. + */ +bool +ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, const char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); + + return rb->filter_prepare(rb, txn, xid, gid); +} + + +/* + * Commit a transaction. + * + * See comments for ReorderBufferCommitInternal() + */ +void +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Prepare a twophase transaction. It calls ReorderBufferCommitInternal() + * since all prepared transactions need to be decoded at PREPARE time. + */ +void +ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + txn->txn_flags |= RBTXN_PREPARE; + strcpy(txn->gid, gid); + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Check whether this transaction was sent as prepared to subscribers. + * Called while handling commit|abort prepared. + */ +bool +ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid, + const char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* + * Always call the prepare filter. It's the job of the prepare + * filter to give us the *same* response for a given xid + * across multiple calls (including ones on restart) + */ + return !(rb->filter_prepare(rb, txn, xid, gid)); +} + +/* + * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED. + */ +void +ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit) +{ + ReorderBufferTXN *txn; + + /* + * The transaction may or may not exist (during restarts for + * example). Anyways, 2PC transactions do not contain any + * reorderbuffers. So allow it to be created below. + */ + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, + true); + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + strcpy(txn->gid, gid); + + if (is_commit) + { + txn->txn_flags |= RBTXN_COMMIT_PREPARED; + rb->commit_prepared(rb, txn, commit_lsn); + } + else + { + txn->txn_flags |= RBTXN_ROLLBACK_PREPARED; + rb->abort_prepared(rb, txn, commit_lsn); + } + + /* cleanup: make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(rb, txn); + ReorderBufferCleanupTXN(rb, txn); +} + /* * Abort a transaction that possibly has previous changes. Needs to be first * called for subtransactions and then for the toplevel xid. diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 61c4ae37f3..836ccefee6 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -322,6 +322,9 @@ typedef xl_xact_parsed_commit xl_xact_parsed_prepare; typedef struct xl_xact_parsed_abort { + Oid dbId; + Oid tsId; + TimestampTz xact_time; uint32 xinfo; @@ -332,6 +335,10 @@ typedef struct xl_xact_parsed_abort RelFileNode *xnodes; TransactionId twophase_xid; /* only for 2PC */ + char twophase_gid[GIDSIZE]; /* only for 2PC */ + + XLogRecPtr origin_lsn; + TimestampTz origin_timestamp; } xl_xact_parsed_abort; diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 63b14367f0..9dad4c997f 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -89,6 +89,11 @@ typedef struct LogicalDecodingContext bool prepared_write; XLogRecPtr write_location; TransactionId write_xid; + + /* + * Capabilities of the output plugin. + */ + bool enable_twophase; } LogicalDecodingContext; diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 78fd38bb16..568897df49 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -67,6 +67,46 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +/* + * Called for an implicit ABORT of a transaction. + */ +typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + /* + * Called before decoding of PREPARE record to decide whether this + * transaction should be decoded with separate calls to prepare + * and commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED + * and sent as usual transaction. + */ +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + +/* + * Called for PREPARE record unless it was filtered by filter_prepare() + * callback. + */ +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* + * Called for COMMIT PREPARED. + */ +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called for ROLLBACK PREPARED. + */ +typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + /* * Called for the generic logical decoding messages. */ @@ -98,7 +138,12 @@ typedef struct OutputPluginCallbacks LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeAbortCB abort_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeAbortPreparedCB abort_prepared_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index d6b00654c2..d446d9082b 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -10,6 +10,7 @@ #define REORDERBUFFER_H #include "access/htup_details.h" +#include "access/twophase.h" #include "lib/ilist.h" #include "storage/sinval.h" #include "utils/hsearch.h" @@ -179,6 +180,9 @@ typedef struct ReorderBufferTXN */ TransactionId xid; + /* In case of 2PC we need to pass GID to output plugin */ + char gid[GIDSIZE]; + /* * LSN of the first data carrying, WAL record with knowledge about this * xid. This is allowed to *not* be first record adorned with this xid, if @@ -312,6 +316,37 @@ typedef void (*ReorderBufferCommitCB) ( ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +/* abort callback signature */ +typedef void (*ReorderBufferAbortCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +typedef bool (*ReorderBufferFilterPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + +/* prepare callback signature */ +typedef void (*ReorderBufferPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* commit prepared callback signature */ +typedef void (*ReorderBufferCommitPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* abort prepared callback signature */ +typedef void (*ReorderBufferAbortPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + /* message callback signature */ typedef void (*ReorderBufferMessageCB) ( ReorderBuffer *rb, @@ -347,6 +382,11 @@ struct ReorderBuffer ReorderBufferBeginCB begin; ReorderBufferApplyChangeCB apply_change; ReorderBufferCommitCB commit; + ReorderBufferAbortCB abort; + ReorderBufferFilterPrepareCB filter_prepare; + ReorderBufferPrepareCB prepare; + ReorderBufferCommitPreparedCB commit_prepared; + ReorderBufferAbortPreparedCB abort_prepared; ReorderBufferMessageCB message; /* @@ -389,6 +429,11 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); +void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn); @@ -412,6 +457,15 @@ void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); +bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, + const char *gid); +bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid, + const char *gid); +void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); -- 2.14.3 (Apple Git-98)