Re: logical replication empty transactions - Mailing list pgsql-hackers
From | Peter Smith |
---|---|
Subject | Re: logical replication empty transactions |
Date | |
Msg-id | CAHut+Pv8BN=T_uRiJqw-HQUgeC9ihjwmRGiQcMAqtO1tdERSrg@mail.gmail.com |
In response to | RE: logical replication empty transactions ("osumi.takamichi@fujitsu.com" <osumi.takamichi@fujitsu.com>) |
Responses | Re: logical replication empty transactions (Ajin Cherian <itsajin@gmail.com>) |
List | pgsql-hackers |
Hi Ajin,

I have reviewed the v7 patch and given my feedback comments below.

Apply OK
Build OK
make check OK
TAP (subscriptions) make check OK
Build PG Docs (html) OK

Although I made lots of review comments below, the important point is that none of them are functional - they are only minor re-wordings and some code refactoring that I thought would make the code simpler and/or easier to read. YMMV, so please feel free to disagree with any of them.

//////////

1a. Commit Comment - wording

BEFORE
This patch addresses the above problem by postponing the BEGIN / BEGIN PREPARE message until the first change.

AFTER
This patch addresses the above problem by postponing the BEGIN / BEGIN PREPARE messages until the first change is encountered.

------

1b. Commit Comment - wording

BEFORE
While processing a COMMIT message or a PREPARE message, if there is no other change for that transaction, do not send COMMIT message or PREPARE message.

AFTER
If (when processing a COMMIT / PREPARE message) we find there had been no other change for that transaction, then do not send the COMMIT / PREPARE message.

------

2. doc/src/sgml/logicaldecoding.sgml - wording

@@ -884,11 +884,19 @@ typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
 The required <function>commit_prepared_cb</function> callback is called
 whenever a transaction <command>COMMIT PREPARED</command> has been decoded.
 The <parameter>gid</parameter> field, which is part of the
- <parameter>txn</parameter> parameter, can be used in this callback.
+ <parameter>txn</parameter> parameter, can be used in this callback. The
+ parameters <parameter>prepare_end_lsn</parameter> and
+ <parameter>prepare_time</parameter> can be used to check if the plugin
+ has received this <command>PREPARE TRANSACTION</command> in which case
+ it can commit the transaction, otherwise, it can skip the commit. The
+ <parameter>gid</parameter> alone is not sufficient because the downstream
+ node can have a prepared transaction with the same identifier.

=> (some minor rewording of the last part)

AFTER:
The parameters <parameter>prepare_end_lsn</parameter> and <parameter>prepare_time</parameter> can be used to check if the plugin has received this <command>PREPARE TRANSACTION</command> or not. If yes, it can commit the transaction, otherwise, it can skip the commit. The <parameter>gid</parameter> alone is not sufficient to determine this because the downstream node may already have a prepared transaction with the same identifier.

------

3. src/backend/replication/logical/proto.c - whitespace

@@ -244,12 +248,16 @@ logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *
     elog(ERROR, "unrecognized flags %u in commit prepared message", flags);

  /* read fields */
+ prepare_data->prepare_end_lsn = pq_getmsgint64(in);
+ if (prepare_data->prepare_end_lsn == InvalidXLogRecPtr)
+     elog(ERROR,"prepare_end_lsn is not set in commit prepared message");

=> There is a missing space before the 2nd elog parameter.

------

4. src/backend/replication/logical/worker.c - comment typos

 /*
- * Update origin state so we can restart streaming from correct position
- * in case of crash.
+ * It is possible that we haven't received the prepare because
+ * the transaction did not have any changes relevant to this
+ * subscription and was essentially an empty prepare. In which case,
+ * the walsender is optimized to drop the empty transaction and the
+ * accompanying prepare. Silently ignore if we don't find the prepared
+ * transaction.
  */

4a. => "and was essentially an empty prepare" --> "so was essentially an empty prepare"

4b. => "In which case" --> "In this case"

------

5. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn

@@ -410,10 +417,32 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+    PGOutputTxnData *data = MemoryContextAllocZero(ctx->context,
+                                                   sizeof(PGOutputTxnData));
+
+    /*
+     * Don't send BEGIN message here. Instead, postpone it until the first
+     * change. In logical replication, a common scenario is to replicate a set
+     * of tables (instead of all tables) and transactions whose changes were on
+     * table(s) that are not published will produce empty transactions. These
+     * empty transactions will send BEGIN and COMMIT messages to subscribers,
+     * using bandwidth on something with little/no use for logical replication.
+     */
+    data->sent_begin_txn = false;
+    txn->output_plugin_private = data;
+}

=> Since postponing the message is now the new behaviour of this function, I felt this should all be a function-level comment instead of a comment in the body of the function.

------

6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin

+
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)

=> Even though it is kind of obvious, it is probably better to provide a function comment here too.

------

7. src/backend/replication/pgoutput/pgoutput.c - pgoutput_commit_txn

@@ -428,8 +457,22 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                     XLogRecPtr commit_lsn)
 {
+    PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
+    bool skip;
+
+    Assert(data);
+    skip = !data->sent_begin_txn;
+    pfree(data);
+    txn->output_plugin_private = NULL;
     OutputPluginUpdateProgress(ctx);

+    /* skip COMMIT message if nothing was sent */
+    if (skip)
+    {
+        elog(DEBUG1, "Skipping replication of an empty transaction");
+        return;
+    }
+

7a. => I felt that the comment "skip COMMIT message if nothing was sent" should be placed at the point where you *decide* whether to skip or not. So you could either move that comment to where the skip variable is assigned, or (my preference) leave the comment where it is but replace the skip variable with

    sent_begin = data->sent_begin_txn;

and test "if (!sent_begin)" instead.

------

Regardless, I think the comment should be elaborated a bit to describe the reason more.

7b. =>

BEFORE
/* skip COMMIT message if nothing was sent */

AFTER
/* If a BEGIN message was not yet sent, then it means there were no relevant changes encountered, so we can skip the COMMIT message too. */

------

8. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_prepare_txn

@@ -441,10 +484,28 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 static void
 pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+    /*
+     * Don't send BEGIN PREPARE message here. Instead, postpone it until the first
+     * change. In logical replication, a common scenario is to replicate a set
+     * of tables (instead of all tables) and transactions whose changes were on
+     * table(s) that are not published will produce empty transactions. These
+     * empty transactions will send BEGIN PREPARE and COMMIT PREPARED messages
+     * to subscribers, using bandwidth on something with little/no use
+     * for logical replication.
+     */
+    pgoutput_begin_txn(ctx, txn);
+}

8a. => Like previously, I felt that this big comment should be at the function level of pgoutput_begin_prepare_txn instead of in the body of the function.

------

8b. => And then the body comment would be something simple like:

    /* Delegate to assign the begin sent flag as false same as for the BEGIN message. */
    pgoutput_begin_txn(ctx, txn);

------

9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_prepare

+
+static void
+pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)

=> Probably this needs a function comment.

------

10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_prepare_txn

@@ -459,8 +520,18 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                      XLogRecPtr prepare_lsn)
 {
+    PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
+
+    Assert(data);
     OutputPluginUpdateProgress(ctx);

+    /* skip PREPARE message if nothing was sent */
+    if (!data->sent_begin_txn)

=> Maybe elaborate on that "skip PREPARE message if nothing was sent" comment in a way similar to my review comment 7b. For example,

AFTER
/* If the BEGIN was not yet sent, then it means there were no relevant changes encountered, so we can skip the PREPARE message too. */

------

11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_commit_prepared_txn

@@ -471,12 +542,33 @@ pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  */
 static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
-                             XLogRecPtr commit_lsn)
+                             XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
+                             TimestampTz prepare_time)
 {
+    PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
+
     OutputPluginUpdateProgress(ctx);

+    /*
+     * skip sending COMMIT PREPARED message if prepared transaction
+     * has not been sent.
+     */
+    if (data)

=> Similar to previous review comment 10, I think the reason for the skip should be elaborated a little bit. For example,

AFTER
/* If the BEGIN PREPARE was not yet sent, then it means there were no relevant changes encountered, so we can skip the COMMIT PREPARED message too. */

------

12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_rollback_prepared_txn

=> Same as for pgoutput_commit_prepared_txn (see review comment 11).

------

13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change

@@ -639,11 +749,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                 Relation relation, ReorderBufferChange *change)
 {
     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+    PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     MemoryContext old;
     RelationSyncEntry *relentry;
     TransactionId xid = InvalidTransactionId;
     Relation ancestor = NULL;

+    /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
+    if (!in_streaming)
+        Assert(txndata);
+
     if (!is_publishable_relation(relation))
         return;

13a. => I felt the streaming logic with the txndata is a bit confusing. I think it would be easier to have another local variable (sent_begin) and use it like this...

    bool sent_begin;

    if (in_streaming)
    {
        sent_begin = true;
    }
    else
    {
        PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;

        Assert(txndata);
        sent_begin = txndata->sent_begin_txn;
    }
    ...

------

+    /* output BEGIN if we haven't yet */

13b. => I thought the comment is not quite right.

AFTER
/* Output BEGIN / BEGIN PREPARE if we haven't yet */

------

+    if (!in_streaming && !txndata->sent_begin_txn)
+    {
+        if (rbtxn_prepared(txn))
+            pgoutput_begin_prepare(ctx, txn);
+        else
+            pgoutput_begin(ctx, txn);
+    }
+

13c. => If you introduce the variable (as suggested in 13a) this code becomes much simpler:

AFTER
    if (!sent_begin)
    {
        if (rbtxn_prepared(txn))
            pgoutput_begin_prepare(ctx, txn);
        else
            pgoutput_begin(ctx, txn);
    }

------

14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate

=> All the similar review comments made for pgoutput_change (13a, 13b, 13c) apply to pgoutput_truncate here also.

------

15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message

@@ -842,6 +980,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                  const char *message)
 {
     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+    PGOutputTxnData *txndata;
     TransactionId xid = InvalidTransactionId;

=> This variable should be declared in the block where it is used, similar to suggestion 13a. Also, is it just an accidental omission that you did Assert(txndata) in all the other places but not here?

------

Kind Regards,
Peter Smith.
Fujitsu Australia
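
For illustration only, the deferred-BEGIN idea behind review comments 5, 7 and 13 can be sketched as a small standalone C fragment. This is not the patch's code: TxnData, Txn, send_message, begin_txn, change and commit_txn are hypothetical stand-ins for PGOutputTxnData, ReorderBufferTXN and the pgoutput callbacks, and "sending" is reduced to printing and counting. It only assumes what the review describes: BEGIN is postponed until the first relevant change, and COMMIT is skipped when no BEGIN was ever sent.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-in for PGOutputTxnData: per-transaction plugin state. */
typedef struct TxnData
{
    bool sent_begin;    /* has BEGIN been emitted for this transaction? */
} TxnData;

/* Hypothetical stand-in for ReorderBufferTXN. */
typedef struct Txn
{
    TxnData *private_data;
} Txn;

/* Counts messages "sent" so the effect of skipping is observable. */
static int messages_sent = 0;

static void
send_message(const char *msg)
{
    printf("send %s\n", msg);
    messages_sent++;
}

/* BEGIN callback: allocate state, but postpone the BEGIN message itself. */
static void
begin_txn(Txn *txn)
{
    TxnData *data = calloc(1, sizeof(TxnData)); /* sent_begin starts false */

    assert(data != NULL);
    txn->private_data = data;
}

/* Change callback: emit the postponed BEGIN before the first published change. */
static void
change(Txn *txn, bool published)
{
    TxnData *data = txn->private_data;

    assert(data != NULL);
    if (!published)
        return;                 /* unpublished relation: nothing to send */

    if (!data->sent_begin)
    {
        send_message("BEGIN");
        data->sent_begin = true;
    }
    send_message("CHANGE");
}

/* COMMIT callback: returns true if COMMIT was sent, false if it was skipped. */
static bool
commit_txn(Txn *txn)
{
    TxnData *data = txn->private_data;
    bool sent_begin;

    assert(data != NULL);
    sent_begin = data->sent_begin;
    free(data);
    txn->private_data = NULL;

    /* No BEGIN was sent, so no relevant change was seen: skip COMMIT too. */
    if (!sent_begin)
        return false;

    send_message("COMMIT");
    return true;
}
```

In this sketch, a transaction that only touches unpublished relations (begin_txn, change(&txn, false), commit_txn) sends nothing at all, while a transaction with at least one published change sends BEGIN once, then its changes, then COMMIT.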