Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
| From | Amit Kapila |
|---|---|
| Subject | Re: Perform streaming logical transactions by background workers and parallel apply |
| Date | |
| Msg-id | CAA4eK1Jwh7j86Egk1cye=x2R_yrTjzXGj7Fx12wVybBAEq91kA@mail.gmail.com |
| In response to | Re: Perform streaming logical transactions by background workers and parallel apply (Amit Kapila <amit.kapila16@gmail.com>) |
| Responses | RE: Perform streaming logical transactions by background workers and parallel apply |
| List | pgsql-hackers |
On Wed, Nov 30, 2022 at 4:23 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> 2.
> + /*
> + * The stream lock is released when processing changes in a
> + * streaming block, so the leader needs to acquire the lock here
> + * before entering PARTIAL_SERIALIZE mode to ensure that the
> + * parallel apply worker will wait for the leader to release the
> + * stream lock.
> + */
> + if (in_streamed_transaction &&
> + action != LOGICAL_REP_MSG_STREAM_STOP)
> + {
> + pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
>
> This comment is not completely correct because we can even acquire the
> lock for the very streaming chunk. This check will work but doesn't
> appear future-proof or at least not very easy to understand though I
> don't have a better suggestion at this stage. Can we think of a better
> check here?
>
One idea is that we acquire this lock every time and make callers like
stream_commit responsible for releasing it. Also, we can handle closing
the stream file in the respective callers. I think that will make
this part of the patch easier to follow.
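To make the proposal above concrete, here is a minimal toy model in C. It is not code from the patch: the leader always takes the stream lock when it switches to partial-serialize mode, and a stream_commit-like caller closes the stream file and releases the lock. ToyLeaderState, toy_enter_partial_serialize() and toy_stream_commit() are hypothetical; two booleans stand in for the lock taken via pa_lock_stream() and for the open stream file.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Toy model only: lock_held stands in for the stream lock (taken in the
 * patch with pa_lock_stream()), file_open stands in for the stream file.
 */
typedef struct ToyLeaderState
{
    bool lock_held;
    bool file_open;
} ToyLeaderState;

/*
 * Switching to partial-serialize mode always takes the lock, with no
 * condition on in_streamed_transaction or on the message type.
 */
static void
toy_enter_partial_serialize(ToyLeaderState *st)
{
    assert(!st->lock_held);     /* an unbalanced double acquire fails loudly */
    st->lock_held = true;
    st->file_open = true;
}

/*
 * A stream_commit-like caller owns the cleanup: it closes the stream file
 * and releases the lock itself.
 */
static void
toy_stream_commit(ToyLeaderState *st)
{
    assert(st->lock_held);      /* releasing a lock we do not hold is a bug */
    st->file_open = false;      /* close the stream file here */
    st->lock_held = false;      /* release the stream lock here */
}
```

The point of this shape is that acquisition no longer depends on in_streamed_transaction or on STREAM_STOP, and each release sits in an obvious place in its caller.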
Some other comments:
=====================
1. The handling of the buffile inside pa_stream_abort() looks a bit ugly to
me. I think you primarily needed it because the buffile opened by the
parallel apply worker is associated with CurrentResourceOwner. Can we
consider using a new resource owner for applying spooled messages? I think
that would avoid the need for special-purpose code to handle buffiles
in the parallel apply worker.
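The resource-owner idea can be illustrated with a toy model in plain C; this is not PostgreSQL's ResourceOwner API. Files opened while a dedicated owner is current are remembered by that owner, and releasing the owner closes them, so the abort path needs no buffile-specific cleanup. ToyResourceOwner, toy_current_owner and the toy_* functions are hypothetical. In PostgreSQL the analogous steps would presumably be creating an owner with ResourceOwnerCreate() and temporarily switching CurrentResourceOwner while applying the spooled messages.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

#define MAX_TRACKED_FILES 8

/* Toy model of a resource owner: it only remembers the files opened under it. */
typedef struct ToyResourceOwner
{
    FILE   *files[MAX_TRACKED_FILES];
    int     nfiles;
} ToyResourceOwner;

/* Hypothetical analogue of CurrentResourceOwner. */
static ToyResourceOwner *toy_current_owner = NULL;

/* Open a temporary file and register it with the current owner. */
static FILE *
toy_open_tracked(void)
{
    FILE   *f;

    assert(toy_current_owner != NULL);
    assert(toy_current_owner->nfiles < MAX_TRACKED_FILES);
    f = tmpfile();
    if (f != NULL)
        toy_current_owner->files[toy_current_owner->nfiles++] = f;
    return f;
}

/* Releasing the owner closes everything it tracks, whatever path got us here. */
static void
toy_release_owner(ToyResourceOwner *owner)
{
    for (int i = 0; i < owner->nfiles; i++)
        fclose(owner->files[i]);
    owner->nfiles = 0;
}

/*
 * "Apply" spooled messages with a dedicated owner made current; the spool
 * file is opened under that owner, and the previous owner is restored.
 * Returns 1 if the spool file was opened, 0 otherwise.
 */
static int
toy_apply_spooled_messages(ToyResourceOwner *spool_owner)
{
    ToyResourceOwner *saved = toy_current_owner;
    int     opened;

    toy_current_owner = spool_owner;
    opened = (toy_open_tracked() != NULL);
    toy_current_owner = saved;
    return opened;
}
```

On either commit or abort, a single release of the dedicated owner cleans up the spool file.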
2.
@@ -564,6 +571,7 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
TransactionId current_xid;
ParallelApplyWorkerInfo *winfo;
TransApplyAction apply_action;
+ StringInfoData original_msg;
apply_action = get_transaction_apply_action(stream_xid, &winfo);
@@ -573,6 +581,8 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Assert(TransactionIdIsValid(stream_xid));
+ original_msg = *s;
+
/*
* We should have received XID of the subxact as the first part of the
* message, so extract it.
@@ -596,10 +606,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
stream_write_change(action, s);
return true;
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
case TRANS_LEADER_SEND_TO_PARALLEL:
Assert(winfo);
- pa_send_data(winfo, s->len, s->data);
+ if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
+ pa_send_data(winfo, s->len, s->data);
+ else
+ stream_write_change(action, &original_msg);
Please add a comment explaining why the original message needs to be remembered.
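A plausible reason, which such a comment could state, is that extracting the subxact XID with pq_getmsgint() advances s->cursor, and stream_write_change() writes only the bytes after the cursor, whereas the parallel apply worker that later replays the file presumably needs the XID too. The toy model below shows that the struct copy `original_msg = *s` keeps the cursor at the start of the message. ToyStringInfo, toy_getmsgint32() and toy_bytes_to_serialize() are hypothetical simplifications of StringInfoData, pq_getmsgint() and stream_write_change().

```c
#include <assert.h>
#include <stdint.h>

/* Simplified stand-in for StringInfoData: only the fields used here. */
typedef struct ToyStringInfo
{
    const char *data;
    int         len;
    int         cursor;     /* read position, advanced by readers */
} ToyStringInfo;

/* Read a 4-byte big-endian integer and advance the cursor, like pq_getmsgint(). */
static uint32_t
toy_getmsgint32(ToyStringInfo *msg)
{
    const unsigned char *p;
    uint32_t    v;

    assert(msg->cursor + 4 <= msg->len);
    p = (const unsigned char *) msg->data + msg->cursor;
    v = ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
        ((uint32_t) p[2] << 8) | (uint32_t) p[3];
    msg->cursor += 4;
    return v;
}

/* Like stream_write_change(): only the bytes from the cursor onward get written. */
static int
toy_bytes_to_serialize(const ToyStringInfo *msg)
{
    return msg->len - msg->cursor;
}
```

The copy is shallow: both structs share one data buffer, which is safe here because reading only moves the cursor and never modifies the bytes.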
3.
@@ -1797,8 +1907,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
changes_filename(path, MyLogicalRepWorker->subid, xid);
elog(DEBUG1, "replaying changes from file \"%s\"", path);
- fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
- false);
+ stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
+ stream_xid = xid;
Why do we need stream_xid here? I think we can avoid having a global
stream_fd if comment #1 is feasible.
4.
+ * TRANS_LEADER_APPLY:
+ * The action means that we
/The/This, i.e. change "The action" to "This action". Please make the same change for the other actions.
5. Apart from the above, please find attached a few changes to the comments
in the 0001 and 0002 patches.
--
With Regards,
Amit Kapila.