RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS0PR01MB57167BF64FC0891734C8E81A94149@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Masahiko Sawada <sawada.mshk@gmail.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply  (Masahiko Sawada <sawada.mshk@gmail.com>)
List pgsql-hackers
On Thursday, December 1, 2022 3:58 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> 
> On Wed, Nov 30, 2022 at 10:51 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > On Wednesday, November 30, 2022 9:41 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> > >
> > > On Tuesday, November 29, 2022 8:34 PM Amit Kapila
> > > > Review comments on v53-0001*
> > >
> > > Attach the new version patch set.
> >
> > Sorry, there were some mistakes in the previous patch set.
> > Here is the correct V54 patch set. I also ran pgindent for the patch set.
> >
> 
> Thank you for updating the patches. Here are random review comments for
> 0001 and 0002 patches.

Thanks for the comments!

> 
> ereport(ERROR,
>                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>                  errmsg("logical replication parallel apply worker exited abnormally"),
>                  errcontext("%s", edata.context)));
> 
> and
> 
> ereport(ERROR,
>                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>                  errmsg("logical replication parallel apply worker exited because of subscription information change")));
> 
> I'm not sure ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE is appropriate
> here. Given that parallel apply worker has already reported the error message
> with the error code, I think we don't need to set the errorcode for the logs
> from the leader process.
> 
> Also, I'm not sure the term "exited abnormally" is appropriate since we use it
> when the server crashes for example. I think ERRORs reported here don't mean
> that in general.

How about reporting "xxx worker exited due to error"?

> ---
> if (am_parallel_apply_worker() && on_subinfo_change) {
>     /*
>      * If a parallel apply worker exits due to the subscription
>      * information change, we notify the leader apply worker so that the
>      * leader can report more meaningful message in time and restart the
>      * logical replication.
>      */
>     pq_putmessage('X', NULL, 0);
> }
> 
> and
> 
> ereport(ERROR,
>                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>                  errmsg("logical replication parallel apply worker exited because of subscription information change")));
> 
> Do we really need an additional message in case of 'X'? When we call
> apply_worker_clean_exit with on_subinfo_change = true, we have reported the
> error message such as:
> 
> ereport(LOG,
>         (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
>                 MySubscription->name)));
> 
> I think that reporting a similar message from the leader might not be
> meaningful for users.

The intention is to let the leader report a more meaningful message if a worker
exited due to a subinfo change. Otherwise, the leader is likely to report an
error like "lost connection ... to parallel apply worker" when it tries to send
data via shared memory after the worker has exited. What do you think?

> ---
> -                if (options->proto.logical.streaming &&
> -                        PQserverVersion(conn->streamConn) >= 140000)
> -                        appendStringInfoString(&cmd, ", streaming 'on'");
> +                if (options->proto.logical.streaming_str)
> +                        appendStringInfo(&cmd, ", streaming '%s'",
> +                                         options->proto.logical.streaming_str);
> 
> and
> 
> +        /*
> +         * Assign the appropriate option value for streaming option according to
> +         * the 'streaming' mode and the publisher's ability to support that mode.
> +         */
> +        if (server_version >= 160000 &&
> +                MySubscription->stream == SUBSTREAM_PARALLEL)
> +        {
> +                options.proto.logical.streaming_str = pstrdup("parallel");
> +                MyLogicalRepWorker->parallel_apply = true;
> +        }
> +        else if (server_version >= 140000 &&
> +                         MySubscription->stream != SUBSTREAM_OFF)
> +        {
> +                options.proto.logical.streaming_str = pstrdup("on");
> +                MyLogicalRepWorker->parallel_apply = false;
> +        }
> +        else
> +        {
> +                options.proto.logical.streaming_str = NULL;
> +                MyLogicalRepWorker->parallel_apply = false;
> +        }
> 
> This change moves the code of adjustment of the streaming option based on
> the publisher server version from libpqwalreceiver.c to worker.c.
> On the other hand, the similar logic for other parameters such as "two_phase"
> and "origin" are still done in libpqwalreceiver.c. How about passing
> MySubscription->stream via WalRcvStreamOptions and constructing a
> streaming option string in libpqrcv_startstreaming()?
> In ApplyWorkerMain(), we just need to set
> MyLogicalRepWorker->parallel_apply = true if (server_version >= 160000
> && MySubscription->stream == SUBSTREAM_PARALLEL). We won't need
> pstrdup for "parallel" and "on", and it's more consistent with other parameters.

Thanks for the suggestion. I thought about the same idea before, but it seems
we would need to include "pg_subscription.h" in libpqwalreceiver.c.
libpqwalreceiver.c looks like a common place, so I am not sure whether it is
better to expose the details of the streaming option to it.
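
To make the trade-off concrete, here is a minimal sketch of the string
selection that would move into libpqrcv_startstreaming() under that proposal.
It only mirrors the version checks from the hunk quoted above. The enum and
the function name are made up for illustration (they stand in for the
SUBSTREAM_* values and are not part of the patch), which is exactly the
detail libpqwalreceiver.c would need to learn about.

```c
#include <stddef.h>

/*
 * Hypothetical stand-ins for the SUBSTREAM_* modes. The real definitions
 * live in pg_subscription.h and are not reproduced here.
 */
typedef enum
{
    SKETCH_SUBSTREAM_OFF,
    SKETCH_SUBSTREAM_ON,
    SKETCH_SUBSTREAM_PARALLEL
} SketchStreamMode;

/*
 * Return the value to send for the "streaming" option, or NULL when the
 * option should be omitted from the START_REPLICATION command.
 */
const char *
sketch_streaming_option(int server_version, SketchStreamMode mode)
{
    /* Publisher understands parallel mode and the subscription asks for it. */
    if (server_version >= 160000 && mode == SKETCH_SUBSTREAM_PARALLEL)
        return "parallel";

    /* Fall back to plain streaming when it is enabled in any form. */
    if (server_version >= 140000 && mode != SKETCH_SUBSTREAM_OFF)
        return "on";

    /* Streaming is off or the publisher is too old. */
    return NULL;
}
```

With string literals returned this way, no pstrdup is needed for "parallel"
and "on", which matches the point made in the review comment.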

> ---
> + * We maintain a worker pool to avoid restarting workers for each streaming
> + * transaction. We maintain each worker's information in the
> 
> Do we need to describe the pool in the doc?

I thought the worker pool is kind of internal information.
Maybe we can add it later if we receive some feedback about this
after pushing the main patch.

> ---
> + * in AccessExclusive mode at transaction finish commands (STREAM_COMMIT and
> + * STREAM_PREAPRE) and release it immediately.
> 
> typo, s/STREAM_PREAPRE/STREAM_PREPARE/

Will change.

> ---
> +/* Parallel apply workers hash table (initialized on first use). */
> +static HTAB *ParallelApplyWorkersHash = NULL;
> +
> +/*
> + * A list to maintain the active parallel apply workers. The information for
> + * the new worker is added to the list after successfully launching it. The
> + * list entry is removed if there are already enough workers in the worker
> + * pool either at the end of the transaction or while trying to find a free
> + * worker for applying the transaction. For more information about the worker
> + * pool, see comments atop this file.
> + */
> +static List *ParallelApplyWorkersList = NIL;
> 
> The names ParallelApplyWorkersHash and ParallelApplyWorkersList are very similar
> but the usages are completely different. Probably we can find better names
> such as ParallelApplyTxnHash and ParallelApplyWorkerPool.
> And probably we can add more comments for ParallelApplyWorkersHash.

Will change.

> ---
> if (winfo->serialize_changes ||
>     napplyworkers > (max_parallel_apply_workers_per_subscription / 2)) {
>     int         slot_no;
>     uint16      generation;
> 
>     SpinLockAcquire(&winfo->shared->mutex);
>     generation = winfo->shared->logicalrep_worker_generation;
>     slot_no = winfo->shared->logicalrep_worker_slot_no;
>     SpinLockRelease(&winfo->shared->mutex);
> 
>     logicalrep_pa_worker_stop(slot_no, generation);
> 
>     pa_free_worker_info(winfo);
> 
>     return true;
> }
> 
> /* Unlink any files that were needed to serialize partial changes. */
> if (winfo->serialize_changes)
>     stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
> 
> If winfo->serialize_changes is true, we return true in the first if statement. So
> stream_cleanup_files in the second if statement is never executed.

pa_free_worker_info will also clean up the fileset. But I think I can move that
stream_cleanup_files before the "... napplyworkers >
(max_parallel_apply_workers_per_subscription / 2))" check so that it is
clearer.
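
Roughly, the reordered control flow would look like the sketch below. This is
a simplified stand-alone model, not the patch code: the struct, its flags and
the function name are invented for illustration, and the assignments stand in
for stream_cleanup_files() and for logicalrep_pa_worker_stop() plus
pa_free_worker_info().

```c
#include <stdbool.h>

/* Hypothetical, simplified model of the per-worker state. */
typedef struct SketchWorker
{
    bool        serialize_changes;  /* changes were spilled to files */
    bool        files_cleaned;      /* models stream_cleanup_files() having run */
    bool        stopped;            /* models worker stop + info free */
} SketchWorker;

/*
 * Returns true if the worker was stopped and freed, false if it stays in
 * the pool for reuse.
 */
bool
sketch_finish_worker(SketchWorker *w, int napplyworkers, int max_workers)
{
    /* Do the file cleanup first so it is reached on every path. */
    if (w->serialize_changes)
        w->files_cleaned = true;

    /* Then decide whether the worker should leave the pool. */
    if (w->serialize_changes || napplyworkers > (max_workers / 2))
    {
        w->stopped = true;
        return true;
    }

    return false;
}
```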

> ---
> +        /*
> +         * First, try to get a parallel apply worker from the pool, if available.
> +         * Otherwise, try to start a new parallel apply worker.
> +         */
> +        winfo = pa_get_available_worker();
> +        if (!winfo)
> +        {
> +                winfo = pa_init_and_launch_worker();
> +                if (!winfo)
> +                        return;
> +        }
> 
> I think we don't necessarily need to separate two functions for getting a worker
> from the pool and launching a new worker. It seems to reduce the readability.
> Instead, I think that we can have one function that returns winfo if there is a free
> worker in the worker pool or it launches a worker. That way, we can simply do
> like:
> 
> winfo = pg_launch_parallel_worker()
> if (!winfo)
>     return;

Will change.
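
A minimal sketch of the merged function, assuming a fixed-size pool: it hands
back an idle worker when one exists and otherwise launches a new one, so the
caller only has to check for NULL. The pool array, the struct and the sketch_
names are hypothetical and only model the idea, they are not the patch code.

```c
#include <stdbool.h>
#include <stddef.h>

#define SKETCH_MAX_WORKERS 4

/* Hypothetical, simplified stand-in for the per-worker info. */
typedef struct SketchWorkerInfo
{
    bool        launched;   /* a worker process exists for this slot */
    bool        in_use;     /* the worker is applying a transaction */
} SketchWorkerInfo;

static SketchWorkerInfo sketch_pool[SKETCH_MAX_WORKERS];

/*
 * Return a worker for the next streaming transaction, reusing an idle one
 * from the pool when possible. Returns NULL if no worker can be obtained.
 */
SketchWorkerInfo *
sketch_launch_parallel_worker(void)
{
    int         i;

    /* Prefer a worker that is already running and currently idle. */
    for (i = 0; i < SKETCH_MAX_WORKERS; i++)
    {
        if (sketch_pool[i].launched && !sketch_pool[i].in_use)
        {
            sketch_pool[i].in_use = true;
            return &sketch_pool[i];
        }
    }

    /* Otherwise launch a new worker in the first empty slot. */
    for (i = 0; i < SKETCH_MAX_WORKERS; i++)
    {
        if (!sketch_pool[i].launched)
        {
            sketch_pool[i].launched = true;
            sketch_pool[i].in_use = true;
            return &sketch_pool[i];
        }
    }

    /* Pool is full and every worker is busy. */
    return NULL;
}
```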

> ---
> +        /* Setup replication origin tracking. */
> +        StartTransactionCommand();
> +        ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
> +                                           originname, sizeof(originname));
> +        originid = replorigin_by_name(originname, true);
> +        if (!OidIsValid(originid))
> +                originid = replorigin_create(originname);
> 
> This code looks to allow parallel workers to use different origins in cases where
> the origin doesn't exist, but is that okay? Shouldn't we pass missing_ok = false
> in this case?
>

Will change.

> ---
> cfbot seems to fail:
> 
> https://cirrus-ci.com/task/6264595342426112

Thanks for reporting. It's due to a test case problem; I will fix that test soon.

Best regards,
Hou zj
