From f1b314e2341c35115eebcc407060d80633c95665 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 15 Apr 2026 20:26:01 +1000 Subject: [PATCH v12 3/3] Synchronize sequences directly in REFRESH SEQUENCES command. The ALTER SUBSCRIPTION ... REFRESH SEQUENCES command currently sets all sequence states in pg_subscription_rel to INIT and relies on the sequence sync worker to perform the actual synchronization and update states to READY. With the recent change making the sequence sync worker long-lived, most sequences are now synchronized in the background, reducing the need for REFRESH SEQUENCES. However, the command remains necessary for sequences that haven't been synchronized. This commit enhances REFRESH SEQUENCES to synchronize sequences directly within the command itself, eliminating the overhead of launching a worker and updating catalog entries unnecessarily. --- doc/src/sgml/logical-replication.sgml | 5 +- src/backend/commands/subscriptioncmds.c | 27 ++-- .../replication/logical/sequencesync.c | 148 +++++++++++++----- src/include/replication/logicalworker.h | 5 + src/test/subscription/t/036_sequences.pl | 49 ++++++ 5 files changed, 176 insertions(+), 58 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 430d2581699..99746c410cb 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1791,8 +1791,9 @@ Publications: A sequence synchronization worker will be started - after executing any of the above subscriber commands. The worker will - remain running for the life of the subscription, periodically + after executing CREATE SUBSCRIPTION or + ALTER SUBSCRIPTION ... REFRESH PUBLICATION command. The + worker will remain running for the life of the subscription, periodically synchronizing all published sequences. diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3a30d8ddb1f..91b3d05aaf8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1288,25 +1288,20 @@ AlterSubscription_refresh_seq(Subscription *sub) PG_TRY(); { - List *subrel_states; - check_publications_origin_sequences(wrconn, sub->publications, true, sub->origin, NULL, 0, sub->name); - /* Get local sequence list. */ - subrel_states = GetSubscriptionRelations(sub->oid, false, true, false); - foreach_ptr(SubscriptionRelState, subrel, subrel_states) - { - Oid relid = subrel->relid; - - UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT, - InvalidXLogRecPtr, false); - ereport(DEBUG1, - errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state", - get_namespace_name(get_rel_namespace(relid)), - get_rel_name(relid), - sub->name)); - } + /* + * Stop the sequencesync worker to prevent concurrent updates. This + * avoids a race condition where the sequence value could be updated + * by this command and then immediately moved backward by a + * concurrently running worker. Stopping the worker is safe even if it + * attempts to restart, as it will wait on the subscription lock + * already held by this ALTER SUBSCRIPTION command. + */ + logicalrep_worker_stop(WORKERTYPE_SEQUENCESYNC, sub->oid, InvalidOid); + + AlterSubSyncSequences(wrconn, sub->oid, sub->name, sub->runasowner); } PG_FINALLY(); { diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index 352fd0e742b..e1fc556dd51 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -217,7 +217,7 @@ get_sequences_string(List *seqindexes, List *seqinfos, StringInfo buf) */ static void report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, - List *missing_seqs_idx, List *seqinfos) + List *missing_seqs_idx, List *seqinfos, char *subname) { StringInfoData seqstr; @@ -263,7 +263,7 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical replication sequence synchronization failed for subscription \"%s\"", - MySubscription->name)); + subname)); } /* @@ -420,10 +420,9 @@ check_seq_privileges_and_drift(LogicalRepSequenceInfo *seqinfo, */ static CopySeqResult copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel, - bool update_lsn) + Oid subid, bool run_as_owner, bool update_lsn) { UserContext ucxt; - bool run_as_owner = MySubscription->runasowner; Oid seqoid = seqinfo->localrelid; CopySeqResult result; bool need_lsn_update = false; @@ -467,8 +466,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel, XLogRecPtr local_page_lsn; /* Get the local page LSN for comparison with the remote value */ - (void) GetSubscriptionRelState(MySubscription->oid, - RelationGetRelid(sequence_rel), + (void) GetSubscriptionRelState(subid, RelationGetRelid(sequence_rel), &local_page_lsn); need_lsn_update = (local_page_lsn != seqinfo->page_lsn); @@ -480,7 +478,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel, * cycle (update_lsn is true). */ if (seqinfo->relstate == SUBREL_STATE_INIT || need_lsn_update) - UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY, + UpdateSubscriptionRelState(subid, seqoid, SUBREL_STATE_READY, seqinfo->page_lsn, false); return COPYSEQ_SUCCESS; @@ -499,7 +497,8 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel, * Returns true/false if any sequences were actually copied. */ static bool -copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn) +copy_sequences(WalReceiverConn *conn, List *seqinfos, Oid subid, char *subname, + bool runasowner, bool update_lsn) { int cur_batch_base_index = 0; int n_seqinfos = list_length(seqinfos); @@ -531,11 +530,16 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn) int batch_no_drift = 0; int batch_missing_count; Relation sequence_rel = NULL; + bool started_tx = false; WalRcvExecResult *res; TupleTableSlot *slot; - StartTransactionCommand(); + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++) { @@ -627,14 +631,15 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn) &seqinfo, &seqidx, seqinfos); if (sync_status == COPYSEQ_ALLOWED) - sync_status = copy_sequence(seqinfo, sequence_rel, update_lsn); + sync_status = copy_sequence(seqinfo, sequence_rel, subid, + runasowner, update_lsn); switch (sync_status) { case COPYSEQ_SUCCESS: elog(DEBUG1, "logical replication synchronization has updated sequence \"%s.%s\" in subscription \"%s\"", - seqinfo->nspname, seqinfo->seqname, MySubscription->name); + seqinfo->nspname, seqinfo->seqname, subname); batch_succeeded_count++; sequence_copied = true; break; @@ -707,13 +712,17 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn) elog(DEBUG1, "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped, %d no drift", - MySubscription->name, + subname, (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1, batch_size, batch_succeeded_count, batch_mismatched_count, batch_insuffperm_count, batch_missing_count, batch_skipped_count, batch_no_drift); - /* Commit this batch, and prepare for next batch */ - CommitTransactionCommand(); + /* + * Commit this batch if started a transaction, and prepare for next + * batch. + */ + if (started_tx) + CommitTransactionCommand(); if (batch_missing_count) { @@ -738,7 +747,7 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn) /* Report mismatches, permission issues, or missing sequences */ report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx, - missing_seqs_idx, seqinfos); + missing_seqs_idx, seqinfos, subname); return sequence_copied; } @@ -754,37 +763,28 @@ invalidate_syncing_sequence_infos(Datum arg, SysCacheIdentifier cacheid, } /* - * Get the list of sequence information for the current subscription. + * Get the list of sequence information for the given subscription from + * catalog. * - * Return cached sequence states if valid; otherwise fetches them from the - * catalog, caches the result, and return it. + * All entries in the returned list are allocated in the specified memory + * context. */ static List * -fetch_sequence_infos(void) +fetch_sequences_from_catalog(Oid subid, MemoryContext ctx) { Relation rel; HeapTuple tup; ScanKeyData skey[1]; SysScanDesc scan; - Oid subid = MyLogicalRepWorker->subid; - List *tmp_seqinfos = NIL; + List *seqinfos = NIL; + bool started_tx = false; - if (sequence_infos_valid) - return sequence_infos; - - /* Free the existing invalid cache entries */ - foreach_ptr(LogicalRepSequenceInfo, seqinfo, sequence_infos) + if (!IsTransactionState()) { - pfree(seqinfo->nspname); - pfree(seqinfo->seqname); - pfree(seqinfo); + StartTransactionCommand(); + started_tx = true; } - list_free(sequence_infos); - sequence_infos = NIL; - - StartTransactionCommand(); - rel = table_open(SubscriptionRelRelationId, AccessShareLock); /* Scan for all sequences belonging to this subscription */ @@ -825,14 +825,14 @@ fetch_sequence_infos(void) Assert(relstate == SUBREL_STATE_INIT || relstate == SUBREL_STATE_READY); - /* Cache the information in a permanent memory context */ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); + /* Cache the information in the given memory context */ + oldctx = MemoryContextSwitchTo(ctx); seq = palloc0_object(LogicalRepSequenceInfo); seq->localrelid = subrel->srrelid; seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel)); seq->seqname = pstrdup(RelationGetRelationName(sequence_rel)); seq->relstate = relstate; - tmp_seqinfos = lappend(tmp_seqinfos, seq); + seqinfos = lappend(seqinfos, seq); MemoryContextSwitchTo(oldctx); table_close(sequence_rel, NoLock); @@ -842,10 +842,38 @@ fetch_sequence_infos(void) systable_endscan(scan); table_close(rel, AccessShareLock); - sequence_infos = tmp_seqinfos; - sequence_infos_valid = true; + if (started_tx) + CommitTransactionCommand(); - CommitTransactionCommand(); + return seqinfos; +} + +/* + * Get the list of sequence information for the current subscription. + * + * Return cached sequence states if valid; otherwise fetches them from the + * catalog, caches the result, and return it. + */ +static List * +fetch_sequence_infos(void) +{ + if (sequence_infos_valid) + return sequence_infos; + + /* Free the existing invalid cache entries */ + foreach_ptr(LogicalRepSequenceInfo, seqinfo, sequence_infos) + { + pfree(seqinfo->nspname); + pfree(seqinfo->seqname); + pfree(seqinfo); + } + + list_free(sequence_infos); + sequence_infos = NIL; + + sequence_infos = fetch_sequences_from_catalog(MySubscription->oid, + CacheMemoryContext); + sequence_infos_valid = true; return sequence_infos; } @@ -950,6 +978,9 @@ start_sequence_sync(void) seqinfos = fetch_sequence_infos(); sequence_copied = copy_sequences(LogRepWorkerWalRcvConn, seqinfos, + MySubscription->oid, + MySubscription->name, + MySubscription->runasowner, update_lsn); MemoryContextReset(SequenceSyncContext); @@ -1019,3 +1050,40 @@ SequenceSyncWorkerMain(Datum main_arg) FinishSyncWorker(); } + +/* + * Wrapper for LogicalRepSyncSequences to synchronize all sequences of a + * subscription from outside the sequencesync worker. + */ +void +AlterSubSyncSequences(WalReceiverConn *conn, Oid subid, char *subname, + bool runasowner) +{ + List *seqinfos; + + Assert(!SequenceSyncContext); + + /* + * Fetch sequences directly from the catalog rather than using the + * sequence cache, which is maintained only for the sequence sync + * worker. + */ + seqinfos = fetch_sequences_from_catalog(subid, CurrentMemoryContext); + + PG_TRY(); + { + /* + * Use the current memory context for synchronization. Since this should + * be short-lived command context that will be cleaned up automatically, + * we can simply assign it as the synchronization context. + */ + SequenceSyncContext = CurrentMemoryContext; + + (void) copy_sequences(conn, seqinfos, subid, subname, runasowner, true); + } + PG_FINALLY(); + { + SequenceSyncContext = NULL; + } + PG_END_TRY(); +} diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 7d748a28da8..73afd7853d0 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -14,6 +14,8 @@ #include +#include "replication/walreceiver.h" + extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); @@ -31,4 +33,7 @@ extern void LogicalRepWorkersWakeupAtCommit(Oid subid); extern void AtEOXact_LogicalRepWorkers(bool isCommit); +extern void AlterSubSyncSequences(WalReceiverConn *conn, Oid subid, + char *subname, bool runasowner); + #endif /* LOGICALWORKER_H */ diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl index af190713b2b..8d25ac40ce0 100644 --- a/src/test/subscription/t/036_sequences.pl +++ b/src/test/subscription/t/036_sequences.pl @@ -176,4 +176,53 @@ $node_subscriber->wait_for_log( qr/WARNING: ( [A-Z0-9]+:)? missing sequence on publisher \("public.regress_s4"\)/, $log_offset); +########## +# ALTER SUBSCRIPTION ... REFRESH SEQUENCES synchronizes sequences online, +# eliminating the need to launch a sequencesync worker. +########## + +# Reduce max_logical_replication_workers to disallow sequence worker from running +$node_subscriber->append_conf('postgresql.conf', + qq(max_logical_replication_workers = 0)); +$node_subscriber->restart; + +# Verify there is no logical replication apply worker running +$result = $node_subscriber->safe_psql( + 'postgres', + "SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"); + +is($result, '0', 'no logical replication worker is running'); + +# Increment sequence on publisher +$node_publisher->safe_psql('postgres', + qq(SELECT nextval('regress_s1');)); + +# The command should fail due to missing sequence ('regress_s4') +my ($cmdret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;"); + +like( + $stderr, + qr/WARNING: missing sequence on publisher \("public.regress_s4"\)/, + "output the wanring for the missing sequence regress_s4"); + +like( + $stderr, + qr/ERROR: logical replication sequence synchronization failed for subscription \"regress_seq_sub\"/, + "the command failed due to the missing sequence regress_s4"); + +# Refresh the publication to remove the missing sequence +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION;"); + +# Sync the sequence regress_s1 +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;"); + +# Get the current sequence value on subscriber +$result = $node_subscriber->safe_psql('postgres', + qq(SELECT last_value FROM regress_s1;)); + +is($result, '201', 'sequence regress_s1 is synced now'); + done_testing(); -- 2.47.3