From a268e9a2eb13ecd8bc5f1f9053f13605bc4a3629 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Tue, 15 Feb 2022 23:08:11 +0100 Subject: [PATCH 3/4] Peter Smith review --- doc/src/sgml/catalogs.sgml | 4 +- doc/src/sgml/ref/alter_publication.sgml | 4 +- doc/src/sgml/ref/create_publication.sgml | 20 +++++--- src/backend/catalog/pg_publication.c | 57 ++++++++------------- src/backend/commands/tablecmds.c | 1 - src/backend/replication/logical/proto.c | 10 ++-- src/backend/replication/logical/tablesync.c | 11 ++-- src/backend/replication/pgoutput/pgoutput.c | 16 +++--- src/bin/pg_dump/pg_dump.c | 2 +- src/include/catalog/pg_publication.h | 2 + 10 files changed, 61 insertions(+), 66 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index f20cb2b78ab..adb0819c120 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -4392,7 +4392,7 @@ SCRAM-SHA-256$<iteration count>:&l This is an array of indnatts values that - indicate which table columns this index indexes. For example a value + indicate which table columns this index indexes. For example, a value of 1 3 would mean that the first and the third table columns make up the index entries. Key columns come before non-key (included) columns. A zero in this array indicates that the @@ -6335,7 +6335,7 @@ SCRAM-SHA-256$<iteration count>:&l This is an array of values that indicates which table columns are part of the publication. For example a value of 1 3 would mean that the first and the third table columns are published. - A null value indicates that all attributes are published. + A null value indicates that all columns are published. diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index 6a0c7722c7d..5eae5cde499 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -25,13 +25,13 @@ ALTER PUBLICATION name ADD name SET publication_object [, ...] ALTER PUBLICATION name DROP publication_object [, ...] ALTER PUBLICATION name SET ( publication_parameter [= value] [, ... ] ) -ALTER PUBLICATION name ALTER TABLE publication_object SET COLUMNS { ( name [, ...] ) | ALL } +ALTER PUBLICATION name ALTER TABLE table_name SET COLUMNS { ( name [, ...] ) | ALL } ALTER PUBLICATION name OWNER TO { new_owner | CURRENT_ROLE | CURRENT_USER | SESSION_USER } ALTER PUBLICATION name RENAME TO new_name where publication_object is one of: - TABLE [ ONLY ] table_name [ * ] [ ( column_name, [, ... ] ) ] [, ... ] + TABLE [ ONLY ] table_name [ * ] [ ( column_name [, ... ] ) ] [, ... ] ALL TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index a59cd3f532a..da4d929e02a 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -28,7 +28,7 @@ CREATE PUBLICATION name where publication_object is one of: - TABLE [ ONLY ] table_name [ * ] [ ( column_name, [, ... ] ) ] [, ... ] + TABLE [ ONLY ] table_name [ * ] [ ( column_name [, ... ] ) ] [, ... ] ALL TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] @@ -79,12 +79,10 @@ CREATE PUBLICATION name - When a column list is specified, only the listed columns are replicated; - any other columns are ignored for the purpose of replication through - this publication. If no column list is specified, all columns of the - table are replicated through this publication, including any columns - added later. If a column list is specified, it must include the replica - identity columns. + When a column list is specified, only the named columns are replicated. + If no column list is specified, all columns of the table are replicated + through this publication, including any columns added later. If a column + list is specified, it must include the replica identity columns. @@ -300,6 +298,14 @@ CREATE PUBLICATION production_publication FOR TABLE users, departments, ALL TABL sales: CREATE PUBLICATION sales_publication FOR ALL TABLES IN SCHEMA marketing, sales; + + + + Create a publication that publishes all changes for table users, + but replicates only columns user_id and + firstname: + +CREATE PUBLICATION users_filtered FOR TABLE users (user_id, firstname); diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index f8e0a728a8c..ab5a345b3de 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -63,8 +63,6 @@ static void check_publication_add_relation(Publication *pub, Relation targetrel, Bitmapset *columns) { - bool replidentfull = (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); - /* Must be a regular or partitioned table */ if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) @@ -97,50 +95,39 @@ check_publication_add_relation(Publication *pub, Relation targetrel, errdetail("This operation is not supported for unlogged tables."))); /* - * If a column filter was specified, check it's sensible with respect to - * replica identity - the list has to include all columns in the replica - * identity. + * Ensure the column filter is compatible with the replica identity and the + * actions the publication is replicating. */ - if (columns != NULL) - { - /* - * - * If the relation uses REPLICA IDENTITY FULL, we can't allow any column - * list even if it lists all columns of the relation - it'd cause issues - * if a column is added later. The column would become part of a replica - * identity, violating the rule that the column list includes the whole - * replica identity. We could add the column to the column list too, of - * course, but it seems rather useles - the column list would always - * include all columns, i.e. as if there's no column filter. - * - * So just reject this case altogether. - */ - if (replidentfull) - ereport(ERROR, - errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("invalid column list for publishing relation \"%s\"", - RelationGetRelationName(targetrel)), - errdetail("Cannot specify a column list on relations with REPLICA IDENTITY FULL.")); - - check_publication_columns(pub, targetrel, columns); - } + check_publication_columns(pub, targetrel, columns); } /* * Enforce that the column list can only leave out columns that aren't * forced to be sent. * - * No column can be excluded if REPLICA IDENTITY is FULL (since all the - * columns need to be sent regardless); and in other cases, the columns in - * the REPLICA IDENTITY cannot be left out. + * If the relation uses REPLICA IDENTITY FULL, we can't allow any column + * list even if it lists all columns of the relation - it'd cause issues + * if a column is added later. The column would become part of a replica + * identity, violating the rule that the column list includes the whole + * replica identity. We could add the column to the column list too, of + * course, but it seems rather useles - the column list would always + * include all columns, i.e. as if there's no column filter. * - * Then we need to cross-check the replica identity and column list. For - * UPDATE/DELETE we need to ensure the replica identity is a subset of the - * column filter. For INSERT, we don't need to check anything. + * In other cases, the columns in the REPLICA IDENTITY cannot be left out, + * except when the publication replicates only inserts. So we check that + * for UPDATE/DELETE the replica identity is a subset of the column filter. */ static void check_publication_columns(Publication *pub, Relation targetrel, Bitmapset *columns) { + /* + * If there is no column list, we treat it as if the list contains all columns. In + * which case there's nothing to check so we're done. + */ + if (!columns) + return; + + /* With REPLICA IDENTITY FULL no column filter is allowed. */ if (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -497,7 +484,7 @@ publication_set_table_columns(Relation pubrel, HeapTuple pubreltup, Datum values[Natts_pg_publication_rel]; memset(values, 0, sizeof(values)); - memset(nulls, 0, sizeof(nulls)); + memset(nulls, false, sizeof(nulls)); memset(replaces, false, sizeof(replaces)); replaces[Anum_pg_publication_rel_prattrs - 1] = true; diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 28de9329800..2bfb165e45d 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -15872,7 +15872,6 @@ relation_mark_replica_identity(Relation rel, char ri_type, Oid indexOid, CatalogTupleUpdate(pg_index, &pg_index_tuple->t_self, pg_index_tuple); InvokeObjectPostAlterHookArg(IndexRelationId, thisIndexOid, 0, InvalidOid, is_internal); - /* * Invalidate the relcache for the table, so that after we commit * all sessions will refresh the table's replica identity index diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 28c10e4d1e4..ec5a787cc3a 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -933,11 +933,6 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) desc = RelationGetDescr(rel); - /* fetch bitmap of REPLICATION IDENTITY attributes */ - replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); - if (!replidentfull) - idattrs = RelationGetIdentityKeyBitmap(rel); - /* send number of live attributes */ for (i = 0; i < desc->natts; i++) { @@ -954,6 +949,11 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) } pq_sendint16(out, nliveatts); + /* fetch bitmap of REPLICATION IDENTITY attributes */ + replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); + if (!replidentfull) + idattrs = RelationGetIdentityKeyBitmap(rel); + /* send the attributes */ for (i = 0; i < desc->natts; i++) { diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index a7befd712a0..d2dbf5f7b83 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -877,12 +877,12 @@ fetch_remote_table_info(char *nspname, char *relname, ExecClearTuple(slot); } - ExecDropSingleTupleTableSlot(slot); - walrcv_clear_result(res); - pfree(cmd.data); lrel->natts = natt; + + walrcv_clear_result(res); + pfree(cmd.data); } /* @@ -920,9 +920,10 @@ copy_table(Relation rel) quote_qualified_identifier(lrel.nspname, lrel.relname)); for (int i = 0; i < lrel.natts; i++) { - appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); - if (i < lrel.natts - 1) + if (i > 0) appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); } appendStringInfo(&cmd, ") TO STDOUT"); } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index c7b28d70d23..0068ae25d3e 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -138,8 +138,8 @@ typedef struct RelationSyncEntry TupleConversionMap *map; /* - * Set of columns included in the publication, or NULL if all columns are - * included implicitly. Note that the attnums in this list are not + * Columns included in the publication, or NULL if all columns are + * included implicitly. Note that the attnums in this bitmap are not * shifted by FirstLowInvalidHeapAttributeNumber. */ Bitmapset *columns; @@ -1290,9 +1290,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) * publish, and list of columns, if appropriate. * * Don't publish changes for partitioned tables, because - * publishing those of its partitions suffices. (However, ignore - * this if partition changes are not to published due to - * pubviaroot being set.) + * publishing those of its partitions suffices, unless partition + * changes won't be published due to pubviaroot being set. */ if (publish && (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot)) @@ -1305,7 +1304,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) /* * Obtain columns published by this publication, and add them * to the list for this rel. Note that if at least one - * publication has a empty column list, that means to publish + * publication has an empty column list, that means to publish * everything; so if we saw a publication that includes all * columns, skip this. */ @@ -1332,14 +1331,15 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (isnull) { /* - * If we see a publication with no columns, reset the + * If we see a publication with no column filter, it + * means we need to publish all columns, so reset the * list and ignore further ones. */ all_columns = true; bms_free(entry->columns); entry->columns = NULL; } - else if (!isnull) + else { ArrayType *arr; int nelems; diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 2c347e80ee0..2f29e0d7189 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4160,7 +4160,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) appendPQExpBufferStr(attribs, fmtId(attnames[k])); } - pubrinfo[i].pubrattrs = attribs->data; + pubrinfo[j].pubrattrs = attribs->data; } else pubrinfo[j].pubrattrs = NULL; diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 70f73d01dc0..ff9d9a43984 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -86,6 +86,8 @@ typedef struct Publication typedef struct PublicationRelInfo { Relation relation; + + /* List of columns to replicate (or NIL to replicate all columns) */ List *columns; } PublicationRelInfo; -- 2.34.1