Re: [HACKERS] logical decoding of two-phase transactions - Mailing list pgsql-hackers
From | Ajin Cherian |
---|---|
Subject | Re: [HACKERS] logical decoding of two-phase transactions |
Date | |
Msg-id | CAFPTHDab56twVmC+0a=RNcRw4KuyFdqzW0JAcvJdS63n_fRnOQ@mail.gmail.com |
In response to | Re: [HACKERS] logical decoding of two-phase transactions (Peter Smith <smithpb2250@gmail.com>) |
Responses | Re: [HACKERS] logical decoding of two-phase transactions |
List | pgsql-hackers |
On Fri, May 21, 2021 at 6:43 PM Peter Smith <smithpb2250@gmail.com> wrote:
> Fixed in v77-0001, v77-0002

Attaching a new patch-set that rebases the patch and addresses review
comments from Peter, as well as a test failure reported by Tang. I've
also added some new test cases, authored by Tang, to patch-2.

I've addressed the following comments:

On Tue, May 18, 2021 at 6:53 PM Peter Smith <smithpb2250@gmail.com> wrote:
>
> 1. File: doc/src/sgml/logicaldecoding.sgml
>
> 1.1
>
> @@ -862,11 +862,19 @@ typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
>   The required <function>commit_prepared_cb</function> callback is called
>   whenever a transaction <command>COMMIT PREPARED</command> has
>   been decoded.
>   The <parameter>gid</parameter> field, which is part of the
> - <parameter>txn</parameter> parameter, can be used in this callback.
> + <parameter>txn</parameter> parameter, can be used in this callback. The
> + parameters <parameter>prepare_end_lsn</parameter> and
> + <parameter>prepare_time</parameter> can be used to check if the plugin
> + has received this <command>PREPARE TRANSACTION</command> in which case
> + it can apply the rollback, otherwise, it can skip the rollback operation. The
> + <parameter>gid</parameter> alone is not sufficient because the downstream
> + node can have a prepared transaction with same identifier.
>
> This is in the commit prepared section, but that new text is referring
> to "it can apply to the rollback" etc.
> Is this deliberate text, or maybe cut/paste error?
>
> ==========

Fixed.

>
> 2. File: src/backend/replication/pgoutput/pgoutput.c
>
> 2.1
>
> @@ -76,6 +78,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
>
>  static bool publications_valid;
>  static bool in_streaming;
> +static bool in_prepared_txn;
>
> Wondering why this is a module static flag.
> That makes it looks like
> it somehow applies globally to all the functions in this scope, but
> really I think this is just a txn property, right?
> - e.g. why not use another member of the private TXN data instead? or
> - e.g. why not use rbtxn_prepared(txn) macro?
>
> ----------

I've removed this flag and used the rbtxn_prepared(txn) macro instead.
This also seems to fix the crash reported by Tang, where it was trying
to send a "BEGIN PREPARE" as part of a non-prepared transaction. I've
changed the logic to rely on the prepared flag in the txn to decide
whether to send BEGIN or BEGIN PREPARE.

>
> 2.2
>
> @@ -404,10 +410,32 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
>  static void
>  pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>  {
> +    PGOutputTxnData *data = MemoryContextAllocZero(ctx->context,
> +                                                   sizeof(PGOutputTxnData));
> +
> +    (void)txn; /* keep compiler quiet */
>
> I guess since now the arg "txn" is being used the added statement to
> "keep compiler quiet" is now redundant, so should be removed.
>

Removed this.

> ----------
>
> 2.3
>
> +static void
> +pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
> +{
>      bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
> +    PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
>
>      OutputPluginPrepareWrite(ctx, !send_replication_origin);
>      logicalrep_write_begin(ctx->out, txn);
> +    data->sent_begin_txn = true;
>
>
> I wondered is it worth adding Assert(data); here?
>
> ----------

Added.

>
> 2.4
>
> @@ -422,8 +450,14 @@ static void
>  pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>                      XLogRecPtr commit_lsn)
>  {
> +    PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
>      OutputPluginUpdateProgress(ctx);
>
> I wondered is it worthwhile to add Assert(data); here also?
>
> ----------

Added.
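To make the reply to 2.1 concrete, here is a minimal standalone C model of deferring BEGIN until the first change and choosing BEGIN or BEGIN PREPARE from the transaction's own prepared flag instead of a module-level static. It is a sketch, not pgoutput code: `MockTxn`, `BeginMsg`, `begin_txn()` and `maybe_send_begin()` are hypothetical names, the `prepared` field stands in for `rbtxn_prepared(txn)`, `calloc()` stands in for `MemoryContextAllocZero()`, and `assert()` for PostgreSQL's `Assert()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for ReorderBufferTXN, reduced to two fields. */
typedef struct MockTxn
{
    bool        prepared;               /* plays the role of rbtxn_prepared(txn) */
    void       *output_plugin_private;  /* per-transaction plugin state */
} MockTxn;

/* Same shape as the PGOutputTxnData struct quoted in 3.1. */
typedef struct PGOutputTxnData
{
    bool        sent_begin_txn;         /* has BEGIN / BEGIN PREPARE been sent? */
} PGOutputTxnData;

/* Which begin message a call would emit (hypothetical, for illustration). */
typedef enum BeginMsg
{
    MSG_NONE,
    MSG_BEGIN,
    MSG_BEGIN_PREPARE
} BeginMsg;

/* Begin callback: allocate zeroed state, but do not send anything yet. */
static void
begin_txn(MockTxn *txn)
{
    txn->output_plugin_private = calloc(1, sizeof(PGOutputTxnData));
}

/*
 * Called before writing a published change. The message type comes from
 * the transaction itself rather than from a module-level static flag.
 */
static BeginMsg
maybe_send_begin(MockTxn *txn)
{
    PGOutputTxnData *data = txn->output_plugin_private;

    assert(data != NULL);               /* cf. Assert(data) in 2.3 and 2.7 */
    if (data->sent_begin_txn)
        return MSG_NONE;                /* already sent for this txn */
    data->sent_begin_txn = true;
    return txn->prepared ? MSG_BEGIN_PREPARE : MSG_BEGIN;
}
```

A second call to `maybe_send_begin()` for the same transaction returns `MSG_NONE`, which is how `sent_begin_txn` keeps the begin message from going out twice, and a transaction that never reaches a published change never sends one at all.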
>
> 2.5
> @@ -422,8 +450,14 @@ static void
>  pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>                      XLogRecPtr commit_lsn)
>  {
> +    PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
>      OutputPluginUpdateProgress(ctx);
>
> +    /* skip COMMIT message if nothing was sent */
> +    if (!data->sent_begin_txn)
> +        return;
>
> Shouldn't this code also be freeing that allocated data? I think you
> do free it in similar functions later in this patch.
>
> ----------

Modified this.

>
> 2.6
>
> @@ -435,10 +469,31 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>  static void
>  pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>  {
> +    PGOutputTxnData *data = MemoryContextAllocZero(ctx->context,
> +                                                   sizeof(PGOutputTxnData));
> +
> +    /*
> +     * Don't send BEGIN message here. Instead, postpone it until the first
> +     * change. In logical replication, a common scenario is to replicate a set
> +     * of tables (instead of all tables) and transactions whose changes were on
> +     * table(s) that are not published will produce empty transactions. These
> +     * empty transactions will send BEGIN and COMMIT messages to subscribers,
> +     * using bandwidth on something with little/no use for logical replication.
> +     */
> +    data->sent_begin_txn = false;
> +    txn->output_plugin_private = data;
> +    in_prepared_txn = true;
> +}
>
> Apart from setting the in_prepared_txn = true; this is all identical
> code to pgoutput_begin_txn so you could consider just delegating to
> call that other function to save all the cut/paste data allocation and
> big comment. Or maybe this way is better - I am not sure.
>
> ----------

Updated this.
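For 2.5, one way to release the private data on every path, including the early return for an empty transaction, is to read the flag, free the data, and only then decide whether to send COMMIT. This is a hypothetical standalone sketch, not the patch itself: no PostgreSQL headers are used, `MockTxn`, `write_change()` and `commit_txn()` are illustrative names, and `calloc()`/`free()` stand in for `MemoryContextAllocZero()`/`pfree()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Same shape as the PGOutputTxnData struct quoted in 3.1. */
typedef struct PGOutputTxnData
{
    bool        sent_begin_txn;
} PGOutputTxnData;

/* Hypothetical stand-in for ReorderBufferTXN. */
typedef struct MockTxn
{
    void       *output_plugin_private;
} MockTxn;

/* Begin callback: zeroed state means sent_begin_txn starts out false. */
static void
begin_txn(MockTxn *txn)
{
    txn->output_plugin_private = calloc(1, sizeof(PGOutputTxnData));
}

/* Hypothetical: marks that the first published change also sent BEGIN. */
static void
write_change(MockTxn *txn)
{
    PGOutputTxnData *data = txn->output_plugin_private;

    assert(data != NULL);
    data->sent_begin_txn = true;
}

/*
 * Commit callback. Returns true if a COMMIT message should be sent.
 * The private data is freed before deciding, so the early "empty
 * transaction" exit does not leak it.
 */
static bool
commit_txn(MockTxn *txn)
{
    PGOutputTxnData *data = txn->output_plugin_private;
    bool        sent_begin;

    assert(data != NULL);               /* cf. Assert(data) in 2.4 */
    sent_begin = data->sent_begin_txn;
    free(data);
    txn->output_plugin_private = NULL;  /* avoid a dangling pointer */

    return sent_begin;                  /* false: skip COMMIT, nothing was sent */
}
```

Reading the flag into a local before freeing is the same shape as the single-`pfree` refactoring suggested in 2.9, so the commit path and the prepared paths could stay consistent.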
>
> 2.7
>
> +static void
> +pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
> +{
>      bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
> +    PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
>
>      OutputPluginPrepareWrite(ctx, !send_replication_origin);
>      logicalrep_write_begin_prepare(ctx->out, txn);
> +    data->sent_begin_txn = true;
>
> I wondered is it worth adding Assert(data); here also?
>
> ----------

Added Assert.

>
> 2.8
>
> @@ -453,11 +508,18 @@ static void
>  pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>                       XLogRecPtr prepare_lsn)
>  {
> +    PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
>      OutputPluginUpdateProgress(ctx);
>
> I wondered is it worth adding Assert(data); here also?
>
> ----------

Added.

>
> 2.9
>
> @@ -465,12 +527,28 @@ pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>   */
>  static void
>  pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
>                               ReorderBufferTXN *txn,
> -                             XLogRecPtr commit_lsn)
> +                             XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
> +                             TimestampTz prepare_time)
>  {
> +    PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
>      OutputPluginUpdateProgress(ctx);
>
> +    /*
> +     * skip sending COMMIT PREPARED message if prepared transaction
> +     * has not been sent.
> +     */
> +    if (data && !data->sent_begin_txn)
> +    {
> +        pfree(data);
> +        return;
> +    }
> +
> +    if (data)
> +        pfree(data);
>      OutputPluginPrepareWrite(ctx, true);
>
> I think this pfree logic might be refactored more simply to just be
> done in one place. e.g. like:
>
> if (data)
> {
>     bool skip = !data->sent_begin_txn;
>     pfree(data);
>     if (skip)
>         return;
> }
>
> BTW, is it even possible to get in this function with NULL private
> data? Perhaps that should be an Assert(data) ?
>
> ----------

Changed the logic as per your suggestion, but did not add the Assert,
because this function can be called with NULL private data. This can
happen when the COMMIT PREPARED for the transaction arrives after a
restart of the walsender, in which case the private data set up
earlier is lost. This applies only to commit prepared and rollback
prepared.

>
> 2.10
>
> @@ -483,8 +561,22 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
>                                 XLogRecPtr prepare_end_lsn,
>                                 TimestampTz prepare_time)
>  {
> +    PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
>      OutputPluginUpdateProgress(ctx);
>
> +    /*
> +     * skip sending COMMIT PREPARED message if prepared transaction
> +     * has not been sent.
> +     */
> +    if (data && !data->sent_begin_txn)
> +    {
> +        pfree(data);
> +        return;
> +    }
> +
> +    if (data)
> +        pfree(data);
>
> Same comment as above for refactoring the pfree logic.
>
> ----------

Refactored.

>
> 2.11
>
> @@ -483,8 +561,22 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
>                                 XLogRecPtr prepare_end_lsn,
>                                 TimestampTz prepare_time)
>  {
> +    PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
>      OutputPluginUpdateProgress(ctx);
>
> +    /*
> +     * skip sending COMMIT PREPARED message if prepared transaction
> +     * has not been sent.
> +     */
> +    if (data && !data->sent_begin_txn)
> +    {
> +        pfree(data);
> +        return;
> +    }
> +
> +    if (data)
> +        pfree(data);
>
> Is that comment correct or cut/paste error? Why does it say "COMMIT PREPARED" ?
>
> ----------

Fixed.

>
> 2.12
>
> @@ -613,6 +705,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>                  Relation relation, ReorderBufferChange *change)
>  {
>      PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> +    PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
>      MemoryContext old;
>
> I wondered is it worth adding Assert(txndata); here also?
>
> ----------

Added.
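The reply to 2.9 boils down to a small decision that COMMIT PREPARED and ROLLBACK PREPARED could share: free the private data in one place as Peter suggested, skip the message if the prepared transaction was never sent, and, because the data may legitimately be NULL after a walsender restart, fall through and send the message in that case, as the code quoted in 2.9 does. The documentation text in 1.1 describes `prepare_end_lsn` and `prepare_time` as the means to check whether the PREPARE was actually received. Below is a hypothetical standalone sketch: `MockTxn` and `should_send_finish_prepared()` are illustrative names, and `free()` stands in for `pfree()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Same shape as the PGOutputTxnData struct quoted in 3.1. */
typedef struct PGOutputTxnData
{
    bool        sent_begin_txn;
} PGOutputTxnData;

/* Hypothetical stand-in for ReorderBufferTXN. */
typedef struct MockTxn
{
    void       *output_plugin_private;
} MockTxn;

/*
 * Hypothetical helper for the COMMIT PREPARED / ROLLBACK PREPARED paths.
 * Returns true when the message should be sent.
 */
static bool
should_send_finish_prepared(MockTxn *txn)
{
    PGOutputTxnData *data = txn->output_plugin_private;

    /*
     * No Assert(data) here: after a walsender restart the state created
     * when the PREPARE was decoded is gone, so data can be NULL. In that
     * case nothing is skipped and the message is sent.
     */
    if (data)
    {
        bool        skip = !data->sent_begin_txn;

        free(data);                     /* freed in exactly one place */
        txn->output_plugin_private = NULL;
        if (skip)
            return false;               /* the prepared txn was never sent */
    }
    return true;
}
```

With NULL data the function returns true, with a never-sent prepared transaction it returns false, and in both non-NULL cases the private data is released exactly once.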
>
> 2.13
>
> @@ -750,6 +852,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>                    int nrelations, Relation relations[], ReorderBufferChange *change)
>  {
>      PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> +    PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
>      MemoryContext old;
>
> I wondered is it worth adding Assert(txndata); here also?
>
> ----------

Added.

>
> 2.14
>
> @@ -813,11 +925,15 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>                   const char *message)
>  {
>      PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> +    PGOutputTxnData *txndata;
>      TransactionId xid = InvalidTransactionId;
>
>      if (!data->messages)
>          return;
>
> +    if (txn && txn->output_plugin_private)
> +        txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
>      /*
>       * Remember the xid for the message in streaming mode. See
>       * pgoutput_change.
> @@ -825,6 +941,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>      if (in_streaming)
>          xid = txn->xid;
>
> +    /* output BEGIN if we haven't yet, avoid for streaming and non-transactional messages */
> +    if (!in_streaming && transactional)
> +    {
> +        txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +        if (!txndata->sent_begin_txn)
> +        {
> +            if (!in_prepared_txn)
> +                pgoutput_begin(ctx, txn);
> +            else
> +                pgoutput_begin_prepare(ctx, txn);
> +        }
> +    }
>
> That code:
> +    if (txn && txn->output_plugin_private)
> +        txndata = (PGOutputTxnData *) txn->output_plugin_private;
> looked misplaced to me.
>
> Shouldn't all that be relocated to be put inside the if block:
> +    if (!in_streaming && transactional)
>
> And when you do that maybe the condition can be simplified because you could
> Assert(txn);
>
> ==========

Removed that redundant code, but I cannot add an Assert here, because
for streaming and transactional messages there will be no
output_plugin_private.

>
> 3. File src/include/replication/pgoutput.h
>
> 3.1
>
> @@ -30,4 +30,9 @@ typedef struct PGOutputData
>      bool two_phase;
>  } PGOutputData;
>
> +typedef struct PGOutputTxnData
> +{
> +    bool sent_begin_txn; /* flag indicating whether begin has been sent */
> +} PGOutputTxnData;
> +
>
> Why is this typedef here? IIUC it is only used inside the pgoutput.c,
> so shouldn't it be declared in that file also?
>
> ----------

Changed this accordingly.

>
> 3.2
>
> @@ -30,4 +30,9 @@ typedef struct PGOutputData
>      bool two_phase;
>  } PGOutputData;
>
> +typedef struct PGOutputTxnData
> +{
> +    bool sent_begin_txn; /* flag indicating whether begin has been sent */
> +} PGOutputTxnData;
> +
>
> That is a new typedef so maybe your patch also should update the
> src/tools/pgindent/typedefs.list to name this new typedef.
>
> ----------

Added.

regards,
Ajin Cherian
Fujitsu Australia