From c3989215cd3c1d1efa790e96ad9fa537a6b8da4e Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Thu, 17 Aug 2023 14:09:17 +0530 Subject: [PATCH v28 3/4] apply worker assigns tables apply worker assigns tables --- src/backend/replication/logical/launcher.c | 32 ++++++ src/backend/replication/logical/tablesync.c | 110 +++++++++++--------- src/include/replication/worker_internal.h | 1 + 3 files changed, 93 insertions(+), 50 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index b19437f9d0..27fea8cd6f 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -297,6 +297,38 @@ logicalrep_workers_find(Oid subid, bool only_running) return res; } +/* + * Return a logical rep worker in ready state + */ +LogicalRepWorker * +logicalrep_worker_find_syncdone(Oid subid, bool only_running) +{ + int i; + LogicalRepWorker *res = NULL; + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + /* Search for attached worker for a given subscription id. */ + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + /* Skip parallel apply workers. */ + if (isParallelApplyWorker(w)) + continue; + + if (w->in_use && w->subid == subid && + w->relstate == SUBREL_STATE_SYNCDONE && + (!only_running || w->proc)) + { + res = w; + break; + } + } + + return res; +} + /* * Start new logical replication background worker, if possible. * diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index ec2f67d879..c65f2a64d9 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -241,6 +241,12 @@ wait_for_worker_state_change(char expected_state) CHECK_FOR_INTERRUPTS(); + /* No table needs sync anymore. Apply worker wants this sync worker to exit. */ + if (!OidIsValid(MyLogicalRepWorker->relid)) + { + return false; + } + /* * Done if already in correct state. (We assume this fetch is atomic * enough to not give a misleading answer if we do it with no lock.) @@ -463,8 +469,28 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ else if (table_states_not_ready == NIL && last_start_times) { + List *workers; + hash_destroy(last_start_times); last_start_times = NULL; + + /* Let all sync workers exit */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + workers = logicalrep_workers_find(MyLogicalRepWorker->subid, false); + foreach(lc, workers) + { + LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); + + if (OidIsValid(w->relid)) + { + SpinLockAcquire(&w->relmutex); + w->relid = InvalidOid; + SpinLockRelease(&w->relmutex); + logicalrep_worker_wakeup_ptr(w); + } + } + + LWLockRelease(LogicalRepWorkerLock); } /* @@ -613,6 +639,32 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) hentry->last_start_time = now; } } + else + { + /* + * We reached the max_sync_workers_per_subscription limit. + * Check if there is an existing sync worker waiting for + * new table to sync. + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + syncworker = logicalrep_worker_find_syncdone(MyLogicalRepWorker->subid, false); + if (syncworker) + { + SpinLockAcquire(&syncworker->relmutex); + syncworker->relid = rstate->relid; + syncworker->relstate = rstate->state; + syncworker->relstate_lsn = rstate->lsn; + SpinLockRelease(&syncworker->relmutex); + + if (syncworker->proc) + { + logicalrep_worker_wakeup_ptr(syncworker); + } + } + + LWLockRelease(LogicalRepWorkerLock); + } } } } @@ -1731,59 +1783,17 @@ TablesyncWorkerMain(Datum main_arg) if (IsTransactionState()) CommitTransactionCommand(); + finish_sync_worker(true); + if (MyLogicalRepWorker->relsync_completed) { - List *rstates; - ListCell *lc; - - /* - * This tablesync worker is 'done' unless another table that needs - * syncing is found. - */ - done = true; - - /* This transaction will be committed by finish_sync_worker. */ - StartTransactionCommand(); - - /* - * Check if there is any table whose relation state is still INIT. - * If a table in INIT state is found, the worker will not be - * finished, it will be reused instead. - */ - rstates = GetSubscriptionRelations(MySubscription->oid, true); - - foreach(lc, rstates) - { - SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); - - if (rstate->state == SUBREL_STATE_SYNCDONE) - continue; - - /* - * Take exclusive lock to prevent any other sync worker from - * picking the same table. - */ - LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); - - /* - * Pick the table for the next run if it is not already picked - * up by another worker. - */ - if (!logicalrep_worker_find(MySubscription->oid, rstate->relid, false)) - { - /* Update worker state for the next table */ - MyLogicalRepWorker->relid = rstate->relid; - MyLogicalRepWorker->relstate = rstate->state; - MyLogicalRepWorker->relstate_lsn = rstate->lsn; - LWLockRelease(LogicalRepWorkerLock); + /* wait for apply worker to assign a new table with INIT state. */ + wait_for_worker_state_change(SUBREL_STATE_INIT); + } - /* Found a table for next iteration */ - finish_sync_worker(true); - done = false; - break; - } - LWLockRelease(LogicalRepWorkerLock); - } + if (!OidIsValid(MyLogicalRepWorker->relid)) + { + break; } } diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 47fa7fbd55..0cf2a69fd6 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -237,6 +237,7 @@ extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running); +extern LogicalRepWorker * logicalrep_worker_find_syncdone(Oid subid, bool only_running); extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm); -- 2.34.1