Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Amit Kapila
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Msg-id CAA4eK1+e8JsiC8uMZPU25xQRyxNvVS24M4=Zy-xD18jzX+vrmA@mail.gmail.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
On Mon, Aug 29, 2022 at 5:01 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
>
> On Thursday, August 25, 2022 7:33 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
>
> > 11.
> > + /*
> > + * Attach to the message queue.
> > + */
> > + mq = shm_toc_lookup(toc, APPLY_BGWORKER_KEY_ERROR_QUEUE, false);
> > + shm_mq_set_sender(mq, MyProc);
> > + error_mqh = shm_mq_attach(mq, seg, NULL);
> > + pq_redirect_to_shm_mq(seg, error_mqh);
> > +
> > + /*
> > + * Now, we have initialized DSM. Attach to slot.
> > + */
> > + logicalrep_worker_attach(worker_slot);
> > + MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
> > + MyParallelShared->logicalrep_worker_slot_no = worker_slot;
> > +
> > + pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
> > +    InvalidBackendId);
> >
> > Is there a reason to set parallel_leader immediately after
> > pq_redirect_to_shm_mq() as we are doing in parallel.c?
>
> Moved the code.
>

Sorry if I was not clear, but what I wanted was something like the below:

diff --git a/src/backend/replication/logical/applyparallelworker.c
b/src/backend/replication/logical/applyparallelworker.c
index 832e99cd48..6646e00658 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -480,6 +480,9 @@ ApplyParallelWorkerMain(Datum main_arg)
        mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
        shm_mq_set_sender(mq, MyProc);
        error_mqh = shm_mq_attach(mq, seg, NULL);
+       pq_redirect_to_shm_mq(seg, error_mqh);
+       pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
+                                                  InvalidBackendId);

        /*
         * Primary initialization is complete. Now, we can attach to our slot. This
@@ -490,10 +493,6 @@ ApplyParallelWorkerMain(Datum main_arg)
        MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
        MyParallelShared->logicalrep_worker_slot_no = worker_slot;

-       pq_redirect_to_shm_mq(seg, error_mqh);
-       pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
-                                                  InvalidBackendId);
-
        MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
                MyLogicalRepWorker->reply_time = 0;


A few other comments on v25-0001*
============================
1.
+ {
+ {"max_apply_parallel_workers_per_subscription",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Maximum number of apply parallel workers per subscription."),
+ NULL,
+ },
+ &max_apply_parallel_workers_per_subscription,

Let's model this on max_parallel_workers_per_gather and name it
max_parallel_apply_workers_per_subscription.
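
Just to make the suggestion concrete, the renamed GUC entry could look
roughly like the below if modeled on max_parallel_workers_per_gather
(the default, bounds, and hook fields here are illustrative assumptions,
not values taken from v25-0001):

```c
/*
 * Hypothetical guc.c entry sketch; default/min/max and NULL hooks are
 * assumptions for illustration only.
 */
{
	{"max_parallel_apply_workers_per_subscription",
		PGC_SIGHUP,
		REPLICATION_SUBSCRIBERS,
		gettext_noop("Maximum number of parallel apply workers per subscription."),
		NULL,
	},
	&max_parallel_apply_workers_per_subscription,
	2, 0, MAX_PARALLEL_WORKER_LIMIT,
	NULL, NULL, NULL
},
```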


+typedef struct ApplyParallelWorkerEntry
+{
+ TransactionId xid; /* Hash key -- must be first */
+ ApplyParallelWorkerInfo *winfo;
+} ApplyParallelWorkerEntry;
+
+/* Apply parallel workers hash table (initialized on first use). */
+static HTAB *ApplyParallelWorkersHash = NULL;
+static List *ApplyParallelWorkersFreeList = NIL;
+static List *ApplyParallelWorkersList = NIL;

Similarly, for the above, let's name them as ParallelApply*. I think in
comments/doc changes it is better to refer to them as parallel apply
workers. We can keep the filename as it is.


2.
+ * If there are enough apply parallel workers(reache half of the
+ * max_apply_parallel_workers_per_subscription)

/reache/reached. There should be a space before (.

3.
+ * The dynamic shared memory segment will contain (1) a shm_mq that can be used
+ * to transport errors (and other messages reported via elog/ereport) from the
+ * apply parallel worker to leader apply worker (2) another shm_mq that can
+ * be used to transport changes in the transaction from leader apply worker to
+ * apply parallel worker (3) necessary information to be shared among apply
+ * parallel workers to leader apply worker

I think it is better to use send instead of transport in the above
paragraph. In (3), /apply parallel workers to leader apply
worker/apply parallel workers and leader apply worker

4.
handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
{
...
...
+ else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
+ {
+ parallel_apply_send_data(winfo, s->len, s->data);


It is better to have an Assert for winfo being non-null here and in
other similar usages.

-- 
With Regards,
Amit Kapila.


