From e8ad78037540bb0f904197bead7871043cd02dd6 Mon Sep 17 00:00:00 2001 From: "shiy.fnst" Date: Fri, 13 May 2022 14:50:30 +0800 Subject: [PATCH v68 2/6] Test streaming parallel option in tap test Change some TAP tests using the SUBSCRIPTION "streaming" parameter, so they now test both 'on' and 'parallel' values. --- src/test/subscription/t/015_stream.pl | 273 +++++++-- src/test/subscription/t/016_stream_subxact.pl | 130 +++-- src/test/subscription/t/017_stream_ddl.pl | 4 + .../t/018_stream_subxact_abort.pl | 220 +++++--- .../t/019_stream_subxact_ddl_abort.pl | 4 + .../subscription/t/022_twophase_cascade.pl | 381 +++++++------ .../subscription/t/023_twophase_stream.pl | 523 ++++++++++-------- 7 files changed, 1013 insertions(+), 522 deletions(-) diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index cbaa327e44..6faa308333 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -8,6 +8,128 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check that the parallel apply worker has finished applying the streaming +# transaction. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/, + $offset); + } +} + +# Common test steps for both the streaming=on and streaming=parallel cases. +sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + # Interleave a pair of transactions, each exceeding the 64kB limit. + my $in = ''; + my $out = ''; + + my $offset = 0; + + my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); + + my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, + on_error_stop => 0); + + # Check the subscriber log from now on. 
+ $offset = -s $node_subscriber->logfile; + + $in .= q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + }; + $h->pump_nb; + + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 9999) s(i); + DELETE FROM test_tab WHERE a > 5000; + COMMIT; + }); + + $in .= q{ + COMMIT; + \q + }; + $h->finish; # errors make the next test fail, so ignore them here + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(3334|3334|3334), + 'check extra columns contain local defaults'); + + # Test the streaming in binary mode + $node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET (binary = on)"); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # Insert, update and delete enough rows to exceed the 64kB limit. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(6667|6667|6667), + 'check extra columns contain local defaults'); + + # Change the local values of the extra columns on the subscriber, + # update publisher, and check that subscriber retains the expected + # values. 
This is to ensure that non-streaming transactions behave + # properly after a streaming transaction. + $node_subscriber->safe_psql('postgres', + "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'" + ); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + $node_publisher->safe_psql('postgres', + "UPDATE test_tab SET b = md5(a::text)"); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab" + ); + is($result, qq(6667|6667|6667), + 'check extra columns contain locally changed data'); + + # Cleanup the test data + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE (a > 2)"); + $node_publisher->wait_for_catchup($appname); +} + # Create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); @@ -26,17 +148,27 @@ $node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); + # Setup structure on subscriber $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)" ); +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX idx_tab on test_tab_2(a)"); + # Setup logical replication my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2"); my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); @@ -49,6 +181,42 @@ my $result = "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); +test_streaming($node_publisher, $node_subscriber, $appname, 0); + +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" +); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel, binary = off)"); + +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; + +# We need to check DEBUG logs to ensure that the parallel apply worker has +# applied the transaction. So, bump up the log verbosity. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +test_streaming($node_publisher, $node_subscriber, $appname, 1); + +# Test that the deadlock is detected among leader and parallel apply workers. 
+ +$node_subscriber->append_conf('postgresql.conf', "deadlock_timeout = 1ms"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + # Interleave a pair of transactions, each exceeding the 64kB limit. my $in = ''; my $out = ''; @@ -58,73 +226,90 @@ my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, on_error_stop => 0); +# Confirm if a deadlock between the leader apply worker and the parallel apply +# worker can be detected. + +my $offset = -s $node_subscriber->logfile; + $in .= q{ BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; +INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i); }; $h->pump_nb; -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 9999) s(i); -DELETE FROM test_tab WHERE a > 5000; -COMMIT; -}); +# Ensure that the parallel apply worker executes the insert command before the +# leader worker. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? applied [0-9]+ changes in the streaming chunk/, + $offset); + +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)"); $in .= q{ COMMIT; \q }; -$h->finish; # errors make the next test fail, so ignore them here +$h->finish; +$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/, + $offset); + +# In order for the two transactions to be completed normally without causing +# conflicts due to the unique index, we temporarily drop it. +$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab"); + +# Wait for this streaming transaction to be applied in the apply worker. 
$node_publisher->wait_for_catchup($appname); $result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(3334|3334|3334), 'check extra columns contain local defaults'); + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(5001), 'data replicated to subscriber after dropping index'); -# Test the streaming in binary mode +# Clean up test data from the environment. +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2"); +$node_publisher->wait_for_catchup($appname); $node_subscriber->safe_psql('postgres', - "ALTER SUBSCRIPTION tap_sub SET (binary = on)"); + "CREATE UNIQUE INDEX idx_tab on test_tab_2(a)"); -# Insert, update and delete enough rows to exceed the 64kB limit. -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -COMMIT; -}); +# Confirm if a deadlock between two parallel apply workers can be detected. -$node_publisher->wait_for_catchup($appname); +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(6667|6667|6667), 'check extra columns contain local defaults'); +$in .= q{ +BEGIN; +INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i); +}; +$h->pump_nb; + +# Ensure that the first parallel apply worker executes the insert command +# before the second one. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? applied [0-9]+ changes in the streaming chunk/, + $offset); -# Change the local values of the extra columns on the subscriber, -# update publisher, and check that subscriber retains the expected -# values. 
This is to ensure that non-streaming transactions behave -# properly after a streaming transaction. -$node_subscriber->safe_psql('postgres', - "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'" -); $node_publisher->safe_psql('postgres', - "UPDATE test_tab SET b = md5(a::text)"); + "INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)"); +$in .= q{ +COMMIT; +\q +}; +$h->finish; + +$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/, + $offset); + +# In order for the two transactions to be completed normally without causing +# conflicts due to the unique index, we temporarily drop it. +$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab"); + +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab" -); -is($result, qq(6667|6667|6667), - 'check extra columns contain locally changed data'); +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(10000), 'data replicated to subscriber after dropping index'); $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/016_stream_subxact.pl b/src/test/subscription/t/016_stream_subxact.pl index db29f089a0..7e374f687b 100644 --- a/src/test/subscription/t/016_stream_subxact.pl +++ b/src/test/subscription/t/016_stream_subxact.pl @@ -8,6 +8,73 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check that the parallel apply worker has finished applying the streaming +# transaction. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? 
finished processing the STREAM $type command/, + $offset); + } +} + +# Common test steps for both the streaming=on and streaming=parallel cases. +sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + my $offset = 0; + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # Insert, update and delete some rows. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + SAVEPOINT s1; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6, 8) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + SAVEPOINT s2; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9, 11) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + SAVEPOINT s3; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(12, 14) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + SAVEPOINT s4; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(15, 17) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(12|12|12), + 'check data was copied to subscriber in streaming mode and extra columns contain local defaults' + ); + + # Cleanup the test data + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE (a > 2)"); + $node_publisher->wait_for_catchup($appname); +} + # Create publisher node my $node_publisher = 
PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); @@ -37,6 +104,10 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); @@ -49,41 +120,34 @@ my $result = "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); -# Insert, update and delete some rows. -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -SAVEPOINT s1; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6, 8) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -SAVEPOINT s2; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9, 11) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -SAVEPOINT s3; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(12, 14) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -SAVEPOINT s4; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(15, 17) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -COMMIT; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(12|12|12), - 'check data was copied to subscriber in streaming mode and extra columns contain local defaults' 
+test_streaming($node_publisher, $node_subscriber, $appname, 0); + +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" ); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)"); + +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; + +# We need to check DEBUG logs to ensure that the parallel apply worker has +# applied the transaction. So, bump up the log verbosity. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +test_streaming($node_publisher, $node_subscriber, $appname, 1); + $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/017_stream_ddl.pl b/src/test/subscription/t/017_stream_ddl.pl index 866f1512e4..b4101e979d 100644 --- a/src/test/subscription/t/017_stream_ddl.pl +++ b/src/test/subscription/t/017_stream_ddl.pl @@ -2,6 +2,10 @@ # Copyright (c) 2021-2022, PostgreSQL Global Development Group # Test streaming of large transaction with DDL and subtransactions +# +# This file is mainly to test the DDL/DML interaction of publisher side and +# doesn't cover some extra code, so we don't add a parallel version for this +# file. 
use strict; use warnings; use PostgreSQL::Test::Cluster; diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl index 1458c3a0fc..24428131f7 100644 --- a/src/test/subscription/t/018_stream_subxact_abort.pl +++ b/src/test/subscription/t/018_stream_subxact_abort.pl @@ -8,6 +8,124 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check that the parallel apply worker has finished applying the streaming +# transaction. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/, + $offset); + } +} + +# Common test steps for both the streaming=on and streaming=parallel cases. +sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + my $offset = 0; + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # streamed transaction with DDL, DML and ROLLBACKs + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab VALUES (3, md5(3::text)); + SAVEPOINT s1; + INSERT INTO test_tab VALUES (4, md5(4::text)); + SAVEPOINT s2; + INSERT INTO test_tab VALUES (5, md5(5::text)); + SAVEPOINT s3; + INSERT INTO test_tab VALUES (6, md5(6::text)); + ROLLBACK TO s2; + INSERT INTO test_tab VALUES (7, md5(7::text)); + ROLLBACK TO s1; + INSERT INTO test_tab VALUES (8, md5(8::text)); + SAVEPOINT s4; + INSERT INTO test_tab VALUES (9, md5(9::text)); + SAVEPOINT s5; + INSERT INTO test_tab VALUES (10, md5(10::text)); + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); + is($result, qq(6|0), + 'check rollback to savepoint was reflected on subscriber and extra columns 
contain local defaults' + ); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # streamed transaction with subscriber receiving out of order + # subtransaction ROLLBACKs + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab VALUES (11, md5(11::text)); + SAVEPOINT s1; + INSERT INTO test_tab VALUES (12, md5(12::text)); + SAVEPOINT s2; + INSERT INTO test_tab VALUES (13, md5(13::text)); + SAVEPOINT s3; + INSERT INTO test_tab VALUES (14, md5(14::text)); + RELEASE s2; + INSERT INTO test_tab VALUES (15, md5(15::text)); + ROLLBACK TO s1; + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); + is($result, qq(7|0), + 'check rollback to savepoint was reflected on subscriber'); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # streamed transaction with subscriber receiving rollback + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab VALUES (16, md5(16::text)); + SAVEPOINT s1; + INSERT INTO test_tab VALUES (17, md5(17::text)); + SAVEPOINT s2; + INSERT INTO test_tab VALUES (18, md5(18::text)); + ROLLBACK; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'ABORT'); + + $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); + is($result, qq(7|0), 'check rollback was reflected on subscriber'); + + # Cleanup the test data + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE (a > 2)"); + $node_publisher->wait_for_catchup($appname); +} + # Create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); @@ -36,6 +154,10 @@ $node_publisher->safe_psql('postgres', 
"CREATE PUBLICATION tap_pub FOR TABLE test_tab"); my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); @@ -48,81 +170,33 @@ my $result = "SELECT count(*), count(c) FROM test_tab"); is($result, qq(2|0), 'check initial data was copied to subscriber'); -# streamed transaction with DDL, DML and ROLLBACKs -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab VALUES (3, md5(3::text)); -SAVEPOINT s1; -INSERT INTO test_tab VALUES (4, md5(4::text)); -SAVEPOINT s2; -INSERT INTO test_tab VALUES (5, md5(5::text)); -SAVEPOINT s3; -INSERT INTO test_tab VALUES (6, md5(6::text)); -ROLLBACK TO s2; -INSERT INTO test_tab VALUES (7, md5(7::text)); -ROLLBACK TO s1; -INSERT INTO test_tab VALUES (8, md5(8::text)); -SAVEPOINT s4; -INSERT INTO test_tab VALUES (9, md5(9::text)); -SAVEPOINT s5; -INSERT INTO test_tab VALUES (10, md5(10::text)); -COMMIT; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c) FROM test_tab"); -is($result, qq(6|0), - 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults' +test_streaming($node_publisher, $node_subscriber, $appname, 0); + +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" ); -# streamed transaction with subscriber receiving out of order subtransaction -# ROLLBACKs -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab VALUES (11, md5(11::text)); -SAVEPOINT s1; -INSERT INTO test_tab VALUES (12, md5(12::text)); 
-SAVEPOINT s2; -INSERT INTO test_tab VALUES (13, md5(13::text)); -SAVEPOINT s3; -INSERT INTO test_tab VALUES (14, md5(14::text)); -RELEASE s2; -INSERT INTO test_tab VALUES (15, md5(15::text)); -ROLLBACK TO s1; -COMMIT; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c) FROM test_tab"); -is($result, qq(7|0), - 'check rollback to savepoint was reflected on subscriber'); - -# streamed transaction with subscriber receiving rollback -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab VALUES (16, md5(16::text)); -SAVEPOINT s1; -INSERT INTO test_tab VALUES (17, md5(17::text)); -SAVEPOINT s2; -INSERT INTO test_tab VALUES (18, md5(18::text)); -ROLLBACK; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c) FROM test_tab"); -is($result, qq(7|0), 'check rollback was reflected on subscriber'); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)"); + +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; + +# We need to check DEBUG logs to ensure that the parallel apply worker has +# applied the transaction. So, bump up the log verbosity. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. 
+$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +test_streaming($node_publisher, $node_subscriber, $appname, 1); $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl index c6719c1af8..905d2516ba 100644 --- a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl +++ b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl @@ -3,6 +3,10 @@ # Test streaming of transaction with subtransactions, DDLs, DMLs, and # rollbacks +# +# This file is mainly to test the DDL/DML interaction of publisher side and +# doesn't cover some extra code, so we don't add a parallel version for this +# file. use strict; use warnings; use PostgreSQL::Test::Cluster; diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl index 7a797f37ba..d6678d69dc 100644 --- a/src/test/subscription/t/022_twophase_cascade.pl +++ b/src/test/subscription/t/022_twophase_cascade.pl @@ -11,6 +11,213 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check that the parallel apply worker has finished applying the streaming +# transaction. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $streaming_mode, $type) = @_; + + if ($streaming_mode eq 'parallel') + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/, + $offset); + } +} + +# Common test steps for both the streaming=on and streaming=parallel cases. 
+sub test_streaming +{ + my ($node_A, $node_B, $node_C, $appname_B, $appname_C, $streaming_mode) = + @_; + + my $offset_B = 0; + my $offset_C = 0; + + my $oldpid_B = $node_A->safe_psql( + 'postgres', " + SELECT pid FROM pg_stat_replication + WHERE application_name = '$appname_B' AND state = 'streaming';"); + my $oldpid_C = $node_B->safe_psql( + 'postgres', " + SELECT pid FROM pg_stat_replication + WHERE application_name = '$appname_C' AND state = 'streaming';"); + + # Setup logical replication streaming mode + + $node_B->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION tap_sub_B + SET (streaming = $streaming_mode);"); + $node_C->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION tap_sub_C + SET (streaming = $streaming_mode)"); + + # Wait for subscribers to finish initialization + + $node_A->poll_query_until( + 'postgres', " + SELECT pid != $oldpid_B FROM pg_stat_replication + WHERE application_name = '$appname_B' AND state = 'streaming';" + ) or die "Timed out while waiting for apply to restart"; + $node_B->poll_query_until( + 'postgres', " + SELECT pid != $oldpid_C FROM pg_stat_replication + WHERE application_name = '$appname_C' AND state = 'streaming';" + ) or die "Timed out while waiting for apply to restart"; + + ############################### + # Test 2PC PREPARE / COMMIT PREPARED. + # 1. Data is streamed as a 2PC transaction. + # 2. Then do commit prepared. + # + # Expect all data is replicated on subscriber(s) after the commit. + ############################### + + # Check the subscriber log from now on. + $offset_B = -s $node_B->logfile; + $offset_C = -s $node_C->logfile; + + # Insert, update and delete enough rows to exceed the 64kB limit. 
+ # Then 2PC PREPARE + $node_A->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_A->wait_for_catchup($appname_B); + $node_B->wait_for_catchup($appname_C); + + check_parallel_log($node_B, $offset_B, $streaming_mode, 'PREPARE'); + check_parallel_log($node_C, $offset_C, $streaming_mode, 'PREPARE'); + + # check the transaction state is prepared on subscriber(s) + my $result = + $node_B->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber B'); + $result = + $node_C->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber C'); + + # 2PC COMMIT + $node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); + + $node_A->wait_for_catchup($appname_B); + $node_B->wait_for_catchup($appname_C); + + # check that transaction was committed on subscriber(s) + $result = $node_B->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(3334|3334|3334), + 'Rows inserted by 2PC have committed on subscriber B, and extra columns have local defaults' + ); + $result = $node_C->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(3334|3334|3334), + 'Rows inserted by 2PC have committed on subscriber C, and extra columns have local defaults' + ); + + # check the transaction state is ended on subscriber(s) + $result = + $node_B->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is committed on subscriber B'); + $result = + $node_C->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is committed on subscriber C'); + + 
############################### + # Test 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT. + # 0. Cleanup from previous test leaving only 2 rows. + # 1. Insert one more row. + # 2. Record a SAVEPOINT. + # 3. Data is streamed using 2PC. + # 4. Do rollback to SAVEPOINT prior to the streamed inserts. + # 5. Then COMMIT PREPARED. + # + # Expect data after the SAVEPOINT is aborted leaving only 3 rows (= 2 original + 1 from step 1). + ############################### + + # First, delete the data except for 2 rows (delete will be replicated) + $node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); + + # Check the subscriber log from now on. + $offset_B = -s $node_B->logfile; + $offset_C = -s $node_C->logfile; + + # 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT + $node_A->safe_psql( + 'postgres', " + BEGIN; + INSERT INTO test_tab VALUES (9999, 'foobar'); + SAVEPOINT sp_inner; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + ROLLBACK TO SAVEPOINT sp_inner; + PREPARE TRANSACTION 'outer'; + "); + + $node_A->wait_for_catchup($appname_B); + $node_B->wait_for_catchup($appname_C); + + check_parallel_log($node_B, $offset_B, $streaming_mode, 'PREPARE'); + + # check the transaction state prepared on subscriber(s) + $result = + $node_B->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber B'); + $result = + $node_C->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber C'); + + # 2PC COMMIT + $node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';"); + + $node_A->wait_for_catchup($appname_B); + $node_B->wait_for_catchup($appname_C); + + # check the transaction state is ended on subscriber + $result = + $node_B->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, 
qq(0), 'transaction is ended on subscriber B'); + $result = + $node_C->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is ended on subscriber C'); + + # check inserts are visible at subscriber(s). + # All the streamed data (prior to the SAVEPOINT) should be rolled back. + # (9999, 'foobar') should be committed. + $result = $node_B->safe_psql('postgres', + "SELECT count(*) FROM test_tab where b = 'foobar';"); + is($result, qq(1), 'Rows committed are present on subscriber B'); + $result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab;"); + is($result, qq(3), 'Rows committed are present on subscriber B'); + $result = $node_C->safe_psql('postgres', + "SELECT count(*) FROM test_tab where b = 'foobar';"); + is($result, qq(1), 'Rows committed are present on subscriber C'); + $result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab;"); + is($result, qq(3), 'Rows committed are present on subscriber C'); + + # Cleanup the test data + $node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); + $node_A->wait_for_catchup($appname_B); + $node_B->wait_for_catchup($appname_C); +} + ############################### # Setup a cascade of pub/sub nodes. 
# node_A -> node_B -> node_C @@ -260,160 +467,26 @@ is($result, qq(21), 'Rows committed are present on subscriber C'); # 2PC + STREAMING TESTS # --------------------- -my $oldpid_B = $node_A->safe_psql( - 'postgres', " - SELECT pid FROM pg_stat_replication - WHERE application_name = '$appname_B' AND state = 'streaming';"); -my $oldpid_C = $node_B->safe_psql( - 'postgres', " - SELECT pid FROM pg_stat_replication - WHERE application_name = '$appname_C' AND state = 'streaming';"); - -# Setup logical replication (streaming = on) - -$node_B->safe_psql( - 'postgres', " - ALTER SUBSCRIPTION tap_sub_B - SET (streaming = on);"); -$node_C->safe_psql( - 'postgres', " - ALTER SUBSCRIPTION tap_sub_C - SET (streaming = on)"); - -# Wait for subscribers to finish initialization - -$node_A->poll_query_until( - 'postgres', " - SELECT pid != $oldpid_B FROM pg_stat_replication - WHERE application_name = '$appname_B' AND state = 'streaming';" -) or die "Timed out while waiting for apply to restart"; -$node_B->poll_query_until( - 'postgres', " - SELECT pid != $oldpid_C FROM pg_stat_replication - WHERE application_name = '$appname_C' AND state = 'streaming';" -) or die "Timed out while waiting for apply to restart"; - -############################### -# Test 2PC PREPARE / COMMIT PREPARED. -# 1. Data is streamed as a 2PC transaction. -# 2. Then do commit prepared. -# -# Expect all data is replicated on subscriber(s) after the commit. -############################### - -# Insert, update and delete enough rows to exceed the 64kB limit. 
-# Then 2PC PREPARE -$node_A->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_A->wait_for_catchup($appname_B); -$node_B->wait_for_catchup($appname_C); - -# check the transaction state is prepared on subscriber(s) -$result = - $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber B'); -$result = - $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber C'); - -# 2PC COMMIT -$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); - -$node_A->wait_for_catchup($appname_B); -$node_B->wait_for_catchup($appname_C); - -# check that transaction was committed on subscriber(s) -$result = $node_B->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(3334|3334|3334), - 'Rows inserted by 2PC have committed on subscriber B, and extra columns have local defaults' -); -$result = $node_C->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(3334|3334|3334), - 'Rows inserted by 2PC have committed on subscriber C, and extra columns have local defaults' -); - -# check the transaction state is ended on subscriber(s) -$result = - $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is committed on subscriber B'); -$result = - $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is committed on subscriber C'); - -############################### -# Test 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT. -# 0. Cleanup from previous test leaving only 2 rows. -# 1. Insert one more row. -# 2. 
Record a SAVEPOINT. -# 3. Data is streamed using 2PC. -# 4. Do rollback to SAVEPOINT prior to the streamed inserts. -# 5. Then COMMIT PREPARED. -# -# Expect data after the SAVEPOINT is aborted leaving only 3 rows (= 2 original + 1 from step 1). -############################### - -# First, delete the data except for 2 rows (delete will be replicated) -$node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - -# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT -$node_A->safe_psql( - 'postgres', " - BEGIN; - INSERT INTO test_tab VALUES (9999, 'foobar'); - SAVEPOINT sp_inner; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - ROLLBACK TO SAVEPOINT sp_inner; - PREPARE TRANSACTION 'outer'; - "); - -$node_A->wait_for_catchup($appname_B); -$node_B->wait_for_catchup($appname_C); - -# check the transaction state prepared on subscriber(s) -$result = - $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber B'); -$result = - $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber C'); - -# 2PC COMMIT -$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';"); - -$node_A->wait_for_catchup($appname_B); -$node_B->wait_for_catchup($appname_C); - -# check the transaction state is ended on subscriber -$result = - $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is ended on subscriber B'); -$result = - $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is ended on subscriber C'); - -# check inserts are visible at subscriber(s). -# All the streamed data (prior to the SAVEPOINT) should be rolled back. -# (9999, 'foobar') should be committed. 
-$result = $node_B->safe_psql('postgres', - "SELECT count(*) FROM test_tab where b = 'foobar';"); -is($result, qq(1), 'Rows committed are present on subscriber B'); -$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab;"); -is($result, qq(3), 'Rows committed are present on subscriber B'); -$result = $node_C->safe_psql('postgres', - "SELECT count(*) FROM test_tab where b = 'foobar';"); -is($result, qq(1), 'Rows committed are present on subscriber C'); -$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab;"); -is($result, qq(3), 'Rows committed are present on subscriber C'); +################################ +# Test using streaming mode 'on' +################################ +test_streaming($node_A, $node_B, $node_C, $appname_B, $appname_C, 'on'); + +###################################### +# Test using streaming mode 'parallel' +###################################### +# We need to check DEBUG logs to ensure that the parallel apply worker has +# applied the transaction. So, bump up the log verbosity. +$node_B->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_B->reload; +$node_C->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_C->reload; + +# Run a query to make sure that the reload has taken effect. +$node_B->safe_psql('postgres', q{SELECT 1}); +$node_C->safe_psql('postgres', q{SELECT 1}); + +test_streaming($node_A, $node_B, $node_C, $appname_B, $appname_C, 'parallel'); ############################### # check all the cleanup diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl index a191129b9d..d56f2b7c3e 100644 --- a/src/test/subscription/t/023_twophase_stream.pl +++ b/src/test/subscription/t/023_twophase_stream.pl @@ -8,6 +8,287 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check that the parallel apply worker has finished applying the streaming +# transaction. 
+sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/, + $offset); + } +} + +# Common test steps for both the streaming=on and streaming=parallel cases. +sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + my $offset = 0; + + ############################### + # Test 2PC PREPARE / COMMIT PREPARED. + # 1. Data is streamed as a 2PC transaction. + # 2. Then do commit prepared. + # + # Expect all data is replicated on subscriber side after the commit. + ############################### + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # check that 2PC gets replicated to subscriber + # Insert, update and delete some rows. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE'); + + # check that transaction is in prepared state on subscriber + my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber'); + + # 2PC transaction gets committed + $node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); + + $node_publisher->wait_for_catchup($appname); + + # check that transaction is committed on subscriber + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(4|4|4), + 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults' + ); + $result = 
$node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is committed on subscriber'); + + ############################### + # Test 2PC PREPARE / ROLLBACK PREPARED. + # 1. Table is deleted back to 2 rows which are replicated on subscriber. + # 2. Data is streamed using 2PC. + # 3. Do rollback prepared. + # + # Expect data rolls back leaving only the original 2 rows. + ############################### + + # First, delete the data except for 2 rows (will be replicated) + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE a > 2;"); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # Then insert, update and delete some rows. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE'); + + # check that transaction is in prepared state on subscriber + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber'); + + # 2PC transaction gets aborted + $node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab';"); + + $node_publisher->wait_for_catchup($appname); + + # check that transaction is aborted on subscriber + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(2|2|2), + 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows' + ); + + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is aborted on subscriber'); + + 
############################### + # Check that 2PC COMMIT PREPARED is decoded properly on crash restart. + # 1. insert, update and delete some rows. + # 2. Then server crashes before the 2PC transaction is committed. + # 3. After servers are restarted the pending transaction is committed. + # + # Expect all data is replicated on subscriber side after the commit. + # Note: both publisher and subscriber do crash/restart. + ############################### + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_subscriber->stop('immediate'); + $node_publisher->stop('immediate'); + + $node_publisher->start; + $node_subscriber->start; + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE'); + + # commit post the restart + $node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); + $node_publisher->wait_for_catchup($appname); + + # check inserts are visible + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(4|4|4), + 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults' + ); + + ############################### + # Do INSERT after the PREPARE but before ROLLBACK PREPARED. + # 1. Table is deleted back to 2 rows which are replicated on subscriber. + # 2. Data is streamed using 2PC. + # 3. A single row INSERT is done which is after the PREPARE. + # 4. Then do a ROLLBACK PREPARED. + # + # Expect the 2PC data rolls back leaving only 3 rows on the subscriber + # (the original 2 + inserted 1). 
+ ############################### + + # First, delete the data except for 2 rows (will be replicated) + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE a > 2;"); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # Then insert, update and delete some rows. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE'); + + # check that transaction is in prepared state on subscriber + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber'); + + # Insert a different record (now we are outside of the 2PC transaction) + # Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key + $node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (99999, 'foobar')"); + + # 2PC transaction gets aborted + $node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab';"); + + $node_publisher->wait_for_catchup($appname); + + # check that transaction is aborted on subscriber, + # but the extra INSERT outside of the 2PC still was replicated + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(3|3|3), + 'check the outside insert was copied to subscriber'); + + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is aborted on subscriber'); + + ############################### + # Do INSERT after the PREPARE but before COMMIT PREPARED. + # 1. 
Table is deleted back to 2 rows which are replicated on subscriber.
+	# 2. Data is streamed using 2PC.
+	# 3. A single row INSERT is done which is after the PREPARE.
+	# 4. Then do a COMMIT PREPARED.
+	#
+	# Expect 2PC data + the extra row are on the subscriber
+	# (the 4 + inserted 1 = 5).
+	###############################
+
+	# First, delete the data except for 2 rows (will be replicated)
+	$node_publisher->safe_psql('postgres',
+		"DELETE FROM test_tab WHERE a > 2;");
+
+	# Check the subscriber log from now on.
+	$offset = -s $node_subscriber->logfile;
+
+	# Then insert, update and delete some rows.
+	$node_publisher->safe_psql(
+		'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+	$node_publisher->wait_for_catchup($appname);
+
+	check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE');
+
+	# check that transaction is in prepared state on subscriber
+	$result = $node_subscriber->safe_psql('postgres',
+		"SELECT count(*) FROM pg_prepared_xacts;");
+	is($result, qq(1), 'transaction is prepared on subscriber');
+
+	# Insert a different record (now we are outside of the 2PC transaction)
+	# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key
+	$node_publisher->safe_psql('postgres',
+		"INSERT INTO test_tab VALUES (99999, 'foobar')");
+
+	# 2PC transaction gets committed
+	$node_publisher->safe_psql('postgres',
+		"COMMIT PREPARED 'test_prepared_tab';");
+
+	$node_publisher->wait_for_catchup($appname);
+
+	# check that transaction is committed on subscriber
+	$result = $node_subscriber->safe_psql('postgres',
+		"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+	is($result, qq(5|5|5),
+		'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local 
defaults' + ); + + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is committed on subscriber'); + + # Cleanup the test data + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE a > 2;"); + $node_publisher->wait_for_catchup($appname); +} + ############################### # Setup ############################### @@ -48,6 +329,10 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ $node_subscriber->safe_psql( 'postgres', " CREATE SUBSCRIPTION tap_sub @@ -64,236 +349,38 @@ my $twophase_query = $node_subscriber->poll_query_until('postgres', $twophase_query) or die "Timed out while waiting for subscriber to enable twophase"; -############################### # Check initial data was copied to subscriber -############################### my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); -############################### -# Test 2PC PREPARE / COMMIT PREPARED. -# 1. Data is streamed as a 2PC transaction. -# 2. Then do commit prepared. -# -# Expect all data is replicated on subscriber side after the commit. -############################### - -# check that 2PC gets replicated to subscriber -# Insert, update and delete some rows. 
-$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is in prepared state on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber'); - -# 2PC transaction gets committed -$node_publisher->safe_psql('postgres', - "COMMIT PREPARED 'test_prepared_tab';"); - -$node_publisher->wait_for_catchup($appname); +test_streaming($node_publisher, $node_subscriber, $appname, 0); -# check that transaction is committed on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(4|4|4), - 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults' +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" ); -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is committed on subscriber'); - -############################### -# Test 2PC PREPARE / ROLLBACK PREPARED. -# 1. Table is deleted back to 2 rows which are replicated on subscriber. -# 2. Data is streamed using 2PC. -# 3. Do rollback prepared. -# -# Expect data rolls back leaving only the original 2 rows. 
-############################### - -# First, delete the data except for 2 rows (will be replicated) -$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - -# Then insert, update and delete some rows. -$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is in prepared state on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber'); - -# 2PC transaction gets aborted -$node_publisher->safe_psql('postgres', - "ROLLBACK PREPARED 'test_prepared_tab';"); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is aborted on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(2|2|2), - 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows'); - -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is aborted on subscriber'); - -############################### -# Check that 2PC COMMIT PREPARED is decoded properly on crash restart. -# 1. insert, update and delete some rows. -# 2. Then server crashes before the 2PC transaction is committed. -# 3. After servers are restarted the pending transaction is committed. -# -# Expect all data is replicated on subscriber side after the commit. -# Note: both publisher and subscriber do crash/restart. 
-############################### - -$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_subscriber->stop('immediate'); -$node_publisher->stop('immediate'); - -$node_publisher->start; -$node_subscriber->start; -# commit post the restart -$node_publisher->safe_psql('postgres', - "COMMIT PREPARED 'test_prepared_tab';"); -$node_publisher->wait_for_catchup($appname); - -# check inserts are visible -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(4|4|4), - 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults' -); - -############################### -# Do INSERT after the PREPARE but before ROLLBACK PREPARED. -# 1. Table is deleted back to 2 rows which are replicated on subscriber. -# 2. Data is streamed using 2PC. -# 3. A single row INSERT is done which is after the PREPARE. -# 4. Then do a ROLLBACK PREPARED. -# -# Expect the 2PC data rolls back leaving only 3 rows on the subscriber -# (the original 2 + inserted 1). -############################### - -# First, delete the data except for 2 rows (will be replicated) -$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - -# Then insert, update and delete some rows. 
-$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is in prepared state on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber'); - -# Insert a different record (now we are outside of the 2PC transaction) -# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key -$node_publisher->safe_psql('postgres', - "INSERT INTO test_tab VALUES (99999, 'foobar')"); - -# 2PC transaction gets aborted -$node_publisher->safe_psql('postgres', - "ROLLBACK PREPARED 'test_prepared_tab';"); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is aborted on subscriber, -# but the extra INSERT outside of the 2PC still was replicated -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(3|3|3), 'check the outside insert was copied to subscriber'); - -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is aborted on subscriber'); - -############################### -# Do INSERT after the PREPARE but before COMMIT PREPARED. -# 1. Table is deleted back to 2 rows which are replicated on subscriber. -# 2. Data is streamed using 2PC. -# 3. A single row INSERT is done which is after the PREPARE. -# 4. Then do a COMMIT PREPARED. -# -# Expect 2PC data + the extra row are on the subscriber -# (the 3334 + inserted 1 = 3335). 
-############################### - -# First, delete the data except for 2 rows (will be replicated) -$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - -# Then insert, update and delete some rows. -$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is in prepared state on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber'); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)"); -# Insert a different record (now we are outside of the 2PC transaction) -# Note: the 2PC transaction still holds row locks so make sure this insert is for a separare primary key -$node_publisher->safe_psql('postgres', - "INSERT INTO test_tab VALUES (99999, 'foobar')"); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; -# 2PC transaction gets committed -$node_publisher->safe_psql('postgres', - "COMMIT PREPARED 'test_prepared_tab';"); +# We need to check DEBUG logs to ensure that the parallel apply worker has +# applied the transaction. So, bump up the log verbosity. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->reload; -$node_publisher->wait_for_catchup($appname); +# Run a query to make sure that the reload has taken effect. 
+$node_subscriber->safe_psql('postgres', q{SELECT 1}); -# check that transaction is committed on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(5|5|5), - 'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults' -); - -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is committed on subscriber'); +test_streaming($node_publisher, $node_subscriber, $appname, 1); ############################### # check all the cleanup -- 2.23.0.windows.1