From e0bd9f806cf4c5264cb5543e7136e5af30272c5f Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 15 Oct 2025 19:00:16 +0800 Subject: [PATCH v20251016 2/3] New worker for sequence synchronization during subscription management This patch introduces sequence synchronization: Sequences have 3 states: - INIT (needs [re]synchronizing) - READY (is already synchronized) A new sequencesync worker is launched as needed to synchronize sequences. It does the following: a) Retrieves remote values of sequences with pg_sequence_state() INIT. b) Logs a warning if the sequence parameters differ between the publisher and subscriber. c) Sets the local sequence values accordingly. d) Updates the local sequence state to READY. e) Repeats until all done; Commits synchronized sequences in batches of 100 Sequence synchronization occurs in 3 places: 1) CREATE SUBSCRIPTION - (PG19 command syntax is unchanged) - The subscriber retrieves sequences associated with publications. - Published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker (see above) to synchronize all sequences. 2) ALTER SUBSCRIPTION ... REFRESH PUBLICATION - (PG19 command syntax is unchanged) - Dropped published sequences are removed from pg_subscription_rel. - Newly published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker (see above) to synchronize only newly added sequences. 3) ALTER SUBSCRIPTION ... REFRESH SEQUENCES - The patch introduces this new command to refresh all sequences - All sequences in pg_subscription_rel are reset to DATASYNC state. - Initiate the sequencesync worker (see above) to synchronize all sequences. - Unlike "ALTER SUBSCRIPTION ... REFRESH PUBLICATION" command, addition and removal of missing sequences will not be done in this case Author: Vignesh C Reviewer: Amit Kapila, Shveta Malik, Dilip Kumar, Peter Smith, Nisha Moond Discussion: https://www.postgresql.org/message-id/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com --- src/backend/catalog/pg_subscription.c | 2 +- src/backend/catalog/system_views.sql | 1 + src/backend/commands/sequence.c | 23 +- src/backend/commands/subscriptioncmds.c | 8 +- src/backend/postmaster/bgworker.c | 5 +- src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/launcher.c | 63 +- src/backend/replication/logical/meson.build | 1 + .../replication/logical/sequencesync.c | 759 ++++++++++++++++++ src/backend/replication/logical/syncutils.c | 106 ++- src/backend/replication/logical/tablesync.c | 85 +- src/backend/replication/logical/worker.c | 71 +- .../utils/activity/pgstat_subscription.c | 27 +- src/backend/utils/adt/pgstatfuncs.c | 27 +- src/backend/utils/misc/guc_parameters.dat | 2 +- src/include/catalog/pg_proc.dat | 8 +- src/include/catalog/pg_subscription_rel.h | 16 + src/include/commands/sequence.h | 3 + src/include/pgstat.h | 6 +- src/include/replication/logicalworker.h | 3 +- src/include/replication/worker_internal.h | 29 +- src/test/regress/expected/rules.out | 3 +- src/test/subscription/meson.build | 1 + src/test/subscription/t/026_stats.pl | 58 +- src/test/subscription/t/036_sequences.pl | 238 ++++++ src/tools/pgindent/typedefs.list | 2 + 26 files changed, 1392 insertions(+), 156 deletions(-) create mode 100644 src/backend/replication/logical/sequencesync.c create mode 100644 src/test/subscription/t/036_sequences.pl diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index c615005c923..153a2da6940 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -354,7 +354,7 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, ObjectIdGetDatum(relid), ObjectIdGetDatum(subid)); if (!HeapTupleIsValid(tup)) - elog(ERROR, "subscription table %u in subscription %u does not exist", + elog(ERROR, "subscription relation %u in subscription %u does not exist", relid, subid); /* Update the tuple. */ diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 823776c1498..1f3ef004aa3 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1414,6 +1414,7 @@ CREATE VIEW pg_stat_subscription_stats AS ss.subid, s.subname, ss.apply_error_count, + ss.sequence_sync_error_count, ss.sync_error_count, ss.confl_insert_exists, ss.confl_update_origin_differs, diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index cf46a543364..067c6c68ee8 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -112,7 +112,6 @@ static void init_params(ParseState *pstate, List *options, bool for_identity, bool *is_called, bool *need_seq_rewrite, List **owned_by); -static void do_setval(Oid relid, int64 next, bool iscalled); static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity); @@ -955,8 +954,8 @@ lastval(PG_FUNCTION_ARGS) * it is the only way to clear the is_called flag in an existing * sequence. */ -static void -do_setval(Oid relid, int64 next, bool iscalled) +void +SetSequence(Oid relid, int64 next, bool iscalled) { SeqTable elm; Relation seqrel; @@ -1057,7 +1056,7 @@ do_setval(Oid relid, int64 next, bool iscalled) /* * Implement the 2 arg setval procedure. - * See do_setval for discussion. + * See SetSequence for discussion. */ Datum setval_oid(PG_FUNCTION_ARGS) @@ -1065,14 +1064,14 @@ setval_oid(PG_FUNCTION_ARGS) Oid relid = PG_GETARG_OID(0); int64 next = PG_GETARG_INT64(1); - do_setval(relid, next, true); + SetSequence(relid, next, true); PG_RETURN_INT64(next); } /* * Implement the 3 arg setval procedure. - * See do_setval for discussion. + * See SetSequence for discussion. */ Datum setval3_oid(PG_FUNCTION_ARGS) @@ -1081,7 +1080,7 @@ setval3_oid(PG_FUNCTION_ARGS) int64 next = PG_GETARG_INT64(1); bool iscalled = PG_GETARG_BOOL(2); - do_setval(relid, next, iscalled); + SetSequence(relid, next, iscalled); PG_RETURN_INT64(next); } @@ -1799,7 +1798,8 @@ pg_sequence_parameters(PG_FUNCTION_ARGS) * Return the sequence tuple along with its page LSN. * * This is primarily intended for use by pg_dump to gather sequence data - * without needing to individually query each sequence relation. + * without needing to individually query each sequence relation. This will also + * be used by logical replication while synchronizing sequences. */ Datum pg_get_sequence_data(PG_FUNCTION_ARGS) @@ -1843,6 +1843,13 @@ pg_get_sequence_data(PG_FUNCTION_ARGS) values[0] = Int64GetDatum(seq->last_value); values[1] = BoolGetDatum(seq->is_called); + + /* + * The page LSN will be used in logical replication of sequences to + * record the LSN of the sequence page in the pg_subscription_rel + * system catalog. It reflects the LSN of the remote sequence at the + * time it was synchronized. + */ values[2] = LSNGetDatum(PageGetLSN(page)); UnlockReleaseBuffer(buf); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5dd4bbc1c20..021d4e0ac69 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1050,8 +1050,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, RemoveSubscriptionRel(sub->oid, relid); /* - * XXX Currently there is no sequencesync worker, so we only - * stop tablesync workers. + * A single sequencesync worker synchronizes all sequences, so + * only stop workers when relation kind is not sequence. */ if (relkind != RELKIND_SEQUENCE) { @@ -1062,7 +1062,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, sub_remove_rels = lappend(sub_remove_rels, rel); - logicalrep_worker_stop(sub->oid, relid); + logicalrep_worker_stop(sub->oid, relid, WORKERTYPE_TABLESYNC); /* * For READY state, we would have already dropped the @@ -2090,7 +2090,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) { LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); - logicalrep_worker_stop(w->subid, w->relid); + logicalrep_worker_stop(w->subid, w->relid, w->type); } list_free(subworkers); diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 1ad65c237c3..142a02eb5e9 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -131,7 +131,10 @@ static const struct "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, { - "TablesyncWorkerMain", TablesyncWorkerMain + "TableSyncWorkerMain", TableSyncWorkerMain + }, + { + "SequenceSyncWorkerMain", SequenceSyncWorkerMain } }; diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c62c8c67521..c719af1f8a9 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -26,6 +26,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + sequencesync.o \ slotsync.o \ snapbuild.o \ syncutils.o \ diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 218cefe86e2..f6a1c85fdb0 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -246,19 +246,23 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, /* * Walks the workers array and searches for one that matches given - * subscription id and relid. + * subscription id, relid and type. * - * We are only interested in the leader apply worker or table sync worker. + * For both apply workers and sequence sync workers, the relid should be set to + * InvalidOid, as they manage changes across all tables and sequences. For table + * sync workers, the relid should be set to the OID of the relation being + * synchronized. */ LogicalRepWorker * -logicalrep_worker_find(Oid subid, Oid relid, bool only_running) +logicalrep_worker_find(Oid subid, Oid relid, LogicalRepWorkerType wtype, + bool only_running) { int i; LogicalRepWorker *res = NULL; Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - /* Search for attached worker for a given subscription id. */ + /* Search for the attached worker matching the specified criteria. */ for (i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; @@ -268,7 +272,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) continue; if (w->in_use && w->subid == subid && w->relid == relid && - (!only_running || w->proc)) + w->type == wtype && (!only_running || w->proc)) { res = w; break; @@ -329,6 +333,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, int nparallelapplyworkers; TimestampTz now; bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC); + bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC); bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY); /*---------- @@ -417,7 +422,8 @@ retry: * sync worker limit per subscription. So, just return silently as we * might get here because of an otherwise harmless race condition. */ - if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription) + if ((is_tablesync_worker || is_sequencesync_worker) && + nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); return false; @@ -506,8 +512,16 @@ retry: memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle)); break; + case WORKERTYPE_SEQUENCESYNC: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication sequencesync worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker"); + break; + case WORKERTYPE_TABLESYNC: - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication tablesync worker for subscription %u sync %u", subid, @@ -630,13 +644,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) * Stop the logical replication worker for subid/relid, if any. */ void -logicalrep_worker_stop(Oid subid, Oid relid) +logicalrep_worker_stop(Oid subid, Oid relid, LogicalRepWorkerType wtype) { LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, false); + worker = logicalrep_worker_find(subid, relid, wtype, false); if (worker) { @@ -703,7 +717,7 @@ logicalrep_worker_wakeup(Oid subid, Oid relid) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, true); + worker = logicalrep_worker_find(subid, relid, WORKERTYPE_APPLY, true); if (worker) logicalrep_worker_wakeup_ptr(worker); @@ -835,6 +849,25 @@ logicalrep_launcher_onexit(int code, Datum arg) LogicalRepCtx->launcher_pid = 0; } +/* + * Reset the last_seqsync_start_time of the sequencesync worker in the + * subscription's apply worker. + */ +void +logicalrep_reset_seqsync_start_time(void) +{ + LogicalRepWorker *worker; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid, + WORKERTYPE_APPLY, true); + if (worker) + worker->last_seqsync_start_time = 0; + + LWLockRelease(LogicalRepWorkerLock); +} + /* * Cleanup function. * @@ -883,7 +916,7 @@ logicalrep_sync_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isTablesyncWorker(w) && w->subid == subid) + if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w))) res++; } @@ -1260,7 +1293,8 @@ ApplyLauncherMain(Datum main_arg) continue; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); + w = logicalrep_worker_find(sub->oid, InvalidOid, WORKERTYPE_APPLY, + false); if (w != NULL) { @@ -1596,7 +1630,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); - if (isTablesyncWorker(&worker)) + if (isTableSyncWorker(&worker)) values[1] = ObjectIdGetDatum(worker.relid); else nulls[1] = true; @@ -1636,6 +1670,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) case WORKERTYPE_PARALLEL_APPLY: values[9] = CStringGetTextDatum("parallel apply"); break; + case WORKERTYPE_SEQUENCESYNC: + values[9] = CStringGetTextDatum("sequence synchronization"); + break; case WORKERTYPE_TABLESYNC: values[9] = CStringGetTextDatum("table synchronization"); break; diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 9283e996ef4..a2268d8361e 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -12,6 +12,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'sequencesync.c', 'slotsync.c', 'snapbuild.c', 'syncutils.c', diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c new file mode 100644 index 00000000000..7bfcc08a6b1 --- /dev/null +++ b/src/backend/replication/logical/sequencesync.c @@ -0,0 +1,759 @@ +/*------------------------------------------------------------------------- + * sequencesync.c + * PostgreSQL logical replication: sequence synchronization + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/sequencesync.c + * + * NOTES + * This file contains code for sequence synchronization for + * logical replication. + * + * Sequences requiring synchronization are tracked in the pg_subscription_rel + * catalog. + * + * Sequences to be synchronized will be added with state INIT when either of + * the following commands is executed: + * CREATE SUBSCRIPTION + * ALTER SUBSCRIPTION ... REFRESH PUBLICATION + * + * Executing the following command resets all sequences in the subscription to + * state DATASYNC, triggering re-synchronization: + * ALTER SUBSCRIPTION ... REFRESH SEQUENCES + * + * The apply worker periodically scans pg_subscription_rel for sequences in + * INIT or DATASYNC state. When such sequences are found, it spawns a + * sequencesync worker to handle synchronization. + * + * The sequencesync worker is responsible for synchronizing sequences marked in + * pg_subscription_rel. It begins by retrieving the list of sequences flagged + * for synchronization. These sequences are then processed in batches, allowing + * multiple entries to be synchronized within a single transaction. The worker + * fetches the current sequence values and page LSNs from the remote publisher, + * updates the corresponding sequences on the local subscriber, and finally + * marks each sequence as READY upon successful synchronization. + * + * Sequence state transitions follow this pattern: + * INIT / DATASYNC → READY + * + * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH + * (100) sequences are synchronized per transaction. The locks on the sequence + * relation will be periodically released at each transaction commit. + * + * XXX: An alternative design was considered where the launcher process would + * periodically check for sequences that need syncing and then start the + * sequencesync worker. However, the approach of having the apply worker + * manage the sequencesync worker was chosen for the following reasons: + * a) It avoids overloading the launcher, which handles various other + * subscription requests. + * b) It offers a more straightforward path for extending support for + * incremental sequence synchronization. + * c) It utilizes the existing tablesync worker code to start the sequencesync + * process, thus preventing code duplication in the launcher. + * d) It simplifies code maintenance by consolidating changes to a single + * location rather than multiple components. + * e) The apply worker can access the sequences that need to be synchronized + * from the pg_subscription_rel system catalog. Whereas the launcher process + * operates without direct database access so would need a framework to + * establish connections with the databases to retrieve the sequences for + * synchronization. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/table.h" +#include "catalog/pg_sequence.h" +#include "catalog/pg_subscription_rel.h" +#include "commands/sequence.h" +#include "common/hashfn.h" +#include "pgstat.h" +#include "postmaster/interrupt.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/worker_internal.h" +#include "utils/acl.h" +#include "utils/builtins.h" +#include "utils/catcache.h" +#include "utils/fmgroids.h" +#include "utils/guc.h" +#include "utils/inval.h" +#include "utils/lsyscache.h" +#include "utils/pg_lsn.h" +#include "utils/rls.h" +#include "utils/syscache.h" +#include "utils/usercontext.h" + +#define REMOTE_SEQ_COL_COUNT 11 + +static HTAB *sequences_to_copy = NULL; + +/* + * Handle sequence synchronization cooperation from the apply worker. + * + * Start a sequencesync worker if one is not already running. The active + * sequencesync worker will handle all pending sequence synchronization. If any + * sequences remain unsynchronized after it exits, a new worker can be started + * in the next iteration. + */ +void +ProcessSyncingSequencesForApply(void) +{ + LogicalRepWorker *sequencesync_worker; + int nsyncworkers; + bool has_pending_sequences; + bool started_tx; + + FetchRelationStates(&has_pending_sequences, &started_tx); + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } + + if (!has_pending_sequences) + return; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + /* Check if there is a sequencesync worker already running? */ + sequencesync_worker = logicalrep_worker_find(MyLogicalRepWorker->subid, + InvalidOid, + WORKERTYPE_SEQUENCESYNC, + true); + if (sequencesync_worker) + { + LWLockRelease(LogicalRepWorkerLock); + return; + } + + /* + * Count running sync workers for this subscription, while we have the + * lock. + */ + nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + LWLockRelease(LogicalRepWorkerLock); + + launch_sync_worker(nsyncworkers, InvalidOid, + &MyLogicalRepWorker->last_seqsync_start_time); +} + +/* + * report_error_sequences + * + * Reports discrepancies in sequence data between the publisher and subscriber. + * It identifies sequences that do not have sufficient privileges, as well as + * sequences that exist on both sides but have mismatched values. + */ +static void +report_error_sequences(StringInfo insuffperm_seqs, StringInfo mismatched_seqs) +{ + StringInfo combined_error_detail = makeStringInfo(); + StringInfo combined_error_hint = makeStringInfo(); + + if (insuffperm_seqs->len) + { + appendStringInfo(combined_error_detail, "Insufficient permission for sequence(s): (%s).", + insuffperm_seqs->data); + appendStringInfoString(combined_error_hint, "Grant permissions for the sequence(s)."); + } + + if (mismatched_seqs->len) + { + if (insuffperm_seqs->len) + { + appendStringInfo(combined_error_detail, "; mismatched sequence(s) on subscriber: (%s).", + mismatched_seqs->data); + appendStringInfoString(combined_error_hint, " For mismatched sequences, alter or re-create local sequences to have matching parameters as publishers."); + } + else + { + appendStringInfo(combined_error_detail, "Mismatched sequence(s) on subscriber: (%s).", + mismatched_seqs->data); + appendStringInfoString(combined_error_hint, "For mismatched sequences, alter or re-create local sequences to have matching parameters as publishers"); + } + } + + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication sequence synchronization failed for subscription \"%s\"", MySubscription->name), + errdetail("%s", combined_error_detail->data), + errhint("%s", combined_error_hint->data)); +} + +/* + * Appends a qualified sequence name to a StringInfo buffer. Optionally + * increments a counter if provided. Used to build comma-separated lists of + * sequences. + */ +static void +append_sequence_name(StringInfo buf, const char *nspname, const char *seqname, + int *count) +{ + if (buf->len > 0) + appendStringInfoString(buf, ", "); + + appendStringInfo(buf, "\"%s.%s\"", nspname, seqname); + + if (count) + (*count)++; +} + + +/* + * Copy existing data of sequence from the publisher. + * + * Fetch the sequence value from the publisher and set the subscriber sequence + * with the same value. + */ +static void +copy_sequence(TupleTableSlot *slot, LogicalRepSequenceInfo *seqinfo, + StringInfo mismatched_seqs, StringInfo insuffperm_seqs, + int *succeeded_count, int *mismatched_count, int *skipped_count, + int *insuffperm_count) +{ + int col = 0; + bool isnull; + char *nspname; + char *seqname; + int64 last_value; + bool is_called; + XLogRecPtr page_lsn; + Oid seqtypid; + int64 seqstart; + int64 seqmin; + int64 seqmax; + int64 seqincrement; + bool seqcycle; + HeapTuple tup; + Relation sequence_rel; + Form_pg_sequence seqform; + UserContext ucxt; + AclResult aclresult; + bool run_as_owner = MySubscription->runasowner; + + CHECK_FOR_INTERRUPTS(); + + /* Get sequence information from the fetched tuple */ + nspname = TextDatumGetCString(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqname = TextDatumGetCString(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqtypid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqstart = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqincrement = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqmin = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqmax = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqcycle = DatumGetBool(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + /* Sanity check */ + Assert(col == REMOTE_SEQ_COL_COUNT); + + /* Get the local sequence object */ + sequence_rel = try_table_open(seqinfo->localrelid, RowExclusiveLock); + tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo->localrelid)); + if (!sequence_rel || !HeapTupleIsValid(tup)) + { + (*skipped_count)++; + elog(LOG, "skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently", + nspname, seqname); + return; + } + + /* Skip if the entry is no longer valid */ + if (!seqinfo->entry_valid) + { + ReleaseSysCache(tup); + table_close(sequence_rel, RowExclusiveLock); + (*skipped_count)++; + ereport(LOG, errmsg("skip synchronization of sequence \"%s.%s\" because it has been altered concurrently", + nspname, seqname)); + return; + } + + seqform = (Form_pg_sequence) GETSTRUCT(tup); + + /* Update the sequence only if the parameters are identical */ + if (seqform->seqtypid == seqtypid && + seqform->seqmin == seqmin && seqform->seqmax == seqmax && + seqform->seqcycle == seqcycle && + seqform->seqstart == seqstart && + seqform->seqincrement == seqincrement) + { + if (!run_as_owner) + SwitchToUntrustedUser(seqinfo->seqowner, &ucxt); + + /* Check for sufficient permissions */ + aclresult = pg_class_aclcheck(seqinfo->localrelid, GetUserId(), ACL_UPDATE); + + if (!run_as_owner) + RestoreUserContext(&ucxt); + + if (aclresult != ACLCHECK_OK) + { + append_sequence_name(insuffperm_seqs, nspname, seqname, + insuffperm_count); + ReleaseSysCache(tup); + table_close(sequence_rel, RowExclusiveLock); + return; + } + + SetSequence(seqinfo->localrelid, last_value, is_called); + (*succeeded_count)++; + + ereport(DEBUG1, + errmsg_internal("logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished", + MySubscription->name, nspname, seqname)); + + UpdateSubscriptionRelState(MySubscription->oid, seqinfo->localrelid, + SUBREL_STATE_READY, page_lsn, false); + } + else + append_sequence_name(mismatched_seqs, nspname, seqname, + mismatched_count); + + ReleaseSysCache(tup); + table_close(sequence_rel, NoLock); +} + +/* + * Copy existing data of sequences from the publisher. Caller is responsible + * for locking the local relation. + */ +static void +copy_sequences(WalReceiverConn *conn, Oid subid) +{ + int total_seqs = hash_get_num_entries(sequences_to_copy); + int current_index = 0; + StringInfo mismatched_seqs = makeStringInfo(); + StringInfo missing_seqs = makeStringInfo(); + StringInfo insuffperm_seqs = makeStringInfo(); + HASH_SEQ_STATUS status; + LogicalRepSequenceInfo *entry; + +#define MAX_SEQUENCES_SYNC_PER_BATCH 100 + + ereport(LOG, + errmsg("logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d", + MySubscription->name, total_seqs)); + + while (current_index < total_seqs) + { + Oid seqRow[REMOTE_SEQ_COL_COUNT] = {TEXTOID, TEXTOID, INT8OID, + BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID}; + + StringInfo seqstr = makeStringInfo(); + StringInfo cmd = makeStringInfo(); + int batch_size = 0; + int batch_succeeded_count = 0; + int batch_mismatched_count = 0; + int batch_skipped_count = 0; + int batch_insuffperm_count = 0; + + WalRcvExecResult *res; + TupleTableSlot *slot; + + StartTransactionCommand(); + hash_seq_init(&status, sequences_to_copy); + + /* Collect a batch of sequences */ + while ((entry = (LogicalRepSequenceInfo *) hash_seq_search(&status)) != NULL) + { + if (entry->remote_seq_queried) + continue; + + if (seqstr->len > 0) + appendStringInfoString(seqstr, ", "); + + appendStringInfo(seqstr, "(\'%s\', \'%s\')", entry->nspname, entry->seqname); + entry->remote_seq_queried = true; + + batch_size++; + if (batch_size == MAX_SEQUENCES_SYNC_PER_BATCH || + (current_index + batch_size == total_seqs)) + break; + } + + hash_seq_term(&status); + + appendStringInfo(cmd, + "SELECT s.schname, s.seqname, ps.*, seq.seqtypid,\n" + " seq.seqstart, seq.seqincrement, seq.seqmin,\n" + " seq.seqmax, seq.seqcycle\n" + "FROM ( VALUES %s ) AS s (schname, seqname)\n" + "JOIN pg_namespace n ON n.nspname = s.schname\n" + "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n" + "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n" + "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n" + "ORDER BY s.schname, s.seqname\n", + seqstr->data); + + res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow); + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not receive list of sequence information from the publisher: %s", + res->err)); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + LogicalRepSequenceInfo *seqinfo; + LogicalRepSeqHashKey key; + bool isnull; + bool found; + + CHECK_FOR_INTERRUPTS(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + key.nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + key.seqname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + seqinfo = hash_search(sequences_to_copy, &key, HASH_FIND, &found); + Assert(seqinfo); + + copy_sequence(slot, seqinfo, mismatched_seqs, + insuffperm_seqs, &batch_succeeded_count, + &batch_mismatched_count, &batch_skipped_count, + &batch_insuffperm_count); + + /* Remove successfully processed sequence */ + if (!hash_search(sequences_to_copy, &key, HASH_REMOVE, NULL)) + elog(ERROR, "hash table corrupted"); + } + + ExecDropSingleTupleTableSlot(slot); + walrcv_clear_result(res); + destroyStringInfo(seqstr); + destroyStringInfo(cmd); + + ereport(LOG, + errmsg("logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d skipped, %d mismatched, %d insufficient permission, %d missing, ", + MySubscription->name, (current_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1, batch_size, + batch_succeeded_count, batch_skipped_count, batch_mismatched_count, batch_insuffperm_count, + batch_size - (batch_succeeded_count + batch_skipped_count + batch_mismatched_count + batch_insuffperm_count))); + + /* Commit this batch, and prepare for next batch */ + CommitTransactionCommand(); + + /* + * current_indexes is not incremented sequentially because some + * sequences may be missing, and the number of fetched rows may not + * match the batch size. The `hash_search` with HASH_REMOVE takes care + * of the count. + */ + current_index += batch_size; + } + + /* + * Any sequences remaining in the hash table were not found on the + * publisher. This is because they were included in a query + * (remote_seq_queried) but were not returned in the result set. + */ + hash_seq_init(&status, sequences_to_copy); + while ((entry = (LogicalRepSequenceInfo *) hash_seq_search(&status)) != NULL) + { + Assert(entry->remote_seq_queried); + append_sequence_name(missing_seqs, entry->nspname, entry->seqname, NULL); + } + + /* Log missing sequences if any */ + if (missing_seqs->len) + ereport(LOG, + errmsg_internal("sequences not found on publisher removed from resynchronization: (%s)", + missing_seqs->data)); + + /* Report errors if mismatches or permission issues occurred */ + if (insuffperm_seqs->len || mismatched_seqs->len) + report_error_sequences(insuffperm_seqs, mismatched_seqs); + + destroyStringInfo(missing_seqs); + destroyStringInfo(mismatched_seqs); + destroyStringInfo(insuffperm_seqs); +} + +/* + * Relcache invalidation callback + */ +static void +sequencesync_list_invalidate_cb(Datum arg, Oid reloid) +{ + HASH_SEQ_STATUS status; + LogicalRepSequenceInfo *entry; + + /* Quick exit if no sequence is listed yet */ + if (hash_get_num_entries(sequences_to_copy) == 0) + return; + + if (reloid != InvalidOid) + { + hash_seq_init(&status, sequences_to_copy); + + while ((entry = (LogicalRepSequenceInfo *) hash_seq_search(&status)) != NULL) + { + if (entry->localrelid == reloid) + { + entry->entry_valid = false; + hash_seq_term(&status); + break; + } + } + } + else + { + /* invalidate all entries */ + hash_seq_init(&status, sequences_to_copy); + while ((entry = (LogicalRepSequenceInfo *) hash_seq_search(&status)) != NULL) + entry->entry_valid = false; + } +} + +static uint32 +LogicalRepSeqHash(const void *key, Size keysize) +{ + const LogicalRepSeqHashKey *k = (const LogicalRepSeqHashKey *) key; + uint32 h1 = string_hash(k->nspname, strlen(k->nspname)); + uint32 h2 = string_hash(k->seqname, strlen(k->seqname)); + + return h1 ^ h2; + /* XOR combine */ +} + +static int +LogicalRepSeqMatchFunc(const void *key1, const void *key2, Size keysize) +{ + int cmp; + const LogicalRepSeqHashKey *k1 = (const LogicalRepSeqHashKey *) key1; + const LogicalRepSeqHashKey *k2 = (const LogicalRepSeqHashKey *) key2; + + /* Compare by namespace name first */ + cmp = strcmp(k1->nspname, k2->nspname); + if (cmp != 0) + return cmp; + + /* If namespace names are equal, compare by sequence name */ + return strcmp(k1->seqname, k2->seqname); +} + +/* + * Start syncing the sequences in the sequencesync worker. + */ +static void +LogicalRepSyncSequences(void) +{ + char *err; + bool must_use_password; + Relation rel; + HeapTuple tup; + ScanKeyData skey[2]; + SysScanDesc scan; + Oid subid = MyLogicalRepWorker->subid; + StringInfoData app_name; + HASHCTL ctl; + bool found; + HASH_SEQ_STATUS hash_seq; + LogicalRepSequenceInfo *seq_entry; + + ctl.keysize = sizeof(LogicalRepSeqHashKey); + ctl.entrysize = sizeof(LogicalRepSequenceInfo); + ctl.hcxt = CacheMemoryContext; + ctl.hash = LogicalRepSeqHash; + ctl.match = LogicalRepSeqMatchFunc; + sequences_to_copy = hash_create("Logical replication sequences", 256, &ctl, + HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT); + + /* Watch for invalidation events. */ + CacheRegisterRelcacheCallback(sequencesync_list_invalidate_cb, + (Datum) 0); + + StartTransactionCommand(); + + rel = table_open(SubscriptionRelRelationId, AccessShareLock); + + ScanKeyInit(&skey[0], + Anum_pg_subscription_rel_srsubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(subid)); + + ScanKeyInit(&skey[1], + Anum_pg_subscription_rel_srsubstate, + BTEqualStrategyNumber, F_CHARNE, + CharGetDatum(SUBREL_STATE_READY)); + + scan = systable_beginscan(rel, InvalidOid, false, + NULL, 2, skey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_subscription_rel subrel; + char relkind; + Relation sequence_rel; + LogicalRepSeqHashKey key; + MemoryContext oldctx; + + CHECK_FOR_INTERRUPTS(); + + subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + + /* Skip if the relation is not a sequence */ + relkind = get_rel_relkind(subrel->srrelid); + if (relkind != RELKIND_SEQUENCE) + continue; + + /* Skip if sequence was dropped concurrently */ + sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock); + if (!sequence_rel) + continue; + + key.seqname = RelationGetRelationName(sequence_rel); + key.nspname = get_namespace_name(RelationGetNamespace(sequence_rel)); + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + + seq_entry = hash_search(sequences_to_copy, &key, HASH_ENTER, &found); + Assert(!found); + + memset(seq_entry, 0, sizeof(LogicalRepSequenceInfo)); + + seq_entry->seqname = pstrdup(key.seqname); + seq_entry->nspname = pstrdup(key.nspname); + seq_entry->localrelid = subrel->srrelid; + seq_entry->remote_seq_queried = false; + seq_entry->seqowner = sequence_rel->rd_rel->relowner; + seq_entry->entry_valid = true; + + MemoryContextSwitchTo(oldctx); + + table_close(sequence_rel, RowExclusiveLock); + } + + /* Cleanup */ + systable_endscan(scan); + table_close(rel, AccessShareLock); + + CommitTransactionCommand(); + + /* Is the use of a password mandatory? */ + must_use_password = MySubscription->passwordrequired && + !MySubscription->ownersuperuser; + + initStringInfo(&app_name); + appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT, + MySubscription->oid, GetSystemIdentifier()); + + /* + * Establish the connection to the publisher for sequence synchronization. + */ + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, true, + must_use_password, + app_name.data, &err); + if (LogRepWorkerWalRcvConn == NULL) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s", + MySubscription->name, err)); + + pfree(app_name.data); + + /* If there are any sequences that need to be copied */ + if (hash_get_num_entries(sequences_to_copy)) + { + copy_sequences(LogRepWorkerWalRcvConn, subid); + + hash_seq_init(&hash_seq, sequences_to_copy); + while ((seq_entry = hash_seq_search(&hash_seq)) != NULL) + { + pfree(seq_entry->seqname); + pfree(seq_entry->nspname); + } + } + + hash_destroy(sequences_to_copy); + sequences_to_copy = NULL; +} + +/* + * Execute the initial sync with error handling. Disable the subscription, + * if required. + * + * Allocate the slot name in long-lived context on return. Note that we don't + * handle FATAL errors which are probably because of system resource error and + * are not repeatable. + */ +static void +start_sequence_sync() +{ + Assert(am_sequencesync_worker()); + + PG_TRY(); + { + /* Call initial sync. */ + LogicalRepSyncSequences(); + } + PG_CATCH(); + { + if (MySubscription->disableonerr) + DisableSubscriptionAndExit(); + else + { + /* + * Report the worker failed during sequence synchronization. Abort + * the current transaction so that the stats message is sent in an + * idle state. + */ + AbortOutOfAnyTransaction(); + pgstat_report_subscription_error(MySubscription->oid, + WORKERTYPE_SEQUENCESYNC); + + PG_RE_THROW(); + } + } + PG_END_TRY(); +} + +/* Logical Replication sequencesync worker entry point */ +void +SequenceSyncWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + + SetupApplyOrSyncWorker(worker_slot); + + start_sequence_sync(); + + FinishSyncWorker(WORKERTYPE_SEQUENCESYNC); +} diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c index 46d88838894..19b875427ca 100644 --- a/src/backend/replication/logical/syncutils.c +++ b/src/backend/replication/logical/syncutils.c @@ -16,6 +16,7 @@ #include "catalog/pg_subscription_rel.h" #include "pgstat.h" +#include "replication/logicallauncher.h" #include "replication/worker_internal.h" #include "storage/ipc.h" #include "utils/lsyscache.h" @@ -46,8 +47,10 @@ static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEE * Exit routine for synchronization worker. */ pg_noreturn void -FinishSyncWorker(void) +FinishSyncWorker(LogicalRepWorkerType wtype) { + Assert(wtype == WORKERTYPE_TABLESYNC || wtype == WORKERTYPE_SEQUENCESYNC); + /* * Commit any outstanding transaction. This is the usual case, unless * there was nothing to do for the table. @@ -62,14 +65,26 @@ FinishSyncWorker(void) XLogFlush(GetXLogWriteRecPtr()); StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + if (wtype == WORKERTYPE_TABLESYNC) + ereport(LOG, + errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid))); + else + ereport(LOG, + errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished", + MySubscription->name)); CommitTransactionCommand(); - /* Find the leader apply worker and signal it. */ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + /* + * This is a clean exit of the sequencesync worker; reset the + * last_seqsync_start_time. + */ + if (wtype == WORKERTYPE_SEQUENCESYNC) + logicalrep_reset_seqsync_start_time(); + else + /* Find the leader apply worker and signal it. */ + logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); /* Stop gracefully */ proc_exit(0); @@ -85,7 +100,48 @@ InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue) } /* - * Process possible state change(s) of relations that are being synchronized. + * Attempt to launch a sync worker (sequence or table) if there is a sync + * worker slot available and the retry interval has elapsed. + * + * nsyncworkers: Number of currently running sync workers for the subscription. + * relid: InvalidOid for sequence sync worker, actual relid for table sync + * worker. + * last_start_time: Pointer to the last start time of the worker. + */ +void +launch_sync_worker(int nsyncworkers, Oid relid, TimestampTz *last_start_time) +{ + /* If there is a free sync worker slot, start a new sync worker */ + if (nsyncworkers < max_sync_workers_per_subscription) + { + TimestampTz now = GetCurrentTimestamp(); + + if (!(*last_start_time) || + TimestampDifferenceExceeds(*last_start_time, now, + wal_retrieve_retry_interval)) + { + /* + * Set the last_start_time even if we fail to start the worker, so + * that we won't retry until wal_retrieve_retry_interval has + * elapsed. + */ + *last_start_time = now; + (void) logicalrep_worker_launch((relid == InvalidOid) ? WORKERTYPE_SEQUENCESYNC : WORKERTYPE_TABLESYNC, + MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + relid, + DSM_HANDLE_INVALID, + false); + } + } +} + +/* + * Process possible state change(s) of relations that are being synchronized + * and start new tablesync workers for the newly added tables. Also, start a + * new sequencesync worker for the newly added sequences. */ void ProcessSyncingRelations(XLogRecPtr current_lsn) @@ -107,6 +163,12 @@ ProcessSyncingRelations(XLogRecPtr current_lsn) case WORKERTYPE_APPLY: ProcessSyncingTablesForApply(current_lsn); + ProcessSyncingSequencesForApply(); + break; + + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + elog(ERROR, "Sequence synchronization worker not expected to process relations"); break; case WORKERTYPE_UNKNOWN: @@ -116,18 +178,26 @@ ProcessSyncingRelations(XLogRecPtr current_lsn) } /* - * Common code to fetch the up-to-date sync state info into the static lists. + * Common code to fetch the up-to-date sync state info for tables and sequences. * - * Returns true if subscription has 1 or more tables, else false. + * The pg_subscription_rel catalog is shared by tables and sequences. Changes + * to either sequences or tables can affect the validity of relation states, so + * we identify non-ready tables and non-ready sequences together to ensure + * consistency. * - * Note: If this function started the transaction (indicated by the parameter) - * then it is the caller's responsibility to commit it. + * Returns true if subscription has 1 or more tables, else false. */ bool -FetchRelationStates(bool *started_tx) +FetchRelationStates(bool *has_pending_sequences, bool *started_tx) { + /* + * has_subtables and has_subsequences is declared as static, since the + * same value can be used until the system table is invalidated. + */ static bool has_subtables = false; + static bool has_subsequences_non_ready = false; + *has_pending_sequences = false; *started_tx = false; if (relation_states_validity != SYNC_RELATIONS_STATE_VALID) @@ -138,6 +208,7 @@ FetchRelationStates(bool *started_tx) SubscriptionRelState *rstate; relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED; + has_subsequences_non_ready = false; /* Clean the old lists. */ list_free_deep(table_states_not_ready); @@ -159,7 +230,12 @@ FetchRelationStates(bool *started_tx) { rstate = palloc(sizeof(SubscriptionRelState)); memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states_not_ready = lappend(table_states_not_ready, rstate); + + if (get_rel_relkind(rstate->relid) == RELKIND_SEQUENCE) + has_subsequences_non_ready = true; + else + table_states_not_ready = lappend(table_states_not_ready, + rstate); } MemoryContextSwitchTo(oldctx); @@ -184,5 +260,7 @@ FetchRelationStates(bool *started_tx) relation_states_validity = SYNC_RELATIONS_STATE_VALID; } + *has_pending_sequences = has_subsequences_non_ready; + return has_subtables; } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 40e1ed3c20e..8543d6c279d 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -161,7 +161,7 @@ wait_for_table_state_change(Oid relid, char expected_state) /* Check if the sync worker is still running and bail if not. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, - false); + WORKERTYPE_TABLESYNC, false); LWLockRelease(LogicalRepWorkerLock); if (!worker) break; @@ -208,7 +208,7 @@ wait_for_worker_state_change(char expected_state) */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, - InvalidOid, false); + InvalidOid, WORKERTYPE_APPLY, false); if (worker && worker->proc) logicalrep_worker_wakeup_ptr(worker); LWLockRelease(LogicalRepWorkerLock); @@ -335,7 +335,7 @@ ProcessSyncingTablesForSync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); - FinishSyncWorker(); + FinishSyncWorker(WORKERTYPE_TABLESYNC); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -375,11 +375,12 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) bool started_tx = false; bool should_exit = false; Relation rel = NULL; + bool has_pending_sequences; Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ - FetchRelationStates(&started_tx); + FetchRelationStates(&has_pending_sequences, &started_tx); /* * Prepare a hash table for tracking last start times of workers, to avoid @@ -413,6 +414,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } + + Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE); + if (rstate->state == SUBREL_STATE_SYNCDONE) { /* @@ -426,11 +435,6 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; - if (!started_tx) - { - StartTransactionCommand(); - started_tx = true; - } /* * Remove the tablesync origin tracking if exists. @@ -477,8 +481,8 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, - rstate->relid, false); - + rstate->relid, + WORKERTYPE_TABLESYNC, true); if (syncworker) { /* Found one, update our copy of its state */ @@ -549,43 +553,19 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) */ int nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + struct tablesync_start_time_mapping *hentry; + bool found; /* Now safe to release the LWLock */ LWLockRelease(LogicalRepWorkerLock); - /* - * If there are free sync worker slot(s), start a new sync - * worker for the table. - */ - if (nsyncworkers < max_sync_workers_per_subscription) - { - TimestampTz now = GetCurrentTimestamp(); - struct tablesync_start_time_mapping *hentry; - bool found; + hentry = hash_search(last_start_times, &rstate->relid, + HASH_ENTER, &found); + if (!found) + hentry->last_start_time = 0; - hentry = hash_search(last_start_times, &rstate->relid, - HASH_ENTER, &found); - - if (!found || - TimestampDifferenceExceeds(hentry->last_start_time, now, - wal_retrieve_retry_interval)) - { - /* - * Set the last_start_time even if we fail to start - * the worker, so that we won't retry until - * wal_retrieve_retry_interval has elapsed. - */ - hentry->last_start_time = now; - (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC, - MyLogicalRepWorker->dbid, - MySubscription->oid, - MySubscription->name, - MyLogicalRepWorker->userid, - rstate->relid, - DSM_HANDLE_INVALID, - false); - } - } + launch_sync_worker(nsyncworkers, rstate->relid, + &hentry->last_start_time); } } } @@ -1273,7 +1253,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: - FinishSyncWorker(); /* doesn't return */ + FinishSyncWorker(WORKERTYPE_TABLESYNC); /* doesn't return */ } /* Calculate the name of the tablesync slot. */ @@ -1548,7 +1528,8 @@ start_table_sync(XLogRecPtr *origin_startpos, char **slotname) * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); + pgstat_report_subscription_error(MySubscription->oid, + WORKERTYPE_TABLESYNC); PG_RE_THROW(); } @@ -1593,7 +1574,7 @@ run_tablesync_worker() /* Logical Replication Tablesync worker entry point */ void -TablesyncWorkerMain(Datum main_arg) +TableSyncWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); @@ -1601,7 +1582,7 @@ TablesyncWorkerMain(Datum main_arg) run_tablesync_worker(); - FinishSyncWorker(); + FinishSyncWorker(WORKERTYPE_TABLESYNC); } /* @@ -1616,10 +1597,11 @@ bool AllTablesyncsReady(void) { bool started_tx = false; - bool has_subrels = false; + bool has_tables; + bool has_pending_sequences; /* We need up-to-date sync state info for subscription tables here. */ - has_subrels = FetchRelationStates(&started_tx); + has_tables = FetchRelationStates(&has_pending_sequences, &started_tx); if (started_tx) { @@ -1631,7 +1613,7 @@ AllTablesyncsReady(void) * Return false when there are no tables in subscription or not all tables * are in ready state; true otherwise. */ - return has_subrels && (table_states_not_ready == NIL); + return has_tables && (table_states_not_ready == NIL); } /* @@ -1647,9 +1629,10 @@ HasSubscriptionTablesCached(void) { bool started_tx; bool has_subrels; + bool has_pending_sequences; /* We need up-to-date subscription tables info here */ - has_subrels = FetchRelationStates(&started_tx); + has_subrels = FetchRelationStates(&has_pending_sequences, &started_tx); if (started_tx) { diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d986ba2ea50..bae32c5645c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -702,6 +702,11 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) (rel->state == SUBREL_STATE_SYNCDONE && rel->statelsn <= remote_final_lsn)); + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + elog(ERROR, "Sequence synchronization worker not expected to apply changes"); + break; + case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "Unknown worker type"); @@ -1242,7 +1247,10 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any relations that are being synchronized in parallel and any + * newly added relations. + */ ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -1364,7 +1372,10 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any relations that are being synchronized in parallel and any + * newly added relations. + */ ProcessSyncingRelations(prepare_data.end_lsn); /* @@ -1420,7 +1431,10 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any relations that are being synchronized in parallel and any + * newly added relations. + */ ProcessSyncingRelations(prepare_data.end_lsn); clear_subscription_skip_lsn(prepare_data.end_lsn); @@ -1486,7 +1500,10 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr); in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any relations that are being synchronized in parallel and any + * newly added relations. + */ ProcessSyncingRelations(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -1621,7 +1638,10 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any relations that are being synchronized in parallel and any + * newly added relations. + */ ProcessSyncingRelations(prepare_data.end_lsn); /* @@ -2463,7 +2483,10 @@ apply_handle_stream_commit(StringInfo s) break; } - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any relations that are being synchronized in parallel and any + * newly added relations. + */ ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -3284,7 +3307,7 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); leader = logicalrep_worker_find(MyLogicalRepWorker->subid, - InvalidOid, false); + InvalidOid, WORKERTYPE_APPLY, false); if (!leader) { ereport(ERROR, @@ -4134,7 +4157,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received) AcceptInvalidationMessages(); maybe_reread_subscription(); - /* Process any table synchronization changes. */ + /* + * Process any relations that are being synchronized in parallel and + * any newly added relations. + */ ProcessSyncingRelations(last_received); } @@ -5577,7 +5603,8 @@ start_apply(XLogRecPtr origin_startpos) * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); + pgstat_report_subscription_error(MySubscription->oid, + WORKERTYPE_APPLY); PG_RE_THROW(); } @@ -5697,8 +5724,8 @@ run_apply_worker() } /* - * Common initialization for leader apply worker, parallel apply worker and - * tablesync worker. + * Common initialization for leader apply worker, parallel apply worker, + * tablesync worker and sequencesync worker. * * Initialize the database connection, in-memory subscription and necessary * config options. @@ -5809,6 +5836,10 @@ InitializeLogRepWorker(void) (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); + else if (am_sequencesync_worker()) + ereport(LOG, + (errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started", + MySubscription->name))); else ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" has started", @@ -5828,14 +5859,16 @@ replorigin_reset(int code, Datum arg) replorigin_session_origin_timestamp = 0; } -/* Common function to setup the leader apply or tablesync worker. */ +/* + * Common function to setup the leader apply, tablesync and sequencesync worker. + */ void SetupApplyOrSyncWorker(int worker_slot) { /* Attach to slot */ logicalrep_worker_attach(worker_slot); - Assert(am_tablesync_worker() || am_leader_apply_worker()); + Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker()); /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); @@ -5906,6 +5939,10 @@ ApplyWorkerMain(Datum main_arg) void DisableSubscriptionAndExit(void) { + LogicalRepWorkerType wtype = am_tablesync_worker() ? WORKERTYPE_TABLESYNC : + (am_sequencesync_worker()) ? WORKERTYPE_SEQUENCESYNC : + WORKERTYPE_APPLY; + /* * Emit the error message, and recover from the error state to an idle * state @@ -5918,9 +5955,11 @@ DisableSubscriptionAndExit(void) RESUME_INTERRUPTS(); - /* Report the worker failed during either table synchronization or apply */ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); + /* + * Report the worker failed during either sequence synchronization or + * table synchronization or apply. + */ + pgstat_report_subscription_error(MyLogicalRepWorker->subid, wtype); /* Disable the subscription */ StartTransactionCommand(); diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c index f9a1c831a07..002d630d4ae 100644 --- a/src/backend/utils/activity/pgstat_subscription.c +++ b/src/backend/utils/activity/pgstat_subscription.c @@ -17,6 +17,7 @@ #include "postgres.h" +#include "replication/worker_internal.h" #include "utils/pgstat_internal.h" @@ -24,7 +25,7 @@ * Report a subscription error. */ void -pgstat_report_subscription_error(Oid subid, bool is_apply_error) +pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype) { PgStat_EntryRef *entry_ref; PgStat_BackendSubEntry *pending; @@ -33,10 +34,25 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error) InvalidOid, subid, NULL); pending = entry_ref->pending; - if (is_apply_error) - pending->apply_error_count++; - else - pending->sync_error_count++; + switch (wtype) + { + case WORKERTYPE_APPLY: + pending->apply_error_count++; + break; + + case WORKERTYPE_SEQUENCESYNC: + pending->sequence_sync_error_count++; + break; + + case WORKERTYPE_TABLESYNC: + pending->sync_error_count++; + break; + + default: + /* Should never happen. */ + Assert(0); + break; + } } /* @@ -115,6 +131,7 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) #define SUB_ACC(fld) shsubent->stats.fld += localent->fld SUB_ACC(apply_error_count); + SUB_ACC(sequence_sync_error_count); SUB_ACC(sync_error_count); for (int i = 0; i < CONFLICT_NUM_TYPES; i++) SUB_ACC(conflict_count[i]); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 1fe33df2756..4da7298502e 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2195,7 +2195,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 13 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; @@ -2213,25 +2213,27 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) OIDOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 2, "apply_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count", + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sequence_sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists", + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_insert_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists", + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_deleted", + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_deleted", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_update_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_delete_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "confl_multiple_unique_conflicts", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2248,6 +2250,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) /* apply_error_count */ values[i++] = Int64GetDatum(subentry->apply_error_count); + /* sequence_sync_error_count */ + values[i++] = Int64GetDatum(subentry->sequence_sync_error_count); + /* sync_error_count */ values[i++] = Int64GetDatum(subentry->sync_error_count); diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index d6fc8333850..0b49b98da99 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -1924,7 +1924,7 @@ }, { name => 'max_sync_workers_per_subscription', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_SUBSCRIBERS', - short_desc => 'Maximum number of table synchronization workers per subscription.', + short_desc => 'Maximum number of workers per subscription for synchronizing tables and sequences.', variable => 'max_sync_workers_per_subscription', boot_val => '2', min => '0', diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index b51d2b17379..8a2e1d1158a 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -3433,7 +3433,7 @@ proname => 'pg_sequence_last_value', provolatile => 'v', proparallel => 'u', prorettype => 'int8', proargtypes => 'regclass', prosrc => 'pg_sequence_last_value' }, -{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump', +{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump and sequence synchronization', proname => 'pg_get_sequence_data', provolatile => 'v', proparallel => 'u', prorettype => 'record', proargtypes => 'regclass', proallargtypes => '{regclass,int8,bool,pg_lsn}', proargmodes => '{i,o,o,o}', @@ -5704,9 +5704,9 @@ { oid => '6231', descr => 'statistics: information about subscription stats', proname => 'pg_stat_get_subscription_stats', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', + proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,apply_error_count,sequence_sync_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', prosrc => 'pg_stat_get_subscription_stats' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 49deec052c6..88772a22b80 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -82,6 +82,22 @@ typedef struct SubscriptionRelState char state; } SubscriptionRelState; +typedef struct LogicalRepSeqHashKey +{ + const char *seqname; + const char *nspname; +} LogicalRepSeqHashKey; + +typedef struct LogicalRepSequenceInfo +{ + char *seqname; + char *nspname; + Oid localrelid; + bool remote_seq_queried; + Oid seqowner; + bool entry_valid; +} LogicalRepSequenceInfo; + extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index 9ac0b67683d..bcea652ef61 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -45,6 +45,8 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data; /* XLOG stuff */ #define XLOG_SEQ_LOG 0x00 +#define SEQ_LOG_CNT_INVALID 0 + typedef struct xl_seq_rec { RelFileLocator locator; @@ -60,6 +62,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt); extern void SequenceChangePersistence(Oid relid, char newrelpersistence); extern void DeleteSequenceTuple(Oid relid); extern void ResetSequence(Oid seq_relid); +extern void SetSequence(Oid relid, int64 next, bool is_called); extern void ResetSequenceCaches(void); extern void seq_redo(XLogReaderState *record); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index bc8077cbae6..2db16bd7f84 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -16,6 +16,7 @@ #include "portability/instr_time.h" #include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */ #include "replication/conflict.h" +#include "replication/worker_internal.h" #include "utils/backend_progress.h" /* for backward compatibility */ /* IWYU pragma: export */ #include "utils/backend_status.h" /* for backward compatibility */ /* IWYU pragma: export */ #include "utils/pgstat_kind.h" @@ -108,6 +109,7 @@ typedef struct PgStat_FunctionCallUsage typedef struct PgStat_BackendSubEntry { PgStat_Counter apply_error_count; + PgStat_Counter sequence_sync_error_count; PgStat_Counter sync_error_count; PgStat_Counter conflict_count[CONFLICT_NUM_TYPES]; } PgStat_BackendSubEntry; @@ -416,6 +418,7 @@ typedef struct PgStat_SLRUStats typedef struct PgStat_StatSubEntry { PgStat_Counter apply_error_count; + PgStat_Counter sequence_sync_error_count; PgStat_Counter sync_error_count; PgStat_Counter conflict_count[CONFLICT_NUM_TYPES]; TimestampTz stat_reset_timestamp; @@ -768,7 +771,8 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void); * Functions in pgstat_subscription.c */ -extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error); +extern void pgstat_report_subscription_error(Oid subid, + LogicalRepWorkerType wtype); extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type); extern void pgstat_create_subscription(Oid subid); extern void pgstat_drop_subscription(Oid subid); diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 88912606e4d..56fa79b648e 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -18,7 +18,8 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); -extern void TablesyncWorkerMain(Datum main_arg); +extern void TableSyncWorkerMain(Datum main_arg); +extern void SequenceSyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index ae352f6e691..a7c6588999f 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -30,6 +30,7 @@ typedef enum LogicalRepWorkerType { WORKERTYPE_UNKNOWN = 0, WORKERTYPE_TABLESYNC, + WORKERTYPE_SEQUENCESYNC, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, } LogicalRepWorkerType; @@ -106,6 +107,8 @@ typedef struct LogicalRepWorker TimestampTz last_recv_time; XLogRecPtr reply_lsn; TimestampTz reply_time; + + TimestampTz last_seqsync_start_time; } LogicalRepWorker; /* @@ -255,6 +258,7 @@ extern PGDLLIMPORT List *table_states_not_ready; extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, + LogicalRepWorkerType wtype, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock); @@ -263,12 +267,16 @@ extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples); -extern void logicalrep_worker_stop(Oid subid, Oid relid); +extern void launch_sync_worker(int nsyncworkers, Oid relid, + TimestampTz *last_start_time); +extern void logicalrep_worker_stop(Oid subid, Oid relid, + LogicalRepWorkerType wtype); extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid); +extern void logicalrep_reset_seqsync_start_time(void); extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname); @@ -279,11 +287,12 @@ extern void UpdateTwoPhaseState(Oid suboid, char new_state); extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn); extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn); +extern void ProcessSyncingSequencesForApply(void); -pg_noreturn extern void FinishSyncWorker(void); +pg_noreturn extern void FinishSyncWorker(LogicalRepWorkerType wtype); extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue); extern void ProcessSyncingRelations(XLogRecPtr current_lsn); -extern bool FetchRelationStates(bool *started_tx); +extern bool FetchRelationStates(bool *has_pending_sequences, bool *started_tx); extern void stream_start_internal(TransactionId xid, bool first_segment); extern void stream_stop_internal(TransactionId xid); @@ -348,15 +357,25 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +#define isApplyWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_APPLY) #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) -#define isTablesyncWorker(worker) ((worker)->in_use && \ +#define isTableSyncWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_TABLESYNC) +#define isSequenceSyncWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_SEQUENCESYNC) static inline bool am_tablesync_worker(void) { - return isTablesyncWorker(MyLogicalRepWorker); + return isTableSyncWorker(MyLogicalRepWorker); +} + +static inline bool +am_sequencesync_worker(void) +{ + return isSequenceSyncWorker(MyLogicalRepWorker); } static inline bool diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 16753b2e4c0..c7bcc922ae8 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2191,6 +2191,7 @@ pg_stat_subscription| SELECT su.oid AS subid, pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, + ss.sequence_sync_error_count, ss.sync_error_count, ss.confl_insert_exists, ss.confl_update_origin_differs, @@ -2202,7 +2203,7 @@ pg_stat_subscription_stats| SELECT ss.subid, ss.confl_multiple_unique_conflicts, ss.stats_reset FROM pg_subscription s, - LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); + LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sequence_sync_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); pg_stat_sys_indexes| SELECT relid, indexrelid, schemaname, diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index 20b4e523d93..85d10a89994 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -45,6 +45,7 @@ tests += { 't/033_run_as_table_owner.pl', 't/034_temporal.pl', 't/035_conflicts.pl', + 't/036_sequences.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl index 00a1c2fcd48..3c0b1db0510 100644 --- a/src/test/subscription/t/026_stats.pl +++ b/src/test/subscription/t/026_stats.pl @@ -21,7 +21,8 @@ $node_subscriber->start; sub create_sub_pub_w_errors { - my ($node_publisher, $node_subscriber, $db, $table_name) = @_; + my ($node_publisher, $node_subscriber, $db, $table_name, $sequence_name) + = @_; # Initial table setup on both publisher and subscriber. On subscriber we # create the same tables but with primary keys. Also, insert some data that # will conflict with the data replicated from publisher later. @@ -32,6 +33,7 @@ sub create_sub_pub_w_errors CREATE TABLE $table_name(a int); ALTER TABLE $table_name REPLICA IDENTITY FULL; INSERT INTO $table_name VALUES (1); + CREATE SEQUENCE $sequence_name; COMMIT; ]); $node_subscriber->safe_psql( @@ -40,45 +42,56 @@ sub create_sub_pub_w_errors BEGIN; CREATE TABLE $table_name(a int primary key); INSERT INTO $table_name VALUES (1); + CREATE SEQUENCE $sequence_name INCREMENT BY 10; COMMIT; ]); # Set up publication. my $pub_name = $table_name . '_pub'; + my $pub_seq_name = $sequence_name . '_pub'; my $publisher_connstr = $node_publisher->connstr . qq( dbname=$db); - $node_publisher->safe_psql($db, - qq(CREATE PUBLICATION $pub_name FOR TABLE $table_name)); + $node_publisher->safe_psql( + $db, + qq[ + CREATE PUBLICATION $pub_name FOR TABLE $table_name; + CREATE PUBLICATION $pub_seq_name FOR ALL SEQUENCES; + ]); # Create subscription. The tablesync for table on subscription will enter into - # infinite error loop due to violating the unique constraint. + # infinite error loop due to violating the unique constraint. The sequencesync + # will also fail due to different sequence increment values on publisher and + # subscriber. my $sub_name = $table_name . '_sub'; $node_subscriber->safe_psql($db, - qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name) + qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name, $pub_seq_name) ); $node_publisher->wait_for_catchup($sub_name); - # Wait for the tablesync error to be reported. + # Wait for the tablesync and sequencesync error to be reported. $node_subscriber->poll_query_until( $db, qq[ - SELECT sync_error_count > 0 - FROM pg_stat_subscription_stats - WHERE subname = '$sub_name' + SELECT count(1) = 1 FROM pg_stat_subscription_stats + WHERE subname = '$sub_name' and sync_error_count > 0 and sequence_sync_error_count > 0 ]) or die qq(Timed out while waiting for tablesync errors for subscription '$sub_name'); + # Change the sequence start value on the subscriber so that it doesn't error out. + $node_subscriber->safe_psql($db, + qq(ALTER SEQUENCE $sequence_name INCREMENT 1)); + # Truncate test_tab1 so that tablesync worker can continue. $node_subscriber->safe_psql($db, qq(TRUNCATE $table_name)); - # Wait for initial tablesync to finish. + # Wait for initial sync to finish. $node_subscriber->poll_query_until( $db, qq[ - SELECT count(1) = 1 FROM pg_subscription_rel - WHERE srrelid = '$table_name'::regclass AND srsubstate in ('r', 's') + SELECT count(1) = 2 FROM pg_subscription_rel + WHERE srrelid IN ('$table_name'::regclass, '$sequence_name'::regclass) AND srsubstate in ('r', 's') ]) or die qq(Timed out while waiting for subscriber to synchronize data for table '$table_name'.); @@ -136,22 +149,24 @@ is($result, qq(0), # Create the publication and subscription with sync and apply errors my $table1_name = 'test_tab1'; +my $sequence1_name = 'test_seq1'; my ($pub1_name, $sub1_name) = create_sub_pub_w_errors($node_publisher, $node_subscriber, $db, - $table1_name); + $table1_name, $sequence1_name); # Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count > 0, sync_error_count > 0, + sequence_sync_error_count > 0, confl_insert_exists > 0, confl_delete_missing > 0, stats_reset IS NULL FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t|t|t), + qq(t|t|t|t|t|t), qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.) ); @@ -165,13 +180,14 @@ is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, sync_error_count = 0, + sequence_sync_error_count = 0, confl_insert_exists = 0, confl_delete_missing = 0, stats_reset IS NOT NULL FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t|t|t), + qq(t|t|t|t|t|t), qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.) ); @@ -198,22 +214,24 @@ is( $node_subscriber->safe_psql( # Make second subscription and publication my $table2_name = 'test_tab2'; +my $sequence2_name = 'test_seq2'; my ($pub2_name, $sub2_name) = create_sub_pub_w_errors($node_publisher, $node_subscriber, $db, - $table2_name); + $table2_name, $sequence2_name); # Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count > 0, sync_error_count > 0, + sequence_sync_error_count > 0, confl_insert_exists > 0, confl_delete_missing > 0, stats_reset IS NULL FROM pg_stat_subscription_stats WHERE subname = '$sub2_name') ), - qq(t|t|t|t|t), + qq(t|t|t|t|t|t), qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.) ); @@ -226,13 +244,14 @@ is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, sync_error_count = 0, + sequence_sync_error_count = 0, confl_insert_exists = 0, confl_delete_missing = 0, stats_reset IS NOT NULL FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t|t|t), + qq(t|t|t|t|t|t), qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.) ); @@ -240,13 +259,14 @@ is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, sync_error_count = 0, + sequence_sync_error_count = 0, confl_insert_exists = 0, confl_delete_missing = 0, stats_reset IS NOT NULL FROM pg_stat_subscription_stats WHERE subname = '$sub2_name') ), - qq(t|t|t|t|t), + qq(t|t|t|t|t|t), qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.) ); diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl new file mode 100644 index 00000000000..ad96e616c02 --- /dev/null +++ b/src/test/subscription/t/036_sequences.pl @@ -0,0 +1,238 @@ + +# Copyright (c) 2025, PostgreSQL Global Development Group + +# This tests that sequences are synced correctly to the subscriber +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); + +# Avoid checkpoint during the test, otherwise, extra values will be fetched for +# the sequences which will cause the test to fail randomly. +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'checkpoint_timeout = 1h'); +$node_publisher->start; + +# Initialize subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->start; + +# Setup structure on the publisher +my $ddl = qq( + CREATE TABLE regress_seq_test (v BIGINT); + CREATE SEQUENCE regress_s1; +); +$node_publisher->safe_psql('postgres', $ddl); + +# Setup the same structure on the subscriber, plus some extra sequences that +# we'll create on the publisher later +$ddl = qq( + CREATE TABLE regress_seq_test (v BIGINT); + CREATE SEQUENCE regress_s1; + CREATE SEQUENCE regress_s2; + CREATE SEQUENCE regress_s3; + CREATE SEQUENCE regress_s4 +); +$node_subscriber->safe_psql('postgres', $ddl); + +# Insert initial test data +$node_publisher->safe_psql( + 'postgres', qq( + -- generate a number of values using the sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100); +)); + +# Setup logical replication pub/sub +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION regress_seq_pub FOR ALL SEQUENCES"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION regress_seq_sub CONNECTION '$publisher_connstr' PUBLICATION regress_seq_pub" +); + +# Wait for initial sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check the initial data on subscriber +my $result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '100|0|t', 'initial test data replicated'); + +########## +## ALTER SUBSCRIPTION ... REFRESH PUBLICATION should cause sync of new +# sequences of the publisher, but changes to existing sequences should +# not be synced. +########## + +# Create a new sequence 'regress_s2', and update existing sequence 'regress_s1' +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s2; + INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100); + + -- Existing sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100); +)); + +# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '200|31|t', 'Check sequence value in the publisher'); + +# Check - existing sequence is not synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '100|0|t', + 'REFRESH PUBLICATION does not sync existing sequence'); + +# Check - newly published sequence is synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s2; +)); +is($result, '100|0|t', + 'REFRESH PUBLICATION will sync newly published sequence'); + +########## +## ALTER SUBSCRIPTION ... REFRESH SEQUENCES should cause sync of new sequences +# of the publisher, and changes to existing sequences should also be synced. +########## + +# Create a new sequence 'regress_s3', and update the existing sequence +# 'regress_s2'. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s3; + INSERT INTO regress_seq_test SELECT nextval('regress_s3') FROM generate_series(1,100); + + -- Existing sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100); +)); + +# Do ALTER SUBSCRIPTION ... REFRESH SEQUENCES +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check - existing sequences are synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '200|0|t', + 'REFRESH SEQUENCES will sync existing sequences'); +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s2; +)); +is($result, '200|0|t', + 'REFRESH SEQUENCES will sync existing sequences'); + +# Check - newly published sequence is not synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s3; +)); +is($result, '1|0|f', + 'REFRESH SEQUENCES will not sync newly published sequence'); + +########## +## ALTER SUBSCRIPTION ... REFRESH PUBLICATION with (copy_data = off) should +# not update the sequence values for the new sequence. +########## + +# Create a new sequence 'regress_s4' +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s4; + INSERT INTO regress_seq_test SELECT nextval('regress_s4') FROM generate_series(1,100); +)); + +# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION with (copy_data = false); +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s4; +)); +is($result, '100|32|t', 'Check sequence value in the publisher'); + +# Check - newly published sequence values are not updated +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s4; +)); +is($result, '1|0|f', + 'REFRESH PUBLICATION will not sync newly published sequence with copy_data as off' +); + +########## +# ALTER SUBSCRIPTION ... REFRESH PUBLICATION should throw an error +# for sequence definition not matching between the publisher and the subscriber. +########## + +# Create a new sequence 'regress_s5' whose START value is not the same in the +# publisher and subscriber. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s5 START 1 INCREMENT 2; +)); + +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s5 START 10 INCREMENT 2; +)); + +my $log_offset = -s $node_subscriber->logfile; + +# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION +$node_subscriber->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION" +); + +# Confirm that the warning for parameters differing is logged. +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? logical replication sequence synchronization failed for subscription "regress_seq_sub"\n.*DETAIL:.* Mismatched sequence\(s\) on subscriber: \("public.regress_s5"\)/, + $log_offset); + +$node_publisher->safe_psql( + 'postgres', qq( + DROP SEQUENCE regress_s5; +)); + +# Confirm that the warning for missing sequence is logged. +$node_subscriber->wait_for_log( + qr/LOG: ? sequences not found on publisher removed from resynchronization: \("public.regress_s5"\)/, + $log_offset); + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index fb6a67d94e0..072f39292ec 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1629,6 +1629,8 @@ LogicalRepRelId LogicalRepRelMapEntry LogicalRepRelation LogicalRepRollbackPreparedTxnData +LogicalRepSeqHashKey +LogicalRepSequenceInfo LogicalRepStreamAbortData LogicalRepTupleData LogicalRepTyp -- 2.51.0.windows.1