diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 2a1d4e03fe2..ff9bc02a3df 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -872,11 +872,11 @@ logicalrep_reset_seqsync_start_time(void) LogicalRepWorker *worker; /* - * Acquire LogicalRepWorkerLock in LW_EXCLUSIVE mode to block the apply - * worker (holding LW_SHARED) from reading or updating - * last_seqsync_start_time. See ProcessSyncingSequencesForApply(). + * The apply worker can't access last_seqsync_start_time concurrently, so + * it is okay to use SHARED lock here. See + * ProcessSyncingSequencesForApply(). */ - LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(WORKERTYPE_APPLY, MyLogicalRepWorker->subid, InvalidOid, diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index 4bf70abcbaf..7f8afc1fec8 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -127,6 +127,10 @@ ProcessSyncingSequencesForApply(void) nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); LWLockRelease(LogicalRepWorkerLock); + /* + * It is okay to read/update last_seqsync_start_time here in apply worker + * as we have already ensured that sync worker doesn't exist. + */ launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid, &MyLogicalRepWorker->last_seqsync_start_time); } @@ -419,7 +423,7 @@ copy_sequences(WalReceiverConn *conn) * - On each node, a background worker acquires a lock on a sequence * as part of a sync operation. * - * - Concurrently, a user transaction attempts to alter the same + * - Concurrently, a user transaction attempts to alter the same * sequence, waiting on the background worker's lock. * * - Meanwhile, a query from the other node tries to access metadata @@ -485,8 +489,9 @@ copy_sequences(WalReceiverConn *conn) case COPYSEQ_MISMATCH: /* - * Allocate in a long-lived memory context, since these - * errors will be reported after the transaction commits. + * Remember mismatched sequences in a long-lived memory + * context, since these will be used after the transaction + * commits. */ oldctx = MemoryContextSwitchTo(TopMemoryContext); mismatched_seqs = lappend_int(mismatched_seqs, seqidx); @@ -496,8 +501,9 @@ copy_sequences(WalReceiverConn *conn) case COPYSEQ_INSUFFICIENT_PERM: /* - * Allocate in a long-lived memory context, since these - * errors will be reported after the transaction commits. + * Remember the sequences with insufficient privileges in a + * long-lived memory context, since these will be used after + * the transaction commits. */ oldctx = MemoryContextSwitchTo(TopMemoryContext); insuffperm_seqs = lappend_int(insuffperm_seqs, seqidx); @@ -573,7 +579,7 @@ copy_sequences(WalReceiverConn *conn) } /* - * Determines which sequences require synchronization and initiates their + * Identifies sequences that require synchronization and initiates the * synchronization process. */ static void diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c index f8b1a3d4827..53530bb39b6 100644 --- a/src/backend/replication/logical/syncutils.c +++ b/src/backend/replication/logical/syncutils.c @@ -107,8 +107,8 @@ InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue) } /* - * Attempt to launch a sync worker (sequence or table) if there is a sync - * worker slot available and the retry interval has elapsed. + * Attempt to launch a sync worker for one or more sequences or a table, if + * a worker slot is available and the retry interval has elapsed. * * wtype: sync worker type. * nsyncworkers: Number of currently running sync workers for the subscription. @@ -179,7 +179,7 @@ ProcessSyncingRelations(XLogRecPtr current_lsn) case WORKERTYPE_SEQUENCESYNC: /* Should never happen. */ - elog(ERROR, "Sequence synchronization worker not expected to process relations"); + elog(ERROR, "sequence synchronization worker is not expected to process relations"); break; case WORKERTYPE_UNKNOWN: diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 8026a007ec3..bc8b5a5cb69 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -704,7 +704,7 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) case WORKERTYPE_SEQUENCESYNC: /* Should never happen. */ - elog(ERROR, "Sequence synchronization worker not expected to apply changes"); + elog(ERROR, "sequence synchronization worker is not expected to apply changes"); break; case WORKERTYPE_UNKNOWN: diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 32ef365f4a6..87fb211d040 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -267,8 +267,6 @@ extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples); -extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, - Oid relid, TimestampTz *last_start_time); extern void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid); extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo); @@ -292,6 +290,8 @@ extern void ProcessSyncingSequencesForApply(void); pg_noreturn extern void FinishSyncWorker(void); extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue); +extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, + Oid relid, TimestampTz* last_start_time); extern void ProcessSyncingRelations(XLogRecPtr current_lsn); extern void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_sequences, bool *started_tx);