From ecac940b662460037820f28eb935099b82a9a9cb Mon Sep 17 00:00:00 2001 From: nkey Date: Sat, 23 Nov 2024 13:25:11 +0100 Subject: [PATCH v17 1/2] This patch introduces new injection points and TAP tests to reproduce and verify conflict detection issues that arise during SNAPSHOT_DIRTY index scans in logical replication. --- src/backend/access/index/indexam.c | 9 ++ src/backend/access/nbtree/README | 9 ++ src/backend/executor/execIndexing.c | 7 +- src/backend/replication/logical/worker.c | 4 + src/include/utils/snapshot.h | 14 ++ src/test/subscription/meson.build | 4 + .../subscription/t/038_delete_missing_race.pl | 139 +++++++++++++++++ .../subscription/t/039_update_missing_race.pl | 141 +++++++++++++++++ .../t/040_update_missing_with_retain.pl | 143 ++++++++++++++++++ .../t/041_update_missing_simulation.pl | 125 +++++++++++++++ 10 files changed, 594 insertions(+), 1 deletion(-) create mode 100644 src/test/subscription/t/038_delete_missing_race.pl create mode 100644 src/test/subscription/t/039_update_missing_race.pl create mode 100644 src/test/subscription/t/040_update_missing_with_retain.pl create mode 100644 src/test/subscription/t/041_update_missing_simulation.pl diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c index 43f64a0e721..d934b636a52 100644 --- a/src/backend/access/index/indexam.c +++ b/src/backend/access/index/indexam.c @@ -52,11 +52,13 @@ #include "catalog/pg_type.h" #include "nodes/execnodes.h" #include "pgstat.h" +#include "replication/logicalworker.h" #include "storage/lmgr.h" #include "storage/predicate.h" #include "utils/ruleutils.h" #include "utils/snapmgr.h" #include "utils/syscache.h" +#include "utils/injection_point.h" /* ---------------------------------------------------------------- @@ -751,6 +753,13 @@ index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot * * the index. */ Assert(ItemPointerIsValid(&scan->xs_heaptid)); +#ifdef USE_INJECTION_POINTS + if (!IsCatalogRelation(scan->heapRelation) && IsLogicalWorker()) + { + INJECTION_POINT("index_getnext_slot_before_fetch_apply_dirty", NULL); + } +#endif + if (index_fetch_heap(scan, slot)) return true; } diff --git a/src/backend/access/nbtree/README b/src/backend/access/nbtree/README index 53d4a61dc3f..634a3d10bb1 100644 --- a/src/backend/access/nbtree/README +++ b/src/backend/access/nbtree/README @@ -103,6 +103,15 @@ We also remember the left-link, and follow it when the scan moves backwards (though this requires extra handling to account for concurrent splits of the left sibling; see detailed move-left algorithm below). +Despite the described mechanics in place, inconsistent results may still occur +during non-MVCC scans (SnapshotDirty and SnapshotSelf). This issue can occur if a +concurrent transaction deletes a tuple and inserts a new tuple with a new TID in the +same page or to the left/right (depending on scan direction) of current scan position. +If the scan has already visited the page and cached its content in the +backend-local storage, it might skip the old tuple due to deletion and miss the new +tuple because the scan does not re-read the page. Note it affects not only btree +scan but also a heap scan. + In most cases we release our lock and pin on a page before attempting to acquire pin and lock on the page we are moving to. In a few places it is necessary to lock the next page before releasing the current one. diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c index 9d071e495c6..dc2d77ad704 100644 --- a/src/backend/executor/execIndexing.c +++ b/src/backend/executor/execIndexing.c @@ -118,6 +118,7 @@ #include "utils/multirangetypes.h" #include "utils/rangetypes.h" #include "utils/snapmgr.h" +#include "utils/injection_point.h" /* waitMode argument to check_exclusion_or_unique_constraint() */ typedef enum @@ -780,7 +781,9 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index, /* * Search the tuples that are in the index for any violations, including * tuples that aren't visible yet. - */ + * Snapshot dirty may miss some tuples in the case of parallel updates, + * but it is acceptable here. + */ InitDirtySnapshot(DirtySnapshot); for (i = 0; i < indnkeyatts; i++) @@ -948,6 +951,8 @@ retry: INJECTION_POINT("check-exclusion-or-unique-constraint-no-conflict", NULL); #endif + if (!conflict) + INJECTION_POINT("check_exclusion_or_unique_constraint_no_conflict", NULL); return !conflict; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 033858752d9..d774b08e7d0 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -287,6 +287,7 @@ #include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/guc.h" +#include "utils/injection_point.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -2989,7 +2990,10 @@ apply_handle_update_internal(ApplyExecutionData *edata, conflicttuple.origin != replorigin_xact_state.origin) type = CT_UPDATE_DELETED; else + { + INJECTION_POINT("apply_handle_update_internal_update_missing", NULL); type = CT_UPDATE_MISSING; + } /* Store the new tuple for conflict reporting */ slot_store_data(newslot, relmapentry, newtup); diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h index 9766aabcad4..2fc0058796a 100644 --- a/src/include/utils/snapshot.h +++ b/src/include/utils/snapshot.h @@ -53,6 +53,13 @@ typedef enum SnapshotType * - previous commands of this transaction * - changes made by the current command * + * Note: such a snapshot may miss an existing logical tuple in case of + * parallel update. + * If a new version of a tuple is inserted into an already processed page + * but the old one marked with committed xmax - snapshot will skip the old + * one and never meet the new one during that scan - resulting in skipping + * that tuple at all. + * * Does _not_ include: * - in-progress transactions (as of the current instant) * ------------------------------------------------------------------------- @@ -82,6 +89,13 @@ typedef enum SnapshotType * transaction and committed/aborted xacts are concerned. However, it * also includes the effects of other xacts still in progress. * + * Note: such a snapshot may miss an existing logical tuple in case of + * parallel update. + * If a new version of a tuple is inserted into an already processed page but the + * old one marked with committed/in-progress xmax - snapshot will skip the old one + * and never meet the new one during that scan - resulting in skipping that tuple + * at all. + * * A special hack is that when a snapshot of this type is used to * determine tuple visibility, the passed-in snapshot struct is used as an * output argument to return the xids of concurrent xacts that affected diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index f4a9cf5057f..c3163f647de 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -47,6 +47,10 @@ tests += { 't/035_conflicts.pl', 't/036_sequences.pl', 't/037_except.pl', + 't/038_delete_missing_race.pl', + 't/039_update_missing_race.pl', + 't/040_update_missing_with_retain.pl', + 't/041_update_missing_simulation.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/038_delete_missing_race.pl b/src/test/subscription/t/038_delete_missing_race.pl new file mode 100644 index 00000000000..51dd351dc10 --- /dev/null +++ b/src/test/subscription/t/038_delete_missing_race.pl @@ -0,0 +1,139 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +# Test the conflict detection and resolution in logical replication +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +############################## Set it to 0 to make set success; TODO: delete that for commit +my $simulate_race_condition = 1; +############################## + +############################### +# Setup +############################### + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq(track_commit_timestamp = on)); +$node_publisher->start; + + +# Create subscriber node with track_commit_timestamp enabled +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', + qq(track_commit_timestamp = on)); +$node_subscriber->start; + + +# Check if the extension injection_points is available, as it may be +# possible that this script is run with installcheck, where the module +# would not be installed by default. +if (!$node_subscriber->check_extension('injection_points')) +{ + plan skip_all => 'Extension injection_points not installed'; +} + +# Create table on publisher +$node_publisher->safe_psql( + 'postgres', + "CREATE TABLE conf_tab(a int PRIMARY key, data text);"); + +# Create similar table on subscriber with additional index to disable HOT updates +$node_subscriber->safe_psql( + 'postgres', + "CREATE TABLE conf_tab(a int PRIMARY key, data text); + CREATE INDEX data_index ON conf_tab(data);"); + +# Set up extension to simulate race condition +$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;'); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE conf_tab"); + +# Insert row to be updated later +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab(a, data) VALUES (1,'frompub')"); + +# Create the subscription +my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub"); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +############################################ +# Race condition because of DirtySnapshot +############################################ + +my $psql_session_subscriber = $node_subscriber->background_psql('postgres'); +if ($simulate_race_condition) +{ + $node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')"); +} + +my $log_offset = -s $node_subscriber->logfile; + +# Delete tuple on publisher +$node_publisher->safe_psql('postgres', "DELETE FROM conf_tab WHERE a=1;"); + +if ($simulate_race_condition) +{ + # Wait apply worker to start the search for the tuple using index + $node_subscriber->wait_for_event('logical replication apply worker', + 'index_getnext_slot_before_fetch_apply_dirty'); +} + +# Updater tuple on subscriber +$psql_session_subscriber->query_until( + qr/start/, qq[ + \\echo start + UPDATE conf_tab SET data = 'fromsubnew' WHERE (a=1); +]); + + +if ($simulate_race_condition) +{ + # Wake up apply worker + $node_subscriber->safe_psql('postgres'," + SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty'); + SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty'); + "); +} + +# Tuple was updated - so, we have conflict +$node_subscriber->wait_for_log( + qr/conflict detected on relation \"public.conf_tab\"/, + $log_offset); + +$node_publisher->wait_for_catchup($appname); + +# But tuple should be deleted on subscriber any way +is($node_subscriber->safe_psql('postgres', 'SELECT count(*) from conf_tab'), 0, 'record deleted on subscriber'); + +ok(!$node_subscriber->log_contains( + qr/LOG: conflict detected on relation \"public.conf_tab\": conflict=delete_missing/, + $log_offset), 'invalid conflict detected'); + +ok($node_subscriber->log_contains( + qr/LOG: conflict detected on relation \"public.conf_tab\": conflict=delete_origin_differs/, + $log_offset), 'correct conflict detected'); + +done_testing(); diff --git a/src/test/subscription/t/039_update_missing_race.pl b/src/test/subscription/t/039_update_missing_race.pl new file mode 100644 index 00000000000..1e120f74bbd --- /dev/null +++ b/src/test/subscription/t/039_update_missing_race.pl @@ -0,0 +1,141 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +# Test the conflict detection and resolution in logical replication +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +############################## Set it to 0 to make set success; TODO: delete that for commit +my $simulate_race_condition = 1; +############################## + +############################### +# Setup +############################### + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq(track_commit_timestamp = on)); +$node_publisher->start; + + +# Create subscriber node with track_commit_timestamp enabled +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', + qq(track_commit_timestamp = on)); +$node_subscriber->start; + + +# Check if the extension injection_points is available, as it may be +# possible that this script is run with installcheck, where the module +# would not be installed by default. +if (!$node_subscriber->check_extension('injection_points')) +{ + plan skip_all => 'Extension injection_points not installed'; +} + +# Create table on publisher +$node_publisher->safe_psql( + 'postgres', + "CREATE TABLE conf_tab(a int PRIMARY key, data text);"); + +# Create similar table on subscriber with additional index to disable HOT updates and additional column +$node_subscriber->safe_psql( + 'postgres', + "CREATE TABLE conf_tab(a int PRIMARY key, data text, i int DEFAULT 0); + CREATE INDEX i_index ON conf_tab(i);"); + +# Set up extension to simulate race condition +$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;'); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE conf_tab"); + +# Insert row to be updated later +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab(a, data) VALUES (1,'frompub')"); + +# Create the subscription +my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub"); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +############################################ +# Race condition because of DirtySnapshot +############################################ + +my $psql_session_subscriber = $node_subscriber->background_psql('postgres'); +if ($simulate_race_condition) +{ + $node_subscriber->safe_psql('postgres', "SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')"); +} + +my $log_offset = -s $node_subscriber->logfile; + +# Update tuple on publisher +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab SET data = 'frompubnew' WHERE (a=1);"); + + +if ($simulate_race_condition) +{ + # Wait apply worker to start the search for the tuple using index + $node_subscriber->wait_for_event('logical replication apply worker', 'index_getnext_slot_before_fetch_apply_dirty'); +} + +# Update additional(!) column on the subscriber +$psql_session_subscriber->query_until( + qr/start/, qq[ + \\echo start + UPDATE conf_tab SET i = 1 WHERE (a=1); +]); + + +if ($simulate_race_condition) +{ + # Wake up apply worker + $node_subscriber->safe_psql('postgres'," + SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty'); + SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty'); + "); +} + +# Tuple was updated - so, we have conflict +$node_subscriber->wait_for_log( + qr/conflict detected on relation \"public.conf_tab\"/, + $log_offset); + +$node_publisher->wait_for_catchup($appname); + +# We need new column value be synced with subscriber +is($node_subscriber->safe_psql('postgres', 'SELECT data from conf_tab WHERE a = 1'), 'frompubnew', 'record updated on subscriber'); +# And additional column maintain updated value +is($node_subscriber->safe_psql('postgres', 'SELECT i from conf_tab WHERE a = 1'), 1, 'column record updated on subscriber'); + +ok(!$node_subscriber->log_contains( + qr/LOG: conflict detected on relation \"public.conf_tab\": conflict=update_missing/, + $log_offset), 'invalid conflict detected'); + +ok($node_subscriber->log_contains( + qr/LOG: conflict detected on relation \"public.conf_tab\": conflict=update_origin_differs/, + $log_offset), 'correct conflict detected'); + +done_testing(); diff --git a/src/test/subscription/t/040_update_missing_with_retain.pl b/src/test/subscription/t/040_update_missing_with_retain.pl new file mode 100644 index 00000000000..7b225d45f7f --- /dev/null +++ b/src/test/subscription/t/040_update_missing_with_retain.pl @@ -0,0 +1,143 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +# Test the conflict detection and resolution in logical replication +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +############################## Set it to 0 to make set success; TODO: delete that for commit +my $simulate_race_condition = 1; +############################## + +############################### +# Setup +############################### + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq(track_commit_timestamp = on)); +$node_publisher->start; + + +# Create subscriber node with track_commit_timestamp enabled +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', + qq(track_commit_timestamp = on)); +$node_subscriber->append_conf('postgresql.conf', + qq(wal_level = 'replica')); +$node_subscriber->start; + + +# Check if the extension injection_points is available, as it may be +# possible that this script is run with installcheck, where the module +# would not be installed by default. +if (!$node_subscriber->check_extension('injection_points')) +{ + plan skip_all => 'Extension injection_points not installed'; +} + +# Create table on publisher +$node_publisher->safe_psql( + 'postgres', + "CREATE TABLE conf_tab(a int PRIMARY key, data text);"); + +# Create similar table on subscriber with additional index to disable HOT updates and additional column +$node_subscriber->safe_psql( + 'postgres', + "CREATE TABLE conf_tab(a int PRIMARY key, data text, i int DEFAULT 0); + CREATE INDEX i_index ON conf_tab(i);"); + +# Set up extension to simulate race condition +$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;'); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE conf_tab"); + +# Insert row to be updated later +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab(a, data) VALUES (1,'frompub')"); + +# Create the subscription +my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub WITH (retain_dead_tuples = true)"); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +############################################ +# Race condition because of DirtySnapshot +############################################ + +my $psql_session_subscriber = $node_subscriber->background_psql('postgres'); +if ($simulate_race_condition) +{ + $node_subscriber->safe_psql('postgres', "SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')"); +} + +my $log_offset = -s $node_subscriber->logfile; + +# Update tuple on publisher +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab SET data = 'frompubnew' WHERE (a=1);"); + + +if ($simulate_race_condition) +{ + # Wait apply worker to start the search for the tuple using index + $node_subscriber->wait_for_event('logical replication apply worker', 'index_getnext_slot_before_fetch_apply_dirty'); +} + +# Update additional(!) column on the subscriber +$psql_session_subscriber->query_until( + qr/start/, qq[ + \\echo start + UPDATE conf_tab SET i = 1 WHERE (a=1); +]); + + +if ($simulate_race_condition) +{ + # Wake up apply worker + $node_subscriber->safe_psql('postgres'," + SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty'); + SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty'); + "); +} + +# Tuple was updated - so, we have conflict +$node_subscriber->wait_for_log( + qr/conflict detected on relation \"public.conf_tab\"/, + $log_offset); + +$node_publisher->wait_for_catchup($appname); + +# We need new column value be synced with subscriber +is($node_subscriber->safe_psql('postgres', 'SELECT data from conf_tab WHERE a = 1'), 'frompubnew', 'record updated on subscriber'); +# And additional column maintain updated value +is($node_subscriber->safe_psql('postgres', 'SELECT i from conf_tab WHERE a = 1'), 1, 'column record updated on subscriber'); + +ok(!$node_subscriber->log_contains( + qr/LOG: conflict detected on relation \"public.conf_tab\": conflict=update_deleted/, + $log_offset), 'invalid conflict detected'); + +ok($node_subscriber->log_contains( + qr/LOG: conflict detected on relation \"public.conf_tab\": conflict=update_origin_differs/, + $log_offset), 'correct conflict detected'); + +done_testing(); diff --git a/src/test/subscription/t/041_update_missing_simulation.pl b/src/test/subscription/t/041_update_missing_simulation.pl new file mode 100644 index 00000000000..21fcd1ceb53 --- /dev/null +++ b/src/test/subscription/t/041_update_missing_simulation.pl @@ -0,0 +1,125 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +# Test the conflict detection and resolution in logical replication +# Not intended to be committed because quite heavy +# Here to demonstrate reproducibility with pgbench +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use IPC::Run qw(start finish); +use Test::More; + +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +############################### +# Setup +############################### + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq(track_commit_timestamp = on)); +$node_publisher->start; + +# Create subscriber node with track_commit_timestamp enabled +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', + qq(track_commit_timestamp = on)); +$node_subscriber->start; + +# Check if the extension injection_points is available, as it may be +# possible that this script is run with installcheck, where the module +# would not be installed by default. +if (!$node_subscriber->check_extension('injection_points')) +{ + plan skip_all => 'Extension injection_points not installed'; +} + +# Create table on publisher +$node_publisher->safe_psql( + 'postgres', + "CREATE TABLE tbl(a int PRIMARY key, data_pub int);"); + +# Create similar table on subscriber with additional index to disable HOT updates +$node_subscriber->safe_psql( + 'postgres', + "CREATE TABLE tbl(a int PRIMARY key, data_pub int, data_sub int default 0); + CREATE INDEX data_index ON tbl(data_pub);"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE tbl"); + +# Create the subscription +my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub"); + +my $num_rows = 10; +my $num_updates = 10000; +my $num_clients = 10; +$node_publisher->safe_psql('postgres', "INSERT INTO tbl SELECT i, i * i FROM generate_series(1,$num_rows) i"); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +# Prepare small pgbench scripts as files +my $sub_sql = $node_subscriber->basedir . '/sub_update.sql'; +my $pub_sql = $node_publisher->basedir . '/pub_delete.sql'; + +open my $fh1, '>', $sub_sql or die $!; +print $fh1 "\\set num random(1,$num_rows)\nUPDATE tbl SET data_sub = data_sub + 1 WHERE a = :num;\n"; +close $fh1; + +open my $fh2, '>', $pub_sql or die $!; +print $fh2 "\\set num random(1,$num_rows)\nUPDATE tbl SET data_pub = data_pub + 1 WHERE a = :num;\n"; +close $fh2; + +my @sub_cmd = ( + 'pgbench', + '--no-vacuum', "--client=$num_clients", '--jobs=4', '--exit-on-abort', "--transactions=$num_updates", + '-p', $node_subscriber->port, '-h', $node_subscriber->host, '-f', $sub_sql, 'postgres' +); + +my @pub_cmd = ( + 'pgbench', + '--no-vacuum', "--client=$num_clients", '--jobs=4', '--exit-on-abort', "--transactions=$num_updates", + '-p', $node_publisher->port, '-h', $node_publisher->host, '-f', $pub_sql, 'postgres' +); + +$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;'); +# This should never happen +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('apply_handle_update_internal_update_missing', 'error')"); +my $log_offset = -s $node_subscriber->logfile; + +# Start both concurrently +my ($sub_out, $sub_err, $pub_out, $pub_err) = ('', '', '', ''); +my $sub_h = start \@sub_cmd, '>', \$sub_out, '2>', \$sub_err; +my $pub_h = start \@pub_cmd, '>', \$pub_out, '2>', \$pub_err; + +# Wait for completion +finish $sub_h; +finish $pub_h; + +like($sub_out, qr/actually processed/, 'subscriber pgbench completed'); +like($pub_out, qr/actually processed/, 'publisher pgbench completed'); + +# Let subscription catch up, then check expectations +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +ok(!$node_subscriber->log_contains( + qr/ERROR: error triggered for injection point apply_handle_update_internal_update_missing/, + $log_offset), 'invalid conflict detected'); + +done_testing(); -- 2.43.0