From 5e650a9522444a785d5e3d5b2b2998c93501c5b6 Mon Sep 17 00:00:00 2001
From: Ajin Cherian
Date: Fri, 20 Nov 2020 06:15:57 -0500
Subject: [PATCH v24] 2pc test cases for testing concurrent aborts.

Add tap tests to test_decoding for testing concurrent aborts during 2pc.
---
 contrib/test_decoding/Makefile                    |   2 +
 contrib/test_decoding/t/001_twophase.pl           | 121 ++++++++++++++++++++
 contrib/test_decoding/t/002_twophase_streaming.pl | 133 ++++++++++++++++++++++
 contrib/test_decoding/test_decoding.c             |  39 ++++++
 4 files changed, 295 insertions(+)
 create mode 100644 contrib/test_decoding/t/001_twophase.pl
 create mode 100644 contrib/test_decoding/t/002_twophase_streaming.pl

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 2c4acdc..49523fe 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,6 +9,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream
 
+TAP_TESTS = 1
+
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 
diff --git a/contrib/test_decoding/t/001_twophase.pl b/contrib/test_decoding/t/001_twophase.pl
new file mode 100644
index 0000000..1555582
--- /dev/null
+++ b/contrib/test_decoding/t/001_twophase.pl
@@ -0,0 +1,121 @@
+# Tests for logical decoding of two-phase (2PC) transactions
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+use Time::HiRes qw(usleep);
+use Scalar::Util qw(looks_like_number);
+
+# Initialize node
+my $node_logical = get_new_node('logical');
+$node_logical->init(allows_streaming => 'logical');
+$node_logical->append_conf(
+	'postgresql.conf', qq(
+max_prepared_transactions = 10
+));
+$node_logical->start;
+
+# Create some pre-existing content on logical
+$node_logical->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
+$node_logical->safe_psql('postgres',
+	"INSERT INTO tab SELECT generate_series(1,10)");
+$node_logical->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');");
+
+# Test 1:
+# This test is specifically for testing concurrent abort while logical decode
+# is ongoing. We will pass in the xid of the 2PC to the plugin as an option.
+# On the receipt of a valid "check-xid-aborted", the change API in the test decoding
+# plugin will wait for it to be aborted.
+#
+# We will fire off a ROLLBACK from another session when this decode
+# is waiting.
+#
+# The status of "check-xid-aborted" will change from in-progress to not-committed
+# (hence aborted) and we will stop decoding because the subsequent
+# system catalog scan will error out.
+
+$node_logical->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab VALUES (11);
+    INSERT INTO tab VALUES (12);
+    ALTER TABLE tab ADD COLUMN b INT;
+    INSERT INTO tab VALUES (13,14);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+# get XID of the above two-phase transaction
+my $xid2pc = $node_logical->safe_psql('postgres', "SELECT transaction FROM pg_prepared_xacts WHERE gid = 'test_prepared_tab'");
+ok(looks_like_number($xid2pc), 'Got a valid two-phase XID');
+
+# start decoding the above by passing the "check-xid-aborted"
+my $logical_connstr = $node_logical->connstr . ' dbname=postgres';
+
+# decode now, it should include an ABORT entry because of the ROLLBACK below
+system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'check-xid-aborted', '$xid2pc');\" \&");
+
+# check that decode starts waiting for this $xid2pc
+poll_output_until("waiting for $xid2pc to abort")
+	or die "no wait happened for the abort";
+
+# rollback the prepared transaction
+$node_logical->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+
+# check for occurrence of the log about stopping this decoding
+poll_output_until("stopping decoding of test_prepared_tab")
+	or die "no decoding stop for the rollback";
+
+# consume any remaining changes
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');");
+
+# Test 2:
+# Check that commit prepared is decoded properly on immediate restart
+$node_logical->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab VALUES (11);
+    INSERT INTO tab VALUES (12);
+    ALTER TABLE tab ADD COLUMN b INT;
+    INSERT INTO tab VALUES (13, 11);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+# consume changes
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');");
+$node_logical->stop('immediate');
+$node_logical->start;
+
+# commit post the restart
+$node_logical->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');");
+
+# check inserts are visible
+my $result = $node_logical->safe_psql('postgres', "SELECT count(*) FROM tab where a IN (11,12) OR b IN (11);");
+is($result, qq(3), 'Rows inserted via 2PC are visible on restart');
+
+$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot');");
+$node_logical->stop('fast');
+
+# Poll the server log until it contains $expected (matched as a literal
+# string), checking every 0.1 second for up to 180 seconds.
+# Returns 1 once the text is found, 0 if it never shows up.
+sub poll_output_until
+{
+	my ($expected) = @_;
+
+	# With no search string given, fall back to a junk value that is not
+	# expected to appear in the log, so the call simply times out.
+	$expected = 'xxxxxx' unless defined($expected);
+
+	my $max_attempts = 180 * 10;
+
+	for my $attempt (1 .. $max_attempts)
+	{
+		my $log = slurp_file($node_logical->logfile());
+
+		# \Q..\E: look for the text itself, not a regex built from it.
+		return 1 if $log =~ /\Q$expected\E/;
+
+		# Wait 0.1 second before retrying.
+		usleep(100_000);
+	}
+
+	# The expected text did not appear within 180 seconds. Give up.
+	return 0;
+}
diff --git a/contrib/test_decoding/t/002_twophase_streaming.pl b/contrib/test_decoding/t/002_twophase_streaming.pl
new file mode 100644
index 0000000..8c0410e
--- /dev/null
+++ b/contrib/test_decoding/t/002_twophase_streaming.pl
@@ -0,0 +1,133 @@
+# Tests for logical decoding of streamed two-phase (2PC) transactions
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+use Time::HiRes qw(usleep);
+use Scalar::Util qw(looks_like_number);
+
+# Initialize node
+my $node_logical = get_new_node('logical');
+$node_logical->init(allows_streaming => 'logical');
+$node_logical->append_conf(
+	'postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_logical->start;
+
+# Create some pre-existing content on logical
+$node_logical->safe_psql('postgres', "CREATE TABLE stream_test (data text)");
+$node_logical->safe_psql('postgres',
+	"INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1,3) g(i)");
+$node_logical->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');");
+
+# Test 1:
+# This test is specifically for testing concurrent abort while logical decode
+# is ongoing. We will pass in the xid of the 2PC to the plugin as an option.
+# On the receipt of a valid "check-xid-aborted", the change API in the test decoding
+# plugin will wait for it to be aborted.
+#
+# We will fire off a ROLLBACK from another session when this decode
+# is waiting.
+#
+# The status of "check-xid-aborted" will change from in-progress to not-committed
+# (hence aborted) and we will stop decoding because the subsequent
+# system catalog scan will error out.
+
+$node_logical->safe_psql('postgres', "
+    BEGIN;
+    savepoint s1;
+    SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+    INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+    TRUNCATE table stream_test;
+    rollback to s1;
+    INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+    PREPARE TRANSACTION 'test1';");
+# get XID of the above two-phase transaction
+my $xid2pc = $node_logical->safe_psql('postgres', "SELECT transaction FROM pg_prepared_xacts WHERE gid = 'test1'");
+ok(looks_like_number($xid2pc), 'Got a valid two-phase XID');
+
+# start decoding the above by passing the "check-xid-aborted"
+my $logical_connstr = $node_logical->connstr . ' dbname=postgres';
+
+# decode now, it should include an ABORT entry because of the ROLLBACK below
+system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'check-xid-aborted', '$xid2pc','stream-changes', '1');\" \&");
+
+# check that decode starts waiting for this $xid2pc
+poll_output_until("waiting for $xid2pc to abort")
+	or die "no wait happened for the abort";
+
+# rollback the prepared transaction
+$node_logical->safe_psql('postgres', "ROLLBACK PREPARED 'test1';");
+
+# check for occurrence of the log about stopping this decoding
+poll_output_until("stopping decoding of $xid2pc")
+	or die "no decoding stop for the rollback";
+
+# consume any remaining changes
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1','stream-changes', '1');");
+
+# Test 2:
+# Check concurrent aborts while decoding a TRUNCATE.
+
+$node_logical->safe_psql('postgres', "
+    BEGIN;
+    savepoint s1;
+    SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+    INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+    TRUNCATE table stream_test;
+    rollback to s1;
+    TRUNCATE table stream_test;
+    PREPARE TRANSACTION 'test2';");
+# get XID of the above two-phase transaction
+$xid2pc = $node_logical->safe_psql('postgres', "SELECT transaction FROM pg_prepared_xacts WHERE gid = 'test2'");
+ok(looks_like_number($xid2pc), 'Got a valid two-phase XID');
+
+# decode now, it should include an ABORT entry because of the ROLLBACK below
+system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'check-xid-aborted', '$xid2pc','stream-changes', '1');\" \&");
+
+# check that decode starts waiting for this $xid2pc
+poll_output_until("waiting for $xid2pc to abort")
+	or die "no wait happened for the abort";
+
+# rollback the prepared transaction
+$node_logical->safe_psql('postgres', "ROLLBACK PREPARED 'test2';");
+
+# check for occurrence of the log about stopping this decoding
+poll_output_until("stopping decoding of $xid2pc")
+	or die "no decoding stop for the rollback";
+
+
+$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot');");
+$node_logical->stop('fast');
+
+# Poll the server log until it contains $expected (matched as a literal
+# string), checking every 0.1 second for up to 180 seconds.
+# Returns 1 once the text is found, 0 if it never shows up.
+sub poll_output_until
+{
+	my ($expected) = @_;
+
+	# With no search string given, fall back to a junk value that is not
+	# expected to appear in the log, so the call simply times out.
+	$expected = 'xxxxxx' unless defined($expected);
+
+	my $max_attempts = 180 * 10;
+
+	for my $attempt (1 .. $max_attempts)
+	{
+		my $log = slurp_file($node_logical->logfile());
+
+		# \Q..\E: look for the text itself, not a regex built from it.
+		return 1 if $log =~ /\Q$expected\E/;
+
+		# Wait 0.1 second before retrying.
+		usleep(100_000);
+	}
+
+	# The expected text did not appear within 180 seconds. Give up.
+	return 0;
+}
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index c42de64..ef9abdc 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -475,6 +475,33 @@ pg_decode_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+/*
+ * Testing aid for concurrent aborts.
+ *
+ * If check_xid_aborted is a valid xid, then it was passed in as an option
+ * to check if the transaction having this xid would be aborted.  Wait until
+ * that transaction is no longer in progress, then log that it aborted.
+ */
+static void
+test_concurrent_aborts(TestDecodingData *data)
+{
+	if (!TransactionIdIsValid(data->check_xid_aborted))
+		return;
+
+	elog(LOG, "waiting for %u to abort", data->check_xid_aborted);
+	while (TransactionIdIsInProgress(data->check_xid_aborted))
+	{
+		CHECK_FOR_INTERRUPTS();
+		pg_usleep(10000L);
+	}
+
+	/* The loop has already established that it is no longer in progress. */
+	if (!TransactionIdDidCommit(data->check_xid_aborted))
+		elog(LOG, "%u aborted", data->check_xid_aborted);
+
+	Assert(TransactionIdDidAbort(data->check_xid_aborted));
+}
+
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
@@ -624,6 +651,9 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	}
 	txndata->xact_wrote_changes = true;
 
+	/* For testing concurrent aborts */
+	test_concurrent_aborts(data);
+
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
 
@@ -710,6 +740,9 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	}
 	txndata->xact_wrote_changes = true;
 
+	/* For testing concurrent aborts */
+	test_concurrent_aborts(data);
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -923,6 +956,9 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
 	}
 	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
+	/* Test for concurrent aborts */
+	test_concurrent_aborts(data);
+
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
@@ -976,6 +1012,9 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	}
 	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
+	/* For testing concurrent aborts */
+	test_concurrent_aborts(data);
+
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
-- 
1.8.3.1