Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | Masahiko Sawada |
---|---|
Subject | Re: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | CAD21AoBeJxF3ZOoU6rUXh3UZVxqmdVGNxC=ERPBe-7mOr=Sz5A@mail.gmail.com |
In response to | RE: Perform streaming logical transactions by background workers and parallel apply ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>) |
Responses | RE: Perform streaming logical transactions by background workers and parallel apply ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>) |
List | pgsql-hackers |
On Wed, Nov 30, 2022 at 10:51 PM houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com> wrote:
>
> On Wednesday, November 30, 2022 9:41 PM houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com> wrote:
> >
> > On Tuesday, November 29, 2022 8:34 PM Amit Kapila
> > > Review comments on v53-0001*
> >
> > Attach the new version patch set.
>
> Sorry, there were some mistakes in the previous patch set.
> Here is the correct V54 patch set. I also ran pgindent for the patch set.
>

Thank you for updating the patches. Here are random review comments
for 0001 and 0002 patches.

    ereport(ERROR,
            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
             errmsg("logical replication parallel apply worker exited abnormally"),
             errcontext("%s", edata.context)));

and

    ereport(ERROR,
            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
             errmsg("logical replication parallel apply worker exited because of subscription information change")));

I'm not sure ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE is appropriate
here. Given that parallel apply worker has already reported the error
message with the error code, I think we don't need to set the
errorcode for the logs from the leader process.

Also, I'm not sure the term "exited abnormally" is appropriate since
we use it when the server crashes for example. I think ERRORs reported
here don't mean that in general.

---
    if (am_parallel_apply_worker() && on_subinfo_change)
    {
        /*
         * If a parallel apply worker exits due to the subscription
         * information change, we notify the leader apply worker so that the
         * leader can report more meaningful message in time and restart the
         * logical replication.
         */
        pq_putmessage('X', NULL, 0);
    }

and

    ereport(ERROR,
            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
             errmsg("logical replication parallel apply worker exited because of subscription information change")));

Do we really need an additional message in case of 'X'?
When we call apply_worker_clean_exit with on_subinfo_change = true, we
have reported the error message such as:

    ereport(LOG,
            (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
                    MySubscription->name)));

I think that reporting a similar message from the leader might not be
meaningful for users.

---
-   if (options->proto.logical.streaming &&
-       PQserverVersion(conn->streamConn) >= 140000)
-       appendStringInfoString(&cmd, ", streaming 'on'");
+   if (options->proto.logical.streaming_str)
+       appendStringInfo(&cmd, ", streaming '%s'",
+                        options->proto.logical.streaming_str);

and

+   /*
+    * Assign the appropriate option value for streaming option according to
+    * the 'streaming' mode and the publisher's ability to support that mode.
+    */
+   if (server_version >= 160000 &&
+       MySubscription->stream == SUBSTREAM_PARALLEL)
+   {
+       options.proto.logical.streaming_str = pstrdup("parallel");
+       MyLogicalRepWorker->parallel_apply = true;
+   }
+   else if (server_version >= 140000 &&
+            MySubscription->stream != SUBSTREAM_OFF)
+   {
+       options.proto.logical.streaming_str = pstrdup("on");
+       MyLogicalRepWorker->parallel_apply = false;
+   }
+   else
+   {
+       options.proto.logical.streaming_str = NULL;
+       MyLogicalRepWorker->parallel_apply = false;
+   }

This change moves the code that adjusts the streaming option based on
the publisher server version from libpqwalreceiver.c to worker.c.
On the other hand, the similar logic for other parameters such as
"two_phase" and "origin" is still done in libpqwalreceiver.c. How
about passing MySubscription->stream via WalRcvStreamOptions and
constructing a streaming option string in libpqrcv_startstreaming()?
In ApplyWorkerMain(), we just need to set
MyLogicalRepWorker->parallel_apply = true if (server_version >= 160000
&& MySubscription->stream == SUBSTREAM_PARALLEL). We won't need
pstrdup for "parallel" and "on", and it's more consistent with other
parameters.
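
To make the suggestion above a bit more concrete, here is a minimal
standalone sketch, not code from the patch. It assumes the stream mode
itself is passed down and the receiver side picks the option value from
the publisher version, so string literals can be used and no pstrdup is
needed. All Sketch*/sketch_* names are hypothetical stand-ins; the real
code would use MySubscription->stream, WalRcvStreamOptions and
appendStringInfo() in libpqrcv_startstreaming().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-ins for the subscription stream modes. */
typedef enum
{
    SKETCH_SUBSTREAM_OFF,
    SKETCH_SUBSTREAM_ON,
    SKETCH_SUBSTREAM_PARALLEL
} SketchStreamMode;

/*
 * Receiver side: choose the value for the "streaming" option from the
 * requested mode and the publisher version.  Returns NULL when the option
 * should not be sent at all.
 */
const char *
sketch_streaming_option(int server_version, SketchStreamMode mode)
{
    if (server_version >= 160000 && mode == SKETCH_SUBSTREAM_PARALLEL)
        return "parallel";
    if (server_version >= 140000 && mode != SKETCH_SUBSTREAM_OFF)
        return "on";
    return NULL;
}

/* Append ", streaming '<value>'" to an already NUL-terminated command. */
void
sketch_append_streaming(char *cmd, size_t cmdlen,
                        int server_version, SketchStreamMode mode)
{
    const char *value = sketch_streaming_option(server_version, mode);

    if (value != NULL)
    {
        size_t      used = strlen(cmd);

        assert(used < cmdlen);
        snprintf(cmd + used, cmdlen - used, ", streaming '%s'", value);
    }
}

/* Apply worker side: the only decision left for ApplyWorkerMain(). */
bool
sketch_use_parallel_apply(int server_version, SketchStreamMode mode)
{
    return server_version >= 160000 && mode == SKETCH_SUBSTREAM_PARALLEL;
}
```

With this shape, the apply worker would only need the boolean from
sketch_use_parallel_apply() to set MyLogicalRepWorker->parallel_apply,
and the version check for the option string stays next to the ones for
"two_phase" and "origin".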
---
+ * We maintain a worker pool to avoid restarting workers for each streaming
+ * transaction. We maintain each worker's information in the

Do we need to describe the pool in the doc?

---
+ * in AccessExclusive mode at transaction finish commands (STREAM_COMMIT and
+ * STREAM_PREAPRE) and release it immediately.

typo, s/STREAM_PREAPRE/STREAM_PREPARE/

---
+/* Parallel apply workers hash table (initialized on first use). */
+static HTAB *ParallelApplyWorkersHash = NULL;
+
+/*
+ * A list to maintain the active parallel apply workers. The information for
+ * the new worker is added to the list after successfully launching it. The
+ * list entry is removed if there are already enough workers in the worker
+ * pool either at the end of the transaction or while trying to find a free
+ * worker for applying the transaction. For more information about the worker
+ * pool, see comments atop this file.
+ */
+static List *ParallelApplyWorkersList = NIL;

The names ParallelApplyWorkersHash and ParallelApplyWorkersList are very
similar but the usages are completely different. Probably we can find
better names such as ParallelApplyTxnHash and ParallelApplyWorkerPool.
And probably we can add more comments for ParallelApplyWorkersHash.

---
    if (winfo->serialize_changes ||
        napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
    {
        int         slot_no;
        uint16      generation;

        SpinLockAcquire(&winfo->shared->mutex);
        generation = winfo->shared->logicalrep_worker_generation;
        slot_no = winfo->shared->logicalrep_worker_slot_no;
        SpinLockRelease(&winfo->shared->mutex);

        logicalrep_pa_worker_stop(slot_no, generation);

        pa_free_worker_info(winfo);

        return true;
    }

    /* Unlink any files that were needed to serialize partial changes. */
    if (winfo->serialize_changes)
        stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);

If winfo->serialize_changes is true, we return true in the first if
statement. So stream_cleanup_files in the second if statement is never
executed.
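
As an illustration of the problem, the standalone sketch below shows one
possible ordering that avoids the dead code: do the file cleanup before
the early return. This is only an assumption about what the intended
behavior might be, not a fix taken from the patch, and every sketch_*
and Sketch* name is a hypothetical stub for the real worker.c
structures and functions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, trimmed-down stand-in for the per-worker info struct. */
typedef struct SketchWorkerInfo
{
    bool        serialize_changes;  /* changes were spilled to files */
    bool        files_cleaned;      /* set by the stub cleanup below */
    bool        stopped;            /* set by the stub stop below */
} SketchWorkerInfo;

/* Stub for stream_cleanup_files(); only records that it ran. */
void
sketch_cleanup_files(SketchWorkerInfo *winfo)
{
    winfo->files_cleaned = true;
}

/* Stub for stopping the worker and freeing its info. */
void
sketch_stop_worker(SketchWorkerInfo *winfo)
{
    winfo->stopped = true;
}

/*
 * Returns true if the worker was stopped, false if it stays in the pool.
 * The cleanup runs first, so the early return cannot skip it.
 */
bool
sketch_free_worker(SketchWorkerInfo *winfo, int napplyworkers, int max_workers)
{
    assert(winfo != NULL);

    /* Unlink any files that were needed to serialize partial changes. */
    if (winfo->serialize_changes)
        sketch_cleanup_files(winfo);

    if (winfo->serialize_changes || napplyworkers > (max_workers / 2))
    {
        sketch_stop_worker(winfo);
        return true;
    }

    return false;
}
```

Doing the cleanup first also avoids touching the worker's shared state
after it has been freed, which would be a concern if the cleanup were
simply moved into the first block after pa_free_worker_info().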
---
+   /*
+    * First, try to get a parallel apply worker from the pool, if available.
+    * Otherwise, try to start a new parallel apply worker.
+    */
+   winfo = pa_get_available_worker();
+   if (!winfo)
+   {
+       winfo = pa_init_and_launch_worker();
+       if (!winfo)
+           return;
+   }

I think we don't necessarily need two separate functions for getting a
worker from the pool and launching a new worker. It seems to reduce
the readability. Instead, I think that we can have one function that
returns winfo if there is a free worker in the worker pool, or
otherwise launches a worker. That way, we can simply do:

    winfo = pg_launch_parallel_worker();
    if (!winfo)
        return;

---
+   /* Setup replication origin tracking. */
+   StartTransactionCommand();
+   ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+                                      originname, sizeof(originname));
+   originid = replorigin_by_name(originname, true);
+   if (!OidIsValid(originid))
+       originid = replorigin_create(originname);

This code looks to allow parallel workers to use different origins in
cases where the origin doesn't exist, but is that okay? Shouldn't we
pass missing_ok = false in this case?

---
cfbot seems to fail:

https://cirrus-ci.com/task/6264595342426112

Regards,

-- 
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com