From 57f79cd97270c5087d440ffc5ee680d2c9867e93 Mon Sep 17 00:00:00 2001 From: Vignesh Date: Tue, 25 Mar 2025 10:33:59 +0530 Subject: [PATCH v20250325 4/5] Enhance sequence synchronization during subscription management This patch introduces sequence synchronization: Sequences have 2 states: - INIT (needs synchronizing) - READY (is already synchronized) A new sequencesync worker is launched as needed to synchronize sequences. It does the following: a) Retrieves, using pg_sequence_state(), the remote values of sequences that are in INIT state. b) Logs a warning if the sequence parameters differ between the publisher and subscriber. c) Sets the local sequence values accordingly. d) Updates the local sequence state to READY. e) Repeats until all sequences are done; synchronized sequences are committed in batches of 100. Sequence synchronization occurs in 3 places: 1) CREATE SUBSCRIPTION - (PG17 command syntax is unchanged) - The subscriber retrieves sequences associated with publications. - Published sequences are added to pg_subscription_rel with INIT state. - Initiates the sequencesync worker (see above) to synchronize all sequences. 2) ALTER SUBSCRIPTION ... REFRESH PUBLICATION - (PG17 command syntax is unchanged) - Dropped published sequences are removed from pg_subscription_rel. - Newly published sequences are added to pg_subscription_rel with INIT state. - Initiates the sequencesync worker (see above) to synchronize only newly added sequences. 3) ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES - The patch introduces this new command to refresh all sequences. - Dropped published sequences are removed from pg_subscription_rel. - Newly published sequences are added to pg_subscription_rel. - All sequences in pg_subscription_rel are reset to INIT state. - Initiates the sequencesync worker (see above) to synchronize all sequences. 
--- src/backend/catalog/pg_publication.c | 46 ++ src/backend/catalog/pg_subscription.c | 63 +- src/backend/catalog/system_views.sql | 10 + src/backend/commands/sequence.c | 27 +- src/backend/commands/subscriptioncmds.c | 323 +++++++-- src/backend/executor/execReplication.c | 4 +- src/backend/parser/gram.y | 11 +- src/backend/postmaster/bgworker.c | 5 +- src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/launcher.c | 71 +- src/backend/replication/logical/meson.build | 1 + .../replication/logical/sequencesync.c | 654 ++++++++++++++++++ src/backend/replication/logical/syncutils.c | 75 +- src/backend/replication/logical/tablesync.c | 45 +- src/backend/replication/logical/worker.c | 58 +- src/backend/utils/misc/guc_tables.c | 2 +- src/bin/pg_dump/common.c | 4 +- src/bin/pg_dump/pg_dump.c | 6 +- src/bin/pg_dump/pg_dump.h | 2 +- src/bin/psql/tab-complete.in.c | 2 +- src/include/catalog/pg_proc.dat | 5 + src/include/catalog/pg_subscription_rel.h | 4 +- src/include/commands/sequence.h | 3 + src/include/nodes/parsenodes.h | 3 +- src/include/replication/logicalworker.h | 3 +- src/include/replication/worker_internal.h | 30 +- src/test/regress/expected/rules.out | 8 + src/test/regress/expected/subscription.out | 4 +- src/test/subscription/meson.build | 1 + src/test/subscription/t/036_sequences.pl | 215 ++++++ 30 files changed, 1509 insertions(+), 177 deletions(-) create mode 100644 src/backend/replication/logical/sequencesync.c create mode 100644 src/test/subscription/t/036_sequences.pl diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 617ed0b82c9..d4a20c5da88 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -1370,3 +1370,49 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } + +/* + * Returns Oids of sequences in a publication. 
+ */ +Datum +pg_get_publication_sequences(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0)); + Publication *publication; + List *sequences = NIL; + + /* stuff done only on the first call of the function */ + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + publication = GetPublicationByName(pubname, false); + + if (publication->allsequences) + sequences = GetAllSequencesPublicationRelations(); + + funcctx->user_fctx = (void *) sequences; + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + sequences = (List *) funcctx->user_fctx; + + if (funcctx->call_cntr < list_length(sequences)) + { + Oid relid = list_nth_oid(sequences, funcctx->call_cntr); + + SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid)); + } + + SRF_RETURN_DONE(funcctx); +} diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 1c71161e723..68b55bb5ea5 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -27,6 +27,7 @@ #include "utils/array.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/memutils.h" #include "utils/lsyscache.h" #include "utils/pg_lsn.h" #include "utils/rel.h" @@ -462,7 +463,9 @@ RemoveSubscriptionRel(Oid subid, Oid relid) * leave tablesync slots or origins in the system when the * corresponding table is dropped. 
*/ - if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY) + if (!OidIsValid(subid) && + get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE && + subrel->srsubstate != SUBREL_STATE_READY) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -499,7 +502,8 @@ HasSubscriptionTables(Oid subid) Relation rel; ScanKeyData skey[1]; SysScanDesc scan; - bool has_subrels; + HeapTuple tup; + bool has_subrels = false; rel = table_open(SubscriptionRelRelationId, AccessShareLock); @@ -511,8 +515,22 @@ HasSubscriptionTables(Oid subid) scan = systable_beginscan(rel, InvalidOid, false, NULL, 1, skey); - /* If even a single tuple exists then the subscription has tables. */ - has_subrels = HeapTupleIsValid(systable_getnext(scan)); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_subscription_rel subrel; + + subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + + /* + * Skip sequence tuples. If even a single table tuple exists then the + * subscription has tables. + */ + if (get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE) + { + has_subrels = true; + break; + } + } /* Cleanup */ systable_endscan(scan); @@ -524,12 +542,22 @@ HasSubscriptionTables(Oid subid) /* * Get the relations for the subscription. * - * If not_ready is true, return only the relations that are not in a ready - * state, otherwise return all the relations of the subscription. The - * returned list is palloc'ed in the current memory context. + * get_tables: get relations for tables of the subscription. + * + * get_sequences: get relations for sequences of the subscription. + * + * all_states: + * If getting tables, if all_states is true get all tables, otherwise + * only get tables that have not reached READY state. + * If getting sequences, if all_states is true get all sequences, + * otherwise only get sequences that have not reached READY state (i.e. are + * still in INIT state). + * + * The returned list is palloc'ed in the current memory context. 
*/ List * -GetSubscriptionRelations(Oid subid, bool not_ready) +GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences, + bool all_states) { List *res = NIL; Relation rel; @@ -538,6 +566,9 @@ GetSubscriptionRelations(Oid subid, bool not_ready) ScanKeyData skey[2]; SysScanDesc scan; + /* One or both of 'get_tables' and 'get_sequences' must be true. */ + Assert(get_tables || get_sequences); + rel = table_open(SubscriptionRelRelationId, AccessShareLock); ScanKeyInit(&skey[nkeys++], @@ -545,7 +576,7 @@ GetSubscriptionRelations(Oid subid, bool not_ready) BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(subid)); - if (not_ready) + if (!all_states) ScanKeyInit(&skey[nkeys++], Anum_pg_subscription_rel_srsubstate, BTEqualStrategyNumber, F_CHARNE, @@ -560,9 +591,23 @@ GetSubscriptionRelations(Oid subid, bool not_ready) SubscriptionRelState *relstate; Datum d; bool isnull; + bool issequence; + bool istable; subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + /* Relation is either a sequence or a table */ + issequence = get_rel_relkind(subrel->srrelid) == RELKIND_SEQUENCE; + istable = !issequence; + + /* Skip sequences if they were not requested */ + if (!get_sequences && issequence) + continue; + + /* Skip tables if they were not requested */ + if (!get_tables && istable) + continue; + relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); relstate->relid = subrel->srrelid; relstate->state = subrel->srsubstate; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 31d269b7ee0..b53f3102764 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -394,6 +394,16 @@ CREATE VIEW pg_publication_tables AS pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) WHERE C.oid = GPT.relid; +CREATE VIEW pg_publication_sequences AS + SELECT + P.pubname AS pubname, + N.nspname AS schemaname, + C.relname AS sequencename + FROM pg_publication P, + LATERAL 
pg_get_publication_sequences(P.pubname) GPS, + pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) + WHERE C.oid = GPS.relid; + CREATE VIEW pg_locks AS SELECT * FROM pg_lock_status() AS L; diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index ae6eafbe21a..223bd2ac529 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -110,7 +110,6 @@ static void init_params(ParseState *pstate, List *options, bool for_identity, Form_pg_sequence_data seqdataform, bool *need_seq_rewrite, List **owned_by); -static void do_setval(Oid relid, int64 next, bool iscalled); static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity); @@ -941,9 +940,12 @@ lastval(PG_FUNCTION_ARGS) * restore the state of a sequence exactly during data-only restores - * it is the only way to clear the is_called flag in an existing * sequence. + * + * log_cnt is currently used only by the sequence syncworker to set the + * log_cnt for sequences while synchronizing values from the publisher. */ -static void -do_setval(Oid relid, int64 next, bool iscalled) +void +SetSequence(Oid relid, int64 next, bool is_called, int64 log_cnt) { SeqTable elm; Relation seqrel; @@ -994,7 +996,7 @@ do_setval(Oid relid, int64 next, bool iscalled) (long long) minv, (long long) maxv))); /* Set the currval() state only if iscalled = true */ - if (iscalled) + if (is_called) { elm->last = next; /* last returned number */ elm->last_valid = true; @@ -1011,8 +1013,8 @@ do_setval(Oid relid, int64 next, bool iscalled) START_CRIT_SECTION(); seq->last_value = next; /* last fetched number */ - seq->is_called = iscalled; - seq->log_cnt = 0; + seq->is_called = is_called; + seq->log_cnt = log_cnt; MarkBufferDirty(buf); @@ -1044,7 +1046,7 @@ do_setval(Oid relid, int64 next, bool iscalled) /* * Implement the 2 arg setval procedure. - * See do_setval for discussion. + * See SetSequence for discussion. 
*/ Datum setval_oid(PG_FUNCTION_ARGS) @@ -1052,14 +1054,14 @@ setval_oid(PG_FUNCTION_ARGS) Oid relid = PG_GETARG_OID(0); int64 next = PG_GETARG_INT64(1); - do_setval(relid, next, true); + SetSequence(relid, next, true, SEQ_LOG_CNT_INVALID); PG_RETURN_INT64(next); } /* * Implement the 3 arg setval procedure. - * See do_setval for discussion. + * See SetSequence for discussion. */ Datum setval3_oid(PG_FUNCTION_ARGS) @@ -1068,7 +1070,7 @@ setval3_oid(PG_FUNCTION_ARGS) int64 next = PG_GETARG_INT64(1); bool iscalled = PG_GETARG_BOOL(2); - do_setval(relid, next, iscalled); + SetSequence(relid, next, iscalled, SEQ_LOG_CNT_INVALID); PG_RETURN_INT64(next); } @@ -1889,6 +1891,11 @@ pg_sequence_last_value(PG_FUNCTION_ARGS) /* * Return the current on-disk state of the sequence. * + * The page_lsn will be utilized in logical replication sequence + * synchronization to record the page_lsn of sequence in the pg_subscription_rel + * system catalog. It will reflect the page_lsn of the remote sequence at the + * moment it was synchronized. + * * Note: This is roughly equivalent to selecting the data from the sequence, * except that it also returns the page LSN. 
*/ diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 4aec73bcc6b..cd9ad6aa5bf 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -26,6 +26,7 @@ #include "catalog/objectaddress.h" #include "catalog/pg_authid_d.h" #include "catalog/pg_database_d.h" +#include "catalog/pg_sequence.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" @@ -103,6 +104,7 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications); static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, @@ -692,6 +694,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOnOwner(SubscriptionRelationId, subid, owner); + /* + * XXX: If the subscription is for a sequence-only publication, creating + * this origin is unnecessary. It can be created later during the ALTER + * SUBSCRIPTION ... REFRESH command, if the publication is updated to + * include tables or tables in schemas. + */ ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); @@ -703,9 +711,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, { char *err; WalReceiverConn *wrconn; - List *tables; - ListCell *lc; - char table_state; bool must_use_password; /* Try to connect to the publisher. 
*/ @@ -720,6 +725,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { + bool has_tables; + List *relations; + char table_state; + check_publications(wrconn, publications); check_publications_origin(wrconn, publications, opts.copy_data, opts.origin, NULL, 0, stmt->subname); @@ -731,13 +740,16 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* - * Get the table list from publisher and build local table status - * info. + * Build local relation status info. Relations are for both tables + * and sequences from the publisher. */ - tables = fetch_table_list(wrconn, publications); - foreach(lc, tables) + relations = fetch_table_list(wrconn, publications); + has_tables = relations != NIL; + relations = list_concat(relations, + fetch_sequence_list(wrconn, publications)); + + foreach_ptr(RangeVar, rv, relations) { - RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; relid = RangeVarGetRelid(rv, AccessShareLock, false); @@ -754,6 +766,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * If requested, create permanent slot for the subscription. We * won't use the initial snapshot for anything, so no need to * export it. + * + * XXX: If the subscription is for a sequence-only publication, + * creating this slot is unnecessary. It can be created later + * during the ALTER SUBSCRIPTION ... REFRESH PUBLICATION or ALTER + * SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES command, if the + * publication is updated to include tables or tables in schema. */ if (opts.create_slot) { @@ -777,7 +795,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH * PUBLICATION to work. 
*/ - if (opts.twophase && !opts.copy_data && tables != NIL) + if (opts.twophase && !opts.copy_data && has_tables) twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, @@ -816,12 +834,50 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, return myself; } +/* + * Update the subscription to refresh both the publication and the publication + * objects associated with the subscription. + * + * Parameters: + * + * If 'copy_data' is true, the function will set the state to INIT; otherwise, + * it will set the state to READY. + * + * If 'validate_publications' is provided with a publication list, the + * function checks that the specified publications exist on the publisher. + * + * If 'refresh_tables' is true, update the subscription by adding or removing + * tables that have been added or removed since the last subscription creation + * or refresh publication. + * + * If 'refresh_sequences' is true, update the subscription by adding or removing + * sequences that have been added or removed since the last subscription + * creation or refresh publication. + * + * Note, this is a common function for handling different REFRESH commands + * according to the parameter 'resync_all_sequences' + * + * 1. ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES + * (when parameter resync_all_sequences is true) + * + * The function will mark all sequences with INIT state. + * Assert copy_data is true. + * Assert refresh_tables is false. + * Assert refresh_sequences is true. + * + * 2. ALTER SUBSCRIPTION ... REFRESH PUBLICATION [WITH (copy_data=true|false)] + * (when parameter resync_all_sequences is false) + * + * The function will update only the newly added tables and/or sequences + * based on the copy_data parameter. 
+ */ static void AlterSubscription_refresh(Subscription *sub, bool copy_data, - List *validate_publications) + List *validate_publications, bool refresh_tables, + bool refresh_sequences, bool resync_all_sequences) { char *err; - List *pubrel_names; + List *pubrel_names = NIL; List *subrel_states; Oid *subrel_local_oids; Oid *pubrel_local_oids; @@ -839,6 +895,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, WalReceiverConn *wrconn; bool must_use_password; +#ifdef USE_ASSERT_CHECKING + if (resync_all_sequences) + Assert(copy_data && !refresh_tables && refresh_sequences); +#endif + /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); @@ -858,10 +919,17 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, check_publications(wrconn, validate_publications); /* Get the table list from publisher. */ - pubrel_names = fetch_table_list(wrconn, sub->publications); + if (refresh_tables) + pubrel_names = fetch_table_list(wrconn, sub->publications); + + /* Get the sequence list from publisher. */ + if (refresh_sequences) + pubrel_names = list_concat(pubrel_names, + fetch_sequence_list(wrconn, + sub->publications)); /* Get local table list. 
*/ - subrel_states = GetSubscriptionRelations(sub->oid, false); + subrel_states = GetSubscriptionRelations(sub->oid, refresh_tables, refresh_sequences, true); subrel_count = list_length(subrel_states); /* @@ -880,9 +948,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, qsort(subrel_local_oids, subrel_count, sizeof(Oid), oid_cmp); - check_publications_origin(wrconn, sub->publications, copy_data, - sub->origin, subrel_local_oids, - subrel_count, sub->name); + if (refresh_tables) + check_publications_origin(wrconn, sub->publications, copy_data, + sub->origin, subrel_local_oids, + subrel_count, sub->name); /* * Rels that we want to remove from subscription and drop any slots @@ -904,12 +973,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, { RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; + char relkind; relid = RangeVarGetRelid(rv, AccessShareLock, false); /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); + relkind = get_rel_relkind(relid); + CheckSubscriptionRelkind(relkind, rv->schemaname, rv->relname); pubrel_local_oids[off++] = relid; @@ -920,8 +990,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, InvalidXLogRecPtr, true); ereport(DEBUG1, - (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", - rv->schemaname, rv->relname, sub->name))); + errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"", + relkind == RELKIND_SEQUENCE ? 
"sequence" : "table", + rv->schemaname, rv->relname, sub->name)); } } @@ -937,11 +1008,31 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, { Oid relid = subrel_local_oids[off]; - if (!bsearch(&relid, pubrel_local_oids, - list_length(pubrel_names), sizeof(Oid), oid_cmp)) + if (bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), sizeof(Oid), oid_cmp)) + { + /* + * The resync_all_sequences flag will only be set to true for + * the REFRESH PUBLICATION SEQUENCES command, indicating that + * the existing sequences need to be re-synchronized by + * resetting the relation to its initial state. + */ + if (resync_all_sequences) + { + UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT, + InvalidXLogRecPtr); + ereport(DEBUG1, + errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name)); + } + } + else { char state; XLogRecPtr statelsn; + char relkind = get_rel_relkind(relid); /* * Lock pg_subscription_rel with AccessExclusiveLock to @@ -963,41 +1054,51 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Last known rel state. */ state = GetSubscriptionRelState(sub->oid, relid, &statelsn); - sub_remove_rels[remove_rel_len].relid = relid; - sub_remove_rels[remove_rel_len++].state = state; - RemoveSubscriptionRel(sub->oid, relid); - logicalrep_worker_stop(sub->oid, relid); + sub_remove_rels[remove_rel_len].relid = relid; + sub_remove_rels[remove_rel_len++].state = state; /* - * For READY state, we would have already dropped the - * tablesync origin. + * A single sequencesync worker synchronizes all sequences, so + * only stop workers when relation kind is not sequence. */ - if (state != SUBREL_STATE_READY) + if (relkind != RELKIND_SEQUENCE) { - char originname[NAMEDATALEN]; + logicalrep_worker_stop(sub->oid, relid, WORKERTYPE_TABLESYNC); /* - * Drop the tablesync's origin tracking if exists. 
- * - * It is possible that the origin is not yet created for - * tablesync worker, this can happen for the states before - * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or - * apply worker can also concurrently try to drop the - * origin and by this time the origin might be already - * removed. For these reasons, passing missing_ok = true. + * For READY state, we would have already dropped the + * tablesync origin. */ - ReplicationOriginNameForLogicalRep(sub->oid, relid, originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); + if (state != SUBREL_STATE_READY) + { + char originname[NAMEDATALEN]; + + /* + * Drop the tablesync's origin tracking if exists. + * + * It is possible that the origin is not yet created + * for tablesync worker, this can happen for the + * states before SUBREL_STATE_FINISHEDCOPY. The + * tablesync worker or apply worker can also + * concurrently try to drop the origin and by this + * time the origin might be already removed. For these + * reasons, passing missing_ok = true. + */ + ReplicationOriginNameForLogicalRep(sub->oid, relid, + originname, + sizeof(originname)); + replorigin_drop_by_name(originname, true, false); + } } ereport(DEBUG1, - (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"", - get_namespace_name(get_rel_namespace(relid)), - get_rel_name(relid), - sub->name))); + errmsg_internal("%s \"%s.%s\" removed from subscription \"%s\"", + relkind == RELKIND_SEQUENCE ? "sequence" : "table", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name)); } } @@ -1008,6 +1109,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, */ for (off = 0; off < remove_rel_len; off++) { + /* Skip relations belonging to sequences. 
*/ + if (get_rel_relkind(sub_remove_rels[off].relid) == RELKIND_SEQUENCE) + continue; + if (sub_remove_rels[off].state != SUBREL_STATE_READY && sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE) { @@ -1393,8 +1498,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); /* - * See ALTER_SUBSCRIPTION_REFRESH for details why this is - * not allowed. + * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details + * why this is not allowed. */ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, @@ -1408,7 +1513,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, sub->publications = stmt->publication; AlterSubscription_refresh(sub, opts.copy_data, - stmt->publication); + stmt->publication, true, true, + false); } break; @@ -1448,8 +1554,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)"))); /* - * See ALTER_SUBSCRIPTION_REFRESH for details why this is - * not allowed. + * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details + * why this is not allowed. */ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, @@ -1467,18 +1573,33 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, sub->publications = publist; AlterSubscription_refresh(sub, opts.copy_data, - validate_publications); + validate_publications, true, true, + false); } break; } - case ALTER_SUBSCRIPTION_REFRESH: + case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES: + { + if (!sub->enabled) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES is not allowed for disabled subscriptions")); + + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION SEQUENCES"); + + AlterSubscription_refresh(sub, true, NULL, false, true, true); + + break; + } + + case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION: { if (!sub->enabled) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); + errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION is not allowed for disabled subscriptions"))); parse_subscription_options(pstate, stmt->options, SUBOPT_COPY_DATA, &opts); @@ -1490,8 +1611,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * * But, having reached this two-phase commit "enabled" state * we must not allow any subsequent table initialization to - * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed - * when the user had requested two_phase = on mode. + * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is + * disallowed when the user had requested two_phase = on mode. * * The exception to this restriction is when copy_data = * false, because when copy_data is false the tablesync will @@ -1503,12 +1624,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"), - errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); + errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"), + errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); - PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION"); - AlterSubscription_refresh(sub, opts.copy_data, NULL); + AlterSubscription_refresh(sub, opts.copy_data, NULL, true, true, false); break; } @@ -1750,7 +1871,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) { LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); - logicalrep_worker_stop(w->subid, w->relid); + logicalrep_worker_stop(w->subid, w->relid, w->type); } list_free(subworkers); @@ -1773,7 +1894,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * the apply and tablesync workers and they can't restart because of * exclusive lock on the subscription. */ - rstates = GetSubscriptionRelations(subid, true); + rstates = GetSubscriptionRelations(subid, true, false, false); foreach(lc, rstates) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -2087,8 +2208,8 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) * its partition ancestors (if it's a partition), or its partition children (if * it's a partitioned table), from some other publishers. This check is * required only if "copy_data = true" and "origin = none" for CREATE - * SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements to notify the - * user that data having origin might have been copied. + * SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION statements to + * notify the user that data having origin might have been copied. * * This check need not be performed on the tables that are already added * because incremental sync for those tables will happen through WAL and the @@ -2127,18 +2248,23 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, appendStringInfoString(&cmd, ")\n"); /* - * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains - * the list of relation oids that are already present on the subscriber. - * This check should be skipped for these tables. + * In case of ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION, + * subrel_local_oids contains the list of relation oids that are already + * present on the subscriber. This check should be skipped for these + * tables. */ for (i = 0; i < subrel_count; i++) { Oid relid = subrel_local_oids[i]; - char *schemaname = get_namespace_name(get_rel_namespace(relid)); - char *tablename = get_rel_name(relid); - appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", - schemaname, tablename); + if (get_rel_relkind(relid) != RELKIND_SEQUENCE) + { + char *schemaname = get_namespace_name(get_rel_namespace(relid)); + char *tablename = get_rel_name(relid); + + appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", + schemaname, tablename); + } } res = walrcv_exec(wrconn, cmd.data, 1, tableRow); @@ -2307,6 +2433,63 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } +/* + * Get the list of sequences which belong to specified publications on the + * publisher connection. + */ +static List * +fetch_sequence_list(WalReceiverConn *wrconn, List *publications) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {TEXTOID, TEXTOID}; + List *seqlist = NIL; + + Assert(list_length(publications) > 0); + + initStringInfo(&cmd); + + appendStringInfoString(&cmd, + "SELECT DISTINCT s.schemaname, s.sequencename\n" + "FROM pg_catalog.pg_publication_sequences s\n" + "WHERE s.pubname IN ("); + GetPublicationsStr(publications, &cmd, true); + appendStringInfoChar(&cmd, ')'); + + res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not receive list of sequences from the publisher: %s", + res->err)); + + /* Process sequences. 
*/ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *nspname; + char *relname; + bool isnull; + RangeVar *rv; + + nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + rv = makeRangeVar(nspname, relname, -1); + seqlist = lappend(seqlist, rv); + ExecClearTuple(slot); + } + + ExecDropSingleTupleTableSlot(slot); + walrcv_clear_result(res); + + return seqlist; +} + /* * This is to report the connection failure while dropping replication slots. * Here, we report the WARNING for all tablesync slots so that user can drop diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index ede89ea3cf9..2b81d97c015 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -847,7 +847,9 @@ void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname) { - if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) + if (relkind != RELKIND_RELATION && + relkind != RELKIND_PARTITIONED_TABLE && + relkind != RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index f5012ea27bd..0f4ed1b9e30 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -10899,11 +10899,20 @@ AlterSubscriptionStmt: AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); - n->kind = ALTER_SUBSCRIPTION_REFRESH; + n->kind = ALTER_SUBSCRIPTION_REFRESH_PUBLICATION; n->subname = $3; n->options = $6; $$ = (Node *) n; } + | ALTER SUBSCRIPTION name REFRESH PUBLICATION SEQUENCES + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + + n->kind = ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES; + 
n->subname = $3; + $$ = (Node *) n; + } | ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition { AlterSubscriptionStmt *n = diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 116ddf7b835..81e0e369fb0 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -131,7 +131,10 @@ static const struct "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, { - "TablesyncWorkerMain", TablesyncWorkerMain + "TableSyncWorkerMain", TableSyncWorkerMain + }, + { + "SequenceSyncWorkerMain", SequenceSyncWorkerMain } }; diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c62c8c67521..c719af1f8a9 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -26,6 +26,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + sequencesync.o \ slotsync.o \ snapbuild.o \ syncutils.o \ diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 10677da56b2..2d8267e6ed8 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -226,19 +226,18 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, /* * Walks the workers array and searches for one that matches given - * subscription id and relid. - * - * We are only interested in the leader apply worker or table sync worker. + * subscription id, relid and type. */ LogicalRepWorker * -logicalrep_worker_find(Oid subid, Oid relid, bool only_running) +logicalrep_worker_find(Oid subid, Oid relid, LogicalRepWorkerType wtype, + bool only_running) { int i; LogicalRepWorker *res = NULL; Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - /* Search for attached worker for a given subscription id. */ + /* Search for the attached worker matching the specified criteria. 
*/ for (i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; @@ -248,7 +247,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) continue; if (w->in_use && w->subid == subid && w->relid == relid && - (!only_running || w->proc)) + w->type == wtype && (!only_running || w->proc)) { res = w; break; @@ -308,6 +307,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, int nparallelapplyworkers; TimestampTz now; bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC); + bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC); bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY); /*---------- @@ -393,7 +393,8 @@ retry: * sync worker limit per subscription. So, just return silently as we * might get here because of an otherwise harmless race condition. */ - if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription) + if ((is_tablesync_worker || is_sequencesync_worker) && + nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); return false; @@ -480,7 +481,7 @@ retry: break; case WORKERTYPE_TABLESYNC: - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication tablesync worker for subscription %u sync %u", subid, @@ -488,6 +489,14 @@ retry: snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker"); break; + case WORKERTYPE_SEQUENCESYNC: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication sequencesync worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker"); + break; + case WORKERTYPE_UNKNOWN: /* Should never happen. 
*/ elog(ERROR, "unknown worker type"); @@ -603,13 +612,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) * Stop the logical replication worker for subid/relid, if any. */ void -logicalrep_worker_stop(Oid subid, Oid relid) +logicalrep_worker_stop(Oid subid, Oid relid, LogicalRepWorkerType wtype) { LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, false); + worker = logicalrep_worker_find(subid, relid, wtype, false); if (worker) { @@ -676,7 +685,7 @@ logicalrep_worker_wakeup(Oid subid, Oid relid) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, true); + worker = logicalrep_worker_find(subid, relid, WORKERTYPE_APPLY, true); if (worker) logicalrep_worker_wakeup_ptr(worker); @@ -806,6 +815,37 @@ logicalrep_launcher_onexit(int code, Datum arg) LogicalRepCtx->launcher_pid = 0; }
+/*
+ * Stamp the subscription's running apply worker with the current time as the
+ * sequencesync failure time.  Does nothing if no such worker is found.
+ */
+void
+logicalrep_seqsyncworker_set_failuretime(void)
+{
+	LogicalRepWorker *worker;
+
+	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+	worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid,
+									WORKERTYPE_APPLY, true);
+	if (worker)
+		worker->sequencesync_failure_time = GetCurrentTimestamp();
+
+	LWLockRelease(LogicalRepWorkerLock);
+}
+
+/*
+ * Exit-callback wrapper around logicalrep_seqsyncworker_set_failuretime();
+ * 'code' and 'arg' are unused.  SyncFinishWorker() cancels it via
+ * cancel_before_shmem_exit() on a clean exit, so it only takes effect when
+ * the sequencesync worker exits due to a failure.
+ */
+void
+logicalrep_seqsyncworker_failuretime(int code, Datum arg)
+{
+	logicalrep_seqsyncworker_set_failuretime();
+}
+
/* * Cleanup function.
* @@ -854,7 +894,7 @@ logicalrep_sync_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isTablesyncWorker(w) && w->subid == subid) + if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w))) res++; } @@ -1169,7 +1209,7 @@ ApplyLauncherMain(Datum main_arg) continue; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); + w = logicalrep_worker_find(sub->oid, InvalidOid, WORKERTYPE_APPLY, false); LWLockRelease(LogicalRepWorkerLock); if (w != NULL) @@ -1305,7 +1345,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); - if (isTablesyncWorker(&worker)) + if (isTableSyncWorker(&worker)) values[1] = ObjectIdGetDatum(worker.relid); else nulls[1] = true; @@ -1348,6 +1388,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) case WORKERTYPE_TABLESYNC: values[9] = CStringGetTextDatum("table synchronization"); break; + case WORKERTYPE_SEQUENCESYNC: + values[9] = CStringGetTextDatum("sequence synchronization"); + break; case WORKERTYPE_UNKNOWN: /* Should never happen. 
*/ elog(ERROR, "unknown worker type"); diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 9283e996ef4..a2268d8361e 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -12,6 +12,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'sequencesync.c', 'slotsync.c', 'snapbuild.c', 'syncutils.c', diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c new file mode 100644 index 00000000000..196d29e5165 --- /dev/null +++ b/src/backend/replication/logical/sequencesync.c @@ -0,0 +1,654 @@ +/*------------------------------------------------------------------------- + * sequencesync.c + * PostgreSQL logical replication: sequence synchronization + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/sequencesync.c + * + * NOTES + * This file contains code for sequence synchronization for + * logical replication. + * + * Sequences to be synchronized by the sequencesync worker will + * be added to pg_subscription_rel in INIT state when one of the following + * commands is executed: + * CREATE SUBSCRIPTION + * ALTER SUBSCRIPTION ... REFRESH PUBLICATION + * ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES + * + * The apply worker will periodically check if there are any sequences in INIT + * state and will start a sequencesync worker if needed. + * + * The sequencesync worker retrieves the sequences to be synchronized from the + * pg_subscription_rel catalog table. It synchronizes multiple sequences per + * single transaction by fetching the sequence value and page LSN from the + * remote publisher and updating them in the local subscriber sequence. After + * synchronization, it sets the sequence state to READY. + * + * So the state progression is always just: INIT -> READY. 
+ * + * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH + * (100) sequences are synchronized per transaction. The locks on the sequence + * relation will be periodically released at each transaction commit. + * + * XXX: An alternative design was considered where the launcher process would + * periodically check for sequences that need syncing and then start the + * sequencesync worker. However, the approach of having the apply worker + * manage the sequencesync worker was chosen for the following reasons: + * a) It avoids overloading the launcher, which handles various other + * subscription requests. + * b) It offers a more straightforward path for extending support for + * incremental sequence synchronization. + * c) It utilizes the existing tablesync worker code to start the sequencesync + * process, thus preventing code duplication in the launcher. + * d) It simplifies code maintenance by consolidating changes to a single + * location rather than multiple components. + * e) The apply worker can access the sequences that need to be synchronized + * from the pg_subscription_rel system catalog. Whereas the launcher process + * operates without direct database access so would need a framework to + * establish connections with the databases to retrieve the sequences for + * synchronization. 
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/table.h"
+#include "catalog/pg_sequence.h"
+#include "catalog/pg_subscription_rel.h"
+#include "commands/sequence.h"
+#include "pgstat.h"
+#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
+#include "replication/worker_internal.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+#include "utils/catcache.h"
+#include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
+#include "utils/rls.h"
+#include "utils/syscache.h"
+#include "utils/usercontext.h"
+
+List	   *sequence_states_not_ready = NIL;
+
+/*
+ * Handle sequence synchronization cooperation from the apply worker.
+ *
+ * Walk over all subscription sequences that are individually tracked by the
+ * apply process (currently, all that have state SUBREL_STATE_INIT) and manage
+ * synchronization for them.
+ *
+ * If a sequencesync worker is running already, there is no need to start a new
+ * one; the existing sequencesync worker will synchronize all the sequences. If
+ * there are still any sequences to be synced after the sequencesync worker
+ * exited, then a new sequencesync worker can be started in the next iteration.
+ * To prevent starting the sequencesync worker at a high frequency after a
+ * failure, we store its last failure time. We start the sequencesync worker
+ * again after waiting at least wal_retrieve_retry_interval.
+ */
+void
+ProcessSyncingSequencesForApply(void)
+{
+	bool		started_tx = false;
+
+	Assert(!IsTransactionState());
+
+	/* Start the sequencesync worker if needed, and there is not one already. */
+	foreach_ptr(SubscriptionRelState, rstate, sequence_states_not_ready)
+	{
+		LogicalRepWorker *sequencesync_worker;
+		int			nsyncworkers;
+
+		if (!started_tx)
+		{
+			StartTransactionCommand();
+			started_tx = true;
+		}
+
+		Assert(get_rel_relkind(rstate->relid) == RELKIND_SEQUENCE);
+
+		if (rstate->state != SUBREL_STATE_INIT)
+			continue;
+
+		/* Is there a sequencesync worker already running? */
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+		sequencesync_worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+													 InvalidOid,
+													 WORKERTYPE_SEQUENCESYNC,
+													 true);
+		if (sequencesync_worker)
+		{
+			/* Now safe to release the LWLock */
+			LWLockRelease(LogicalRepWorkerLock);
+			break;
+		}
+
+		/*
+		 * Count running sync workers for this subscription, while we have the
+		 * lock.
+		 */
+		nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+
+		/* Now safe to release the LWLock */
+		LWLockRelease(LogicalRepWorkerLock);
+
+		/*
+		 * If there are free sync worker slot(s), start a new sequencesync
+		 * worker, unless a recent failure is still within the retry interval.
+		 */
+		if (nsyncworkers < max_sync_workers_per_subscription)
+		{
+			TimestampTz now = GetCurrentTimestamp();
+
+			if (!MyLogicalRepWorker->sequencesync_failure_time ||
+				TimestampDifferenceExceeds(MyLogicalRepWorker->sequencesync_failure_time,
+										   now, wal_retrieve_retry_interval))
+			{
+				MyLogicalRepWorker->sequencesync_failure_time = 0;
+
+				logicalrep_worker_launch(WORKERTYPE_SEQUENCESYNC,
+										 MyLogicalRepWorker->dbid,
+										 MySubscription->oid,
+										 MySubscription->name,
+										 MyLogicalRepWorker->userid,
+										 InvalidOid,
+										 DSM_HANDLE_INVALID);
+			}
+		}
+
+		/* The outcome is the same for every INIT sequence; stop after one. */
+		break;
+	}
+
+	if (started_tx)
+	{
+		CommitTransactionCommand();
+		pgstat_report_stat(true);
+	}
+}
+
+/*
+ * fetch_remote_sequence_data
+ *
+ * Retrieves sequence data (last_value, log_cnt, page_lsn, and is_called) and
+ * parameters (seqtypid, seqstart, seqincrement, seqmin, seqmax and seqcycle)
+ * from a remote node.
+ *
+ * Inputs: relid is the local sequence, remoteid is its OID on the publisher.
+ *
+ * Output Parameters:
+ * - log_cnt: The log count of the sequence.
+ * - is_called: Indicates if the sequence has been called.
+ * - page_lsn: The log sequence number of the sequence page.
+ * - last_value: The last value of the sequence.
+ *
+ * Returns true if the local and remote sequence parameters match, false if
+ * they differ.
+ */
+static bool
+fetch_remote_sequence_data(WalReceiverConn *conn, Oid relid, Oid remoteid,
+						   char *nspname, char *relname, int64 *log_cnt,
+						   bool *is_called, XLogRecPtr *page_lsn,
+						   int64 *last_value)
+{
+#define REMOTE_SEQ_COL_COUNT 10
+	Oid			tableRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID, BOOLOID,
+	LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
+	WalRcvExecResult *res;
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	bool		isnull;
+	Oid			seqtypid;
+	int64		seqstart;
+	int64		seqincrement;
+	int64		seqmin;
+	int64		seqmax;
+	bool		seqcycle;
+	bool		seq_params_match;
+	HeapTuple	tup;
+	Form_pg_sequence seqform;
+	int			col = 0;
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd,
+					 "SELECT last_value, log_cnt, is_called, page_lsn,\n"
+					 "seqtypid, seqstart, seqincrement, seqmin, seqmax, seqcycle\n"
+					 "FROM pg_catalog.pg_sequence_state(%u), pg_catalog.pg_sequence WHERE seqrelid = %u",
+					 remoteid, remoteid);
+
+	res = walrcv_exec(conn, cmd.data, REMOTE_SEQ_COL_COUNT, tableRow);
+	pfree(cmd.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not fetch sequence info for sequence \"%s.%s\" from publisher: %s",
+						nspname, relname, res->err)));
+
+	/* Process the sequence. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+		ereport(ERROR,
+				errcode(ERRCODE_UNDEFINED_OBJECT),
+				errmsg("sequence \"%s.%s\" not found on publisher",
+					   nspname, relname));
+
+	*last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	*log_cnt = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	*is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	*page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	seqtypid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	seqstart = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	seqincrement = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	seqmin = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	seqmax = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	seqcycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
+	Assert(!isnull);
+
+	/* Sanity check */
+	Assert(col == REMOTE_SEQ_COL_COUNT);
+
+	/* Get the local sequence */
+	tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(relid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "cache lookup failed for sequence \"%s.%s\"",
+			 nspname, relname);
+
+	seqform = (Form_pg_sequence) GETSTRUCT(tup);
+
+	seq_params_match = seqform->seqtypid == seqtypid &&
+		seqform->seqmin == seqmin && seqform->seqmax == seqmax &&
+		seqform->seqcycle == seqcycle &&
+		seqform->seqstart == seqstart &&
+		seqform->seqincrement == seqincrement;
+
+	ReleaseSysCache(tup);
+	ExecDropSingleTupleTableSlot(slot);
+	walrcv_clear_result(res);
+
+	return seq_params_match;
+}
+
+/*
+ * Copy existing data of a sequence from publisher.
+ *
+ * Fetch the sequence value from the publisher and set the subscriber sequence
+ * with the same value.
Caller is responsible for locking the local
+ * relation.
+ *
+ * Sets *sequence_mismatch if the local/remote sequence parameters differ (the
+ * local sequence is then left untouched).  Returns the publisher's page LSN.
+ */
+static XLogRecPtr
+copy_sequence(WalReceiverConn *conn, Relation rel, bool *sequence_mismatch)
+{
+	StringInfoData cmd;
+	int64		seq_last_value;
+	int64		seq_log_cnt;
+	bool		seq_is_called;
+	XLogRecPtr	seq_page_lsn = InvalidXLogRecPtr;
+	WalRcvExecResult *res;
+	Oid			tableRow[] = {OIDOID, CHAROID};
+	TupleTableSlot *slot;
+	LogicalRepRelId remoteid;	/* unique id of the relation */
+	char		relkind PG_USED_FOR_ASSERTS_ONLY;
+	bool		isnull;
+	char	   *nspname = get_namespace_name(RelationGetNamespace(rel));
+	char	   *relname = RelationGetRelationName(rel);
+	Oid			relid = RelationGetRelid(rel);
+
+	Assert(!*sequence_mismatch);
+
+	/* Look up the sequence's OID and relkind on the publisher, by name. */
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "SELECT c.oid, c.relkind\n"
+					 "FROM pg_catalog.pg_class c\n"
+					 "INNER JOIN pg_catalog.pg_namespace n\n"
+					 " ON (c.relnamespace = n.oid)\n"
+					 "WHERE n.nspname = %s AND c.relname = %s",
+					 quote_literal_cstr(nspname),
+					 quote_literal_cstr(relname));
+
+	res = walrcv_exec(conn, cmd.data, lengthof(tableRow), tableRow);
+	pfree(cmd.data);
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				errcode(ERRCODE_CONNECTION_FAILURE),
+				errmsg("sequence \"%s.%s\" info could not be fetched from publisher: %s",
+					   nspname, relname, res->err));
+
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+		ereport(ERROR,
+				errcode(ERRCODE_UNDEFINED_OBJECT),
+				errmsg("sequence \"%s.%s\" not found on publisher",
+					   nspname, relname));
+
+	remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
+	Assert(!isnull);
+	relkind = DatumGetChar(slot_getattr(slot, 2, &isnull));
+	Assert(!isnull);
+	Assert(relkind == RELKIND_SEQUENCE);
+
+	ExecDropSingleTupleTableSlot(slot);
+	walrcv_clear_result(res);
+
+	*sequence_mismatch = !fetch_remote_sequence_data(conn, relid, remoteid,
+													 nspname, relname,
+													 &seq_log_cnt, &seq_is_called,
+													 &seq_page_lsn, &seq_last_value);
+
+	/* Update the sequence only if the parameters are identical. */
+	if (!*sequence_mismatch)
+		SetSequence(relid, seq_last_value, seq_is_called,
+					seq_log_cnt);
+
+	/* Return the LSN when the sequence state was set. */
+	return seq_page_lsn;
+}
+
+/*
+ * report_mismatched_sequences
+ *
+ * Report any sequence mismatches as a single warning log.
+ */
+static void
+report_mismatched_sequences(StringInfo mismatched_seqs)
+{
+	if (mismatched_seqs->len)
+	{
+		ereport(WARNING,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("parameters differ for the remote and local sequences (%s) for subscription \"%s\"",
+					   mismatched_seqs->data, MySubscription->name),
+				errhint("Alter/Re-create local sequences to have the same parameters as the remote sequences."));
+
+		resetStringInfo(mismatched_seqs);
+	}
+}
+
+/*
+ * append_mismatched_sequences
+ *
+ * Appends details of sequences that have discrepancies between the publisher
+ * and subscriber to the mismatched_seqs string.
+ */
+static void
+append_mismatched_sequences(StringInfo mismatched_seqs, Relation seqrel)
+{
+	if (mismatched_seqs->len)
+		appendStringInfoString(mismatched_seqs, ", ");
+
+	appendStringInfo(mismatched_seqs, "\"%s.%s\"",
+					 get_namespace_name(RelationGetNamespace(seqrel)),
+					 RelationGetRelationName(seqrel));
+}
+
+/*
+ * Start syncing the sequences in the sequencesync worker.
+ *
+ * Sequences are committed in batches of MAX_SEQUENCES_SYNC_PER_BATCH.
+ */
+static void
+LogicalRepSyncSequences(void)
+{
+	char	   *err;
+	bool		must_use_password;
+	List	   *sequences;
+	List	   *sequences_not_synced = NIL;
+	AclResult	aclresult;
+	UserContext ucxt;
+	bool		run_as_owner = false;
+	int			curr_seq = 0;
+	int			seq_count;
+	int			curr_batch_seq = 0;
+	bool		start_txn = true;
+	bool		sequence_sync_error = false;
+	Oid			subid = MyLogicalRepWorker->subid;
+	MemoryContext oldctx;
+	StringInfo	mismatched_seqs = makeStringInfo();
+	StringInfoData app_name;
+
+/*
+ * Synchronizing each sequence individually incurs overhead from starting
+ * and committing a transaction repeatedly. Additionally, we want to avoid
+ * keeping transactions open for extended periods by setting excessively
+ * high values.
+ */
+#define MAX_SEQUENCES_SYNC_PER_BATCH 100
+
+	StartTransactionCommand();
+
+	/* Get the sequences that should be synchronized. */
+	sequences = GetSubscriptionRelations(subid, false, true, false);
+
+	/* Allocate the tracking info in a permanent memory context. */
+	oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+	foreach_ptr(SubscriptionRelState, seq_state, sequences)
+	{
+		SubscriptionRelState *rstate = palloc(sizeof(SubscriptionRelState));
+
+		memcpy(rstate, seq_state, sizeof(SubscriptionRelState));
+		sequences_not_synced = lappend(sequences_not_synced, rstate);
+	}
+	MemoryContextSwitchTo(oldctx);
+
+	CommitTransactionCommand();
+
+	/* Is the use of a password mandatory? */
+	must_use_password = MySubscription->passwordrequired &&
+		!MySubscription->ownersuperuser;
+
+	initStringInfo(&app_name);
+	appendStringInfo(&app_name, "%s_%s", MySubscription->name, "sequencesync worker");
+
+	/* Connect to the publisher for sequence synchronization. */
+	LogRepWorkerWalRcvConn =
+		walrcv_connect(MySubscription->conninfo, true, true,
+					   must_use_password,
+					   app_name.data, &err);
+	if (LogRepWorkerWalRcvConn == NULL)
+		ereport(ERROR,
+				errcode(ERRCODE_CONNECTION_FAILURE),
+				errmsg("could not connect to the publisher: %s", err));
+
+	pfree(app_name.data);
+
+	seq_count = list_length(sequences_not_synced);
+	foreach_ptr(SubscriptionRelState, seqinfo, sequences_not_synced)
+	{
+		Relation	sequence_rel;
+		XLogRecPtr	sequence_lsn;
+		bool		sequence_mismatch = false;
+
+		CHECK_FOR_INTERRUPTS();
+
+		if (start_txn)
+		{
+			StartTransactionCommand();
+			start_txn = false;
+		}
+
+		sequence_rel = table_open(seqinfo->relid, RowExclusiveLock);
+
+		/*
+		 * Make sure that the copy command runs as the sequence owner, unless
+		 * the user has opted out of that behaviour.
+		 */
+		run_as_owner = MySubscription->runasowner;
+		if (!run_as_owner)
+			SwitchToUntrustedUser(sequence_rel->rd_rel->relowner, &ucxt);
+
+		/*
+		 * Check that our sequencesync worker has permission to insert into
+		 * the target sequence.
+		 */
+		aclresult = pg_class_aclcheck(RelationGetRelid(sequence_rel), GetUserId(),
+									  ACL_INSERT);
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult,
+						   get_relkind_objtype(sequence_rel->rd_rel->relkind),
+						   RelationGetRelationName(sequence_rel));
+
+		/*
+		 * In case sequence copy fails, throw a warning for the sequences that
+		 * did not match before exiting.
+		 */
+		PG_TRY();
+		{
+			sequence_lsn = copy_sequence(LogRepWorkerWalRcvConn, sequence_rel,
+										 &sequence_mismatch);
+		}
+		PG_CATCH();
+		{
+			if (sequence_mismatch)
+				append_mismatched_sequences(mismatched_seqs, sequence_rel);
+
+			report_mismatched_sequences(mismatched_seqs);
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+
+		/* Undo this iteration's user switch so switches never nest. */
+		if (!run_as_owner)
+			RestoreUserContext(&ucxt);
+
+		if (sequence_mismatch)
+			append_mismatched_sequences(mismatched_seqs, sequence_rel);
+		else
+			UpdateSubscriptionRelState(subid, seqinfo->relid,
+									   SUBREL_STATE_READY, sequence_lsn);
+
+		table_close(sequence_rel, NoLock);
+
+		curr_seq++;
+		curr_batch_seq++;
+
+		/*
+		 * Have we reached the end of the current batch of sequences, or last
+		 * remaining sequences to synchronize?
+		 */
+		if (curr_batch_seq == MAX_SEQUENCES_SYNC_PER_BATCH ||
+			curr_seq == seq_count)
+		{
+			if (message_level_is_interesting(DEBUG1))
+			{
+				/* LOG all the sequences synchronized during current batch. */
+				for (int i = 0; i < curr_batch_seq; i++)
+				{
+					SubscriptionRelState *done_seq;
+
+					done_seq = (SubscriptionRelState *) lfirst(list_nth_cell(sequences_not_synced,
+																			 (curr_seq - curr_batch_seq) + i));
+
+					ereport(DEBUG1,
+							errmsg_internal("logical replication synchronization for subscription \"%s\", sequence \"%s\" has finished",
+											get_subscription_name(subid, false),
+											get_rel_name(done_seq->relid)));
+				}
+			}
+
+			if (mismatched_seqs->len)
+				sequence_sync_error = true;
+
+			report_mismatched_sequences(mismatched_seqs);
+
+			ereport(LOG,
+					errmsg("logical replication synchronized %d of %d sequences for subscription \"%s\"",
+						   curr_seq, seq_count, get_subscription_name(subid, false)));
+
+			/* Commit this batch, and prepare for next batch. */
+			CommitTransactionCommand();
+			start_txn = true;
+			curr_batch_seq = 0;
+		}
+	}
+
+	/*
+	 * Sequence synchronization failed due to a parameter mismatch. Setting
+	 * the failure time to prevent repeated initiation of the sequencesync
+	 * worker.
+	 */
+	if (sequence_sync_error)
+	{
+		logicalrep_seqsyncworker_set_failuretime();
+		ereport(LOG,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("sequence synchronization failed because the parameters between the publisher and subscriber do not match for all sequences"));
+	}
+
+	list_free_deep(sequences_not_synced);
+}
+
+/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if required.
+ *
+ * Note that we don't handle FATAL errors which are probably because of system
+ * resource error and are not repeatable.  Other errors are reported to the
+ * subscription statistics and then re-thrown.
+ */
+static void
+start_sequence_sync(void)
+{
+	Assert(am_sequencesync_worker());
+
+	PG_TRY();
+	{
+		/* Call initial sync. */
+		LogicalRepSyncSequences();
+	}
+	PG_CATCH();
+	{
+		if (MySubscription->disableonerr)
+			DisableSubscriptionAndExit();
+		else
+		{
+			/*
+			 * Report the worker failed during sequence synchronization. Abort
+			 * the current transaction so that the stats message is sent in an
+			 * idle state.
+			 */
+			AbortOutOfAnyTransaction();
+			pgstat_report_subscription_error(MySubscription->oid, false);
+
+			PG_RE_THROW();
+		}
+	}
+	PG_END_TRY();
+}
+
+/* Logical Replication sequencesync worker entry point */
+void
+SequenceSyncWorkerMain(Datum main_arg)
+{
+	int			worker_slot = DatumGetInt32(main_arg);
+
+	SetupApplyOrSyncWorker(worker_slot);
+
+	start_sequence_sync();
+
+	SyncFinishWorker(WORKERTYPE_SEQUENCESYNC);
+}
diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c index 63174d0cdff..31ca93375a8 100644 --- a/src/backend/replication/logical/syncutils.c +++ b/src/backend/replication/logical/syncutils.c @@ -50,8 +50,10 @@ static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEE * Exit routine for synchronization worker.
*/ pg_noreturn void -SyncFinishWorker(void) +SyncFinishWorker(LogicalRepWorkerType wtype) { + Assert(wtype == WORKERTYPE_TABLESYNC || wtype == WORKERTYPE_SEQUENCESYNC); + /* * Commit any outstanding transaction. This is the usual case, unless * there was nothing to do for the table. @@ -66,15 +68,24 @@ SyncFinishWorker(void) XLogFlush(GetXLogWriteRecPtr()); StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + if (wtype == WORKERTYPE_TABLESYNC) + ereport(LOG, + errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid))); + else + ereport(LOG, + errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished", + MySubscription->name)); CommitTransactionCommand(); /* Find the leader apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + /* This is a clean exit, so no need to set a sequence failure time. */ + if (wtype == WORKERTYPE_SEQUENCESYNC) + cancel_before_shmem_exit(logicalrep_seqsyncworker_failuretime, 0); + /* Stop gracefully */ proc_exit(0); } @@ -89,7 +100,9 @@ SyncInvalidateRelationStates(Datum arg, int cacheid, uint32 hashvalue) } /* - * Process possible state change(s) of relations that are being synchronized. + * Process possible state change(s) of relations that are being synchronized + * and start new tablesync workers for the newly added tables. Also, start a + * new sequencesync worker for the newly added sequences. */ void SyncProcessRelations(XLogRecPtr current_lsn) @@ -109,7 +122,19 @@ SyncProcessRelations(XLogRecPtr current_lsn) break; case WORKERTYPE_APPLY: + /* + * We need up-to-date sync state info for subscription tables and + * sequences here. 
+ */ + FetchRelationStates(); + ProcessSyncingTablesForApply(current_lsn); + ProcessSyncingSequencesForApply(); + break; + + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + Assert(0); break; case WORKERTYPE_UNKNOWN: @@ -121,17 +146,22 @@ SyncProcessRelations(XLogRecPtr current_lsn) /* * Common code to fetch the up-to-date sync state info into the static lists. * - * Returns true if subscription has 1 or more tables, else false. + * The pg_subscription_rel catalog is shared by tables and sequences. Changes + * to either sequences or tables can affect the validity of relation states, so + * we update both table_states_not_ready and sequence_states_not_ready + * simultaneously to ensure consistency. * - * Note: If this function started the transaction (indicated by the parameter) - * then it is the caller's responsibility to commit it. + * Returns true if subscription has 1 or more tables, else false. */ bool -FetchRelationStates(bool *started_tx) +FetchRelationStates() { + /* + * This is declared as static, since the same value can be used until the + * system table is invalidated. + */ static bool has_subtables = false; - - *started_tx = false; + bool started_tx = false; if (relation_states_validity != SYNC_RELATIONS_STATE_VALID) { @@ -144,16 +174,19 @@ FetchRelationStates(bool *started_tx) /* Clean the old lists. */ list_free_deep(table_states_not_ready); + list_free_deep(sequence_states_not_ready); table_states_not_ready = NIL; + sequence_states_not_ready = NIL; if (!IsTransactionState()) { StartTransactionCommand(); - *started_tx = true; + started_tx = true; } - /* Fetch tables that are in non-ready state. */ - rstates = GetSubscriptionRelations(MySubscription->oid, true); + /* Fetch tables and sequences that are in non-ready state. */ + rstates = GetSubscriptionRelations(MySubscription->oid, true, true, + false); /* Allocate the tracking info in a permanent memory context. 
*/ oldctx = MemoryContextSwitchTo(CacheMemoryContext); @@ -161,7 +194,11 @@ FetchRelationStates(bool *started_tx) { rstate = palloc(sizeof(SubscriptionRelState)); memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states_not_ready = lappend(table_states_not_ready, rstate); + + if (get_rel_relkind(rstate->relid) == RELKIND_SEQUENCE) + sequence_states_not_ready = lappend(sequence_states_not_ready, rstate); + else + table_states_not_ready = lappend(table_states_not_ready, rstate); } MemoryContextSwitchTo(oldctx); @@ -186,5 +223,11 @@ FetchRelationStates(bool *started_tx) relation_states_validity = SYNC_RELATIONS_STATE_VALID; } + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } + return has_subtables; } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index cfe638ae6af..810f38d5f90 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -161,7 +161,7 @@ WaitForRelationStateChange(Oid relid, char expected_state) /* Check if the sync worker is still running and bail if not. 
*/ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, - false); + WORKERTYPE_TABLESYNC, false); LWLockRelease(LogicalRepWorkerLock); if (!worker) break; @@ -208,7 +208,7 @@ wait_for_worker_state_change(char expected_state) */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, - InvalidOid, false); + InvalidOid, WORKERTYPE_APPLY, false); if (worker && worker->proc) logicalrep_worker_wakeup_ptr(worker); LWLockRelease(LogicalRepWorkerLock); @@ -334,7 +334,7 @@ ProcessSyncingTablesForSync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); - SyncFinishWorker(); + SyncFinishWorker(WORKERTYPE_TABLESYNC); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -376,9 +376,6 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); - /* We need up-to-date sync state info for subscription tables here. */ - FetchRelationStates(&started_tx); - /* * Prepare a hash table for tracking last start times of workers, to avoid * immediate restarts. We don't need it if there are no tables that need @@ -411,6 +408,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } + + Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE); + if (rstate->state == SUBREL_STATE_SYNCDONE) { /* @@ -424,11 +429,6 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; - if (!started_tx) - { - StartTransactionCommand(); - started_tx = true; - } /* * Remove the tablesync origin tracking if exists. 
@@ -465,8 +465,8 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, - rstate->relid, false); - + rstate->relid, + WORKERTYPE_TABLESYNC, true); if (syncworker) { /* Found one, update our copy of its state */ @@ -1243,7 +1243,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: - SyncFinishWorker(); /* doesn't return */ + SyncFinishWorker(WORKERTYPE_TABLESYNC); /* doesn't return */ } /* Calculate the name of the tablesync slot. */ @@ -1561,7 +1561,7 @@ run_tablesync_worker() /* Logical Replication Tablesync worker entry point */ void -TablesyncWorkerMain(Datum main_arg) +TableSyncWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); @@ -1569,7 +1569,7 @@ TablesyncWorkerMain(Datum main_arg) run_tablesync_worker(); - SyncFinishWorker(); + SyncFinishWorker(WORKERTYPE_TABLESYNC); } /* @@ -1583,23 +1583,16 @@ TablesyncWorkerMain(Datum main_arg) bool AllTablesyncsReady(void) { - bool started_tx = false; - bool has_subrels = false; + bool has_tables = false; /* We need up-to-date sync state info for subscription tables here. */ - has_subrels = FetchRelationStates(&started_tx); - - if (started_tx) - { - CommitTransactionCommand(); - pgstat_report_stat(true); - } + has_tables = FetchRelationStates(); /* * Return false when there are no tables in subscription or not all tables * are in ready state; true otherwise. 
*/ - return has_subrels && (table_states_not_ready == NIL); + return has_tables && (table_states_not_ready == NIL); } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ce668a0ef54..f67920b1a90 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -487,6 +487,11 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) (rel->state == SUBREL_STATE_SYNCDONE && rel->statelsn <= remote_final_lsn)); + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + Assert(0); + break; + case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "Unknown worker type"); @@ -1027,7 +1032,10 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel and any + * newly added relations. + */ SyncProcessRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -1149,7 +1157,10 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel and any + * newly added relations. + */ SyncProcessRelations(prepare_data.end_lsn); /* @@ -1205,7 +1216,10 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel and any + * newly added relations. 
+ */ SyncProcessRelations(prepare_data.end_lsn); clear_subscription_skip_lsn(prepare_data.end_lsn); @@ -1271,7 +1285,10 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr); in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel and any + * newly added relations. + */ SyncProcessRelations(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -1406,7 +1423,10 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel and any + * newly added relations. + */ SyncProcessRelations(prepare_data.end_lsn); /* @@ -2248,7 +2268,10 @@ apply_handle_stream_commit(StringInfo s) break; } - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel and any + * newly added relations. + */ SyncProcessRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -3725,7 +3748,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received) AcceptInvalidationMessages(); maybe_reread_subscription(); - /* Process any table synchronization changes. */ + /* + * Process any tables that are being synchronized in parallel and + * any newly added relations. + */ SyncProcessRelations(last_received); } @@ -4638,8 +4664,8 @@ run_apply_worker() } /* - * Common initialization for leader apply worker, parallel apply worker and - * tablesync worker. + * Common initialization for leader apply worker, parallel apply worker, + * tablesync worker and sequencesync worker. * * Initialize the database connection, in-memory subscription and necessary * config options. 
@@ -4718,6 +4744,10 @@ InitializeLogRepWorker(void) (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); + else if (am_sequencesync_worker()) + ereport(LOG, + (errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started", + MySubscription->name))); else ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" has started", @@ -4737,14 +4767,17 @@ replorigin_reset(int code, Datum arg) replorigin_session_origin_timestamp = 0; } -/* Common function to setup the leader apply or tablesync worker. */ +/* + * Common function to setup the leader apply, tablesync worker and sequencesync + * worker. + */ void SetupApplyOrSyncWorker(int worker_slot) { /* Attach to slot */ logicalrep_worker_attach(worker_slot); - Assert(am_tablesync_worker() || am_leader_apply_worker()); + Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker()); /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); @@ -4789,6 +4822,9 @@ SetupApplyOrSyncWorker(int worker_slot) CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, SyncInvalidateRelationStates, (Datum) 0); + + if (am_sequencesync_worker()) + before_shmem_exit(logicalrep_seqsyncworker_failuretime, (Datum) 0); } /* Logical Replication Apply worker entry point */ diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 989825d3a9c..43e9dfe708a 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3354,7 +3354,7 @@ struct config_int ConfigureNamesInt[] = {"max_sync_workers_per_subscription", PGC_SIGHUP, REPLICATION_SUBSCRIBERS, - gettext_noop("Maximum number of table synchronization workers per subscription."), + gettext_noop("Maximum number of workers per subscription for synchronizing tables and sequences."), NULL, }, 
&max_sync_workers_per_subscription, diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c index 56b6c368acf..5c5a775d40d 100644 --- a/src/bin/pg_dump/common.c +++ b/src/bin/pg_dump/common.c @@ -243,8 +243,8 @@ getSchemaData(Archive *fout, int *numTablesPtr) pg_log_info("reading subscriptions"); getSubscriptions(fout); - pg_log_info("reading subscription membership of tables"); - getSubscriptionTables(fout); + pg_log_info("reading subscription membership of relations"); + getSubscriptionRelations(fout); free(inhinfo); /* not needed any longer */ diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 0bd1387cb61..8c940e81720 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -5084,12 +5084,12 @@ getSubscriptions(Archive *fout) } /* - * getSubscriptionTables - * Get information about subscription membership for dumpable tables. This + * getSubscriptionRelations + * Get information about subscription membership for dumpable relations. This * will be used only in binary-upgrade mode for PG17 or later versions. 
*/ void -getSubscriptionTables(Archive *fout) +getSubscriptionRelations(Archive *fout) { DumpOptions *dopt = fout->dopt; SubscriptionInfo *subinfo = NULL; diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 892b53c0184..59a1dfe81be 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -813,6 +813,6 @@ extern void getPublicationNamespaces(Archive *fout); extern void getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables); extern void getSubscriptions(Archive *fout); -extern void getSubscriptionTables(Archive *fout); +extern void getSubscriptionRelations(Archive *fout); #endif /* PG_DUMP_H */ diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 92a2adf4ced..ac5853ed13c 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2288,7 +2288,7 @@ match_previous_words(int pattern_id, "ADD PUBLICATION", "DROP PUBLICATION"); /* ALTER SUBSCRIPTION REFRESH PUBLICATION */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH", "PUBLICATION")) - COMPLETE_WITH("WITH ("); + COMPLETE_WITH("SEQUENCES", "WITH ("); /* ALTER SUBSCRIPTION REFRESH PUBLICATION WITH ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH", "PUBLICATION", "WITH", "(")) COMPLETE_WITH("copy_data"); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 3f03c220c4d..983d71b4f89 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12190,6 +12190,11 @@ proargmodes => '{v,o,o,o,o}', proargnames => '{pubname,pubid,relid,attrs,qual}', prosrc => 'pg_get_publication_tables' }, +{ oid => '8052', descr => 'get OIDs of sequences in a publication', + proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't', + provolatile => 's', prorettype => 'oid', proargtypes => 'text', + proallargtypes => '{text,oid}', proargmodes => '{i,o}', + proargnames => '{pubname,relid}', prosrc => 
'pg_get_publication_sequences' }, { oid => '6121', descr => 'returns whether a relation can be part of a publication', proname => 'pg_relation_is_publishable', provolatile => 's', diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index ea869588d84..0c706bd9cd5 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -90,6 +90,8 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern bool HasSubscriptionTables(Oid subid); -extern List *GetSubscriptionRelations(Oid subid, bool not_ready); +extern List *GetSubscriptionRelations(Oid subid, bool get_tables, + bool get_sequences, + bool all_states); #endif /* PG_SUBSCRIPTION_REL_H */ diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index 9ac0b67683d..26e3c9096ae 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -45,6 +45,8 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data; /* XLOG stuff */ #define XLOG_SEQ_LOG 0x00 +#define SEQ_LOG_CNT_INVALID 0 + typedef struct xl_seq_rec { RelFileLocator locator; @@ -60,6 +62,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt); extern void SequenceChangePersistence(Oid relid, char newrelpersistence); extern void DeleteSequenceTuple(Oid relid); extern void ResetSequence(Oid seq_relid); +extern void SetSequence(Oid relid, int64 next, bool is_called, int64 log_cnt); extern void ResetSequenceCaches(void); extern void seq_redo(XLogReaderState *record); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index bc8ad978369..172b2b96500 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4314,7 +4314,8 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, - 
ALTER_SUBSCRIPTION_REFRESH, + ALTER_SUBSCRIPTION_REFRESH_PUBLICATION, + ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_SKIP, } AlterSubscriptionType; diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 88912606e4d..56fa79b648e 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -18,7 +18,8 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); -extern void TablesyncWorkerMain(Datum main_arg); +extern void TableSyncWorkerMain(Datum main_arg); +extern void SequenceSyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index a43a9b192bd..1e6e4088e28 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -30,6 +30,7 @@ typedef enum LogicalRepWorkerType { WORKERTYPE_UNKNOWN = 0, WORKERTYPE_TABLESYNC, + WORKERTYPE_SEQUENCESYNC, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, } LogicalRepWorkerType; @@ -92,6 +93,8 @@ typedef struct LogicalRepWorker TimestampTz last_recv_time; XLogRecPtr reply_lsn; TimestampTz reply_time; + + TimestampTz sequencesync_failure_time; } LogicalRepWorker; /* @@ -238,9 +241,11 @@ extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; extern PGDLLIMPORT List *table_states_not_ready; +extern PGDLLIMPORT List *sequence_states_not_ready; extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, + LogicalRepWorkerType wtype, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock); @@ -248,24 +253,29 @@ extern bool 
logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm); -extern void logicalrep_worker_stop(Oid subid, Oid relid); +extern void logicalrep_worker_stop(Oid subid, Oid relid, + LogicalRepWorkerType wtype); extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); -pg_noreturn extern void SyncFinishWorker(void); +pg_noreturn extern void SyncFinishWorker(LogicalRepWorkerType wtype); extern int logicalrep_sync_worker_count(Oid subid); +extern void logicalrep_seqsyncworker_set_failuretime(void); +extern void logicalrep_seqsyncworker_failuretime(int code, Datum arg); + extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname); extern bool AllTablesyncsReady(void); extern void UpdateTwoPhaseState(Oid suboid, char new_state); -extern bool FetchRelationStates(bool *started_tx); +extern bool FetchRelationStates(void); extern bool WaitForRelationStateChange(Oid relid, char expected_state); extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn); extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn); +extern void ProcessSyncingSequencesForApply(void); extern void SyncProcessRelations(XLogRecPtr current_lsn); extern void SyncInvalidateRelationStates(Datum arg, int cacheid, uint32 hashvalue); @@ -333,15 +343,25 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +#define isApplyWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_APPLY) #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) -#define isTablesyncWorker(worker) ((worker)->in_use && \ +#define isTableSyncWorker(worker) ((worker)->in_use && \ (worker)->type == 
WORKERTYPE_TABLESYNC) +#define isSequenceSyncWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_SEQUENCESYNC) static inline bool am_tablesync_worker(void) { - return isTablesyncWorker(MyLogicalRepWorker); + return isTableSyncWorker(MyLogicalRepWorker); +} + +static inline bool +am_sequencesync_worker(void) +{ + return isSequenceSyncWorker(MyLogicalRepWorker); } static inline bool diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 47478969135..d221f65b7af 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1442,6 +1442,14 @@ pg_prepared_xacts| SELECT p.transaction, FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid) LEFT JOIN pg_authid u ON ((p.ownerid = u.oid))) LEFT JOIN pg_database d ON ((p.dbid = d.oid))); +pg_publication_sequences| SELECT p.pubname, + n.nspname AS schemaname, + c.relname AS sequencename + FROM pg_publication p, + LATERAL pg_get_publication_sequences((p.pubname)::text) gps(relid), + (pg_class c + JOIN pg_namespace n ON ((n.oid = c.relnamespace))) + WHERE (c.oid = gps.relid); pg_publication_tables| SELECT p.pubname, n.nspname AS schemaname, c.relname AS tablename, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 1443e1d9292..66dcd71eefa 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -107,7 +107,7 @@ HINT: To initiate replication, you must manually create the replication slot, e ALTER SUBSCRIPTION regress_testsub3 ENABLE; ERROR: cannot enable subscription that does not have a slot name ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION; -ERROR: ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions +ERROR: ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION is not allowed for disabled subscriptions -- fail - origin must be either none or any CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo); ERROR: unrecognized origin value: "foo" @@ -352,7 +352,7 @@ ERROR: ALTER SUBSCRIPTION with refresh cannot run inside a transaction block END; BEGIN; ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION; -ERROR: ALTER SUBSCRIPTION ... REFRESH cannot run inside a transaction block +ERROR: ALTER SUBSCRIPTION ... REFRESH PUBLICATION cannot run inside a transaction block END; CREATE FUNCTION func() RETURNS VOID AS $$ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true) $$ LANGUAGE SQL; diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index 586ffba434e..a6c267a8a2c 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -42,6 +42,7 @@ tests += { 't/033_run_as_table_owner.pl', 't/034_temporal.pl', 't/035_conflicts.pl', + 't/036_sequences.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl new file mode 100644 index 00000000000..94466a4f83f --- /dev/null +++ b/src/test/subscription/t/036_sequences.pl @@ -0,0 +1,215 @@ + +# Copyright (c) 2025, PostgreSQL Global Development Group + +# This tests that sequences are synced correctly to the subscriber +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); + +# Avoid checkpoint during the test, otherwise, extra values will be fetched for +# the sequences which will cause the test to fail randomly. 
+$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'checkpoint_timeout = 1h'); +$node_publisher->start; + +# Initialize subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Setup structure on the publisher +my $ddl = qq( + CREATE TABLE regress_seq_test (v BIGINT); + CREATE SEQUENCE regress_s1; +); +$node_publisher->safe_psql('postgres', $ddl); + +# Setup the same structure on the subscriber, plus some extra sequences that +# we'll create on the publisher later +$ddl = qq( + CREATE TABLE regress_seq_test (v BIGINT); + CREATE SEQUENCE regress_s1; + CREATE SEQUENCE regress_s2; + CREATE SEQUENCE regress_s3; + CREATE SEQUENCE regress_s4 +); +$node_subscriber->safe_psql('postgres', $ddl); + +# Insert initial test data +$node_publisher->safe_psql( + 'postgres', qq( + -- generate a number of values using the sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100); +)); + +# Setup logical replication pub/sub +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION regress_seq_pub FOR ALL SEQUENCES"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION regress_seq_sub CONNECTION '$publisher_connstr' PUBLICATION regress_seq_pub" +); + +# Wait for initial sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check the initial data on subscriber +my $result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '100|32|t', 'initial test data replicated'); + +########## +## ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION should cause sync of new +# sequences of the publisher, but changes to existing sequences should +# not be synced. +########## + +# Create a new sequence 'regress_s2', and update existing sequence 'regress_s1' +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s2; + INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100); + + -- Existing sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100); +)); + +# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check - existing sequence is not synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '100|32|t', + 'REFRESH PUBLICATION does not sync existing sequence'); + +# Check - newly published sequence is synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s2; +)); +is($result, '100|32|t', + 'REFRESH PUBLICATION will sync newly published sequence'); + +########## +## ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES should cause sync of +# new sequences of the publisher, and changes to existing sequences should +# also be synced. +########## + +# Create a new sequence 'regress_s3', and update the existing sequence +# 'regress_s2'. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s3; + INSERT INTO regress_seq_test SELECT nextval('regress_s3') FROM generate_series(1,100); + + -- Existing sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100); +)); + +# Do ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION SEQUENCES +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION SEQUENCES +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check - existing sequences are synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '200|31|t', + 'REFRESH PUBLICATION SEQUENCES will sync existing sequences'); +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s2; +)); +is($result, '200|31|t', + 'REFRESH PUBLICATION SEQUENCES will sync existing sequences'); + +# Check - newly published sequence is synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s3; +)); +is($result, '100|32|t', + 'REFRESH PUBLICATION SEQUENCES will sync newly published sequence'); + +########## +## ALTER SUBSCRIPTION ... REFRESH PUBLICATION with (copy_data = off) should +# not update the sequence values for the new sequence. +########## + +# Create a new sequence 'regress_s4' +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s4; + INSERT INTO regress_seq_test SELECT nextval('regress_s4') FROM generate_series(1,100); +)); + +# Do ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION with (copy_data = false); +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check - newly published sequence values are not updated +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s4; +)); +is($result, '1|0|f', + 'REFRESH PUBLICATION with copy_data = false will not sync newly published sequence'); + +########## +# ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES should throw a warning +# for sequence definition not matching between the publisher and the subscriber. +########## + +# Create a new sequence 'regress_s5' whose START value is not the same in the +# publisher and subscriber. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s5 START 1 INCREMENT 2; +)); + +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s5 START 10 INCREMENT 2; +)); + +my $log_offset = -s $node_subscriber->logfile; + +# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES +$node_subscriber->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION SEQUENCES" +); + +# Confirm that the warning for parameters differing is logged. +$node_subscriber->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? parameters differ for the remote and local sequences \("public.regress_s5"\) for subscription "regress_seq_sub"/, + $log_offset); +done_testing(); -- 2.43.0