From 14822086201852dc532f698901858c017e5378b7 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Wed, 23 Jul 2025 11:34:50 +0530 Subject: [PATCH v20250801 4/6] Introduce "REFRESH PUBLICATION SEQUENCES" for subscriptions This patch adds support for a new SQL command: ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES This command synchronizes the set of sequences associated with a subscription based on the sequences currently present in the publication on the publisher. It also marks the corresponding entries in the pg_subscription_rel catalog table with the INIT state to trigger resynchronization. Additionally, the following subscription commands: ALTER SUBSCRIPTION ... REFRESH PUBLICATION ALTER SUBSCRIPTION ... ADD PUBLICATION ALTER SUBSCRIPTION ... DROP PUBLICATION ALTER SUBSCRIPTION ... SET PUBLICATION have been extended to also refresh sequence mappings. These commands will: Add newly published sequences that are not yet part of the subscription. Remove sequences that are no longer included in the publication. This ensures that sequence replication remains aligned with the current state of the publication on the publisher side, improving consistency and reducing manual maintenance. Author: Vignesh C, Tomas Vondra Reviewer: Amit Kapila, Shveta Malik, Dilip Kumar, Peter Smith, Nisha Moond Discussion: https://www.postgresql.org/message-id/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com --- src/backend/catalog/pg_publication.c | 64 +++- src/backend/catalog/pg_subscription.c | 60 +++- src/backend/catalog/system_views.sql | 10 + src/backend/commands/subscriptioncmds.c | 374 ++++++++++++++------ src/backend/executor/execReplication.c | 4 +- src/backend/parser/gram.y | 11 +- src/backend/replication/logical/syncutils.c | 5 +- src/bin/psql/tab-complete.in.c | 2 +- src/include/catalog/pg_proc.dat | 5 + src/include/catalog/pg_publication.h | 2 +- src/include/catalog/pg_subscription_rel.h | 4 +- src/include/nodes/parsenodes.h | 3 +- src/test/regress/expected/rules.out | 8 + src/test/regress/expected/subscription.out | 8 +- src/test/regress/sql/subscription.sql | 4 + 15 files changed, 438 insertions(+), 126 deletions(-) diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index d7d33c8b709..3f15ccaac04 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -777,7 +777,7 @@ GetRelationPublications(Oid relid) /* * Gets list of relation oids for a publication. * - * This should only be used FOR TABLE publications, the FOR ALL TABLES + * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES * should use GetAllTablesPublicationRelations(). */ List * @@ -858,14 +858,16 @@ GetAllTablesPublications(void) } /* - * Gets list of all relation published by FOR ALL TABLES publication(s). + * Gets list of all relations published by FOR ALL TABLES/SEQUENCES + * publication(s). * * If the publication publishes partition changes via their respective root * partitioned tables, we must exclude partitions in favor of including the - * root partitioned tables. + * root partitioned tables. This is not applicable for FOR ALL SEQEUNCES + * publication. */ List * -GetAllTablesPublicationRelations(bool pubviaroot) +GetAllTablesPublicationRelations(char relkind, bool pubviaroot) { Relation classRel; ScanKeyData key[1]; @@ -873,12 +875,14 @@ GetAllTablesPublicationRelations(bool pubviaroot) HeapTuple tuple; List *result = NIL; + Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot)); + classRel = table_open(RelationRelationId, AccessShareLock); ScanKeyInit(&key[0], Anum_pg_class_relkind, BTEqualStrategyNumber, F_CHAREQ, - CharGetDatum(RELKIND_RELATION)); + CharGetDatum(relkind)); scan = table_beginscan_catalog(classRel, 1, key); @@ -1165,7 +1169,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * those. Otherwise, get the partitioned table itself. */ if (pub_elem->alltables) - pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot); + pub_elem_tables = GetAllTablesPublicationRelations(RELKIND_RELATION, + pub_elem->pubviaroot); else { List *relids, @@ -1337,3 +1342,50 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } + +/* + * Returns Oids of sequences in a publication. + */ +Datum +pg_get_publication_sequences(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + List *sequences = NIL; + + /* stuff done only on the first call of the function */ + if (SRF_IS_FIRSTCALL()) + { + char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0)); + Publication *publication; + MemoryContext oldcontext; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* switch to memory context appropriate for multiple function calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + publication = GetPublicationByName(pubname, false); + + if (publication->allsequences) + sequences = GetAllTablesPublicationRelations(RELKIND_SEQUENCE, + false); + + funcctx->user_fctx = (void *) sequences; + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + sequences = (List *) funcctx->user_fctx; + + if (funcctx->call_cntr < list_length(sequences)) + { + Oid relid = list_nth_oid(sequences, funcctx->call_cntr); + + SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid)); + } + + SRF_RETURN_DONE(funcctx); +} diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index b6ba367b877..dabd87a622d 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -463,7 +463,9 @@ RemoveSubscriptionRel(Oid subid, Oid relid) * leave tablesync slots or origins in the system when the * corresponding table is dropped. */ - if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY) + if (!OidIsValid(subid) && + get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE && + subrel->srsubstate != SUBREL_STATE_READY) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -500,7 +502,8 @@ HasSubscriptionTables(Oid subid) Relation rel; ScanKeyData skey[1]; SysScanDesc scan; - bool has_subrels; + HeapTuple tup; + bool has_subrels = false; rel = table_open(SubscriptionRelRelationId, AccessShareLock); @@ -512,8 +515,22 @@ HasSubscriptionTables(Oid subid) scan = systable_beginscan(rel, InvalidOid, false, NULL, 1, skey); - /* If even a single tuple exists then the subscription has tables. */ - has_subrels = HeapTupleIsValid(systable_getnext(scan)); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_subscription_rel subrel; + + subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + + /* + * Skip sequence tuples. If even a single table tuple exists then the + * subscription has tables. + */ + if (get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE) + { + has_subrels = true; + break; + } + } /* Cleanup */ systable_endscan(scan); @@ -525,12 +542,22 @@ HasSubscriptionTables(Oid subid) /* * Get the relations for the subscription. * - * If not_ready is true, return only the relations that are not in a ready - * state, otherwise return all the relations of the subscription. The - * returned list is palloc'ed in the current memory context. + * get_tables: get relations for tables of the subscription. + * + * get_sequences: get relations for sequences of the subscription. + * + * not_ready: + * If getting tables and not_ready is false get all tables, otherwise, + * only get tables that have not reached READY state. + * If getting sequences and not_ready is false get all sequences, + * otherwise, only get sequences that have not reached READY state (i.e. are + * still in INIT state). + * + * The returned list is palloc'ed in the current memory context. */ List * -GetSubscriptionRelations(Oid subid, bool not_ready) +GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences, + bool not_ready) { List *res = NIL; Relation rel; @@ -539,6 +566,9 @@ GetSubscriptionRelations(Oid subid, bool not_ready) ScanKeyData skey[2]; SysScanDesc scan; + /* One or both of 'get_tables' and 'get_sequences' must be true. */ + Assert(get_tables || get_sequences); + rel = table_open(SubscriptionRelRelationId, AccessShareLock); ScanKeyInit(&skey[nkeys++], @@ -561,9 +591,23 @@ GetSubscriptionRelations(Oid subid, bool not_ready) SubscriptionRelState *relstate; Datum d; bool isnull; + char relkind; subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + /* Relation is either a sequence or a table */ + relkind = get_rel_relkind(subrel->srrelid); + Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION || + relkind == RELKIND_PARTITIONED_TABLE); + + /* Skip sequences if they were not requested */ + if (!get_sequences && (relkind == RELKIND_SEQUENCE)) + continue; + + /* Skip tables if they were not requested */ + if (!get_tables && (relkind != RELKIND_SEQUENCE)) + continue; + relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); relstate->relid = subrel->srrelid; relstate->state = subrel->srsubstate; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index f6eca09ee15..a0b1a0ef56f 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -394,6 +394,16 @@ CREATE VIEW pg_publication_tables AS pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) WHERE C.oid = GPT.relid; +CREATE VIEW pg_publication_sequences AS + SELECT + P.pubname AS pubname, + N.nspname AS schemaname, + C.relname AS sequencename + FROM pg_publication P, + LATERAL pg_get_publication_sequences(P.pubname) GPS, + pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) + WHERE C.oid = GPS.relid; + CREATE VIEW pg_locks AS SELECT * FROM pg_lock_status() AS L; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index cd6c3684482..1aa93f05aea 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -27,6 +27,7 @@ #include "catalog/objectaddress.h" #include "catalog/pg_authid_d.h" #include "catalog/pg_database_d.h" +#include "catalog/pg_sequence.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" @@ -106,6 +107,7 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications); static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, @@ -715,6 +717,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOnOwner(SubscriptionRelationId, subid, owner); + /* + * XXX: Currently, a replication origin is created for all subscriptions, + * including those for sequence-only publications. However, this is + * unnecessary, as incremental synchronization of sequences is not + * supported. + */ ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); @@ -726,9 +734,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, { char *err; WalReceiverConn *wrconn; - List *tables; - ListCell *lc; - char table_state; bool must_use_password; /* Try to connect to the publisher. */ @@ -743,6 +748,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { + bool has_tables; + List *relations; + char relation_state; + check_publications(wrconn, publications); check_publications_origin(wrconn, publications, opts.copy_data, opts.retaindeadtuples, opts.origin, @@ -755,16 +764,19 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * Set sync state based on if we were asked to do data copy or * not. */ - table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* - * Get the table list from publisher and build local table status - * info. + * Build local relation status info. Relations are for both tables + * and sequences from the publisher. */ - tables = fetch_table_list(wrconn, publications); - foreach(lc, tables) + relations = fetch_table_list(wrconn, publications); + has_tables = relations != NIL; + relations = list_concat(relations, + fetch_sequence_list(wrconn, publications)); + + foreach_ptr(RangeVar, rv, relations) { - RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; relid = RangeVarGetRelid(rv, AccessShareLock, false); @@ -773,7 +785,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CheckSubscriptionRelkind(get_rel_relkind(relid), rv->schemaname, rv->relname); - AddSubscriptionRelState(subid, relid, table_state, + AddSubscriptionRelState(subid, relid, relation_state, InvalidXLogRecPtr, true); } @@ -781,6 +793,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * If requested, create permanent slot for the subscription. We * won't use the initial snapshot for anything, so no need to * export it. + * + * XXX: Currently, a replication slot is created for all + * subscriptions, including those for sequence-only publications. + * However, this is unnecessary, as incremental synchronization of + * sequences is not supported. */ if (opts.create_slot) { @@ -804,7 +821,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH * PUBLICATION to work. */ - if (opts.twophase && !opts.copy_data && tables != NIL) + if (opts.twophase && !opts.copy_data && has_tables) twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, @@ -843,18 +860,49 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, return myself; } +/* + * Update the subscription to refresh both the publication and the publication + * objects associated with the subscription. + * + * Parameters: + * + * If 'copy_data' is true, the function will set the state to INIT; otherwise, + * it will set the state to READY. + * + * If 'validate_publications' is provided with a publication list, the + * function checks that the specified publications exist on the publisher. + * + * If 'resync_all_sequences' is false: + * Add or remove tables and sequences that have been added to or removed + * from the publication since the last subscription creation or refresh. + * If 'resync_all_sequences' is true: + * Perform the above operation only for sequences. + * + * Note, this is a common function for handling different REFRESH commands + * according to the parameter 'resync_all_sequences' + * + * 1. ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES + * (when parameter resync_all_sequences is true) + * + * The function will mark all sequences with INIT state. + * + * 2. ALTER SUBSCRIPTION ... REFRESH PUBLICATION [WITH (copy_data=true|false)] + * (when parameter resync_all_sequences is false) + * + * The function will update only the newly added tables and/or sequences + * based on the copy_data parameter. + */ static void AlterSubscription_refresh(Subscription *sub, bool copy_data, - List *validate_publications) + List *validate_publications, bool resync_all_sequences) { char *err; - List *pubrel_names; + List *pubrel_names = NIL; List *subrel_states; Oid *subrel_local_oids; Oid *pubrel_local_oids; ListCell *lc; int off; - int remove_rel_len; int subrel_count; Relation rel = NULL; typedef struct SubRemoveRels @@ -862,9 +910,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, Oid relid; char state; } SubRemoveRels; - SubRemoveRels *sub_remove_rels; + + List *sub_remove_rels = NIL; WalReceiverConn *wrconn; bool must_use_password; + bool refresh_tables = !resync_all_sequences; /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); @@ -885,16 +935,22 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, check_publications(wrconn, validate_publications); /* Get the table list from publisher. */ - pubrel_names = fetch_table_list(wrconn, sub->publications); + if (refresh_tables) + pubrel_names = fetch_table_list(wrconn, sub->publications); + + /* Get the sequence list from publisher. */ + pubrel_names = list_concat(pubrel_names, + fetch_sequence_list(wrconn, + sub->publications)); - /* Get local table list. */ - subrel_states = GetSubscriptionRelations(sub->oid, false); + /* Get local relation list. */ + subrel_states = GetSubscriptionRelations(sub->oid, refresh_tables, true, false); subrel_count = list_length(subrel_states); /* - * Build qsorted array of local table oids for faster lookup. This can - * potentially contain all tables in the database so speed of lookup - * is important. + * Build qsorted array of local relation oids for faster lookup. This + * can potentially contain all relation in the database so speed of + * lookup is important. */ subrel_local_oids = palloc(subrel_count * sizeof(Oid)); off = 0; @@ -907,22 +963,19 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, qsort(subrel_local_oids, subrel_count, sizeof(Oid), oid_cmp); - check_publications_origin(wrconn, sub->publications, copy_data, - sub->retaindeadtuples, sub->origin, - subrel_local_oids, subrel_count, sub->name); - - /* - * Rels that we want to remove from subscription and drop any slots - * and origins corresponding to them. - */ - sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels)); + if (refresh_tables) + check_publications_origin(wrconn, sub->publications, copy_data, + sub->retaindeadtuples, sub->origin, + subrel_local_oids, subrel_count, + sub->name); /* - * Walk over the remote tables and try to match them to locally known - * tables. If the table is not known locally create a new state for - * it. + * Walk over the remote relations and try to match them to locally + * known tables. If the table is not known locally create a new state + * for it. * - * Also builds array of local oids of remote tables for the next step. + * Also builds array of local oids of remote relations for the next + * step. */ off = 0; pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid)); @@ -931,12 +984,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, { RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; + char relkind; relid = RangeVarGetRelid(rv, AccessShareLock, false); /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); + relkind = get_rel_relkind(relid); + CheckSubscriptionRelkind(relkind, rv->schemaname, rv->relname); pubrel_local_oids[off++] = relid; @@ -947,28 +1001,48 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, InvalidXLogRecPtr, true); ereport(DEBUG1, - (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", - rv->schemaname, rv->relname, sub->name))); + errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"", + relkind == RELKIND_SEQUENCE ? "sequence" : "table", + rv->schemaname, rv->relname, sub->name)); } } /* - * Next remove state for tables we should not care about anymore using - * the data we collected above + * Next remove state for relations we should not care about anymore + * using the data we collected above */ qsort(pubrel_local_oids, list_length(pubrel_names), sizeof(Oid), oid_cmp); - remove_rel_len = 0; for (off = 0; off < subrel_count; off++) { Oid relid = subrel_local_oids[off]; - if (!bsearch(&relid, pubrel_local_oids, - list_length(pubrel_names), sizeof(Oid), oid_cmp)) + if (bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), sizeof(Oid), oid_cmp)) + { + /* + * The resync_all_sequences flag will only be set to true for + * the REFRESH PUBLICATION SEQUENCES command, indicating that + * the existing sequences need to be re-synchronized by + * resetting the relation to its initial state. + */ + if (resync_all_sequences) + { + UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT, + InvalidXLogRecPtr); + ereport(DEBUG1, + errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name)); + } + } + else { char state; XLogRecPtr statelsn; + char relkind = get_rel_relkind(relid); /* * Lock pg_subscription_rel with AccessExclusiveLock to @@ -990,41 +1064,55 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Last known rel state. */ state = GetSubscriptionRelState(sub->oid, relid, &statelsn); - sub_remove_rels[remove_rel_len].relid = relid; - sub_remove_rels[remove_rel_len++].state = state; - RemoveSubscriptionRel(sub->oid, relid); - logicalrep_worker_stop(sub->oid, relid); - /* - * For READY state, we would have already dropped the - * tablesync origin. + * A single sequencesync worker synchronizes all sequences, so + * only stop workers when relation kind is not sequence. */ - if (state != SUBREL_STATE_READY) + if (relkind != RELKIND_SEQUENCE) { - char originname[NAMEDATALEN]; + SubRemoveRels *rel = palloc(sizeof(SubRemoveRels)); + + rel->relid = relid; + rel->state = state; + + sub_remove_rels = lappend(sub_remove_rels, rel); + + logicalrep_worker_stop(sub->oid, relid); /* - * Drop the tablesync's origin tracking if exists. - * - * It is possible that the origin is not yet created for - * tablesync worker, this can happen for the states before - * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or - * apply worker can also concurrently try to drop the - * origin and by this time the origin might be already - * removed. For these reasons, passing missing_ok = true. + * For READY state, we would have already dropped the + * tablesync origin. */ - ReplicationOriginNameForLogicalRep(sub->oid, relid, originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); + if (state != SUBREL_STATE_READY) + { + char originname[NAMEDATALEN]; + + /* + * Drop the tablesync's origin tracking if exists. + * + * It is possible that the origin is not yet created + * for tablesync worker, this can happen for the + * states before SUBREL_STATE_FINISHEDCOPY. The + * tablesync worker or apply worker can also + * concurrently try to drop the origin and by this + * time the origin might be already removed. For these + * reasons, passing missing_ok = true. + */ + ReplicationOriginNameForLogicalRep(sub->oid, relid, + originname, + sizeof(originname)); + replorigin_drop_by_name(originname, true, false); + } } ereport(DEBUG1, - (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"", - get_namespace_name(get_rel_namespace(relid)), - get_rel_name(relid), - sub->name))); + errmsg_internal("%s \"%s.%s\" removed from subscription \"%s\"", + relkind == RELKIND_SEQUENCE ? "sequence" : "table", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name)); } } @@ -1033,10 +1121,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * to be at the end because otherwise if there is an error while doing * the database operations we won't be able to rollback dropped slots. */ - for (off = 0; off < remove_rel_len; off++) + foreach_ptr(SubRemoveRels, rel, sub_remove_rels) { - if (sub_remove_rels[off].state != SUBREL_STATE_READY && - sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE) + if (rel->state != SUBREL_STATE_READY && + rel->state != SUBREL_STATE_SYNCDONE) { char syncslotname[NAMEDATALEN] = {0}; @@ -1050,11 +1138,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * dropped slots and fail. For these reasons, we allow * missing_ok = true for the drop. */ - ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid, + ReplicationSlotNameForTablesync(sub->oid, rel->relid, syncslotname, sizeof(syncslotname)); ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); } } + + list_free_deep(sub_remove_rels); } PG_FINALLY(); { @@ -1538,8 +1628,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); /* - * See ALTER_SUBSCRIPTION_REFRESH for details why this is - * not allowed. + * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details + * why this is not allowed. */ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, @@ -1553,7 +1643,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, sub->publications = stmt->publication; AlterSubscription_refresh(sub, opts.copy_data, - stmt->publication); + stmt->publication, false); } break; @@ -1593,8 +1683,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)"))); /* - * See ALTER_SUBSCRIPTION_REFRESH for details why this is - * not allowed. + * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details + * why this is not allowed. */ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, @@ -1612,18 +1702,18 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, sub->publications = publist; AlterSubscription_refresh(sub, opts.copy_data, - validate_publications); + validate_publications, false); } break; } - case ALTER_SUBSCRIPTION_REFRESH: + case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION: { if (!sub->enabled) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); + errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION is not allowed for disabled subscriptions"))); parse_subscription_options(pstate, stmt->options, SUBOPT_COPY_DATA, &opts); @@ -1635,8 +1725,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * * But, having reached this two-phase commit "enabled" state * we must not allow any subsequent table initialization to - * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed - * when the user had requested two_phase = on mode. + * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is + * disallowed when the user had requested two_phase = on mode. * * The exception to this restriction is when copy_data = * false, because when copy_data is false the tablesync will @@ -1648,12 +1738,26 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"), - errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); + errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"), + errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); + + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION"); + + AlterSubscription_refresh(sub, opts.copy_data, NULL, false); + + break; + } - PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); + case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES: + { + if (!sub->enabled) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES is not allowed for disabled subscriptions")); - AlterSubscription_refresh(sub, opts.copy_data, NULL); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES"); + + AlterSubscription_refresh(sub, true, NULL, true); break; } @@ -1931,7 +2035,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * the apply and tablesync workers and they can't restart because of * exclusive lock on the subscription. */ - rstates = GetSubscriptionRelations(subid, true); + rstates = GetSubscriptionRelations(subid, true, false, true); foreach(lc, rstates) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -2246,16 +2350,16 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) * it's a partitioned table), from some other publishers. This check is * required in the following scenarios: * - * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements - * with "copy_data = true" and "origin = none": + * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION + * statements with "copy_data = true" and "origin = none": * - Warn the user that data with an origin might have been copied. * - This check is skipped for tables already added, as incremental sync via * WAL allows origin tracking. The list of such tables is in * subrel_local_oids. * - * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements - * with "retain_dead_tuples = true" and "origin = any", and for ALTER - * SUBSCRIPTION statements that modify retain_dead_tuples or origin, or + * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION + * statements with "retain_dead_tuples = true" and "origin = any", and for + * ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin, or * when the publisher's status changes (e.g., due to a connection string * update): * - Warn the user that only conflict detection info for local changes on @@ -2314,24 +2418,28 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, appendStringInfoString(&cmd, ")\n"); /* - * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains - * the list of relation oids that are already present on the subscriber. - * This check should be skipped for these tables if checking for table - * sync scenario. However, when handling the retain_dead_tuples scenario, - * ensure all tables are checked, as some existing tables may now include - * changes from other origins due to newly created subscriptions on the - * publisher. + * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION, + * subrel_local_oids contains the list of relation oids that are already + * present on the subscriber. This check should be skipped for these + * tables if checking for table sync scenario. However, when handling the + * retain_dead_tuples scenario, ensure all tables are checked, as some + * existing tables may now include changes from other origins due to newly + * created subscriptions on the publisher. */ if (check_table_sync) { for (i = 0; i < subrel_count; i++) { Oid relid = subrel_local_oids[i]; - char *schemaname = get_namespace_name(get_rel_namespace(relid)); - char *tablename = get_rel_name(relid); - appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", - schemaname, tablename); + if (get_rel_relkind(relid) != RELKIND_SEQUENCE) + { + char *schemaname = get_namespace_name(get_rel_namespace(relid)); + char *tablename = get_rel_name(relid); + + appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", + schemaname, tablename); + } } } @@ -2611,6 +2719,68 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } +/* + * Get the list of sequences which belong to specified publications on the + * publisher connection. + */ +static List * +fetch_sequence_list(WalReceiverConn *wrconn, List *publications) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {TEXTOID, TEXTOID}; + List *seqlist = NIL; + int server_version = walrcv_server_version(wrconn); + + /* Skip sequence fetch if the publisher is older than version 19 */ + if (server_version < 190000) + return seqlist; + + Assert(list_length(publications) > 0); + + initStringInfo(&cmd); + + appendStringInfoString(&cmd, + "SELECT DISTINCT s.schemaname, s.sequencename\n" + "FROM pg_catalog.pg_publication_sequences s\n" + "WHERE s.pubname IN ("); + GetPublicationsStr(publications, &cmd, true); + appendStringInfoChar(&cmd, ')'); + + res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not receive list of sequences from the publisher: %s", + res->err)); + + /* Process sequences. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *nspname; + char *relname; + bool isnull; + RangeVar *rv; + + nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + rv = makeRangeVar(nspname, relname, -1); + seqlist = lappend(seqlist, rv); + ExecClearTuple(slot); + } + + ExecDropSingleTupleTableSlot(slot); + walrcv_clear_result(res); + + return seqlist; +} + /* * This is to report the connection failure while dropping replication slots. * Here, we report the WARNING for all tablesync slots so that user can drop diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index f262e7a66f7..b58e81424ab 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -877,7 +877,9 @@ void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname) { - if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) + if (relkind != RELKIND_RELATION && + relkind != RELKIND_PARTITIONED_TABLE && + relkind != RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 740cc910870..9cefecf1da1 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -10983,11 +10983,20 @@ AlterSubscriptionStmt: AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); - n->kind = ALTER_SUBSCRIPTION_REFRESH; + n->kind = ALTER_SUBSCRIPTION_REFRESH_PUBLICATION; n->subname = $3; n->options = $6; $$ = (Node *) n; } + | ALTER SUBSCRIPTION name REFRESH PUBLICATION SEQUENCES + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + + n->kind = ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES; + n->subname = $3; + $$ = (Node *) n; + } | ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition { AlterSubscriptionStmt *n = diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c index 5109b197805..45b6d429558 100644 --- a/src/backend/replication/logical/syncutils.c +++ b/src/backend/replication/logical/syncutils.c @@ -152,8 +152,9 @@ FetchRelationStates(bool *started_tx) *started_tx = true; } - /* Fetch tables that are in non-ready state. */ - rstates = GetSubscriptionRelations(MySubscription->oid, true); + /* Fetch tables and sequences that are in non-ready state. */ + rstates = GetSubscriptionRelations(MySubscription->oid, true, true, + true); /* Allocate the tracking info in a permanent memory context. */ oldctx = MemoryContextSwitchTo(CacheMemoryContext); diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 057f7b4879c..9b5eefe7cbe 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2312,7 +2312,7 @@ match_previous_words(int pattern_id, "ADD PUBLICATION", "DROP PUBLICATION"); /* ALTER SUBSCRIPTION REFRESH PUBLICATION */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH", "PUBLICATION")) - COMPLETE_WITH("WITH ("); + COMPLETE_WITH("SEQUENCES", "WITH ("); /* ALTER SUBSCRIPTION REFRESH PUBLICATION WITH ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH", "PUBLICATION", "WITH", "(")) COMPLETE_WITH("copy_data"); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9cc52a7c83f..3fe60ae82cd 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12282,6 +12282,11 @@ proargmodes => '{v,o,o,o,o}', proargnames => '{pubname,pubid,relid,attrs,qual}', prosrc => 'pg_get_publication_tables' }, +{ oid => '8052', descr => 'get OIDs of sequences in a publication', + proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't', + provolatile => 's', prorettype => 'oid', proargtypes => 'text', + proallargtypes => '{text,oid}', proargmodes => '{i,o}', + proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' }, { oid => '6121', descr => 'returns whether a relation can be part of a publication', proname => 'pg_relation_is_publishable', provolatile => 's', diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 24e09c76649..2a0f49cb742 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -170,7 +170,7 @@ typedef enum PublicationPartOpt extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); -extern List *GetAllTablesPublicationRelations(bool pubviaroot); +extern List *GetAllTablesPublicationRelations(char relkind, bool pubviaroot); extern List *GetPublicationSchemas(Oid pubid); extern List *GetSchemaPublications(Oid schemaid); extern List *GetSchemaPublicationRelations(Oid schemaid, diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index ea869588d84..a541f4843bd 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -90,6 +90,8 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern bool HasSubscriptionTables(Oid subid); -extern List *GetSubscriptionRelations(Oid subid, bool not_ready); +extern List *GetSubscriptionRelations(Oid subid, bool get_tables, + bool get_sequences, + bool not_ready); #endif /* PG_SUBSCRIPTION_REL_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 73e505c25b3..c2e9583cdb7 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4359,7 +4359,8 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, - ALTER_SUBSCRIPTION_REFRESH, + ALTER_SUBSCRIPTION_REFRESH_PUBLICATION, + ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_SKIP, } AlterSubscriptionType; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index dce8c672b40..5d58e57585f 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1462,6 +1462,14 @@ pg_prepared_xacts| SELECT p.transaction, FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid) LEFT JOIN pg_authid u ON ((p.ownerid = u.oid))) LEFT JOIN pg_database d ON ((p.dbid = d.oid))); +pg_publication_sequences| SELECT p.pubname, + n.nspname AS schemaname, + c.relname AS sequencename + FROM pg_publication p, + LATERAL pg_get_publication_sequences((p.pubname)::text) gps(relid), + (pg_class c + JOIN pg_namespace n ON ((n.oid = c.relnamespace))) + WHERE (c.oid = gps.relid); pg_publication_tables| SELECT p.pubname, n.nspname AS schemaname, c.relname AS tablename, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index a98c97f7616..0042d0b0f07 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -107,7 +107,7 @@ HINT: To initiate replication, you must manually create the replication slot, e ALTER SUBSCRIPTION regress_testsub3 ENABLE; ERROR: cannot enable subscription that does not have a slot name ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION; -ERROR: ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions +ERROR: ALTER SUBSCRIPTION ... REFRESH PUBLICATION is not allowed for disabled subscriptions -- fail - origin must be either none or any CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo); ERROR: unrecognized origin value: "foo" @@ -352,7 +352,11 @@ ERROR: ALTER SUBSCRIPTION with refresh cannot run inside a transaction block END; BEGIN; ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION; -ERROR: ALTER SUBSCRIPTION ... REFRESH cannot run inside a transaction block +ERROR: ALTER SUBSCRIPTION ... REFRESH PUBLICATION cannot run inside a transaction block +END; +BEGIN; +ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION SEQUENCES; +ERROR: ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES cannot run inside a transaction block END; CREATE FUNCTION func() RETURNS VOID AS $$ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true) $$ LANGUAGE SQL; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index f0f714fe747..4ace5f4fa95 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -240,6 +240,10 @@ BEGIN; ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION; END; +BEGIN; +ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION SEQUENCES; +END; + CREATE FUNCTION func() RETURNS VOID AS $$ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true) $$ LANGUAGE SQL; SELECT func(); -- 2.43.0