Re: [HACKERS] logical decoding of two-phase transactions - Mailing list pgsql-hackers
From | Amit Kapila |
---|---|
Subject | Re: [HACKERS] logical decoding of two-phase transactions |
Date | |
Msg-id | CAA4eK1Lh5a6DO2a9J8G_Wce1p4b7QskKcfJcKtar5Ugd6jgUYA@mail.gmail.com |
In response to | Re: [HACKERS] logical decoding of two-phase transactions (Ajin Cherian <itsajin@gmail.com>) |
Responses | Re: [HACKERS] logical decoding of two-phase transactions; Re: [HACKERS] logical decoding of two-phase transactions |
List | pgsql-hackers |
On Wed, Sep 9, 2020 at 3:33 PM Ajin Cherian <itsajin@gmail.com> wrote:
>
> On Mon, Sep 7, 2020 at 11:17 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>>
>>
>> Nikhil has a test for the same
>> (0004-Teach-test_decoding-plugin-to-work-with-2PC.Jan4) in his last
>> email [1]. You might want to use it to test this behavior. I think you
>> can also keep the tests as a separate patch as Nikhil had.
>>
>
> Done. I've added the tests and also tweaked code to make sure that the aborts during 2 phase commits are also handled.
>

I don't think it is complete yet.

*
  * This error can only occur when we are sending the data in
  * streaming mode and the streaming is not finished yet.
  */
- Assert(streaming);
- Assert(stream_started);
+ Assert(streaming || rbtxn_prepared(txn));
+ Assert(stream_started || rbtxn_prepared(txn));

Here, you have updated the code, but the comments have not been updated.

*
@@ -2370,10 +2391,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
  errdata = NULL;
  curtxn->concurrent_abort = true;

- /* Reset the TXN so that it is allowed to stream remaining data. */
- ReorderBufferResetTXN(rb, txn, snapshot_now,
-                       command_id, prev_lsn,
-                       specinsert);
+ /* If streaming, reset the TXN so that it is allowed to stream remaining data. */
+ if (streaming && stream_started)
+ {
+     ReorderBufferResetTXN(rb, txn, snapshot_now,
+                           command_id, prev_lsn,
+                           specinsert);
+ }
+ else
+ {
+     elog(LOG, "stopping decoding of %s (%u)",
+          txn->gid[0] != '\0'? txn->gid:"", txn->xid);
+     rb->abort(rb, txn, commit_lsn);
+ }

I don't think we need to perform the abort here. We will encounter the WAL record for Rollback Prepared later anyway, and that is where we will call abort_prepared_cb. Because we have set the 'concurrent_abort' flag, we can skip all the intermediate records. Here, the ReorderBufferTxn only needs to keep enough state for ReorderBufferFinishPrepared() to use it later.
Basically, you need functionality similar to ReorderBufferTruncateTXN, where you can free the memory for everything except the invalidations. You can either write a new function, ReorderBufferTruncatePreparedTxn, or pass another bool parameter to ReorderBufferTruncateTXN to indicate that it is a prepared xact and then clean up the additional things that a prepared xact does not need.

* Similarly, I don't understand why we need the code below:

ReorderBufferProcessTXN()
{
..
+ if (rbtxn_rollback(txn))
+     rb->abort(rb, txn, commit_lsn);
..
}

We never set the RBTXN_ROLLBACK flag anywhere, so how can this check ever be true? If we decide to remove this code, then don't forget to update the comments.

* If my previous two comments are correct, then I don't think we need the interface below.

+ <sect3 id="logicaldecoding-output-plugin-abort">
+ <title>Transaction Abort Callback</title>
+
+ <para>
+ The required <function>abort_cb</function> callback is called whenever
+ a transaction abort has to be initiated. This can happen if we are
+ decoding a transaction that has been prepared for two-phase commit and
+ a concurrent rollback happens while we are decoding it.
+<programlisting>
+typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx,
+                                      ReorderBufferTXN *txn,
+                                      XLogRecPtr abort_lsn);

>>
>>
>> I don't know why the patch has used this way to implement an option to
>> enable two-phase. Can't we use how we implement 'stream-changes'
>> option in commit 7259736a6e? Just refer how we set ctx->streaming and
>> you can use a similar way to set this parameter.
>
>
> Done, I've moved the checks for callbacks to inside the corresponding wrappers.
>

This is not what I suggested. Please study commit 7259736a6e and see how the streaming option is implemented. Later on, I want subscribers to be able to specify whether they want transactions to be decoded at prepare time, similar to what we have done for streaming.
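The suggestion above can be sketched as a toy, under stated assumptions. The sketch decides once, at startup, whether the plugin is capable of decoding at prepare time. Here "capable" is assumed to mean it supplies all three 2PC callbacks. A consumer-supplied option then narrows that flag, so the callback wrappers need no per-call checks. The struct, the `toy_*` functions, the `twophase` flag, and the opt-in default are all hypothetical. They illustrate the general "context flag set once" pattern and are not taken from commit 7259736a6e.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical callback signature; real callbacks take ctx, txn and LSNs. */
typedef void (*ToyCallback)(void);

/* Hypothetical subset of an output plugin's callback table. */
typedef struct ToyCallbacks
{
    ToyCallback prepare_cb;
    ToyCallback commit_prepared_cb;
    ToyCallback abort_prepared_cb;
} ToyCallbacks;

/* Hypothetical, pared-down decoding context. */
typedef struct ToyDecodingContext
{
    ToyCallbacks callbacks;
    bool         twophase;  /* decode transactions at PREPARE time? */
} ToyDecodingContext;

/* A do-nothing callback, used only to populate the table. */
static void
toy_noop(void)
{
}

/* Step 1: capability, computed once from the callbacks the plugin provides. */
static void
toy_startup_context(ToyDecodingContext *ctx, const ToyCallbacks *cbs)
{
    assert(ctx != NULL && cbs != NULL);
    ctx->callbacks = *cbs;
    ctx->twophase = cbs->prepare_cb != NULL &&
                    cbs->commit_prepared_cb != NULL &&
                    cbs->abort_prepared_cb != NULL;
}

/*
 * Step 2: the consumer's (hypothetical) option can only narrow the
 * capability; 'requested' is assumed to default to false when absent.
 */
static void
toy_apply_twophase_option(ToyDecodingContext *ctx, bool requested)
{
    ctx->twophase = ctx->twophase && requested;
}
```

With this split, decoding at prepare time is on only when the plugin is capable and the consumer explicitly asked for it. A plugin that is missing any of the three callbacks stays off regardless of the option.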
Also, search for ctx->streaming in the code to see how it is set; that should give you the idea.

Note: Please use a version number when sending patches; you can use something like git format-patch -N -v n to do that. It makes it easier for the reviewer to compare a patch with the previous version.

A few other comments:
===================
1.
ReorderBufferProcessTXN()
{
..
if (streaming)
{
    ReorderBufferTruncateTXN(rb, txn);

    /* Reset the CheckXidAlive */
    CheckXidAlive = InvalidTransactionId;
}
else
    ReorderBufferCleanupTXN(rb, txn);
..
}

I don't think we can perform ReorderBufferCleanupTXN for prepared transactions. If we remove the ReorderBufferTxn before commit, later code might not consider that transaction to be in the system and could compute the wrong restart_lsn for a slot. Basically, when SnapBuildProcessRunningXacts() calls ReorderBufferGetOldestTXN(), the result should include the ReorderBufferTxn of the prepared transaction, which is not yet committed. Because we removed it after prepare, that TXN is not found, and restart_lsn is computed incorrectly. Once we start from the wrong point in WAL, the snapshot we build will be incorrect, which leads to wrong results. This is the same reason the patch does not call ReorderBufferForget in DecodePrepare when we decide to skip the transaction. Also, here we need to set CheckXidAlive = InvalidTransactionId; for prepared xacts as well.

2. Have you thought about how streaming interacts with prepared transactions? You can try writing some tests using the pg_logical* APIs and see the behaviour. For example, ReorderBufferStreamCommit has no handling for this case. I think you need to introduce a stream_prepare API similar to stream_commit and then use it.

3.
- if (streaming)
+ if (streaming || rbtxn_prepared(change->txn))
  {
      curtxn = change->txn;
      SetupCheckXidLive(curtxn->xid);
@@ -2249,7 +2254,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
      break;
  }
  }
-
  /*

Spurious line removal.
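The restart_lsn argument in comment 1 can be illustrated with a toy model. This is a simplified sketch, not SnapBuildProcessRunningXacts(). `toy_get_oldest_txn` only mimics the role of ReorderBufferGetOldestTXN(), and each hypothetical `ToyTxn` carries a single LSN marking where decoding must restart to see it. Snapshot serialization and running-xacts bookkeeping are ignored. The point it shows is narrow: dropping a prepared-but-uncommitted TXN from the buffer lets the computed restart point move past that transaction's start.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical LSN type; PostgreSQL's XLogRecPtr is also a 64-bit integer. */
typedef uint64_t ToyLsn;

/* Hypothetical stand-in for a buffered transaction. */
typedef struct ToyTxn
{
    ToyLsn start_lsn;   /* where decoding must restart to see this TXN */
    bool   tracked;     /* still present in the (toy) reorder buffer */
} ToyTxn;

/* Toy analogue of ReorderBufferGetOldestTXN(): smallest start_lsn, or NULL. */
static const ToyTxn *
toy_get_oldest_txn(const ToyTxn *txns, size_t n)
{
    const ToyTxn *oldest = NULL;

    for (size_t i = 0; i < n; i++)
    {
        if (!txns[i].tracked)
            continue;
        if (oldest == NULL || txns[i].start_lsn < oldest->start_lsn)
            oldest = &txns[i];
    }
    return oldest;
}

/*
 * Toy restart point: never past the oldest tracked TXN; with nothing
 * tracked, the slot may advance to current_lsn.
 */
static ToyLsn
toy_compute_restart_lsn(const ToyTxn *txns, size_t n, ToyLsn current_lsn)
{
    const ToyTxn *oldest;

    assert(txns != NULL || n == 0);
    oldest = toy_get_oldest_txn(txns, n);
    return oldest != NULL ? oldest->start_lsn : current_lsn;
}
```

Suppose TXNs starting at LSNs 100 and 200 are tracked. The restart point is then 100. If the prepared TXN at 100 is cleaned up right after PREPARE, the restart point jumps to 200. A restart would then begin after the start of a transaction that is still waiting for COMMIT PREPARED.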
--
With Regards,
Amit Kapila.