Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Masahiko Sawada
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id CAD21AoBeJxF3ZOoU6rUXh3UZVxqmdVGNxC=ERPBe-7mOr=Sz5A@mail.gmail.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
List pgsql-hackers
On Wed, Nov 30, 2022 at 10:51 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
>
> On Wednesday, November 30, 2022 9:41 PM houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com> wrote:
> >
> > On Tuesday, November 29, 2022 8:34 PM Amit Kapila
> > > Review comments on v53-0001*
> >
> > Attach the new version patch set.
>
> Sorry, there were some mistakes in the previous patch set.
> Here is the correct V54 patch set. I also ran pgindent for the patch set.
>

Thank you for updating the patches. Here are some random review comments
on the 0001 and 0002 patches.

ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("logical replication parallel apply worker exited abnormally"),
                 errcontext("%s", edata.context)));
and

ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("logical replication parallel apply worker exited because of subscription information change")));

I'm not sure ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE is appropriate
here. Given that the parallel apply worker has already reported the
error message with its own error code, I think we don't need to set an
error code for the logs from the leader process.

Also, I'm not sure the term "exited abnormally" is appropriate, since
we use it when, for example, the server crashes. The ERRORs reported
here don't mean that in general.

---
if (am_parallel_apply_worker() && on_subinfo_change)
{
    /*
     * If a parallel apply worker exits due to the subscription
     * information change, we notify the leader apply worker so that the
     * leader can report more meaningful message in time and restart the
     * logical replication.
     */
    pq_putmessage('X', NULL, 0);
}

and

ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("logical replication parallel apply worker exited because of subscription information change")));

Do we really need an additional message in the case of 'X'? When we
call apply_worker_clean_exit with on_subinfo_change = true, we have
already reported a message such as:

ereport(LOG,
        (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
                MySubscription->name)));

I think that reporting a similar message from the leader might not be
meaningful for users.

---
-                if (options->proto.logical.streaming &&
-                        PQserverVersion(conn->streamConn) >= 140000)
-                        appendStringInfoString(&cmd, ", streaming 'on'");
+                if (options->proto.logical.streaming_str)
+                        appendStringInfo(&cmd, ", streaming '%s'",
+                                                         options->proto.logical.streaming_str);

and

+        /*
+         * Assign the appropriate option value for streaming option according to
+         * the 'streaming' mode and the publisher's ability to support that mode.
+         */
+        if (server_version >= 160000 &&
+                MySubscription->stream == SUBSTREAM_PARALLEL)
+        {
+                options.proto.logical.streaming_str = pstrdup("parallel");
+                MyLogicalRepWorker->parallel_apply = true;
+        }
+        else if (server_version >= 140000 &&
+                         MySubscription->stream != SUBSTREAM_OFF)
+        {
+                options.proto.logical.streaming_str = pstrdup("on");
+                MyLogicalRepWorker->parallel_apply = false;
+        }
+        else
+        {
+                options.proto.logical.streaming_str = NULL;
+                MyLogicalRepWorker->parallel_apply = false;
+        }

This change moves the code that adjusts the streaming option based on
the publisher's server version from libpqwalreceiver.c to worker.c.
On the other hand, similar logic for other parameters such as
"two_phase" and "origin" is still done in libpqwalreceiver.c. How
about passing MySubscription->stream via WalRcvStreamOptions and
constructing the streaming option string in libpqrcv_startstreaming()?
In ApplyWorkerMain(), we would then just need to set
MyLogicalRepWorker->parallel_apply = true if (server_version >= 160000
&& MySubscription->stream == SUBSTREAM_PARALLEL). We wouldn't need
pstrdup for "parallel" and "on", and it's more consistent with the
other parameters.
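To make the suggestion concrete, here is a rough standalone sketch of deriving the option in one place from the subscription's stream mode and the publisher version. The enum, the helper names, and the fixed-size command buffer are hypothetical stand-ins (the real code would use the SUBSTREAM_* values, a StringInfo, and libpqrcv_startstreaming()); only the version thresholds are taken from the hunk quoted above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical stand-in for MySubscription->stream, assumed to be passed
 * down to the walreceiver via WalRcvStreamOptions.  The real code uses
 * SUBSTREAM_OFF / SUBSTREAM_ON / SUBSTREAM_PARALLEL.
 */
typedef enum StreamMode
{
    STREAM_MODE_OFF,
    STREAM_MODE_ON,
    STREAM_MODE_PARALLEL
} StreamMode;

/*
 * Value for the "streaming" option, or NULL if the option must be omitted.
 * Returning a string literal means no pstrdup() is needed.
 */
static const char *
streaming_option_value(StreamMode mode, int server_version)
{
    if (mode == STREAM_MODE_OFF)
        return NULL;
    if (mode == STREAM_MODE_PARALLEL && server_version >= 160000)
        return "parallel";
    if (server_version >= 140000)
        return "on";            /* also the fallback for "parallel" on PG14/15 */
    return NULL;                /* publisher too old for streaming */
}

/* Append ", streaming '<value>'" to the option list held in cmd. */
static void
append_streaming_option(char *cmd, size_t cmdsize, StreamMode mode,
                        int server_version)
{
    const char *value = streaming_option_value(mode, server_version);
    size_t      len = strlen(cmd);

    assert(len < cmdsize);
    if (value != NULL)
        snprintf(cmd + len, cmdsize - len, ", streaming '%s'", value);
}

/* The only decision ApplyWorkerMain() would still make on its own. */
static bool
use_parallel_apply(StreamMode mode, int server_version)
{
    return server_version >= 160000 && mode == STREAM_MODE_PARALLEL;
}
```

The string construction then lives next to the "two_phase" and "origin" handling, and the apply worker only sets its parallel_apply flag.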

---
+ * We maintain a worker pool to avoid restarting workers for each streaming
+ * transaction. We maintain each worker's information in the

Do we need to describe the pool in the doc?

---
+ * in AccessExclusive mode at transaction finish commands (STREAM_COMMIT and
+ * STREAM_PREAPRE) and release it immediately.

typo, s/STREAM_PREAPRE/STREAM_PREPARE/

---
+/* Parallel apply workers hash table (initialized on first use). */
+static HTAB *ParallelApplyWorkersHash = NULL;
+
+/*
+ * A list to maintain the active parallel apply workers. The information for
+ * the new worker is added to the list after successfully launching it. The
+ * list entry is removed if there are already enough workers in the worker
+ * pool either at the end of the transaction or while trying to find a free
+ * worker for applying the transaction. For more information about the worker
+ * pool, see comments atop this file.
+ */
+static List *ParallelApplyWorkersList = NIL;

The names ParallelApplyWorkersHash and ParallelApplyWorkersList are very
similar, but their usages are completely different. Perhaps we can find
better names, such as ParallelApplyTxnHash and ParallelApplyWorkerPool.
We could also add more comments for ParallelApplyWorkersHash.

---
if (winfo->serialize_changes ||
    napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
{
    int         slot_no;
    uint16      generation;

    SpinLockAcquire(&winfo->shared->mutex);
    generation = winfo->shared->logicalrep_worker_generation;
    slot_no = winfo->shared->logicalrep_worker_slot_no;
    SpinLockRelease(&winfo->shared->mutex);

    logicalrep_pa_worker_stop(slot_no, generation);

    pa_free_worker_info(winfo);

    return true;
}

/* Unlink any files that were needed to serialize partial changes. */
if (winfo->serialize_changes)
    stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);

If winfo->serialize_changes is true, we return true in the first if
statement, so the stream_cleanup_files() call in the second if
statement is never executed.
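One possible reordering, sketched standalone under simplifying assumptions: the struct and the two side-effect stubs are hypothetical stand-ins for ParallelApplyWorkerInfo, logicalrep_pa_worker_stop() and stream_cleanup_files(), and the sketch assumes the spill files may be removed once the worker has been stopped. The cleanup moves into the branch that actually handles serialize_changes, so it becomes reachable, and it runs before winfo is freed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical, simplified stand-in for ParallelApplyWorkerInfo. */
typedef struct WorkerInfo
{
    bool         serialize_changes;
    unsigned int xid;
} WorkerInfo;

/*
 * Counters standing in for logicalrep_pa_worker_stop() and
 * stream_cleanup_files(), so the control flow can be checked.
 */
static int workers_stopped = 0;
static int files_cleaned = 0;

static void stop_worker(WorkerInfo *winfo) { (void) winfo; workers_stopped++; }
static void cleanup_files(unsigned int xid) { (void) xid; files_cleaned++; }

/*
 * Returns true if the worker was stopped and its info freed, false if the
 * worker stays in the pool.  The spill-file cleanup runs after the worker
 * has stopped and before winfo (which still holds the xid) is freed.
 */
static bool
free_worker(WorkerInfo *winfo, int napplyworkers, int max_workers_per_sub)
{
    if (winfo->serialize_changes ||
        napplyworkers > (max_workers_per_sub / 2))
    {
        stop_worker(winfo);

        /* Unlink any files that were needed to serialize partial changes. */
        if (winfo->serialize_changes)
            cleanup_files(winfo->xid);

        free(winfo);
        return true;
    }

    return false;
}
```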

---
+        /*
+         * First, try to get a parallel apply worker from the pool, if available.
+         * Otherwise, try to start a new parallel apply worker.
+         */
+        winfo = pa_get_available_worker();
+        if (!winfo)
+        {
+                winfo = pa_init_and_launch_worker();
+                if (!winfo)
+                        return;
+        }

I think we don't necessarily need two separate functions for getting a
worker from the pool and launching a new worker; splitting them seems
to reduce readability. Instead, we could have one function that
returns a free worker from the worker pool if there is one, or
otherwise launches a new worker. That way, we can simply do:

winfo = pg_launch_parallel_worker();
if (!winfo)
    return;
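A minimal sketch of such a combined function, assuming a fixed-size array as the pool and a launch step that always succeeds when a slot is free. The names (get_or_launch_worker, PoolWorker, MAX_WORKERS) are hypothetical; a real version would also have to handle a failed launch and the existing hash/list bookkeeping.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for max_parallel_apply_workers_per_subscription. */
#define MAX_WORKERS 4

typedef struct PoolWorker
{
    bool launched;              /* a worker process exists in this slot */
    bool in_use;                /* it is currently applying a transaction */
} PoolWorker;

static PoolWorker pool[MAX_WORKERS];

/*
 * Return an idle worker from the pool if there is one; otherwise "launch"
 * a new one into an empty slot.  Returns NULL when every slot holds a busy
 * worker, so the caller can fall back to applying the transaction itself.
 */
static PoolWorker *
get_or_launch_worker(void)
{
    PoolWorker *empty_slot = NULL;

    for (int i = 0; i < MAX_WORKERS; i++)
    {
        if (pool[i].launched && !pool[i].in_use)
        {
            pool[i].in_use = true;      /* reuse an idle worker */
            return &pool[i];
        }
        if (!pool[i].launched && empty_slot == NULL)
            empty_slot = &pool[i];
    }

    if (empty_slot == NULL)
        return NULL;

    /* This sketch assumes launching always succeeds. */
    empty_slot->launched = true;
    empty_slot->in_use = true;
    return empty_slot;
}
```

The caller then only needs the single NULL check shown above.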

---
+        /* Setup replication origin tracking. */
+        StartTransactionCommand();
+        ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+                                           originname, sizeof(originname));
+        originid = replorigin_by_name(originname, true);
+        if (!OidIsValid(originid))
+                originid = replorigin_create(originname);

This code seems to allow parallel workers to use different origins in
cases where the origin doesn't exist, but is that okay? Shouldn't we
pass missing_ok = false in this case?
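A toy model of the difference, assuming an in-memory registry in place of the catalog and a flag in place of ereport(ERROR); all names and origin strings are hypothetical. With the get-or-create pattern, a parallel worker that misses the leader's origin silently gets a new one, while a strict lookup surfaces the problem.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical in-memory registry standing in for replication origins. */
#define MAX_ORIGINS 8
#define INVALID_ORIGIN 0

static const char *origin_names[MAX_ORIGINS];
static int  norigins = 0;
static bool error_raised = false;   /* stands in for ereport(ERROR) */

/* Return the origin's id (1-based), or INVALID_ORIGIN if not found. */
static int
origin_by_name(const char *name, bool missing_ok)
{
    for (int i = 0; i < norigins; i++)
        if (strcmp(origin_names[i], name) == 0)
            return i + 1;
    if (!missing_ok)
        error_raised = true;        /* real code would not return here */
    return INVALID_ORIGIN;
}

static int
origin_create(const char *name)
{
    assert(norigins < MAX_ORIGINS);
    origin_names[norigins++] = name;
    return norigins;
}

/* Pattern in the patch: look up with missing_ok = true, create if absent. */
static int
setup_origin_get_or_create(const char *name)
{
    int id = origin_by_name(name, true);

    if (id == INVALID_ORIGIN)
        id = origin_create(name);
    return id;
}

/* Suggested pattern: the leader owns the origin, so a miss is an error. */
static int
setup_origin_strict(const char *name)
{
    return origin_by_name(name, false);
}
```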

---
cfbot seems to be failing:

https://cirrus-ci.com/task/6264595342426112

Regards,

-- 
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com


