From d276b9025f66605fd8548849dccf6fd2a0f46a2a Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 23 Jun 2021 06:32:41 -0400 Subject: [PATCH v90] Skip empty transactions for logical replication. The current logical replication behaviour is to send every transaction to subscriber even though the transaction is empty (because it does not contain changes from the selected publications). It is a waste of CPU cycles and network bandwidth to build/transmit these empty transactions. This patch addresses the above problem by postponing the BEGIN / BEGIN PREPARE message until the first change. While processing a COMMIT message or a PREPARE message, if there is no other change for that transaction, do not send COMMIT message or PREPARE message. It means that pgoutput will skip BEGIN / COMMIT or BEGIN PREPARE / PREPARE messages for transactions that are empty. Discussion: https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com --- contrib/test_decoding/test_decoding.c | 7 +- doc/src/sgml/logicaldecoding.sgml | 12 +- doc/src/sgml/protocol.sgml | 15 +++ src/backend/replication/logical/logical.c | 9 +- src/backend/replication/logical/proto.c | 16 ++- src/backend/replication/logical/reorderbuffer.c | 2 +- src/backend/replication/logical/worker.c | 38 ++++-- src/backend/replication/pgoutput/pgoutput.c | 158 +++++++++++++++++++++++- src/include/replication/logicalproto.h | 8 +- src/include/replication/output_plugin.h | 4 +- src/include/replication/reorderbuffer.h | 4 +- src/test/subscription/t/020_messages.pl | 5 +- src/test/subscription/t/021_twophase.pl | 46 ++++++- src/tools/pgindent/typedefs.list | 1 + 14 files changed, 286 insertions(+), 39 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index e5cd84e..408dbfc 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -86,7 +86,9 @@ static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_lsn); static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, @@ -390,7 +392,8 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* COMMIT PREPARED callback */ static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) { TestDecodingData *data = ctx->output_plugin_private; diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 9628eb5..27abdaa 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -883,11 +883,19 @@ typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, The required commit_prepared_cb callback is called whenever a transaction COMMIT PREPARED has been decoded. The gid field, which is part of the - txn parameter, can be used in this callback. + txn parameter, can be used in this callback. The + parameters prepare_end_lsn and + prepare_time can be used to check if the plugin + has received this PREPARE TRANSACTION in which case + it can commit the transaction, otherwise, it can skip the commit. The + gid alone is not sufficient because the downstream + node can have a prepared transaction with the same identifier. typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 8b95aaa..6ef2a6b 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -7538,6 +7538,13 @@ are available since protocol version 3. Int64 + The end LSN of the prepare. + + + + +Int64 + The LSN of the commit prepared. @@ -7552,6 +7559,14 @@ are available since protocol version 3. Int64 + Prepare timestamp of the transaction. The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). + + + + +Int64 + Commit timestamp of the transaction. The value is in number of microseconds since PostgreSQL epoch (2000-01-01). diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 89d91c2..97ca648 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -63,7 +63,8 @@ static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -934,7 +935,8 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) { LogicalDecodingContext *ctx = cache->private_data; LogicalErrorCallbackState state; @@ -969,7 +971,8 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, errmsg("logical replication at prepare time requires commit_prepared_cb callback"))); /* do the actual work: call callback */ - ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn); + ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn, prepare_end_lsn, + prepare_time); /* Pop the error context stack */ error_context_stack = errcallback.previous; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 8e03006..4653d6d 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -206,7 +206,9 @@ logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) */ void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) { uint8 flags = 0; @@ -222,8 +224,10 @@ logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, pq_sendbyte(out, flags); /* send fields */ + pq_sendint64(out, prepare_end_lsn); pq_sendint64(out, commit_lsn); pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, prepare_time); pq_sendint64(out, txn->xact_time.commit_time); pq_sendint32(out, txn->xid); @@ -244,12 +248,16 @@ logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData * elog(ERROR, "unrecognized flags %u in commit prepared message", flags); /* read fields */ + prepare_data->prepare_end_lsn = pq_getmsgint64(in); + if (prepare_data->prepare_end_lsn == InvalidXLogRecPtr) + elog(ERROR,"prepare_end_lsn is not set in commit prepared message"); prepare_data->commit_lsn = pq_getmsgint64(in); if (prepare_data->commit_lsn == InvalidXLogRecPtr) elog(ERROR, "commit_lsn is not set in commit prepared message"); - prepare_data->end_lsn = pq_getmsgint64(in); - if (prepare_data->end_lsn == InvalidXLogRecPtr) - elog(ERROR, "end_lsn is not set in commit prepared message"); + prepare_data->commit_end_lsn = pq_getmsgint64(in); + if (prepare_data->commit_end_lsn == InvalidXLogRecPtr) + elog(ERROR, "commit_end_lsn is not set in commit prepared message"); + prepare_data->prepare_time = pq_getmsgint64(in); prepare_data->commit_time = pq_getmsgint64(in); prepare_data->xid = pq_getmsgint(in, 4); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 48239c0..6cdca07 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2792,7 +2792,7 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, txn->origin_lsn = origin_lsn; if (is_commit) - rb->commit_prepared(rb, txn, commit_lsn); + rb->commit_prepared(rb, txn, commit_lsn, prepare_end_lsn, prepare_time); else rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index e6f1276..47d5a53 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -990,27 +990,39 @@ apply_handle_commit_prepared(StringInfo s) /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, gid, sizeof(gid)); - - /* There is no transaction when COMMIT PREPARED is called */ - begin_replication_step(); - /* - * Update origin state so we can restart streaming from correct position - * in case of crash. + * It is possible that we haven't received the prepare because + * the transaction did not have any changes relevant to this + * subscription and was essentially an empty prepare. In which case, + * the walsender is optimized to drop the empty transaction and the + * accompanying prepare. Silently ignore if we don't find the prepared + * transaction. */ - replorigin_session_origin_lsn = prepare_data.end_lsn; - replorigin_session_origin_timestamp = prepare_data.commit_time; + if (LookupGXact(gid, prepare_data.prepare_end_lsn, + prepare_data.prepare_time)) + { - FinishPreparedTransaction(gid, true); - end_replication_step(); - CommitTransactionCommand(); + /* There is no transaction when COMMIT PREPARED is called */ + begin_replication_step(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.commit_end_lsn; + replorigin_session_origin_timestamp = prepare_data.commit_time; + + FinishPreparedTransaction(gid, true); + end_replication_step(); + CommitTransactionCommand(); + } pgstat_report_stat(false); - store_flush_position(prepare_data.end_lsn); + store_flush_position(prepare_data.commit_end_lsn); in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + process_syncing_tables(prepare_data.commit_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index d5a284d..43679a2 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -56,7 +56,9 @@ static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn, XLogRecPtr commit_lsn); + ReorderBufferTXN *txn, XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, @@ -132,6 +134,11 @@ typedef struct RelationSyncEntry TupleConversionMap *map; } RelationSyncEntry; +typedef struct PGOutputTxnData +{ + bool sent_begin_txn; /* flag indicating whether begin has been sent */ +} PGOutputTxnData; + /* Map used to remember which relation schemas we sent. */ static HTAB *RelationSyncCache = NULL; @@ -401,10 +408,32 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputTxnData *data = MemoryContextAllocZero(ctx->context, + sizeof(PGOutputTxnData)); + + /* + * Don't send BEGIN message here. Instead, postpone it until the first + * change. In logical replication, a common scenario is to replicate a set + * of tables (instead of all tables) and transactions whose changes were on + * table(s) that are not published will produce empty transactions. These + * empty transactions will send BEGIN and COMMIT messages to subscribers, + * using bandwidth on something with little/no use for logical replication. + */ + data->sent_begin_txn = false; + txn->output_plugin_private = data; +} + + +static void +pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + Assert(data); OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); + data->sent_begin_txn = true; send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin); @@ -419,8 +448,22 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + bool skip; + + Assert(data); + skip = !data->sent_begin_txn; + pfree(data); + txn->output_plugin_private = NULL; OutputPluginUpdateProgress(ctx); + /* skip COMMIT message if nothing was sent */ + if (skip) + { + elog(DEBUG1, "Skipping replication of an empty transaction"); + return; + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); OutputPluginWrite(ctx, true); @@ -432,10 +475,28 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + /* + * Don't send BEGIN PREPARE message here. Instead, postpone it until the first + * change. In logical replication, a common scenario is to replicate a set + * of tables (instead of all tables) and transactions whose changes were on + * table(s) that are not published will produce empty transactions. These + * empty transactions will send BEGIN PREPARE and COMMIT PREPARED messages + * to subscribers, using bandwidth on something with little/no use + * for logical replication. + */ + pgoutput_begin_txn(ctx, txn); +} + +static void +pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + Assert(data); OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin_prepare(ctx->out, txn); + data->sent_begin_txn = true; send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin); @@ -450,8 +511,18 @@ static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + + Assert(data); OutputPluginUpdateProgress(ctx); + /* skip PREPARE message if nothing was sent */ + if (!data->sent_begin_txn) + { + elog(DEBUG1, "Skipping replication of an empty prepared transaction"); + return; + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); @@ -462,12 +533,33 @@ pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, */ static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) { + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + OutputPluginUpdateProgress(ctx); + /* + * skip sending COMMIT PREPARED message if prepared transaction + * has not been sent. + */ + if (data) + { + bool skip = !data->sent_begin_txn; + pfree(data); + txn->output_plugin_private = NULL; + if (skip) + { + elog(DEBUG1, + "Skipping replication of COMMIT PREPARED of an empty transaction"); + return; + } + } + OutputPluginPrepareWrite(ctx, true); - logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); + logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn, prepare_end_lsn, + prepare_time); OutputPluginWrite(ctx, true); } @@ -480,8 +572,26 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time) { + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + OutputPluginUpdateProgress(ctx); + /* + * skip sending ROLLBACK PREPARED message if prepared transaction + * has not been sent. + */ + if (data) + { + bool skip = !data->sent_begin_txn; + pfree(data); + txn->output_plugin_private = NULL; + if (skip) + { + elog(DEBUG1, + "Skipping replication of ROLLBACK of an empty transaction"); + return; + } + } OutputPluginPrepareWrite(ctx, true); logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, prepare_time); @@ -630,11 +740,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; MemoryContext old; RelationSyncEntry *relentry; TransactionId xid = InvalidTransactionId; Relation ancestor = NULL; + /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */ + if (!in_streaming) + Assert(txndata); + if (!is_publishable_relation(relation)) return; @@ -668,6 +783,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + /* output BEGIN if we haven't yet */ + if (!in_streaming && !txndata->sent_begin_txn) + { + if (rbtxn_prepared(txn)) + pgoutput_begin_prepare(ctx, txn); + else + pgoutput_begin(ctx, txn); + } + /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -770,6 +894,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; MemoryContext old; RelationSyncEntry *relentry; int i; @@ -777,6 +902,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Oid *relids; TransactionId xid = InvalidTransactionId; + /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */ + if (!in_streaming) + Assert(txndata); + /* Remember the xid for the change in streaming mode. See pgoutput_change. */ if (in_streaming) xid = change->txn->xid; @@ -813,6 +942,15 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (nrelids > 0) { + /* output BEGIN if we haven't yet */ + if (!in_streaming && !txndata->sent_begin_txn) + { + if (rbtxn_prepared(txn)) + pgoutput_begin_prepare(ctx, txn); + else + pgoutput_begin(ctx, txn); + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_truncate(ctx->out, xid, @@ -833,6 +971,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, const char *message) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata; TransactionId xid = InvalidTransactionId; if (!data->messages) @@ -845,6 +984,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (in_streaming) xid = txn->xid; + /* output BEGIN if we haven't yet, avoid for streaming and non-transactional messages */ + if (!in_streaming && transactional) + { + txndata = (PGOutputTxnData *) txn->output_plugin_private; + if (!txndata->sent_begin_txn) + { + if (rbtxn_prepared(txn)) + pgoutput_begin_prepare(ctx, txn); + else + pgoutput_begin(ctx, txn); + } + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_message(ctx->out, xid, diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 7a4804f..2fa60b5 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -150,8 +150,10 @@ typedef struct LogicalRepPreparedTxnData */ typedef struct LogicalRepCommitPreparedTxnData { + XLogRecPtr prepare_end_lsn; XLogRecPtr commit_lsn; - XLogRecPtr end_lsn; + XLogRecPtr commit_end_lsn; + TimestampTz prepare_time; TimestampTz commit_time; TransactionId xid; char gid[GIDSIZE]; @@ -190,7 +192,9 @@ extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, extern void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data); extern void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); extern void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data); extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 810495e..0d28306 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -128,7 +128,9 @@ typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, */ typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); /* * Called for ROLLBACK PREPARED. diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index d7c785b..ffc0b56 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -442,7 +442,9 @@ typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb, /* commit prepared callback signature */ typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); /* rollback prepared callback signature */ typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl index 52bd92d..2b43ae0 100644 --- a/src/test/subscription/t/020_messages.pl +++ b/src/test/subscription/t/020_messages.pl @@ -86,9 +86,8 @@ $result = $node_publisher->safe_psql( 'publication_names', 'tap_pub') )); -# 66 67 == B C == BEGIN COMMIT -is( $result, qq(66 -67), +# no message and no BEGIN and COMMIT because of empty transaction optimization +is($result, qq(), 'option messages defaults to false so message (M) is not available on slot' ); diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl index 4c372a6..8a33641 100644 --- a/src/test/subscription/t/021_twophase.pl +++ b/src/test/subscription/t/021_twophase.pl @@ -6,7 +6,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 24; +use Test::More tests => 25; ############################### # Setup @@ -318,10 +318,9 @@ $node_publisher->safe_psql('postgres', " $node_publisher->wait_for_catchup($appname_copy); -# Check that the transaction has been prepared on the subscriber, there will be 2 -# prepared transactions for the 2 subscriptions. +# Check that the transaction has been prepared on the subscriber $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;;"); -is($result, qq(2), 'transaction is prepared on subscriber'); +is($result, qq(1), 'transaction is prepared on subscriber'); # Now commit the insert and verify that it IS replicated $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'mygid';"); @@ -337,6 +336,45 @@ is($result, qq(2), 'replicated data in subscriber table'); $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_copy;"); $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;"); +############################## +# Test empty prepares +############################## + +# create a table that is not part of the publication +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_nopub (a int PRIMARY KEY)"); + +# disable the subscription so that we can peek at the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); + +# wait for the replication slot to become inactive in the publisher +$node_publisher->poll_query_until('postgres', + "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'tap_sub' AND active='f'", 1); + +# create a transaction with no changes relevant to the slot +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_nopub SELECT generate_series(1,10); + PREPARE TRANSACTION 'empty_transaction'; + COMMIT PREPARED 'empty_transaction';"); + +# peek at the contents of the slot +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0) + FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub') +)); + +# the empty transaction should be skipped +is($result, qq(), + 'empty transaction dropped on slot' +); + +# enable the subscription to test cleanup +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); + ############################### # check all the cleanup ############################### diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index cabc0bb..ad62bbe 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1597,6 +1597,7 @@ PGMessageField PGModuleMagicFunction PGNoticeHooks PGOutputData +PGOutputTxnData PGPROC PGP_CFB PGP_Context -- 1.8.3.1