diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 6c18189d9d..79b9622600 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -26,7 +26,7 @@ installcheck:; # installation, allow to do so, but only if requested explicitly. installcheck-force: regresscheck-install-force isolationcheck-install-force -check: regresscheck isolationcheck +check: regresscheck isolationcheck 2pc-check submake-regress: $(MAKE) -C $(top_builddir)/src/test/regress all @@ -66,3 +66,6 @@ isolationcheck-install-force: all | submake-isolation submake-test_decoding temp isolationcheck isolationcheck-install-force temp-install: EXTRA_INSTALL=contrib/test_decoding + +2pc-check: temp-install + $(prove_check) diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out index 46e915d4ff..2df0b6c198 100644 --- a/contrib/test_decoding/expected/prepared.out +++ b/contrib/test_decoding/expected/prepared.out @@ -6,19 +6,123 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d init (1 row) -CREATE TABLE test_prepared1(id int); -CREATE TABLE test_prepared2(id int); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding'); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc_nofilter', 'test_decoding'); + ?column? 
+---------- + init +(1 row) + +CREATE TABLE test_prepared1(id integer primary key); +CREATE TABLE test_prepared2(id integer primary key); +-- Reused queries +\set get_no2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'');' +\set get_with2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'');' +\set get_with2pc_nofilter 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc_nofilter'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'', ''twophase-decode-with-catalog-changes'', ''1'');' -- test simple successful use of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (1); PREPARE TRANSACTION 'test_prepared#1'; +:get_no2pc + data +------ +(0 rows) + +:get_with2pc + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:1 + PREPARE TRANSACTION 'test_prepared#1' +(3 rows) + +:get_with2pc_nofilter + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:1 + PREPARE TRANSACTION 'test_prepared#1' +(3 rows) + COMMIT PREPARED 'test_prepared#1'; +:get_no2pc + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:1 + COMMIT +(3 rows) + +:get_with2pc + data +----------------------------------- + COMMIT PREPARED 'test_prepared#1' +(1 row) + +:get_with2pc_nofilter + data +----------------------------------- + COMMIT PREPARED 'test_prepared#1' +(1 row) + INSERT INTO test_prepared1 VALUES (2); -- test abort of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (3); PREPARE TRANSACTION 'test_prepared#2'; +:get_no2pc + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: 
id[integer]:2 + COMMIT +(3 rows) + +:get_with2pc + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:2 + COMMIT + BEGIN + table public.test_prepared1: INSERT: id[integer]:3 + PREPARE TRANSACTION 'test_prepared#2' +(6 rows) + +:get_with2pc_nofilter + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:2 + COMMIT + BEGIN + table public.test_prepared1: INSERT: id[integer]:3 + PREPARE TRANSACTION 'test_prepared#2' +(6 rows) + ROLLBACK PREPARED 'test_prepared#2'; +:get_no2pc + data +------ +(0 rows) + +:get_with2pc + data +------------------------------------- + ROLLBACK PREPARED 'test_prepared#2' +(1 row) + +:get_with2pc_nofilter + data +------------------------------------- + ROLLBACK PREPARED 'test_prepared#2' +(1 row) + INSERT INTO test_prepared1 VALUES (4); -- test prepared xact containing ddl BEGIN; @@ -26,45 +130,226 @@ INSERT INTO test_prepared1 VALUES (5); ALTER TABLE test_prepared1 ADD COLUMN data text; INSERT INTO test_prepared1 VALUES (6, 'frakbar'); PREPARE TRANSACTION 'test_prepared#3'; --- test that we decode correctly while an uncommitted prepared xact --- with ddl exists. --- separate table because of the lock from the ALTER --- this will come before the '5' row above, as this commits before it. 
-INSERT INTO test_prepared2 VALUES (7); -COMMIT PREPARED 'test_prepared#3'; --- make sure stuff still works -INSERT INTO test_prepared1 VALUES (8); -INSERT INTO test_prepared2 VALUES (9); --- cleanup -DROP TABLE test_prepared1; -DROP TABLE test_prepared2; --- show results -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT 'test_prepared_1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; + relation | locktype | mode +-----------------+----------+--------------------- + test_prepared_1 | relation | RowExclusiveLock + test_prepared_1 | relation | AccessExclusiveLock +(2 rows) + +:get_no2pc + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:4 + COMMIT +(3 rows) + +:get_with2pc + data +---------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:4 + COMMIT +(3 rows) + +:get_with2pc_nofilter data ------------------------------------------------------------------------- BEGIN - table public.test_prepared1: INSERT: id[integer]:1 + table public.test_prepared1: INSERT: id[integer]:4 COMMIT BEGIN - table public.test_prepared1: INSERT: id[integer]:2 + table public.test_prepared1: INSERT: id[integer]:5 + table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar' + PREPARE TRANSACTION 'test_prepared#3' +(7 rows) + +-- Test that we decode correctly while an uncommitted prepared xact +-- with ddl exists. Our 2pc filter callback will skip decoding of xacts +-- with catalog changes at PREPARE time, so we don't decode it now. +-- +-- Use a separate table for the concurrent transaction because the lock from +-- the ALTER will stop us inserting into the other one. +-- +-- We should see '7' before '5' in our results since it commits first. 
+-- +INSERT INTO test_prepared2 VALUES (7); +:get_no2pc + data +---------------------------------------------------- + BEGIN + table public.test_prepared2: INSERT: id[integer]:7 COMMIT +(3 rows) + +:get_with2pc + data +---------------------------------------------------- BEGIN - table public.test_prepared1: INSERT: id[integer]:4 + table public.test_prepared2: INSERT: id[integer]:7 COMMIT +(3 rows) + +:get_with2pc_nofilter + data +---------------------------------------------------- BEGIN table public.test_prepared2: INSERT: id[integer]:7 COMMIT +(3 rows) + +COMMIT PREPARED 'test_prepared#3'; +:get_no2pc + data +------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:5 + table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar' + COMMIT +(4 rows) + +:get_with2pc + data +------------------------------------------------------------------------- BEGIN table public.test_prepared1: INSERT: id[integer]:5 table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar' COMMIT +(4 rows) + +:get_with2pc_nofilter + data +----------------------------------- + COMMIT PREPARED 'test_prepared#3' +(1 row) + +-- make sure stuff still works +INSERT INTO test_prepared1 VALUES (8); +INSERT INTO test_prepared2 VALUES (9); +:get_no2pc + data +-------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:8 data[text]:null + COMMIT + BEGIN + table public.test_prepared2: INSERT: id[integer]:9 + COMMIT +(6 rows) + +:get_with2pc + data +-------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:8 data[text]:null + COMMIT + BEGIN + table public.test_prepared2: INSERT: id[integer]:9 + COMMIT +(6 rows) + +:get_with2pc_nofilter + data +-------------------------------------------------------------------- BEGIN table public.test_prepared1: INSERT: id[integer]:8 
data[text]:null COMMIT BEGIN table public.test_prepared2: INSERT: id[integer]:9 COMMIT -(22 rows) +(6 rows) + +-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block +-- logical decoding. +BEGIN; +INSERT INTO test_prepared1 VALUES (10, 'othercol'); +CLUSTER test_prepared1 USING test_prepared1_pkey; +INSERT INTO test_prepared1 VALUES (11, 'othercol2'); +PREPARE TRANSACTION 'test_prepared_lock'; +BEGIN; +insert into test_prepared2 values (12); +PREPARE TRANSACTION 'test_prepared_lock2'; +COMMIT PREPARED 'test_prepared_lock2'; +SELECT 'pg_class' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'pg_class'::regclass; + relation | locktype | mode +----------+----------+------ +(0 rows) + +-- Shouldn't see anything with 2pc decoding off +:get_no2pc + data +----------------------------------------------------- + BEGIN + table public.test_prepared2: INSERT: id[integer]:12 + COMMIT +(3 rows) + +-- Shouldn't timeout on 2pc decoding. +SET statement_timeout = '1s'; +:get_with2pc + data +----------------------------------------------------- + BEGIN + table public.test_prepared2: INSERT: id[integer]:12 + PREPARE TRANSACTION 'test_prepared_lock2' + COMMIT PREPARED 'test_prepared_lock2' +(4 rows) + +:get_with2pc_nofilter + data +---------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol' + table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2' + PREPARE TRANSACTION 'test_prepared_lock' + BEGIN + table public.test_prepared2: INSERT: id[integer]:12 + PREPARE TRANSACTION 'test_prepared_lock2' + COMMIT PREPARED 'test_prepared_lock2' +(8 rows) + +RESET statement_timeout; +COMMIT PREPARED 'test_prepared_lock'; +-- Both will work normally after we commit +:get_no2pc + data +---------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:10 
data[text]:'othercol' + table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2' + COMMIT +(4 rows) + +:get_with2pc + data +---------------------------------------------------------------------------- + BEGIN + table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol' + table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2' + COMMIT +(4 rows) + +:get_with2pc_nofilter + data +-------------------------------------- + COMMIT PREPARED 'test_prepared_lock' +(1 row) + +-- cleanup +DROP TABLE test_prepared1; +DROP TABLE test_prepared2; +-- show results +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot @@ -72,3 +357,15 @@ SELECT pg_drop_replication_slot('regression_slot'); (1 row) +SELECT pg_drop_replication_slot('regression_slot_2pc'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('regression_slot_2pc_nofilter'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql index e72639767e..4197766c50 100644 --- a/contrib/test_decoding/sql/prepared.sql +++ b/contrib/test_decoding/sql/prepared.sql @@ -1,22 +1,41 @@ -- predictability SET synchronous_commit = on; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc_nofilter', 'test_decoding'); -CREATE TABLE test_prepared1(id int); -CREATE TABLE test_prepared2(id int); +CREATE TABLE test_prepared1(id integer primary key); +CREATE TABLE test_prepared2(id integer primary key); + +-- Reused queries +\set get_no2pc 'SELECT data FROM 
pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'');' +\set get_with2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'');' +\set get_with2pc_nofilter 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc_nofilter'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'', ''twophase-decode-with-catalog-changes'', ''1'');' -- test simple successful use of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (1); PREPARE TRANSACTION 'test_prepared#1'; +:get_no2pc +:get_with2pc +:get_with2pc_nofilter COMMIT PREPARED 'test_prepared#1'; +:get_no2pc +:get_with2pc +:get_with2pc_nofilter INSERT INTO test_prepared1 VALUES (2); -- test abort of a prepared xact BEGIN; INSERT INTO test_prepared1 VALUES (3); PREPARE TRANSACTION 'test_prepared#2'; +:get_no2pc +:get_with2pc +:get_with2pc_nofilter ROLLBACK PREPARED 'test_prepared#2'; +:get_no2pc +:get_with2pc +:get_with2pc_nofilter INSERT INTO test_prepared1 VALUES (4); @@ -27,18 +46,74 @@ ALTER TABLE test_prepared1 ADD COLUMN data text; INSERT INTO test_prepared1 VALUES (6, 'frakbar'); PREPARE TRANSACTION 'test_prepared#3'; --- test that we decode correctly while an uncommitted prepared xact --- with ddl exists. +SELECT 'test_prepared_1' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'test_prepared1'::regclass; + +:get_no2pc +:get_with2pc +:get_with2pc_nofilter --- separate table because of the lock from the ALTER --- this will come before the '5' row above, as this commits before it. +-- Test that we decode correctly while an uncommitted prepared xact +-- with ddl exists. Our 2pc filter callback will skip decoding of xacts +-- with catalog changes at PREPARE time, so we don't decode it now. 
+-- +-- Use a separate table for the concurrent transaction because the lock from +-- the ALTER will stop us inserting into the other one. +-- +-- We should see '7' before '5' in our results since it commits first. +-- INSERT INTO test_prepared2 VALUES (7); +:get_no2pc +:get_with2pc +:get_with2pc_nofilter COMMIT PREPARED 'test_prepared#3'; +:get_no2pc +:get_with2pc +:get_with2pc_nofilter -- make sure stuff still works INSERT INTO test_prepared1 VALUES (8); INSERT INTO test_prepared2 VALUES (9); +:get_no2pc +:get_with2pc +:get_with2pc_nofilter + +-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block +-- logical decoding. +BEGIN; +INSERT INTO test_prepared1 VALUES (10, 'othercol'); +CLUSTER test_prepared1 USING test_prepared1_pkey; +INSERT INTO test_prepared1 VALUES (11, 'othercol2'); +PREPARE TRANSACTION 'test_prepared_lock'; + +BEGIN; +insert into test_prepared2 values (12); +PREPARE TRANSACTION 'test_prepared_lock2'; +COMMIT PREPARED 'test_prepared_lock2'; + +SELECT 'pg_class' AS relation, locktype, mode +FROM pg_locks +WHERE locktype = 'relation' + AND relation = 'pg_class'::regclass; + +-- Shouldn't see anything with 2pc decoding off +:get_no2pc + +-- Shouldn't timeout on 2pc decoding. 
+SET statement_timeout = '1s'; +:get_with2pc +:get_with2pc_nofilter +RESET statement_timeout; + +COMMIT PREPARED 'test_prepared_lock'; + +-- Both will work normally after we commit +:get_no2pc +:get_with2pc +:get_with2pc_nofilter -- cleanup DROP TABLE test_prepared1; @@ -48,3 +123,5 @@ DROP TABLE test_prepared2; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); SELECT pg_drop_replication_slot('regression_slot'); +SELECT pg_drop_replication_slot('regression_slot_2pc'); +SELECT pg_drop_replication_slot('regression_slot_2pc_nofilter'); diff --git a/contrib/test_decoding/t/001_twophase.pl b/contrib/test_decoding/t/001_twophase.pl new file mode 100644 index 0000000000..6722317c9f --- /dev/null +++ b/contrib/test_decoding/t/001_twophase.pl @@ -0,0 +1,102 @@ +# logical replication of 2PC test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +# Initialize node +my $node_logical = get_new_node('logical'); +$node_logical->init(allows_streaming => 'logical'); +$node_logical->append_conf( + 'postgresql.conf', qq( + max_prepared_transactions = 10 +)); +$node_logical->start; + +# Create some pre-existing content on logical +$node_logical->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); +$node_logical->safe_psql('postgres', + "INSERT INTO tab SELECT generate_series(1,10)"); +$node_logical->safe_psql('postgres', + "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');"); +$node_logical->safe_psql('postgres', + "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding');"); + +# This test is specifically for testing concurrent abort while logical decode is +# ongoing. The decode-delay value will allow for each change decode to sleep for +# those many seconds. We also hold the LogicalLockTransaction while we sleep. 
+# We will fire off a ROLLBACK from another session when this delayed decode is +# ongoing. Since we are holding the lock from the call above, this ROLLBACK +# will wait for the logical backends to do a LogicalUnlockTransaction. We will +# stop decoding immediately post this and the next pg_logical_slot_get_changes call +# should show only a few records decoded from the entire two phase transaction +# +# We use two slots to test multiple decoding backends here + +$node_logical->safe_psql('postgres', " + BEGIN; + INSERT INTO tab VALUES (11); + INSERT INTO tab VALUES (12); + ALTER TABLE tab ADD COLUMN b INT; + PREPARE TRANSACTION 'test_prepared_tab';"); + +# start decoding the above with decode-delay in the background. +my $logical_connstr = $node_logical->connstr . ' dbname=postgres'; + +# decode now, it should only decode 1 INSERT record and should include +# an ABORT entry because of the ROLLBACK below +system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1', 'decode-delay', '3');\" \&"); + +system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1', 'decode-delay', '3');\" \&"); + +# sleep for a little while (shorter than decode-delay) +$node_logical->safe_psql('postgres', "select pg_sleep(1)"); + +# rollback the prepared transaction whose first record is being decoded +# after sleeping for decode-delay time +$node_logical->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';"); + +# wait for decoding to stop +$node_logical->psql('postgres', "select pg_sleep(4)"); + +# consume any remaining changes +$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', 
NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1');"); + +$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1');"); + +# check for occurrence of log about waiting backends +my $output_file = slurp_file($node_logical->logfile()); +my $waiting_str = "Waiting for backends to abort"; +like($output_file, qr/$waiting_str/, "Waiting log found in server log"); + +# check for occurrence of log about stopping decoding +my $abort_str = "stopping decoding of test_prepared_tab "; +like($output_file, qr/$abort_str/, "ABORT found in server log"); + +# Check that commit prepared is decoded properly on immediate restart +$node_logical->safe_psql('postgres', " + BEGIN; + INSERT INTO tab VALUES (11); + INSERT INTO tab VALUES (12); + ALTER TABLE tab ADD COLUMN b INT; + INSERT INTO tab VALUES (13, 11); + PREPARE TRANSACTION 'test_prepared_tab';"); +# consume changes +$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1');"); +$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1');"); +$node_logical->stop('immediate'); +$node_logical->start; + +# commit post the restart +$node_logical->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); +$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1');"); 
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1');"); + +# check inserts are visible +my $result = $node_logical->safe_psql('postgres', "SELECT count(*) FROM tab where a IN (11,12) OR b IN (11);"); +is($result, qq(3), 'Rows inserted via 2PC are visible on restart'); + +$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot');"); +$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2');"); +$node_logical->stop('fast'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 0f18afa852..477c950b8d 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -24,6 +24,8 @@ #include "replication/message.h" #include "replication/origin.h" +#include "storage/procarray.h" + #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -46,6 +48,9 @@ typedef struct bool skip_empty_xacts; bool xact_wrote_changes; bool only_local; + bool twophase_decoding; + bool twophase_decode_with_catalog_changes; + int decode_delay; /* seconds to sleep after every change record */ } TestDecodingData; static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, @@ -59,6 +64,8 @@ static void pg_output_begin(LogicalDecodingContext *ctx, bool last_write); static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pg_decode_abort_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr abort_lsn); static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); @@ -68,6 +75,20 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool 
transactional, const char *prefix, Size sz, const char *message); +static bool pg_filter_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, const char *gid); +static bool pg_filter_decode_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); void _PG_init(void) @@ -85,9 +106,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->begin_cb = pg_decode_begin_txn; cb->change_cb = pg_decode_change; cb->commit_cb = pg_decode_commit_txn; + cb->abort_cb = pg_decode_abort_txn; cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; + cb->filter_prepare_cb = pg_filter_prepare; + cb->filter_decode_txn_cb = pg_filter_decode_txn; + cb->prepare_cb = pg_decode_prepare_txn; + cb->commit_prepared_cb = pg_decode_commit_prepared_txn; + cb->abort_prepared_cb = pg_decode_abort_prepared_txn; } @@ -107,6 +134,9 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_timestamp = false; data->skip_empty_xacts = false; data->only_local = false; + data->twophase_decoding = false; + data->twophase_decode_with_catalog_changes = false; + data->decode_delay = 0; ctx->output_plugin_private = data; @@ -156,7 +186,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, } else if (strcmp(elem->defname, "skip-empty-xacts") == 0) { - if (elem->arg == NULL) data->skip_empty_xacts = true; else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts)) @@ -167,7 +196,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, } else if (strcmp(elem->defname, "only-local") == 
0) { - if (elem->arg == NULL) data->only_local = true; else if (!parse_bool(strVal(elem->arg), &data->only_local)) @@ -176,6 +204,41 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "twophase-decoding") == 0) + { + if (elem->arg == NULL) + data->twophase_decoding = true; + else if (!parse_bool(strVal(elem->arg), &data->twophase_decoding)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } + else if (strcmp(elem->defname, "twophase-decode-with-catalog-changes") == 0) + { + if (elem->arg == NULL) + data->twophase_decode_with_catalog_changes = true; + else if (!parse_bool(strVal(elem->arg), &data->twophase_decode_with_catalog_changes)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } + else if (strcmp(elem->defname, "decode-delay") == 0) + { + if (elem->arg == NULL) + data->decode_delay = 2; /* default to 2 seconds */ + else + data->decode_delay = pg_atoi(strVal(elem->arg), + sizeof(int), 0); + + if (data->decode_delay <= 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Specify positive value for parameter \"%s\"," + " you specified \"%s\"", + elem->defname, strVal(elem->arg)))); + } else { ereport(ERROR, @@ -244,6 +307,156 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +/* ABORT callback */ +static void +pg_decode_abort_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + if (data->include_xids) + 
appendStringInfo(ctx->out, "ABORT %u", txn->xid); + else + appendStringInfoString(ctx->out, "ABORT"); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* Filter out unnecessary two-phase transactions */ +static bool +pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + TransactionId xid, const char *gid) +{ + TestDecodingData *data = ctx->output_plugin_private; + + /* treat all transactions as one-phase */ + if (!data->twophase_decoding) + return true; + + if (txn && txn_has_catalog_changes(txn) && + !data->twophase_decode_with_catalog_changes) + return true; + + /* + * even if txn is NULL, decode since twophase_decoding is set + */ + return false; +} + +/* + * Check if we should continue to decode this transaction. + * + * If it has aborted in the meanwhile, then there's no sense + * in decoding and sending the rest of the changes, we might + * as well ask the subscribers to abort immediately. + * + * This should be called if we are streaming a transaction + * before it's committed or if we are decoding a 2PC + * transaction. Otherwise we always decode committed + * transactions + * + * Additional checks can be added here, as needed + */ +static bool +pg_filter_decode_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + /* + * Due to caching, repeated TransactionIdDidAbort calls + * shouldn't be that expensive + */ + if (txn != NULL && + TransactionIdIsValid(txn->xid) && + TransactionIdDidAbort(txn->xid)) + return true; + + /* if txn is NULL, filter it out */ + return (txn != NULL)? 
false:true; +} + +/* PREPARE callback */ +static void +pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "PREPARE TRANSACTION %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* COMMIT PREPARED callback */ +static void +pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (!data->twophase_decoding) + return; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "COMMIT PREPARED %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* ABORT PREPARED callback */ +static void +pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (!data->twophase_decoding) + return; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "ROLLBACK PREPARED %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, " %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id) @@ -409,8 +622,18 @@ 
pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } data->xact_wrote_changes = true; + if (!LogicalLockTransaction(txn)) + return; + /* if decode_delay is specified, sleep with above lock held */ + if (data->decode_delay > 0) + { + elog(LOG, "sleeping for %d seconds", data->decode_delay); + pg_usleep(data->decode_delay * 1000000L); + } class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); + LogicalUnlockTransaction(txn); + /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 5501eed108..7edda72e5e 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -384,8 +384,14 @@ typedef struct OutputPluginCallbacks LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeAbortCB abort_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeAbortPreparedCB abort_prepared_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; + LogicalDecodeFilterDecodeTxnCB filter_decode_txn_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; @@ -454,7 +460,12 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); never get decoded. Successful savepoints are folded into the transaction containing them in the order they were - executed within that transaction. + executed within that transaction. A transaction that is prepared for + a two-phase commit using PREPARE TRANSACTION will + also be decoded if the output plugin callbacks needed for decoding + them are provided. It is possible that the current transaction which + is being decoded is aborted concurrently via a ROLLBACK PREPARED + command. 
In that case, the logical decoding will be aborted midway. @@ -550,6 +561,74 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, + + Transaction Prepare Callback + + + The optional prepare_cb callback is called whenever + a transaction which is prepared for two-phase commit has been + decoded. The change_cb callbacks for all modified + rows will have been called before this, if there have been any modified + rows. + +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + + + + + + Commit Prepared Transaction Callback + + + The optional commit_prepared_cb callback is called whenever + a commit prepared transaction has been decoded. The gid field, + which is part of the txn parameter, can be used in this + callback. + +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + + + + + + Rollback Prepared Transaction Callback + + + The optional abort_prepared_cb callback is called whenever + a rollback prepared transaction has been decoded. The gid field, + which is part of the txn parameter, can be used in this + callback. + +typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + + + + + Transaction Abort Callback + + + The required abort_cb callback is called whenever + a transaction abort has to be initiated. This can happen if we are + decoding a transaction that has been prepared for two-phase commit and + a concurrent rollback happens while we are decoding it. In such cases it + might make sense to check, before we commence decoding, whether the + rollback has already happened, so that decoding of such transactions can + be avoided completely.
+ +typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + + + Change Callback @@ -559,12 +638,30 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, an INSERT, UPDATE, or DELETE. Even if the original command modified several rows at once the callback will be called individually for each - row. + row. The change_cb callback may access system or + user catalog tables to aid in the process of outputting the row + modification details. The change_cb callback should invoke the + LogicalLockTransaction function before any such access to + system or user catalog tables. When decoding a prepared (but not yet + committed) transaction or an uncommitted transaction, this + function interlocks the decoding activity with a simultaneous rollback of + this very same transaction by another backend. The + change_cb should invoke the + LogicalUnlockTransaction function immediately after + the catalog table access. typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); + + Here's an example of the use of LogicalLockTransaction + and LogicalUnlockTransaction in an output plugin: + + if (!LogicalLockTransaction(txn)) + return; + relation = RelationIdGetRelation(reloid); + LogicalUnlockTransaction(txn); The ctx and txn parameters have the same contents as for the begin_cb @@ -614,6 +711,53 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct + + Decode Filter Callback + + + The optional filter_decode_txn_cb callback + is called to determine whether data that is part of the current + transaction should continue to be decoded. + +typedef bool (*LogicalDecodeFilterDecodeTxnCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + + The ctx parameter has the same contents + as for the other callbacks.
The txn parameter + contains meta information about the transaction, like its XID. + Note however that it can be NULL in some cases. To signal that the decoding process + should terminate, return true; false otherwise. + + + + + Prepare Filter Callback + + + The optional filter_prepare_cb callback + is called to determine whether data that is part of the current + two-phase commit transaction should be considered for decoding + at this prepare stage or as a regular one-phase transaction at + COMMIT PREPARED time later. To signal that + decoding should be skipped, return true; false otherwise. + +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + + The ctx parameter has the same contents + as for the other callbacks. The txn parameter + contains meta information about the transaction. The xid + contains the XID because txn can be NULL in some cases. + The gid is the identifier that later identifies this + transaction for COMMIT PREPARED or ROLLBACK PREPARED. + The callback has to provide the same static answer for a given combination of + xid and gid every time it is + called. To signal that decoding should be skipped, return true; false otherwise.
+ + + Generic Message Callback diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index e5eef9ea43..b3e2fc3036 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -102,6 +102,14 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars parsed->twophase_xid = xl_twophase->xid; data += sizeof(xl_xact_twophase); + + if (parsed->xinfo & XACT_XINFO_HAS_GID) + { + int gidlen; + strlcpy(parsed->twophase_gid, data, sizeof(parsed->twophase_gid)); /* bounded: never overrun the GID buffer */ + gidlen = strlen(data) + 1; /* skip the full stored GID even if the copy was truncated */ + data += MAXALIGN(gidlen); + } } if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) @@ -139,6 +147,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed) data += sizeof(xl_xact_xinfo); } + if (parsed->xinfo & XACT_XINFO_HAS_DBINFO) + { + xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data; + + parsed->dbId = xl_dbinfo->dbId; + parsed->tsId = xl_dbinfo->tsId; + + data += sizeof(xl_xact_dbinfo); + } + if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS) { xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data; @@ -168,6 +186,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed) parsed->twophase_xid = xl_twophase->xid; data += sizeof(xl_xact_twophase); + + if (parsed->xinfo & XACT_XINFO_HAS_GID) + { + int gidlen; + strlcpy(parsed->twophase_gid, data, sizeof(parsed->twophase_gid)); /* bounded: never overrun the GID buffer */ + gidlen = strlen(data) + 1; /* skip the full stored GID even if the copy was truncated */ + data += MAXALIGN(gidlen); + } + } + + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + xl_xact_origin xl_origin; + + /* we're only guaranteed 4 byte alignment, so copy onto stack */ + memcpy(&xl_origin, data, sizeof(xl_origin)); + + parsed->origin_lsn = xl_origin.origin_lsn; + parsed->origin_timestamp = xl_origin.origin_timestamp; + + data += sizeof(xl_xact_origin); } } diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index c479c4881b..97499707f7 100644 --- a/src/backend/access/transam/twophase.c +++
b/src/backend/access/transam/twophase.c @@ -144,11 +144,7 @@ int max_prepared_xacts = 0; * * typedef struct GlobalTransactionData *GlobalTransaction appears in * twophase.h - * - * Note that the max value of GIDSIZE must fit in the uint16 gidlen, - * specified in TwoPhaseFileHeader. */ -#define GIDSIZE 200 typedef struct GlobalTransactionData { @@ -211,12 +207,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid, RelFileNode *rels, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, - bool initfileinval); + bool initfileinval, + const char *gid); static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, - RelFileNode *rels); + RelFileNode *rels, + const char *gid); static void ProcessRecords(char *bufptr, TransactionId xid, const TwoPhaseCallback callbacks[]); static void RemoveGXact(GlobalTransaction gxact); @@ -556,7 +554,7 @@ MarkAsPrepared(GlobalTransaction gxact, bool lock_held) * Locate the prepared transaction and mark it busy for COMMIT or PREPARE. */ static GlobalTransaction -LockGXact(const char *gid, Oid user) +LockGXact(const char *gid, Oid user, bool missing_ok) { int i; @@ -616,7 +614,8 @@ LockGXact(const char *gid, Oid user) LWLockRelease(TwoPhaseStateLock); - ereport(ERROR, + if (!missing_ok) + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("prepared transaction with identifier \"%s\" does not exist", gid))); @@ -898,7 +897,7 @@ TwoPhaseGetDummyProc(TransactionId xid) /* * Header for a 2PC state file */ -#define TWOPHASE_MAGIC 0x57F94533 /* format identifier */ +#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */ typedef struct TwoPhaseFileHeader { @@ -914,6 +913,8 @@ typedef struct TwoPhaseFileHeader int32 ninvalmsgs; /* number of cache invalidation messages */ bool initfileinval; /* does relcache init file need invalidation? 
*/ uint16 gidlen; /* length of the GID - GID follows the header */ + XLogRecPtr origin_lsn; /* lsn of this record at origin node */ + TimestampTz origin_timestamp; /* time of prepare at origin node */ } TwoPhaseFileHeader; /* @@ -1065,6 +1066,7 @@ EndPrepare(GlobalTransaction gxact) { TwoPhaseFileHeader *hdr; StateFileChunk *record; + bool replorigin; /* Add the end sentinel to the list of 2PC records */ RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0, @@ -1075,6 +1077,21 @@ EndPrepare(GlobalTransaction gxact) Assert(hdr->magic == TWOPHASE_MAGIC); hdr->total_len = records.total_len + sizeof(pg_crc32c); + replorigin = (replorigin_session_origin != InvalidRepOriginId && + replorigin_session_origin != DoNotReplicateId); + + if (replorigin) + { + Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr); + hdr->origin_lsn = replorigin_session_origin_lsn; + hdr->origin_timestamp = replorigin_session_origin_timestamp; + } + else + { + hdr->origin_lsn = InvalidXLogRecPtr; + hdr->origin_timestamp = 0; + } + /* * If the data size exceeds MaxAllocSize, we won't be able to read it in * ReadTwoPhaseFile. 
Check for that now, rather than fail in the case @@ -1107,7 +1124,16 @@ EndPrepare(GlobalTransaction gxact) XLogBeginInsert(); for (record = records.head; record != NULL; record = record->next) XLogRegisterData(record->data, record->len); + + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE); + + if (replorigin) + /* Move LSNs forward for this replication origin */ + replorigin_session_advance(replorigin_session_origin_lsn, + gxact->prepare_end_lsn); + XLogFlush(gxact->prepare_end_lsn); /* If we crash now, we have prepared: WAL replay will fix things */ @@ -1283,6 +1309,43 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings) return buf; } +/* + * ParsePrepareRecord: unpack a PREPARE record (header, GID, arrays) into *parsed; the array fields point into xlrec + */ +void +ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed) +{ + TwoPhaseFileHeader *hdr; + char *bufptr; + + hdr = (TwoPhaseFileHeader *) xlrec; + bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader)); + + parsed->origin_lsn = hdr->origin_lsn; + parsed->origin_timestamp = hdr->origin_timestamp; + parsed->twophase_xid = hdr->xid; + parsed->dbId = hdr->database; + parsed->nsubxacts = hdr->nsubxacts; + parsed->ncommitrels = hdr->ncommitrels; + parsed->nabortrels = hdr->nabortrels; + parsed->nmsgs = hdr->ninvalmsgs; + + snprintf(parsed->twophase_gid, sizeof(parsed->twophase_gid), "%.*s", (int) hdr->gidlen, bufptr); /* reads at most gidlen bytes, writes at most the buffer size, always NUL-terminated */ + bufptr += MAXALIGN(hdr->gidlen); + + parsed->subxacts = (TransactionId *) bufptr; + bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); + + parsed->commitrels = (RelFileNode *) bufptr; + bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); + + parsed->abortrels = (RelFileNode *) bufptr; + bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + + parsed->msgs = (SharedInvalidationMessage *) bufptr; + bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); +} + /* * Reads 2PC data from xlog.
During checkpoint this data will be moved to @@ -1365,7 +1428,7 @@ StandbyTransactionIdIsPrepared(TransactionId xid) * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED */ void -FinishPreparedTransaction(const char *gid, bool isCommit) +FinishPreparedTransaction(const char *gid, bool isCommit, bool missing_ok) { GlobalTransaction gxact; PGPROC *proc; @@ -1386,8 +1449,20 @@ FinishPreparedTransaction(const char *gid, bool isCommit) /* * Validate the GID, and lock the GXACT to ensure that two backends do not * try to commit the same GID at once. + * + * During logical decoding, on the apply side, it's possible that a prepared + * transaction got aborted while decoding. In that case, we stop the + * decoding and abort the transaction immediately. However the ROLLBACK + * prepared processing still reaches the subscriber. In that case it's ok + * to have a missing gid */ - gxact = LockGXact(gid, GetUserId()); + gxact = LockGXact(gid, GetUserId(), missing_ok); + if (gxact == NULL) + { + Assert(missing_ok && !isCommit); + return; + } + proc = &ProcGlobal->allProcs[gxact->pgprocno]; pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; xid = pgxact->xid; @@ -1435,13 +1510,19 @@ FinishPreparedTransaction(const char *gid, bool isCommit) hdr->nsubxacts, children, hdr->ncommitrels, commitrels, hdr->ninvalmsgs, invalmsgs, - hdr->initfileinval); + hdr->initfileinval, gid); else RecordTransactionAbortPrepared(xid, hdr->nsubxacts, children, - hdr->nabortrels, abortrels); + hdr->nabortrels, abortrels, + gid); ProcArrayRemove(proc, latestXid); + /* + * Tell logical decoding backends interested in this XID + * that this is going away + */ + LogicalDecodeRemoveTransaction(proc, isCommit); /* * In case we fail while running the callbacks, mark the gxact invalid so @@ -1752,7 +1833,8 @@ restoreTwoPhaseData(void) if (buf == NULL) continue; - PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr); + PrepareRedoAdd(buf, InvalidXLogRecPtr, + InvalidXLogRecPtr, 
InvalidRepOriginId); } } LWLockRelease(TwoPhaseStateLock); @@ -2165,7 +2247,8 @@ RecordTransactionCommitPrepared(TransactionId xid, RelFileNode *rels, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, - bool initfileinval) + bool initfileinval, + const char *gid) { XLogRecPtr recptr; TimestampTz committs = GetCurrentTimestamp(); @@ -2193,7 +2276,7 @@ RecordTransactionCommitPrepared(TransactionId xid, ninvalmsgs, invalmsgs, initfileinval, false, MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK, - xid); + xid, gid); if (replorigin) @@ -2255,7 +2338,8 @@ RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, - RelFileNode *rels) + RelFileNode *rels, + const char *gid) { XLogRecPtr recptr; @@ -2278,7 +2362,7 @@ RecordTransactionAbortPrepared(TransactionId xid, nchildren, children, nrels, rels, MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK, - xid); + xid, gid); /* Always flush, since we're about to remove the 2PC state file */ XLogFlush(recptr); @@ -2309,7 +2393,8 @@ RecordTransactionAbortPrepared(TransactionId xid, * data, the entry is marked as located on disk. 
*/ void -PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) +PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, + XLogRecPtr end_lsn, RepOriginId origin_id) { TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf; char *bufptr; @@ -2358,6 +2443,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + if (origin_id != InvalidRepOriginId) + { + /* recover apply progress */ + replorigin_advance(origin_id, hdr->origin_lsn, end_lsn, + false /* backward */ , false /* WAL */ ); + } + elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid); } diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index ea81f4b5de..e19fac4f7b 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1227,7 +1227,7 @@ RecordTransactionCommit(void) nmsgs, invalMessages, RelcacheInitFileInval, forceSyncCommit, MyXactFlags, - InvalidTransactionId /* plain commit */ ); + InvalidTransactionId, NULL /* plain commit */ ); if (replorigin) /* Move LSNs forward for this replication origin */ @@ -1579,7 +1579,8 @@ RecordTransactionAbort(bool isSubXact) XactLogAbortRecord(xact_time, nchildren, children, nrels, rels, - MyXactFlags, InvalidTransactionId); + MyXactFlags, InvalidTransactionId, + NULL); /* * Report the latest async abort LSN, so that the WAL writer knows to @@ -5247,7 +5248,6 @@ xactGetCommittedChildren(TransactionId **ptr) * XLOG support routines */ - /* * Log the commit record for a plain or twophase transaction commit. 
* @@ -5260,7 +5260,8 @@ XactLogCommitRecord(TimestampTz commit_time, int nrels, RelFileNode *rels, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, bool forceSync, - int xactflags, TransactionId twophase_xid) + int xactflags, TransactionId twophase_xid, + const char *twophase_gid) { xl_xact_commit xlrec; xl_xact_xinfo xl_xinfo; @@ -5272,6 +5273,7 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xact_origin xl_origin; uint8 info; + int gidlen = 0; Assert(CritSectionCount > 0); @@ -5334,6 +5336,13 @@ XactLogCommitRecord(TimestampTz commit_time, { xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; xl_twophase.xid = twophase_xid; + Assert(twophase_gid != NULL); + + if (XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_GID; + gidlen = strlen(twophase_gid) + 1; /* include '\0' */ + } } /* dump transaction origin information */ @@ -5384,8 +5393,19 @@ XactLogCommitRecord(TimestampTz commit_time, } if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) + { XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID) + { + static const char zeroes[MAXIMUM_ALIGNOF] = { 0 }; + XLogRegisterData((char*) twophase_gid, gidlen); + if (MAXALIGN(gidlen) != gidlen) + XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen); + } + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); @@ -5405,15 +5425,19 @@ XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, - int xactflags, TransactionId twophase_xid) + int xactflags, TransactionId twophase_xid, + const char *twophase_gid) { xl_xact_abort xlrec; xl_xact_xinfo xl_xinfo; xl_xact_subxacts xl_subxacts; xl_xact_relfilenodes xl_relfilenodes; xl_xact_twophase xl_twophase; + xl_xact_dbinfo xl_dbinfo; + xl_xact_origin xl_origin; uint8 info; + int gidlen = 0; Assert(CritSectionCount > 0); @@ -5449,6 +5473,31 @@ XactLogAbortRecord(TimestampTz 
abort_time, { xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; xl_twophase.xid = twophase_xid; + Assert(twophase_gid != NULL); + + if (XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_GID; + gidlen = strlen(twophase_gid) + 1; /* include '\0' */ + } + } + + if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO; + xl_dbinfo.dbId = MyDatabaseId; + xl_dbinfo.tsId = MyDatabaseTableSpace; + } + + /* dump transaction origin information only for abort prepared */ + if ( (replorigin_session_origin != InvalidRepOriginId) && + TransactionIdIsValid(twophase_xid) && + XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; + + xl_origin.origin_lsn = replorigin_session_origin_lsn; + xl_origin.origin_timestamp = replorigin_session_origin_timestamp; } if (xl_xinfo.xinfo != 0) @@ -5463,6 +5512,9 @@ XactLogAbortRecord(TimestampTz abort_time, if (xl_xinfo.xinfo != 0) XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO) + XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS) { XLogRegisterData((char *) (&xl_subxacts), @@ -5480,7 +5532,23 @@ XactLogAbortRecord(TimestampTz abort_time, } if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) + { XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID) + { + static const char zeroes[MAXIMUM_ALIGNOF] = { 0 }; + XLogRegisterData((char*) twophase_gid, gidlen); + if (MAXALIGN(gidlen) != gidlen) + XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen); + } + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) + XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); + + if (TransactionIdIsValid(twophase_xid)) + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + return XLogInsert(RM_XACT_ID, info); } @@ -5803,7 +5871,8 @@ xact_redo(XLogReaderState *record) LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); 
PrepareRedoAdd(XLogRecGetData(record), record->ReadRecPtr, - record->EndRecPtr); + record->EndRecPtr, + XLogRecGetOrigin(record)); LWLockRelease(TwoPhaseStateLock); } else if (info == XLOG_XACT_ASSIGNMENT) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 6eb0d5527e..b45739d971 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -34,6 +34,7 @@ #include "access/xlogutils.h" #include "access/xlogreader.h" #include "access/xlogrecord.h" +#include "access/twophase.h" #include "catalog/pg_control.h" @@ -72,6 +73,8 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid); +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare *parsed); /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); @@ -280,16 +283,33 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; } case XLOG_XACT_PREPARE: + { + xl_xact_parsed_prepare parsed; - /* - * Currently decoding ignores PREPARE TRANSACTION and will just - * decode the transaction when the COMMIT PREPARED is sent or - * throw away the transaction's contents when a ROLLBACK PREPARED - * is received. In the future we could add code to expose prepared - * transactions in the changestream allowing for a kind of - * distributed 2PC. 
- */ - ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); + /* check that output plugin is capable of twophase decoding */ + if (!ctx->enable_twophase) + { + ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); + break; + } + + /* ok, parse it */ + ParsePrepareRecord(XLogRecGetInfo(buf->record), + XLogRecGetData(buf->record), &parsed); + + /* does output plugin want this particular transaction? */ + if (ctx->callbacks.filter_prepare_cb && + ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid, + parsed.twophase_gid)) + { + ReorderBufferProcessXid(reorder, parsed.twophase_xid, + buf->origptr); + break; + } + + DecodePrepare(ctx, buf, &parsed); + break; + } break; default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); @@ -627,9 +647,71 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, buf->origptr, buf->endptr); } + if (TransactionIdIsValid(parsed->twophase_xid) && + ReorderBufferTxnIsPrepared(ctx->reorder, + parsed->twophase_xid, parsed->twophase_gid)) + { + Assert(xid == parsed->twophase_xid); + /* we are processing COMMIT PREPARED */ + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, parsed->twophase_gid, true); + } + else + { + /* replay actions of all transaction + subtransactions in order */ + ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn); + } +} + +/* + * Decode PREPARE record. 
Similar logic as in COMMIT + */ +static void +DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare *parsed) +{ + XLogRecPtr origin_lsn = parsed->origin_lsn; + TimestampTz commit_time = parsed->origin_timestamp; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + int i; + TransactionId xid = parsed->twophase_xid; + + /* + * Process invalidation messages, even if we're not interested in the + * transaction's contents, since the various caches need to always be + * consistent. + */ + if (parsed->nmsgs > 0) + { + ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, + parsed->nmsgs, parsed->msgs); + ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); + } + + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || + FilterByOrigin(ctx, origin_id)) + { + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr); + } + ReorderBufferForget(ctx->reorder, xid, buf->origptr); + + return; + } + + /* tell the reorderbuffer about the surviving subtransactions */ + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], + buf->origptr, buf->endptr); + } + /* replay actions of all transaction + subtransactions in order */ - ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - commit_time, origin_id, origin_lsn); + ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, parsed->twophase_gid); } /* @@ -641,6 +723,30 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid) { int i; + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + TimestampTz commit_time = 0; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + origin_lsn = parsed->origin_lsn; + 
commit_time = parsed->origin_timestamp; + } + + /* + * If it's ROLLBACK PREPARED then handle it via callbacks. + */ + if (TransactionIdIsValid(xid) && + !SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) && + parsed->dbId == ctx->slot->data.database && + !FilterByOrigin(ctx, origin_id) && + ReorderBufferTxnIsPrepared(ctx->reorder, xid, parsed->twophase_gid)) + { + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, false); + return; + } for (i = 0; i < parsed->nsubxacts; i++) { diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 7637efc32e..50c08acc34 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -60,6 +60,18 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx); static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn); static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); +static bool filter_decode_txn_cb_wrapper(ReorderBuffer *cache, + ReorderBufferTXN *txn); +static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + TransactionId xid, const char *gid); +static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -125,6 +137,7 @@ StartupDecodingContext(List *output_plugin_options, MemoryContext context, 
old_context; LogicalDecodingContext *ctx; + int twophase_callbacks; /* shorter lines... */ slot = MyReplicationSlot; @@ -184,8 +197,27 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->begin = begin_cb_wrapper; ctx->reorder->apply_change = change_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; + ctx->reorder->abort = abort_cb_wrapper; + ctx->reorder->filter_decode_txn = filter_decode_txn_cb_wrapper; + ctx->reorder->filter_prepare = filter_prepare_cb_wrapper; + ctx->reorder->prepare = prepare_cb_wrapper; + ctx->reorder->commit_prepared = commit_prepared_cb_wrapper; + ctx->reorder->abort_prepared = abort_prepared_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + /* check that plugin implements all callbacks necessary to perform 2PC */ + twophase_callbacks = (ctx->callbacks.prepare_cb != NULL) + + (ctx->callbacks.commit_prepared_cb != NULL) + + (ctx->callbacks.abort_prepared_cb != NULL); + + ctx->enable_twophase = (twophase_callbacks == 3); + + if (twophase_callbacks != 3 && twophase_callbacks != 0) + ereport(WARNING, + (errmsg("Output plugin registered only %d twophase callbacks. 
" + "Twophase transactions will be decoded at commit time.", + twophase_callbacks))); + ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -693,6 +725,122 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "abort"; + state.report_location = txn->final_lsn; /* beginning of abort record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.abort_cb(ctx, txn, abort_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + + static void +prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "prepare"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* 
do the actual work: call callback */ + ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "commit_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "abort_prepared"; + state.report_location = txn->final_lsn; /* beginning of commit record */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; /* points to the end of the record */ + + /* do the actual work: call callback */ + ctx->callbacks.abort_prepared_cb(ctx, txn, 
abort_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) @@ -730,6 +878,62 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static bool +filter_decode_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + bool ret; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "filter_decode_txn"; + state.report_location = InvalidXLogRecPtr; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + + /* do the actual work: call callback */ + ret = ctx->callbacks.filter_decode_txn_cb(ctx, txn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + return ret; +} +static bool +filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + TransactionId xid, const char *gid) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + bool ret; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "filter_prepare"; + state.report_location = InvalidXLogRecPtr; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + + /* do the actual work: call callback */ + ret = ctx->callbacks.filter_prepare_cb(ctx, txn, xid, gid); + + /* Pop the error context stack */ + 
error_context_stack = errcallback.previous; + return ret; +} + bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) { @@ -1013,3 +1217,164 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); } } + +bool +LogicalLockTransaction(ReorderBufferTXN *txn) +{ + bool ok = false; + + /* + * Prepared transactions and uncommitted transactions + * that have modified catalogs need to interlock with + * concurrent rollback to ensure that there are no + * issues while decoding + */ + + if (!txn_has_catalog_changes(txn)) + return true; + + /* + * Is it a prepared txn? Similar checks for uncommitted + * transactions when we start supporting them + */ + if (!txn_prepared(txn)) + return true; + + /* check cached status */ + if (txn_commit(txn)) + return true; + if (txn_rollback(txn)) + return false; + + /* + * Find the PROC that is handling this XID and add ourself as a + * decodeGroupMember + */ + if (MyProc->decodeGroupLeader == NULL) + { + PGPROC *proc = BecomeDecodeGroupLeader(txn->xid, txn_prepared(txn)); + + /* + * If decodeGroupLeader is NULL, then the only possibility + * is that the transaction completed and went away + */ + if (proc == NULL) + { + Assert(!TransactionIdIsInProgress(txn->xid)); + if (TransactionIdDidCommit(txn->xid)) + { + txn->txn_flags |= TXN_COMMIT; + return true; + } + else + { + txn->txn_flags |= TXN_ROLLBACK; + return false; + } + } + + /* Add ourself as a decodeGroupMember */ + if (!BecomeDecodeGroupMember(proc, proc->pid, txn_prepared(txn))) + { + Assert(!TransactionIdIsInProgress(txn->xid)); + if (TransactionIdDidCommit(txn->xid)) + { + txn->txn_flags |= TXN_COMMIT; + return true; + } + else + { + txn->txn_flags |= TXN_ROLLBACK; + return false; + } + } + } + + /* + * If we were able to add ourself, then Abort processing will + * interlock with us. 
Check if the transaction is still around + */ + Assert(MyProc->decodeGroupLeader); + + if (MyProc->decodeGroupLeader) + { + LWLock *leader_lwlock; + + leader_lwlock = LockHashPartitionLockByProc(MyProc->decodeGroupLeader); + LWLockAcquire(leader_lwlock, LW_SHARED); + if (MyProc->decodeAbortPending) + { + /* + * Remove ourself from the decodeGroupMembership and return + * false so that the decoding plugin also initiates abort + * processing + */ + LWLockRelease(leader_lwlock); + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + RemoveDecodeGroupMemberLocked(MyProc->decodeGroupLeader); + /* reset the bool to let the leader know that we are going away */ + MyProc->decodeAbortPending = false; + MyProc->decodeLocked = false; + txn->txn_flags |= TXN_ROLLBACK; + ok = false; + } + else + { + MyProc->decodeLocked = true; + ok = true; + } + LWLockRelease(leader_lwlock); + } + else + return false; + + return ok; +} + +void +LogicalUnlockTransaction(ReorderBufferTXN *txn) +{ + LWLock *leader_lwlock; + + /* + * Prepared transactions and uncommitted transactions + * that have modified catalogs need to interlock with + * concurrent rollback to ensure that there are no + * issues while decoding + */ + + if (!txn_has_catalog_changes(txn)) + return; + + /* + * Is it a prepared txn? 
Similar checks for uncommitted + * transactions when we start supporting them + */ + if (!txn_prepared(txn)) + return; + + /* check cached status */ + if (txn_commit(txn)) + return; + if (txn_rollback(txn)) + return; + + Assert(MyProc->decodeGroupLeader); + leader_lwlock = LockHashPartitionLockByProc(MyProc->decodeGroupLeader); + LWLockAcquire(leader_lwlock, LW_SHARED); + if (MyProc->decodeAbortPending) + { + /* + * Remove ourself from the decodeGroupMembership + */ + LWLockRelease(leader_lwlock); + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + RemoveDecodeGroupMemberLocked(MyProc->decodeGroupLeader); + /* reset the bool to let the leader know that we are going away */ + MyProc->decodeAbortPending = false; + txn->txn_flags |= TXN_ROLLBACK; + } + MyProc->decodeLocked = false; + LWLockRelease(leader_lwlock); + return; +} diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 948343e4ae..5d33931223 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -72,10 +72,11 @@ void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - uint8 flags = 0; + uint8 flags = 0; pq_sendbyte(out, 'C'); /* sending COMMIT */ + flags |= LOGICALREP_IS_COMMIT; /* send the flags field (unused for now) */ pq_sendbyte(out, flags); @@ -86,21 +87,106 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, } /* - * Read transaction COMMIT from the stream. + * Write ABORT to the output stream. + */ +void +logicalrep_write_abort(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, 'C'); /* sending ABORT flag below */ + + flags |= LOGICALREP_IS_ABORT; + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, abort_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); +} + +/* + * Read transaction COMMIT|ABORT from the stream. 
*/ void -logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) +logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data, + uint8 *flags) { - /* read flags (unused for now) */ - uint8 flags = pq_getmsgbyte(in); + /* read flags */ + uint8 commit_flags = pq_getmsgbyte(in); - if (flags != 0) - elog(ERROR, "unrecognized flags %u in commit message", flags); + if (!(commit_flags & LOGICALREP_COMMIT_MASK)) + elog(ERROR, "unrecognized flags %u in commit|abort message", + commit_flags); /* read fields */ commit_data->commit_lsn = pq_getmsgint64(in); commit_data->end_lsn = pq_getmsgint64(in); commit_data->committime = pq_getmsgint64(in); + + /* set gid to empty */ + commit_data->gid[0] = '\0'; + + *flags = commit_flags; +} + +/* + * Write PREPARE to the output stream. + */ +void +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, 'P'); /* sending PREPARE protocol */ + + if (txn->txn_flags & TXN_COMMIT_PREPARED) + flags |= LOGICALREP_IS_COMMIT_PREPARED; + else if (txn->txn_flags & TXN_ROLLBACK_PREPARED) + flags |= LOGICALREP_IS_ROLLBACK_PREPARED; + else if (txn->txn_flags & TXN_PREPARE) + flags |= LOGICALREP_IS_PREPARE; + + if (flags == 0) + elog(ERROR, "unrecognized flags %u in [commit|rollback] prepare message", flags); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction PREPARE from the stream. 
+ */ +void +logicalrep_read_prepare(StringInfo in, LogicalRepCommitData *commit_data, uint8 *flags) +{ + /* read flags */ + uint8 prep_flags = pq_getmsgbyte(in); + + if (!(prep_flags & LOGICALREP_PREPARE_MASK)) + elog(ERROR, "unrecognized flags %u in prepare message", prep_flags); + + /* read fields */ + commit_data->commit_lsn = pq_getmsgint64(in); + commit_data->end_lsn = pq_getmsgint64(in); + commit_data->committime = pq_getmsgint64(in); + + /* read gid; the string comes off the wire, so bound the copy to the gid buffer */ + strlcpy(commit_data->gid, pq_getmsgstring(in), sizeof(commit_data->gid)); + + /* set flags */ + *flags = prep_flags; } /* diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c72a611a39..0b45f4f20f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -623,7 +623,7 @@ AssertTXNLsnOrder(ReorderBuffer *rb) if (prev_first_lsn != InvalidXLogRecPtr) Assert(prev_first_lsn < cur_txn->first_lsn); - Assert(!cur_txn->is_known_as_subxact); + Assert(!txn_is_subxact(cur_txn)); prev_first_lsn = cur_txn->first_lsn; } #endif @@ -641,7 +641,7 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb) txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn); - Assert(!txn->is_known_as_subxact); + Assert(!txn_is_subxact(txn)); Assert(txn->first_lsn != InvalidXLogRecPtr); return txn; } @@ -675,9 +675,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, dlist_push_tail(&txn->subtxns, &subtxn->node); txn->nsubtxns++; } - else if (!subtxn->is_known_as_subxact) + else if (!txn_is_subxact(subtxn)) { - subtxn->is_known_as_subxact = true; + subtxn->txn_flags |= TXN_IS_SUBXACT; Assert(subtxn->nsubtxns == 0); /* remove from lsn order list of top-level transactions */ @@ -738,9 +738,9 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, subtxn->final_lsn = commit_lsn; subtxn->end_lsn = end_lsn; - if (!subtxn->is_known_as_subxact) + if (!txn_is_subxact(subtxn)) { - subtxn->is_known_as_subxact = true; + 
subtxn->txn_flags |= TXN_IS_SUBXACT; Assert(subtxn->nsubtxns == 0); /* remove from lsn order list of top-level transactions */ @@ -849,7 +849,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) { ReorderBufferChange *cur_change; - if (txn->serialized) + if (txn_is_serialized(txn)) { /* serialize remaining changes */ ReorderBufferSerializeTXN(rb, txn); @@ -878,7 +878,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) { ReorderBufferChange *cur_change; - if (cur_txn->serialized) + if (txn_is_serialized(cur_txn)) { /* serialize remaining changes */ ReorderBufferSerializeTXN(rb, cur_txn); @@ -1044,7 +1044,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) * they originally were happening inside another subtxn, so we won't * ever recurse more than one level deep here. */ - Assert(subtxn->is_known_as_subxact); + Assert(txn_is_subxact(subtxn)); Assert(subtxn->nsubtxns == 0); ReorderBufferCleanupTXN(rb, subtxn); @@ -1083,7 +1083,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* * Remove TXN from its containing list. * - * Note: if txn->is_known_as_subxact, we are deleting the TXN from its + * Note: if txn_is_subxact(), we are deleting the TXN from its * parent's list of known subxacts; this leaves the parent's nsubxacts * count too high, but we don't care. Otherwise, we are deleting the TXN * from the LSN-ordered list of toplevel TXNs. 
@@ -1098,7 +1098,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) Assert(found); /* remove entries spilled to disk */ - if (txn->serialized) + if (txn_is_serialized(txn)) ReorderBufferRestoreCleanup(rb, txn); /* deallocate */ @@ -1115,7 +1115,7 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) dlist_iter iter; HASHCTL hash_ctl; - if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids)) + if (!txn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids)) return; memset(&hash_ctl, 0, sizeof(hash_ctl)); @@ -1264,25 +1264,18 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) * the top and subtransactions (using a k-way merge) and replay the changes in * lsn order. */ -void -ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, +static void +ReorderBufferCommitInternal(ReorderBufferTXN *txn, + ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn) { - ReorderBufferTXN *txn; volatile Snapshot snapshot_now; volatile CommandId command_id = FirstCommandId; bool using_subtxn; ReorderBufferIterTXNState *volatile iterstate = NULL; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); - - /* unknown transaction, nothing to replay */ - if (txn == NULL) - return; - txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; txn->commit_time = commit_time; @@ -1326,20 +1319,62 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, { ReorderBufferChange *change; ReorderBufferChange *specinsert = NULL; + bool change_cleanup = false; + bool check_txn_status, + apply_started = false; + bool is_prepared = txn_prepared(txn); + + /* + * check for the xid once to see if it's already + * committed. 
Otherwise we need to consult the + * decode_txn filter function to enquire if it's + * still ok for us to continue to decode this xid + * + * This is to handle cases of concurrent abort + * happening parallel to the decode activity + */ + check_txn_status = TransactionIdDidCommit(txn->xid)? + false : true; if (using_subtxn) BeginInternalSubTransaction("replay"); else StartTransactionCommand(); - rb->begin(rb, txn); - iterstate = ReorderBufferIterTXNInit(rb, txn); while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) { Relation relation = NULL; Oid reloid; + /* + * While decoding 2PC or while streaming uncommitted + * transactions, check if this transaction needs to + * be still decoded. If the transaction got aborted + * or if we were instructed to stop decoding, then + * bail out early. + */ + if (check_txn_status && rb->filter_decode_txn(rb, txn)) + { + elog(LOG, "%s decoding of %s (%u)", + apply_started? "stopping":"skipping", + is_prepared? txn->gid:"", + txn->xid); + change_cleanup = true; + goto change_cleanuptxn; + } + + /* + * We have decided to apply changes based on the go + * ahead from the above decode filter, BEGIN the + * transaction on the other side + */ + if (apply_started == false) + { + rb->begin(rb, txn); + apply_started = true; + } + switch (change->action) { case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -1375,7 +1410,17 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, relpathperm(change->data.tp.relnode, MAIN_FORKNUM)); + /* Lock transaction before catalog access */ + if (!LogicalLockTransaction(txn)) + { + elog(LOG, "stopping decoding of %s (%u)", + is_prepared? 
txn->gid:"", + txn->xid); + change_cleanup = true; + goto change_cleanuptxn; + } relation = RelationIdGetRelation(reloid); + LogicalUnlockTransaction(txn); if (relation == NULL) elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")", @@ -1546,6 +1591,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } } +change_cleanuptxn: /* * There's a speculative insertion remaining, just clean in up, it * can't have been successful, otherwise we'd gotten a confirmation @@ -1561,8 +1607,24 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferIterTXNFinish(rb, iterstate); iterstate = NULL; - /* call commit callback */ - rb->commit(rb, txn, commit_lsn); + if (change_cleanup) + { + /* call abort if we have sent any changes */ + if (apply_started) + rb->abort(rb, txn, commit_lsn); + } + else + { + /* call commit or prepare callback */ + if (txn_prepared(txn)) + rb->prepare(rb, txn, commit_lsn); + else + rb->commit(rb, txn, commit_lsn); + } + + /* remove ourself from the decodeGroupLeader */ + if (MyProc->decodeGroupLeader) + RemoveDecodeGroupMember(MyProc->decodeGroupLeader); /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) @@ -1589,7 +1651,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (snapshot_now->copied) ReorderBufferFreeSnap(rb, snapshot_now); - /* remove potential on-disk data, and deallocate */ + /* + * remove potential on-disk data, and deallocate. + * + * We remove it even for prepared transactions. + * This is because the COMMIT PREPARED needs + * no data post the successful PREPARE + */ ReorderBufferCleanupTXN(rb, txn); } PG_CATCH(); @@ -1623,6 +1691,136 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, PG_END_TRY(); } +/* + * Ask output plugin whether we want to skip this PREPARE and send + * this transaction as a regular commit later. 
+ */ +bool +ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, const char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); + + return rb->filter_prepare(rb, txn, xid, gid); +} + + +/* + * Commit a transaction. + * + * See comments for ReorderBufferCommitInternal() + */ +void +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Prepare a twophase transaction. It calls ReorderBufferCommitInternal() + * since all prepared transactions need to be decoded at PREPARE time. + */ +void +ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + txn->txn_flags |= TXN_PREPARE; + strlcpy(txn->gid, gid, sizeof(txn->gid)); /* bounded copy into the txn's gid buffer */ + + ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn, + commit_time, origin_id, origin_lsn); +} + +/* + * Check whether this transaction was sent as prepared to subscribers. + * Called while handling commit|abort prepared. + */ +bool +ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid, + const char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* + * Always call the prepare filter. 
It's the job of the prepare + * filter to give us the *same* response for a given xid + * across multiple calls (including ones on restart) + */ + return !(rb->filter_prepare(rb, txn, xid, gid)); +} + +/* + * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED. + */ +void +ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit) +{ + ReorderBufferTXN *txn; + + /* + * The transaction may or may not exist (during restarts for + * example). Anyways, 2PC transactions do not contain any + * reorderbuffers. So allow it to be created below. + */ + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, + true); + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + strlcpy(txn->gid, gid, sizeof(txn->gid)); /* bounded copy into the txn's gid buffer */ + + if (is_commit) + { + txn->txn_flags |= TXN_COMMIT_PREPARED; + rb->commit_prepared(rb, txn, commit_lsn); + } + else + { + txn->txn_flags |= TXN_ROLLBACK_PREPARED; + rb->abort_prepared(rb, txn, commit_lsn); + } + + /* cleanup: make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(rb, txn); + ReorderBufferCleanupTXN(rb, txn); +} + /* * Abort a transaction that possibly has previous changes. Needs to be first * called for subtransactions and then for the toplevel xid. @@ -1688,7 +1886,7 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid) * final_lsn to that of their last change; this causes * ReorderBufferRestoreCleanup to do the right thing. 
*/ - if (txn->serialized && txn->final_lsn == 0) + if (txn_is_serialized(txn) && txn->final_lsn == 0) { ReorderBufferChange *last = dlist_tail_element(ReorderBufferChange, node, &txn->changes); @@ -1934,7 +2132,7 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - txn->has_catalog_changes = true; + txn->txn_flags |= TXN_HAS_CATALOG_CHANGES; } /* @@ -1951,7 +2149,7 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid) if (txn == NULL) return false; - return txn->has_catalog_changes; + return txn_has_catalog_changes(txn); } /* @@ -2095,7 +2293,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) Assert(spilled == txn->nentries_mem); Assert(dlist_is_empty(&txn->changes)); txn->nentries_mem = 0; - txn->serialized = true; + txn->txn_flags |= TXN_SERIALIZED; if (fd != -1) CloseTransientFile(fd); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 83c69092ae..15048378d1 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -452,8 +452,9 @@ static void apply_handle_commit(StringInfo s) { LogicalRepCommitData commit_data; + uint8 flags = 0; - logicalrep_read_commit(s, &commit_data); + logicalrep_read_commit(s, &commit_data, &flags); Assert(commit_data.commit_lsn == remote_final_lsn); @@ -467,7 +468,11 @@ apply_handle_commit(StringInfo s) replorigin_session_origin_lsn = commit_data.end_lsn; replorigin_session_origin_timestamp = commit_data.committime; - CommitTransactionCommand(); + if (flags & LOGICALREP_IS_COMMIT) + CommitTransactionCommand(); + else if (flags & LOGICALREP_IS_ABORT) + AbortCurrentTransaction(); + pgstat_report_stat(false); store_flush_position(commit_data.end_lsn); @@ -487,6 +492,120 @@ apply_handle_commit(StringInfo s) pgstat_report_activity(STATE_IDLE, NULL); } +static void +apply_handle_prepare_txn(LogicalRepCommitData 
*commit_data) +{ + Assert(commit_data->commit_lsn == remote_final_lsn); + /* The synchronization worker runs in single transaction. */ + if (IsTransactionState() && !am_tablesync_worker()) + { + /* End the earlier transaction and start a new one */ + BeginTransactionBlock(); + CommitTransactionCommand(); + StartTransactionCommand(); + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = commit_data->end_lsn; + replorigin_session_origin_timestamp = commit_data->committime; + + PrepareTransactionBlock(commit_data->gid); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(commit_data->end_lsn); + } + else + { + /* Process any invalidation messages that might have accumulated. */ + AcceptInvalidationMessages(); + maybe_reread_subscription(); + } + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +static void +apply_handle_commit_prepared_txn(LogicalRepCommitData *commit_data) +{ + /* there is no transaction when COMMIT PREPARED is called */ + ensure_transaction(); + + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = commit_data->end_lsn; + replorigin_session_origin_timestamp = commit_data->committime; + + FinishPreparedTransaction(commit_data->gid, true, false); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(commit_data->end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. 
*/ + process_syncing_tables(commit_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +static void +apply_handle_rollback_prepared_txn(LogicalRepCommitData *commit_data) +{ + /* there is no transaction when ABORT/ROLLBACK PREPARED is called */ + ensure_transaction(); + + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = commit_data->end_lsn; + replorigin_session_origin_timestamp = commit_data->committime; + + FinishPreparedTransaction(commit_data->gid, false, true); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(commit_data->end_lsn); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data->end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle PREPARE message. + */ +static void +apply_handle_prepare(StringInfo s) +{ + LogicalRepCommitData commit_data; + uint8 flags = 0; + + logicalrep_read_prepare(s, &commit_data, &flags); + + if (flags & LOGICALREP_IS_PREPARE) + apply_handle_prepare_txn(&commit_data); + else if (flags & LOGICALREP_IS_COMMIT_PREPARED) + apply_handle_commit_prepared_txn(&commit_data); + else if (flags & LOGICALREP_IS_ROLLBACK_PREPARED) + apply_handle_rollback_prepared_txn(&commit_data); + else + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("wrong [commit|rollback] prepare message"))); +} + /* * Handle ORIGIN message. 
* @@ -884,10 +1003,14 @@ apply_dispatch(StringInfo s) case 'B': apply_handle_begin(s); break; - /* COMMIT */ + /* COMMIT|ABORT */ case 'C': apply_handle_commit(s); break; + /* [COMMIT|ROLLBACK] PREPARE */ + case 'P': + apply_handle_prepare(s); + break; /* INSERT */ case 'I': apply_handle_insert(s); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 40a1ef3c1d..55bdee9abe 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -37,11 +37,23 @@ static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pgoutput_abort_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr abort_lsn); static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static bool pgoutput_filter_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, TransactionId xid, const char *gid); +static bool pgoutput_decode_txn_filter(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); +static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); +static void pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static bool publications_valid; @@ -79,7 +91,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->begin_cb = pgoutput_begin_txn; cb->change_cb = pgoutput_change; cb->commit_cb = pgoutput_commit_txn; + cb->abort_cb = pgoutput_abort_txn; + + cb->filter_prepare_cb = pgoutput_filter_prepare; + cb->prepare_cb = pgoutput_prepare_txn; + 
cb->commit_prepared_cb = pgoutput_commit_prepared_txn; + cb->abort_prepared_cb = pgoutput_abort_prepared_txn; + cb->filter_by_origin_cb = pgoutput_origin_filter; + cb->filter_decode_txn_cb = pgoutput_decode_txn_filter; cb->shutdown_cb = pgoutput_shutdown; } @@ -251,6 +271,61 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +/* + * ABORT callback + */ +static void +pgoutput_abort_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_abort(ctx->out, txn, abort_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * PREPARE callback + */ +static void +pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * COMMIT PREPARED callback + */ +static void +pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} +/* + * ABORT PREPARED callback + */ +static void +pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + OutputPluginUpdateProgress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + /* * Sends the decoded DML over wire. */ @@ -361,6 +436,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, MemoryContextReset(data->context); } +/* + * Filter out unnecessary two-phase transactions. 
+ * + * Currently, we forward all two-phase transactions + */ +static bool +pgoutput_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + TransactionId xid, const char *gid) +{ + return false; +} + /* * Currently we always forward. */ @@ -371,6 +458,37 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx, return false; } +/* + * Check if we should continue to decode this transaction. + * + * If it has aborted in the meanwhile, then there's no sense + * in decoding and sending the rest of the changes, we might + * as well ask the subscribers to abort immediately. + * + * This should be called if we are streaming a transaction + * before it's committed or if we are decoding a 2PC + * transaction. Otherwise we always decode committed + * transactions + * + * Additional checks can be added here, as needed + */ +static bool +pgoutput_decode_txn_filter(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + /* + * Due to caching, repeated TransactionIdDidAbort calls + * shouldn't be that expensive + */ + if (txn != NULL && + TransactionIdIsValid(txn->xid) && + TransactionIdDidAbort(txn->xid)) + return true; + + /* if txn is NULL, filter it out */ + return (txn != NULL)? false:true; +} + /* * Shutdown the output plugin. * diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 1a00011adc..f6bb4e509f 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2432,6 +2432,45 @@ BackendXidGetPid(TransactionId xid) return result; } +/* + * BackendXidGetProc -- get a backend's PGPROC given its XID + * + * Note that it is up to the caller to be sure that the question + * remains meaningful for long enough for the answer to be used ... + * + * Only main transaction Ids are considered. 
+ * + */ +PGPROC * +BackendXidGetProc(TransactionId xid) +{ + PGPROC *result = NULL; + ProcArrayStruct *arrayP = procArray; + int index; + + if (xid == InvalidTransactionId) /* never match invalid xid */ + return NULL; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + + for (index = 0; index < arrayP->numProcs; index++) + { + int pgprocno = arrayP->pgprocnos[index]; + PGPROC *proc = &allProcs[pgprocno]; + volatile PGXACT *pgxact = &allPgXact[pgprocno]; + + if (pgxact->xid == xid) + { + result = proc; + break; + } + } + + LWLockRelease(ProcArrayLock); + + return result; +} + /* * IsBackendPid -- is a given pid a running backend * diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 6f30e082b2..26d35c7807 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -267,6 +267,11 @@ InitProcGlobal(void) /* Initialize lockGroupMembers list. */ dlist_init(&procs[i].lockGroupMembers); + + /* Initialize decodeGroupMembers list and the decode flags. */ + dlist_init(&procs[i].decodeGroupMembers); + procs[i].decodeAbortPending = false; + procs[i].decodeLocked = false; } /* @@ -406,6 +411,12 @@ InitProcess(void) Assert(MyProc->lockGroupLeader == NULL); Assert(dlist_is_empty(&MyProc->lockGroupMembers)); + /* Check that group decode fields are in a proper initial state; reset flags. */ + Assert(MyProc->decodeGroupLeader == NULL); + Assert(dlist_is_empty(&MyProc->decodeGroupMembers)); + MyProc->decodeAbortPending = false; + MyProc->decodeLocked = false; + /* Initialize wait event information. */ MyProc->wait_event_info = 0; @@ -581,6 +592,12 @@ InitAuxiliaryProcess(void) Assert(MyProc->lockGroupLeader == NULL); Assert(dlist_is_empty(&MyProc->lockGroupMembers)); + /* Check that group decode fields are in a proper initial state.
*/ + Assert(MyProc->decodeGroupLeader == NULL); + Assert(dlist_is_empty(&MyProc->decodeGroupMembers)); + MyProc->decodeAbortPending = false; + MyProc->decodeLocked = false; + /* * We might be reusing a semaphore that belonged to a failed process. So * be careful and reinitialize its value here. (This is not strictly @@ -1887,3 +1904,268 @@ BecomeLockGroupMember(PGPROC *leader, int pid) return ok; } + +/* + * BecomeDecodeGroupLeader - designate process as decode group leader + * + * Once this function has returned, other processes can join the decode group + * by calling BecomeDecodeGroupMember. Returns NULL if no PGPROC matches xid. + */ +PGPROC * +BecomeDecodeGroupLeader(TransactionId xid, bool is_prepared) +{ + PGPROC *proc = NULL; + int pid = 0; + LWLock *leader_lwlock; + + Assert(xid != InvalidTransactionId); + + + proc = BackendXidGetProc(xid); + if (proc) + pid = proc->pid; + + /* + * This proc will become decodeGroupLeader if it's + * not already + */ + if (proc && proc->decodeGroupLeader != proc) + { + volatile PGXACT *pgxact; + /* Create single-member group, containing this proc. */ + leader_lwlock = LockHashPartitionLockByProc(proc); + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + /* recheck we are still the same */ + pgxact = &ProcGlobal->allPgXact[proc->pgprocno]; + if (proc->pid == pid && pgxact->xid == xid) + { + /* a prepared xact's PGPROC must have pid 0 */ + Assert(!is_prepared || pid == 0); + /* recheck if someone else did not already assign us */ + if (proc->decodeGroupLeader != proc) + { + /* We had better not be a follower.
*/ + Assert(proc->decodeGroupLeader == NULL); + proc->decodeGroupLeader = proc; + dlist_push_head(&proc->decodeGroupMembers, + &proc->decodeGroupLink); + } + } + else + { + /* proc entry is gone */ + proc = NULL; + } + LWLockRelease(leader_lwlock); + } + + elog(DEBUG1, "became group leader (%p)", proc); + return proc; +} + +/* + * BecomeDecodeGroupMember - designate process as decode group member + * + * This is pretty straightforward except for the possibility that the leader + * whose group we're trying to join might exit before we manage to do so; + * and the PGPROC might get recycled for an unrelated process. To avoid + * that, we require the caller to pass the PID of the intended PGPROC as + * an interlock. Returns true if we successfully join the intended decode + * group, and false if not. + */ +bool +BecomeDecodeGroupMember(PGPROC *leader, int pid, bool is_prepared) +{ + LWLock *leader_lwlock; + bool ok = false; + + /* Group leader can't become member of group */ + Assert(MyProc != leader); + + /* Can't already be a member of a group */ + Assert(MyProc->decodeGroupLeader == NULL); + + /* PID must be valid OR this is a prepared transaction. */ + Assert(pid != 0 || is_prepared); + + /* + * Get lock protecting the group fields. Note LockHashPartitionLockByProc + * accesses leader->pgprocno in a PGPROC that might be free. This is safe + * because all PGPROCs' pgprocno fields are set during shared memory + * initialization and never change thereafter; so we will acquire the + * correct lock even if the leader PGPROC is in process of being recycled. + */ + leader_lwlock = LockHashPartitionLockByProc(leader); + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + + /* Is this the leader we're looking for? */ + if (leader->pid == pid && leader->decodeGroupLeader == leader) + { + /* a prepared xact's PGPROC must have pid 0 */ + Assert(!is_prepared || pid == 0); + /* is the leader going away?
*/ + if (leader->decodeAbortPending) + ok = false; + else + { + /* OK, join the group */ + ok = true; + MyProc->decodeGroupLeader = leader; + dlist_push_tail(&leader->decodeGroupMembers, &MyProc->decodeGroupLink); + } + } + else + MyProc->decodeGroupLeader = NULL; + LWLockRelease(leader_lwlock); + + elog(DEBUG1, "became group member (%p) to (%p)", MyProc, leader); + return ok; +} + +/* + * Remove the calling backend (MyProc) from the decode group of the given + * leader. + * Acquires the lock returned by LockHashPartitionLockByProc(leader) itself. + */ +void +RemoveDecodeGroupMember(PGPROC *leader) +{ + LWLock *leader_lwlock; + + leader_lwlock = LockHashPartitionLockByProc(leader); + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + RemoveDecodeGroupMemberLocked(leader); + LWLockRelease(leader_lwlock); + + return; +} + +/* + * Remove the calling backend (MyProc) from the decode group of the given + * leader. + * Assumes that the caller is holding the appropriate lock. + */ +void +RemoveDecodeGroupMemberLocked(PGPROC *leader) +{ + Assert(!dlist_is_empty(&leader->decodeGroupMembers)); + dlist_delete(&MyProc->decodeGroupLink); + /* leader links to itself, so never empty */ + Assert(!dlist_is_empty(&leader->decodeGroupMembers)); + MyProc->decodeGroupLeader = NULL; + elog(DEBUG1, "removed group member (%p) from (%p)", MyProc, leader); + + return; +} + +/* + * Indicate to all decodeGroupMembers that this transaction is + * going away. + * + * Wait for all decodeGroupMembers to ack back before returning + * from here but only in case of aborts. + * + * This function should be called *after* the proc has been + * removed from the procArray. + * + * If the transaction is committing, it's ok for the + * decoders to continue merrily.
When one of them tries to lock this + * proc, it won't find it and will check the transaction status + * and cache the commit status for future calls in + * LogicalLockTransaction + */ +void +LogicalDecodeRemoveTransaction(PGPROC *leader, bool isCommit) +{ + LWLock *leader_lwlock; + dlist_mutable_iter change_i; + dlist_iter iter; + PGPROC *proc; + bool do_wait; + + leader_lwlock = LockHashPartitionLockByProc(leader); + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + /* on abort, flag the leader itself as aborting */ + if (!isCommit) + leader->decodeAbortPending = true; + + if (leader->decodeGroupLeader == NULL) + { + Assert(dlist_is_empty(&leader->decodeGroupMembers)); + LWLockRelease(leader_lwlock); + return; + } + +recheck: + do_wait = false; + Assert(leader->decodeGroupLeader == leader); + Assert(!dlist_is_empty(&leader->decodeGroupMembers)); + if (!isCommit) + { + dlist_foreach(iter, &leader->decodeGroupMembers) + { + proc = dlist_container(PGPROC, decodeGroupLink, iter.cur); + /* skip the leader; flag every other member as abort-pending */ + if (proc == leader) + continue; + if (!proc->decodeAbortPending) + { + proc->decodeAbortPending = true; + elog(DEBUG1, "marking group member (%p) from (%p) for abort", + proc, leader); + } + /* if the proc is currently locked, wait */ + if (proc->decodeLocked) + do_wait = true; + } + + if (do_wait) + { + int rc; + LWLockRelease(leader_lwlock); + + elog(LOG, "Waiting for backends to abort decoding"); + /* + * Wait on our latch to allow decodeGroupMembers to + * go away soon + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 100L, + WAIT_EVENT_PG_SLEEP); + ResetLatch(MyLatch); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + CHECK_FOR_INTERRUPTS(); + + /* Re-take the lock and recheck decodeGroupMembers */ + LWLockAcquire(leader_lwlock, LW_EXCLUSIVE); + goto recheck; + } + } + + /* + * All backends exited cleanly in case of aborts above, + * remove decodeGroupMembers now for both commit/abort cases + */ +
Assert(leader->decodeGroupLeader == leader); + Assert(!dlist_is_empty(&leader->decodeGroupMembers)); + dlist_foreach_modify(change_i, &leader->decodeGroupMembers) + { + proc = dlist_container(PGPROC, decodeGroupLink, change_i.cur); + Assert(!proc->decodeLocked); + dlist_delete(&proc->decodeGroupLink); + elog(DEBUG1, "deleting group member (%p) from (%p)", + proc, leader); + proc->decodeGroupLeader = NULL; + } + Assert(dlist_is_empty(&leader->decodeGroupMembers)); + leader->decodeGroupLeader = NULL; + leader->decodeAbortPending = false; + LWLockRelease(leader_lwlock); + + return; +} diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 3abe7d6155..8a6e0a1c2d 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -455,13 +455,13 @@ standard_ProcessUtility(PlannedStmt *pstmt, case TRANS_STMT_COMMIT_PREPARED: PreventTransactionChain(isTopLevel, "COMMIT PREPARED"); PreventCommandDuringRecovery("COMMIT PREPARED"); - FinishPreparedTransaction(stmt->gid, true); + FinishPreparedTransaction(stmt->gid, true, false); break; case TRANS_STMT_ROLLBACK_PREPARED: PreventTransactionChain(isTopLevel, "ROLLBACK PREPARED"); PreventCommandDuringRecovery("ROLLBACK PREPARED"); - FinishPreparedTransaction(stmt->gid, false); + FinishPreparedTransaction(stmt->gid, false, false); break; case TRANS_STMT_ROLLBACK: diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 34d9470811..cbc63a18ad 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -15,6 +15,7 @@ #define TWOPHASE_H #include "access/xlogdefs.h" +#include "access/xact.h" #include "datatype/timestamp.h" #include "storage/lock.h" @@ -46,15 +47,18 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid); extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p); +extern void ParsePrepareRecord(uint8 info, char *xlrec, + xl_xact_parsed_prepare *parsed); extern void StandbyRecoverPreparedTransactions(void); 
extern void RecoverPreparedTransactions(void); extern void CheckPointTwoPhase(XLogRecPtr redo_horizon); -extern void FinishPreparedTransaction(const char *gid, bool isCommit); +extern void FinishPreparedTransaction(const char *gid, bool isCommit, + bool missing_ok); extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, - XLogRecPtr end_lsn); + XLogRecPtr end_lsn, RepOriginId origin_id); extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); extern void restoreTwoPhaseData(void); #endif /* TWOPHASE_H */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 6445bbc46f..d2e104423d 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -21,6 +21,13 @@ #include "storage/sinval.h" #include "utils/datetime.h" +/* + * Maximum size of Global Transaction ID (including '\0'). + * + * Note that the max value of GIDSIZE must fit in the uint16 gidlen, + * specified in TwoPhaseFileHeader. + */ +#define GIDSIZE 200 /* * Xact isolation levels @@ -156,6 +163,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XACT_XINFO_HAS_TWOPHASE (1U << 4) #define XACT_XINFO_HAS_ORIGIN (1U << 5) #define XACT_XINFO_HAS_AE_LOCKS (1U << 6) +#define XACT_XINFO_HAS_GID (1U << 7) /* * Also stored in xinfo, these indicating a variety of additional actions that @@ -302,13 +310,40 @@ typedef struct xl_xact_parsed_commit SharedInvalidationMessage *msgs; TransactionId twophase_xid; /* only for 2PC */ + char twophase_gid[GIDSIZE]; /* GID, only for 2PC */ XLogRecPtr origin_lsn; TimestampTz origin_timestamp; } xl_xact_parsed_commit; +typedef struct xl_xact_parsed_prepare +{ + Oid dbId; /* MyDatabaseId */ + + int nsubxacts; + TransactionId *subxacts; + + int ncommitrels; + RelFileNode *commitrels; + + int nabortrels; + RelFileNode *abortrels; + + int nmsgs; + SharedInvalidationMessage *msgs; + + TransactionId twophase_xid; + char twophase_gid[GIDSIZE]; /* GID; GIDSIZE includes the '\0' */ + + XLogRecPtr origin_lsn; + TimestampTz origin_timestamp; +} xl_xact_parsed_prepare; +
typedef struct xl_xact_parsed_abort { + Oid dbId; + Oid tsId; + TimestampTz xact_time; uint32 xinfo; @@ -319,6 +354,10 @@ typedef struct xl_xact_parsed_abort RelFileNode *xnodes; TransactionId twophase_xid; /* only for 2PC */ + char twophase_gid[GIDSIZE]; + + XLogRecPtr origin_lsn; + TimestampTz origin_timestamp; } xl_xact_parsed_abort; @@ -386,12 +425,13 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, bool forceSync, int xactflags, - TransactionId twophase_xid); + TransactionId twophase_xid, const char *twophase_gid); extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, - int xactflags, TransactionId twophase_xid); + int xactflags, TransactionId twophase_xid, + const char *twophase_gid); extern void xact_redo(XLogReaderState *record); /* xactdesc.c */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 619c5f4d73..9dad4c997f 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -89,6 +89,11 @@ typedef struct LogicalDecodingContext bool prepared_write; XLogRecPtr write_location; TransactionId write_xid; + + /* + * Capabilities of the output plugin. 
+ */ + bool enable_twophase; } LogicalDecodingContext; @@ -117,6 +122,8 @@ extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin); extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn); extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); +extern bool LogicalLockTransaction(ReorderBufferTXN *txn); +extern void LogicalUnlockTransaction(ReorderBufferTXN *txn); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 0eb21057c5..886025f3aa 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -69,11 +69,20 @@ typedef struct LogicalRepBeginData TransactionId xid; } LogicalRepBeginData; +#define LOGICALREP_IS_COMMIT 0x01 +#define LOGICALREP_IS_ABORT 0x02 +#define LOGICALREP_IS_PREPARE 0x04 +#define LOGICALREP_IS_COMMIT_PREPARED 0x08 +#define LOGICALREP_IS_ROLLBACK_PREPARED 0x10 +#define LOGICALREP_COMMIT_MASK (LOGICALREP_IS_COMMIT | LOGICALREP_IS_ABORT) +#define LOGICALREP_PREPARE_MASK (LOGICALREP_IS_PREPARE | LOGICALREP_IS_COMMIT_PREPARED | LOGICALREP_IS_ROLLBACK_PREPARED) typedef struct LogicalRepCommitData { + uint8 flag; XLogRecPtr commit_lsn; XLogRecPtr end_lsn; TimestampTz committime; + char gid[GIDSIZE]; } LogicalRepCommitData; extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn); @@ -81,8 +90,14 @@ extern void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data); extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +extern void logicalrep_write_abort(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); +extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); extern void logicalrep_read_commit(StringInfo in, - LogicalRepCommitData *commit_data); + LogicalRepCommitData 
*commit_data, uint8 *flags); +extern void logicalrep_read_prepare(StringInfo in, + LogicalRepCommitData *commit_data, uint8 *flags); extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 78fd38bb16..61c5019adf 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -67,6 +67,46 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +/* + * Called for an implicit ABORT of a transaction. + */ +typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +/* + * Called before decoding of a PREPARE record to decide whether this + * transaction should be decoded with separate calls to the prepare + * and commit_prepared/abort_prepared callbacks, or wait until COMMIT PREPARED + * and be sent as a regular transaction. + */ +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + +/* + * Called for a PREPARE record unless it was filtered by the filter_prepare_cb + * callback. + */ +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* + * Called for COMMIT PREPARED. + */ +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called for ROLLBACK PREPARED. + */ +typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + /* * Called for the generic logical decoding messages.
*/ @@ -84,6 +124,12 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, RepOriginId origin_id); +/* + * Filter to check if we should continue to decode this transaction + */ +typedef bool (*LogicalDecodeFilterDecodeTxnCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + /* * Called to shutdown an output plugin. */ @@ -98,8 +144,14 @@ typedef struct OutputPluginCallbacks LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeAbortCB abort_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeAbortPreparedCB abort_prepared_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; + LogicalDecodeFilterDecodeTxnCB filter_decode_txn_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 0970abca52..a43e941b25 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -10,6 +10,7 @@ #define REORDERBUFFER_H #include "access/htup_details.h" +#include "access/twophase.h" #include "lib/ilist.h" #include "storage/sinval.h" #include "utils/hsearch.h" @@ -137,20 +138,50 @@ typedef struct ReorderBufferChange dlist_node node; } ReorderBufferChange; +/* ReorderBufferTXN flags */ +#define TXN_HAS_CATALOG_CHANGES 0x0001 +#define TXN_IS_SUBXACT 0x0002 +#define TXN_SERIALIZED 0x0004 +#define TXN_PREPARE 0x0008 +#define TXN_COMMIT_PREPARED 0x0010 +#define TXN_ROLLBACK_PREPARED 0x0020 +#define TXN_COMMIT 0x0040 +#define TXN_ROLLBACK 0x0080 + +/* does the txn have catalog changes */ +#define txn_has_catalog_changes(txn) (((txn)->txn_flags & TXN_HAS_CATALOG_CHANGES) != 0) +/* is the txn known as a subxact?
 */ +#define txn_is_subxact(txn) (((txn)->txn_flags & TXN_IS_SUBXACT) != 0) +/* + * Has this transaction been spilled to disk? It's not always possible to + * deduce that fact by comparing nentries with nentries_mem, because e.g. + * subtransactions of a large transaction might get serialized together + * with the parent - if they're restored to memory they'd have + * nentries_mem == nentries. + */ +#define txn_is_serialized(txn) (((txn)->txn_flags & TXN_SERIALIZED) != 0) +/* is this txn prepared? */ +#define txn_prepared(txn) (((txn)->txn_flags & TXN_PREPARE) != 0) +/* was this prepared txn committed in the meanwhile? */ +#define txn_commit_prepared(txn) (((txn)->txn_flags & TXN_COMMIT_PREPARED) != 0) +/* was this prepared txn aborted in the meanwhile? */ +#define txn_rollback_prepared(txn) (((txn)->txn_flags & TXN_ROLLBACK_PREPARED) != 0) +/* was this txn committed in the meanwhile? */ +#define txn_commit(txn) (((txn)->txn_flags & TXN_COMMIT) != 0) +/* was this txn aborted in the meanwhile? */ +#define txn_rollback(txn) (((txn)->txn_flags & TXN_ROLLBACK) != 0) + typedef struct ReorderBufferTXN { + int txn_flags; /* bitmask of the TXN_* flags defined above */ + /* * The transactions transaction id, can be a toplevel or sub xid. */ TransactionId xid; - /* did the TX have catalog changes */ - bool has_catalog_changes; - - /* - * Do we know this is a subxact? - */ - bool is_known_as_subxact; + /* In case of 2PC we need to pass GID to output plugin */ + char gid[GIDSIZE]; /* * LSN of the first data carrying, WAL record with knowledge about this @@ -214,15 +245,6 @@ typedef struct ReorderBufferTXN */ uint64 nentries_mem; - /* - * Has this transaction been spilled to disk? It's not always possible to - * deduce that fact by comparing nentries with nentries_mem, because e.g. - * subtransactions of a large transaction might get serialized together - * with the parent - if they're restored to memory they'd have - * nentries_mem == nentries.
- */ - bool serialized; - /* * List of ReorderBufferChange structs, including new Snapshots and new * CommandIds @@ -294,6 +316,40 @@ typedef void (*ReorderBufferCommitCB) ( ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +/* abort callback signature */ +typedef void (*ReorderBufferAbortCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +typedef bool (*ReorderBufferFilterDecodeTxnCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn); + +typedef bool (*ReorderBufferFilterPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + TransactionId xid, + const char *gid); + +/* prepare callback signature */ +typedef void (*ReorderBufferPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* commit prepared callback signature */ +typedef void (*ReorderBufferCommitPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* abort prepared callback signature */ +typedef void (*ReorderBufferAbortPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + /* message callback signature */ typedef void (*ReorderBufferMessageCB) ( ReorderBuffer *rb, @@ -329,6 +385,12 @@ struct ReorderBuffer ReorderBufferBeginCB begin; ReorderBufferApplyChangeCB apply_change; ReorderBufferCommitCB commit; + ReorderBufferAbortCB abort; + ReorderBufferFilterDecodeTxnCB filter_decode_txn; + ReorderBufferFilterPrepareCB filter_prepare; + ReorderBufferPrepareCB prepare; + ReorderBufferCommitPreparedCB commit_prepared; + ReorderBufferAbortPreparedCB abort_prepared; ReorderBufferMessageCB message; /* @@ -371,6 +433,11 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); +void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, 
+ TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn); @@ -394,6 +461,15 @@ void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); +bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, + const char *gid); +bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid, + const char *gid); +void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 5c19a61dcf..fdfc582874 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -200,6 +200,26 @@ struct PGPROC PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */ dlist_head lockGroupMembers; /* list of members, if I'm a leader */ dlist_node lockGroupLink; /* my member link, if I'm a member */ + + /* + * Support for decoding groups. Use LockHashPartitionLockByProc on the group + * leader to get the LWLock protecting these fields. + * + * For prepared and uncommitted transactions, decoding backends working on + * the same XID will link themselves up to the corresponding PGPROC + * entry (decodeGroupLeader). + * + * They will remove themselves when they are done decoding. 
+ * + * If the prepared or uncommitted transaction decides to abort, then + * the decodeGroupLeader will set the decodeAbortPending flag allowing + * the decodeGroupMembers to abort their decoding appropriately + */ + PGPROC *decodeGroupLeader; /* decode group leader, if I'm a member */ + dlist_head decodeGroupMembers; /* list of members, if I'm a leader */ + dlist_node decodeGroupLink; /* my member link, if I'm a member */ + bool decodeLocked; /* is it currently locked by this proc? */ + bool decodeAbortPending; /* is the decode group leader aborting? */ }; /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */ @@ -326,5 +346,10 @@ extern PGPROC *AuxiliaryPidGetProc(int pid); extern void BecomeLockGroupLeader(void); extern bool BecomeLockGroupMember(PGPROC *leader, int pid); +extern PGPROC *BecomeDecodeGroupLeader(TransactionId xid, bool is_prepared); +extern bool BecomeDecodeGroupMember(PGPROC *leader, int pid, bool is_prepared); +extern void RemoveDecodeGroupMember(PGPROC *leader); +extern void RemoveDecodeGroupMemberLocked(PGPROC *leader); +extern void LogicalDecodeRemoveTransaction(PGPROC *leader, bool isCommit); #endif /* PROC_H */ diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 75bab2985f..68173743ae 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -98,6 +98,7 @@ extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids extern PGPROC *BackendPidGetProc(int pid); extern PGPROC *BackendPidGetProcWithLock(int pid); extern int BackendXidGetPid(TransactionId xid); +extern PGPROC *BackendXidGetProc(TransactionId xid); extern bool IsBackendPid(int pid); extern VirtualTransactionId *GetCurrentVirtualXIDs(TransactionId limitXmin, diff --git a/src/test/subscription/t/009_twophase.pl b/src/test/subscription/t/009_twophase.pl new file mode 100644 index 0000000000..c7f373df93 --- /dev/null +++ b/src/test/subscription/t/009_twophase.pl @@ -0,0 +1,163 @@ +# 
logical replication of 2PC test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 12; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf( + 'postgresql.conf', qq( + max_prepared_transactions = 10 + )); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf( + 'postgresql.conf', qq(max_prepared_transactions = 10)); +$node_subscriber->start; + +# Create some pre-existing content on publisher +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full SELECT generate_series(1,10)"); +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub"); +$node_publisher->safe_psql('postgres', +"ALTER PUBLICATION tap_pub ADD TABLE tab_full, tab_full2" +); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +); + +# Wait for subscriber to finish initialization +my $caughtup_query = +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"; +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# check that 2PC gets replicated to subscriber +$node_publisher->safe_psql('postgres', + "BEGIN;INSERT INTO tab_full VALUES (11);PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); + is($result, qq(1), 'transaction is prepared on subscriber'); + +# check that 2PC gets committed on subscriber +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is committed on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;"); + is($result, qq(1), 'Row inserted 
via 2PC has committed on subscriber'); +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); + is($result, qq(0), 'transaction is committed on subscriber'); + +# check that 2PC gets replicated to subscriber +$node_publisher->safe_psql('postgres', + "BEGIN;INSERT INTO tab_full VALUES (12);PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is in prepared state on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); + is($result, qq(1), 'transaction is prepared on subscriber'); + +# check that 2PC gets aborted on subscriber +$node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab_full';"); + +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that transaction is aborted on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;"); + is($result, qq(0), 'Row inserted via 2PC is not present on subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';"); + is($result, qq(0), 'transaction is aborted on subscriber'); + +# Check that commit prepared is decoded properly on crash restart +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (12); + INSERT INTO tab_full VALUES (13); + PREPARE TRANSACTION 'test_prepared_tab';"); +$node_subscriber->stop('immediate'); +$node_publisher->stop('immediate'); +$node_publisher->start; +$node_subscriber->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); 
+$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# check that both rows of the transaction committed after the restart (12, 13) are visible +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (12,13);"); +is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); + +# TODO add test cases involving DDL. This can be added after we add functionality +# to replicate DDL changes to subscriber. + +# check all the cleanup +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), + 'check subscription relation status was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast');