Re: [HACKERS] logical decoding of two-phase transactions - Mailing list pgsql-hackers

From Amit Kapila
Subject Re: [HACKERS] logical decoding of two-phase transactions
Date
Msg-id CAA4eK1Lh5a6DO2a9J8G_Wce1p4b7QskKcfJcKtar5Ugd6jgUYA@mail.gmail.com
Whole thread Raw
In response to Re: [HACKERS] logical decoding of two-phase transactions  (Ajin Cherian <itsajin@gmail.com>)
Responses Re: [HACKERS] logical decoding of two-phase transactions
Re: [HACKERS] logical decoding of two-phase transactions
List pgsql-hackers
On Wed, Sep 9, 2020 at 3:33 PM Ajin Cherian <itsajin@gmail.com> wrote:
>
> On Mon, Sep 7, 2020 at 11:17 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>>
>>
>> Nikhil has a test for the same
>> (0004-Teach-test_decoding-plugin-to-work-with-2PC.Jan4) in his last
>> email [1]. You might want to use it to test this behavior. I think you
>> can also keep the tests as a separate patch as Nikhil had.
>>
> Done. I've added the tests and also tweaked code to make sure that the aborts during 2 phase commits are also
handled.
>

I don't think it is complete yet.
*
* This error can only occur when we are sending the data in
  * streaming mode and the streaming is not finished yet.
  */
- Assert(streaming);
- Assert(stream_started);
+ Assert(streaming || rbtxn_prepared(txn));
+ Assert(stream_started  || rbtxn_prepared(txn));

Here, you have updated the code but comments are still not updated.

*
@@ -2370,10 +2391,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb,
ReorderBufferTXN *txn,
  errdata = NULL;
  curtxn->concurrent_abort = true;

- /* Reset the TXN so that it is allowed to stream remaining data. */
- ReorderBufferResetTXN(rb, txn, snapshot_now,
-   command_id, prev_lsn,
-   specinsert);
+ /* If streaming, reset the TXN so that it is allowed to stream
remaining data. */
+ if (streaming && stream_started)
+ {
+ ReorderBufferResetTXN(rb, txn, snapshot_now,
+   command_id, prev_lsn,
+   specinsert);
+ }
+ else
+ {
+ elog(LOG, "stopping decoding of %s (%u)",
+ txn->gid[0] != '\0'? txn->gid:"", txn->xid);
+ rb->abort(rb, txn, commit_lsn);
+ }

I don't think we need to perform abort here. Later we will anyway
encounter the WAL for Rollback Prepared for which we will call
abort_prepared_cb. As we have set the 'concurrent_abort' flag, it will
allow us to skip all the intermediate records. Here, we need only
enough state in ReorderBufferTxn that it can be later used for
ReorderBufferFinishPrepared(). Basically, you need functionality
similar to ReorderBufferTruncateTXN where except for invalidations you
can free memory for everything else. You can either write a new
function ReorderBufferTruncatePreparedTxn or pass another bool
parameter in ReorderBufferTruncateTXN to indicate it is prepared_xact
and then clean up additional things that are not required for prepared
xact.

*
Similarly, I don't understand why we need below code:
ReorderBufferProcessTXN()
{
..
+ if (rbtxn_rollback(txn))
+ rb->abort(rb, txn, commit_lsn);
..
}

There is nowhere we are setting the RBTXN_ROLLBACK flag, so how will
this check be true? If we decide to remove this code then don't forget
to update the comments.

*
If my previous two comments are correct then I don't think we need the
below interface.
+    <sect3 id="logicaldecoding-output-plugin-abort">
+     <title>Transaction Abort Callback</title>
+
+     <para>
+      The required <function>abort_cb</function> callback is called whenever
+      a transaction abort has to be initiated. This can happen if we are
+      decoding a transaction that has been prepared for two-phase commit and
+      a concurrent rollback happens while we are decoding it.
+<programlisting>
+typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx,
+                                       ReorderBufferTXN *txn,
+                                       XLogRecPtr abort_lsn);



>>
>>
>> I don't know why the patch has used this way to implement an option to
>> enable two-phase. Can't we use how we implement 'stream-changes'
>> option in commit 7259736a6e? Just refer how we set ctx->streaming and
>> you can use a similar way to set this parameter.
>
>
> Done, I've moved the checks for callbacks to inside the corresponding wrappers.
>

This is not what I suggested. Please study the commit 7259736a6e and
see how streaming option is implemented. I want later subscribers can
specify whether they want transactions to be decoded at prepare time
similar to what we have done for streaming. Also, search for
ctx->streaming in the code and see how it is set to get the idea.

Note: Please use version number while sending patches, you can use
something like git format-patch -N -v n to do that. It makes easier
for the reviewer to compare it with the previous version.

Few other comments:
===================
1.
ReorderBufferProcessTXN()
{
..
if (streaming)
{
ReorderBufferTruncateTXN(rb, txn);

/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
}
else
ReorderBufferCleanupTXN(rb, txn);
..
}

I don't think we can perform ReorderBufferCleanupTXN for the prepared
transactions because if we have removed the ReorderBufferTxn before
commit, the later code might not consider such a transaction in the
system and compute the wrong value of restart_lsn for a slot.
Basically, in SnapBuildProcessRunningXacts() when we call
ReorderBufferGetOldestTXN(), it should show the ReorderBufferTxn of
the prepared transaction which is not yet committed but because we
have removed it after prepare, it won't get that TXN and then that
leads to wrong computation of restart_lsn. Once we start from a wrong
point in WAL, the snapshot built was incorrect which will lead to the
wrong result. This is the same reason why the patch is not doing
ReorderBufferForget in DecodePrepare when we decide to skip the
transaction. Also, here, we need to set CheckXidAlive =
InvalidTransactionId; for prepared xact as well.

2. Have you thought about the interaction of streaming with prepared
transactions? You can try writing some tests using pg_logical* APIs
and see the behaviour. For ex. there is no handling in
ReorderBufferStreamCommit for the same. I think you need to introduce
stream_prepare API similar to stream_commit and then use the same.

3.
- if (streaming)
+ if (streaming || rbtxn_prepared(change->txn))
  {
  curtxn = change->txn;
  SetupCheckXidLive(curtxn->xid);
@@ -2249,7 +2254,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb,
ReorderBufferTXN *txn,
  break;
  }
  }
-
  /*

Spurious line removal.

-- 
With Regards,
Amit Kapila.



pgsql-hackers by date:

Previous
From: Peter Eisentraut
Date:
Subject: Re: Range checks of pg_test_fsync --secs-per-test and pg_test_timing --duration
Next
From: Amit Kapila
Date:
Subject: Re: [HACKERS] logical decoding of two-phase transactions