From de045d83fc1277d57a453fe137e439b11fd6743e Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Wed, 19 Jun 2024 14:58:14 +0530 Subject: [PATCH v20240703 3/3] Enhance sequence synchronization during subscription management This commit introduces sequence synchronization: 1) During subscription creation: - The subscriber retrieves sequences associated with publications. - Sequences are added in 'init' state to pg_subscription_rel table. - A new sequence synchronization worker handles synchronization in batches of 100 sequences: a) Retrieves sequence values using pg_sequence_state from the publisher. b) Sets sequence values accordingly. c) Updates sequence state to 'READY'. d) Commits batches of 100 synchronized sequences. 2) Refreshing sequences: - Refreshing sequences occurs with ALTER SUBSCRIPTION ... REFRESH PUBLICATION (no syntax change). - Stale sequences are removed from pg_subscription_rel. - Newly added sequences in the publisher are added in 'init' state to pg_subscription_rel. - Initiates sequence synchronization for all sequences by sequence sync worker as listed in subscription creation process. - Sequence synchronization occurs for newly added sequences only. 3) Introduce new command for refreshing all sequences: - ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES. - Removes stale sequences and adds newly added sequences from the publisher to pg_subscription_rel. - Resets all sequences in pg_subscription_rel to 'init' state. - Initiates sequence synchronization for all sequences by sequence sync worker as listed in subscription creation process. --- doc/src/sgml/config.sgml | 4 +- doc/src/sgml/logical-replication.sgml | 4 +- doc/src/sgml/monitoring.sgml | 5 +- doc/src/sgml/ref/alter_subscription.sgml | 11 + src/backend/catalog/pg_subscription.c | 64 ++++ src/backend/commands/subscriptioncmds.c | 264 +++++++++++++- src/backend/executor/execReplication.c | 4 +- src/backend/parser/gram.y | 9 + src/backend/postmaster/bgworker.c | 3 + src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/launcher.c | 50 ++- src/backend/replication/logical/meson.build | 1 + .../replication/logical/sequencesync.c | 325 ++++++++++++++++++ src/backend/replication/logical/tablesync.c | 146 +++++++- src/backend/replication/logical/worker.c | 12 +- src/bin/psql/tab-complete.c | 2 +- src/include/catalog/pg_subscription.h | 6 + src/include/catalog/pg_subscription_rel.h | 1 + src/include/nodes/parsenodes.h | 1 + src/include/replication/logicalworker.h | 1 + src/include/replication/worker_internal.h | 13 + src/test/subscription/t/034_sequences.pl | 145 ++++++++ src/tools/pgindent/typedefs.list | 1 + 23 files changed, 1041 insertions(+), 32 deletions(-) create mode 100644 src/backend/replication/logical/sequencesync.c create mode 100644 src/test/subscription/t/034_sequences.pl diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 17d84bd321..98be4899a1 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5200,8 +5200,8 @@ ANY num_sync ( max_logical_replication_workers must be set to at least the number of subscriptions (for leader apply - workers), plus some reserve for the table synchronization workers and - parallel apply workers. + workers), plus some reserve for the table synchronization workers, sequence + synchronization worker and parallel apply workers. diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 991f629907..62870aa41b 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2017,8 +2017,9 @@ description | Waiting for a newly initialized WAL file to reach durable storage Type of the subscription worker process. Possible types are - apply, parallel apply, and - table synchronization. + apply, parallel apply, + table synchronization, and + sequence synchronization. diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 476f195622..fc8a33c0b5 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -27,6 +27,7 @@ ALTER SUBSCRIPTION name ADD PUBLICA ALTER SUBSCRIPTION name DROP PUBLICATION publication_name [, ...] [ WITH ( publication_option [= value] [, ... ] ) ] ALTER SUBSCRIPTION name REFRESH PUBLICATION [ WITH ( refresh_option [= value] [, ... ] ) ] ALTER SUBSCRIPTION name ENABLE +ALTER SUBSCRIPTION name REFRESH SEQUENCES ALTER SUBSCRIPTION name DISABLE ALTER SUBSCRIPTION name SET ( subscription_parameter [= value] [, ... ] ) ALTER SUBSCRIPTION name SKIP ( skip_option = value ) @@ -194,6 +195,16 @@ ALTER SUBSCRIPTION name RENAME TO < + + REFRESH SEQUENCES + + + Fetch missing sequences information from publisher and re-synchronize the + sequence data from the publisher. + + + + ENABLE diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 9efc9159f2..7673f1384c 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -27,6 +27,7 @@ #include "utils/array.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/memutils.h" #include "utils/lsyscache.h" #include "utils/pg_lsn.h" #include "utils/rel.h" @@ -551,3 +552,66 @@ GetSubscriptionRelations(Oid subid, bool not_ready) return res; } + + +/* + * Get the sequences for the subscription. + * + * The returned list is palloc'ed in the current memory context. + */ +List * +GetSubscriptionSequences(Oid subid, char state) +{ + List *res = NIL; + Relation rel; + HeapTuple tup; + int nkeys = 0; + ScanKeyData skey[2]; + SysScanDesc scan; + MemoryContext oldctx; + + rel = table_open(SubscriptionRelRelationId, AccessShareLock); + + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_rel_srsubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(subid)); + + if (state != '\0') + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_rel_srsubstate, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(state)); + + scan = systable_beginscan(rel, InvalidOid, false, + NULL, nkeys, skey); + + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_subscription_rel subseq; + SubscriptionRelState *seqinfo; + Datum d; + bool isnull; + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + subseq = (Form_pg_subscription_rel) GETSTRUCT(tup); + seqinfo = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); + seqinfo->relid = subseq->srrelid; + d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, + Anum_pg_subscription_rel_srsublsn, &isnull); + if (isnull) + seqinfo->lsn = InvalidXLogRecPtr; + else + seqinfo->lsn = DatumGetLSN(d); + + res = lappend(res, seqinfo); + MemoryContextSwitchTo(oldctx); + } + + /* Cleanup */ + systable_endscan(scan); + table_close(rel, AccessShareLock); + + return res; +} \ No newline at end of file diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index e407428dbc..32e19a739c 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -102,6 +102,7 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications); static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, @@ -759,6 +760,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { + List *sequences; check_publications(wrconn, publications); check_publications_origin(wrconn, publications, opts.copy_data, opts.origin, NULL, 0, stmt->subname); @@ -769,6 +771,22 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, */ table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + /* Add the sequences in init state */ + sequences = fetch_sequence_list(wrconn, publications); + foreach_ptr(RangeVar, rv, sequences) + { + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + + /* Check for supported relkind. */ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + + AddSubscriptionRelState(subid, relid, table_state, + InvalidXLogRecPtr, true); + } + /* * Get the table list from publisher and build local table status * info. @@ -898,6 +916,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Get the table list from publisher. */ pubrel_names = fetch_table_list(wrconn, sub->publications); + /* Get the sequence list from publisher. */ + pubrel_names = list_concat(pubrel_names, fetch_sequence_list(wrconn, sub->publications)); + /* Get local table list. */ subrel_states = GetSubscriptionRelations(sub->oid, false); subrel_count = list_length(subrel_states); @@ -980,6 +1001,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, { char state; XLogRecPtr statelsn; + char relkind = get_rel_relkind(relid); /* * Lock pg_subscription_rel with AccessExclusiveLock to @@ -1006,13 +1028,15 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, RemoveSubscriptionRel(sub->oid, relid); - logicalrep_worker_stop(sub->oid, relid); + /* Stop the worker if relation kind is not sequence*/ + if (relkind != RELKIND_SEQUENCE) + logicalrep_worker_stop(sub->oid, relid); /* * For READY state, we would have already dropped the * tablesync origin. */ - if (state != SUBREL_STATE_READY) + if (state != SUBREL_STATE_READY && relkind != RELKIND_SEQUENCE) { char originname[NAMEDATALEN]; @@ -1047,7 +1071,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, for (off = 0; off < remove_rel_len; off++) { if (sub_remove_rels[off].state != SUBREL_STATE_READY && - sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE) + sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE && + get_rel_relkind(sub_remove_rels[off].relid) != RELKIND_SEQUENCE) { char syncslotname[NAMEDATALEN] = {0}; @@ -1077,6 +1102,142 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, table_close(rel, NoLock); } +/* + * Refresh the sequences data of the subscription. + */ +static void +AlterSubscription_refreshsequences(Subscription *sub) +{ + char *err; + List *pubseq_names = NIL; + List *subseq_states; + Oid *subseq_local_oids; + Oid *pubseq_local_oids; + int off; + int subrel_count; + Relation rel = NULL; + WalReceiverConn *wrconn; + bool must_use_password; + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + must_use_password = sub->passwordrequired && !sub->ownersuperuser; + wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, + sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); + + PG_TRY(); + { + /* Get the sequences from the publisher. */ + pubseq_names = fetch_sequence_list(wrconn, sub->publications); + + /* Get local sequence list. */ + subseq_states = GetSubscriptionSequences(sub->oid, '\0'); + subrel_count = list_length(subseq_states); + + /* + * Build qsorted array of local sequence oids for faster lookup. This + * can potentially contain all sequences in the database so speed of + * lookup is important. + */ + subseq_local_oids = palloc(subrel_count * sizeof(Oid)); + off = 0; + foreach_ptr(SubscriptionSeqInfo, seqinfo, subseq_states) + subseq_local_oids[off++] = seqinfo->seqid; + + qsort(subseq_local_oids, subrel_count, sizeof(Oid), oid_cmp); + + /* + * Walk over the remote sequences and try to match them to locally + * known sequences. If the sequence is not known locally create a new + * state for it. + * + * Also builds array of local oids of remote sequences for the next + * step. + */ + off = 0; + pubseq_local_oids = palloc(list_length(pubseq_names) * sizeof(Oid)); + + foreach_ptr(RangeVar, rv, pubseq_names) + { + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + + /* Check for supported relkind. */ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + + pubseq_local_oids[off++] = relid; + + if (!bsearch(&relid, subseq_local_oids, + subrel_count, sizeof(Oid), oid_cmp)) + { + AddSubscriptionRelState(sub->oid, relid, + SUBREL_STATE_INIT, + InvalidXLogRecPtr, true); + ereport(DEBUG1, + (errmsg_internal("sequence \"%s.%s\" added to subscription \"%s\"", + rv->schemaname, rv->relname, sub->name))); + } + } + + /* + * Next remove state for sequences we should not care about anymore + * using the data we collected above + */ + qsort(pubseq_local_oids, list_length(pubseq_names), + sizeof(Oid), oid_cmp); + + for (off = 0; off < subrel_count; off++) + { + Oid relid = subseq_local_oids[off]; + + if (!bsearch(&relid, pubseq_local_oids, + list_length(pubseq_names), sizeof(Oid), oid_cmp)) + { + /* + * This locking ensures that the state of rels won't change + * till we are done with this refresh operation. + */ + if (!rel) + rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock); + + RemoveSubscriptionRel(sub->oid, relid); + + ereport(DEBUG1, + (errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name))); + } + else + { + ereport(DEBUG1, + (errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name))); + UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT, + InvalidXLogRecPtr); + } + } + } + PG_FINALLY(); + { + walrcv_disconnect(wrconn); + } + PG_END_TRY(); + + if (rel) + table_close(rel, NoLock); +} + /* * Alter the existing subscription. */ @@ -1404,6 +1565,20 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, break; } + case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES: + { + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES is not allowed for disabled subscriptions"))); + + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES"); + + AlterSubscription_refreshsequences(sub); + + break; + } + case ALTER_SUBSCRIPTION_REFRESH: { if (!sub->enabled) @@ -2060,11 +2235,17 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, for (i = 0; i < subrel_count; i++) { Oid relid = subrel_local_oids[i]; - char *schemaname = get_namespace_name(get_rel_namespace(relid)); - char *tablename = get_rel_name(relid); + char *schemaname; + char *tablename; + + if (get_rel_relkind(relid) != RELKIND_SEQUENCE) + { + schemaname = get_namespace_name(get_rel_namespace(relid)); + tablename = get_rel_name(relid); - appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", - schemaname, tablename); + appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", + schemaname, tablename); + } } res = walrcv_exec(wrconn, cmd.data, 1, tableRow); @@ -2234,6 +2415,75 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } +/* + * Get the list of sequences which belong to specified publications on the + * publisher connection. + */ +static List * +fetch_sequence_list(WalReceiverConn *wrconn, List *publications) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {TEXTOID, TEXTOID}; + ListCell *lc; + bool first; + List *tablelist = NIL; + + Assert(list_length(publications) > 0); + + initStringInfo(&cmd); + appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n" + " FROM pg_catalog.pg_publication_sequences s\n" + " WHERE s.pubname IN ("); + first = true; + foreach(lc, publications) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_literal_cstr(pubname)); + } + appendStringInfoChar(&cmd, ')'); + + res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive list of replicated sequences from the publisher: %s", + res->err))); + + /* Process sequences. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *nspname; + char *relname; + bool isnull; + RangeVar *rv; + + nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + rv = makeRangeVar(nspname, relname, -1); + tablelist = lappend(tablelist, rv); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + + return tablelist; +} + /* * This is to report the connection failure while dropping replication slots. * Here, we report the WARNING for all tablesync slots so that user can drop diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index d0a89cd577..fdf69e4f28 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -739,7 +739,9 @@ void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname) { - if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) + if (relkind != RELKIND_RELATION && + relkind != RELKIND_PARTITIONED_TABLE && + relkind != RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 382cf5c872..c9d07d9915 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -10839,6 +10839,15 @@ AlterSubscriptionStmt: n->options = $6; $$ = (Node *) n; } + | ALTER SUBSCRIPTION name REFRESH PUBLICATION SEQUENCES + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + + n->kind = ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES; + n->subname = $3; + $$ = (Node *) n; + } | ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition { AlterSubscriptionStmt *n = diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 77707bb384..6770e26569 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -132,6 +132,9 @@ static const struct }, { "TablesyncWorkerMain", TablesyncWorkerMain + }, + { + "SequencesyncWorkerMain", SequencesyncWorkerMain } }; diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index ba03eeff1c..7621fa8aed 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + sequencesync.o \ slotsync.o \ snapbuild.o \ tablesync.o \ diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 27c3a91fb7..466771d775 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -267,6 +267,39 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) return res; } +/* + * Walks the workers array and searches for one that matches given + * subscription id. + * + * We are only interested in the sequence sync worker. + */ +LogicalRepWorker * +logicalrep_sequence_sync_worker_find(Oid subid, bool only_running) +{ + int i; + LogicalRepWorker *res = NULL; + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + /* Search for attached worker for a given subscription id. */ + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + /* Skip parallel apply workers. */ + if (!isSequencesyncWorker(w)) + continue; + + if (w->in_use && w->subid == subid && (only_running && w->proc)) + { + res = w; + break; + } + } + + return res; +} + /* * Similar to logicalrep_worker_find(), but returns a list of all workers for * the subscription, instead of just one. @@ -311,6 +344,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, int nparallelapplyworkers; TimestampTz now; bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC); + bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC); bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY); /*---------- @@ -320,7 +354,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, * - parallel apply worker is the only kind of subworker */ Assert(wtype != WORKERTYPE_UNKNOWN); - Assert(is_tablesync_worker == OidIsValid(relid)); + Assert(is_tablesync_worker == OidIsValid(relid) || is_sequencesync_worker == OidIsValid(relid)); Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID)); ereport(DEBUG1, @@ -396,7 +430,8 @@ retry: * sync worker limit per subscription. So, just return silently as we * might get here because of an otherwise harmless race condition. */ - if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription) + if ((is_tablesync_worker || is_sequencesync_worker) && + nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); return false; @@ -491,6 +526,14 @@ retry: snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker"); break; + case WORKERTYPE_SEQUENCESYNC: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequencesyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication sequencesync worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker"); + break; + case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "unknown worker type"); @@ -1351,6 +1394,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) case WORKERTYPE_TABLESYNC: values[9] = CStringGetTextDatum("table synchronization"); break; + case WORKERTYPE_SEQUENCESYNC: + values[9] = CStringGetTextDatum("sequence synchronization"); + break; case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "unknown worker type"); diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 3dec36a6de..1711fc3248 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'sequencesync.c', 'slotsync.c', 'snapbuild.c', 'tablesync.c', diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c new file mode 100644 index 0000000000..7f609fc0e9 --- /dev/null +++ b/src/backend/replication/logical/sequencesync.c @@ -0,0 +1,325 @@ +/*------------------------------------------------------------------------- + * sequencesync.c + * PostgreSQL logical replication: initial sequence synchronization + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/sequencesync.c + * + * NOTES + * This file contains code for sequence synchronization for + * logical replication. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/table.h" +#include "catalog/pg_subscription_rel.h" +#include "commands/sequence.h" +#include "pgstat.h" +#include "replication/logicalworker.h" +#include "replication/worker_internal.h" +#include "utils/acl.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/rls.h" +#include "utils/usercontext.h" + +/* + * Fetch sequence data (current state) from the remote node, including the + * page LSN. + */ +static int64 +fetch_sequence_data(WalReceiverConn *conn, Oid remoteid, XLogRecPtr *lsn) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {INT8OID, LSNOID}; + int64 value = (Datum) 0; + + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT (last_value + log_cnt), page_lsn " + "FROM pg_sequence_state(%d)", remoteid); + + res = walrcv_exec(conn, cmd.data, 2, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive sequence list from the publisher: %s", + res->err))); + + /* Process the sequence. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + bool isnull; + + value = DatumGetInt64(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + *lsn = DatumGetInt64(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + } + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + + return value; +} + +/* + * Copy existing data of a sequence from publisher. + * + * Caller is responsible for locking the local relation. + */ +static XLogRecPtr +copy_sequence(WalReceiverConn *conn, Relation rel) +{ + StringInfoData cmd; + int64 sequence_value; + XLogRecPtr lsn = InvalidXLogRecPtr; + WalRcvExecResult *res; + Oid tableRow[] = {OIDOID, CHAROID}; + TupleTableSlot *slot; + LogicalRepRelId remoteid; /* unique id of the relation */ + char relkind PG_USED_FOR_ASSERTS_ONLY; + bool isnull; + char *nspname = get_namespace_name(RelationGetNamespace(rel)); + char *relname = RelationGetRelationName(rel); + + /* Fetch Oid. */ + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT c.oid, c.relkind" + " FROM pg_catalog.pg_class c" + " INNER JOIN pg_catalog.pg_namespace n" + " ON (c.relnamespace = n.oid)" + " WHERE n.nspname = %s" + " AND c.relname = %s", + quote_literal_cstr(nspname), + quote_literal_cstr(relname)); + res = walrcv_exec(conn, cmd.data, + lengthof(tableRow), tableRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch sequence info for table \"%s.%s\" from publisher: %s", + nspname, RelationGetRelationName(rel), res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("sequence \"%s.%s\" not found on publisher", + nspname, relname))); + + remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relkind = DatumGetChar(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + ExecDropSingleTupleTableSlot(slot); + walrcv_clear_result(res); + + Assert(relkind == RELKIND_SEQUENCE); + + /* + * Logical replication of sequences is based on decoding WAL records, + * describing the "next" state of the sequence the current state in the + * relfilenode is yet to reach. But during the initial sync we read the + * current state, so we need to reconstruct the WAL record logged when we + * started the current batch of sequence values. + * + * Otherwise we might get duplicate values (on subscriber) if we failed + * over right after the sync. + */ + sequence_value = fetch_sequence_data(conn, remoteid, &lsn); + + /* sets the sequences in non-transactional way */ + SetSequenceLastValue(RelationGetRelid(rel), sequence_value); + + /* return the LSN when the sequence state was set */ + return lsn; +} + +/* + * Start syncing the sequences in the sync worker. + */ +static void +LogicalRepSyncSequences() +{ + char *err; + bool must_use_password; + List *sequences; + char slotname[NAMEDATALEN]; + AclResult aclresult; + UserContext ucxt; + bool run_as_owner; + int curr_seq = 0; + int seq_count; + Oid subid = MyLogicalRepWorker->subid; + +#define MAX_SEQUENCES_SYNC_PER_BATCH 100 + + /* Get the sequences that should be synchronized. */ + StartTransactionCommand(); + sequences = GetSubscriptionSequences(subid, + SUBREL_STATE_INIT); + CommitTransactionCommand(); + + /* Is the use of a password mandatory? */ + must_use_password = MySubscription->passwordrequired && + !MySubscription->ownersuperuser; + + snprintf(slotname, NAMEDATALEN, "pg_%u_sync_sequences_" UINT64_FORMAT, + subid, GetSystemIdentifier()); + + /* + * Here we use the slot name instead of the subscription name as the + * application_name, so that it is different from the leader apply worker, + * so that synchronous replication can distinguish them. + */ + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, true, + must_use_password, + slotname, &err); + if (LogRepWorkerWalRcvConn == NULL) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); + + + seq_count = list_length(sequences); + foreach_ptr(SubscriptionRelState, seqinfo, sequences) + { + Relation sequencerel; + XLogRecPtr sequence_lsn; + int next_seq; + + CHECK_FOR_INTERRUPTS(); + + if (curr_seq % MAX_SEQUENCES_SYNC_PER_BATCH == 0) + StartTransactionCommand(); + + sequencerel = table_open(seqinfo->relid, RowExclusiveLock); + + /* + * Make sure that the copy command runs as the sequence owner, unless the + * user has opted out of that behaviour. + */ + run_as_owner = MySubscription->runasowner; + if (!run_as_owner) + SwitchToUntrustedUser(sequencerel->rd_rel->relowner, &ucxt); + + /* + * Check that our sequence sync worker has permission to insert into the + * target sequence. + */ + aclresult = pg_class_aclcheck(RelationGetRelid(sequencerel), GetUserId(), + ACL_INSERT); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, + get_relkind_objtype(sequencerel->rd_rel->relkind), + RelationGetRelationName(sequencerel)); + + /* + * COPY FROM does not honor RLS policies. That is not a problem for + * subscriptions owned by roles with BYPASSRLS privilege (or superuser, + * who has it implicitly), but other roles should not be able to + * circumvent RLS. Disallow logical replication into RLS enabled + * relations for such roles. + */ + if (check_enable_rls(RelationGetRelid(sequencerel), InvalidOid, false) == RLS_ENABLED) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"", + GetUserNameFromId(GetUserId(), true), + RelationGetRelationName(sequencerel))); + + sequence_lsn = copy_sequence(LogRepWorkerWalRcvConn, sequencerel); + + UpdateSubscriptionRelState(subid, seqinfo->relid, SUBREL_STATE_READY, + sequence_lsn); + + table_close(sequencerel, NoLock); + + next_seq = curr_seq + 1; + if (((next_seq % MAX_SEQUENCES_SYNC_PER_BATCH) == 0) || next_seq == seq_count) + { + /* LOG all the sequences synchronized during current batch. */ + int i = curr_seq - (curr_seq % MAX_SEQUENCES_SYNC_PER_BATCH); + for (; i <= curr_seq; i++) + { + SubscriptionRelState *done_seq; + done_seq = (SubscriptionRelState *) lfirst(list_nth_cell(sequences, i)); + ereport(LOG, + errmsg("logical replication synchronization for subscription \"%s\", sequence \"%s\" has finished", + get_subscription_name(subid, false), get_rel_name(done_seq->relid))); + } + + CommitTransactionCommand(); + } + + curr_seq++; + } + + if (!run_as_owner && seq_count) + RestoreUserContext(&ucxt); +} + +/* + * Execute the initial sync with error handling. Disable the subscription, + * if it's required. + * + * Allocate the slot name in long-lived context on return. Note that we don't + * handle FATAL errors which are probably because of system resource error and + * are not repeatable. + */ +static void +start_sequence_sync() +{ + Assert(am_sequencesync_worker()); + + PG_TRY(); + { + /* Call initial sync. */ + LogicalRepSyncSequences(); + } + PG_CATCH(); + { + if (MySubscription->disableonerr) + DisableSubscriptionAndExit(); + else + { + /* + * Report the worker failed during sequence synchronization. Abort + * the current transaction so that the stats message is sent in an + * idle state. + */ + AbortOutOfAnyTransaction(); + pgstat_report_subscription_error(MySubscription->oid, false); + + PG_RE_THROW(); + } + } + PG_END_TRY(); +} + +/* Logical Replication Sequencesync worker entry point */ +void +SequencesyncWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + + SetupApplyOrSyncWorker(worker_slot); + + start_sequence_sync(); + + finish_sync_worker(false); +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index b00267f042..5541187353 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -139,9 +139,9 @@ static StringInfo copybuf = NULL; /* * Exit routine for synchronization worker. */ -static void +void pg_attribute_noreturn() -finish_sync_worker(void) +finish_sync_worker(bool istable) { /* * Commit any outstanding transaction. This is the usual case, unless @@ -157,10 +157,15 @@ finish_sync_worker(void) XLogFlush(GetXLogWriteRecPtr()); StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + if (istable) + ereport(LOG, + errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid))); + else + ereport(LOG, + errmsg("logical replication sequences synchronization worker for subscription \"%s\" has finished", + MySubscription->name)); CommitTransactionCommand(); /* Find the leader apply worker and signal it. */ @@ -387,7 +392,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); - finish_sync_worker(); + finish_sync_worker(true); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -463,6 +468,17 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) foreach(lc, table_states_not_ready) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + char relkind; + + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } + + relkind = get_rel_relkind(rstate->relid); + if (relkind == RELKIND_SEQUENCE) + continue; if (rstate->state == SUBREL_STATE_SYNCDONE) { @@ -477,11 +493,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; - if (!started_tx) - { - StartTransactionCommand(); - started_tx = true; - } /* * Remove the tablesync origin tracking if exists. @@ -660,6 +671,106 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } } +/* + * Handle sequence synchronization cooperation from the apply worker. + * + * Walk over all subscription sequences that are individually tracked by the + * apply process (currently, all that have state SUBREL_STATE_INIT) and manage + * synchronization for them. + * + * If there is a sequence syncronization worker running already, no need to + * start a sequence synchronization in this case. The existing sequence + * sync worker will syncronize the sequences. If there are still any sequences + * to be synced after the sequence sync worker exited, then we new sequence + * sync worker can be started in the next iteration. To prevent starting the + * seqeuence sync worker at a high frequency after a failure, we store its last + * start time. We start the sync worker for the same relation after waiting + * at least wal_retrieve_retry_interval. + */ +static void +process_syncing_sequences_for_apply() +{ + bool started_tx = false; + + Assert(!IsTransactionState()); + + /* We need up-to-date sync state info for subscription tables here. */ + FetchTableStates(&started_tx); + + /* + * Start sequence sync worker if there is no sequence sync worker running. + */ + foreach_ptr(SubscriptionRelState, rstate, table_states_not_ready) + { + LogicalRepWorker *syncworker; + char relkind; + + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } + + relkind = get_rel_relkind(rstate->relid); + if (relkind != RELKIND_SEQUENCE || rstate->state != SUBREL_STATE_INIT) + continue; + + /* + * Check if there is a sequence worker running? + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + syncworker = logicalrep_sequence_sync_worker_find(MyLogicalRepWorker->subid, + true); + /* + * If there is a sequence sync worker, the sequence sync worker + * will handle sync of this sequence. + */ + if (syncworker) + { + /* Now safe to release the LWLock */ + LWLockRelease(LogicalRepWorkerLock); + break; + } + else + { + /* + * Count running sync workers for this subscription, while we have + * the lock. + */ + int nsyncworkers = + logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + + /* Now safe to release the LWLock */ + LWLockRelease(LogicalRepWorkerLock); + + /* + * If there are free sync worker slot(s), start a new sequence sync + * worker to sync the sequences and break from the loop, as this + * sequence sync worker will take care of synchronizing all the + * sequences that are in init state. + */ + if (nsyncworkers < max_sync_workers_per_subscription) + { + logicalrep_worker_launch(WORKERTYPE_SEQUENCESYNC, + MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + InvalidOid, + DSM_HANDLE_INVALID); + break; + } + } + } + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } +} + /* * Process possible state change(s) of tables that are being synchronized. */ @@ -682,9 +793,16 @@ process_syncing_tables(XLogRecPtr current_lsn) break; case WORKERTYPE_APPLY: + process_syncing_sequences_for_apply(); process_syncing_tables_for_apply(current_lsn); break; + /* Sequence sync is not expected to come here */ + case WORKERTYPE_SEQUENCESYNC: + Assert(0); + /* not reached, here to make compiler happy */ + break; + case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "Unknown worker type"); @@ -1320,7 +1438,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: - finish_sync_worker(); /* doesn't return */ + finish_sync_worker(true); /* doesn't return */ } /* Calculate the name of the tablesync slot. */ @@ -1716,7 +1834,7 @@ TablesyncWorkerMain(Datum main_arg) run_tablesync_worker(); - finish_sync_worker(); + finish_sync_worker(true); } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3b285894db..478bea7f69 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -489,6 +489,12 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) (rel->state == SUBREL_STATE_SYNCDONE && rel->statelsn <= remote_final_lsn)); + /* Sequence sync is not expected to come here */ + case WORKERTYPE_SEQUENCESYNC: + Assert(0); + /* not reached, here to make compiler happy */ + break; + case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "Unknown worker type"); @@ -4631,6 +4637,10 @@ InitializeLogRepWorker(void) (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); + else if (am_sequencesync_worker()) + ereport(LOG, + (errmsg("logical replication sequences synchronization worker for subscription \"%s\" has started", + MySubscription->name))); else ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" has started", @@ -4646,7 +4656,7 @@ SetupApplyOrSyncWorker(int worker_slot) /* Attach to slot */ logicalrep_worker_attach(worker_slot); - Assert(am_tablesync_worker() || am_leader_apply_worker()); + Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker()); /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index da608d074b..5c9586a5b9 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1936,7 +1936,7 @@ psql_completion(const char *text, int start, int end) /* ALTER SUBSCRIPTION REFRESH PUBLICATION */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("REFRESH", "PUBLICATION")) - COMPLETE_WITH("WITH ("); + COMPLETE_WITH("SEQUENCES", "WITH ("); /* ALTER SUBSCRIPTION REFRESH PUBLICATION WITH ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("REFRESH", "PUBLICATION", "WITH", "(")) diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0aa14ec4a2..8c96f0ce72 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -159,6 +159,12 @@ typedef struct Subscription * specified origin */ } Subscription; +typedef struct SubscriptionSeqInfo +{ + Oid seqid; + XLogRecPtr lsn; +} SubscriptionSeqInfo; + /* Disallow streaming in-progress transactions. */ #define LOGICALREP_STREAM_OFF 'f' diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 8244ad537a..3cf7834f8d 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -91,5 +91,6 @@ extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern bool HasSubscriptionRelations(Oid subid); extern List *GetSubscriptionRelations(Oid subid, bool not_ready); +extern List *GetSubscriptionSequences(Oid subid, char state); #endif /* PG_SUBSCRIPTION_REL_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 1c50940b57..8107ba1cf5 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4231,6 +4231,7 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, + ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_SKIP, } AlterSubscriptionType; diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index a18d79d1b2..f380c1ba60 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -19,6 +19,7 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); extern void TablesyncWorkerMain(Datum main_arg); +extern void SequencesyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 515aefd519..23b4267598 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -32,6 +32,7 @@ typedef enum LogicalRepWorkerType WORKERTYPE_TABLESYNC, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, + WORKERTYPE_SEQUENCESYNC, } LogicalRepWorkerType; typedef struct LogicalRepWorker @@ -240,6 +241,8 @@ extern PGDLLIMPORT bool InitializingApplyWorker; extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); +extern LogicalRepWorker *logicalrep_sequence_sync_worker_find(Oid subid, + bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running); extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, @@ -252,6 +255,8 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid); +extern void pg_attribute_noreturn() finish_sync_worker(bool istable); + extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname); @@ -329,6 +334,8 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, (worker)->type == WORKERTYPE_PARALLEL_APPLY) #define isTablesyncWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_TABLESYNC) +#define isSequencesyncWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_SEQUENCESYNC) static inline bool am_tablesync_worker(void) @@ -336,6 +343,12 @@ am_tablesync_worker(void) return isTablesyncWorker(MyLogicalRepWorker); } +static inline bool +am_sequencesync_worker(void) +{ + return isSequencesyncWorker(MyLogicalRepWorker); +} + static inline bool am_leader_apply_worker(void) { diff --git a/src/test/subscription/t/034_sequences.pl b/src/test/subscription/t/034_sequences.pl new file mode 100644 index 0000000000..94bf83a14b --- /dev/null +++ b/src/test/subscription/t/034_sequences.pl @@ -0,0 +1,145 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# This tests that sequences are synced correctly to the subscriber +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +my $ddl = qq( + CREATE TABLE seq_test (v BIGINT); + CREATE SEQUENCE s; +); + +# Setup structure on the publisher +$node_publisher->safe_psql('postgres', $ddl); + +# Create some the same structure on subscriber, and an extra sequence that +# we'll create on the publisher later +$ddl = qq( + CREATE TABLE seq_test (v BIGINT); + CREATE SEQUENCE s; + CREATE SEQUENCE s2; + CREATE SEQUENCE s3; +); + +$node_subscriber->safe_psql('postgres', $ddl); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION seq_pub FOR ALL SEQUENCES"); + +# Insert initial test data +$node_publisher->safe_psql( + 'postgres', qq( + -- generate a number of values using the sequence + INSERT INTO seq_test SELECT nextval('s') FROM generate_series(1,100); +)); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub" +); + +# Wait for initial sync to finish as well +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check the data on subscriber +my $result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT * FROM s; +)); + +is($result, '132|0|t', 'initial test data replicated'); + +# create a new sequence, it should be synced +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE s2; + INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100); +)); + +# changes to existing sequences should not be synced +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO seq_test SELECT nextval('s') FROM generate_series(1,100); +)); + +# Refresh publication after create a new sequence and updating existing +# sequence. +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION seq_sub REFRESH PUBLICATION +)); + +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check the data on subscriber +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT * FROM s; +)); + +is($result, '132|0|t', 'initial test data replicated'); + +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT * FROM s2; +)); + +is($result, '132|0|t', 'initial test data replicated'); + +# Changes of both new and existing sequence should be synced after REFRESH +# PUBLICATION SEQUENCES. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE s3; + INSERT INTO seq_test SELECT nextval('s3') FROM generate_series(1,100); + + -- Existing sequence + INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100); +)); + +# Refresh publication sequences after create new sequence and updating existing +# sequence. +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION seq_sub REFRESH PUBLICATION SEQUENCES +)); + +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check the data on subscriber +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT * FROM s2; +)); + +is($result, '231|0|t', 'initial test data replicated'); + +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT * FROM s3; +)); + +is($result, '132|0|t', 'initial test data replicated'); + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e6c1caf649..50e651ce8c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2767,6 +2767,7 @@ SubscriptingRefState Subscription SubscriptionInfo SubscriptionRelState +SubscriptionSeqInfo SummarizerReadLocalXLogPrivate SupportRequestCost SupportRequestIndexCondition -- 2.34.1