Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Amit Kapila
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Msg-id CAA4eK1JQTDXTfvJ5d+L0ggG4+doyd0Xji=e0OJsb=qcn_jWALA@mail.gmail.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
On Fri, Nov 18, 2022 at 7:56 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Wed, Nov 16, 2022 at 1:50 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > On Tuesday, November 15, 2022 7:58 PM houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com> wrote:
> >
> > I noticed that I didn't add CHECK_FOR_INTERRUPTS while retrying send message.
> > So, attach the new version which adds that. Also attach the 0004 patch that
> > restarts logical replication with temporarily disabling the parallel apply if
> > failed to apply a transaction in parallel apply worker.
> >
>
> Few comments on v48-0001
> ======================
>

I have made quite a few changes in the comments, added some new
comments, and made other cosmetic changes in the attached patch. The
patch is atop v48-0001*. If these look okay to you, please include them
in the next version. Apart from these, I have a few more comments on
v48-0001*:

1.
+static bool
+pa_can_start(TransactionId xid)
+{
+ if (!TransactionIdIsValid(xid))
+ return false;

The caller (see the caller of pa_start_worker) already checks that the
xid passed here is valid, so I think this should be an Assert, unless I
am missing something, in which case it is better to add a comment here.
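
As a minimal standalone sketch of this suggestion (not the patch code):
the typedef and macros below are stand-ins that mirror the PostgreSQL
definitions so the example compiles on its own, plain assert() stands in
for Assert(), and pa_can_start_sketch is a hypothetical name. The
remaining checks of the real function are not shown in this mail and are
left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL definitions, so the sketch is self-contained. */
typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)
#define TransactionIdIsValid(xid) ((xid) != InvalidTransactionId)

/* Hypothetical variant of pa_can_start() with the check turned into an assertion. */
static bool
pa_can_start_sketch(TransactionId xid)
{
    /* The caller has already validated xid, so an invalid value is a bug. */
    assert(TransactionIdIsValid(xid));

    /* The other eligibility checks of the real function are omitted here. */
    return true;
}
```

With this shape, an invalid xid trips the assertion in debug builds
instead of silently returning false.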

2. Would it be better to rename pa_start_worker() to
pa_allocate_worker(), because it sometimes gets the worker from the
pool and also allocates the hash entry for the worker info? That would
also match the corresponding pa_free_worker().

3.
+pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
{
...
+
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ subxactlist = lappend_xid(subxactlist, current_xid);
+ MemoryContextSwitchTo(oldctx);
...

Why do we need to allocate this list in a permanent context? IIUC, we
use it to maintain subxacts so that a given subxact can later be found
at the time of rollback to savepoint in the current in-progress
transaction, so why do we need it beyond the transaction being applied?
If there is a reason for this, it would be better to add a comment
explaining it.

4.
+pa_stream_abort(LogicalRepStreamAbortData *abort_data)
{
...
+
+ for (i = list_length(subxactlist) - 1; i >= 0; i--)
+ {
+ TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
+
+ if (xid_tmp == subxid)
+ {
+ found = true;
+ break;
+ }
+ }
+
+ if (found)
+ {
+ RollbackToSavepoint(spname);
+ CommitTransactionCommand();
+ subxactlist = list_truncate(subxactlist, i + 1);
+ }

I was wondering whether we can have an Assert(false) for the not-found
case, but it seems that if all the changes of a subxact have been
skipped, then the subxid corresponding to "rollback to savepoint"
probably won't be found in subxactlist, and we don't need to do anything
for it. If that is the case, then adding a comment for it would be a
good idea; otherwise, we can have Assert(false) in the else case.
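
The search-and-truncate logic, with the not-found case documented rather
than asserted, could look roughly like the standalone sketch below. It
is not the patch code: a fixed-size array replaces the PostgreSQL List,
and SubxactList, MAX_SUBXACTS, and rollback_to_subxact_sketch are
hypothetical names. The savepoint rollback itself is not modelled; only
the list handling is.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define MAX_SUBXACTS 16         /* hypothetical capacity for the sketch */

/* Hypothetical array-backed stand-in for subxactlist. */
typedef struct SubxactList
{
    TransactionId xids[MAX_SUBXACTS];
    int           len;
} SubxactList;

/*
 * Search backwards for subxid. If found, drop every entry after it and
 * return true. The entry itself is kept, mirroring
 * list_truncate(subxactlist, i + 1) in the quoted hunk.
 */
static bool
rollback_to_subxact_sketch(SubxactList *list, TransactionId subxid)
{
    int i;

    assert(list != NULL);

    for (i = list->len - 1; i >= 0; i--)
    {
        if (list->xids[i] == subxid)
            break;
    }

    if (i < 0)
    {
        /*
         * Not found. As suspected above, this may simply mean that all
         * changes of the subxact were skipped, so there is nothing to do.
         */
        return false;
    }

    list->len = i + 1;
    return true;
}
```

For example, with the list {10, 11, 12, 13}, rolling back to 11 leaves
{10, 11}, and a subsequent request for an unknown subxid leaves the list
unchanged.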

-- 
With Regards,
Amit Kapila.
