From 53d4a199cb4703dea8eaa568e7aa624277496793 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Sat, 28 Feb 2026 16:14:14 +0800 Subject: [PATCH v8 2/2] Synchronize sequences directly in REFRESH SEQUENCES command The ALTER SUBSCRIPTION ... REFRESH SEQUENCES command currently sets all sequence states in pg_subscription_rel to INIT and relies on the sequence sync worker to perform the actual synchronization and update states to READY. With the recent change making the sequence sync worker long-lived, most sequences are now synchronized in the background, reducing the need for REFRESH SEQUENCES. However, the command remains necessary for sequences that haven't been synchronized. This commit enhances REFRESH SEQUENCES to synchronize sequences directly within the command itself, eliminating the overhead of launching a worker and updating catalog entries unnecessarily. --- src/backend/commands/subscriptioncmds.c | 17 +- .../replication/logical/sequencesync.c | 165 ++++++++++++++---- src/include/replication/logicalworker.h | 5 + src/test/subscription/t/036_sequences.pl | 49 ++++++ 4 files changed, 190 insertions(+), 46 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5e3c0964d38..0a5acfda0ff 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1245,25 +1245,10 @@ AlterSubscription_refresh_seq(Subscription *sub) PG_TRY(); { - List *subrel_states; - check_publications_origin_sequences(wrconn, sub->publications, true, sub->origin, NULL, 0, sub->name); - /* Get local sequence list. 
*/ - subrel_states = GetSubscriptionRelations(sub->oid, false, true, false); - foreach_ptr(SubscriptionRelState, subrel, subrel_states) - { - Oid relid = subrel->relid; - - UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT, - InvalidXLogRecPtr, false); - ereport(DEBUG1, - errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state", - get_namespace_name(get_rel_namespace(relid)), - get_rel_name(relid), - sub->name)); - } + AlterSubSyncSequences(wrconn, sub->oid, sub->name, sub->runasowner); } PG_FINALLY(); { diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index 7df871ae7bc..71452261dd2 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -208,7 +208,7 @@ get_sequences_string(List *seqindexes, List *seqinfos, StringInfo buf) */ static void report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, - List *missing_seqs_idx, List *seqinfos) + List *missing_seqs_idx, List *seqinfos, char *subname) { StringInfo seqstr; @@ -254,7 +254,7 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical replication sequence synchronization failed for subscription \"%s\"", - MySubscription->name)); + subname)); } /* @@ -282,6 +282,7 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, HeapTuple tup; Form_pg_sequence local_seq; LogicalRepSequenceInfo *seqinfo_local; + LOCKMODE lockmode; *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull)); Assert(!isnull); @@ -328,7 +329,20 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, seqinfo_local->found_on_pub = true; - *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock); + /* + * We take a stronger lock during DDL commands (currently only ALTER + * SUBSCRIPTION ... 
REFRESH SEQUENCES) to prevent concurrent sequencesync + * workers from updating the page_lsn while the DDL is also updating the + * same sequence. This ensures we can always fetch the latest page_lsn to + * determine whether the remote sequence value should be synchronized (see + * should_sync_sequence). + */ + if (IsLogicalWorker()) + lockmode = RowExclusiveLock; + else + lockmode = ShareRowExclusiveLock; + + *sequence_rel = try_table_open(seqinfo_local->localrelid, lockmode); /* Sequence was concurrently dropped? */ if (!*sequence_rel) @@ -366,7 +380,8 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, * Preliminary check to determine if copying the sequence is allowed. */ static CopySeqResult -validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) +validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel, + XLogRecPtr local_page_lsn) { AclResult aclresult; Oid seqoid = seqinfo->localrelid; @@ -376,6 +391,16 @@ validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) /* Perform drift check if it's not the initial sync */ if (seqinfo->relstate == SUBREL_STATE_READY) { + /* + * Skip synchronization if we are processing outdated sequence info + * based on the LSN. This occurs when the sequence has been updated to + * more recent data concurrently (via either ALTER SUBSCRIPTION ... + * REFRESH SEQUENCES or the sequencesync worker). + */ + if (XLogRecPtrIsValid(local_page_lsn) && + local_page_lsn > seqinfo->page_lsn) + return COPYSEQ_NO_DRIFT; + /* * Verify that the current user has SELECT privilege on the sequence. * This is required to read the sequence state below. @@ -389,9 +414,32 @@ validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) GetSequence(sequence_rel, &local_last_value, &local_is_called); /* - * Skip synchronization if the sequence is already in READY state and - * has not drifted from the publisher's value. 
+ * Skip synchronization if the local sequence value is already ahead of + * the publisher's value. + * + * XXX This occurs not only when the local sequence has been + * synchronized to a newer value from the publisher (where skipping is + * necessary to avoid backward movement), but also when the local + * sequence has been manually updated by the user on the subscriber. The + * latter could be considered a replication conflict, and overwriting + * the user's update might be acceptable. However, since we cannot + * easily distinguish between these two scenarios, we choose to skip + * synchronization in all cases and emit a WARNING to notify the user to + * manually resolve the conflict. */ + if (local_last_value > seqinfo->last_value) + { + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipped synchronizing the sequence \"%s.%s\"", + seqinfo->nspname, seqinfo->seqname), + errdetail("The local last_value %lld is ahead of the one on publisher", + (long long int) local_last_value)); + + return COPYSEQ_NO_DRIFT; + } + + /* Skip synchronization if the sequence hasn't drifted */ if (local_last_value == seqinfo->last_value && local_is_called == seqinfo->is_called) return COPYSEQ_NO_DRIFT; @@ -412,12 +460,16 @@ validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) * synchronized. 
*/ static CopySeqResult -copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) +copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel, + Oid subid, bool run_as_owner) { UserContext ucxt; - bool run_as_owner = MySubscription->runasowner; Oid seqoid = seqinfo->localrelid; CopySeqResult result; + XLogRecPtr local_page_lsn; + + (void) GetSubscriptionRelState(subid, RelationGetRelid(sequence_rel), + &local_page_lsn); /* * If the user did not opt to run as the owner of the subscription @@ -426,7 +478,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) if (!run_as_owner) SwitchToUntrustedUser(sequence_rel->rd_rel->relowner, &ucxt); - result = validate_seqsync_state(seqinfo, sequence_rel); + result = validate_seqsync_state(seqinfo, sequence_rel, local_page_lsn); if (result != COPYSEQ_SUCCESS) { @@ -453,8 +505,9 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) * Record the remote sequence's LSN in pg_subscription_rel and mark the * sequence as READY if updating a sequence that is in INIT state. */ - if (seqinfo->relstate == SUBREL_STATE_INIT) - UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY, + if (seqinfo->relstate == SUBREL_STATE_INIT || + seqinfo->page_lsn != local_page_lsn) + UpdateSubscriptionRelState(subid, seqoid, SUBREL_STATE_READY, seqinfo->page_lsn, false); return COPYSEQ_SUCCESS; @@ -470,7 +523,8 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel) * Returns true/false if any sequences were actually copied. 
*/ static bool -copy_sequences(WalReceiverConn *conn, List *seqinfos) +copy_sequences(WalReceiverConn *conn, List *seqinfos, Oid subid, char *subname, + bool runasowner) { int cur_batch_base_index = 0; int n_seqinfos = list_length(seqinfos); @@ -496,11 +550,16 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos) int batch_no_drift = 0; int batch_missing_count; Relation sequence_rel = NULL; + bool started_tx = false; WalRcvExecResult *res; TupleTableSlot *slot; - StartTransactionCommand(); + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++) { @@ -596,14 +655,15 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos) * sequences in READY state, only sync if there's drift. */ if (sync_status == COPYSEQ_SUCCESS) - sync_status = copy_sequence(seqinfo, sequence_rel); + sync_status = copy_sequence(seqinfo, sequence_rel, + subid, runasowner); switch (sync_status) { case COPYSEQ_SUCCESS: elog(DEBUG1, "logical replication synchronization has updated sequence \"%s.%s\" in subscription \"%s\"", - seqinfo->nspname, seqinfo->seqname, MySubscription->name); + seqinfo->nspname, seqinfo->seqname, subname); batch_succeeded_count++; sequence_copied = true; break; @@ -611,9 +671,8 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos) case COPYSEQ_MISMATCH: /* - * Remember mismatched sequences in SequenceSyncContext - * since these will be used after the transaction is - * committed. + * Remember mismatched sequences in SequenceSyncContext since + * these will be used after the transaction is committed. 
+ */ oldctx = MemoryContextSwitchTo(SequenceSyncContext); mismatched_seqs_idx = lappend_int(mismatched_seqs_idx, @@ -679,13 +738,17 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos) elog(DEBUG1, "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped, %d no drift", - MySubscription->name, + subname, (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1, batch_size, batch_succeeded_count, batch_mismatched_count, batch_insuffperm_count, batch_missing_count, batch_skipped_count, batch_no_drift); - /* Commit this batch, and prepare for next batch */ - CommitTransactionCommand(); + /* + * Commit this batch if we started a transaction, and prepare for next + * batch. + */ + if (started_tx) + CommitTransactionCommand(); if (batch_missing_count) { @@ -710,7 +773,7 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos) /* Report mismatches, permission issues, or missing sequences */ report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx, - missing_seqs_idx, seqinfos); + missing_seqs_idx, seqinfos, subname); return sequence_copied; } @@ -722,20 +785,23 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos) * Returns true if sequences have been updated. 
*/ static bool -LogicalRepSyncSequences(WalReceiverConn *conn) +LogicalRepSyncSequences(WalReceiverConn *conn, Oid subid, char *subname, + bool runasowner) { Relation rel; HeapTuple tup; ScanKeyData skey[1]; SysScanDesc scan; - Oid subid = MyLogicalRepWorker->subid; bool sequence_copied = false; List *seqinfos = NIL; MemoryContext oldctx; + bool started_tx = false; - Assert(SequenceSyncContext); - - StartTransactionCommand(); + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } rel = table_open(SubscriptionRelRelationId, AccessShareLock); @@ -795,7 +861,8 @@ LogicalRepSyncSequences(WalReceiverConn *conn) systable_endscan(scan); table_close(rel, AccessShareLock); - CommitTransactionCommand(); + if (started_tx) + CommitTransactionCommand(); /* * Exit early if no catalog entries found, likely due to concurrent drops. @@ -804,7 +871,8 @@ LogicalRepSyncSequences(WalReceiverConn *conn) return false; /* Process sequences */ - sequence_copied = copy_sequences(conn, seqinfos); + sequence_copied = copy_sequences(conn, seqinfos, subid, subname, + runasowner); return sequence_copied; } @@ -879,7 +947,10 @@ start_sequence_sync(void) /* * Synchronize all sequences (both READY and INIT states). */ - sequence_copied = LogicalRepSyncSequences(LogRepWorkerWalRcvConn); + sequence_copied = LogicalRepSyncSequences(LogRepWorkerWalRcvConn, + MySubscription->oid, + MySubscription->name, + MySubscription->runasowner); MemoryContextReset(SequenceSyncContext); MemoryContextSwitchTo(oldctx); @@ -941,3 +1012,37 @@ SequenceSyncWorkerMain(Datum main_arg) FinishSyncWorker(); } + +/* + * Wrapper for LogicalRepSyncSequences to synchronize all sequences of a + * subscription from outside the sequencesync worker + */ +void +AlterSubSyncSequences(WalReceiverConn *conn, Oid subid, char *subname, + bool runasowner) +{ + /* + * Init the SequenceSyncContext which we clean up after the sequence + * synchronization. 
+ */ + SequenceSyncContext = AllocSetContextCreate(CurrentMemoryContext, + "SequenceSyncContext", + ALLOCSET_DEFAULT_SIZES); + + PG_TRY(); + { + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(SequenceSyncContext); + + LogicalRepSyncSequences(conn, subid, subname, runasowner); + + MemoryContextSwitchTo(oldctx); + } + PG_FINALLY(); + { + MemoryContextDelete(SequenceSyncContext); + SequenceSyncContext = NULL; + } + PG_END_TRY(); +} diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 7d748a28da8..73afd7853d0 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -14,6 +14,8 @@ #include +#include "replication/walreceiver.h" + extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); @@ -31,4 +33,7 @@ extern void LogicalRepWorkersWakeupAtCommit(Oid subid); extern void AtEOXact_LogicalRepWorkers(bool isCommit); +extern void AlterSubSyncSequences(WalReceiverConn *conn, Oid subid, + char *subname, bool runasowner); + #endif /* LOGICALWORKER_H */ diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl index 1d81518fe22..9a61b7bd0c8 100644 --- a/src/test/subscription/t/036_sequences.pl +++ b/src/test/subscription/t/036_sequences.pl @@ -176,4 +176,53 @@ $node_subscriber->wait_for_log( qr/WARNING: ( [A-Z0-9]+:)? missing sequence on publisher \("public.regress_s4"\)/, $log_offset); +########## +# ALTER SUBSCRIPTION ... REFRESH SEQUENCES synchronizes sequences online, +# eliminating the need to launch a sequencesync worker. 
+########## + +# Reduce max_logical_replication_workers to disallow sequence worker from running +$node_subscriber->append_conf('postgresql.conf', + qq(max_logical_replication_workers = 0)); +$node_subscriber->restart; + +# Verify there is no logical replication apply worker running +$result = $node_subscriber->safe_psql( + 'postgres', + "SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"); + +is($result, '0', 'no logical replication worker is running'); + +# Increment sequence on publisher +$node_publisher->safe_psql('postgres', + qq(SELECT nextval('regress_s1');)); + +# The command should fail due to missing sequence ('regress_s4') +my ($cmdret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;"); + +like( + $stderr, + qr/WARNING: missing sequence on publisher \("public.regress_s4"\)/, + "output the warning for the missing sequence regress_s4"); + +like( + $stderr, + qr/ERROR: logical replication sequence synchronization failed for subscription \"regress_seq_sub\"/, + "the command failed due to the missing sequence regress_s4"); + +# Refresh the publication to remove the missing sequence +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION;"); + +# Sync the sequence regress_s1 +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;"); + +# Get the current sequence value on subscriber +$result = $node_subscriber->safe_psql('postgres', + qq(SELECT last_value FROM regress_s1;)); + +is($result, '201', 'sequence regress_s1 is synced now'); + done_testing(); -- 2.51.1.windows.1