On Mon, Jan 6, 2020 at 2:11 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Mon, Jan 6, 2020 at 9:21 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> >
> > On Sat, Jan 4, 2020 at 4:07 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > >
> > >
> > > It is better to merge it with the main patch for
> > > "Implement-streaming-mode-in-ReorderBuffer", otherwise, it is a bit
> > > difficult to review.
> > Actually, we can merge 0008, 0009, 0012, 0018 to the main patch
> > (0007). Basically, if we merge all of them then we don't need to deal
> > with the conflict. I think Tomas has kept them separate so that we
> > can review the solution for the schema sent. And, I kept 0018 as a
> > separate patch to avoid conflict and rebasing in 0008, 0009 and 0012.
> > In the next patch set, I will merge all of them to 0007.
> >
>
> Okay, I think we can merge those patches.
Done
0008, 0009, 0017, 0018 are merged to 0007, 0012 is merged to 0010
>
> Few more comments:
> --------------------------------
> v4-0007-Implement-streaming-mode-in-ReorderBuffer
> 1.
> +ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
> {
> ..
> + /*
> + * TOCHECK: We have to rebuild historic snapshot to be sure it includes all
> + * information about
> subtransactions, which could arrive after streaming start.
> + */
> + if (!txn->is_schema_sent)
> + snapshot_now
> = ReorderBufferCopySnap(rb, txn->base_snapshot,
> + txn,
> command_id);
> ..
> }
>
> Why are we using base snapshot here instead of the snapshot we saved
> the first time streaming has happened? And as mentioned in comments,
> won't we need to consider the snapshots for subtransactions that
> arrived after the last time we have streamed the changes?
Fixed
>
> 2.
> + /* remember the command ID and snapshot for the streaming run */
> + txn->command_id = command_id;
> + txn-
> >snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
> +
> txn, command_id);
>
> I don't see where the txn->snapshot_now is getting freed. The
> base_snapshot is freed in ReorderBufferCleanupTXN, but I don't see
> this getting freed.
I have freed this In ReorderBufferCleanupTXN
>
> 3.
> +static void
> +ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
> {
> ..
> + /*
> + * If this is a subxact, we need to stream the top-level transaction
> + * instead.
> + */
> + if (txn->toptxn)
> + {
> +
> ReorderBufferStreamTXN(rb, txn->toptxn);
> + return;
> + }
>
> Is it ever possible that we reach here for subtransaction, if not,
> then it should be Assert rather than if condition?
Fixed
>
> 4. In ReorderBufferStreamTXN(), don't we need to set some of the txn
> fields like origin_id, origin_lsn as we do in ReorderBufferCommit()
> especially to cover the case when it gets called due to memory
> overflow (aka via ReorderBufferCheckMemoryLimit).
We get origin_lsn during commit time so I am not sure how can we do
that. I have also noticed that currently, we are not using origin_lsn
on the subscriber side. I think need more investigation that if we
want this then do we need to log it early.
>
> v4-0017-Extend-handling-of-concurrent-aborts-for-streamin
> 1.
> @@ -3712,7 +3727,22 @@ ReorderBufferStreamTXN(ReorderBuffer *rb,
> ReorderBufferTXN *txn)
> if (using_subtxn)
>
> RollbackAndReleaseCurrentSubTransaction();
>
> - PG_RE_THROW();
> + /* re-throw only if it's not an abort */
> + if (errdata-
> >sqlerrcode != ERRCODE_TRANSACTION_ROLLBACK)
> + {
> + MemoryContextSwitchTo(ecxt);
> + PG_RE_THROW();
> +
> }
> + else
> + {
> + /* remember the command ID and snapshot for the streaming run */
> + txn-
> >command_id = command_id;
> + txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
> +
> txn, command_id);
> + rb->stream_stop(rb, txn);
> +
> +
> FlushErrorState();
> + }
>
> Can you update comments either in the above code block or some other
> place to explain what is the concurrent abort problem and how we dealt
> with it? Also, please explain how the above error handling is
> sufficient to address all the various scenarios (sub-transaction got
> aborted when we have already sent some changes, or when we have not
> sent any changes yet).
Done
>
> v4-0006-Gracefully-handle-concurrent-aborts-of-uncommitte
> 1.
> + /*
> + * If CheckXidAlive is valid, then we check if it aborted. If it did, we
> + * error out
> + */
> + if (TransactionIdIsValid(CheckXidAlive) &&
> + !TransactionIdIsInProgress(CheckXidAlive) &&
> + !TransactionIdDidCommit(CheckXidAlive))
> + ereport(ERROR,
> + (errcode(ERRCODE_TRANSACTION_ROLLBACK),
> + errmsg("transaction aborted during system catalog scan")));
>
> Why here we can't use TransactionIdDidAbort? If we can't use it, then
> can you add comments stating the reason of the same.
Done
>
> 2.
> /*
> + * An xid value pointing to a possibly ongoing or a prepared transaction.
> + * Currently used in logical decoding. It's possible that such transactions
> + * can get aborted while the decoding is ongoing.
> + */
> +TransactionId CheckXidAlive = InvalidTransactionId;
>
> In comments, there is a mention of a prepared transaction. Do we
> allow prepared transactions to be decoded as part of this patch?
Fixed
>
> 3.
> + /*
> + * If CheckXidAlive is valid, then we check if it aborted. If it did, we
> + * error out
> + */
> + if (TransactionIdIsValid
> (CheckXidAlive) &&
> + !TransactionIdIsInProgress(CheckXidAlive) &&
> + !TransactionIdDidCommit(CheckXidAlive))
>
> This comment just says what code below is doing, can you explain the
> rationale behind this check. It would be better if it is clear by
> reading comments, why we are doing this check after fetching the
> tuple. I think this can refer to the comment I suggested to add for
> changes in patch
> v4-0017-Extend-handling-of-concurrent-aborts-for-streamin.
Done
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com