From 8119252c633814378a0ed0cb017f321b736332a4 Mon Sep 17 00:00:00 2001 From: sherlockcpp Date: Sat, 17 Dec 2022 20:43:21 +0800 Subject: [PATCH v75 3/5] Stop extra worker if GUC was changed If the max_parallel_apply_workers_per_subscription is changed to a lower value, try to stop free workers in the pool to keep the number of workers lower than half of the max_parallel_apply_workers_per_subscription --- .../replication/logical/applyparallelworker.c | 60 ++++++++++++++++++---- src/backend/replication/logical/worker.c | 7 +++ src/include/replication/worker_internal.h | 1 + 3 files changed, 57 insertions(+), 11 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index e5ec8b2..083ff33 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -546,6 +546,25 @@ pa_find_worker(TransactionId xid) } /* + * Stop the given parallel apply worker and free the corresponding info. + */ +static void +pa_stop_worker(ParallelApplyWorkerInfo *winfo) +{ + int slot_no; + uint16 generation; + + SpinLockAcquire(&winfo->shared->mutex); + generation = winfo->shared->logicalrep_worker_generation; + slot_no = winfo->shared->logicalrep_worker_slot_no; + SpinLockRelease(&winfo->shared->mutex); + + logicalrep_pa_worker_stop(slot_no, generation); + + pa_free_worker_info(winfo); +} + +/* * Makes the worker available for reuse. * * This removes the parallel apply worker entry from the hash table so that it @@ -580,23 +599,42 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo) list_length(ParallelApplyWorkerPool) > (max_parallel_apply_workers_per_subscription / 2)) { - int slot_no; - uint16 generation; - - SpinLockAcquire(&winfo->shared->mutex); - generation = winfo->shared->logicalrep_worker_generation; - slot_no = winfo->shared->logicalrep_worker_slot_no; - SpinLockRelease(&winfo->shared->mutex); + pa_stop_worker(winfo); + return; + } - logicalrep_pa_worker_stop(slot_no, generation); + winfo->in_use = false; + winfo->serialize_changes = false; +} - pa_free_worker_info(winfo); +/* + * Try to stop parallel apply workers that are not in use to keep the number of + * workers lower than half of the max_parallel_apply_workers_per_subscription. + */ +void +pa_stop_idle_workers(void) +{ + List *active_workers; + ListCell *lc; + int max_applyworkers = max_parallel_apply_workers_per_subscription / 2; + if (list_length(ParallelApplyWorkerPool) <= max_applyworkers) return; + + active_workers = list_copy(ParallelApplyWorkerPool); + + foreach(lc, active_workers) + { + ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc); + + pa_stop_worker(winfo); + + /* Recheck the number of workers. */ + if (list_length(ParallelApplyWorkerPool) <= max_applyworkers) + break; } - winfo->in_use = false; - winfo->serialize_changes = false; + list_free(active_workers); } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 2f644b7..eb53ace 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3631,6 +3631,13 @@ LogicalRepApplyLoop(XLogRecPtr last_received) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); + + /* + * Try to stop free workers in the pool in case the + * max_parallel_apply_workers_per_subscription is changed to a + * lower value. + */ + pa_stop_idle_workers(); } if (rc & WL_TIMEOUT) diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index a682791..f3b2f2d 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -278,6 +278,7 @@ extern void set_apply_error_context_origin(char *originname); /* Parallel apply worker setup and interactions */ extern void pa_allocate_worker(TransactionId xid); extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid); +extern void pa_stop_idle_workers(void); extern void pa_detach_all_error_mq(void); extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, -- 2.7.2.windows.1