From e780c58c4cd59b9dbf44a214a7dbcd47c7443fe0 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Mon, 17 May 2021 08:32:51 -0400 Subject: [PATCH v76] Skip empty transactions for logical replication. The current logical replication behavior is to send every transaction to subscriber even though the transaction is empty (because it does not contain changes from the selected publications). It is a waste of CPU cycles and network bandwidth to build/transmit these empty transactions. This patch addresses the above problem by postponing the BEGIN / BEGIN PREPARE message until the first change. While processing a COMMIT message or a PREPARE message, if there is no other change for that transaction, do not send COMMIT message or PREPARE message. It means that pgoutput will skip BEGIN / COMMIT or BEGIN PREPARE / PREPARE messages for transactions that are empty. Discussion: https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com --- contrib/test_decoding/test_decoding.c | 7 +- doc/src/sgml/logicaldecoding.sgml | 12 ++- doc/src/sgml/protocol.sgml | 15 +++ src/backend/replication/logical/logical.c | 9 +- src/backend/replication/logical/proto.c | 14 ++- src/backend/replication/logical/reorderbuffer.c | 2 +- src/backend/replication/logical/worker.c | 36 ++++--- src/backend/replication/pgoutput/pgoutput.c | 135 +++++++++++++++++++++++- src/include/replication/logicalproto.h | 8 +- src/include/replication/output_plugin.h | 4 +- src/include/replication/pgoutput.h | 5 + src/include/replication/reorderbuffer.h | 4 +- src/test/subscription/t/020_messages.pl | 5 +- 13 files changed, 223 insertions(+), 33 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 9393c85..f3329b8 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -86,7 +86,9 @@ static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_lsn); static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, @@ -390,7 +392,8 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* COMMIT PREPARED callback */ static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) { TestDecodingData *data = ctx->output_plugin_private; diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 493432d..5cd4146 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -862,11 +862,19 @@ typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, The required commit_prepared_cb callback is called whenever a transaction COMMIT PREPARED has been decoded. The gid field, which is part of the - txn parameter, can be used in this callback. + txn parameter, can be used in this callback. The + parameters prepare_end_lsn and + prepare_time can be used to check if the plugin + has received this PREPARE TRANSACTION in which case + it can apply the rollback, otherwise, it can skip the rollback operation. The + gid alone is not sufficient because the downstream + node can have a prepared transaction with same identifier. typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index d13b58b..d2e2f82 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -7543,6 +7543,13 @@ are available since protocol version 3. Int64 + The end LSN of the prepare. + + + + +Int64 + The LSN of the commit. @@ -7557,6 +7564,14 @@ are available since protocol version 3. Int64 + Timestamp of the prepare. The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). + + + + +Int64 + Timestamp of the commit transaction. The value is in number of microseconds since PostgreSQL epoch (2000-01-01). diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index c387997..ed60719 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -63,7 +63,8 @@ static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -940,7 +941,8 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) { LogicalDecodingContext *ctx = cache->private_data; LogicalErrorCallbackState state; @@ -975,7 +977,8 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, errmsg("logical replication at prepare time requires commit_prepared_cb callback"))); /* do the actual work: call callback */ - ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn); + ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn, prepare_end_lsn, + prepare_time); /* Pop the error context stack */ error_context_stack = errcallback.previous; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 7ebfd91..fd1cf6c 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -206,7 +206,9 @@ logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) */ void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) { uint8 flags = 0; @@ -223,8 +225,10 @@ logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, pq_sendbyte(out, flags); /* send fields */ + pq_sendint64(out, prepare_end_lsn); pq_sendint64(out, commit_lsn); pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, prepare_time); pq_sendint64(out, txn->u_op_time.commit_time); pq_sendint32(out, txn->xid); @@ -245,12 +249,16 @@ logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData * elog(ERROR, "unrecognized flags %u in commit prepared message", flags); /* read fields */ + prepare_data->prepare_end_lsn = pq_getmsgint64(in); + if (prepare_data->prepare_end_lsn == InvalidXLogRecPtr) + elog(ERROR,"prepare_end_lsn is not set in commit prepared message"); prepare_data->commit_lsn = pq_getmsgint64(in); if (prepare_data->commit_lsn == InvalidXLogRecPtr) elog(ERROR, "commit_lsn is not set in commit prepared message"); - prepare_data->end_lsn = pq_getmsgint64(in); - if (prepare_data->end_lsn == InvalidXLogRecPtr) + prepare_data->commit_end_lsn = pq_getmsgint64(in); + if (prepare_data->commit_end_lsn == InvalidXLogRecPtr) elog(ERROR, "end_lsn is not set in commit prepared message"); + prepare_data->prepare_time = pq_getmsgint64(in); prepare_data->commit_time = pq_getmsgint64(in); prepare_data->xid = pq_getmsgint(in, 4); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index bd36eb3..a4ce0a7 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2766,7 +2766,7 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, txn->origin_lsn = origin_lsn; if (is_commit) - rb->commit_prepared(rb, txn, commit_lsn); + rb->commit_prepared(rb, txn, commit_lsn, prepare_end_lsn, prepare_time); else rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c1470ab..786e553 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -938,26 +938,38 @@ apply_handle_commit_prepared(StringInfo s) /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, gid, sizeof(gid)); - - /* there is no transaction when COMMIT PREPARED is called */ - ensure_transaction(); - /* - * Update origin state so we can restart streaming from correct position - * in case of crash. + * It is possible that we haven't received the prepare because + * the transaction did not have any changes relevant to this + * subscription and was essentially an empty prepare. In which case, + * the walsender is optimized to drop the empty transaction and the + * accompanying prepare. Silently ignore if we don't find the prepared + * transaction. */ - replorigin_session_origin_lsn = prepare_data.end_lsn; - replorigin_session_origin_timestamp = prepare_data.commit_time; + if (LookupGXact(gid, prepare_data.prepare_end_lsn, + prepare_data.prepare_time)) + { - FinishPreparedTransaction(gid, true); - CommitTransactionCommand(); + /* there is no transaction when COMMIT PREPARED is called */ + ensure_transaction(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.commit_end_lsn; + replorigin_session_origin_timestamp = prepare_data.commit_time; + + FinishPreparedTransaction(gid, true); + CommitTransactionCommand(); + } pgstat_report_stat(false); - store_flush_position(prepare_data.end_lsn); + store_flush_position(prepare_data.commit_end_lsn); in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + process_syncing_tables(prepare_data.commit_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 7c3a33d..89ce7ba 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -56,7 +56,9 @@ static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn, XLogRecPtr commit_lsn); + ReorderBufferTXN *txn, XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, @@ -76,6 +78,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, static bool publications_valid; static bool in_streaming; +static bool in_prepared_txn; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, @@ -377,6 +380,9 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, else ctx->twophase_opt_given = true; + /* Also remember we're currently not in a prepared transaction. */ + in_prepared_txn = false; + /* Init publication state. */ data->publications = NIL; publications_valid = false; @@ -404,10 +410,32 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputTxnData *data = MemoryContextAllocZero(ctx->context, + sizeof(PGOutputTxnData)); + + (void)txn; /* keep compiler quiet */ + /* + * Don't send BEGIN message here. Instead, postpone it until the first + * change. In logical replication, a common scenario is to replicate a set + * of tables (instead of all tables) and transactions whose changes were on + * table(s) that are not published will produce empty transactions. These + * empty transactions will send BEGIN and COMMIT messages to subscribers, + * using bandwidth on something with little/no use for logical replication. + */ + data->sent_begin_txn = false; + txn->output_plugin_private = data; +} + + +static void +pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); + data->sent_begin_txn = true; send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin); @@ -422,8 +450,14 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + OutputPluginUpdateProgress(ctx); + /* skip COMMIT message if nothing was sent */ + if (!data->sent_begin_txn) + return; + OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); OutputPluginWrite(ctx, true); @@ -435,10 +469,31 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputTxnData *data = MemoryContextAllocZero(ctx->context, + sizeof(PGOutputTxnData)); + + /* + * Don't send BEGIN message here. Instead, postpone it until the first + * change. In logical replication, a common scenario is to replicate a set + * of tables (instead of all tables) and transactions whose changes were on + * table(s) that are not published will produce empty transactions. These + * empty transactions will send BEGIN and COMMIT messages to subscribers, + * using bandwidth on something with little/no use for logical replication. + */ + data->sent_begin_txn = false; + txn->output_plugin_private = data; + in_prepared_txn = true; +} + +static void +pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin_prepare(ctx->out, txn); + data->sent_begin_txn = true; send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin); @@ -453,11 +508,18 @@ static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + OutputPluginUpdateProgress(ctx); + /* skip PREPARE message if nothing was sent */ + if (!data->sent_begin_txn) + return; + OutputPluginPrepareWrite(ctx, true); logicalrep_write_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); + in_prepared_txn = false; } /* @@ -465,12 +527,28 @@ pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, */ static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) { + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + OutputPluginUpdateProgress(ctx); + /* + * skip sending COMMIT PREPARED message if prepared transaction + * has not been sent. + */ + if (data && !data->sent_begin_txn) + { + pfree(data); + return; + } + + if (data) + pfree(data); OutputPluginPrepareWrite(ctx, true); - logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); + logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn, prepare_end_lsn, + prepare_time); OutputPluginWrite(ctx, true); } @@ -483,8 +561,22 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time) { + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + OutputPluginUpdateProgress(ctx); + /* + * skip sending COMMIT PREPARED message if prepared transaction + * has not been sent. + */ + if (data && !data->sent_begin_txn) + { + pfree(data); + return; + } + + if (data) + pfree(data); OutputPluginPrepareWrite(ctx, true); logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, prepare_time); @@ -613,6 +705,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; MemoryContext old; RelationSyncEntry *relentry; TransactionId xid = InvalidTransactionId; @@ -651,6 +744,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + /* output BEGIN if we haven't yet */ + if (!in_streaming && !txndata->sent_begin_txn) + { + if (!in_prepared_txn) + pgoutput_begin(ctx, txn); + else + pgoutput_begin_prepare(ctx, txn); + } + /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -750,6 +852,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; MemoryContext old; RelationSyncEntry *relentry; int i; @@ -793,6 +896,15 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (nrelids > 0) { + /* output BEGIN if we haven't yet */ + if (!in_streaming && !txndata->sent_begin_txn) + { + if (!in_prepared_txn) + pgoutput_begin(ctx, txn); + else + pgoutput_begin_prepare(ctx, txn); + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_truncate(ctx->out, xid, @@ -813,11 +925,15 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, const char *message) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata; TransactionId xid = InvalidTransactionId; if (!data->messages) return; + if (txn && txn->output_plugin_private) + txndata = (PGOutputTxnData *) txn->output_plugin_private; + /* * Remember the xid for the message in streaming mode. See * pgoutput_change. @@ -825,6 +941,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (in_streaming) xid = txn->xid; + /* output BEGIN if we haven't yet, avoid for streaming and non-transactional messages */ + if (!in_streaming && transactional) + { + txndata = (PGOutputTxnData *) txn->output_plugin_private; + if (!txndata->sent_begin_txn) + { + if (!in_prepared_txn) + pgoutput_begin(ctx, txn); + else + pgoutput_begin_prepare(ctx, txn); + } + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_message(ctx->out, xid, diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 9b3e934..a6d9977 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -150,8 +150,10 @@ typedef struct LogicalRepPreparedTxnData */ typedef struct LogicalRepCommitPreparedTxnData { + XLogRecPtr prepare_end_lsn; XLogRecPtr commit_lsn; - XLogRecPtr end_lsn; + XLogRecPtr commit_end_lsn; + TimestampTz prepare_time; TimestampTz commit_time; TransactionId xid; char gid[GIDSIZE]; @@ -190,7 +192,9 @@ extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, extern void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data); extern void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); extern void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data); extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 810495e..0d28306 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -128,7 +128,9 @@ typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, */ typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); /* * Called for ROLLBACK PREPARED. diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 0dc460f..bec526f 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -30,4 +30,9 @@ typedef struct PGOutputData bool two_phase; } PGOutputData; +typedef struct PGOutputTxnData +{ + bool sent_begin_txn; /* flag indicating whether begin has been sent */ +} PGOutputTxnData; + #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 61fda1e..0c61278 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -456,7 +456,9 @@ typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb, /* commit prepared callback signature */ typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); /* rollback prepared callback signature */ typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl index 0940d0f..39bff1b 100644 --- a/src/test/subscription/t/020_messages.pl +++ b/src/test/subscription/t/020_messages.pl @@ -82,9 +82,8 @@ $result = $node_publisher->safe_psql( 'publication_names', 'tap_pub') )); -# 66 67 == B C == BEGIN COMMIT -is( $result, qq(66 -67), +# no message and no BEGIN and COMMIT because of empty transaction optimization +is($result, qq(), 'option messages defaults to false so message (M) is not available on slot' ); -- 1.8.3.1