Re: [HACKERS] logical decoding of two-phase transactions - Mailing list pgsql-hackers

From Ajin Cherian
Subject Re: [HACKERS] logical decoding of two-phase transactions
Msg-id CAFPTHDab56twVmC+0a=RNcRw4KuyFdqzW0JAcvJdS63n_fRnOQ@mail.gmail.com
In response to Re: [HACKERS] logical decoding of two-phase transactions  (Peter Smith <smithpb2250@gmail.com>)
Responses Re: [HACKERS] logical decoding of two-phase transactions
List pgsql-hackers
On Fri, May 21, 2021 at 6:43 PM Peter Smith <smithpb2250@gmail.com> wrote:

> Fixed in v77-0001, v77-0002

I'm attaching a new patch set that rebases the patch and addresses the
review comments from Peter, as well as a test failure reported by Tang.
I've also added some new test cases, authored by Tang, to patch-2.

I've addressed the following comments:

On Tue, May 18, 2021 at 6:53 PM Peter Smith <smithpb2250@gmail.com> wrote:

>
> 1.  File: doc/src/sgml/logicaldecoding.sgml
>
> 1.1
>
> @@ -862,11 +862,19 @@ typedef void (*LogicalDecodePrepareCB) (struct
> LogicalDecodingContext *ctx,
>        The required <function>commit_prepared_cb</function> callback is called
>        whenever a transaction <command>COMMIT PREPARED</command> has
> been decoded.
>        The <parameter>gid</parameter> field, which is part of the
> -      <parameter>txn</parameter> parameter, can be used in this callback.
> +      <parameter>txn</parameter> parameter, can be used in this callback. The
> +      parameters <parameter>prepare_end_lsn</parameter> and
> +      <parameter>prepare_time</parameter> can be used to check if the plugin
> +      has received this <command>PREPARE TRANSACTION</command> in which case
> +      it can apply the rollback, otherwise, it can skip the rollback
> operation. The
> +      <parameter>gid</parameter> alone is not sufficient because the downstream
> +      node can have a prepared transaction with same identifier.
>
> This is in the commit prepared section, but that new text is referring
> to "it can apply to the rollback" etc.
> Is this deliberate text, or maybe cut/paste error?
>
> ==========

Fixed.
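
To illustrate the check that the documentation text describes, here is
a minimal standalone sketch, not code from the patch. DemoPreparedXact
and demo_prepare_was_received are names I made up for illustration, and
the LSN and timestamp are plain integers rather than XLogRecPtr and
TimestampTz. It assumes the downstream side keeps a record of every
PREPARE it has received.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical record a downstream plugin might keep per received PREPARE. */
typedef struct DemoPreparedXact
{
    const char *gid;             /* global transaction identifier */
    uint64_t    prepare_end_lsn; /* end LSN of the PREPARE (stand-in type) */
    int64_t     prepare_time;    /* time of the PREPARE (stand-in type) */
} DemoPreparedXact;

/*
 * Return true only if gid, prepare_end_lsn and prepare_time all match a
 * known entry.  Matching on gid alone could pick up an unrelated prepared
 * transaction that happens to use the same identifier.
 */
bool
demo_prepare_was_received(const DemoPreparedXact *known, size_t nknown,
                          const char *gid, uint64_t prepare_end_lsn,
                          int64_t prepare_time)
{
    assert(gid != NULL);

    for (size_t i = 0; i < nknown; i++)
    {
        if (strcmp(known[i].gid, gid) == 0 &&
            known[i].prepare_end_lsn == prepare_end_lsn &&
            known[i].prepare_time == prepare_time)
            return true;
    }
    return false;
}
```

A caller would apply the COMMIT PREPARED only when this returns true
and skip it otherwise.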

>
> 2. File: src/backend/replication/pgoutput/pgoutput.c
>
> 2.1
>
> @@ -76,6 +78,7 @@ static void
> pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
>
>  static bool publications_valid;
>  static bool in_streaming;
> +static bool in_prepared_txn;
>
> Wondering why this is a module static flag. That makes it looks like
> it somehow applies globally to all the functions in this scope, but
> really I think this is just a txn property, right?
> - e.g. why not use another member of the private TXN data instead? or
> - e.g. why not use rbtxn_prepared(txn) macro?
>
> ----------

I've removed this flag and used the rbtxn_prepared(txn) macro instead.
This also seems to fix the crash reported by Tang, where the code was
trying to send a "BEGIN PREPARE" as part of a non-prepared transaction.
The logic now relies on the prepared flag in the txn to decide whether
BEGIN or BEGIN PREPARE needs to be sent.
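
As a rough standalone sketch of the idea (not the patch code): the
decision is derived from a flag stored on the transaction itself
instead of a module-level static. DemoTxn, DEMO_TXN_PREPARED and the
demo_* functions are made-up stand-ins for ReorderBufferTXN and
rbtxn_prepared(), and the flag value is arbitrary.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical flag bit; the real flag lives in ReorderBufferTXN. */
#define DEMO_TXN_PREPARED 0x01

typedef struct DemoTxn
{
    uint32_t flags;     /* per-transaction state, not a global */
} DemoTxn;

typedef enum DemoBeginKind
{
    DEMO_BEGIN,
    DEMO_BEGIN_PREPARE
} DemoBeginKind;

/* Stand-in for rbtxn_prepared(txn): test the flag on the transaction. */
bool
demo_txn_is_prepared(const DemoTxn *txn)
{
    return (txn->flags & DEMO_TXN_PREPARED) != 0;
}

/* Pick which start-of-transaction message to send for this transaction. */
DemoBeginKind
demo_choose_begin(const DemoTxn *txn)
{
    assert(txn != NULL);
    return demo_txn_is_prepared(txn) ? DEMO_BEGIN_PREPARE : DEMO_BEGIN;
}
```

Because the answer depends only on the txn passed in, state left over
from an earlier prepared transaction cannot leak into a later
non-prepared one, which is the kind of mix-up a shared static flag
allows.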

>
> 2.2
>
> @@ -404,10 +410,32 @@ pgoutput_startup(LogicalDecodingContext *ctx,
> OutputPluginOptions *opt,
>  static void
>  pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>  {
> + PGOutputTxnData    *data = MemoryContextAllocZero(ctx->context,
> + sizeof(PGOutputTxnData));
> +
> + (void)txn; /* keep compiler quiet */
>
> I guess since now the arg "txn" is being used the added statement to
> "keep compiler quiet" is now redundant, so should be removed.
>

Removed this.

> ----------
>
> 2.3
>
> +static void
> +pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
> +{
>   bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
> + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
>
>   OutputPluginPrepareWrite(ctx, !send_replication_origin);
>   logicalrep_write_begin(ctx->out, txn);
> + data->sent_begin_txn = true;
>
>
> I wondered is it worth adding Assert(data); here?
>
> ----------

Added.

>
> 2.4
>
> @@ -422,8 +450,14 @@ static void
>  pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>   XLogRecPtr commit_lsn)
>  {
> + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
>   OutputPluginUpdateProgress(ctx);
>
> I wondered is it worthwhile to add Assert(data); here also?
>
> ----------

Added.

>
> 2.5
> @@ -422,8 +450,14 @@ static void
>  pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>   XLogRecPtr commit_lsn)
>  {
> + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
>   OutputPluginUpdateProgress(ctx);
>
> + /* skip COMMIT message if nothing was sent */
> + if (!data->sent_begin_txn)
> + return;
>
> Shouldn't this code also be freeing that allocated data? I think you
> do free it in similar functions later in this patch.
>
> ----------

Modified this.
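
For reference, a minimal standalone sketch of the overall pattern being
discussed: BEGIN is postponed until the first change, COMMIT is skipped
for an empty transaction, and the per-transaction data is freed on both
paths. This is not the patch code. DemoTxn, DemoTxnData and the demo_*
functions are invented for illustration, calloc/free stand in for
MemoryContextAllocZero/pfree, and "sending" a message is just a counter.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct DemoTxnData
{
    bool sent_begin_txn;        /* has BEGIN been sent yet? */
} DemoTxnData;

typedef struct DemoTxn
{
    DemoTxnData *private_data;  /* stand-in for output_plugin_private */
    int          messages_sent; /* counts messages "sent" downstream */
} DemoTxn;

/* Begin callback: allocate zeroed state, but do not send BEGIN yet. */
void
demo_begin_txn(DemoTxn *txn)
{
    DemoTxnData *data = calloc(1, sizeof(DemoTxnData));

    if (data == NULL)
        abort();
    txn->private_data = data;   /* sent_begin_txn starts out false */
}

/* Change callback: send the postponed BEGIN before the first change. */
void
demo_change(DemoTxn *txn)
{
    DemoTxnData *data = txn->private_data;

    assert(data != NULL);
    if (!data->sent_begin_txn)
    {
        txn->messages_sent++;   /* BEGIN */
        data->sent_begin_txn = true;
    }
    txn->messages_sent++;       /* the change itself */
}

/* Commit callback: free the state first, then skip COMMIT if empty. */
void
demo_commit_txn(DemoTxn *txn)
{
    DemoTxnData *data = txn->private_data;
    bool         skip;

    assert(data != NULL);
    skip = !data->sent_begin_txn;
    free(data);
    txn->private_data = NULL;

    if (skip)
        return;                 /* nothing was sent, so no COMMIT either */
    txn->messages_sent++;       /* COMMIT */
}
```

Reading the flag into a local before freeing means there is a single
free call that covers both the early return and the normal path.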

>
> 2.6
>
> @@ -435,10 +469,31 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>  static void
>  pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>  {
> + PGOutputTxnData    *data = MemoryContextAllocZero(ctx->context,
> + sizeof(PGOutputTxnData));
> +
> + /*
> + * Don't send BEGIN message here. Instead, postpone it until the first
> + * change. In logical replication, a common scenario is to replicate a set
> + * of tables (instead of all tables) and transactions whose changes were on
> + * table(s) that are not published will produce empty transactions. These
> + * empty transactions will send BEGIN and COMMIT messages to subscribers,
> + * using bandwidth on something with little/no use for logical replication.
> + */
> + data->sent_begin_txn = false;
> + txn->output_plugin_private = data;
> + in_prepared_txn = true;
> +}
>
> Apart from setting the in_prepared_txn = true; this is all identical
> code to pgoutput_begin_txn so you could consider just delegating to
> call that other function to save all the cut/paste data allocation and
> big comment. Or maybe this way is better - I am not sure.
>
> ----------

Updated this.

>
> 2.7
>
> +static void
> +pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
> +{
>   bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
> + PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
>
>   OutputPluginPrepareWrite(ctx, !send_replication_origin);
>   logicalrep_write_begin_prepare(ctx->out, txn);
> + data->sent_begin_txn = true;
>
> I wondered is it worth adding Assert(data); here also?
>
> ----------

Added Assert.

>
> 2.8
>
> @@ -453,11 +508,18 @@ static void
>  pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>   XLogRecPtr prepare_lsn)
>  {
> + PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
>   OutputPluginUpdateProgress(ctx);
>
> I wondered is it worth adding Assert(data); here also?
>
> ----------

Added.

>
> 2.9
>
> @@ -465,12 +527,28 @@ pgoutput_prepare_txn(LogicalDecodingContext
> *ctx, ReorderBufferTXN *txn,
>   */
>  static void
>  pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
> - XLogRecPtr commit_lsn)
> + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
> + TimestampTz prepare_time)
>  {
> + PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
>   OutputPluginUpdateProgress(ctx);
>
> + /*
> + * skip sending COMMIT PREPARED message if prepared transaction
> + * has not been sent.
> + */
> + if (data && !data->sent_begin_txn)
> + {
> + pfree(data);
> + return;
> + }
> +
> + if (data)
> + pfree(data);
>   OutputPluginPrepareWrite(ctx, true);
>
> I think this pfree logic might be refactored more simply to just be
> done in one place. e.g. like:
>
> if (data)
> {
>     bool skip = !data->sent_begin_txn;
>     pfree(data);
>     if (skip)
>         return;
> }
>
> BTW, is it even possible to get in this function with NULL private
> data? Perhaps that should be an Assert(data) ?
>
> ----------

Changed the logic as per your suggestion, but I did not add the Assert
because it is possible to enter this function with NULL private data.
This can happen when the commit prepared for the transaction arrives
after a restart of the WALSENDER, in which case the previously set up
private data is lost. This is only applicable to commit prepared and
rollback prepared.
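
A small standalone sketch of that refactored check, with the NULL case
handled rather than asserted. This is not the patch code: DemoTxnData
and demo_should_send_commit_prepared are made-up names, and free()
stands in for pfree().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct DemoTxnData
{
    bool sent_begin_txn;    /* was the prepared transaction sent? */
} DemoTxnData;

/*
 * Decide whether COMMIT PREPARED should be sent.  Any private data is
 * freed in exactly one place and the caller's pointer is cleared.
 *
 * NULL private data is legal here (e.g. the state was lost across a
 * restart); in that case the message is still sent.
 */
bool
demo_should_send_commit_prepared(DemoTxnData **datap)
{
    DemoTxnData *data;

    assert(datap != NULL);
    data = *datap;

    if (data)
    {
        bool skip = !data->sent_begin_txn;

        free(data);
        *datap = NULL;
        if (skip)
            return false;   /* the prepare was never sent: skip */
    }
    return true;
}
```

The same shape would apply to the rollback prepared path.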

>
> 2.10
>
> @@ -483,8 +561,22 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
>      XLogRecPtr prepare_end_lsn,
>      TimestampTz prepare_time)
>  {
> + PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
>   OutputPluginUpdateProgress(ctx);
>
> + /*
> + * skip sending COMMIT PREPARED message if prepared transaction
> + * has not been sent.
> + */
> + if (data && !data->sent_begin_txn)
> + {
> + pfree(data);
> + return;
> + }
> +
> + if (data)
> + pfree(data);
>
> Same comment as above for refactoring the pfree logic.
>
> ----------

Refactored.

>
> 2.11
>
> @@ -483,8 +561,22 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
>      XLogRecPtr prepare_end_lsn,
>      TimestampTz prepare_time)
>  {
> + PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
>   OutputPluginUpdateProgress(ctx);
>
> + /*
> + * skip sending COMMIT PREPARED message if prepared transaction
> + * has not been sent.
> + */
> + if (data && !data->sent_begin_txn)
> + {
> + pfree(data);
> + return;
> + }
> +
> + if (data)
> + pfree(data);
>
> Is that comment correct or cut/paste error? Why does it say "COMMIT PREPARED" ?
>
> ----------

Fixed.

>
> 2.12
>
> @@ -613,6 +705,7 @@ pgoutput_change(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>   Relation relation, ReorderBufferChange *change)
>  {
>   PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
>   MemoryContext old;
>
> I wondered is it worth adding Assert(txndata); here also?
>
> ----------

Added.

>
> 2.13
>
> @@ -750,6 +852,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>     int nrelations, Relation relations[], ReorderBufferChange *change)
>  {
>   PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
>   MemoryContext old;
>
> I wondered is it worth adding Assert(txndata); here also?
>
> ----------

Added.

>
> 2.14
>
> @@ -813,11 +925,15 @@ pgoutput_message(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>   const char *message)
>  {
>   PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> + PGOutputTxnData *txndata;
>   TransactionId xid = InvalidTransactionId;
>
>   if (!data->messages)
>   return;
>
> + if (txn && txn->output_plugin_private)
> + txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
>   /*
>   * Remember the xid for the message in streaming mode. See
>   * pgoutput_change.
> @@ -825,6 +941,19 @@ pgoutput_message(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>   if (in_streaming)
>   xid = txn->xid;
>
> + /* output BEGIN if we haven't yet, avoid for streaming and
> non-transactional messages */
> + if (!in_streaming && transactional)
> + {
> + txndata = (PGOutputTxnData *) txn->output_plugin_private;
> + if (!txndata->sent_begin_txn)
> + {
> + if (!in_prepared_txn)
> + pgoutput_begin(ctx, txn);
> + else
> + pgoutput_begin_prepare(ctx, txn);
> + }
> + }
>
> That code:
> + if (txn && txn->output_plugin_private)
> + txndata = (PGOutputTxnData *) txn->output_plugin_private;
> looked misplaced to me.
>
> Shouldn't all that be relocated to be put inside the if block:
> + if (!in_streaming && transactional)
>
> And when you do that maybe the condition can be simplified because you could
> Assert(txn);
>
> ==========

Removed that redundant code, but I cannot add the Assert here because,
for streaming and transactional messages, there will be no
output_plugin_private.

>
> 3. File src/include/replication/pgoutput.h
>
> 3.1
>
> @@ -30,4 +30,9 @@ typedef struct PGOutputData
>   bool two_phase;
>  } PGOutputData;
>
> +typedef struct PGOutputTxnData
> +{
> + bool sent_begin_txn; /* flag indicating whether begin has been sent */
> +} PGOutputTxnData;
> +
>
> Why is this typedef here? IIUC it is only used inside the pgoutput.c,
> so shouldn't it be declared in that file also?
>
> ----------

Changed this accordingly.

>
> 3.2
>
> @@ -30,4 +30,9 @@ typedef struct PGOutputData
>   bool two_phase;
>  } PGOutputData;
>
> +typedef struct PGOutputTxnData
> +{
> + bool sent_begin_txn; /* flag indicating whether begin has been sent */
> +} PGOutputTxnData;
> +
>
> That is a new typedef so maybe your patch also should update the
> src/tools/pgindent/typedefs.list to name this new typedef.
>
> ----------

Added.

regards,
Ajin Cherian
Fujitsu Australia
