RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | houzj.fnst@fujitsu.com |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | OS0PR01MB5716339FF7CB759E751492CB940D9@OS0PR01MB5716.jpnprd01.prod.outlook.com |
In response to | Re: Perform streaming logical transactions by background workers and parallel apply (Peter Smith <smithpb2250@gmail.com>) |
Responses |
Re: Perform streaming logical transactions by background workers and parallel apply
Re: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
On Tues, November 22, 2022 13:20 PM Peter Smith <smithpb2250@gmail.com> wrote:
> Thanks for addressing my review comments on v47-0001.
>
> Here are my review comments for v49-0001.

Thanks for your comments.

> ======
>
> src/backend/replication/logical/applyparallelworker.c
>
> 1. GENERAL - NULL checks
>
> There is inconsistent NULL checking in the patch.
>
> Sometimes it is like (!winfo)
> Sometimes explicit NULL checks like (winfo->mq_handle != NULL)
>
> (That is just one example -- there are differences in many places)
>
> It would be better to use a consistent style everywhere.

Changed.

> ~
>
> 2. GENERAL - Error message worker name
>
> 2a.
> In worker.c all the messages are now "logical replication apply
> worker" or "logical replication parallel apply worker" etc, but in the
> applyparallel.c sometimes the "logical replication" part is missing.
> IMO all the messages in this patch should be consistently worded.
>
> I've reported some of them in the following comment below, but please
> search the whole patch for any I might have missed.

Renamed LA and PA as follows:

```
LA -> logical replication apply worker
PA -> logical replication parallel apply worker
```

> 2b.
> Consider if maybe all of these ought to be calling get_worker_name()
> which is currently static in worker.c. Doing this means any future
> changes to get_worker_name won't cause more inconsistencies.

Most error messages in applyparallelxx.c can only use "xx parallel worker", so I think it's fine not to call get_worker_name.

> ~~~
>
> 3. File header comment
>
> + * IDENTIFICATION
> + src/backend/replication/logical/applyparallelworker.c
>
> The word "IDENTIFICATION" should be on a separate line (for
> consistency with every other PG source file)

Fixed.

> ~
>
> 4.
>
> + * In order for lmgr to detect this, we have LA acquire a session
> + lock on the
> + * remote transaction (by pa_lock_stream()) and have PA wait on the
> + lock
> before
> + * trying to receive messages. In other words, LA acquires the lock
> + before
> + * sending STREAM_STOP and releases it if already acquired before
> + sending
> + * STREAM_START, STREAM_ABORT(for toplevel transaction),
> STREAM_PREPARE and
> + * STREAM_COMMIT. For PA, it always needs to acquire the lock after
> processing
> + * STREAM_STOP and STREAM_ABORT(for subtransaction) and then release
> + * immediately after acquiring it. That way, when PA is waiting for
> + LA, we can
> + * have a wait-edge from PA to LA in lmgr, which will make a deadlock
> + in lmgr
> + * like:
>
> Missing spaces before '(' deliberate?

Added.

> ~~~
>
> 5. globals
>
> +/*
> + * Is there a message sent by parallel apply worker which the leader
> +apply
> + * worker need to receive?
> + */
> +volatile sig_atomic_t ParallelApplyMessagePending = false;
>
> SUGGESTION
> Is there a message sent by a parallel apply worker that the leader
> apply worker needs to receive?

Changed.

> ~~~
>
> 6. pa_get_available_worker
>
> +/*
> + * get an available parallel apply worker from the worker pool.
> + */
> +static ParallelApplyWorkerInfo *
> +pa_get_available_worker(void)
>
> Uppercase comment

Changed.

> ~
>
> 7.
>
> + /*
> + * We first try to free the worker to improve our chances of getting
> + * the worker. Normally, we free the worker after ensuring that the
> + * transaction is committed by parallel worker but for rollbacks, we
> + * don't wait for the transaction to finish so can't free the worker
> + * information immediately.
> + */
>
> 7a.
> "We first try to free the worker to improve our chances of getting the worker."
>
> SUGGESTION
> We first try to free the worker to improve our chances of finding one
> that is not in use.
>
> ~
>
> 7b.
> "parallel worker" -> "the parallel worker"

Changed.

> ~~~
>
> 8. pa_allocate_worker
>
> + /* Try to get a free parallel apply worker. */ winfo =
> + pa_get_available_worker();
> +
>
> SUGGESTION
> First, try to get a parallel apply worker from the pool.

Changed.

> ~~~
>
> 9. pa_free_worker
>
> + * This removes the parallel apply worker entry from the hash table
> + so that it
> + * can't be used. This either stops the worker and free the
> + corresponding info,
> + * if there are enough workers in the pool or just marks it available
> + for
> + * reuse.
>
> BEFORE
> This either stops the worker and free the corresponding info, if there
> are enough workers in the pool or just marks it available for reuse.
>
> SUGGESTION
> If there are enough workers in the pool it stops the worker and frees
> the corresponding info, otherwise it just marks the worker as
> available for reuse.

Changed.

> ~
>
> 10.
>
> + /* Free the corresponding info if the worker exited cleanly. */ if
> + (winfo->error_mq_handle == NULL) { pa_free_worker_info(winfo);
> + return true; }
>
> Is it correct that this bypasses the removal from the hash table?

I rethought this. It seems unnecessary to free the information here, as we don't expect the worker to stop unless the leader asks it to stop. So I have temporarily removed this and will think about it in the next version.

> ~
>
> 14.
>
> + case 'X': /* Terminate, indicating clean exit. */ {
> + shm_mq_detach(winfo->error_mq_handle);
> + winfo->error_mq_handle = NULL;
> + break;
> + }
> + default:
>
>
> No need for the { } here.

Changed.

> ~~~
>
> 16. pa_init_and_launch_worker
>
> + /* Setup shared memory. */
> + if (!pa_setup_dsm(winfo))
> + {
> + MemoryContextSwitchTo(oldcontext);
> + pfree(winfo);
> + return NULL;
> + }
>
>
> Wouldn't it be better to do the pfree before switching back to the oldcontext?

I think either style seems fine.

> ~~~
>
> 17. pa_send_data
>
> + /* Wait before retrying. */
> + rc = WaitLatch(MyLatch,
> + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
> + SHM_SEND_RETRY_INTERVAL_MS,
> + WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
> +
> + if (rc & WL_LATCH_SET)
> + {
> + ResetLatch(MyLatch);
> + CHECK_FOR_INTERRUPTS();
> + }
>
>
> Instead of CHECK_FOR_INTERRUPTS, should this be calling your new
> function ProcessParallelApplyInterrupts?

I thought ProcessParallelApplyInterrupts is intended to be invoked only in the main loop (LogicalParallelApplyLoop) to make the parallel apply worker exit cleanly.

> ~
>
> 18.
>
> + if (startTime == 0)
> + startTime = GetCurrentTimestamp();
> + else if (TimestampDifferenceExceeds(startTime,
> + GetCurrentTimestamp(),
> + SHM_SEND_TIMEOUT_MS))
> + ereport(ERROR,
> + (errcode(ERRCODE_CONNECTION_FAILURE),
> + errmsg("terminating logical replication parallel apply worker due to
> timeout")));
>
>
> I'd previously commented that the timeout calculation seemed wrong.
> Hou-san replied [1,#9] "start counting from the first failure looks
> fine to me." but I am not so sure - e.g. If the timeout is 10s then I
> expect it to fail ~10s after the function is called, not 11s after. I
> know it's pedantic, but where's the harm in making the calculation
> right instead of just nearly right?
>
> IMO probably an easy fix for this is like:
>
> #define SHM_SEND_RETRY_INTERVAL_MS 1000 #define SHM_SEND_TIMEOUT_MS
> (10000 - SHM_SEND_RETRY_INTERVAL_MS)

OK, I moved the setting of startTime to before the WaitLatch.

> ~~~
>
> 20. pa_savepoint_name
>
> +static void
> +pa_savepoint_name(Oid suboid, TransactionId xid, char *spname,
> + Size szsp)
>
> Unnecessary wrapping?

Changed.

> ======
>
> src/backend/replication/logical/origin.c
>
> 21. replorigin_session_setup
>
> + * However, we do allow multiple processes to point to the same
> + origin slot
> + * if requested by the caller by passing PID of the process that has
> + already
> + * acquired it. This is to allow using the same origin by multiple
> + parallel
> + * apply processes the provided they maintain commit order, for
> + example, by
> + * allowing only one process to commit at a time.
>
> 21a.
> I thought the comment should mention this is optional and the special
> value acquired_by=0 means don't do this.

Added.

> ~
>
> 21b.
> "the provided they" ?? typo?

Changed.

> ======
>
> src/backend/replication/logical/tablesync.c
>
> 22. process_syncing_tables
>
> process_syncing_tables(XLogRecPtr current_lsn) {
> + /*
> + * Skip for parallel apply workers as they don't operate on tables
> + that
> + * are not in ready state. See pa_can_start() and
> + * should_apply_changes_for_rel().
> + */
> + if (am_parallel_apply_worker())
> + return;
>
> SUGGESTION (remove the double negative) Skip for parallel apply
> workers because they only operate on tables that are in a READY state.
> See pa_can_start() and should_apply_changes_for_rel().

Changed.

> ======
>
> src/backend/replication/logical/worker.c
>
> 23. apply_handle_stream_stop
>
>
> Previously I suggested that this lock/unlock seems too tricky and
> needed a comment. The reply [1,#12] was that this is already described
> atop parallelapplyworker.c. OK, but in that case maybe here the
> comment can just refer to that explanation:
>
> SUGGESTION
> Refer to the comments atop applyparallelworker.c for what this lock
> and immediate unlock is doing.
>
> ~~~
>
> 24. apply_handle_stream_abort
>
> + if
> + (pg_atomic_sub_fetch_u32(&(MyParallelShared->pending_stream_count),
> 1) == 0)
> + {
> + pa_lock_stream(MyParallelShared->xid, AccessShareLock);
> + pa_unlock_stream(MyParallelShared->xid, AccessShareLock); }
>
> ditto comment #23

I feel that the spot atop the definition of the pa_lock_xxx functions is a better place for the comments, so I added them there. Readers can check them when reading the lock functions.

> ~~~
>
> 25. apply_worker_clean_exit
>
> +void
> +apply_worker_clean_exit(void)
> +{
> + /* Notify the leader apply worker that we have exited cleanly. */
> +if (am_parallel_apply_worker()) pq_putmessage('X', NULL, 0);
> +
> + proc_exit(0);
> +}
>
> Somehow it doesn't seem right that the PA worker sending 'X' is here
> in worker.c, while the LA worker receipt of this 'X' is in the other
> applyparallelworker.c module. Maybe that other function
> HandleParallelApplyMessage should also be here in worker.c?

I thought that apply_worker_clean_exit is widely used in worker.c and is a common function for both leader and parallel apply workers, so I put it in worker.c. But HandleParallelApplyMessage is a function only for the parallel worker, so it is better placed in applyparallelworker.c.

> ======
>
> src/backend/utils/misc/guc_tables.c
>
> 26.
>
> @@ -2957,6 +2957,18 @@ struct config_int ConfigureNamesInt[] =
> NULL,
> },
> &max_sync_workers_per_subscription,
> + 2, 0, MAX_PARALLEL_WORKER_LIMIT,
> + NULL, NULL, NULL
> + },
> +
> + {
> + {"max_parallel_apply_workers_per_subscription",
> + PGC_SIGHUP,
> + REPLICATION_SUBSCRIBERS,
> + gettext_noop("Maximum number of parallel apply workers per
> + subscription."), NULL, },
> + &max_parallel_apply_workers_per_subscription,
> 2, 0, MAX_BACKENDS,
> NULL, NULL, NULL
>
> Is this correct? Did you mean to change
> max_sync_workers_per_subscription, My 1st impression is that there has
> been some mixup with the MAX_PARALLEL_WORKER_LIMIT and MAX_BACKENDS or
> that this change was accidentally made to the wrong GUC.

Fixed.

> ======
>
> src/include/replication/worker_internal.h
>
> 27. ParallelApplyWorkerShared
>
> + /*
> + * Indicates whether there are pending streaming blocks in the queue.
> + The
> + * parallel apply worker will check it before starting to wait.
> + */
> + pg_atomic_uint32 pending_stream_count;
>
> A better name might be 'n_pending_stream_blocks'.

I am not sure that the name looks better, so I didn't change this.
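
For readers following #24 and #27, here is a minimal standalone sketch of how a pending-block counter like pending_stream_count can gate the worker's wait. It is not the patch code: it uses C11 atomics instead of PostgreSQL's pg_atomic_* API, the names SketchShared, sketch_leader_queue_block and sketch_worker_block_done are hypothetical, and the idea that the leader increments the counter once per queued block is my assumption rather than something stated in the review.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in for the shared struct; only the counter is modeled. */
typedef struct SketchShared
{
    atomic_uint pending_stream_count;
} SketchShared;

/* Leader side (assumed): count one more streamed block queued for the worker. */
static void
sketch_leader_queue_block(SketchShared *shared)
{
    atomic_fetch_add(&shared->pending_stream_count, 1);
}

/*
 * Worker side: called after a streamed block has been processed.
 * Returns true when no blocks remain, which is the point where the quoted
 * code takes the stream lock and immediately releases it to wait for the
 * leader.
 */
static bool
sketch_worker_block_done(SketchShared *shared)
{
    /* atomic_fetch_sub returns the value held before the subtraction. */
    unsigned int old = atomic_fetch_sub(&shared->pending_stream_count, 1);

    assert(old > 0);            /* guard against counter underflow */
    return (old - 1) == 0;      /* same result as a sub-and-fetch == 0 test */
}
```

With two blocks queued, the first call to sketch_worker_block_done() returns false and the second returns true, so the lock and unlock would happen only once the queue is drained.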
> ~
>
> 28. function names
>
> extern void logicalrep_worker_stop(Oid subid, Oid relid);
> +extern void logicalrep_parallel_apply_worker_stop(int slot_no, uint16
> generation);
> extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern
> void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
>
> extern int logicalrep_sync_worker_count(Oid subid);
> +extern int logicalrep_parallel_apply_worker_count(Oid subid);
>
> Would it be better to call those new functions using similar shorter
> names as done elsewhere?
>
> logicalrep_parallel_apply_worker_stop -> logicalrep_pa_worker_stop
> logicalrep_parallel_apply_worker_count -> logicalrep_pa_worker_count

Changed.

Attached is the new version of the patch, which also fixes an invalid shared memory access bug in the 0002 patch reported by Kuroda-San off-list.

Best regards,
Hou zj