Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Amit Kapila
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id CAA4eK1+V7toH38OybLkdAr--+G+DMv+jWHAKOYHBfmWsnaKybw@mail.gmail.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("wangw.fnst@fujitsu.com" <wangw.fnst@fujitsu.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
On Mon, Sep 26, 2022 at 8:41 AM wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:
>
> On Thur, Sep 22, 2022 at 18:12 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> > 3.
> > ApplyWorkerMain()
> > {
> > ...
> > ...
> > +
> > + if (server_version >= 160000 &&
> > + MySubscription->stream == SUBSTREAM_PARALLEL)
> > + options.proto.logical.streaming = pstrdup("parallel");
> >
> > After deciding here whether the parallel streaming mode is enabled or
> > not, we recheck the same thing in apply_handle_stream_abort() and
> > parallel_apply_can_start(). In parallel_apply_can_start(), we do it
> > via two different checks. How about storing this information say in
> > structure MyLogicalRepWorker in ApplyWorkerMain() and then use it at
> > other places?
>
> Improved as suggested.
> Added a new flag "in_parallel_apply" to structure MyLogicalRepWorker.
>

Can we name the variable in_parallel_apply as parallel_apply and set
it in logicalrep_worker_launch() instead of in
ParallelApplyWorkerMain()?

A few other comments:
==================
1.
+ if (is_subworker &&
+ nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
+ {
+ LWLockRelease(LogicalRepWorkerLock);
+
+ ereport(DEBUG1,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("out of parallel apply workers"),
+ errhint("You might need to increase max_parallel_apply_workers_per_subscription.")));

I think it is better to use LOG as the level for this message. Similar
messages at other places use WARNING or LOG. Here, I prefer LOG
because the system can still proceed without blocking anything.

2.
+/* Reset replication origin tracking. */
+void
+parallel_apply_replorigin_reset(void)
+{
+ bool started_tx = false;
+
+ /* This function might be called inside or outside of transaction. */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }

Why do we need a transaction in this function?

3. A few suggestions to improve the patch:
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 1623c9e2fa..d9c519dfab 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1264,6 +1264,10 @@ apply_handle_stream_prepare(StringInfo s)
  case TRANS_LEADER_SEND_TO_PARALLEL:
  Assert(winfo);

+ /*
+ * The origin can be active only in one process. See
+ * apply_handle_stream_commit.
+ */
  parallel_apply_replorigin_reset();

  /* Send STREAM PREPARE message to the parallel apply worker. */
@@ -1623,12 +1627,7 @@ apply_handle_stream_abort(StringInfo s)
  (errcode(ERRCODE_PROTOCOL_VIOLATION),
  errmsg_internal("STREAM ABORT message without STREAM STOP")));

- /*
- * Check whether the publisher sends abort_lsn and abort_time.
- *
- * Note that the parallel apply worker is only started when the publisher
- * sends abort_lsn and abort_time.
- */
+ /* We receive abort information only when we can apply in parallel. */
  if (MyLogicalRepWorker->in_parallel_apply)
  read_abort_info = true;

@@ -1656,7 +1655,13 @@ apply_handle_stream_abort(StringInfo s)
  Assert(winfo);

  if (subxid == xid)
+ {
+ /*
+ * The origin can be active only in one process. See
+ * apply_handle_stream_commit.
+ */
  parallel_apply_replorigin_reset();
+ }

  /* Send STREAM ABORT message to the parallel apply worker. */
  parallel_apply_send_data(winfo, s->len, s->data);
@@ -1858,6 +1863,12 @@ apply_handle_stream_commit(StringInfo s)
  case TRANS_LEADER_SEND_TO_PARALLEL:
  Assert(winfo);

+ /*
+ * We need to reset the replication origin before sending the commit
+ * message and set it up again after confirming that parallel worker
+ * has processed the message. This is required because origin can be
+ * active only in one process at-a-time.
+ */
  parallel_apply_replorigin_reset();

  /* Send STREAM COMMIT message to the parallel apply worker. */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 4cbfb43492..2bd9664f86 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -70,11 +70,7 @@ typedef struct LogicalRepWorker
  */
  pid_t apply_leader_pid;

- /*
- * Indicates whether to use parallel apply workers.
- *
- * Determined based on streaming parameter and publisher version.
- */
+ /* Indicates whether apply can be performed parallelly. */
  bool in_parallel_apply;
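
The comment proposed for apply_handle_stream_commit above relies on the rule that the origin can be active in only one process at a time. As a rough standalone model of that hand-over (not PostgreSQL code; SketchOrigin and the sketch_origin_* functions are hypothetical names used only for illustration), the leader has to give up the origin before the parallel worker can take it, and can set it up again only after the worker is done:

```c
#include <stdbool.h>

#define SKETCH_NO_OWNER (-1)

/* Hypothetical model of an origin that at most one process may hold. */
typedef struct SketchOrigin
{
	int			owner;			/* process id, or SKETCH_NO_OWNER */
} SketchOrigin;

/* Try to make the origin active in 'proc'; fails if another process holds it. */
static bool
sketch_origin_acquire(SketchOrigin *o, int proc)
{
	if (o->owner != SKETCH_NO_OWNER && o->owner != proc)
		return false;
	o->owner = proc;
	return true;
}

/* Give up the origin, but only if 'proc' is the current holder. */
static void
sketch_origin_reset(SketchOrigin *o, int proc)
{
	if (o->owner == proc)
		o->owner = SKETCH_NO_OWNER;
}
```

In this model, a worker's sketch_origin_acquire() fails until the leader has called sketch_origin_reset(), which mirrors why the reset has to happen before the commit message is sent.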


-- 
With Regards,
Amit Kapila.


