From 03f5b4852963c34dded67e4dfa7dc77baf993f0b Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Tue, 16 Feb 2021 08:05:18 +1100 Subject: [PATCH v40] Support 2PC txn tests for concurrent aborts. Add tap tests to test_decoding for testing concurrent aborts during 2PC. --- contrib/test_decoding/Makefile | 2 + contrib/test_decoding/t/001_twophase.pl | 121 ++++++++++++++++++++ contrib/test_decoding/t/002_twophase_streaming.pl | 133 ++++++++++++++++++++++ contrib/test_decoding/test_decoding.c | 58 ++++++++++ src/backend/replication/logical/reorderbuffer.c | 5 + 5 files changed, 319 insertions(+) create mode 100644 contrib/test_decoding/t/001_twophase.pl create mode 100644 contrib/test_decoding/t/002_twophase_streaming.pl diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index c5e28ce..e0cd841 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -10,6 +10,8 @@ ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ twophase_snapshot +TAP_TESTS = 1 + REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/t/001_twophase.pl b/contrib/test_decoding/t/001_twophase.pl new file mode 100644 index 0000000..3b3e7b8 --- /dev/null +++ b/contrib/test_decoding/t/001_twophase.pl @@ -0,0 +1,121 @@ +# logical replication of 2PC test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; +use Time::HiRes qw(usleep); +use Scalar::Util qw(looks_like_number); + +# Initialize node +my $node_logical = get_new_node('logical'); +$node_logical->init(allows_streaming => 'logical'); +$node_logical->append_conf( + 'postgresql.conf', qq( + max_prepared_transactions = 10 +)); +$node_logical->start; + +# Create some pre-existing content on logical +$node_logical->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); +$node_logical->safe_psql('postgres', + "INSERT INTO tab SELECT generate_series(1,10)"); +$node_logical->safe_psql('postgres', + "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');"); + +#Test 1: +# This test is specifically for testing concurrent abort while logical decode +# is ongoing. We will pass in the xid of the 2PC to the plugin as an option. +# On the receipt of a valid "check-xid-aborted", the change API in the test decoding +# plugin will wait for it to be aborted. +# +# We will fire off a ROLLBACK from another session when this decode +# is waiting. +# +# The status of "check-xid-aborted" will change from in-progress to not-committed +# (hence aborted) and we will stop decoding because the subsequent +# system catalog scan will error out. + +$node_logical->safe_psql('postgres', " + BEGIN; + INSERT INTO tab VALUES (11); + INSERT INTO tab VALUES (12); + ALTER TABLE tab ADD COLUMN b INT; + INSERT INTO tab VALUES (13,14); + PREPARE TRANSACTION 'test_prepared_tab';"); +# get XID of the above two-phase transaction +my $xid2pc = $node_logical->safe_psql('postgres', "SELECT transaction FROM pg_prepared_xacts WHERE gid = 'test_prepared_tab'"); +is(looks_like_number($xid2pc), qq(1), 'Got a valid two-phase XID'); + +# start decoding the above by passing the "check-xid-aborted" +my $logical_connstr = $node_logical->connstr . ' dbname=postgres'; + +# decode now, it should include an ABORT entry because of the ROLLBACK below +system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'check-xid-aborted', '$xid2pc');\" \&"); + +# check that decode starts waiting for this $xid2pc +poll_output_until("waiting for $xid2pc to abort") + or die "no wait happened for the abort"; + +# rollback the prepared transaction +$node_logical->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';"); + +# check for occurrence of the log about stopping this decoding +poll_output_until("stop decoding of prepared txn test_prepared_tab") + or die "no decoding stop for the rollback"; + +# consume any remaining changes +$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');"); + +# Test 2: +# Check that commit prepared is decoded properly on immediate restart +$node_logical->safe_psql('postgres', " + BEGIN; + INSERT INTO tab VALUES (11); + INSERT INTO tab VALUES (12); + ALTER TABLE tab ADD COLUMN b INT; + INSERT INTO tab VALUES (13, 11); + PREPARE TRANSACTION 'test_prepared_tab';"); +# consume changes +$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');"); +$node_logical->stop('immediate'); +$node_logical->start; + +# commit post the restart +$node_logical->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); +$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');"); + +# check inserts are visible +my $result = $node_logical->safe_psql('postgres', "SELECT count(*) FROM tab where a IN (11,12) OR b IN (11);"); +is($result, qq(3), 'Rows inserted via 2PC are visible on restart'); + +$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot');"); +$node_logical->stop('fast'); + +sub poll_output_until +{ + my ($expected) = @_; + + $expected = 'xxxxxx' unless defined($expected); # default junk value + + my $max_attempts = 180 * 10; + my $attempts = 0; + + my $output_file = ''; + while ($attempts < $max_attempts) + { + $output_file = slurp_file($node_logical->logfile()); + + if ($output_file =~ $expected) + { + return 1; + } + + # Wait 0.1 second before retrying. + usleep(100_000); + $attempts++; + } + + # The output result didn't change in 180 seconds. Give up + return 0; +} diff --git a/contrib/test_decoding/t/002_twophase_streaming.pl b/contrib/test_decoding/t/002_twophase_streaming.pl new file mode 100644 index 0000000..15001c6 --- /dev/null +++ b/contrib/test_decoding/t/002_twophase_streaming.pl @@ -0,0 +1,133 @@ +# logical replication of 2PC test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; +use Time::HiRes qw(usleep); +use Scalar::Util qw(looks_like_number); + +# Initialize node +my $node_logical = get_new_node('logical'); +$node_logical->init(allows_streaming => 'logical'); +$node_logical->append_conf( + 'postgresql.conf', qq( + max_prepared_transactions = 10 + logical_decoding_work_mem = 64kB +)); +$node_logical->start; + +# Create some pre-existing content on logical +$node_logical->safe_psql('postgres', "CREATE TABLE stream_test (data text)"); +$node_logical->safe_psql('postgres', + "INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1,3) g(i)"); +$node_logical->safe_psql('postgres', + "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');"); + +#Test 1: +# This test is specifically for testing concurrent abort while logical decode +# is ongoing. We will pass in the xid of the 2PC to the plugin as an option. +# On the receipt of a valid "check-xid-aborted", the change API in the test decoding +# plugin will wait for it to be aborted. +# +# We will fire off a ROLLBACK from another session when this decode +# is waiting. +# +# The status of "check-xid-aborted" will change from in-progress to not-committed +# (hence aborted) and we will stop decoding because the subsequent +# system catalog scan will error out. + +$node_logical->safe_psql('postgres', " + BEGIN; + savepoint s1; + SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); + INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); + TRUNCATE table stream_test; + rollback to s1; + INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); + PREPARE TRANSACTION 'test1';"); +# get XID of the above two-phase transaction +my $xid2pc = $node_logical->safe_psql('postgres', "SELECT transaction FROM pg_prepared_xacts WHERE gid = 'test1'"); +is(looks_like_number($xid2pc), qq(1), 'Got a valid two-phase XID'); + +# start decoding the above by passing the "check-xid-aborted" +my $logical_connstr = $node_logical->connstr . ' dbname=postgres'; + +# decode now, it should include an ABORT entry because of the ROLLBACK below +system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'check-xid-aborted', '$xid2pc','stream-changes', '1');\" \&"); + +# check that decode starts waiting for this $xid2pc +poll_output_until("waiting for $xid2pc to abort") + or die "no wait happened for the abort"; + +# rollback the prepared transaction +$node_logical->safe_psql('postgres', "ROLLBACK PREPARED 'test1';"); + +# check for occurrence of the log about stopping this decoding +poll_output_until("stop decoding of txn $xid2pc") + or die "no decoding stop for the rollback"; + +# consume any remaining changes +$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1','stream-changes', '1');"); + +# Test 2: +# Check concurrent aborts while decoding a TRUNCATE. + +$node_logical->safe_psql('postgres', " + BEGIN; + savepoint s1; + SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); + INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); + TRUNCATE table stream_test; + rollback to s1; + TRUNCATE table stream_test; + PREPARE TRANSACTION 'test2';"); +# get XID of the above two-phase transaction +$xid2pc = $node_logical->safe_psql('postgres', "SELECT transaction FROM pg_prepared_xacts WHERE gid = 'test2'"); +is(looks_like_number($xid2pc), qq(1), 'Got a valid two-phase XID'); + +# decode now, it should include an ABORT entry because of the ROLLBACK below +system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'check-xid-aborted', '$xid2pc','stream-changes', '1');\" \&"); + +# check that decode starts waiting for this $xid2pc +poll_output_until("waiting for $xid2pc to abort") + or die "no wait happened for the abort"; + +# rollback the prepared transaction +$node_logical->safe_psql('postgres', "ROLLBACK PREPARED 'test2';"); + +# check for occurrence of the log about stopping this decoding +poll_output_until("stop decoding of txn $xid2pc") + or die "no decoding stop for the rollback"; + + +$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot');"); +$node_logical->stop('fast'); + +sub poll_output_until +{ + my ($expected) = @_; + + $expected = 'xxxxxx' unless defined($expected); # default junk value + + my $max_attempts = 180 * 10; + my $attempts = 0; + + my $output_file = ''; + while ($attempts < $max_attempts) + { + $output_file = slurp_file($node_logical->logfile()); + + if ($output_file =~ $expected) + { + return 1; + } + + # Wait 0.1 second before retrying. + usleep(100_000); + $attempts++; + } + + # The output result didn't change in 180 seconds. Give up + return 0; +} diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 929255e..3fa172a 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -11,11 +11,13 @@ *------------------------------------------------------------------------- */ #include "postgres.h" +#include "miscadmin.h" #include "catalog/pg_type.h" #include "replication/logical.h" #include "replication/origin.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -35,6 +37,7 @@ typedef struct bool include_timestamp; bool skip_empty_xacts; bool only_local; + TransactionId check_xid_aborted; /* track abort of this txid */ } TestDecodingData; /* @@ -174,6 +177,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_timestamp = false; data->skip_empty_xacts = false; data->only_local = false; + data->check_xid_aborted = InvalidTransactionId; ctx->output_plugin_private = data; @@ -275,6 +279,24 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "check-xid-aborted") == 0) + { + if (elem->arg == NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("check-xid-aborted needs an input value"))); + else + { + errno = 0; + data->check_xid_aborted = (TransactionId)strtoul(strVal(elem->arg), NULL, 0); + + if (errno || !TransactionIdIsValid(data->check_xid_aborted)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("check-xid-aborted is not a valid xid: \"%s\"", + strVal(elem->arg)))); + } + } else { ereport(ERROR, @@ -471,6 +493,30 @@ pg_decode_filter(LogicalDecodingContext *ctx, return false; } +static void +test_concurrent_aborts(TestDecodingData *data) +{ + /* + * If check_xid_aborted is a valid xid, then it was passed in as an option + * to check if the transaction having this xid would be aborted. This is + * to test concurrent aborts. + */ + if (TransactionIdIsValid(data->check_xid_aborted)) + { + elog(LOG, "waiting for %u to abort", data->check_xid_aborted); + while (TransactionIdIsInProgress(data->check_xid_aborted)) + { + CHECK_FOR_INTERRUPTS(); + pg_usleep(10000L); + } + if (!TransactionIdIsInProgress(data->check_xid_aborted) && + !TransactionIdDidCommit(data->check_xid_aborted)) + elog(LOG, "%u aborted", data->check_xid_aborted); + + Assert(TransactionIdDidAbort(data->check_xid_aborted)); + } +} + /* * Print literal `outputstr' already represented as string of type `typid' * into stringbuf `s'. @@ -620,6 +666,9 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } txndata->xact_wrote_changes = true; + /* For testing concurrent aborts */ + test_concurrent_aborts(data); + class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); @@ -706,6 +755,9 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } txndata->xact_wrote_changes = true; + /* For testing concurrent aborts */ + test_concurrent_aborts(data); + /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -918,6 +970,9 @@ pg_decode_stream_change(LogicalDecodingContext *ctx, } txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; + /* Test for concurrent aborts */ + test_concurrent_aborts(data); + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid); @@ -971,6 +1026,9 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; + /* For testing concurrent aborts */ + test_concurrent_aborts(data); + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 5a62ab8..4a4a9ed 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2489,6 +2489,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, curtxn->concurrent_abort = true; /* Reset the TXN so that it is allowed to stream remaining data. */ + if (rbtxn_prepared(txn)) + elog(LOG, "stop decoding of prepared txn %s (%u)", + txn->gid != NULL ? txn->gid : "", txn->xid); + else + elog(LOG, "stop decoding of txn %u", txn->xid); ReorderBufferResetTXN(rb, txn, snapshot_now, command_id, prev_lsn, specinsert); -- 1.8.3.1