From 130f4be8fc6f2523f57eaab63216a239db68443c Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Wed, 24 Jul 2024 11:24:57 +0530 Subject: [PATCH v20240725 3/3] Enhance sequence synchronization during subscription management This commit introduces sequence synchronization: 1) During subscription creation: - The subscriber retrieves sequences associated with publications. - Sequences are added in 'init' state to pg_subscription_rel table. - A new sequence synchronization worker handles synchronization in batches of 100 sequences: a) Retrieves sequence values using pg_sequence_state from the publisher. b) Sets sequence values accordingly. c) Updates sequence state to 'READY'. d) Commits batches of 100 synchronized sequences. 2) Refreshing sequences: - Refreshing sequences occurs with ALTER SUBSCRIPTION ... REFRESH PUBLICATION (no syntax change). - Stale sequences are removed from pg_subscription_rel. - Newly added sequences in the publisher are added in 'init' state to pg_subscription_rel. - Initiates sequence synchronization for all sequences by sequence sync worker as listed in subscription creation process. - Sequence synchronization occurs for newly added sequences only. 3) Introduce new command for refreshing all sequences: - ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES. - Removes stale sequences and adds newly added sequences from the publisher to pg_subscription_rel. - Resets all sequences in pg_subscription_rel to 'init' state. - Initiates sequence synchronization for all sequences by sequence sync worker as listed in subscription creation process. --- doc/src/sgml/catalogs.sgml | 13 +- doc/src/sgml/config.sgml | 4 +- doc/src/sgml/logical-replication.sgml | 4 +- doc/src/sgml/monitoring.sgml | 5 +- doc/src/sgml/ref/alter_subscription.sgml | 39 +- doc/src/sgml/system-views.sgml | 67 ++++ src/backend/catalog/pg_publication.c | 46 +++ src/backend/catalog/pg_subscription.c | 35 +- src/backend/catalog/system_views.sql | 10 + src/backend/commands/sequence.c | 4 +- src/backend/commands/subscriptioncmds.c | 204 ++++++++-- src/backend/executor/execReplication.c | 4 +- src/backend/parser/gram.y | 11 +- src/backend/postmaster/bgworker.c | 5 +- src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/launcher.c | 100 ++++- src/backend/replication/logical/meson.build | 1 + .../replication/logical/sequencesync.c | 356 ++++++++++++++++++ src/backend/replication/logical/tablesync.c | 163 +++++++- src/backend/replication/logical/worker.c | 23 +- src/bin/psql/tab-complete.c | 2 +- src/include/catalog/pg_proc.dat | 5 + src/include/catalog/pg_subscription.h | 6 + src/include/catalog/pg_subscription_rel.h | 10 +- src/include/commands/sequence.h | 2 +- src/include/nodes/parsenodes.h | 3 +- src/include/replication/logicalworker.h | 3 +- src/include/replication/worker_internal.h | 23 +- src/test/regress/expected/rules.out | 8 + src/test/subscription/t/034_sequences.pl | 154 ++++++++ src/tools/pgindent/typedefs.list | 2 + 31 files changed, 1218 insertions(+), 95 deletions(-) create mode 100644 src/backend/replication/logical/sequencesync.c create mode 100644 src/test/subscription/t/034_sequences.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index b654fae1b2..19d04b107e 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8103,14 +8103,15 @@ SCRAM-SHA-256$<iteration count>:&l The catalog pg_subscription_rel contains the - state for each replicated relation in each subscription. This is a - many-to-many mapping. + state for each replicated tables and sequences in each subscription. This + is a many-to-many mapping. - This catalog only contains tables known to the subscription after running - either CREATE SUBSCRIPTION or - ALTER SUBSCRIPTION ... REFRESH + This catalog only contains tables and sequences known to the subscription + after running either + CREATE SUBSCRIPTION + or ALTER SUBSCRIPTION ... REFRESH PUBLICATION. @@ -8145,7 +8146,7 @@ SCRAM-SHA-256$<iteration count>:&l (references pg_class.oid) - Reference to relation + Reference to table or sequence diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 3dec0b7cfe..2bb4660336 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5206,8 +5206,8 @@ ANY num_sync ( max_logical_replication_workers must be set to at least the number of subscriptions (for leader apply - workers), plus some reserve for the table synchronization workers and - parallel apply workers. + workers), plus some reserve for the parallel apply workers, table synchronization workers, and a sequence + synchronization worker. diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 55417a6fa9..5fbb0c9c45 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2017,8 +2017,9 @@ description | Waiting for a newly initialized WAL file to reach durable storage Type of the subscription worker process. Possible types are - apply, parallel apply, and - table synchronization. + apply, parallel apply, + table synchronization, and + sequence synchronization. diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 6af6d0d2c8..d3029b9e91 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -26,6 +26,7 @@ ALTER SUBSCRIPTION name SET PUBLICA ALTER SUBSCRIPTION name ADD PUBLICATION publication_name [, ...] [ WITH ( publication_option [= value] [, ... ] ) ] ALTER SUBSCRIPTION name DROP PUBLICATION publication_name [, ...] [ WITH ( publication_option [= value] [, ... ] ) ] ALTER SUBSCRIPTION name REFRESH PUBLICATION [ WITH ( refresh_option [= value] [, ... ] ) ] +ALTER SUBSCRIPTION name REFRESH PUBLICATION SEQUENCES ALTER SUBSCRIPTION name ENABLE ALTER SUBSCRIPTION name DISABLE ALTER SUBSCRIPTION name SET ( subscription_parameter [= value] [, ... ] ) @@ -153,30 +154,45 @@ ALTER SUBSCRIPTION name RENAME TO < REFRESH PUBLICATION - Fetch missing table information from publisher. This will start + Fetch missing table information from the publisher. This will start replication of tables that were added to the subscribed-to publications since CREATE SUBSCRIPTION or the last invocation of REFRESH PUBLICATION. + + Also, fetch missing sequence information from the publisher. + + + + The system catalog pg_subscription_rel + is updated to record all tables and sequences known to the subscription, + that are still part of the publication. + + refresh_option specifies additional options for the - refresh operation. The supported options are: + refresh operation. The only supported option is: copy_data (boolean) - Specifies whether to copy pre-existing data in the publications - that are being subscribed to when the replication starts. - The default is true. + Specifies whether to copy pre-existing data for tables and synchronize + sequences in the publications that are being subscribed to when the replication + starts. The default is true. Previously subscribed tables are not copied, even if a table's row filter WHERE clause has since been modified. + + Previously subscribed sequences are not re-synchronized. To do that, + see + ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES + See for details of how copy_data = true can interact with the @@ -195,6 +211,19 @@ ALTER SUBSCRIPTION name RENAME TO < + + REFRESH PUBLICATION SEQUENCES + + + Fetch missing sequence information from the publisher, then re-synchronize + sequence data with the publisher. Unlike + ALTER SUBSCRIPTION ... REFRESH PUBLICATION which + only synchronizes newly added sequences, REFRESH PUBLICATION SEQUENCES + will re-synchronize the sequence data for all subscribed sequences. + + + + ENABLE diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index bdc34cf94e..b893fc2d90 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -126,6 +126,11 @@ prepared transactions + + pg_publication_sequences + publications and information of their associated sequences + + pg_publication_tables publications and information of their associated tables @@ -2147,6 +2152,68 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx + + <structname>pg_publication_sequences</structname> + + + pg_publication_sequences + + + + The view pg_publication_sequences provides + information about the mapping between publications and information of + sequences they contain. + + + + <structname>pg_publication_sequences</structname> Columns + + + + + Column Type + + + Description + + + + + + + + pubname name + (references pg_publication.pubname) + + + Name of publication + + + + + + schemaname name + (references pg_namespace.nspname) + + + Name of schema containing sequence + + + + + + sequencename name + (references pg_class.relname) + + + Name of sequence + + + + +
+
+ <structname>pg_publication_tables</structname> diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index acfac67f8c..980e5574a4 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -1292,3 +1292,49 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } + +/* + * Returns Oids of sequences in a publication. + */ +Datum +pg_get_publication_sequences(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0)); + Publication *publication; + List *sequences = NIL; + + /* stuff done only on the first call of the function */ + if (SRF_IS_FIRSTCALL()) + { + MemoryContext oldcontext; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + publication = GetPublicationByName(pubname, false); + + if (publication->allsequences) + sequences = GetAllSequencesPublicationRelations(); + + funcctx->user_fctx = (void *) sequences; + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + sequences = (List *) funcctx->user_fctx; + + if (funcctx->call_cntr < list_length(sequences)) + { + Oid relid = list_nth_oid(sequences, funcctx->call_cntr); + + SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid)); + } + + SRF_RETURN_DONE(funcctx); +} diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 9efc9159f2..5610c0749c 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -27,6 +27,7 @@ #include "utils/array.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/memutils.h" #include "utils/lsyscache.h" #include "utils/pg_lsn.h" #include "utils/rel.h" @@ -493,12 +494,19 @@ HasSubscriptionRelations(Oid subid) /* * Get the relations for the subscription. * - * If not_ready is true, return only the relations that are not in a ready - * state, otherwise return all the relations of the subscription. The - * returned list is palloc'ed in the current memory context. + * If rel_type is SUB_REL_KIND_SEQUENCE, get only the sequences. If rel_type is + * SUB_REL_KIND_TABLE, get only the tables. If rel_type is SUB_REL_KIND_ALL, + * get both tables and sequences. + * If not_all_relations is true for SUB_REL_KIND_TABLE and SUB_REL_KIND_ALL, + * return only the relations that are not in a ready state, otherwise return all + * the relations of the subscription. If not_all_relations is true for + * SUB_REL_KIND_SEQUENCE, return only the sequences that are in init state, + * otherwise return all the sequences of the subscription. + * The returned list is palloc'ed in the current memory context. */ List * -GetSubscriptionRelations(Oid subid, bool not_ready) +GetSubscriptionRelations(Oid subid, SubscriptionRelKind rel_type, + bool not_all_relations) { List *res = NIL; Relation rel; @@ -514,11 +522,18 @@ GetSubscriptionRelations(Oid subid, bool not_ready) BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(subid)); - if (not_ready) + /* Get the relations that are not in ready state */ + if (rel_type != SUB_REL_KIND_SEQUENCE && not_all_relations) ScanKeyInit(&skey[nkeys++], Anum_pg_subscription_rel_srsubstate, BTEqualStrategyNumber, F_CHARNE, CharGetDatum(SUBREL_STATE_READY)); + /* Get the sequences that are in init state */ + else if (rel_type == SUB_REL_KIND_SEQUENCE && not_all_relations) + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_rel_srsubstate, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(SUBREL_STATE_INIT)); scan = systable_beginscan(rel, InvalidOid, false, NULL, nkeys, skey); @@ -529,8 +544,18 @@ GetSubscriptionRelations(Oid subid, bool not_ready) SubscriptionRelState *relstate; Datum d; bool isnull; + char subreltype; subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + subreltype = get_rel_relkind(subrel->srrelid); + + /* If only tables were requested, skip the sequences */ + if (rel_type == SUB_REL_KIND_TABLE && subreltype == RELKIND_SEQUENCE) + continue; + + /* If only sequences were requested, skip the tables */ + if (rel_type == SUB_REL_KIND_SEQUENCE && subreltype != RELKIND_SEQUENCE) + continue; relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); relstate->relid = subrel->srrelid; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 19cabc9a47..a6475af855 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -394,6 +394,16 @@ CREATE VIEW pg_publication_tables AS pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) WHERE C.oid = GPT.relid; +CREATE VIEW pg_publication_sequences AS + SELECT + P.pubname AS pubname, + N.nspname AS schemaname, + C.relname AS sequencename + FROM pg_publication P, + LATERAL pg_get_publication_sequences(P.pubname) GPS, + pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) + WHERE C.oid = GPS.relid; + CREATE VIEW pg_locks AS SELECT * FROM pg_lock_status() AS L; diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index bff990afa7..a3e7c791b2 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -342,7 +342,7 @@ ResetSequence(Oid seq_relid) * logical replication. */ void -SetSequenceLastValue(Oid seq_relid, int64 new_last_value) +SetSequenceLastValue(Oid seq_relid, int64 new_last_value, int64 log_cnt) { SeqTable elm; Relation seqrel; @@ -370,7 +370,7 @@ SetSequenceLastValue(Oid seq_relid, int64 new_last_value) seq->last_value = new_last_value; seq->is_called = true; - seq->log_cnt = 0; + seq->log_cnt = log_cnt; MarkBufferDirty(buf); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index d124bfe55c..3318598f08 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -103,6 +103,7 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications); static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, @@ -751,6 +752,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { + List *sequences; + check_publications(wrconn, publications); check_publications_origin(wrconn, publications, opts.copy_data, opts.origin, NULL, 0, stmt->subname); @@ -781,6 +784,22 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, InvalidXLogRecPtr, true); } + /* Add the sequences in init state */ + sequences = fetch_sequence_list(wrconn, publications); + foreach_ptr(RangeVar, rv, sequences) + { + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + + /* Check for supported relkind. */ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + + AddSubscriptionRelState(subid, relid, table_state, + InvalidXLogRecPtr, true); + } + /* * If requested, create permanent slot for the subscription. We * won't use the initial snapshot for anything, so no need to @@ -847,12 +866,25 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, return myself; } +/* + * Update the subscription to refresh both the publication and the publication + * objects associated with the subscription. + * + * If the copy_data parameter is true, the function will set the state + * to "init"; otherwise, it will set the state to "ready". When the + * validate_publications is provided with a publication list, the function + * checks that the specified publications exist on the publisher. If + * refresh_all_sequences is true, it will mark all sequences with "init" state + * for re-synchronization; otherwise, only the newly added relations and + * sequences will be updated based on the copy_data parameter. + */ static void AlterSubscription_refresh(Subscription *sub, bool copy_data, - List *validate_publications) + List *validate_publications, + bool refresh_all_sequences) { char *err; - List *pubrel_names; + List *pubrel_names = NIL; List *subrel_states; Oid *subrel_local_oids; Oid *pubrel_local_oids; @@ -885,14 +917,21 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, PG_TRY(); { + SubscriptionRelKind reltype = refresh_all_sequences ? + SUB_REL_KIND_SEQUENCE : SUB_REL_KIND_ALL; + if (validate_publications) check_publications(wrconn, validate_publications); - /* Get the table list from publisher. */ - pubrel_names = fetch_table_list(wrconn, sub->publications); + if (reltype == SUB_REL_KIND_ALL) + /* Get the table list from publisher. */ + pubrel_names = fetch_table_list(wrconn, sub->publications); + + /* Get the sequence list from publisher. */ + pubrel_names = list_concat(pubrel_names, fetch_sequence_list(wrconn, sub->publications)); /* Get local table list. */ - subrel_states = GetSubscriptionRelations(sub->oid, false); + subrel_states = GetSubscriptionRelations(sub->oid, reltype, false); subrel_count = list_length(subrel_states); /* @@ -911,9 +950,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, qsort(subrel_local_oids, subrel_count, sizeof(Oid), oid_cmp); - check_publications_origin(wrconn, sub->publications, copy_data, - sub->origin, subrel_local_oids, - subrel_count, sub->name); + if (!refresh_all_sequences) + check_publications_origin(wrconn, sub->publications, copy_data, + sub->origin, subrel_local_oids, + subrel_count, sub->name); /* * Rels that we want to remove from subscription and drop any slots @@ -973,6 +1013,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, { char state; XLogRecPtr statelsn; + char relkind = get_rel_relkind(relid); /* * Lock pg_subscription_rel with AccessExclusiveLock to @@ -994,30 +1035,37 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Last known rel state. */ state = GetSubscriptionRelState(sub->oid, relid, &statelsn); + RemoveSubscriptionRel(sub->oid, relid); + sub_remove_rels[remove_rel_len].relid = relid; sub_remove_rels[remove_rel_len++].state = state; - RemoveSubscriptionRel(sub->oid, relid); - - logicalrep_worker_stop(sub->oid, relid); + /* + * Since one sequence sync workers synchronizes all the + * sequences, stop the worker only if relation kind is not + * sequence. + */ + if (relkind != RELKIND_SEQUENCE) + logicalrep_worker_stop(sub->oid, relid); /* * For READY state, we would have already dropped the * tablesync origin. */ - if (state != SUBREL_STATE_READY) + if (state != SUBREL_STATE_READY && relkind != RELKIND_SEQUENCE) { char originname[NAMEDATALEN]; /* * Drop the tablesync's origin tracking if exists. * - * It is possible that the origin is not yet created for - * tablesync worker, this can happen for the states before - * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or - * apply worker can also concurrently try to drop the - * origin and by this time the origin might be already - * removed. For these reasons, passing missing_ok = true. + * It is possible that the origin is not yet created + * for tablesync worker, this can happen for the + * states before SUBREL_STATE_FINISHEDCOPY. The + * tablesync worker or apply worker can also + * concurrently try to drop the origin and by this + * time the origin might be already removed. For these + * reasons, passing missing_ok = true. */ ReplicationOriginNameForLogicalRep(sub->oid, relid, originname, sizeof(originname)); @@ -1025,10 +1073,25 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, } ereport(DEBUG1, - (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"", + (errmsg_internal("%s \"%s.%s\" removed from subscription \"%s\"", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name, + get_rel_relkind(relid) == RELKIND_SEQUENCE ? "sequence" : "table"))); + } + /* + * In case of REFRESH PUBLICATION SEQUENCES, the existing sequences + * should be re-synchronized. + */ + else if (refresh_all_sequences) + { + ereport(DEBUG1, + (errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state", get_namespace_name(get_rel_namespace(relid)), get_rel_name(relid), sub->name))); + UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT, + InvalidXLogRecPtr); } } @@ -1039,6 +1102,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, */ for (off = 0; off < remove_rel_len; off++) { + if (get_rel_relkind(sub_remove_rels[off].relid) == RELKIND_SEQUENCE) + continue; + if (sub_remove_rels[off].state != SUBREL_STATE_READY && sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE) { @@ -1424,8 +1490,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); /* - * See ALTER_SUBSCRIPTION_REFRESH for details why this is - * not allowed. + * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details + * why this is not allowed. */ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, @@ -1439,7 +1505,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, sub->publications = stmt->publication; AlterSubscription_refresh(sub, opts.copy_data, - stmt->publication); + stmt->publication, false); } break; @@ -1479,8 +1545,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)"))); /* - * See ALTER_SUBSCRIPTION_REFRESH for details why this is - * not allowed. + * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details + * why this is not allowed. */ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, @@ -1498,13 +1564,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, sub->publications = publist; AlterSubscription_refresh(sub, opts.copy_data, - validate_publications); + validate_publications, false); } break; } - case ALTER_SUBSCRIPTION_REFRESH: + case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES: + { + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES is not allowed for disabled subscriptions"))); + + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES"); + + AlterSubscription_refresh(sub, true, NULL, true); + + break; + } + + case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION: { if (!sub->enabled) ereport(ERROR, @@ -1539,7 +1619,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); - AlterSubscription_refresh(sub, opts.copy_data, NULL); + AlterSubscription_refresh(sub, opts.copy_data, NULL, false); break; } @@ -1804,7 +1884,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * the apply and tablesync workers and they can't restart because of * exclusive lock on the subscription. */ - rstates = GetSubscriptionRelations(subid, true); + rstates = GetSubscriptionRelations(subid, SUB_REL_KIND_TABLE, true); foreach(lc, rstates) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -2162,11 +2242,15 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, for (i = 0; i < subrel_count; i++) { Oid relid = subrel_local_oids[i]; - char *schemaname = get_namespace_name(get_rel_namespace(relid)); - char *tablename = get_rel_name(relid); - appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", - schemaname, tablename); + if (get_rel_relkind(relid) != RELKIND_SEQUENCE) + { + char *schemaname = get_namespace_name(get_rel_namespace(relid)); + char *tablename = get_rel_name(relid); + + appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", + schemaname, tablename); + } } res = walrcv_exec(wrconn, cmd.data, 1, tableRow); @@ -2336,6 +2420,62 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } +/* + * Get the list of sequences which belong to specified publications on the + * publisher connection. + */ +static List * +fetch_sequence_list(WalReceiverConn *wrconn, List *publications) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {TEXTOID, TEXTOID}; + List *seqlist = NIL; + + Assert(list_length(publications) > 0); + + initStringInfo(&cmd); + appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n" + " FROM pg_catalog.pg_publication_sequences s\n" + " WHERE s.pubname IN ("); + get_publications_str(publications, &cmd, true); + appendStringInfoChar(&cmd, ')'); + + res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive list of sequences from the publisher: %s", + res->err))); + + /* Process sequences. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *nspname; + char *relname; + bool isnull; + RangeVar *rv; + + nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + rv = makeRangeVar(nspname, relname, -1); + seqlist = lappend(seqlist, rv); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + + return seqlist; +} + /* * This is to report the connection failure while dropping replication slots. * Here, we report the WARNING for all tablesync slots so that user can drop diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index d0a89cd577..fdf69e4f28 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -739,7 +739,9 @@ void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname) { - if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) + if (relkind != RELKIND_RELATION && + relkind != RELKIND_PARTITIONED_TABLE && + relkind != RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 585f61e414..3f66ddd616 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -10835,11 +10835,20 @@ AlterSubscriptionStmt: AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); - n->kind = ALTER_SUBSCRIPTION_REFRESH; + n->kind = ALTER_SUBSCRIPTION_REFRESH_PUBLICATION; n->subname = $3; n->options = $6; $$ = (Node *) n; } + | ALTER SUBSCRIPTION name REFRESH PUBLICATION SEQUENCES + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + + n->kind = ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES; + n->subname = $3; + $$ = (Node *) n; + } | ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition { AlterSubscriptionStmt *n = diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 77707bb384..f8dd93a83a 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -131,7 +131,10 @@ static const struct "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, { - "TablesyncWorkerMain", TablesyncWorkerMain + "TableSyncWorkerMain", TableSyncWorkerMain + }, + { + "SequenceSyncWorkerMain", SequenceSyncWorkerMain } }; diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index ba03eeff1c..7621fa8aed 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + sequencesync.o \ slotsync.o \ snapbuild.o \ tablesync.o \ diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index c566d50a07..76b934234b 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -237,7 +237,8 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, * Walks the workers array and searches for one that matches given * subscription id and relid. * - * We are only interested in the leader apply worker or table sync worker. + * We are only interested in the leader apply worker, table sync worker and + * sequence sync worker. */ LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running) @@ -267,6 +268,38 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) return res; } +/* + * Walks the workers array and searches for one that matches given + * subscription id. + * + * We are only interested in the sequence sync worker. + */ +LogicalRepWorker * +logicalrep_sequence_sync_worker_find(Oid subid, bool only_running) +{ + LogicalRepWorker *res = NULL; + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + /* Search for attached worker for a given subscription id. */ + for (int i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + /* Skip non sequence sync workers. */ + if (!isSequenceSyncWorker(w)) + continue; + + if (w->in_use && w->subid == subid && (only_running && w->proc)) + { + res = w; + break; + } + } + + return res; +} + /* * Similar to logicalrep_worker_find(), but returns a list of all workers for * the subscription, instead of just one. @@ -297,6 +330,26 @@ logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock) return res; } +/* + * Return the pid of the apply worker for one that matches given + * subscription id. + */ +static LogicalRepWorker * +logicalrep_apply_worker_find(Oid subid, bool only_running) +{ + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + for (int i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + if (isApplyWorker(w) && w->subid == subid && only_running && w->proc) + return w; + } + + return NULL; +} + /* * Start new logical replication background worker, if possible. * @@ -317,6 +370,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, int nparallelapplyworkers; TimestampTz now; bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC); + bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC); bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY); /*---------- @@ -324,11 +378,15 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, * - must be valid worker type * - tablesync workers are only ones to have relid * - parallel apply worker is the only kind of subworker + * - sequencesync workers will not have relid */ Assert(wtype != WORKERTYPE_UNKNOWN); Assert(is_tablesync_worker == OidIsValid(relid)); Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID)); + if (is_sequencesync_worker) + Assert(!OidIsValid(relid)); + ereport(DEBUG1, (errmsg_internal("starting logical replication worker for subscription \"%s\"", subname))); @@ -402,7 +460,8 @@ retry: * sync worker limit per subscription. So, just return silently as we * might get here because of an otherwise harmless race condition. */ - if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription) + if ((is_tablesync_worker || is_sequencesync_worker) && + nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); return false; @@ -489,7 +548,7 @@ retry: break; case WORKERTYPE_TABLESYNC: - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication tablesync worker for subscription %u sync %u", subid, @@ -497,6 +556,14 @@ retry: snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker"); break; + case WORKERTYPE_SEQUENCESYNC: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication sequencesync worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker"); + break; + case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "unknown worker type"); @@ -815,6 +882,26 @@ logicalrep_launcher_onexit(int code, Datum arg) LogicalRepCtx->launcher_pid = 0; } +/* + * Update the failure time for the sequence sync worker in the subscription's + * apply worker. + * + * This function is invoked when the sequence sync worker exits due to a + * failure. + */ +void +logicalrep_seqsyncworker_failuretime(int code, Datum arg) +{ + LogicalRepWorker *worker; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + worker = logicalrep_apply_worker_find(MyLogicalRepWorker->subid, true); + if (worker) + worker->sequencesync_failure_time = GetCurrentTimestamp(); + + LWLockRelease(LogicalRepWorkerLock); +} + /* * Cleanup function. * @@ -863,7 +950,7 @@ logicalrep_sync_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isTablesyncWorker(w) && w->subid == subid) + if (isTableSyncWorker(w) && w->subid == subid) res++; } @@ -1314,7 +1401,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); - if (isTablesyncWorker(&worker)) + if (isTableSyncWorker(&worker)) values[1] = ObjectIdGetDatum(worker.relid); else nulls[1] = true; @@ -1357,6 +1444,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) case WORKERTYPE_TABLESYNC: values[9] = CStringGetTextDatum("table synchronization"); break; + case WORKERTYPE_SEQUENCESYNC: + values[9] = CStringGetTextDatum("sequence synchronization"); + break; case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "unknown worker type"); diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 3dec36a6de..1711fc3248 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'sequencesync.c', 'slotsync.c', 'snapbuild.c', 'tablesync.c', diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c new file mode 100644 index 0000000000..d8b007f9e9 --- /dev/null +++ b/src/backend/replication/logical/sequencesync.c @@ -0,0 +1,356 @@ +/*------------------------------------------------------------------------- + * sequencesync.c + * PostgreSQL logical replication: initial sequence synchronization + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/sequencesync.c + * + * NOTES + * This file contains code for sequence synchronization for + * logical replication. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/table.h" +#include "catalog/pg_subscription_rel.h" +#include "commands/sequence.h" +#include "pgstat.h" +#include "replication/logicalworker.h" +#include "replication/worker_internal.h" +#include "utils/acl.h" +#include "utils/builtins.h" +#include "utils/catcache.h" +#include "utils/lsyscache.h" +#include "utils/rls.h" +#include "utils/usercontext.h" + +/* + * fetch_remote_sequence_data + * + * Fetch sequence data (current state) from the remote node, including + * the latest sequence value from the publisher and the Page LSN for the + * sequence. + */ +static int64 +fetch_remote_sequence_data(WalReceiverConn *conn, Oid remoteid, + int64 *log_cnt, XLogRecPtr *lsn) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[3] = {INT8OID, INT8OID, LSNOID}; + int64 value = (Datum) 0; + + initStringInfo(&cmd); + + appendStringInfo(&cmd, "SELECT last_value, log_cnt, page_lsn " + "FROM pg_sequence_state(%d)", remoteid); + + res = walrcv_exec(conn, cmd.data, 3, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive sequence list from the publisher: %s", + res->err))); + + /* Process the sequence. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + bool isnull; + + value = DatumGetInt64(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + *lsn = DatumGetInt64(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + } + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + + return value; +} + +/* + * Copy existing data of a sequence from publisher. + * + * Fetch the sequence value from the publisher and set the subscriber sequence + * withe the retreived value. Caller is responsible for locking the local + * relation. + */ +static XLogRecPtr +copy_sequence(WalReceiverConn *conn, Relation rel) +{ + StringInfoData cmd; + int64 sequence_value; + int64 log_cnt; + XLogRecPtr lsn = InvalidXLogRecPtr; + WalRcvExecResult *res; + Oid tableRow[] = {OIDOID, CHAROID}; + TupleTableSlot *slot; + LogicalRepRelId remoteid; /* unique id of the relation */ + char relkind PG_USED_FOR_ASSERTS_ONLY; + bool isnull; + char *nspname = get_namespace_name(RelationGetNamespace(rel)); + char *relname = RelationGetRelationName(rel); + + /* Fetch Oid. */ + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT c.oid, c.relkind" + " FROM pg_catalog.pg_class c" + " INNER JOIN pg_catalog.pg_namespace n" + " ON (c.relnamespace = n.oid)" + " WHERE n.nspname = %s" + " AND c.relname = %s", + quote_literal_cstr(nspname), + quote_literal_cstr(relname)); + res = walrcv_exec(conn, cmd.data, + lengthof(tableRow), tableRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("sequence \"%s.%s\" info could not be fetched from publisher: %s", + nspname, relname, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("sequence \"%s.%s\" not found on publisher", + nspname, relname))); + + remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relkind = DatumGetChar(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + Assert(relkind == RELKIND_SEQUENCE); + + ExecDropSingleTupleTableSlot(slot); + walrcv_clear_result(res); + + sequence_value = fetch_remote_sequence_data(conn, remoteid, &log_cnt, &lsn); + + SetSequenceLastValue(RelationGetRelid(rel), sequence_value, log_cnt); + + /* return the LSN when the sequence state was set */ + return lsn; +} + +/* + * Start syncing the sequences in the sync worker. + */ +static void +LogicalRepSyncSequences(void) +{ + char *err; + bool must_use_password; + List *sequences; + List *sequences_not_synced = NIL; + char slotname[NAMEDATALEN]; + AclResult aclresult; + UserContext ucxt; + bool run_as_owner = false; + int curr_seq = 0; + int seq_count; + bool start_txn = true; + Oid subid = MyLogicalRepWorker->subid; + MemoryContext oldctx; + +/* + * Synchronizing each sequence individually incurs overhead from starting + * and committing a transaction repeatedly. Additionally, we want to avoid + * keeping transactions open for extended periods by setting excessively + * high values. + */ +#define MAX_SEQUENCES_SYNC_PER_BATCH 100 + + /* Get the sequences that should be synchronized. */ + StartTransactionCommand(); + sequences = GetSubscriptionRelations(subid, SUB_REL_KIND_SEQUENCE, true); + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + foreach_ptr(SubscriptionRelState, seq_state, sequences) + { + SubscriptionRelState *rstate = palloc(sizeof(SubscriptionRelState)); + + memcpy(rstate, seq_state, sizeof(SubscriptionRelState)); + sequences_not_synced = lappend(sequences_not_synced, rstate); + } + MemoryContextSwitchTo(oldctx); + + + CommitTransactionCommand(); + + /* Is the use of a password mandatory? */ + must_use_password = MySubscription->passwordrequired && + !MySubscription->ownersuperuser; + + snprintf(slotname, NAMEDATALEN, "pg_%u_sync_sequences_" UINT64_FORMAT, + subid, GetSystemIdentifier()); + + /* + * Here we use the slot name instead of the subscription name as the + * application_name, so that it is different from the leader apply worker, + * so that synchronous replication can distinguish them. + */ + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, true, + must_use_password, + slotname, &err); + if (LogRepWorkerWalRcvConn == NULL) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); + + seq_count = list_length(sequences_not_synced); + foreach_ptr(SubscriptionRelState, seqinfo, sequences_not_synced) + { + Relation sequence_rel; + XLogRecPtr sequence_lsn; + + CHECK_FOR_INTERRUPTS(); + + if (start_txn) + { + StartTransactionCommand(); + start_txn = false; + } + + sequence_rel = table_open(seqinfo->relid, RowExclusiveLock); + + /* + * Make sure that the copy command runs as the sequence owner, unless + * the user has opted out of that behaviour. + */ + run_as_owner = MySubscription->runasowner; + if (!run_as_owner) + SwitchToUntrustedUser(sequence_rel->rd_rel->relowner, &ucxt); + + /* + * Check that our sequence sync worker has permission to insert into + * the target sequence. + */ + aclresult = pg_class_aclcheck(RelationGetRelid(sequence_rel), GetUserId(), + ACL_INSERT); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, + get_relkind_objtype(sequence_rel->rd_rel->relkind), + RelationGetRelationName(sequence_rel)); + + /* + * COPY FROM does not honor RLS policies. That is not a problem for + * subscriptions owned by roles with BYPASSRLS privilege (or + * superuser, who has it implicitly), but other roles should not be + * able to circumvent RLS. Disallow logical replication into RLS + * enabled relations for such roles. + */ + if (check_enable_rls(RelationGetRelid(sequence_rel), InvalidOid, false) == RLS_ENABLED) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("user \"%s\" cannot replicate into sequence with row-level security enabled: \"%s\"", + GetUserNameFromId(GetUserId(), true), + RelationGetRelationName(sequence_rel))); + + sequence_lsn = copy_sequence(LogRepWorkerWalRcvConn, sequence_rel); + + UpdateSubscriptionRelState(subid, seqinfo->relid, SUBREL_STATE_READY, + sequence_lsn); + + table_close(sequence_rel, NoLock); + + /* + * Verify whether the current batch of sequences is synchronized or if + * there are no remaining sequences to synchronize. + */ + if ((((curr_seq + 1) % MAX_SEQUENCES_SYNC_PER_BATCH) == 0) || + (curr_seq + 1) == seq_count) + { + /* Obtain the starting index of the current batch. */ + int i = curr_seq - (curr_seq % MAX_SEQUENCES_SYNC_PER_BATCH); + + /* LOG all the sequences synchronized during current batch. */ + for (; i <= curr_seq; i++) + { + SubscriptionRelState *done_seq; + + done_seq = (SubscriptionRelState *) lfirst(list_nth_cell(sequences_not_synced, i)); + ereport(LOG, + errmsg("logical replication synchronization for subscription \"%s\", sequence \"%s\" has finished", + get_subscription_name(subid, false), get_rel_name(done_seq->relid))); + } + + CommitTransactionCommand(); + start_txn = true; + } + + curr_seq++; + } + + list_free_deep(sequences_not_synced); + if (!run_as_owner && seq_count) + RestoreUserContext(&ucxt); +} + +/* + * Execute the initial sync with error handling. Disable the subscription, + * if required. + * + * Allocate the slot name in long-lived context on return. Note that we don't + * handle FATAL errors which are probably because of system resource error and + * are not repeatable. + */ +static void +start_sequence_sync() +{ + Assert(am_sequencesync_worker()); + + PG_TRY(); + { + /* Call initial sync. */ + LogicalRepSyncSequences(); + } + PG_CATCH(); + { + if (MySubscription->disableonerr) + DisableSubscriptionAndExit(); + else + { + /* + * Report the worker failed during sequence synchronization. Abort + * the current transaction so that the stats message is sent in an + * idle state. + */ + AbortOutOfAnyTransaction(); + pgstat_report_subscription_error(MySubscription->oid, false); + + PG_RE_THROW(); + } + } + PG_END_TRY(); +} + +/* Logical Replication sequence sync worker entry point */ +void +SequenceSyncWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + + SetupApplyOrSyncWorker(worker_slot); + + start_sequence_sync(); + + finish_sync_worker(WORKERTYPE_SEQUENCESYNC); +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e03e761392..313e5eb357 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -132,16 +132,16 @@ typedef enum static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD; static List *table_states_not_ready = NIL; -static bool FetchTableStates(bool *started_tx); +static bool FetchTableStates(bool *started_tx, SubscriptionRelKind rel_type); static StringInfo copybuf = NULL; /* * Exit routine for synchronization worker. */ -static void +void pg_attribute_noreturn() -finish_sync_worker(void) +finish_sync_worker(LogicalRepWorkerType wtype) { /* * Commit any outstanding transaction. This is the usual case, unless @@ -157,15 +157,24 @@ finish_sync_worker(void) XLogFlush(GetXLogWriteRecPtr()); StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + if (wtype == WORKERTYPE_TABLESYNC) + ereport(LOG, + errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid))); + else + ereport(LOG, + errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished", + MySubscription->name)); CommitTransactionCommand(); /* Find the leader apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + /* No need to set the sequence failure time when it is a clean exit */ + if (wtype == WORKERTYPE_SEQUENCESYNC) + cancel_before_shmem_exit(logicalrep_seqsyncworker_failuretime, 0); + /* Stop gracefully */ proc_exit(0); } @@ -387,7 +396,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); - finish_sync_worker(); + finish_sync_worker(WORKERTYPE_TABLESYNC); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -430,7 +439,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ - FetchTableStates(&started_tx); + FetchTableStates(&started_tx, SUB_REL_KIND_ALL); /* * Prepare a hash table for tracking last start times of workers, to avoid @@ -463,6 +472,17 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) foreach(lc, table_states_not_ready) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + char relkind; + + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } + + relkind = get_rel_relkind(rstate->relid); + if (relkind == RELKIND_SEQUENCE) + continue; if (rstate->state == SUBREL_STATE_SYNCDONE) { @@ -477,11 +497,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; - if (!started_tx) - { - StartTransactionCommand(); - started_tx = true; - } /* * Remove the tablesync origin tracking if exists. @@ -660,6 +675,108 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } } +/* + * Handle sequence synchronization cooperation from the apply worker. + * + * Walk over all subscription sequences that are individually tracked by the + * apply process (currently, all that have state SUBREL_STATE_INIT) and manage + * synchronization for them. + * + * If there is a sequence synchronization worker running already, no need to + * start a new one; the existing sequence sync worker will synchronize all the + * sequences. If there are still any sequences to be synced after the sequence + * sync worker exited, then a new sequence sync worker can be started in the + * next iteration. To prevent starting the sequence sync worker at a high + * frequency after a failure, we store its last failure time. We start the sync + * worker for the same relation after waiting at least + * wal_retrieve_retry_interval. + */ +static void +process_syncing_sequences_for_apply() +{ + bool started_tx = false; + + Assert(!IsTransactionState()); + + /* We need up-to-date sync state info for subscription sequences here. */ + FetchTableStates(&started_tx, SUB_REL_KIND_ALL); + + /* + * Start sequence sync worker if there is not one already. + */ + foreach_ptr(SubscriptionRelState, rstate, table_states_not_ready) + { + LogicalRepWorker *syncworker; + char relkind; + + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } + + relkind = get_rel_relkind(rstate->relid); + if (relkind != RELKIND_SEQUENCE || rstate->state != SUBREL_STATE_INIT) + continue; + + /* + * Check if there is a sequence worker already running? + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + syncworker = logicalrep_sequence_sync_worker_find(MyLogicalRepWorker->subid, + true); + if (syncworker) + { + /* Now safe to release the LWLock */ + LWLockRelease(LogicalRepWorkerLock); + break; + } + else + { + /* + * Count running sync workers for this subscription, while we have + * the lock. + */ + int nsyncworkers = + logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + + /* Now safe to release the LWLock */ + LWLockRelease(LogicalRepWorkerLock); + + /* + * If there are free sync worker slot(s), start a new sequence + * sync worker, and break from the loop. + */ + if (nsyncworkers < max_sync_workers_per_subscription) + { + TimestampTz now = GetCurrentTimestamp(); + + if (!MyLogicalRepWorker->sequencesync_failure_time || + TimestampDifferenceExceeds(MyLogicalRepWorker->sequencesync_failure_time, + now, wal_retrieve_retry_interval)) + { + MyLogicalRepWorker->sequencesync_failure_time = 0; + logicalrep_worker_launch(WORKERTYPE_SEQUENCESYNC, + MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + InvalidOid, + DSM_HANDLE_INVALID); + break; + } + } + } + } + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } +} + /* * Process possible state change(s) of tables that are being synchronized. */ @@ -683,6 +800,12 @@ process_syncing_tables(XLogRecPtr current_lsn) case WORKERTYPE_APPLY: process_syncing_tables_for_apply(current_lsn); + process_syncing_sequences_for_apply(); + break; + + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + Assert(0); break; case WORKERTYPE_UNKNOWN: @@ -1320,7 +1443,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: - finish_sync_worker(); /* doesn't return */ + finish_sync_worker(WORKERTYPE_TABLESYNC); /* doesn't return */ } /* Calculate the name of the tablesync slot. */ @@ -1570,7 +1693,7 @@ copy_table_done: * then it is the caller's responsibility to commit it. */ static bool -FetchTableStates(bool *started_tx) +FetchTableStates(bool *started_tx, SubscriptionRelKind rel_type) { static bool has_subrels = false; @@ -1596,7 +1719,7 @@ FetchTableStates(bool *started_tx) } /* Fetch all non-ready tables. */ - rstates = GetSubscriptionRelations(MySubscription->oid, true); + rstates = GetSubscriptionRelations(MySubscription->oid, rel_type, true); /* Allocate the tracking info in a permanent memory context. */ oldctx = MemoryContextSwitchTo(CacheMemoryContext); @@ -1709,7 +1832,7 @@ run_tablesync_worker() /* Logical Replication Tablesync worker entry point */ void -TablesyncWorkerMain(Datum main_arg) +TableSyncWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); @@ -1717,7 +1840,7 @@ TablesyncWorkerMain(Datum main_arg) run_tablesync_worker(); - finish_sync_worker(); + finish_sync_worker(WORKERTYPE_TABLESYNC); } /* @@ -1735,7 +1858,7 @@ AllTablesyncsReady(void) bool has_subrels = false; /* We need up-to-date sync state info for subscription tables here. */ - has_subrels = FetchTableStates(&started_tx); + has_subrels = FetchTableStates(&started_tx, SUB_REL_KIND_TABLE); if (started_tx) { diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ec96b5fe85..e60e52dff2 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -486,6 +486,11 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) (rel->state == SUBREL_STATE_SYNCDONE && rel->statelsn <= remote_final_lsn)); + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + Assert(0); + break; + case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "Unknown worker type"); @@ -4531,8 +4536,8 @@ run_apply_worker() } /* - * Common initialization for leader apply worker, parallel apply worker and - * tablesync worker. + * Common initialization for leader apply worker, parallel apply worker, + * tablesync worker and sequencesync worker. * * Initialize the database connection, in-memory subscription and necessary * config options. @@ -4611,6 +4616,10 @@ InitializeLogRepWorker(void) (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); + else if (am_sequencesync_worker()) + ereport(LOG, + (errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started", + MySubscription->name))); else ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" has started", @@ -4619,14 +4628,17 @@ InitializeLogRepWorker(void) CommitTransactionCommand(); } -/* Common function to setup the leader apply or tablesync worker. */ +/* + * Common function to setup the leader apply, tablesync worker and sequencesync + * worker. + */ void SetupApplyOrSyncWorker(int worker_slot) { /* Attach to slot */ logicalrep_worker_attach(worker_slot); - Assert(am_tablesync_worker() || am_leader_apply_worker()); + Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker()); /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); @@ -4658,6 +4670,9 @@ SetupApplyOrSyncWorker(int worker_slot) CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, invalidate_syncing_table_states, (Datum) 0); + + if (isSequenceSyncWorker(MyLogicalRepWorker)) + before_shmem_exit(logicalrep_seqsyncworker_failuretime, (Datum) 0); } /* Logical Replication Apply worker entry point */ diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index be0ed1fc27..0c5601af82 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1936,7 +1936,7 @@ psql_completion(const char *text, int start, int end) /* ALTER SUBSCRIPTION REFRESH PUBLICATION */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("REFRESH", "PUBLICATION")) - COMPLETE_WITH("WITH ("); + COMPLETE_WITH("SEQUENCES", "WITH ("); /* ALTER SUBSCRIPTION REFRESH PUBLICATION WITH ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("REFRESH", "PUBLICATION", "WITH", "(")) diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 1a949966e0..e09e321ab1 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11968,6 +11968,11 @@ proargmodes => '{v,o,o,o,o}', proargnames => '{pubname,pubid,relid,attrs,qual}', prosrc => 'pg_get_publication_tables' }, +{ oid => '8000', descr => 'get OIDs of sequences in a publication', + proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't', + provolatile => 's', prorettype => 'oid', proargtypes => 'text', + proallargtypes => '{text,oid}', proargmodes => '{i,o}', + proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' }, { oid => '6121', descr => 'returns whether a relation can be part of a publication', proname => 'pg_relation_is_publishable', provolatile => 's', diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0aa14ec4a2..8c96f0ce72 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -159,6 +159,12 @@ typedef struct Subscription * specified origin */ } Subscription; +typedef struct SubscriptionSeqInfo +{ + Oid seqid; + XLogRecPtr lsn; +} SubscriptionSeqInfo; + /* Disallow streaming in-progress transactions. */ #define LOGICALREP_STREAM_OFF 'f' diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 8244ad537a..5584136a35 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -82,6 +82,13 @@ typedef struct SubscriptionRelState char state; } SubscriptionRelState; +typedef enum +{ + SUB_REL_KIND_TABLE, + SUB_REL_KIND_SEQUENCE, + SUB_REL_KIND_ALL, +} SubscriptionRelKind; + extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, @@ -90,6 +97,7 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern bool HasSubscriptionRelations(Oid subid); -extern List *GetSubscriptionRelations(Oid subid, bool not_ready); +extern List *GetSubscriptionRelations(Oid subid, SubscriptionRelKind reltype, + bool not_ready); #endif /* PG_SUBSCRIPTION_REL_H */ diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index 003f2e3413..a302890156 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -60,7 +60,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt); extern void SequenceChangePersistence(Oid relid, char newrelpersistence); extern void DeleteSequenceTuple(Oid relid); extern void ResetSequence(Oid seq_relid); -extern void SetSequenceLastValue(Oid seq_relid, int64 new_last_value); +extern void SetSequenceLastValue(Oid seq_relid, int64 new_last_value, int64 log_cnt); extern void ResetSequenceCaches(void); extern void seq_redo(XLogReaderState *record); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 3a5f8279ed..346abdcef9 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4230,7 +4230,8 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, - ALTER_SUBSCRIPTION_REFRESH, + ALTER_SUBSCRIPTION_REFRESH_PUBLICATION, + ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_SKIP, } AlterSubscriptionType; diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index a18d79d1b2..47a3326ad3 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -18,7 +18,8 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); -extern void TablesyncWorkerMain(Datum main_arg); +extern void TableSyncWorkerMain(Datum main_arg); +extern void SequenceSyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 9646261d7e..adb1c6e32b 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -30,6 +30,7 @@ typedef enum LogicalRepWorkerType { WORKERTYPE_UNKNOWN = 0, WORKERTYPE_TABLESYNC, + WORKERTYPE_SEQUENCESYNC, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, } LogicalRepWorkerType; @@ -92,6 +93,8 @@ typedef struct LogicalRepWorker TimestampTz last_recv_time; XLogRecPtr reply_lsn; TimestampTz reply_time; + + TimestampTz sequencesync_failure_time; } LogicalRepWorker; /* @@ -242,6 +245,8 @@ extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock); +extern LogicalRepWorker *logicalrep_sequence_sync_worker_find(Oid subid, + bool only_running); extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, @@ -253,6 +258,10 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid); +extern void pg_attribute_noreturn() finish_sync_worker(LogicalRepWorkerType wtype); + +extern void logicalrep_seqsyncworker_failuretime(int code, Datum arg); + extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname); @@ -326,15 +335,25 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +#define isApplyWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_APPLY) #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) -#define isTablesyncWorker(worker) ((worker)->in_use && \ +#define isTableSyncWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_TABLESYNC) +#define isSequenceSyncWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_SEQUENCESYNC) static inline bool am_tablesync_worker(void) { - return isTablesyncWorker(MyLogicalRepWorker); + return isTableSyncWorker(MyLogicalRepWorker); +} + +static inline bool +am_sequencesync_worker(void) +{ + return isSequenceSyncWorker(MyLogicalRepWorker); } static inline bool diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 4c789279e5..bd5efd5d27 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1442,6 +1442,14 @@ pg_prepared_xacts| SELECT p.transaction, FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid) LEFT JOIN pg_authid u ON ((p.ownerid = u.oid))) LEFT JOIN pg_database d ON ((p.dbid = d.oid))); +pg_publication_sequences| SELECT p.pubname, + n.nspname AS schemaname, + c.relname AS sequencename + FROM pg_publication p, + LATERAL pg_get_publication_sequences((p.pubname)::text) gps(relid), + (pg_class c + JOIN pg_namespace n ON ((n.oid = c.relnamespace))) + WHERE (c.oid = gps.relid); pg_publication_tables| SELECT p.pubname, n.nspname AS schemaname, c.relname AS tablename, diff --git a/src/test/subscription/t/034_sequences.pl b/src/test/subscription/t/034_sequences.pl new file mode 100644 index 0000000000..adf7b0cfc1 --- /dev/null +++ b/src/test/subscription/t/034_sequences.pl @@ -0,0 +1,154 @@ + +# Copyright (c) 2024, PostgreSQL Global Development Group + +# This tests that sequences are synced correctly to the subscriber +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); + +# Avoid checkpoint during the test, otherwise, extra values will be fetched for +# the sequences which will cause the test to fail randomly. +$node_publisher->init( + allows_streaming => 'logical', + checkpoint_timeout => '1h'); +$node_publisher->start; + +# Initialize subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Setup structure on the publisher +my $ddl = qq( + CREATE TABLE regress_seq_test (v BIGINT); + CREATE SEQUENCE regress_s1; +); +$node_publisher->safe_psql('postgres', $ddl); + +# Setup the same structure on the subscriber, plus some extra sequences that +# we'll create on the publisher later +$ddl = qq( + CREATE TABLE regress_seq_test (v BIGINT); + CREATE SEQUENCE regress_s1; + CREATE SEQUENCE regress_s2; + CREATE SEQUENCE regress_s3; +); +$node_subscriber->safe_psql('postgres', $ddl); + +# Insert initial test data +$node_publisher->safe_psql( + 'postgres', qq( + -- generate a number of values using the sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100); +)); + +# Setup logical replication pub/sub +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION regress_seq_pub FOR ALL SEQUENCES"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION regress_seq_sub CONNECTION '$publisher_connstr' PUBLICATION regress_seq_pub" +); + +# Wait for initial sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check the initial data on subscriber +my $result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '100|32|t', 'initial test data replicated'); + +# ALTER SUBSCRIPTION ... REFRESH PUBLICATION should cause sync of new +# sequences of the publisher, but changes to existing sequences should +# not be synced. + +# Create a new sequence 'regress_s2', and update existing sequence 'regress_s1' +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s2; + INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100); + + -- Existing sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100); +)); + +# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check - existing sequence is not synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '100|32|t', 'REFRESH PUBLICATION does not sync existing sequence'); + +# Check - newly published sequence is synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s2; +)); +is($result, '100|32|t', + 'REFRESH PUBLICATION will sync newly published sequence'); + +# ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES should cause sync of +# new sequences of the publisher, and changes to existing sequences should +# also be synced. + +# Create a new sequence 'regress_s3', and update the existing sequence +# 'regress_s2'. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s3; + INSERT INTO regress_seq_test SELECT nextval('regress_s3') FROM generate_series(1,100); + + -- Existing sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100); +)); + +# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION SEQUENCES +)); + +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check - existing sequences are syned +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '200|31|t', + 'REFRESH PUBLICATION SEQUENCES will sync existing sequences'); +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s2; +)); +is($result, '200|31|t', + 'REFRESH PUBLICATION SEQUENCES will sync existing sequences'); + +# Check - newly published sequence is synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s3; +)); +is($result, '100|32|t', + 'REFRESH PUBLICATION SEQUENCES will sync newly published sequence'); + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 90326c6e53..a875c196a8 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2772,7 +2772,9 @@ SubscriptingRef SubscriptingRefState Subscription SubscriptionInfo +SubscriptionRelKind SubscriptionRelState +SubscriptionSeqInfo SummarizerReadLocalXLogPrivate SupportRequestCost SupportRequestIndexCondition -- 2.34.1