RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From: houzj.fnst@fujitsu.com
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id: OS0PR01MB57160D1B0D29DC66ED035F5694F89@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to: Re: Perform streaming logical transactions by background workers and parallel apply (Peter Smith <smithpb2250@gmail.com>)
Responses: RE: Perform streaming logical transactions by background workers and parallel apply ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
List: pgsql-hackers
On Friday, April 22, 2022 12:12 PM Peter Smith <smithpb2250@gmail.com> wrote:
> Hello Hou-san. Here are my review comments for v4-0001. Sorry, there
> are so many of them (it is a big patch); some are trivial, and others
> you might easily dismiss due to my misunderstanding of the code. But
> hopefully, there are at least some comments that can be helpful in
> improving the patch quality.

Thanks for the comments! I think most of the comments make sense; here are explanations for some of them.

> 24. src/backend/replication/logical/launcher.c - ApplyLauncherMain
>
> @@ -869,7 +917,7 @@ ApplyLauncherMain(Datum main_arg)
>  				wait_time = wal_retrieve_retry_interval;
>
>  				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
> -										 sub->owner, InvalidOid);
> +										 sub->owner, InvalidOid, DSM_HANDLE_INVALID);
>  			}
>
> Now that logicalrep_worker_launch is returning a bool, should this
> call be checking the return value and taking appropriate action if it
> failed?

I'm not sure we can change the logic of the existing callers. I think only the new caller added by the patch needs to check the return value.

> 26. src/backend/replication/logical/origin.c - acquire code
>
> +	/*
> +	 * We allow the apply worker to get the slot which is acquired by its
> +	 * leader process.
> +	 */
> +	else if (curstate->acquired_by != 0 && acquire)
> 	{
> 		ereport(ERROR,
>
> I somehow felt that this param would be better called 'skip_acquire',
> so all the callers would have to use the opposite boolean and then
> this code would say like below (which seemed easier to me). YMMV.
>
> else if (curstate->acquired_by != 0 && !skip_acquire)
> {
> 	ereport(ERROR,

I'm not sure about this.

> 59.
> src/backend/replication/logical/worker.c - ApplyWorkerMain
>
> @@ -3733,7 +4292,7 @@ ApplyWorkerMain(Datum main_arg)
>
>  	options.proto.logical.publication_names = MySubscription->publications;
>  	options.proto.logical.binary = MySubscription->binary;
> -	options.proto.logical.streaming = MySubscription->stream;
> +	options.proto.logical.streaming = (MySubscription->stream != SUBSTREAM_OFF);
>  	options.proto.logical.twophase = false;
>
> I was not sure why this is converting from an enum to a boolean? Is it right?

I think it's OK. "logical.streaming" is used on the publisher side, which doesn't need to know the exact streaming mode; for now it only needs to know whether streaming is enabled.

> 63. src/backend/replication/logical/worker.c - ApplyBgwShutdown
>
> +static void
> +ApplyBgwShutdown(int code, Datum arg)
> +{
> +	SpinLockAcquire(&MyParallelState->mutex);
> +	MyParallelState->failed = true;
> +	SpinLockRelease(&MyParallelState->mutex);
> +
> +	dsm_detach((dsm_segment *) DatumGetPointer(arg));
> +}
>
> Should this do detach first and set the flag last?

I'm not sure about this; I think it's fine to do the detach at the end.

> 76. src/backend/replication/logical/worker.c - check_workers_status
>
> +			ereport(ERROR,
> +					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +					 errmsg("Background worker %u exited unexpectedly",
> +							wstate->pstate->n)));
>
> Should that message also give more identifying info about the
> *current* worker doing the ERROR - e.g. the one which found that the
> other bgworker had failed? Or is the PID in the log message
> good enough?

Currently, only the main apply worker should report this error, so I'm not sure we need to identify the current worker.

> 77.
> src/backend/replication/logical/worker.c - check_workers_status
>
> +	if (!AllTablesyncsReady() && nfreeworkers != list_length(ApplyWorkersList))
> +	{
>
> I did not really understand this code, but isn't there a possibility
> that it will cause many restarts if the tablesyncs are taking a long
> time to complete?

I think it's OK. After restarting, we won't start any background workers until all the tables are in READY state.

Best regards,
Hou zj