From bcf3cb77bd1ee923f128bc4607b66a6287762743 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy
Date: Tue, 21 Apr 2026 20:24:02 +0000
Subject: [PATCH v1] Add tests for concurrent DML retry paths in logical
 replication apply.

The retry paths in RelationFindReplTupleByIndex and RelationFindReplTupleSeq
for concurrent updates and deletes had no test coverage. When a tuple is
concurrently modified on the subscriber while the apply worker is trying to
lock it, table_tuple_lock returns TM_Updated or TM_Deleted, and the worker
retries the scan. This commit adds an injection point and a TAP test that
exercises these retry paths for both index scan and sequential scan.

While here, fix a minor inefficiency in the retry handling. In
RelationFindReplTupleByIndex, avoid calling ExecMaterializeSlot before the
retry check. In RelationFindReplTupleSeq, remove the unnecessary separate
TupleTableSlot allocation and ExecCopySlot call by using outslot directly
for scanning, keeping it consistent with RelationFindReplTupleByIndex.
---
 src/backend/executor/execReplication.c        |  25 +--
 src/test/subscription/meson.build             |   1 +
 .../t/039_concurrent_dml_retry.pl             | 152 ++++++++++++++++++
 3 files changed, 168 insertions(+), 10 deletions(-)
 create mode 100644 src/test/subscription/t/039_concurrent_dml_retry.pl

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index b2ca5cbf117..0d219e35daa 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -37,6 +37,8 @@
 #include "utils/syscache.h"
 #include "utils/typcache.h"
 
+#include "utils/injection_point.h"
+
 static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
 						 TypeCacheEntry **eq, Bitmapset *columns);
 
@@ -229,8 +231,6 @@ retry:
 			continue;
 		}
 
-		ExecMaterializeSlot(outslot);
-
 		xwait = TransactionIdIsValid(snap.xmin) ?
 			snap.xmin : snap.xmax;
 
@@ -255,6 +255,8 @@ retry:
 		TM_FailureData tmfd;
 		TM_Result res;
 
+		INJECTION_POINT("find-repl-tuple-before-lock", NULL);
+
 		PushActiveSnapshot(GetLatestSnapshot());
 
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
@@ -269,6 +271,9 @@ retry:
 
 		if (should_refetch_tuple(res, &tmfd))
 			goto retry;
+
+		/* Materialize the slot so it preserves pass-by-ref values. */
+		ExecMaterializeSlot(outslot);
 	}
 
 	index_endscan(scan);
@@ -370,7 +375,6 @@ bool
 RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 						 TupleTableSlot *searchslot, TupleTableSlot *outslot)
 {
-	TupleTableSlot *scanslot;
 	TableScanDesc scan;
 	SnapshotData snap;
 	TypeCacheEntry **eq;
@@ -386,7 +390,6 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 	InitDirtySnapshot(snap);
 
 	scan = table_beginscan(rel, &snap, 0, NULL, SO_NONE);
-	scanslot = table_slot_create(rel, NULL);
 
 retry:
 	found = false;
@@ -394,14 +397,11 @@ retry:
 	table_rescan(scan, NULL);
 
 	/* Try to find the tuple */
-	while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
+	while (table_scan_getnextslot(scan, ForwardScanDirection, outslot))
 	{
-		if (!tuples_equal(scanslot, searchslot, eq, NULL))
+		if (!tuples_equal(outslot, searchslot, eq, NULL))
 			continue;
 
-		found = true;
-		ExecCopySlot(outslot, scanslot);
-
 		xwait = TransactionIdIsValid(snap.xmin) ?
 			snap.xmin : snap.xmax;
 
@@ -416,6 +416,7 @@ retry:
 		}
 
 		/* Found our tuple and it's not locked */
+		found = true;
 		break;
 	}
 
@@ -425,6 +426,8 @@ retry:
 		TM_FailureData tmfd;
 		TM_Result res;
 
+		INJECTION_POINT("find-repl-tuple-before-lock", NULL);
+
 		PushActiveSnapshot(GetLatestSnapshot());
 
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
@@ -439,10 +442,12 @@ retry:
 
 		if (should_refetch_tuple(res, &tmfd))
 			goto retry;
+
+		/* Materialize the slot so it preserves pass-by-ref values. */
+		ExecMaterializeSlot(outslot);
 	}
 
 	table_endscan(scan);
