Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Amit Kapila
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Msg-id CAA4eK1Jwh7j86Egk1cye=x2R_yrTjzXGj7Fx12wVybBAEq91kA@mail.gmail.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
List pgsql-hackers
On Wed, Nov 30, 2022 at 4:23 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> 2.
> + /*
> + * The stream lock is released when processing changes in a
> + * streaming block, so the leader needs to acquire the lock here
> + * before entering PARTIAL_SERIALIZE mode to ensure that the
> + * parallel apply worker will wait for the leader to release the
> + * stream lock.
> + */
> + if (in_streamed_transaction &&
> + action != LOGICAL_REP_MSG_STREAM_STOP)
> + {
> + pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
>
> This comment is not completely correct because we can even acquire the
> lock for the very streaming chunk. This check will work but doesn't
> appear future-proof or at least not very easy to understand though I
> don't have a better suggestion at this stage. Can we think of a better
> check here?
>

One idea is that we acquire this lock every time and make callers like
stream_commit responsible for releasing it. We can also handle closing
the stream file in the respective callers. I think that will make
this part of the patch easier to follow.
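
A minimal, self-contained sketch of that ownership split, assuming a toy
model rather than the patch's real code: ToyLeader and both functions
below are hypothetical names, and the lock is reduced to a hold counter.
The switch routine takes the lock unconditionally, and the caller does
the cleanup.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical leader-side state; not the patch's data structures. */
typedef struct ToyLeader
{
	int		stream_lock_holds;	/* times the leader holds the stream lock */
	bool	stream_file_open;	/* spool file is open for this transaction */
	bool	partial_serialize;	/* leader has stopped sending to the worker */
} ToyLeader;

/*
 * Switch modes. The lock is always taken and the file is always left open,
 * so no check on the message type is needed here.
 */
void
toy_switch_to_partial_serialize(ToyLeader *leader)
{
	leader->stream_lock_holds++;
	leader->stream_file_open = true;
	leader->partial_serialize = true;
}

/*
 * Cleanup owned by a caller modeled on stream_commit, which knows that
 * nothing more will be written for this transaction.
 */
void
toy_finish_stream(ToyLeader *leader)
{
	assert(leader->partial_serialize);
	assert(leader->stream_lock_holds > 0);

	leader->stream_file_open = false;	/* caller closes the stream file */
	leader->stream_lock_holds--;		/* caller releases the stream lock */
}
```

In this model the conditional moves out of the common path: each caller
decides whether it still needs the lock and the file after the switch.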

Some other comments:
=====================
1. The handling of buffile inside pa_stream_abort() looks a bit ugly to
me. I think you primarily needed it because the buffile opened by the
parallel apply worker is in CurrentResourceOwner. Can we think of
having a new resource owner for applying spooled messages? I think that
will avoid the need for special-purpose code to handle buffiles
in the parallel apply worker.

2.
@@ -564,6 +571,7 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
  TransactionId current_xid;
  ParallelApplyWorkerInfo *winfo;
  TransApplyAction apply_action;
+ StringInfoData original_msg;

  apply_action = get_transaction_apply_action(stream_xid, &winfo);

@@ -573,6 +581,8 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)

  Assert(TransactionIdIsValid(stream_xid));

+ original_msg = *s;
+
  /*
  * We should have received XID of the subxact as the first part of the
  * message, so extract it.
@@ -596,10 +606,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
  stream_write_change(action, s);
  return true;

+ case TRANS_LEADER_PARTIAL_SERIALIZE:
  case TRANS_LEADER_SEND_TO_PARALLEL:
  Assert(winfo);

- pa_send_data(winfo, s->len, s->data);
+ if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
+ pa_send_data(winfo, s->len, s->data);
+ else
+ stream_write_change(action, &original_msg);

Please add a comment explaining why we need to remember the original string.

3.
@@ -1797,8 +1907,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
  changes_filename(path, MyLogicalRepWorker->subid, xid);
  elog(DEBUG1, "replaying changes from file \"%s\"", path);

- fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
- false);
+ stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
+ stream_xid = xid;

Why do we need stream_xid here? I think we can avoid having a global
stream_fd if comment #1 is feasible.

4.
+ * TRANS_LEADER_APPLY:
+ * The action means that we

/The/This. Please make a similar change for other actions.

5. Apart from the above, please find a few changes to the comments for
the 0001 and 0002 patches in the attachments.
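
Regarding comment 2 above, a minimal sketch of why a struct copy taken
before parsing can matter. It assumes that extracting the subxact XID
advances the message's read cursor and that the spooled copy should still
contain the XID; the patch may have a different reason. ToyStringInfo,
toy_getmsgint32 and toy_remaining are hypothetical stand-ins, not
PostgreSQL's StringInfoData or pq_getmsgint().

```c
#include <assert.h>
#include <stdint.h>

/* Simplified, hypothetical stand-in for a message buffer with a cursor. */
typedef struct ToyStringInfo
{
	const char *data;		/* message bytes */
	int			len;		/* number of valid bytes in data */
	int			cursor;		/* next byte to read */
} ToyStringInfo;

/* Read a 4-byte big-endian integer and advance the cursor past it. */
uint32_t
toy_getmsgint32(ToyStringInfo *s)
{
	const unsigned char *p;
	uint32_t	value;

	assert(s->cursor + 4 <= s->len);
	p = (const unsigned char *) s->data + s->cursor;
	value = ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
		((uint32_t) p[2] << 8) | (uint32_t) p[3];
	s->cursor += 4;
	return value;
}

/* Bytes a writer would emit if it started at the current cursor. */
int
toy_remaining(const ToyStringInfo *s)
{
	return s->len - s->cursor;
}
```

Copying the struct (not the bytes) before calling toy_getmsgint32() keeps
a second view whose cursor is still at the start of the message, so a
writer using that copy emits the XID as well as the change that follows.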


-- 
With Regards,
Amit Kapila.
