RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS0PR01MB57160D1B0D29DC66ED035F5694F89@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
List pgsql-hackers
On Friday, April 22, 2022 12:12 PM Peter Smith <smithpb2250@gmail.com> wrote:
> 
> Hello Hou-san. Here are my review comments for v4-0001. Sorry, there
> are so many of them (it is a big patch); some are trivial, and others
> you might easily dismiss due to my misunderstanding of the code. But
> hopefully, there are at least some comments that can be helpful in
> improving the patch quality.

Thanks for the comments!
I think most of them make sense; here are explanations for
some of them.

> 24. src/backend/replication/logical/launcher.c - ApplyLauncherMain
> 
> @@ -869,7 +917,7 @@ ApplyLauncherMain(Datum main_arg)
>   wait_time = wal_retrieve_retry_interval;
> 
>   logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
> - sub->owner, InvalidOid);
> + sub->owner, InvalidOid, DSM_HANDLE_INVALID);
>   }
> Now that logicalrep_worker_launch is returning a bool, should this
> call be checking the return value and taking appropriate action if it
> failed?

I'm not sure we should change the logic of the existing caller. I think only
the new caller added by the patch needs to check the return value.
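
To illustrate the distinction, here is a minimal C sketch. All names in it
(demo_worker_launch, demo_handle_streamed_xact, demo_launcher_iteration) are
hypothetical, and the fallback taken when the launch fails is an assumption
made only for illustration; it is not taken from the patch.

```c
#include <stdbool.h>

/* Hypothetical outcomes for the new caller; not names from the patch. */
typedef enum DemoAction
{
    DEMO_USE_BGWORKER,      /* launch succeeded, hand the work off */
    DEMO_APPLY_IN_LEADER    /* launch failed, assumed fallback */
} DemoAction;

/* Hypothetical stand-in for a launch function that returns a bool. */
bool
demo_worker_launch(int free_slots)
{
    return free_slots > 0;
}

/* New-style caller: checks the result and picks a fallback on failure. */
DemoAction
demo_handle_streamed_xact(int free_slots)
{
    if (!demo_worker_launch(free_slots))
        return DEMO_APPLY_IN_LEADER;

    return DEMO_USE_BGWORKER;
}

/*
 * Existing-style caller: the result is deliberately ignored, on the
 * assumption that the surrounding loop simply tries again later.
 */
void
demo_launcher_iteration(int free_slots)
{
    (void) demo_worker_launch(free_slots);
}
```

In this sketch, demo_handle_streamed_xact(0) yields DEMO_APPLY_IN_LEADER, while
the existing-style caller behaves the same whether or not the launch succeeded.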


> 26. src/backend/replication/logical/origin.c - acquire code
> 
> + /*
> + * We allow the apply worker to get the slot which is acquired by its
> + * leader process.
> + */
> + else if (curstate->acquired_by != 0 && acquire)
>   {
>   ereport(ERROR,
> 
> I somehow felt that this param would be better called 'skip_acquire',
> so all the callers would have to use the opposite boolean and then
> this code would say like below (which seemed easier to me). YMMV.
> 
> else if (curstate->acquired_by != 0 && !skip_acquire)
>   {
>   ereport(ERROR,

Not sure about this.


> 59. src/backend/replication/logical/worker.c - ApplyWorkerMain
> 
> @@ -3733,7 +4292,7 @@ ApplyWorkerMain(Datum main_arg)
> 
>   options.proto.logical.publication_names = MySubscription->publications;
>   options.proto.logical.binary = MySubscription->binary;
> - options.proto.logical.streaming = MySubscription->stream;
> + options.proto.logical.streaming = (MySubscription->stream != SUBSTREAM_OFF);
>   options.proto.logical.twophase = false;
>
> I was not sure why this is converting from an enum to a boolean? Is it right?

I think it's OK. "logical.streaming" is used by the publisher, which doesn't
need to know the exact type of streaming (for now it only needs to know whether
streaming is enabled).
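
A small sketch of that conversion, assuming a three-valued mode on the
subscriber side. The enum and its members are hypothetical stand-ins; only
the comparison against the "off" value mirrors the patch line quoted above.

```c
#include <stdbool.h>

/* Hypothetical subscriber-side streaming modes; not the patch's enum. */
typedef enum DemoStreamMode
{
    DEMO_STREAM_OFF = 0,    /* streaming disabled */
    DEMO_STREAM_ON,         /* streaming enabled, one apply strategy */
    DEMO_STREAM_APPLY       /* streaming enabled, another apply strategy */
} DemoStreamMode;

/*
 * Collapse the subscriber-side mode into the single on/off flag that
 * the publisher is sent: any mode other than "off" means "enabled".
 */
bool
demo_publisher_streaming_flag(DemoStreamMode mode)
{
    return mode != DEMO_STREAM_OFF;
}
```

Both non-off modes map to true here, so how the subscriber applies the
streamed changes stays a subscriber-side detail.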


> 63. src/backend/replication/logical/worker.c - ApplyBgwShutdown
> 
> +static void
> +ApplyBgwShutdown(int code, Datum arg)
> +{
> + SpinLockAcquire(&MyParallelState->mutex);
> + MyParallelState->failed = true;
> + SpinLockRelease(&MyParallelState->mutex);
> +
> + dsm_detach((dsm_segment *) DatumGetPointer(arg));
> +}
> 
> Should this do detach first and set the flag last?

Not sure about this. I think it's fine to detach this at the end.

> 76. src/backend/replication/logical/worker.c - check_workers_status
> 
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("Background worker %u exited unexpectedly",
> + wstate->pstate->n)));
> 
> Should that message also give more identifying info about the
> *current* worker doing the ERROR - e.g. the one which found that the
> other bgworker had failed? Or is just the PID in the log message
> good enough?

Currently, only the main apply worker should report this error, so I'm not
sure we need to report the current worker.

> 77. src/backend/replication/logical/worker.c - check_workers_status
> 
> + if (!AllTablesyncsReady() && nfreeworkers != list_length(ApplyWorkersList))
> + {
> 
> I did not really understand this code, but isn't there a possibility
> that it will cause many restarts if the tablesyncs are taking a long
> time to complete?

I think it's OK: after restarting, we won't start a bgworker until all the
tables are READY.
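
As a rough sketch of why this should not loop: the quoted condition is true
only while some bgworker is busy. Assuming the worker list is empty right
after a restart and stays empty until the tables are READY, both counts are
zero and the check cannot fire again. The function name and parameters below
are hypothetical; only the shape of the condition comes from the patch.

```c
#include <stdbool.h>

/*
 * Hypothetical mirror of the quoted condition: true when some tablesync
 * is not ready yet and at least one bgworker is still busy (the number
 * of free workers differs from the total number of workers).
 */
bool
demo_condition_fires(bool all_tablesyncs_ready, int nfreeworkers, int nworkers)
{
    return !all_tablesyncs_ready && nfreeworkers != nworkers;
}
```

With no bgworkers started, demo_condition_fires(false, 0, 0) is false, so a
long-running tablesync alone does not trigger the condition repeatedly.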

Best regards,
Hou zj




