From 1fa69067c1175f985710dfd8987b119f4ecb1351 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Wed, 9 Jul 2025 14:58:11 +0530 Subject: [PATCH v20250714 5/6] New worker for sequence synchronization during subscription management This patch introduces sequence synchronization: Sequences have 2 states: - INIT (needs synchronizing) - READY (is already synchronized) A new sequencesync worker is launched as needed to synchronize sequences. It does the following: a) Retrieves remote values of sequences with pg_sequence_state() INIT. b) Logs a warning if the sequence parameters differ between the publisher and subscriber. c) Sets the local sequence values accordingly. d) Updates the local sequence state to READY. e) Repeats until all done; Commits synchronized sequences in batches of 100 Sequence synchronization occurs in 3 places: 1) CREATE SUBSCRIPTION - (PG19 command syntax is unchanged) - The subscriber retrieves sequences associated with publications. - Published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker (see above) to synchronize all sequences. 2) ALTER SUBSCRIPTION ... REFRESH PUBLICATION - (PG19 command syntax is unchanged) - Dropped published sequences are removed from pg_subscription_rel. - Newly published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker (see above) to synchronize only newly added sequences. 3) ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES - The patch introduces this new command to refresh all sequences - Dropped published sequences are removed from pg_subscription_rel. - Newly published sequences are added to pg_subscription_rel - All sequences in pg_subscription_rel are reset to INIT state. - Initiate the sequencesync worker (see above) to synchronize all sequences. 
--- src/backend/catalog/pg_subscription.c | 2 +- src/backend/catalog/system_views.sql | 1 + src/backend/commands/sequence.c | 26 +- src/backend/commands/subscriptioncmds.c | 4 +- src/backend/postmaster/bgworker.c | 5 +- src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/launcher.c | 62 +- src/backend/replication/logical/meson.build | 1 + .../replication/logical/sequencesync.c | 631 ++++++++++++++++++ src/backend/replication/logical/syncutils.c | 70 +- src/backend/replication/logical/tablesync.c | 48 +- src/backend/replication/logical/worker.c | 73 +- .../utils/activity/pgstat_subscription.c | 27 +- src/backend/utils/adt/pgstatfuncs.c | 25 +- src/backend/utils/misc/guc_tables.c | 2 +- src/include/catalog/pg_proc.dat | 6 +- src/include/catalog/pg_subscription_rel.h | 8 + src/include/commands/sequence.h | 3 + src/include/pgstat.h | 6 +- src/include/replication/logicalworker.h | 3 +- src/include/replication/worker_internal.h | 29 +- src/test/subscription/meson.build | 1 + src/test/subscription/t/036_sequences.pl | 239 +++++++ src/tools/pgindent/typedefs.list | 1 + 24 files changed, 1164 insertions(+), 110 deletions(-) create mode 100644 src/backend/replication/logical/sequencesync.c create mode 100644 src/test/subscription/t/036_sequences.pl diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index ebd5605afe3..b8f415cd50d 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -337,7 +337,7 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, ObjectIdGetDatum(relid), ObjectIdGetDatum(subid)); if (!HeapTupleIsValid(tup)) - elog(ERROR, "subscription table %u in subscription %u does not exist", + elog(ERROR, "subscription relation %u in subscription %u does not exist", relid, subid); /* Update the tuple. 
*/ diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 43c3d9c2975..485f6be15b7 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1404,6 +1404,7 @@ CREATE VIEW pg_stat_subscription_stats AS ss.subid, s.subname, ss.apply_error_count, + ss.sequence_sync_error_count, ss.sync_error_count, ss.confl_insert_exists, ss.confl_update_origin_differs, diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index d051adf4931..4d03704f39b 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -110,7 +110,6 @@ static void init_params(ParseState *pstate, List *options, bool for_identity, Form_pg_sequence_data seqdataform, bool *need_seq_rewrite, List **owned_by); -static void do_setval(Oid relid, int64 next, bool iscalled); static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity); @@ -941,9 +940,12 @@ lastval(PG_FUNCTION_ARGS) * restore the state of a sequence exactly during data-only restores - * it is the only way to clear the is_called flag in an existing * sequence. + * + * log_cnt is currently used only by the sequence syncworker to set the + * log_cnt for sequences while synchronizing values from the publisher. 
*/ -static void -do_setval(Oid relid, int64 next, bool iscalled) +void +SetSequence(Oid relid, int64 next, int64 log_cnt, bool is_called) { SeqTable elm; Relation seqrel; @@ -994,7 +996,7 @@ do_setval(Oid relid, int64 next, bool iscalled) minv, maxv))); /* Set the currval() state only if iscalled = true */ - if (iscalled) + if (is_called) { elm->last = next; /* last returned number */ elm->last_valid = true; @@ -1011,8 +1013,8 @@ do_setval(Oid relid, int64 next, bool iscalled) START_CRIT_SECTION(); seq->last_value = next; /* last fetched number */ - seq->is_called = iscalled; - seq->log_cnt = 0; + seq->is_called = is_called; + seq->log_cnt = log_cnt; MarkBufferDirty(buf); @@ -1044,7 +1046,7 @@ do_setval(Oid relid, int64 next, bool iscalled) /* * Implement the 2 arg setval procedure. - * See do_setval for discussion. + * See SetSequence for discussion. */ Datum setval_oid(PG_FUNCTION_ARGS) @@ -1052,14 +1054,14 @@ setval_oid(PG_FUNCTION_ARGS) Oid relid = PG_GETARG_OID(0); int64 next = PG_GETARG_INT64(1); - do_setval(relid, next, true); + SetSequence(relid, next, SEQ_LOG_CNT_INVALID, true); PG_RETURN_INT64(next); } /* * Implement the 3 arg setval procedure. - * See do_setval for discussion. + * See SetSequence for discussion. */ Datum setval3_oid(PG_FUNCTION_ARGS) @@ -1068,7 +1070,7 @@ setval3_oid(PG_FUNCTION_ARGS) int64 next = PG_GETARG_INT64(1); bool iscalled = PG_GETARG_BOOL(2); - do_setval(relid, next, iscalled); + SetSequence(relid, next, SEQ_LOG_CNT_INVALID, iscalled); PG_RETURN_INT64(next); } @@ -1889,6 +1891,10 @@ pg_sequence_last_value(PG_FUNCTION_ARGS) /* * Return the current on-disk state of the sequence. * + * The page LSN will be used in logical replication of sequences to record the + * LSN of the sequence page in the pg_subscription_rel system catalog. It + * reflects the LSN of the remote sequence at the time it was synchronized. + * * Note: This is roughly equivalent to selecting the data from the sequence, * except that it also returns the page LSN. 
*/ diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index bd575237d5d..fb410c5e503 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1066,7 +1066,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, sub_remove_rels = lappend(sub_remove_rels, rel); - logicalrep_worker_stop(sub->oid, relid); + logicalrep_worker_stop(sub->oid, relid, WORKERTYPE_TABLESYNC); /* * For READY state, we would have already dropped the @@ -1870,7 +1870,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) { LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); - logicalrep_worker_stop(w->subid, w->relid); + logicalrep_worker_stop(w->subid, w->relid, w->type); } list_free(subworkers); diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 116ddf7b835..81e0e369fb0 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -131,7 +131,10 @@ static const struct "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, { - "TablesyncWorkerMain", TablesyncWorkerMain + "TableSyncWorkerMain", TableSyncWorkerMain + }, + { + "SequenceSyncWorkerMain", SequenceSyncWorkerMain } }; diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c62c8c67521..c719af1f8a9 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -26,6 +26,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + sequencesync.o \ slotsync.o \ snapbuild.o \ syncutils.o \ diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 4aed0dfcebb..5df81cbec82 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -239,19 +239,18 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, /* * Walks the workers array and searches for one that matches given - * 
subscription id and relid. - * - * We are only interested in the leader apply worker or table sync worker. + * subscription id, relid and type. */ LogicalRepWorker * -logicalrep_worker_find(Oid subid, Oid relid, bool only_running) +logicalrep_worker_find(Oid subid, Oid relid, LogicalRepWorkerType wtype, + bool only_running) { int i; LogicalRepWorker *res = NULL; Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - /* Search for attached worker for a given subscription id. */ + /* Search for the attached worker matching the specified criteria. */ for (i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; @@ -261,7 +260,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) continue; if (w->in_use && w->subid == subid && w->relid == relid && - (!only_running || w->proc)) + w->type == wtype && (!only_running || w->proc)) { res = w; break; @@ -321,6 +320,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, int nparallelapplyworkers; TimestampTz now; bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC); + bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC); bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY); /*---------- @@ -406,7 +406,8 @@ retry: * sync worker limit per subscription. So, just return silently as we * might get here because of an otherwise harmless race condition. 
*/ - if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription) + if ((is_tablesync_worker || is_sequencesync_worker) && + nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); return false; @@ -492,8 +493,16 @@ retry: memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle)); break; + case WORKERTYPE_SEQUENCESYNC: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication sequencesync worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker"); + break; + case WORKERTYPE_TABLESYNC: - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication tablesync worker for subscription %u sync %u", subid, @@ -616,13 +625,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) * Stop the logical replication worker for subid/relid, if any. */ void -logicalrep_worker_stop(Oid subid, Oid relid) +logicalrep_worker_stop(Oid subid, Oid relid, LogicalRepWorkerType wtype) { LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, false); + worker = logicalrep_worker_find(subid, relid, wtype, false); if (worker) { @@ -689,7 +698,7 @@ logicalrep_worker_wakeup(Oid subid, Oid relid) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, true); + worker = logicalrep_worker_find(subid, relid, WORKERTYPE_APPLY, true); if (worker) logicalrep_worker_wakeup_ptr(worker); @@ -819,6 +828,28 @@ logicalrep_launcher_onexit(int code, Datum arg) LogicalRepCtx->launcher_pid = 0; } +/* + * Update the failure time of the sequencesync worker in the subscription's + * apply worker. 
+ * + * This function is invoked when the sequencesync worker exits due to a + * failure. + */ +void +logicalrep_seqsyncworker_failure(int code, Datum arg) +{ + LogicalRepWorker *worker; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid, + WORKERTYPE_APPLY, true); + if (worker) + worker->sequencesync_failure_time = GetCurrentTimestamp(); + + LWLockRelease(LogicalRepWorkerLock); +} + /* * Cleanup function. * @@ -867,7 +898,7 @@ logicalrep_sync_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isTablesyncWorker(w) && w->subid == subid) + if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w))) res++; } @@ -1182,7 +1213,7 @@ ApplyLauncherMain(Datum main_arg) continue; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); + w = logicalrep_worker_find(sub->oid, InvalidOid, WORKERTYPE_APPLY, false); LWLockRelease(LogicalRepWorkerLock); if (w != NULL) @@ -1329,7 +1360,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); - if (isTablesyncWorker(&worker)) + if (isTableSyncWorker(&worker)) values[1] = ObjectIdGetDatum(worker.relid); else nulls[1] = true; @@ -1369,6 +1400,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) case WORKERTYPE_PARALLEL_APPLY: values[9] = CStringGetTextDatum("parallel apply"); break; + case WORKERTYPE_SEQUENCESYNC: + values[9] = CStringGetTextDatum("sequence synchronization"); + break; case WORKERTYPE_TABLESYNC: values[9] = CStringGetTextDatum("table synchronization"); break; diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 9283e996ef4..a2268d8361e 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -12,6 +12,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 
'sequencesync.c', 'slotsync.c', 'snapbuild.c', 'syncutils.c', diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c new file mode 100644 index 00000000000..aad9bee03d7 --- /dev/null +++ b/src/backend/replication/logical/sequencesync.c @@ -0,0 +1,631 @@ +/*------------------------------------------------------------------------- + * sequencesync.c + * PostgreSQL logical replication: sequence synchronization + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/sequencesync.c + * + * NOTES + * This file contains code for sequence synchronization for + * logical replication. + * + * Sequences to be synchronized by the sequencesync worker will + * be added to pg_subscription_rel in INIT state when one of the following + * commands is executed: + * CREATE SUBSCRIPTION + * ALTER SUBSCRIPTION ... REFRESH PUBLICATION + * ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES + * + * The apply worker will periodically check if there are any sequences in INIT + * state and will start a sequencesync worker if needed. + * + * The sequencesync worker retrieves the sequences to be synchronized from the + * pg_subscription_rel catalog table. It synchronizes multiple sequences per + * single transaction by fetching the sequence value and page LSN from the + * remote publisher and updating them in the local subscriber sequence. After + * synchronization, it sets the sequence state to READY. + * + * So the state progression is always just: INIT -> READY. + * + * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH + * (100) sequences are synchronized per transaction. The locks on the sequence + * relation will be periodically released at each transaction commit. + * + * XXX: An alternative design was considered where the launcher process would + * periodically check for sequences that need syncing and then start the + * sequencesync worker. 
However, the approach of having the apply worker + * manage the sequencesync worker was chosen for the following reasons: + * a) It avoids overloading the launcher, which handles various other + * subscription requests. + * b) It offers a more straightforward path for extending support for + * incremental sequence synchronization. + * c) It utilizes the existing tablesync worker code to start the sequencesync + * process, thus preventing code duplication in the launcher. + * d) It simplifies code maintenance by consolidating changes to a single + * location rather than multiple components. + * e) The apply worker can access the sequences that need to be synchronized + * from the pg_subscription_rel system catalog. Whereas the launcher process + * operates without direct database access so would need a framework to + * establish connections with the databases to retrieve the sequences for + * synchronization. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/table.h" +#include "catalog/pg_sequence.h" +#include "catalog/pg_subscription_rel.h" +#include "commands/sequence.h" +#include "pgstat.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/worker_internal.h" +#include "utils/acl.h" +#include "utils/builtins.h" +#include "utils/catcache.h" +#include "utils/lsyscache.h" +#include "utils/pg_lsn.h" +#include "utils/rls.h" +#include "utils/syscache.h" +#include "utils/usercontext.h" + +List *sequence_states_not_ready = NIL; + +/* + * Handle sequence synchronization cooperation from the apply worker. + * + * Start a sequencesync worker if one is not already running. The active + * sequencesync worker will handle all pending sequence synchronization. If any + * sequences remain unsynchronized after it exits, a new worker can be started + * in the next iteration. 
+ */ +void +ProcessSyncingSequencesForApply(void) +{ + LogicalRepWorker *sequencesync_worker; + int nsyncworkers; + + /* No sequences to sync, so nothing to do */ + if (list_length(sequence_states_not_ready) == 0) + return; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + /* Check if there is a sequencesync worker already running? */ + sequencesync_worker = logicalrep_worker_find(MyLogicalRepWorker->subid, + InvalidOid, + WORKERTYPE_SEQUENCESYNC, + true); + if (sequencesync_worker) + { + /* Now safe to release the LWLock */ + LWLockRelease(LogicalRepWorkerLock); + return; + } + + /* + * Count running sync workers for this subscription, while we have the + * lock. + */ + nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + + /* Now safe to release the LWLock */ + LWLockRelease(LogicalRepWorkerLock); + + /* + * If there is a free sync worker slot, start a new sequencesync + * worker, and break from the loop. + */ + if (nsyncworkers < max_sync_workers_per_subscription) + { + TimestampTz now = GetCurrentTimestamp(); + + /* + * To prevent starting the sequencesync worker at a high frequency + * after a failure, we store its last failure time. We start the + * sequencesync worker again after waiting at least + * wal_retrieve_retry_interval. + */ + if (!MyLogicalRepWorker->sequencesync_failure_time || + TimestampDifferenceExceeds(MyLogicalRepWorker->sequencesync_failure_time, + now, wal_retrieve_retry_interval)) + { + MyLogicalRepWorker->sequencesync_failure_time = 0; + + if (!logicalrep_worker_launch(WORKERTYPE_SEQUENCESYNC, + MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + InvalidOid, + DSM_HANDLE_INVALID)) + MyLogicalRepWorker->sequencesync_failure_time = now; + } + } +} + +/* + * report_error_sequences + * + * Reports discrepancies in sequence data between the publisher and subscriber. 
+ * It identifies sequences that are missing on the publisher, as well as + * sequences that exist on both sides but have mismatched values. + */ +static void +report_error_sequences(StringInfo missing_seqs, StringInfo mismatched_seqs) +{ + StringInfo combined_error_detail = makeStringInfo(); + StringInfo combined_error_hint = makeStringInfo(); + + if (missing_seqs->len) + { + appendStringInfo(combined_error_detail, "Missing sequence(s) on publisher: (%s).", + missing_seqs->data); + appendStringInfoString(combined_error_hint, "For missing sequences, use ALTER SUBSCRIPTION with either REFRESH PUBLICATION or REFRESH PUBLICATION SEQUENCES."); + } + + if (mismatched_seqs->len) + { + if (missing_seqs->len) + { + appendStringInfo(combined_error_detail, "; mismatched sequence(s) on subscriber: (%s).", + mismatched_seqs->data); + appendStringInfoString(combined_error_hint, " For mismatched sequences, alter or re-create local sequences to have matching parameters as publishers."); + } + else + { + appendStringInfo(combined_error_detail, "Mismatched sequence(s) on subscriber: (%s).", + mismatched_seqs->data); + appendStringInfoString(combined_error_hint, "For mismatched sequences, alter or re-create local sequences to have matching parameters as publishers"); + } + } + + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication sequence synchronization failed for subscription \"%s\"", MySubscription->name), + errdetail("%s", combined_error_detail->data), + errhint("%s", combined_error_hint->data)); +} + +/* + * sequence_comparator + * + * Comparator function for sorting LogicalRepSequenceInfo objects in a list. + * It compares sequences first by namespace name and then by sequence name. 
+ */ +static int +sequence_comparator(const ListCell *s1, const ListCell *s2) +{ + int cmp; + LogicalRepSequenceInfo *seqinfo1 = (LogicalRepSequenceInfo *) (s1->ptr_value); + LogicalRepSequenceInfo *seqinfo2 = (LogicalRepSequenceInfo *) (s2->ptr_value); + + /* Compare by namespace name first */ + cmp = strcmp(seqinfo1->nspname, seqinfo2->nspname); + if (cmp != 0) + return cmp; + + /* If namespace names are equal, compare by sequence name */ + return strcmp(seqinfo1->seqname, seqinfo2->seqname); +} + +/* + * Copy existing data of sequences from the publisher. + * + * Fetch the sequence value from the publisher and set the subscriber sequence + * with the same value. Caller is responsible for locking the local relation. + */ +static void +copy_sequences(WalReceiverConn *conn, List *sequences_to_copy, Oid subid) +{ + int total_seqs = list_length(sequences_to_copy); + int current_index = 0; + int search_pos = 0; + StringInfo mismatched_seqs = makeStringInfo(); + StringInfo missing_seqs = makeStringInfo(); + + ereport(LOG, + errmsg("logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d", + MySubscription->name, total_seqs)); + + /* Sort the list of sequences to optimize the search */ + list_sort(sequences_to_copy, sequence_comparator); + + /* + * We batch synchronize multiple sequences per transaction, because the + * alternative of synchronizing each sequence individually incurs overhead + * of starting and committing transactions repeatedly. On the other hand, + * we want to avoid keeping this batch transaction open for extended + * periods so it is currently limited to 100 sequences per batch. 
+ */ +#define MAX_SEQUENCES_SYNC_PER_BATCH 100 + + while (current_index < total_seqs) + { +#define REMOTE_SEQ_COL_COUNT 12 + Oid seqRow[REMOTE_SEQ_COL_COUNT] = {TEXTOID, TEXTOID, LSNOID, INT8OID, + INT8OID, BOOLOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID}; + + int batch_size = Min(MAX_SEQUENCES_SYNC_PER_BATCH, total_seqs - current_index); + int batch_succeeded_count = 0; + int batch_mismatched_count = 0; + StringInfo seqstr = makeStringInfo(); + StringInfo cmd = makeStringInfo(); + WalRcvExecResult *res; + TupleTableSlot *slot; + + StartTransactionCommand(); + + /* + * Prepare the string of current batch sequences to fetch from the + * publisher. + */ + for (int i = 0; i < batch_size; i++) + { + LogicalRepSequenceInfo *seqinfo = lfirst(list_nth_cell(sequences_to_copy, current_index + i)); + + if (seqstr->len > 0) + appendStringInfoString(seqstr, ", "); + + appendStringInfo(seqstr, "(\'%s\', \'%s\')", seqinfo->nspname, + seqinfo->seqname); + } + + initStringInfo(cmd); + appendStringInfo(cmd, + "SELECT s.schname, s.seqname, ps.*, seq.seqtypid,\n" + " seq.seqstart, seq.seqincrement, seq.seqmin,\n" + " seq.seqmax, seq.seqcycle\n" + "FROM ( VALUES %s ) AS s (schname, seqname)\n" + "JOIN LATERAL pg_sequence_state(s.schname, s.seqname) ps ON true\n" + "JOIN pg_namespace n ON n.nspname = s.schname\n" + "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n" + "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n" + "ORDER BY s.schname, s.seqname\n", + seqstr->data); + + res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow); + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not receive list of sequence information from the publisher: %s", + res->err)); + + destroyStringInfo(seqstr); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + int col = 0; + bool isnull; + char *nspname; + char 
*seqname; + XLogRecPtr page_lsn; + int64 last_value; + int64 log_cnt; + bool is_called; + Oid seqtypid; + int64 seqstart; + int64 seqmin; + int64 seqmax; + int64 seqincrement; + bool seqcycle; + HeapTuple tup; + Form_pg_sequence seqform; + LogicalRepSequenceInfo *seqinfo = NULL; + + CHECK_FOR_INTERRUPTS(); + + nspname = TextDatumGetCString(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqname = TextDatumGetCString(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + log_cnt = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqtypid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqstart = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqincrement = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqmin = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqmax = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqcycle = DatumGetBool(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + /* Sanity check */ + Assert(col == REMOTE_SEQ_COL_COUNT); + + /* Retrieve the sequence object fetched from the publisher */ + while (search_pos < total_seqs) + { + LogicalRepSequenceInfo *candidate_seq = lfirst(list_nth_cell(sequences_to_copy, search_pos)); + + if (!strcmp(candidate_seq->nspname, nspname) && + !strcmp(candidate_seq->seqname, seqname)) + { + seqinfo = candidate_seq; + search_pos++; + break; + } + + search_pos++; + } + + Assert(seqinfo); + + seqinfo->remote_seq_fetched = true; + + /* Get the local sequence */ + tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo->localrelid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache 
lookup failed for sequence \"%s.%s\"", + seqinfo->nspname, seqinfo->seqname); + + seqform = (Form_pg_sequence) GETSTRUCT(tup); + + /* Update the sequence only if the parameters are identical */ + if (seqform->seqtypid == seqtypid && + seqform->seqmin == seqmin && seqform->seqmax == seqmax && + seqform->seqcycle == seqcycle && + seqform->seqstart == seqstart && + seqform->seqincrement == seqincrement) + { + SetSequence(seqinfo->localrelid, last_value, log_cnt, is_called); + + UpdateSubscriptionRelState(subid, seqinfo->localrelid, + SUBREL_STATE_READY, page_lsn); + if (message_level_is_interesting(DEBUG1)) + ereport(DEBUG1, + errmsg_internal("logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished", + MySubscription->name, + seqinfo->nspname, + seqinfo->seqname)); + + batch_succeeded_count++; + } + else + { + if (mismatched_seqs->len) + appendStringInfoString(mismatched_seqs, ", "); + + appendStringInfo(mismatched_seqs, "\"%s.%s\"", + seqinfo->nspname, seqinfo->seqname); + batch_mismatched_count++; + } + + ReleaseSysCache(tup); + } + + ereport(LOG, + errmsg("logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d missing", + MySubscription->name, (current_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1, batch_size, + batch_succeeded_count, batch_mismatched_count, + batch_size - (batch_succeeded_count + batch_mismatched_count))); + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + + destroyStringInfo(cmd); + + /* Commit this batch, and prepare for next batch */ + CommitTransactionCommand(); + + /* + * Sequence synchronization for this batch was incomplete because some + * sequences are missing on the publisher. Identify the missing + * sequences. 
+ */ + if ((batch_succeeded_count + batch_mismatched_count) < batch_size) + { + for (int i = 0; i < batch_size; i++) + { + LogicalRepSequenceInfo *seqinfo = lfirst(list_nth_cell(sequences_to_copy, current_index + i)); + + if (!seqinfo->remote_seq_fetched) + { + if (missing_seqs->len) + appendStringInfoString(missing_seqs, ", "); + + appendStringInfo(missing_seqs, "\"%s.%s\"", + seqinfo->nspname, seqinfo->seqname); + } + } + } + + /* + * current_indexes is not incremented sequentially because some + * sequences may be missing, and the number of fetched rows may not + * match the batch size. + */ + current_index += batch_size; + } + + /* + * Raise an error if any sequences are missing on the remote server, or if + * the local and remote sequence parameters do not match. + */ + if (missing_seqs->len || mismatched_seqs->len) + report_error_sequences(missing_seqs, mismatched_seqs); + + destroyStringInfo(missing_seqs); + destroyStringInfo(mismatched_seqs); +} + +/* + * Start syncing the sequences in the sequencesync worker. + */ +static void +LogicalRepSyncSequences(void) +{ + char *err; + bool must_use_password; + List *subsequences; + AclResult aclresult; + UserContext ucxt; + bool run_as_owner = false; + int seq_count; + Oid subid = MyLogicalRepWorker->subid; + MemoryContext oldctx; + StringInfoData app_name; + List *sequences_to_copy = NIL; + + StartTransactionCommand(); + + /* Get the sequences that should be synchronized. 
*/ + subsequences = GetSubscriptionRelations(subid, false, true, true); + seq_count = list_length(subsequences); + + foreach_ptr(SubscriptionRelState, subseq, subsequences) + { + Relation sequence_rel; + LogicalRepSequenceInfo *seq_info; + char *nspname; + char *seqname; + + CHECK_FOR_INTERRUPTS(); + + sequence_rel = table_open(subseq->relid, RowExclusiveLock); + seqname = RelationGetRelationName(sequence_rel); + nspname = get_namespace_name(RelationGetNamespace(sequence_rel)); + + /* + * Make sure that the copy command runs as the sequence owner, unless + * the user has opted out of that behaviour. + */ + run_as_owner = MySubscription->runasowner; + if (!run_as_owner) + SwitchToUntrustedUser(sequence_rel->rd_rel->relowner, &ucxt); + + /* + * Check that our sequencesync worker has permission to insert into + * the target sequence. + */ + aclresult = pg_class_aclcheck(RelationGetRelid(sequence_rel), GetUserId(), + ACL_INSERT); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, + get_relkind_objtype(sequence_rel->rd_rel->relkind), + seqname); + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + + seq_info = (LogicalRepSequenceInfo *) palloc(sizeof(LogicalRepSequenceInfo)); + seq_info->seqname = pstrdup(seqname); + seq_info->nspname = pstrdup(nspname); + seq_info->localrelid = subseq->relid; + seq_info->remote_seq_fetched = false; + sequences_to_copy = lappend(sequences_to_copy, seq_info); + + MemoryContextSwitchTo(oldctx); + + table_close(sequence_rel, NoLock); + } + + CommitTransactionCommand(); + + /* Is the use of a password mandatory? */ + must_use_password = MySubscription->passwordrequired && + !MySubscription->ownersuperuser; + + initStringInfo(&app_name); + appendStringInfo(&app_name, "%s_%s", MySubscription->name, "sequencesync worker"); + + /* + * Establish the connection to the publisher for sequence synchronization. 
+ */ + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, true, + must_use_password, + app_name.data, &err); + if (LogRepWorkerWalRcvConn == NULL) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s", + MySubscription->name, err)); + + pfree(app_name.data); + + copy_sequences(LogRepWorkerWalRcvConn, sequences_to_copy, subid); + + list_free_deep(sequences_to_copy); + + if (!run_as_owner && seq_count) + RestoreUserContext(&ucxt); +} + +/* + * Execute the initial sync with error handling. Disable the subscription, + * if required. + * + * Note that we don't handle FATAL errors which are probably because of + * system resource error and are not repeatable. + */ +static void +start_sequence_sync() +{ + Assert(am_sequencesync_worker()); + + PG_TRY(); + { + /* Call initial sync. */ + LogicalRepSyncSequences(); + } + PG_CATCH(); + { + if (MySubscription->disableonerr) + DisableSubscriptionAndExit(); + else + { + /* + * Report the worker failed during sequence synchronization. Abort + * the current transaction so that the stats message is sent in an + * idle state.
+ */ + AbortOutOfAnyTransaction(); + pgstat_report_subscription_error(MySubscription->oid, + WORKERTYPE_SEQUENCESYNC); + + PG_RE_THROW(); + } + } + PG_END_TRY(); +} + +/* Logical Replication sequencesync worker entry point */ +void +SequenceSyncWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + + SetupApplyOrSyncWorker(worker_slot); + + start_sequence_sync(); + + FinishSyncWorker(WORKERTYPE_SEQUENCESYNC); +} diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c index 45b6d429558..8914f5cca10 100644 --- a/src/backend/replication/logical/syncutils.c +++ b/src/backend/replication/logical/syncutils.c @@ -50,8 +50,10 @@ static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEE * Exit routine for synchronization worker. */ pg_noreturn void -FinishSyncWorker(void) +FinishSyncWorker(LogicalRepWorkerType wtype) { + Assert(wtype == WORKERTYPE_TABLESYNC || wtype == WORKERTYPE_SEQUENCESYNC); + /* * Commit any outstanding transaction. This is the usual case, unless * there was nothing to do for the table. @@ -66,15 +68,24 @@ FinishSyncWorker(void) XLogFlush(GetXLogWriteRecPtr()); StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + if (wtype == WORKERTYPE_TABLESYNC) + ereport(LOG, + errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid))); + else + ereport(LOG, + errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished", + MySubscription->name)); CommitTransactionCommand(); /* Find the leader apply worker and signal it. 
*/ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + /* This is a clean exit, so no need for any sequence failure logic. */ + if (wtype == WORKERTYPE_SEQUENCESYNC) + cancel_before_shmem_exit(logicalrep_seqsyncworker_failure, 0); + /* Stop gracefully */ proc_exit(0); } @@ -89,7 +100,9 @@ InvalidateRelationStates(Datum arg, int cacheid, uint32 hashvalue) } /* - * Process possible state change(s) of relations that are being synchronized. + * Process possible state change(s) of relations that are being synchronized + * and start new tablesync workers for the newly added tables. Also, start a + * new sequencesync worker for the newly added sequences. */ void ProcessSyncingRelations(XLogRecPtr current_lsn) @@ -109,7 +122,19 @@ ProcessSyncingRelations(XLogRecPtr current_lsn) break; case WORKERTYPE_APPLY: + /* + * We need up-to-date sync state info for subscription tables and + * sequences here. + */ + FetchRelationStates(); + ProcessSyncingTablesForApply(current_lsn); + ProcessSyncingSequencesForApply(); + break; + + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + Assert(0); break; case WORKERTYPE_UNKNOWN: @@ -121,17 +146,22 @@ ProcessSyncingRelations(XLogRecPtr current_lsn) /* * Common code to fetch the up-to-date sync state info into the static lists. * - * Returns true if subscription has 1 or more tables, else false. + * The pg_subscription_rel catalog is shared by tables and sequences. Changes + * to either sequences or tables can affect the validity of relation states, so + * we update both table_states_not_ready and sequence_states_not_ready + * simultaneously to ensure consistency. * - * Note: If this function started the transaction (indicated by the parameter) - * then it is the caller's responsibility to commit it. + * Returns true if subscription has 1 or more tables, else false. 
*/ bool -FetchRelationStates(bool *started_tx) +FetchRelationStates() { + /* + * has_subtables is declared as static, since the same value can be used + * until the system table is invalidated. + */ static bool has_subtables = false; - - *started_tx = false; + bool started_tx = false; if (relation_states_validity != SYNC_RELATIONS_STATE_VALID) { @@ -144,12 +174,14 @@ FetchRelationStates(bool *started_tx) /* Clean the old lists. */ list_free_deep(table_states_not_ready); + list_free_deep(sequence_states_not_ready); table_states_not_ready = NIL; + sequence_states_not_ready = NIL; if (!IsTransactionState()) { StartTransactionCommand(); - *started_tx = true; + started_tx = true; } /* Fetch tables and sequences that are in non-ready state. */ @@ -162,7 +194,11 @@ FetchRelationStates(bool *started_tx) { rstate = palloc(sizeof(SubscriptionRelState)); memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states_not_ready = lappend(table_states_not_ready, rstate); + + if (get_rel_relkind(rstate->relid) == RELKIND_SEQUENCE) + sequence_states_not_ready = lappend(sequence_states_not_ready, rstate); + else + table_states_not_ready = lappend(table_states_not_ready, rstate); } MemoryContextSwitchTo(oldctx); @@ -187,5 +223,11 @@ FetchRelationStates(bool *started_tx) relation_states_validity = SYNC_RELATIONS_STATE_VALID; } + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } + return has_subtables; } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index b66ac6eb865..6cf910c6ea6 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -161,7 +161,7 @@ wait_for_table_state_change(Oid relid, char expected_state) /* Check if the sync worker is still running and bail if not. 
*/ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, - false); + WORKERTYPE_TABLESYNC, false); LWLockRelease(LogicalRepWorkerLock); if (!worker) break; @@ -208,7 +208,7 @@ wait_for_worker_state_change(char expected_state) */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, - InvalidOid, false); + InvalidOid, WORKERTYPE_APPLY, false); if (worker && worker->proc) logicalrep_worker_wakeup_ptr(worker); LWLockRelease(LogicalRepWorkerLock); @@ -334,7 +334,7 @@ ProcessSyncingTablesForSync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); - FinishSyncWorker(); + FinishSyncWorker(WORKERTYPE_TABLESYNC); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -376,9 +376,6 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); - /* We need up-to-date sync state info for subscription tables here. */ - FetchRelationStates(&started_tx); - /* * Prepare a hash table for tracking last start times of workers, to avoid * immediate restarts. We don't need it if there are no tables that need @@ -411,6 +408,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } + + Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE); + if (rstate->state == SUBREL_STATE_SYNCDONE) { /* @@ -424,11 +429,6 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; - if (!started_tx) - { - StartTransactionCommand(); - started_tx = true; - } /* * Remove the tablesync origin tracking if exists. 
@@ -465,8 +465,8 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, - rstate->relid, false); - + rstate->relid, + WORKERTYPE_TABLESYNC, true); if (syncworker) { /* Found one, update our copy of its state */ @@ -1248,7 +1248,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: - FinishSyncWorker(); /* doesn't return */ + FinishSyncWorker(WORKERTYPE_TABLESYNC); /* doesn't return */ } /* Calculate the name of the tablesync slot. */ @@ -1521,7 +1521,8 @@ start_table_sync(XLogRecPtr *origin_startpos, char **slotname) * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); + pgstat_report_subscription_error(MySubscription->oid, + WORKERTYPE_TABLESYNC); PG_RE_THROW(); } @@ -1566,7 +1567,7 @@ run_tablesync_worker() /* Logical Replication Tablesync worker entry point */ void -TablesyncWorkerMain(Datum main_arg) +TableSyncWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); @@ -1574,7 +1575,7 @@ TablesyncWorkerMain(Datum main_arg) run_tablesync_worker(); - FinishSyncWorker(); + FinishSyncWorker(WORKERTYPE_TABLESYNC); } /* @@ -1588,23 +1589,16 @@ TablesyncWorkerMain(Datum main_arg) bool AllTablesyncsReady(void) { - bool started_tx = false; - bool has_subrels = false; + bool has_tables; /* We need up-to-date sync state info for subscription tables here. */ - has_subrels = FetchRelationStates(&started_tx); - - if (started_tx) - { - CommitTransactionCommand(); - pgstat_report_stat(true); - } + has_tables = FetchRelationStates(); /* * Return false when there are no tables in subscription or not all tables * are in ready state; true otherwise. 
*/ - return has_subrels && (table_states_not_ready == NIL); + return has_tables && (table_states_not_ready == NIL); } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 39a53c84e04..c49b025f16a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -482,6 +482,11 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) (rel->state == SUBREL_STATE_SYNCDONE && rel->statelsn <= remote_final_lsn)); + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + Assert(0); + break; + case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "Unknown worker type"); @@ -1022,7 +1027,10 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel and any + * newly added relations. + */ ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -1144,7 +1152,10 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel and any + * newly added relations. + */ ProcessSyncingRelations(prepare_data.end_lsn); /* @@ -1200,7 +1211,10 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel and any + * newly added relations. 
+ */ ProcessSyncingRelations(prepare_data.end_lsn); clear_subscription_skip_lsn(prepare_data.end_lsn); @@ -1266,7 +1280,10 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr); in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel and any + * newly added relations. + */ ProcessSyncingRelations(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -1401,7 +1418,10 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel and any + * newly added relations. + */ ProcessSyncingRelations(prepare_data.end_lsn); /* @@ -2243,7 +2263,10 @@ apply_handle_stream_commit(StringInfo s) break; } - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel and any + * newly added relations. + */ ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -3720,7 +3743,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received) AcceptInvalidationMessages(); maybe_reread_subscription(); - /* Process any table synchronization changes. */ + /* + * Process any tables that are being synchronized in parallel and + * any newly added relations. + */ ProcessSyncingRelations(last_received); } @@ -4529,7 +4555,8 @@ start_apply(XLogRecPtr origin_startpos) * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); + pgstat_report_subscription_error(MySubscription->oid, + WORKERTYPE_APPLY); PG_RE_THROW(); } @@ -4649,8 +4676,8 @@ run_apply_worker() } /* - * Common initialization for leader apply worker, parallel apply worker and - * tablesync worker. 
+ * Common initialization for leader apply worker, parallel apply worker, + * tablesync worker and sequencesync worker. * * Initialize the database connection, in-memory subscription and necessary * config options. @@ -4729,6 +4756,10 @@ InitializeLogRepWorker(void) (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); + else if (am_sequencesync_worker()) + ereport(LOG, + (errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started", + MySubscription->name))); else ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" has started", @@ -4748,14 +4779,17 @@ replorigin_reset(int code, Datum arg) replorigin_session_origin_timestamp = 0; } -/* Common function to setup the leader apply or tablesync worker. */ +/* + * Common function to setup the leader apply, tablesync worker and sequencesync + * worker. + */ void SetupApplyOrSyncWorker(int worker_slot) { /* Attach to slot */ logicalrep_worker_attach(worker_slot); - Assert(am_tablesync_worker() || am_leader_apply_worker()); + Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker()); /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); @@ -4800,6 +4834,9 @@ SetupApplyOrSyncWorker(int worker_slot) CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, InvalidateRelationStates, (Datum) 0); + + if (am_sequencesync_worker()) + before_shmem_exit(logicalrep_seqsyncworker_failure, (Datum) 0); } /* Logical Replication Apply worker entry point */ @@ -4826,6 +4863,10 @@ ApplyWorkerMain(Datum main_arg) void DisableSubscriptionAndExit(void) { + LogicalRepWorkerType wtype = am_tablesync_worker() ? WORKERTYPE_TABLESYNC : + (am_sequencesync_worker()) ? 
WORKERTYPE_SEQUENCESYNC : + WORKERTYPE_APPLY; + /* * Emit the error message, and recover from the error state to an idle * state @@ -4838,9 +4879,11 @@ DisableSubscriptionAndExit(void) RESUME_INTERRUPTS(); - /* Report the worker failed during either table synchronization or apply */ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); + /* + * Report the worker failed during either sequence synchronization or + * table synchronization or apply. + */ + pgstat_report_subscription_error(MyLogicalRepWorker->subid, wtype); /* Disable the subscription */ StartTransactionCommand(); diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c index f9a1c831a07..002d630d4ae 100644 --- a/src/backend/utils/activity/pgstat_subscription.c +++ b/src/backend/utils/activity/pgstat_subscription.c @@ -17,6 +17,7 @@ #include "postgres.h" +#include "replication/worker_internal.h" #include "utils/pgstat_internal.h" @@ -24,7 +25,7 @@ * Report a subscription error. */ void -pgstat_report_subscription_error(Oid subid, bool is_apply_error) +pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype) { PgStat_EntryRef *entry_ref; PgStat_BackendSubEntry *pending; @@ -33,10 +34,25 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error) InvalidOid, subid, NULL); pending = entry_ref->pending; - if (is_apply_error) - pending->apply_error_count++; - else - pending->sync_error_count++; + switch (wtype) + { + case WORKERTYPE_APPLY: + pending->apply_error_count++; + break; + + case WORKERTYPE_SEQUENCESYNC: + pending->sequence_sync_error_count++; + break; + + case WORKERTYPE_TABLESYNC: + pending->sync_error_count++; + break; + + default: + /* Should never happen. 
*/ + Assert(0); + break; + } } /* @@ -115,6 +131,7 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) #define SUB_ACC(fld) shsubent->stats.fld += localent->fld SUB_ACC(apply_error_count); + SUB_ACC(sequence_sync_error_count); SUB_ACC(sync_error_count); for (int i = 0; i < CONFLICT_NUM_TYPES; i++) SUB_ACC(conflict_count[i]); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 1c12ddbae49..ab061d0ba9b 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2171,7 +2171,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; @@ -2189,23 +2189,25 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) OIDOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 2, "apply_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count", + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sequence_sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists", + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_insert_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists", + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing", 
INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2222,6 +2224,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) /* apply_error_count */ values[i++] = Int64GetDatum(subentry->apply_error_count); + /* sequence_sync_error_count */ + values[i++] = Int64GetDatum(subentry->sequence_sync_error_count); + /* sync_error_count */ values[i++] = Int64GetDatum(subentry->sync_error_count); diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index a925be86944..a7ccfaa8bd9 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3356,7 +3356,7 @@ struct config_int ConfigureNamesInt[] = {"max_sync_workers_per_subscription", PGC_SIGHUP, REPLICATION_SUBSCRIBERS, - gettext_noop("Maximum number of table synchronization workers per subscription."), + gettext_noop("Maximum number of workers per subscription for synchronizing tables and sequences."), NULL, }, &max_sync_workers_per_subscription, diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 2bbcdbb4afa..3bae4e6dc11 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5696,9 +5696,9 @@ { oid => '6231', descr => 'statistics: information about subscription stats', proname => 'pg_stat_get_subscription_stats', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => 
'{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', + proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,apply_error_count,sequence_sync_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', prosrc => 'pg_stat_get_subscription_stats' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index a541f4843bd..49af743b20d 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -82,6 +82,14 @@ typedef struct SubscriptionRelState char state; } SubscriptionRelState; +typedef struct LogicalRepSequenceInfo +{ + char *seqname; + char *nspname; + Oid localrelid; + bool remote_seq_fetched; +} LogicalRepSequenceInfo; + extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index 9ac0b67683d..3aec610028f 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -45,6 +45,8 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data; /* XLOG stuff */ #define XLOG_SEQ_LOG 0x00 +#define SEQ_LOG_CNT_INVALID 0 + typedef struct 
xl_seq_rec { RelFileLocator locator; @@ -60,6 +62,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt); extern void SequenceChangePersistence(Oid relid, char newrelpersistence); extern void DeleteSequenceTuple(Oid relid); extern void ResetSequence(Oid seq_relid); +extern void SetSequence(Oid relid, int64 next, int64 log_cnt, bool is_called); extern void ResetSequenceCaches(void); extern void seq_redo(XLogReaderState *record); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 202bd2d5ace..4bc05518c3a 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -15,6 +15,7 @@ #include "portability/instr_time.h" #include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */ #include "replication/conflict.h" +#include "replication/worker_internal.h" #include "utils/backend_progress.h" /* for backward compatibility */ /* IWYU pragma: export */ #include "utils/backend_status.h" /* for backward compatibility */ /* IWYU pragma: export */ #include "utils/pgstat_kind.h" @@ -107,6 +108,7 @@ typedef struct PgStat_FunctionCallUsage typedef struct PgStat_BackendSubEntry { PgStat_Counter apply_error_count; + PgStat_Counter sequence_sync_error_count; PgStat_Counter sync_error_count; PgStat_Counter conflict_count[CONFLICT_NUM_TYPES]; } PgStat_BackendSubEntry; @@ -413,6 +415,7 @@ typedef struct PgStat_SLRUStats typedef struct PgStat_StatSubEntry { PgStat_Counter apply_error_count; + PgStat_Counter sequence_sync_error_count; PgStat_Counter sync_error_count; PgStat_Counter conflict_count[CONFLICT_NUM_TYPES]; TimestampTz stat_reset_timestamp; @@ -763,7 +766,8 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void); * Functions in pgstat_subscription.c */ -extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error); +extern void pgstat_report_subscription_error(Oid subid, + LogicalRepWorkerType wtype); extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type); extern void pgstat_create_subscription(Oid subid); extern 
void pgstat_drop_subscription(Oid subid); diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 88912606e4d..56fa79b648e 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -18,7 +18,8 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); -extern void TablesyncWorkerMain(Datum main_arg); +extern void TableSyncWorkerMain(Datum main_arg); +extern void SequenceSyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 663b87a9c80..2373968ff0d 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -30,6 +30,7 @@ typedef enum LogicalRepWorkerType { WORKERTYPE_UNKNOWN = 0, WORKERTYPE_TABLESYNC, + WORKERTYPE_SEQUENCESYNC, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, } LogicalRepWorkerType; @@ -92,6 +93,8 @@ typedef struct LogicalRepWorker TimestampTz last_recv_time; XLogRecPtr reply_lsn; TimestampTz reply_time; + + TimestampTz sequencesync_failure_time; } LogicalRepWorker; /* @@ -238,9 +241,11 @@ extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; extern PGDLLIMPORT List *table_states_not_ready; +extern PGDLLIMPORT List *sequence_states_not_ready; extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, + LogicalRepWorkerType wtype, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock); @@ -248,13 +253,16 @@ extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm); -extern void 
logicalrep_worker_stop(Oid subid, Oid relid); +extern void logicalrep_worker_stop(Oid subid, Oid relid, + LogicalRepWorkerType wtype); extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid); +extern void logicalrep_seqsyncworker_failure(int code, Datum arg); + extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname); @@ -263,11 +271,12 @@ extern void UpdateTwoPhaseState(Oid suboid, char new_state); extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn); extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn); +extern void ProcessSyncingSequencesForApply(void); -pg_noreturn extern void FinishSyncWorker(void); +pg_noreturn extern void FinishSyncWorker(LogicalRepWorkerType wtype); extern void InvalidateRelationStates(Datum arg, int cacheid, uint32 hashvalue); extern void ProcessSyncingRelations(XLogRecPtr current_lsn); -extern bool FetchRelationStates(bool *started_tx); +extern bool FetchRelationStates(void); extern void stream_start_internal(TransactionId xid, bool first_segment); extern void stream_stop_internal(TransactionId xid); @@ -332,15 +341,25 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +#define isApplyWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_APPLY) #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) -#define isTablesyncWorker(worker) ((worker)->in_use && \ +#define isTableSyncWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_TABLESYNC) +#define isSequenceSyncWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_SEQUENCESYNC) static inline bool am_tablesync_worker(void) { - return 
isTablesyncWorker(MyLogicalRepWorker); + return isTableSyncWorker(MyLogicalRepWorker); +} + +static inline bool +am_sequencesync_worker(void) +{ + return isSequenceSyncWorker(MyLogicalRepWorker); } static inline bool diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index 586ffba434e..a6c267a8a2c 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -42,6 +42,7 @@ tests += { 't/033_run_as_table_owner.pl', 't/034_temporal.pl', 't/035_conflicts.pl', + 't/036_sequences.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl new file mode 100644 index 00000000000..b8a89275f13 --- /dev/null +++ b/src/test/subscription/t/036_sequences.pl @@ -0,0 +1,239 @@ + +# Copyright (c) 2025, PostgreSQL Global Development Group + +# This tests that sequences are synced correctly to the subscriber +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); + +# Avoid checkpoint during the test, otherwise, extra values will be fetched for +# the sequences which will cause the test to fail randomly. 
+$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'checkpoint_timeout = 1h'); +$node_publisher->start; + +# Initialize subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Setup structure on the publisher +my $ddl = qq( + CREATE TABLE regress_seq_test (v BIGINT); + CREATE SEQUENCE regress_s1; +); +$node_publisher->safe_psql('postgres', $ddl); + +# Setup the same structure on the subscriber, plus some extra sequences that +# we'll create on the publisher later +$ddl = qq( + CREATE TABLE regress_seq_test (v BIGINT); + CREATE SEQUENCE regress_s1; + CREATE SEQUENCE regress_s2; + CREATE SEQUENCE regress_s3; + CREATE SEQUENCE regress_s4 +); +$node_subscriber->safe_psql('postgres', $ddl); + +# Insert initial test data +$node_publisher->safe_psql( + 'postgres', qq( + -- generate a number of values using the sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100); +)); + +# Setup logical replication pub/sub +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION regress_seq_pub FOR ALL SEQUENCES"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION regress_seq_sub CONNECTION '$publisher_connstr' PUBLICATION regress_seq_pub" +); + +# Wait for initial sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check the initial data on subscriber +my $result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '100|32|t', 'initial test data replicated'); + +########## +## ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION should cause sync of new +# sequences of the publisher, but changes to existing sequences should +# not be synced. +########## + +# Create a new sequence 'regress_s2', and update existing sequence 'regress_s1' +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s2; + INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100); + + -- Existing sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100); +)); + +# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '200|31|t', 'Check sequence value in the publisher'); + +# Check - existing sequence is not synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '100|32|t', + 'REFRESH PUBLICATION does not sync existing sequence'); + +# Check - newly published sequence is synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s2; +)); +is($result, '100|32|t', + 'REFRESH PUBLICATION will sync newly published sequence'); + +########## +## ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES should cause sync of +# new sequences of the publisher, and changes to existing sequences should +# also be synced. +########## + +# Create a new sequence 'regress_s3', and update the existing sequence +# 'regress_s2'. 
+$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s3; + INSERT INTO regress_seq_test SELECT nextval('regress_s3') FROM generate_series(1,100); + + -- Existing sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100); +)); + +# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION SEQUENCES +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check - existing sequences are synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '200|31|t', + 'REFRESH PUBLICATION SEQUENCES will sync existing sequences'); +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s2; +)); +is($result, '200|31|t', + 'REFRESH PUBLICATION SEQUENCES will sync existing sequences'); + +# Check - newly published sequence is synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s3; +)); +is($result, '100|32|t', + 'REFRESH PUBLICATION SEQUENCES will sync newly published sequence'); + +########## +## ALTER SUBSCRIPTION ... REFRESH PUBLICATION with (copy_data = off) should +# not update the sequence values for the new sequence. +########## + +# Create a new sequence 'regress_s4' +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s4; + INSERT INTO regress_seq_test SELECT nextval('regress_s4') FROM generate_series(1,100); +)); + +# Do ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION with (copy_data = false);
+));
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$result = $node_publisher->safe_psql(
+	'postgres', qq(
+	SELECT last_value, log_cnt, is_called FROM regress_s4;
+));
+is($result, '100|32|t', 'Check sequence value in the publisher');
+
+# Check - newly published sequence values are not updated
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	SELECT last_value, log_cnt, is_called FROM regress_s4;
+));
+is($result, '1|0|f',
+	'REFRESH PUBLICATION will not sync newly published sequence with copy_data as off'
+);
+
+##########
+# ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES should throw an error
+# for sequence definition not matching between the publisher and the subscriber.
+##########
+
+# Create a new sequence 'regress_s5' whose START value is not the same in the
+# publisher and subscriber.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	CREATE SEQUENCE regress_s5 START 1 INCREMENT 2;
+));
+
+$node_subscriber->safe_psql(
+	'postgres', qq(
+	CREATE SEQUENCE regress_s5 START 10 INCREMENT 2;
+));
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES
+$node_subscriber->safe_psql(
+	'postgres', "
+	ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION SEQUENCES"
+);
+
+# Confirm that the error for parameters differing is logged.
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? logical replication sequence synchronization failed for subscription "regress_seq_sub"\n.*DETAIL:.* Mismatched sequence\(s\) on subscriber: \("public.regress_s5"\)/,
+	$log_offset);
+
+$node_publisher->safe_psql(
+	'postgres', qq(
+	DROP SEQUENCE regress_s5;
+));
+
+# Confirm that the error for missing sequence is logged.
+$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? logical replication sequence synchronization failed for subscription "regress_seq_sub"\n.*DETAIL:.* Missing sequence\(s\) on publisher: \("public.regress_s5"\)/, + $log_offset); + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 67de8beeaf2..59c537393c8 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1630,6 +1630,7 @@ LogicalRepRelId LogicalRepRelMapEntry LogicalRepRelation LogicalRepRollbackPreparedTxnData +LogicalRepSequenceInfo LogicalRepStreamAbortData LogicalRepTupleData LogicalRepTyp -- 2.43.0