From 2c714f0ec91cc491af9851f77f63d1f3d8100a5d Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Thu, 5 Mar 2026 10:44:17 +0800 Subject: [PATCH v10 2/2] Synchronize sequences directly in REFRESH SEQUENCES command The ALTER SUBSCRIPTION ... REFRESH SEQUENCES command currently sets all sequence states in pg_subscription_rel to INIT and relies on the sequence sync worker to perform the actual synchronization and update states to READY. With the recent change making the sequence sync worker long-lived, most sequences are now synchronized in the background, reducing the need for REFRESH SEQUENCES. However, the command remains necessary for sequences that haven't been synchronized. This commit enhances REFRESH SEQUENCES to synchronize sequences directly within the command itself, eliminating the overhead of launching a worker and updating catalog entries unnecessarily. --- doc/src/sgml/logical-replication.sgml | 6 +- src/backend/commands/subscriptioncmds.c | 17 +- .../replication/logical/sequencesync.c | 158 ++++++++++++++---- src/include/replication/logicalworker.h | 5 + src/test/subscription/t/036_sequences.pl | 49 ++++++ 5 files changed, 188 insertions(+), 47 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index bb523af5d37..323d5b7c7a0 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1787,9 +1787,9 @@ Publications: A sequence synchronization worker will be started - after executing any of the above subscriber commands. The worker will - remain running for the life of the subscription, periodically - synchronizing all published sequences. + after executing ALTER SUBSCRIPTION ... REFRESH PUBLICATION + command. The worker will remain running for the life of the subscription, + periodically synchronizing all published sequences. The ability to launch a sequence synchronization worker is limited by the diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5e3c0964d38..0a5acfda0ff 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1245,25 +1245,10 @@ AlterSubscription_refresh_seq(Subscription *sub) PG_TRY(); { - List *subrel_states; - check_publications_origin_sequences(wrconn, sub->publications, true, sub->origin, NULL, 0, sub->name); - /* Get local sequence list. */ - subrel_states = GetSubscriptionRelations(sub->oid, false, true, false); - foreach_ptr(SubscriptionRelState, subrel, subrel_states) - { - Oid relid = subrel->relid; - - UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT, - InvalidXLogRecPtr, false); - ereport(DEBUG1, - errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state", - get_namespace_name(get_rel_namespace(relid)), - get_rel_name(relid), - sub->name)); - } + AlterSubSyncSequences(wrconn, sub->oid, sub->name, sub->runasowner); } PG_FINALLY(); { diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index fb293487657..8b910aa1bc0 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -210,7 +210,7 @@ get_sequences_string(List *seqindexes, List *seqinfos, StringInfo buf) */ static void report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, - List *missing_seqs_idx, List *seqinfos) + List *missing_seqs_idx, List *seqinfos, char *subname) { StringInfo seqstr; @@ -256,7 +256,7 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical replication sequence synchronization failed for subscription \"%s\"", - MySubscription->name)); + subname)); } /* @@ -284,6 +284,7 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, HeapTuple tup; Form_pg_sequence local_seq; LogicalRepSequenceInfo *seqinfo_local; + LOCKMODE lockmode; *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull)); Assert(!isnull); @@ -330,7 +331,20 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, seqinfo_local->found_on_pub = true; - *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock); + /* + * We take a stronger lock during DDL commands (currently only ALTER + * SUBSCRIPTION ... REFRESH SEQUENCES) to prevent concurrent sequencesync + * workers from updating the page_lsn while the DDL is also updating the + * same sequence. This ensures we can always fetch the latest page_lsn to + * determine whether the remote sequence value should be synchronized (see + * validate_seqsync_state). + */ + if (IsLogicalWorker()) + lockmode = RowExclusiveLock; + else + lockmode = ShareRowExclusiveLock; + + *sequence_rel = try_table_open(seqinfo_local->localrelid, lockmode); /* Sequence was concurrently dropped? */ if (!*sequence_rel) @@ -369,7 +383,8 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, * whether the sequence has drifted. */ static CopySeqResult -validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) +validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel, + XLogRecPtr local_page_lsn) { AclResult aclresult; Oid seqoid = seqinfo->localrelid; @@ -380,6 +395,16 @@ validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) int64 local_last_value; bool local_is_called; + /* + * Skip synchronization if we are processing outdated sequence info + * based on the LSN. This occurs when the sequence has been updated to + * more recent data concurrently (via either ALTER SUBSCRIPTION ... + * REFRESH SEQUENCES or the sequencesync worker). + */ + if (XLogRecPtrIsValid(local_page_lsn) && + local_page_lsn > seqinfo->page_lsn) + return COPYSEQ_NO_DRIFT; + /* Get current local sequence state */ GetSequence(sequence_rel, &local_last_value, &local_is_called); @@ -390,6 +415,32 @@ validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) if (local_last_value == 0) return COPYSEQ_INSUFFICIENT_PERM; + /* + * Skip synchronization if the local sequence value is already ahead of + * the publisher's value. + * + * XXX This occurs not only when the local sequence has been + * synchronized to a newer value from the publisher (where skipping is + * necessary to avoid backward movement), but also when the local + * sequence has been manually updated by the user on the subscriber. The + * latter could be considered a replication conflict, and overwriting + * the user's update might be acceptable. However, since we cannot + * easily distinguish between these two scenarios, we choose to skip + * synchronization in all cases and emit a WARNING to notify the user to + * manually resolve the conflict. + */ + if (local_last_value > seqinfo->last_value) + { + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipped synchronizing the sequence \"%s.%s\"", + seqinfo->nspname, seqinfo->seqname), + errdetail("The local last_value %lld is ahead of the one on publisher", + (long long int) local_last_value)); + + return COPYSEQ_NO_DRIFT; + } + /* * Skip synchronization if the sequence has not drifted from the * publisher's value. @@ -414,16 +465,15 @@ validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) * synchronized. */ static CopySeqResult -copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) +copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel, + Oid subid, bool run_as_owner) { UserContext ucxt; - bool run_as_owner = MySubscription->runasowner; Oid seqoid = seqinfo->localrelid; CopySeqResult result; XLogRecPtr local_page_lsn; - (void) GetSubscriptionRelState(MySubscription->oid, - RelationGetRelid(sequence_rel), + (void) GetSubscriptionRelState(subid, RelationGetRelid(sequence_rel), &local_page_lsn); /* @@ -433,7 +483,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) if (!run_as_owner) SwitchToUntrustedUser(sequence_rel->rd_rel->relowner, &ucxt); - result = validate_seqsync_state(seqinfo, sequence_rel); + result = validate_seqsync_state(seqinfo, sequence_rel, local_page_lsn); if (result != COPYSEQ_ALLOWED) { @@ -478,7 +528,8 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) * Returns true/false if any sequences were actually copied. */ static bool -copy_sequences(WalReceiverConn *conn, List *seqinfos) +copy_sequences(WalReceiverConn *conn, List *seqinfos, Oid subid, char *subname, + bool runasowner) { int cur_batch_base_index = 0; int n_seqinfos = list_length(seqinfos); @@ -504,11 +555,16 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos) int batch_no_drift = 0; int batch_missing_count; Relation sequence_rel = NULL; + bool started_tx = false; WalRcvExecResult *res; TupleTableSlot *slot; - StartTransactionCommand(); + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++) { @@ -604,14 +660,15 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos) * sequences in READY state, only sync if there's drift. */ if (sync_status == COPYSEQ_ALLOWED) - sync_status = copy_sequence(seqinfo, sequence_rel); + sync_status = copy_sequence(seqinfo, sequence_rel, + subid, runasowner); switch (sync_status) { case COPYSEQ_SUCCESS: elog(DEBUG1, "logical replication synchronization has updated sequence \"%s.%s\" in subscription \"%s\"", - seqinfo->nspname, seqinfo->seqname, MySubscription->name); + seqinfo->nspname, seqinfo->seqname, subname); batch_succeeded_count++; sequence_copied = true; break; @@ -619,9 +676,8 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos) case COPYSEQ_MISMATCH: /* - * Remember mismatched sequences in SequenceSyncContext - * since these will be used after the transaction is - * committed. + * Remember mismatched sequences in SequenceSyncContext since + * these will be used after the transaction is committed. */ oldctx = MemoryContextSwitchTo(SequenceSyncContext); mismatched_seqs_idx = lappend_int(mismatched_seqs_idx, @@ -689,13 +745,17 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos) elog(DEBUG1, "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped, %d no drift", - MySubscription->name, + subname, (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1, batch_size, batch_succeeded_count, batch_mismatched_count, batch_insuffperm_count, batch_missing_count, batch_skipped_count, batch_no_drift); - /* Commit this batch, and prepare for next batch */ - CommitTransactionCommand(); + /* + * Commit this batch if started a transaction, and prepare for next + * batch. + */ + if (started_tx) + CommitTransactionCommand(); if (batch_missing_count) { @@ -720,7 +780,7 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos) /* Report mismatches, permission issues, or missing sequences */ report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx, - missing_seqs_idx, seqinfos); + missing_seqs_idx, seqinfos, subname); return sequence_copied; } @@ -732,20 +792,23 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos) * Returns true if sequences have been updated. */ static bool -LogicalRepSyncSequences(WalReceiverConn *conn) +LogicalRepSyncSequences(WalReceiverConn *conn, Oid subid, char *subname, + bool runasowner) { Relation rel; HeapTuple tup; ScanKeyData skey[1]; SysScanDesc scan; - Oid subid = MyLogicalRepWorker->subid; bool sequence_copied = false; List *seqinfos = NIL; MemoryContext oldctx; + bool started_tx = false; - Assert(SequenceSyncContext); - - StartTransactionCommand(); + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } rel = table_open(SubscriptionRelRelationId, AccessShareLock); @@ -805,7 +868,8 @@ LogicalRepSyncSequences(WalReceiverConn *conn) systable_endscan(scan); table_close(rel, AccessShareLock); - CommitTransactionCommand(); + if (started_tx) + CommitTransactionCommand(); /* * Exit early if no catalog entries found, likely due to concurrent drops. @@ -814,7 +878,8 @@ LogicalRepSyncSequences(WalReceiverConn *conn) return false; /* Process sequences */ - sequence_copied = copy_sequences(conn, seqinfos); + sequence_copied = copy_sequences(conn, seqinfos, subid, subname, + runasowner); return sequence_copied; } @@ -889,7 +954,10 @@ start_sequence_sync(void) /* * Synchronize all sequences (both READY and INIT states). */ - sequence_copied = LogicalRepSyncSequences(LogRepWorkerWalRcvConn); + sequence_copied = LogicalRepSyncSequences(LogRepWorkerWalRcvConn, + MySubscription->oid, + MySubscription->name, + MySubscription->runasowner); MemoryContextReset(SequenceSyncContext); MemoryContextSwitchTo(oldctx); @@ -951,3 +1019,37 @@ SequenceSyncWorkerMain(Datum main_arg) FinishSyncWorker(); } + +/* + * Wrapper for LogicalRepSyncSequences to synchronize all sequences of a + * subscription from outside the sequencesync worker + */ +void +AlterSubSyncSequences(WalReceiverConn *conn, Oid subid, char *subname, + bool runasowner) +{ + /* + * Init the SequenceSyncContext which we clean up after the sequence + * synchronization. + */ + SequenceSyncContext = AllocSetContextCreate(CurrentMemoryContext, + "SequenceSyncContext", + ALLOCSET_DEFAULT_SIZES); + + PG_TRY(); + { + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(SequenceSyncContext); + + LogicalRepSyncSequences(conn, subid, subname, runasowner); + + MemoryContextSwitchTo(oldctx); + } + PG_FINALLY(); + { + MemoryContextDelete(SequenceSyncContext); + SequenceSyncContext = NULL; + } + PG_END_TRY(); +} diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 7d748a28da8..73afd7853d0 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -14,6 +14,8 @@ #include +#include "replication/walreceiver.h" + extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); @@ -31,4 +33,7 @@ extern void LogicalRepWorkersWakeupAtCommit(Oid subid); extern void AtEOXact_LogicalRepWorkers(bool isCommit); +extern void AlterSubSyncSequences(WalReceiverConn *conn, Oid subid, + char *subname, bool runasowner); + #endif /* LOGICALWORKER_H */ diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl index 1d81518fe22..9a61b7bd0c8 100644 --- a/src/test/subscription/t/036_sequences.pl +++ b/src/test/subscription/t/036_sequences.pl @@ -176,4 +176,53 @@ $node_subscriber->wait_for_log( qr/WARNING: ( [A-Z0-9]+:)? missing sequence on publisher \("public.regress_s4"\)/, $log_offset); +########## +# ALTER SUBSCRIPTION ... REFRESH SEQUENCES synchronizes sequences online, +# eliminating the need to launch a sequencesync worker. +########## + +# Reduce max_logical_replication_workers to disallow sequence worker from running +$node_subscriber->append_conf('postgresql.conf', + qq(max_logical_replication_workers = 0)); +$node_subscriber->restart; + +# Verify there is no logical replication apply worker running +$result = $node_subscriber->safe_psql( + 'postgres', + "SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"); + +is($result, '0', 'no logical replication worker is running'); + +# Increment sequence on publisher +$node_publisher->safe_psql('postgres', + qq(SELECT nextval('regress_s1');)); + +# The command should fail due to missing sequence ('regress_s4') +my ($cmdret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;"); + +like( + $stderr, + qr/WARNING: missing sequence on publisher \("public.regress_s4"\)/, + "output the wanring for the missing sequence regress_s4"); + +like( + $stderr, + qr/ERROR: logical replication sequence synchronization failed for subscription \"regress_seq_sub\"/, + "the command failed due to the missing sequence regress_s4"); + +# Refresh the publication to remove the missing sequence +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION;"); + +# Sync the sequence regress_s1 +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;"); + +# Get the current sequence value on subscriber +$result = $node_subscriber->safe_psql('postgres', + qq(SELECT last_value FROM regress_s1;)); + +is($result, '201', 'sequence regress_s1 is synced now'); + done_testing(); -- 2.51.1.windows.1