From 6238a3cb7c3a261192182c5b3656f822be19621f Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Thu, 13 Jul 2023 17:15:47 +0300 Subject: [PATCH v18 3/3] apply worker assigns tables --- src/backend/replication/logical/launcher.c | 32 ++++++ src/backend/replication/logical/tablesync.c | 121 ++++++++++---------- src/include/replication/worker_internal.h | 1 + 3 files changed, 94 insertions(+), 60 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 72e5ef8a78..d40c17c6f3 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -297,6 +297,38 @@ logicalrep_workers_find(Oid subid, bool only_running) return res; } +/* + * Find a worker of subscription subid in SYNCDONE relstate, or NULL if none. + */ +LogicalRepWorker * +logicalrep_worker_find_syncdone(Oid subid, bool only_running) +{ + int i; + LogicalRepWorker *res = NULL; + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + /* Scan the worker slots and stop at the first match. */ + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + /* Skip parallel apply workers. */ + if (isParallelApplyWorker(w)) + continue; + + if (w->in_use && w->subid == subid && + w->relstate == SUBREL_STATE_SYNCDONE && + (!only_running || w->proc)) + { + res = w; + break; + } + } + + return res; +} + /* * Start new logical replication background worker, if possible. * diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 46e6f7ea10..51c1a607f3 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -136,7 +136,6 @@ static StringInfo copybuf = NULL; * Exit routine for synchronization worker. 
*/ static void -pg_attribute_noreturn() finish_sync_worker(bool reuse_worker) { /* @@ -247,6 +246,12 @@ wait_for_worker_state_change(char expected_state) CHECK_FOR_INTERRUPTS(); + /* No table needs sync anymore. Apply worker wants this sync worker to exit. */ + if (!OidIsValid(MyLogicalRepWorker->relid)) + { + return false; + } + /* * Done if already in correct state. (We assume this fetch is atomic * enough to not give a misleading answer if we do it with no lock.) @@ -469,8 +474,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ else if (table_states_not_ready == NIL && last_start_times) { + List *workers; + hash_destroy(last_start_times); last_start_times = NULL; + + /* Let all sync workers exit */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + workers = logicalrep_workers_find(MyLogicalRepWorker->subid, false); + foreach(lc, workers) + { + LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); + + if (OidIsValid(w->relid)) + { + SpinLockAcquire(&w->relmutex); + w->relid = InvalidOid; + SpinLockRelease(&w->relmutex); + logicalrep_worker_wakeup_ptr(w); + } + } + LWLockRelease(LogicalRepWorkerLock); } /* @@ -619,6 +643,33 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) hentry->last_start_time = now; } } + else + { + /* + * We reached the max_sync_workers_per_subscription limit. + * Check if there is an existing sync worker waiting for + * a new table to sync. 
+ */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + syncworker = logicalrep_worker_find_syncdone(MyLogicalRepWorker->subid, false); + + if (syncworker) + { + SpinLockAcquire(&syncworker->relmutex); + syncworker->relid = rstate->relid; + syncworker->relstate = rstate->state; + syncworker->relstate_lsn = rstate->lsn; + SpinLockRelease(&syncworker->relmutex); + + if (syncworker->proc) + { + logicalrep_worker_wakeup_ptr(syncworker); + } + } + + LWLockRelease(LogicalRepWorkerLock); + } } } } @@ -1308,7 +1359,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: - finish_sync_worker(true); /* doesn't return */ + finish_sync_worker(false); /* doesn't return */ } /* Calculate the name of the tablesync slot. */ @@ -1735,10 +1786,6 @@ TablesyncWorkerMain(Datum main_arg) */ for (;;) { - List *rstates; - ListCell *lc; - bool is_table_found = false; - run_tablesync_worker(&options, myslotname, originname, @@ -1748,63 +1795,17 @@ TablesyncWorkerMain(Datum main_arg) if (IsTransactionState()) CommitTransactionCommand(); + finish_sync_worker(true); + if (MyLogicalRepWorker->is_sync_completed) { - /* This transaction will be committed by finish_sync_worker. */ - StartTransactionCommand(); - - /* - * Check if there is any table whose relation state is still INIT. - * If a table in INIT state is found, the worker will not be - * finished, it will be reused instead. - */ - rstates = GetSubscriptionRelations(MySubscription->oid, true); - - foreach(lc, rstates) - { - SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); - - if (rstate->state == SUBREL_STATE_SYNCDONE) - continue; - - /* - * Take exclusive lock to prevent any other sync worker from - * picking the same table. - */ - LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); - - /* - * Pick the table for the next run if it is not already picked - * up by another worker. 
- */ - if (!logicalrep_worker_find(MySubscription->oid, rstate->relid, false)) - { - /* Update worker state for the next table */ - MyLogicalRepWorker->relid = rstate->relid; - MyLogicalRepWorker->relstate = rstate->state; - MyLogicalRepWorker->relstate_lsn = rstate->lsn; - LWLockRelease(LogicalRepWorkerLock); - - /* Found a table for next iteration */ - is_table_found = true; - finish_sync_worker(true); - - StartTransactionCommand(); - ereport(LOG, - (errmsg("%s for subscription \"%s\" will be reused to sync table \"%s\" with relid %u.", - get_worker_name(), - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid), - MyLogicalRepWorker->relid))); - CommitTransactionCommand(); - - break; - } - LWLockRelease(LogicalRepWorkerLock); - } + /* Wait for apply worker to assign a new table in INIT state. */ + wait_for_worker_state_change(SUBREL_STATE_INIT); + } - if (!is_table_found) - break; + if (!OidIsValid(MyLogicalRepWorker->relid)) + { + break; } } diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 9c0237fe0b..7aa0f2be63 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -238,6 +238,7 @@ extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running); +extern LogicalRepWorker * logicalrep_worker_find_syncdone(Oid subid, bool only_running); extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm); -- 2.25.1