From 37c258e8dfb151fe776ef365d9b523cfc56c3f50 Mon Sep 17 00:00:00 2001 From: Shlok Kyal Date: Tue, 25 Jun 2024 16:32:35 +0530 Subject: [PATCH v12 2/3] Support replication of generated column during initial sync When 'copy_data' is true, during the initial sync, the data is replicated from the publisher to the subscriber using the COPY command. The normal COPY command does not copy generated columns, so when 'include_generated_columns' is true, we need to copy using the syntax: 'COPY (SELECT column_name FROM table_name) TO STDOUT'. Note that we don't copy columns when the subscriber-side column is also generated. Those will be filled as normal with the subscriber-side computed or default data. --- doc/src/sgml/ref/create_subscription.sgml | 4 - src/backend/commands/subscriptioncmds.c | 14 --- src/backend/replication/logical/relation.c | 15 ++- src/backend/replication/logical/tablesync.c | 114 +++++++++++++++----- src/include/replication/logicalrelation.h | 3 +- src/test/regress/expected/subscription.out | 3 - src/test/regress/sql/subscription.sql | 3 - src/test/subscription/t/011_generated.pl | 88 +++++++++++++++ 8 files changed, 193 insertions(+), 51 deletions(-) diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index ee27a5873a..8fb4491b65 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -442,10 +442,6 @@ CREATE SUBSCRIPTION subscription_name - - This parameter can only be set true if copy_data is - set to false. 
- diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f942b58565..408a9157ec 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -459,20 +459,6 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, "slot_name = NONE", "create_slot = false"))); } } - - /* - * Do additional checking for disallowed combination when copy_data and - * include_generated_columns are true. COPY of generated columns is not - * supported yet. - */ - if (opts->copy_data && opts->include_generated_columns) - { - ereport(ERROR, - errcode(ERRCODE_SYNTAX_ERROR), - /*- translator: both %s are strings of the form "option = value" */ - errmsg("%s and %s are mutually exclusive options", - "copy_data = true", "include_generated_columns = true")); - } } /* diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 5de1531567..27c34059af 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -205,7 +205,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) * * Returns -1 if not found. */ -static int +int logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname) { int i; @@ -427,6 +427,19 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) continue; } + /* + * In case 'include_generated_columns' is 'false', we should skip the + * check of missing attrs for generated columns. + * In case 'include_generated_columns' is 'true', we should check if + * corresponding column for the generated column in publication column + * list is present in the subscription table. 
+ */ + if (!MySubscription->includegencols && attr->attgenerated) + { + entry->attrmap->attnums[i] = -1; + continue; + } + attnum = logicalrep_rel_att_by_name(remoterel, NameStr(attr->attname)); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index b00267f042..b3fde6afb3 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -118,6 +118,7 @@ #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/rel.h" #include "utils/rls.h" #include "utils/snapmgr.h" #include "utils/syscache.h" @@ -692,20 +693,63 @@ process_syncing_tables(XLogRecPtr current_lsn) } /* - * Create list of columns for COPY based on logical relation mapping. + * Create list of columns for COPY based on logical relation mapping. Do not + * include generated columns of the subscription table in the column list. */ static List * -make_copy_attnamelist(LogicalRepRelMapEntry *rel) +make_copy_attnamelist(LogicalRepRelMapEntry *rel, bool *remotegenlist) { List *attnamelist = NIL; - int i; + bool *gencollist; + TupleDesc desc; - for (i = 0; i < rel->remoterel.natts; i++) + desc = RelationGetDescr(rel->localrel); + gencollist = palloc0(MaxTupleAttributeNumber * sizeof(bool)); + + /* Loop to handle subscription table generated columns. */ + for (int i = 0; i < desc->natts; i++) { - attnamelist = lappend(attnamelist, - makeString(rel->remoterel.attnames[i])); + int attnum; + Form_pg_attribute attr = TupleDescAttr(desc, i); + + if (!attr->attgenerated) + continue; + + attnum = logicalrep_rel_att_by_name(&rel->remoterel, + NameStr(attr->attname)); + + if (attnum >= 0) + { + /* + * Check if the subscription table generated column has same + * name as a non-generated column in the corresponding + * publication table. 
+ */ + if(!remotegenlist[attnum]) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication target relation \"%s.%s\" has a generated column \"%s\" " + "but corresponding column on source relation is not a generated column", + rel->remoterel.nspname, rel->remoterel.relname, NameStr(attr->attname)))); + + /* + * 'gencollist' records that this is a generated column in + * the subscription table. Later, we use this information to + * skip adding this column to the column list for COPY. + */ + gencollist[attnum] = true; + } } + /* + * Construct column list for COPY. + */ + for (int i = 0; i < rel->remoterel.natts; i++) + { + if(!gencollist[i]) + attnamelist = lappend(attnamelist, + makeString(rel->remoterel.attnames[i])); + } return attnamelist; } @@ -791,16 +835,17 @@ copy_read_data(void *outbuf, int minread, int maxread) * qualifications to be used in the COPY command. */ static void -fetch_remote_table_info(char *nspname, char *relname, +fetch_remote_table_info(char *nspname, char *relname, bool **remotegenlist, LogicalRepRelation *lrel, List **qual) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; - Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID}; + Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID}; Oid qualRow[] = {TEXTOID}; bool isnull; + bool *remotegenlist_res; int natt; ListCell *lc; Bitmapset *included_cols = NULL; @@ -948,18 +993,24 @@ fetch_remote_table_info(char *nspname, char *relname, "SELECT a.attnum," " a.attname," " a.atttypid," - " a.attnum = ANY(i.indkey)" + " a.attnum = ANY(i.indkey)," + " a.attgenerated != ''" " FROM pg_catalog.pg_attribute a" " LEFT JOIN pg_catalog.pg_index i" " ON (i.indexrelid = pg_get_replica_identity_index(%u))" " WHERE a.attnum > 0::pg_catalog.int2" - " AND NOT a.attisdropped %s" + " AND NOT a.attisdropped", lrel->remoteid); + + if ((walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 && + 
walrcv_server_version(LogRepWorkerWalRcvConn) <= 160000) || + !MySubscription->includegencols) + appendStringInfo(&cmd, " AND a.attgenerated = ''"); + + appendStringInfo(&cmd, " AND a.attrelid = %u" " ORDER BY a.attnum", - lrel->remoteid, - (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ? - "AND a.attgenerated = ''" : ""), lrel->remoteid); + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, lengthof(attrRow), attrRow); @@ -973,6 +1024,7 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *)); lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); lrel->attkeys = NULL; + remotegenlist_res = palloc0(MaxTupleAttributeNumber * sizeof(bool)); /* * Store the columns as a list of names. Ignore those that are not @@ -1005,6 +1057,8 @@ fetch_remote_table_info(char *nspname, char *relname, if (DatumGetBool(slot_getattr(slot, 4, &isnull))) lrel->attkeys = bms_add_member(lrel->attkeys, natt); + remotegenlist_res[natt] = DatumGetBool(slot_getattr(slot, 5, &isnull)); + /* Should never happen. */ if (++natt >= MaxTupleAttributeNumber) elog(ERROR, "too many columns in remote table \"%s.%s\"", @@ -1015,7 +1069,7 @@ fetch_remote_table_info(char *nspname, char *relname, ExecDropSingleTupleTableSlot(slot); lrel->natts = natt; - + *remotegenlist = remotegenlist_res; walrcv_clear_result(res); /* @@ -1123,10 +1177,12 @@ copy_table(Relation rel) List *attnamelist; ParseState *pstate; List *options = NIL; + bool *remotegenlist; /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel, &qual); + RelationGetRelationName(rel), &remotegenlist, + &lrel, &qual); /* Put the relation into relmap. 
*/ logicalrep_relmap_update(&lrel); @@ -1135,11 +1191,17 @@ copy_table(Relation rel) relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); Assert(rel == relmapentry->localrel); + attnamelist = make_copy_attnamelist(relmapentry, remotegenlist); + /* Start copy on the publisher. */ initStringInfo(&cmd); - /* Regular table with no row filter */ - if (lrel.relkind == RELKIND_RELATION && qual == NIL) + /* + * Regular table with no row filter and 'include_generated_columns' + * specified as 'false' during creation of subscription. + */ + if (lrel.relkind == RELKIND_RELATION && qual == NIL && + !MySubscription->includegencols) { appendStringInfo(&cmd, "COPY %s", quote_qualified_identifier(lrel.nspname, lrel.relname)); @@ -1169,17 +1231,20 @@ copy_table(Relation rel) else { /* - * For non-tables and tables with row filters, we need to do COPY - * (SELECT ...), but we can't just do SELECT * because we need to not - * copy generated columns. For tables with any row filters, build a - * SELECT query with OR'ed row filters for COPY. + * For non-tables, tables with row filters, or when + * 'include_generated_columns' is specified as 'true', we need to do + * COPY (SELECT ...), as a normal COPY of generated columns is not + * supported. For tables with any row filters, build a SELECT query + * with OR'ed row filters for COPY. 
*/ + int i = 0; + appendStringInfoString(&cmd, "COPY (SELECT "); - for (int i = 0; i < lrel.natts; i++) + foreach_ptr(String, att_name, attnamelist) { - appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); - if (i < lrel.natts - 1) + if (i++) appendStringInfoString(&cmd, ", "); + appendStringInfoString(&cmd, quote_identifier(strVal(att_name))); } appendStringInfoString(&cmd, " FROM "); @@ -1237,7 +1302,6 @@ copy_table(Relation rel) (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock, NULL, false, false); - attnamelist = make_copy_attnamelist(relmapentry); cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options); /* Do the copy */ diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index e687b40a56..797e66dfdb 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -41,7 +41,8 @@ typedef struct LogicalRepRelMapEntry extern void logicalrep_relmap_update(LogicalRepRelation *remoterel); extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel); - +extern int logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, + const char *attname); extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode); extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index b78e3c6d6a..d7c4298377 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -99,9 +99,6 @@ CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PU ERROR: subscription with slot_name = NONE must also set create_slot = false CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, create_slot = false); ERROR: subscription with slot_name = 
NONE must also set enabled = false --- fail - copy_data and include_generated_columns are mutually exclusive options -CREATE SUBSCRIPTION sub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (include_generated_columns = true); -ERROR: copy_data = true and include_generated_columns = true are mutually exclusive options -- fail - include_generated_columns must be boolean CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, include_generated_columns = foo); ERROR: include_generated_columns requires a Boolean value diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index dbf064474c..838881be50 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -60,9 +60,6 @@ CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PU CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, enabled = false); CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, create_slot = false); --- fail - copy_data and include_generated_columns are mutually exclusive options -CREATE SUBSCRIPTION sub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (include_generated_columns = true); - -- fail - include_generated_columns must be boolean CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, include_generated_columns = foo); diff --git a/src/test/subscription/t/011_generated.pl b/src/test/subscription/t/011_generated.pl index bc6033adb0..3ab004429f 100644 --- a/src/test/subscription/t/011_generated.pl +++ b/src/test/subscription/t/011_generated.pl @@ -16,6 +16,8 @@ $node_publisher->start; my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; 
+$node_subscriber->append_conf('postgresql.conf', + "max_logical_replication_workers = 10"); $node_subscriber->start; my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; @@ -41,6 +43,28 @@ $node_publisher->safe_psql('postgres', $node_subscriber->safe_psql('postgres', "CREATE TABLE tab3 (a int, b int GENERATED ALWAYS AS (a + 20) STORED)"); +# tab4: publisher-side generated col 'b' and 'c' --> subscriber-side non-generated col 'b', and generated-col 'c' +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab4 (a int , b int GENERATED ALWAYS AS (a * 2) STORED, c int GENERATED ALWAYS AS (a * 2) STORED)" +); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab4 (a int, b int, c int GENERATED ALWAYS AS (a * 22) STORED)" +); + +# tab5: publisher-side non-generated col 'b' --> subscriber-side generated col 'b' +$node_publisher->safe_psql('postgres', "CREATE TABLE tab5 (a int, b int)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab5 (a int, b int GENERATED ALWAYS AS (a * 22) STORED)"); + +# tab6: publisher-side generated col 'b' and 'c' --> subscriber-side non-generated col 'b', and generated-col 'c' +# columns on subscriber in different order +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab6 (a int, b int GENERATED ALWAYS AS (a * 2) STORED, c int GENERATED ALWAYS AS (a * 2) STORED)"); + +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab6 (c int GENERATED ALWAYS AS (a * 22) STORED, b int, a int)"); + # data for initial sync $node_publisher->safe_psql('postgres', @@ -49,6 +73,12 @@ $node_publisher->safe_psql('postgres', "INSERT INTO tab2 (a) VALUES (1), (2), (3)"); $node_publisher->safe_psql('postgres', "INSERT INTO tab3 (a) VALUES (1), (2), (3)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab4 (a) VALUES (1), (2), (3)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab5 (a, b) VALUES (1, 1), (2, 2), (3, 3)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab6 (a) VALUES (1), 
(2), (3)"); $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1 FOR TABLE tab1"); @@ -56,6 +86,12 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub2 FOR TABLE tab2"); $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub3 FOR TABLE tab3"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub4 FOR TABLE tab4"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub5 FOR TABLE tab5"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub6 FOR TABLE tab6"); $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" @@ -69,6 +105,14 @@ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3 WITH (include_generated_columns = true, copy_data = false)" ); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub4 CONNECTION '$publisher_connstr' PUBLICATION pub4 WITH (include_generated_columns = true)" +); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub6 CONNECTION '$publisher_connstr' PUBLICATION pub6 WITH (include_generated_columns = true)" +); + # Wait for initial sync of all subscriptions $node_subscriber->wait_for_subscription_sync; @@ -121,6 +165,50 @@ is( $result, qq(4|24 'confirm generated columns are NOT replicated when the subscriber-side column is also generated' ); +$node_publisher->safe_psql('postgres', "INSERT INTO tab4 VALUES (4), (5)"); + +$node_publisher->wait_for_catchup('sub4'); + +# gen-col 'b' in publisher replicating to NOT gen-col 'b' on subscriber +# gen-col 'c' in publisher not replicating to gen-col 'c' on subscriber +$result = + $node_subscriber->safe_psql('postgres', + "SELECT a, b, c FROM tab4 ORDER BY a"); +is( $result, qq(1|2|22 +2|4|44 +3|6|66 +4|8|88 +5|10|110), 'replicate generated column with initial sync'); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab6 VALUES (4), (5)"); + 
+$node_publisher->wait_for_catchup('sub6'); + +# gen-col 'b' in publisher replicating to NOT gen-col 'b' on subscriber; gen-col 'c' in publisher not replicating to gen-col 'c' on subscriber +# column order is different on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT a, b, c FROM tab6 ORDER BY a"); +is( $result, qq(1|2|22 +2|4|44 +3|6|66 +4|8|88 +5|10|110), 'replicate generated column with initial sync different column order'); + +# NOT gen-col 'b' in publisher not replicating to gen-col 'b' on subscriber +my $offset = -s $node_subscriber->logfile; + +# sub5 will cause the table sync worker to restart repeatedly, +# so SUBSCRIPTION sub5 is created separately +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub5 CONNECTION '$publisher_connstr' PUBLICATION pub5 WITH (include_generated_columns = true)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? logical replication target relation "public.tab5" has a generated column "b" but corresponding column on source relation is not a generated column/, + $offset); + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub5"); + # try it with a subscriber-side trigger $node_subscriber->safe_psql( -- 2.34.1