-	ExecDropSingleTupleTableSlot(scanslot);
 
 	return found;
 }
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index e71e95c6297..5df7f143721 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -48,6 +48,7 @@ tests += {
       't/036_sequences.pl',
       't/037_except.pl',
       't/038_walsnd_shutdown_timeout.pl',
+      't/039_concurrent_dml_retry.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/039_concurrent_dml_retry.pl b/src/test/subscription/t/039_concurrent_dml_retry.pl
new file mode 100644
index 00000000000..074aeafbcc4
--- /dev/null
+++ b/src/test/subscription/t/039_concurrent_dml_retry.pl
@@ -0,0 +1,152 @@
+# Copyright (c) 2025-2026, PostgreSQL Global Development Group
+
+# Test concurrent update/delete retry paths in logical replication apply.
+#
+# Uses injection points to pause the apply worker after finding a tuple but
+# before locking it, allowing a concurrent session to modify or delete the
+# same row.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Check if injection points are available.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+if ($node_publisher->check_extension('injection_points') == 0)
+{
+	$node_publisher->stop;
+	plan skip_all => 'injection_points not supported';
+}
+
+# Subscriber needs injection_points loaded.
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	"shared_preload_libraries = 'injection_points'");
+$node_subscriber->start;
+
+# Create tables on both publisher and subscriber, and set up replication.
+$node_publisher->safe_psql('postgres', qq(
+	CREATE TABLE test_tab (a int PRIMARY KEY, b int);
+	CREATE TABLE test_tab_full (a int, b int);
+	ALTER TABLE test_tab_full REPLICA IDENTITY FULL;
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+	CREATE TABLE test_tab (a int PRIMARY KEY, b int);
+	CREATE TABLE test_tab_full (a int, b int);
+	ALTER TABLE test_tab_full REPLICA IDENTITY FULL;
+	CREATE EXTENSION injection_points;
+));
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION test_pub FOR TABLE test_tab, test_tab_full;");
+
+my $appname = 'test_sub';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION test_sub
+	 CONNECTION '$publisher_connstr application_name=$appname'
+	 PUBLICATION test_pub;");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# Pre-insert all test data in a single batch to avoid multiple
+# wait_for_catchup round trips.
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO test_tab VALUES (1, 10), (2, 30);
+	INSERT INTO test_tab_full VALUES (1, 100), (2, 300);
+));
+$node_publisher->wait_for_catchup($appname);
+
+# Helper to run a single concurrent DML retry test using injection points.
+sub test_concurrent_retry
+{
+	my (%args) = @_;
+	my $test_name = $args{name};
+	my $pub_dml = $args{pub_dml};
+	my $sub_dml = $args{sub_dml};
+	my $expected_log = $args{expected_log};
+	my $verify_query = $args{verify_query};
+	my $verify_result = $args{verify_result};
+
+	# Attach injection point.
+	$node_subscriber->safe_psql('postgres',
+		"SELECT injection_points_attach('find-repl-tuple-before-lock', 'wait');");
+
+	# Publish the DML that will trigger the apply worker to find + lock.
+	$node_publisher->safe_psql('postgres', $pub_dml);
+
+	# Wait for the apply worker to hit the injection point.
+	$node_subscriber->wait_for_event('logical replication apply worker',
+		'find-repl-tuple-before-lock');
+
+	# Execute concurrent DML on subscriber while apply worker is paused.
+	$node_subscriber->safe_psql('postgres', $sub_dml);
+
+	my $log_offset = -s $node_subscriber->logfile;
+
+	# Detach first so the retry loop doesn't hit the injection point again,
+	# then wake up the apply worker.
+	$node_subscriber->safe_psql('postgres',
+		"SELECT injection_points_detach('find-repl-tuple-before-lock');
+		 SELECT injection_points_wakeup('find-repl-tuple-before-lock');");
+
+	# Confirm the expected log message.
+	$node_subscriber->wait_for_log($expected_log, $log_offset);
+	pass("$test_name: concurrent modification detected and retried");
+
+	# Wait for apply to finish and verify result.
+	$node_publisher->wait_for_catchup($appname);
+
+	my $result = $node_subscriber->safe_psql('postgres', $verify_query);
+	is($result, $verify_result, "$test_name: data correct after retry");
+}
+
+# TM_Updated via index scan (PK): concurrent update on subscriber.
+test_concurrent_retry(
+	name => 'index scan TM_Updated',
+	pub_dml => "UPDATE test_tab SET b = 20 WHERE a = 1;",
+	sub_dml => "UPDATE test_tab SET b = 99 WHERE a = 1;",
+	expected_log => qr/concurrent update, retrying/,
+	verify_query => "SELECT b FROM test_tab WHERE a = 1;",
+	verify_result => '20',
+);
+
+# TM_Deleted via index scan (PK): concurrent delete on subscriber.
+test_concurrent_retry(
+	name => 'index scan TM_Deleted',
+	pub_dml => "UPDATE test_tab SET b = 40 WHERE a = 2;",
+	sub_dml => "DELETE FROM test_tab WHERE a = 2;",
+	expected_log => qr/concurrent delete, retrying/,
+	verify_query => "SELECT count(*) FROM test_tab WHERE a = 2;",
+	verify_result => '0',
+);
+
+# TM_Updated via seq scan (REPLICA IDENTITY FULL): concurrent update on
+# subscriber.
+test_concurrent_retry(
+	name => 'seq scan TM_Updated',
+	pub_dml => "UPDATE test_tab_full SET b = 200 WHERE a = 1;",
+	sub_dml => "UPDATE test_tab_full SET b = 999 WHERE a = 1;",
+	expected_log => qr/concurrent update, retrying/,
+	verify_query => "SELECT b FROM test_tab_full WHERE a = 1;",
+	verify_result => '999',
+);
+
+# TM_Deleted via seq scan (REPLICA IDENTITY FULL): concurrent delete on
+# subscriber.
+test_concurrent_retry(
+	name => 'seq scan TM_Deleted',
+	pub_dml => "UPDATE test_tab_full SET b = 400 WHERE a = 2;",
+	sub_dml => "DELETE FROM test_tab_full WHERE a = 2;",
+	expected_log => qr/concurrent delete, retrying/,
+	verify_query => "SELECT count(*) FROM test_tab_full WHERE a = 2;",
+	verify_result => '0',
+);
+
+done_testing();
-- 
2.47.3