From b645e3ccef7106dc57e576bf8a62a41767469bb9 Mon Sep 17 00:00:00 2001 From: amit Date: Thu, 7 Nov 2019 18:19:33 +0900 Subject: [PATCH v4 1/2] Some refactoring of publication and subscription code --- src/backend/catalog/pg_publication.c | 5 +- src/backend/commands/subscriptioncmds.c | 79 ++++++++++++++++++++--------- src/backend/commands/tablecmds.c | 2 +- src/backend/replication/pgoutput/pgoutput.c | 11 ++-- src/backend/utils/cache/relcache.c | 2 +- src/include/catalog/pg_publication.h | 2 +- 6 files changed, 67 insertions(+), 34 deletions(-) diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index fd5da7d5f7..80b98e2c3c 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -224,11 +224,12 @@ publication_add_relation(Oid pubid, Relation targetrel, /* - * Gets list of publication oids for a relation oid. + * Finds all publications associated with the relation. */ List * -GetRelationPublications(Oid relid) +GetRelationPublications(Relation rel) { + Oid relid = RelationGetRelid(rel); List *result = NIL; CatCList *pubrellist; int i; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 1419195766..11c0f305ff 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -52,7 +52,19 @@ #include "utils/memutils.h" #include "utils/syscache.h" +/* + * Structure for fetch_table_list() to store the information about + * a given published table. + */ +typedef struct PublicationTable +{ + char *nspname; + char *relname; + char relkind; +} PublicationTable; + static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static Oid ValidateSubscriptionRel(PublicationTable *pt); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -464,15 +476,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) tables = fetch_table_list(wrconn, publications); foreach(lc, tables) { - RangeVar *rv = (RangeVar *) lfirst(lc); + PublicationTable *pt = lfirst(lc); Oid relid; - relid = RangeVarGetRelid(rv, AccessShareLock, false); - - /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); - + relid = ValidateSubscriptionRel(pt); AddSubscriptionRelState(subid, relid, table_state, InvalidXLogRecPtr); } @@ -573,14 +580,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) foreach(lc, pubrel_names) { - RangeVar *rv = (RangeVar *) lfirst(lc); + PublicationTable *pt = lfirst(lc); Oid relid; - relid = RangeVarGetRelid(rv, AccessShareLock, false); - - /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); + /* Check that there's an appropriate relation present locally. */ + relid = ValidateSubscriptionRel(pt); pubrel_local_oids[off++] = relid; @@ -592,7 +596,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) InvalidXLogRecPtr); ereport(DEBUG1, (errmsg("table \"%s.%s\" added to subscription \"%s\"", - rv->schemaname, rv->relname, sub->name))); + pt->nspname, pt->relname, sub->name))); } } @@ -1137,7 +1141,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[2] = {TEXTOID, TEXTOID}; + Oid tableRow[3] = {TEXTOID, TEXTOID, CHAROID}; ListCell *lc; bool first; List *tablelist = NIL; @@ -1145,9 +1149,12 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) Assert(list_length(publications) > 0); initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" + appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, c.relkind\n" " FROM pg_catalog.pg_publication_tables t\n" + " JOIN pg_class c ON t.schemaname = c.relnamespace::regnamespace::name\n" + " AND t.tablename = c.relname\n" " WHERE t.pubname IN ("); + first = true; foreach(lc, publications) { @@ -1162,7 +1169,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) } appendStringInfoChar(&cmd, ')'); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, 3, tableRow); pfree(cmd.data); if (res->status != WALRCV_OK_TUPLES) @@ -1174,18 +1181,17 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { - char *nspname; - char *relname; + PublicationTable *pt = palloc0(sizeof(PublicationTable)); bool isnull; - RangeVar *rv; - nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + pt->nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); Assert(!isnull); - relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + pt->relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + pt->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); Assert(!isnull); - rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1); - tablelist = lappend(tablelist, rv); + tablelist = lappend(tablelist, pt); ExecClearTuple(slot); } @@ -1195,3 +1201,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } + +/* + * Looks up a local relation matching the given publication table and + * checks that it's appropriate to use as replication target, erroring + * out if not. + * + * Oid of the successfully validated local relation is returned. + */ +static Oid +ValidateSubscriptionRel(PublicationTable *pt) +{ + Oid relid; + RangeVar *rv; + char local_relkind; + + rv = makeRangeVar(pstrdup(pt->nspname), pstrdup(pt->relname), -1); + relid = RangeVarGetRelid(rv, AccessShareLock, false); + Assert(OidIsValid(relid)); + + /* Check for supported relkind. */ + local_relkind = get_rel_relkind(relid); + CheckSubscriptionRelkind(local_relkind, rv->schemaname, rv->relname); + + return relid; +} diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 5597be6e3d..270e76ad73 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -14157,7 +14157,7 @@ ATPrepChangePersistence(Relation rel, bool toLogged) * UNLOGGED as UNLOGGED tables can't be published. */ if (!toLogged && - list_length(GetRelationPublications(RelationGetRelid(rel))) > 0) + list_length(GetRelationPublications(rel)) > 0) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot change table \"%s\" to unlogged because it is part of a publication", diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9c08757fca..20856fa33c 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -66,7 +66,7 @@ typedef struct RelationSyncEntry static HTAB *RelationSyncCache = NULL; static void init_rel_sync_cache(MemoryContext decoding_context); -static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid); +static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation rel); static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue); @@ -314,7 +314,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!is_publishable_relation(relation)) return; - relentry = get_rel_sync_entry(data, RelationGetRelid(relation)); + relentry = get_rel_sync_entry(data, relation); /* First check the table filter */ switch (change->action) @@ -404,7 +404,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!is_publishable_relation(relation)) continue; - relentry = get_rel_sync_entry(data, relid); + relentry = get_rel_sync_entry(data, relation); if (!relentry->pubactions.pubtruncate) continue; @@ -529,8 +529,9 @@ init_rel_sync_cache(MemoryContext cachectx) * Find or create entry in the relation schema cache. */ static RelationSyncEntry * -get_rel_sync_entry(PGOutputData *data, Oid relid) +get_rel_sync_entry(PGOutputData *data, Relation rel) { + Oid relid = RelationGetRelid(rel); RelationSyncEntry *entry; bool found; MemoryContext oldctx; @@ -548,7 +549,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) /* Not found means schema wasn't sent */ if (!found || !entry->replicate_valid) { - List *pubids = GetRelationPublications(relid); + List *pubids = GetRelationPublications(rel); ListCell *lc; /* Reload publications if needed before use. */ diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 585dcee5db..161fe95fe6 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5105,7 +5105,7 @@ GetRelationPublicationActions(Relation relation) sizeof(PublicationActions)); /* Fetch the publication membership info. */ - puboids = GetRelationPublications(RelationGetRelid(relation)); + puboids = GetRelationPublications(relation); puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); foreach(lc, puboids) diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 20a2f0ac1b..2981f61db1 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -80,7 +80,7 @@ typedef struct Publication extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); -extern List *GetRelationPublications(Oid relid); +extern List *GetRelationPublications(Relation rel); extern List *GetPublicationRelations(Oid pubid); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(void); -- 2.11.0