From 1e273c48a6a2556fa59c0a24f7dfc0885e197331 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 23 Dec 2020 02:38:34 -0500 Subject: [PATCH v35 4/8] Add support for apply at prepare time to built-in logical replication. To add support for streaming transactions at prepare time into the built-in logical replication, we need to do the below things: * Modify the output plugin (pgoutput) to implement the new two-phase API callbacks, by leveraging the extended replication protocol. * Modify the replication apply worker, to properly handle two-phase transactions by replaying them on prepare. * Add the prepare API for streaming transactions which will apply the changes accumulated in spool-file at prepare time. * We allow skipping prepared transactions if they are already prepared. We do ensure that we skip only when the GID, origin_lsn, and origin_timestamp of a prepared xact matches to avoid the possibility of a match of prepared xact from two different nodes. This can happen when the server or apply worker restarts after a prepared transaction. We however must explicitly disable replication of two-phase transactions during replication slot creation, even if the plugin supports it. We don't need to replicate the changes accumulated during this phase, and moreover, we don't have a replication connection open so we don't know where to send the data anyway. Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Sawada Masahiko, and Dilip Kumar Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com --- src/backend/access/transam/twophase.c | 74 ++++++- src/backend/replication/logical/origin.c | 7 +- src/backend/replication/logical/proto.c | 260 +++++++++++++++++++++- src/backend/replication/logical/worker.c | 330 ++++++++++++++++++++++++++++ src/backend/replication/pgoutput/pgoutput.c | 172 ++++++++++++--- src/include/access/twophase.h | 2 + src/include/replication/logicalproto.h | 75 ++++++- src/include/replication/reorderbuffer.h | 12 + src/tools/pgindent/typedefs.list | 3 + 9 files changed, 895 insertions(+), 40 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index fe10809..71cca00 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1133,9 +1133,9 @@ EndPrepare(GlobalTransaction gxact) gxact->prepare_start_lsn = ProcLastRecPtr; /* - * Mark the prepared transaction as valid. As soon as xact.c marks - * MyProc as not running our XID (which it will do immediately after - * this function returns), others can commit/rollback the xact. + * Mark the prepared transaction as valid. As soon as xact.c marks MyProc + * as not running our XID (which it will do immediately after this + * function returns), others can commit/rollback the xact. * * NB: a side effect of this is to make a dummy ProcArray entry for the * prepared XID. This must happen before we clear the XID from MyProc / @@ -2446,3 +2446,71 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) RemoveTwoPhaseFile(xid, giveWarning); RemoveGXact(gxact); } + +/* + * LookupGXact + * Check if the prepared transaction with the given GID and lsn is around. + * + * Note that we always compare with the LSN where prepare ends because that is + * what is stored as origin_lsn in the 2PC file. + * + * This function is primarily used to check if the prepared transaction + * received from the upstream (remote node) already exists. Checking only GID + * is not sufficient because a different prepared xact with the same GID can + * exist on the same node. So, we are ensuring to match origin_lsn and + * origin_timestamp of prepared xact to avoid the possibility of a match of + * prepared xact from two different nodes. + */ +bool +LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, + TimestampTz origin_prepare_timestamp) +{ + int i; + bool found = false; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* Ignore not-yet-valid GIDs. */ + if ((gxact->valid && strcmp(gxact->gid, gid) == 0)) + { + char* buf; + TwoPhaseFileHeader* hdr; + + /* + * We are neither expecting the collisions of GXACTs (same gid) + * between publisher and subscribers nor the apply worker restarts + * after prepared xacts, so we perform all I/O while holding + * TwoPhaseStateLock for simplicity. + * + * To move the I/O out of the lock, we need to ensure that no other + * backend commits the prepared xact in the meantime. We can do + * this optimization if we encounter many collisions in GID between + * publisher and subscriber. + */ + if (gxact->ondisk) + buf = ReadTwoPhaseFile(gxact->xid, false); + else + { + Assert(gxact->prepare_start_lsn); + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL); + } + + hdr = (TwoPhaseFileHeader *) buf; + + if (hdr->origin_lsn == prepare_end_lsn && + hdr->origin_timestamp == origin_prepare_timestamp) + { + found = true; + pfree(buf); + break; + } + + pfree(buf); + } + } + LWLockRelease(TwoPhaseStateLock); + return found; +} diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 15ab8e7..dd33469 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -957,8 +957,11 @@ replorigin_advance(RepOriginId node, /* * Due to - harmless - race conditions during a checkpoint we could see - * values here that are older than the ones we already have in memory. - * Don't overwrite those. + * values here that are older than the ones we already have in memory. We + * could also see older values for prepared transactions when the prepare + * is sent at a later point of time along with commit prepared and there + * are other transactions commits between prepare and commit prepared. See + * ReorderBufferFinishPrepared. Don't overwrite those. */ if (go_backward || replication_state->remote_lsn < remote_commit) replication_state->remote_lsn = remote_commit; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index fdb3118..1047385 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -78,7 +78,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT); - /* send the flags field (unused for now) */ + /* send the flags field */ pq_sendbyte(out, flags); /* send fields */ @@ -106,6 +106,264 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) } /* + * Write BEGIN PREPARE to the output stream. + */ +void +logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE); + + /* fixed fields */ + pq_sendint64(out, txn->final_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction BEGIN PREPARE from the stream. + */ +void +logicalrep_read_begin_prepare(StringInfo in, LogicalRepBeginPrepareData *begin_data) +{ + /* read fields */ + begin_data->final_lsn = pq_getmsgint64(in); + if (begin_data->final_lsn == InvalidXLogRecPtr) + elog(ERROR, "final_lsn not set in begin message"); + begin_data->end_lsn = pq_getmsgint64(in); + if (begin_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn not set in begin message"); + begin_data->committime = pq_getmsgint64(in); + begin_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(begin_data->gid, pq_getmsgstring(in)); +} + +/* + * Write PREPARE to the output stream. + */ +void +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE); + + /* + * This should only ever happen for two-phase commit transactions. In + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + Assert(rbtxn_prepared(txn)); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction PREPARE from the stream. + */ +void +logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in prepare message", flags); + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + prepare_data->end_lsn = pq_getmsgint64(in); + prepare_data->preparetime = pq_getmsgint64(in); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(prepare_data->gid, pq_getmsgstring(in)); +} + +/* + * Write COMMIT PREPARED to the output stream. + */ +void +logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED); + + /* + * This should only ever happen for two-phase commit transactions. In + * which case we expect to have a valid GID. Additionally, the transaction + * must be prepared. See ReorderBufferFinishPrepared. + */ + Assert(txn->gid != NULL); + Assert(rbtxn_prepared(txn)); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, commit_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction COMMIT PREPARED from the stream. + */ +void +logicalrep_read_commit_prepared(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in commit prepare message", flags); + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + prepare_data->end_lsn = pq_getmsgint64(in); + prepare_data->preparetime = pq_getmsgint64(in); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(prepare_data->gid, pq_getmsgstring(in)); +} + +/* + * Write ROLLBACK PREPARED to the output stream. + */ +void +logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED); + + /* + * This should only ever happen for two-phase commit transactions. In + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_end_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, prepare_time); + pq_sendint64(out, txn->commit_time); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction ROLLBACK PREPARED from the stream. + */ +void +logicalrep_read_rollback_prepared(StringInfo in, + LogicalRepRollbackPreparedTxnData *rollback_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in rollback prepare message", flags); + + /* read fields */ + rollback_data->prepare_end_lsn = pq_getmsgint64(in); + rollback_data->rollback_end_lsn = pq_getmsgint64(in); + rollback_data->preparetime = pq_getmsgint64(in); + rollback_data->rollbacktime = pq_getmsgint64(in); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(rollback_data->gid, pq_getmsgstring(in)); +} + +/* + * Write STREAM PREPARE to the output stream. + */ +void +logicalrep_write_stream_prepare(StringInfo out, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_PREPARE); + + /* + * This should only ever happen for two-phase transactions. In which case + * we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + Assert(rbtxn_prepared(txn)); + + /* transaction ID */ + Assert(TransactionIdIsValid(txn->xid)); + pq_sendint32(out, txn->xid); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read STREAM PREPARE from the output stream. + */ +TransactionId +logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + TransactionId xid; + uint8 flags; + + xid = pq_getmsgint(in, 4); + + /* read flags */ + flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in stream prepare message", flags); + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + prepare_data->end_lsn = pq_getmsgint64(in); + prepare_data->preparetime = pq_getmsgint64(in); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(prepare_data->gid, pq_getmsgstring(in)); + + return xid; +} + +/* * Write ORIGIN to the output stream. */ void diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4f75e85..4f57a8a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -59,6 +59,7 @@ #include "access/table.h" #include "access/tableam.h" +#include "access/twophase.h" #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" @@ -169,6 +170,9 @@ bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; +/* for skipping prepared transaction */ +bool skip_prepared_txn = false; + /* * Hash table for storing the streaming xid information along with shared file * set for streaming and subxact files. @@ -690,6 +694,12 @@ apply_handle_begin(StringInfo s) { LogicalRepBeginData begin_data; + /* + * We don't expect any other transaction data while skipping a prepared + * xact. + */ + Assert(!skip_prepared_txn); + logicalrep_read_begin(s, &begin_data); remote_final_lsn = begin_data.final_lsn; @@ -709,6 +719,12 @@ apply_handle_commit(StringInfo s) { LogicalRepCommitData commit_data; + /* + * We don't expect any other transaction data while skipping a prepared + * xact. + */ + Assert(!skip_prepared_txn); + logicalrep_read_commit(s, &commit_data); Assert(commit_data.commit_lsn == remote_final_lsn); @@ -722,6 +738,264 @@ apply_handle_commit(StringInfo s) } /* + * Handle BEGIN message. + */ +static void +apply_handle_begin_prepare(StringInfo s) +{ + LogicalRepBeginPrepareData begin_data; + + logicalrep_read_begin_prepare(s, &begin_data); + + if (LookupGXact(begin_data.gid, begin_data.end_lsn, begin_data.committime)) + { + /* + * If this gid has already been prepared then we don't want to apply + * this txn again. This can happen after restart where upstream can + * send the prepared transaction again. See + * ReorderBufferFinishPrepared. Don't update remote_final_lsn. + */ + skip_prepared_txn = true; + return; + } + + remote_final_lsn = begin_data.final_lsn; + + in_remote_transaction = true; + + pgstat_report_activity(STATE_RUNNING, NULL); +} + +/* + * Handle PREPARE message. + */ +static void +apply_handle_prepare(StringInfo s) +{ + LogicalRepPreparedTxnData prepare_data; + + logicalrep_read_prepare(s, &prepare_data); + + if (skip_prepared_txn) + { + /* + * If we are skipping this transaction because it was previously + * prepared, ignore it and reset the flag. + */ + Assert(LookupGXact(prepare_data.gid, prepare_data.end_lsn, + prepare_data.preparetime)); + skip_prepared_txn = false; + return; + } + + Assert(prepare_data.prepare_lsn == remote_final_lsn); + + /* The synchronization worker runs in single transaction. */ + if (IsTransactionState() && !am_tablesync_worker()) + { + /* + * BeginTransactionBlock is necessary to balance the + * EndTransactionBlock called within the PrepareTransactionBlock + * below. + */ + BeginTransactionBlock(); + CommitTransactionCommand(); + + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.preparetime; + + PrepareTransactionBlock(prepare_data.gid); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + } + else + { + /* Process any invalidation messages that might have accumulated. */ + AcceptInvalidationMessages(); + maybe_reread_subscription(); + } + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle a COMMIT PREPARED of a previously PREPARED transaction. + */ +static void +apply_handle_commit_prepared(StringInfo s) +{ + LogicalRepPreparedTxnData prepare_data; + + /* + * We don't expect any other transaction data while skipping a prepared + * xact. + */ + Assert(!skip_prepared_txn); + + logicalrep_read_commit_prepared(s, &prepare_data); + + /* there is no transaction when COMMIT PREPARED is called */ + ensure_transaction(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.preparetime; + + FinishPreparedTransaction(prepare_data.gid, true); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION. + */ +static void +apply_handle_rollback_prepared(StringInfo s) +{ + LogicalRepRollbackPreparedTxnData rollback_data; + + /* + * We don't expect any other transaction data while skipping a prepared + * xact. + */ + Assert(!skip_prepared_txn); + + logicalrep_read_rollback_prepared(s, &rollback_data); + + /* + * It is possible that we haven't received prepare because it occurred + * before walsender reached a consistent point in which case we need to + * skip rollback prepared. + */ + if (LookupGXact(rollback_data.gid, rollback_data.prepare_end_lsn, + rollback_data.preparetime)) + { + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = rollback_data.rollback_end_lsn; + replorigin_session_origin_timestamp = rollback_data.rollbacktime; + + /* there is no transaction when ABORT/ROLLBACK PREPARED is called */ + ensure_transaction(); + FinishPreparedTransaction(rollback_data.gid, false); + CommitTransactionCommand(); + } + + pgstat_report_stat(false); + + store_flush_position(rollback_data.rollback_end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(rollback_data.rollback_end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle STREAM PREPARE. + * + * Logic is in two parts: + * 1. Replay all the spooled operations + * 2. Mark the transaction as prepared + */ +static void +apply_handle_stream_prepare(StringInfo s) +{ + int nchanges = 0; + LogicalRepPreparedTxnData prepare_data; + TransactionId xid; + + Assert(!in_streamed_transaction); + + /* + * We don't expect any other transaction data while skipping a prepared + * xact. + */ + Assert(!skip_prepared_txn); + + xid = logicalrep_read_stream_prepare(s, &prepare_data); + elog(DEBUG1, "received prepare for streamed transaction %u", xid); + + /* + * + * -------------------------------------------------------------------------- + * 1. Replay all the spooled operations - Similar code as for + * apply_handle_stream_commit (i.e. non two-phase stream commit) + * -------------------------------------------------------------------------- + */ + + ensure_transaction(); + + /* + * BeginTransactionBlock is necessary to balance the EndTransactionBlock + * called within the PrepareTransactionBlock below. + */ + BeginTransactionBlock(); + CommitTransactionCommand(); + + nchanges = apply_spooled_messages(xid, prepare_data.prepare_lsn); + + /* + * + * -------------------------------------------------------------------------- + * 2. Mark the transaction as prepared. - Similar code as for + * apply_handle_prepare (i.e. two-phase non-streamed prepare) + * -------------------------------------------------------------------------- + */ + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.preparetime; + + PrepareTransactionBlock(prepare_data.gid); + CommitTransactionCommand(); + + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + + elog(DEBUG1, "apply_handle_stream_prepare_txn: replayed %d (all) changes.", nchanges); + + in_remote_transaction = false; + + /* unlink the files with serialized changes and subxact info */ + stream_cleanup_files(MyLogicalRepWorker->subid, xid); + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* * Handle ORIGIN message. * * TODO, support tracking of multiple origins @@ -753,6 +1027,12 @@ apply_handle_stream_start(StringInfo s) Assert(!in_streamed_transaction); /* + * We don't expect any other transaction data while skipping a prepared + * xact. + */ + Assert(!skip_prepared_txn); + + /* * Start a transaction on stream start, this transaction will be committed * on the stream stop unless it is a tablesync worker in which case it will * be committed after processing all the messages. We need the transaction @@ -800,6 +1080,12 @@ apply_handle_stream_stop(StringInfo s) Assert(in_streamed_transaction); /* + * We don't expect any other transaction data while skipping a prepared + * xact. + */ + Assert(!skip_prepared_txn); + + /* * Close the file with serialized changes, and serialize information about * subxacts for the toplevel transaction. */ @@ -835,6 +1121,12 @@ apply_handle_stream_abort(StringInfo s) Assert(!in_streamed_transaction); + /* + * We don't expect any other transaction data while skipping a prepared + * xact. + */ + Assert(!skip_prepared_txn); + logicalrep_read_stream_abort(s, &xid, &subxid); /* @@ -1053,6 +1345,12 @@ apply_handle_stream_commit(StringInfo s) Assert(!in_streamed_transaction); + /* + * We don't expect any other transaction data while skipping a prepared + * xact. + */ + Assert(!skip_prepared_txn); + xid = logicalrep_read_stream_commit(s, &commit_data); elog(DEBUG1, "received commit for streamed transaction %u", xid); @@ -1176,6 +1474,9 @@ apply_handle_insert(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; + if (skip_prepared_txn) + return; + if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) return; @@ -1297,6 +1598,9 @@ apply_handle_update(StringInfo s) RangeTblEntry *target_rte; MemoryContext oldctx; + if (skip_prepared_txn) + return; + if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) return; @@ -1454,6 +1758,9 @@ apply_handle_delete(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; + if (skip_prepared_txn) + return; + if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) return; @@ -1823,6 +2130,9 @@ apply_handle_truncate(StringInfo s) List *relids_logged = NIL; ListCell *lc; + if (skip_prepared_txn) + return; + if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) return; @@ -1979,6 +2289,26 @@ apply_dispatch(StringInfo s) case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); return; + + case LOGICAL_REP_MSG_BEGIN_PREPARE: + apply_handle_begin_prepare(s); + return; + + case LOGICAL_REP_MSG_PREPARE: + apply_handle_prepare(s); + return; + + case LOGICAL_REP_MSG_COMMIT_PREPARED: + apply_handle_commit_prepared(s); + return; + + case LOGICAL_REP_MSG_ROLLBACK_PREPARED: + apply_handle_rollback_prepared(s); + return; + + case LOGICAL_REP_MSG_STREAM_PREPARE: + apply_handle_stream_prepare(s); + return; } ereport(ERROR, diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 49d25b0..7cf2951 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -47,6 +47,16 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); +static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, @@ -57,6 +67,8 @@ static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static bool publications_valid; static bool in_streaming; @@ -66,6 +78,9 @@ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx); +static void send_repl_origin(LogicalDecodingContext* ctx, + RepOriginId origin_id, XLogRecPtr origin_lsn, + bool send_origin); /* * Entry in the map used to remember which relation schemas we sent. @@ -143,6 +158,11 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; cb->commit_cb = pgoutput_commit_txn; + + cb->begin_prepare_cb = pgoutput_begin_prepare_txn; + cb->prepare_cb = pgoutput_prepare_txn; + cb->commit_prepared_cb = pgoutput_commit_prepared_txn; + cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -153,6 +173,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_commit_cb = pgoutput_stream_commit; cb->stream_change_cb = pgoutput_change; cb->stream_truncate_cb = pgoutput_truncate; + /* transaction streaming - two-phase commit */ + cb->stream_prepare_cb = pgoutput_stream_prepare_txn; } static void @@ -322,8 +344,12 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, } else { - /* Disable the streaming during the slot initialization mode. */ + /* + * Disable the streaming and prepared transactions during the slot + * initialization mode. + */ ctx->streaming = false; + ctx->twophase = false; } } @@ -338,27 +364,8 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); - if (send_replication_origin) - { - char *origin; - - /* Message boundary */ - OutputPluginWrite(ctx, false); - OutputPluginPrepareWrite(ctx, true); - - /*---------- - * XXX: which behaviour do we want here? - * - * Alternatives: - * - don't send origin message if origin name not found - * (that's what we do now) - * - throw error - that will break replication, not good - * - send some special "unknown" origin - *---------- - */ - if (replorigin_by_oid(txn->origin_id, true, &origin)) - logicalrep_write_origin(ctx->out, origin, txn->origin_lsn); - } + send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, + send_replication_origin); OutputPluginWrite(ctx, true); } @@ -378,6 +385,68 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* + * BEGIN PREPARE callback + */ +static void +pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ + bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + + OutputPluginPrepareWrite(ctx, !send_replication_origin); + logicalrep_write_begin_prepare(ctx->out, txn); + + send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, + send_replication_origin); + + OutputPluginWrite(ctx, true); +} + +/* + * PREPARE callback + */ +static void +pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * COMMIT PREPARED callback + */ +static void +pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * ROLLBACK PREPARED callback + */ +static void +pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, + prepare_time); + OutputPluginWrite(ctx, true); +} + +/* * Write the current schema of the relation and its ancestor (if any) if not * done yet. */ @@ -766,17 +835,8 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx, OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn)); - if (send_replication_origin) - { - char *origin; - - /* Message boundary */ - OutputPluginWrite(ctx, false); - OutputPluginPrepareWrite(ctx, true); - - if (replorigin_by_oid(txn->origin_id, true, &origin)) - logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr); - } + send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr, + send_replication_origin); OutputPluginWrite(ctx, true); @@ -857,6 +917,24 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, } /* + * PREPARE callback (for streaming two-phase commit). + * + * Notify the downstream to prepare the transaction. + */ +static void +pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + Assert(rbtxn_is_streamed(txn)); + + OutputPluginUpdateProgress(ctx); + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* * Initialize the relation schema sync cache for a decoding session. * * The hash table is destroyed at the end of a decoding session. While @@ -1171,3 +1249,31 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) entry->replicate_valid = false; } + +/* Send Replication origin */ +static void +send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, + XLogRecPtr origin_lsn, bool send_origin) +{ + if (send_origin) + { + char *origin; + + /* Message boundary */ + OutputPluginWrite(ctx, false); + OutputPluginPrepareWrite(ctx, true); + + /*---------- + * XXX: which behaviour do we want here? + * + * Alternatives: + * - don't send origin message if origin name not found + * (that's what we do now) + * - throw error - that will break replication, not good + * - send some special "unknown" origin + *---------- + */ + if (replorigin_by_oid(origin_id, true, &origin)) + logicalrep_write_origin(ctx->out, origin, origin_lsn); + } +} diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 2ca71c3..5afb977 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -58,4 +58,6 @@ extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn, RepOriginId origin_id); extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); extern void restoreTwoPhaseData(void); +extern bool LookupGXact(const char *gid, XLogRecPtr prepare_at_lsn, + TimestampTz origin_prepare_timestamp); #endif /* TWOPHASE_H */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 1f2535d..13ea3b7 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -13,6 +13,7 @@ #ifndef LOGICAL_PROTO_H #define LOGICAL_PROTO_H +#include "access/xact.h" #include "replication/reorderbuffer.h" #include "utils/rel.h" @@ -54,10 +55,15 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_TRUNCATE = 'T', LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', + LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', + LOGICAL_REP_MSG_PREPARE = 'P', + LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', + LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_END = 'E', LOGICAL_REP_MSG_STREAM_COMMIT = 'c', - LOGICAL_REP_MSG_STREAM_ABORT = 'A' + LOGICAL_REP_MSG_STREAM_ABORT = 'A', + LOGICAL_REP_MSG_STREAM_PREPARE = 'p' } LogicalRepMsgType; /* @@ -114,6 +120,7 @@ typedef struct LogicalRepBeginData TransactionId xid; } LogicalRepBeginData; +/* Commit (and abort) information */ typedef struct LogicalRepCommitData { XLogRecPtr commit_lsn; @@ -121,6 +128,48 @@ typedef struct LogicalRepCommitData TimestampTz committime; } LogicalRepCommitData; + +/* Begin Prepare */ +typedef struct LogicalRepBeginPrepareData +{ + XLogRecPtr final_lsn; + XLogRecPtr end_lsn; + TimestampTz committime; + TransactionId xid; + char gid[GIDSIZE]; +} LogicalRepBeginPrepareData; + +/* + * Prepared transaction protocol information. This same structure is used to + * hold the information for prepare, and commit prepared transaction. + * prepare_lsn and preparetime are used to store commit lsn and + * commit time for commit prepared. + */ +typedef struct LogicalRepPreparedTxnData +{ + XLogRecPtr prepare_lsn; + XLogRecPtr end_lsn; + TimestampTz preparetime; + char gid[GIDSIZE]; +} LogicalRepPreparedTxnData; + +/* + * Rollback Prepared transaction protocol information. The prepare information + * prepare_end_lsn and prepare_time are used to check if the downstream has + * received this prepared transaction in which case it can apply the rollback, + * otherwise, it can skip the rollback operation. The gid alone is not + * sufficient because the downstream node can have a prepared transaction with + * same identifier. + */ +typedef struct LogicalRepRollbackPreparedTxnData +{ + XLogRecPtr prepare_end_lsn; + XLogRecPtr rollback_end_lsn; + TimestampTz preparetime; + TimestampTz rollbacktime; + char gid[GIDSIZE]; +} LogicalRepRollbackPreparedTxnData; + extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn); extern void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data); @@ -128,6 +177,24 @@ extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); extern void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data); +extern void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn); +extern void logicalrep_read_begin_prepare(StringInfo in, + LogicalRepBeginPrepareData *begin_data); +extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +extern void logicalrep_read_prepare(StringInfo in, + LogicalRepPreparedTxnData *prepare_data); +extern void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN* txn, + XLogRecPtr commit_lsn); +extern void logicalrep_read_commit_prepared(StringInfo in, + LogicalRepPreparedTxnData *prepare_data); +extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); +extern void logicalrep_read_rollback_prepared(StringInfo in, + LogicalRepRollbackPreparedTxnData *rollback_data); + + extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); @@ -171,4 +238,10 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid); +extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +extern TransactionId logicalrep_read_stream_prepare(StringInfo in, + LogicalRepPreparedTxnData *prepare_data); + + #endif /* LOGICAL_PROTO_H */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 6d63338..4b92e68 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -247,6 +247,18 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \ ) +/* Has this prepared transaction been committed? */ +#define rbtxn_commit_prepared(txn) \ +( \ + ((txn)->txn_flags & RBTXN_COMMIT_PREPARED) != 0 \ +) + +/* Has this prepared transaction been rollbacked? */ +#define rbtxn_rollback_prepared(txn) \ +( \ + ((txn)->txn_flags & RBTXN_ROLLBACK_PREPARED) != 0 \ +) + typedef struct ReorderBufferTXN { /* See above */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9cd047ba..ecba4ae 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1339,12 +1339,15 @@ LogicalOutputPluginWriterPrepareWrite LogicalOutputPluginWriterUpdateProgress LogicalOutputPluginWriterWrite LogicalRepBeginData +LogicalRepBeginPrepareData LogicalRepCommitData LogicalRepCtxStruct LogicalRepPartMapEntry +LogicalRepPreparedTxnData LogicalRepRelId LogicalRepRelMapEntry LogicalRepRelation +LogicalRepRollbackPreparedTxnData LogicalRepTupleData LogicalRepTyp LogicalRepWorker -- 1.8.3.1