From 84ec2ad8c0542d9bafc8ca7c6f4c65380cb9bdc7 Mon Sep 17 00:00:00 2001 From: Cloud User Date: Fri, 17 Mar 2023 14:20:10 +0000 Subject: [PATCH v3] Ignore dropped columns when REPLICA IDENTITY FULL Dropped columns are filled with NULL values on slot_store_data() but not on table_scan_getnextslot(). With this commit, we skip such columns while checking tuple equality. --- src/backend/executor/execReplication.c | 9 ++++- src/test/subscription/t/100_bugs.pl | 55 +++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 108162bb35..910608e41e 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -236,6 +236,13 @@ tuple_equals_slot(TupleDesc desc, HeapTuple tup, TupleTableSlot *slot) Form_pg_attribute att; TypeCacheEntry *typentry; + /* + * Ignore dropped columns as the publisher doesn't send those + */ + att = TupleDescAttr(desc, attrnum); + if (att->attisdropped) + continue; + /* * If one value is NULL and other is not, then they are certainly not * equal @@ -249,8 +256,6 @@ tuple_equals_slot(TupleDesc desc, HeapTuple tup, TupleTableSlot *slot) if (isnull[attrnum]) continue; - att = TupleDescAttr(desc, attrnum); - typentry = lookup_type_cache(att->atttypid, TYPECACHE_EQ_OPR_FINFO); if (!OidIsValid(typentry->eq_opr_finfo.fn_oid)) ereport(ERROR, diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl index 08e5f0bcb0..fa42fc67ea 100644 --- a/src/test/subscription/t/100_bugs.pl +++ b/src/test/subscription/t/100_bugs.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 5; +use Test::More tests => 6; # Bug #15114 @@ -67,6 +67,59 @@ pass('index predicates do not cause crash'); $node_publisher->stop('fast'); $node_subscriber->stop('fast'); +# The bug was that when the REPLICA IDENTITY FULL is used with dropped columns, +# we fail to apply updates and deletes +my $node_publisher_d_cols = get_new_node('node_publisher_d_cols'); +$node_publisher_d_cols->init(allows_streaming => 'logical'); +$node_publisher_d_cols->start; + +my $node_subscriber_d_cols = get_new_node('node_subscriber_d_cols'); +$node_subscriber_d_cols->init(allows_streaming => 'logical'); +$node_subscriber_d_cols->start; + +$node_publisher_d_cols->safe_psql( + 'postgres', qq( + CREATE TABLE dropped_cols (a int, b_drop int, c int); + ALTER TABLE dropped_cols REPLICA IDENTITY FULL; + CREATE PUBLICATION pub_dropped_cols FOR TABLE dropped_cols; + -- some initial data + INSERT INTO dropped_cols VALUES (1, 1, 1); +)); + +$node_subscriber_d_cols->safe_psql( + 'postgres', qq( + CREATE TABLE dropped_cols (a int, b_drop int, c int); +)); + +my $publisher_connstr_d_cols = $node_publisher_d_cols->connstr . ' dbname=postgres'; +$node_subscriber_d_cols->safe_psql('postgres', + "CREATE SUBSCRIPTION sub_dropped_cols CONNECTION '$publisher_connstr_d_cols application_name=sub_dropped_cols' PUBLICATION pub_dropped_cols" +); + +# Wait for initial table sync to finish +$node_subscriber_d_cols->wait_for_subscription_sync($node_publisher_d_cols, 'sub_dropped_cols'); + +$node_publisher_d_cols->safe_psql( + 'postgres', qq( + ALTER TABLE dropped_cols DROP COLUMN b_drop; +)); +$node_subscriber_d_cols->safe_psql( + 'postgres', qq( + ALTER TABLE dropped_cols DROP COLUMN b_drop; +)); + +$node_publisher_d_cols->safe_psql( + 'postgres', qq( + UPDATE dropped_cols SET a = 100; +)); +$node_publisher_d_cols->wait_for_catchup('sub_dropped_cols'); + +is($node_subscriber_d_cols->safe_psql('postgres', "SELECT count(*) FROM dropped_cols WHERE a = 100"), + qq(1), 'replication with RI FULL and dropped columns'); + +$node_publisher_d_cols->stop('fast'); +$node_subscriber_d_cols->stop('fast'); + # Handling of temporary and unlogged tables with FOR ALL TABLES publications -- 2.31.1