diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 6c18189d9d..79b9622600 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -26,7 +26,7 @@ installcheck:;
# installation, allow to do so, but only if requested explicitly.
installcheck-force: regresscheck-install-force isolationcheck-install-force
-check: regresscheck isolationcheck
+check: regresscheck isolationcheck 2pc-check
submake-regress:
$(MAKE) -C $(top_builddir)/src/test/regress all
@@ -66,3 +66,6 @@ isolationcheck-install-force: all | submake-isolation submake-test_decoding temp
isolationcheck isolationcheck-install-force
temp-install: EXTRA_INSTALL=contrib/test_decoding
+
+2pc-check: temp-install
+ $(prove_check)
diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out
index 46e915d4ff..2df0b6c198 100644
--- a/contrib/test_decoding/expected/prepared.out
+++ b/contrib/test_decoding/expected/prepared.out
@@ -6,19 +6,123 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
init
(1 row)
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc_nofilter', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+-- Reused queries
+\set get_no2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'');'
+\set get_with2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'');'
+\set get_with2pc_nofilter 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc_nofilter'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'', ''twophase-decode-with-catalog-changes'', ''1'');'
-- test simple successful use of a prepared xact
BEGIN;
INSERT INTO test_prepared1 VALUES (1);
PREPARE TRANSACTION 'test_prepared#1';
+:get_no2pc
+ data
+------
+(0 rows)
+
+:get_with2pc
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ PREPARE TRANSACTION 'test_prepared#1'
+(3 rows)
+
+:get_with2pc_nofilter
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ PREPARE TRANSACTION 'test_prepared#1'
+(3 rows)
+
COMMIT PREPARED 'test_prepared#1';
+:get_no2pc
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ COMMIT
+(3 rows)
+
+:get_with2pc
+ data
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#1'
+(1 row)
+
+:get_with2pc_nofilter
+ data
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#1'
+(1 row)
+
INSERT INTO test_prepared1 VALUES (2);
-- test abort of a prepared xact
BEGIN;
INSERT INTO test_prepared1 VALUES (3);
PREPARE TRANSACTION 'test_prepared#2';
+:get_no2pc
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+(3 rows)
+
+:get_with2pc
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE TRANSACTION 'test_prepared#2'
+(6 rows)
+
+:get_with2pc_nofilter
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE TRANSACTION 'test_prepared#2'
+(6 rows)
+
ROLLBACK PREPARED 'test_prepared#2';
+:get_no2pc
+ data
+------
+(0 rows)
+
+:get_with2pc
+ data
+-------------------------------------
+ ROLLBACK PREPARED 'test_prepared#2'
+(1 row)
+
+:get_with2pc_nofilter
+ data
+-------------------------------------
+ ROLLBACK PREPARED 'test_prepared#2'
+(1 row)
+
INSERT INTO test_prepared1 VALUES (4);
-- test prepared xact containing ddl
BEGIN;
@@ -26,45 +130,226 @@ INSERT INTO test_prepared1 VALUES (5);
ALTER TABLE test_prepared1 ADD COLUMN data text;
INSERT INTO test_prepared1 VALUES (6, 'frakbar');
PREPARE TRANSACTION 'test_prepared#3';
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
-INSERT INTO test_prepared2 VALUES (7);
-COMMIT PREPARED 'test_prepared#3';
--- make sure stuff still works
-INSERT INTO test_prepared1 VALUES (8);
-INSERT INTO test_prepared2 VALUES (9);
--- cleanup
-DROP TABLE test_prepared1;
-DROP TABLE test_prepared2;
--- show results
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'test_prepared1'::regclass;
+ relation | locktype | mode
+-----------------+----------+---------------------
+ test_prepared_1 | relation | RowExclusiveLock
+ test_prepared_1 | relation | AccessExclusiveLock
+(2 rows)
+
+:get_no2pc
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:4
+ COMMIT
+(3 rows)
+
+:get_with2pc
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:4
+ COMMIT
+(3 rows)
+
+:get_with2pc_nofilter
data
-------------------------------------------------------------------------
BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
+ table public.test_prepared1: INSERT: id[integer]:4
COMMIT
BEGIN
- table public.test_prepared1: INSERT: id[integer]:2
+ table public.test_prepared1: INSERT: id[integer]:5
+ table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
+ PREPARE TRANSACTION 'test_prepared#3'
+(7 rows)
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists. Our 2pc filter callback will skip decoding of xacts
+-- with catalog changes at PREPARE time, so we don't decode it now.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
+INSERT INTO test_prepared2 VALUES (7);
+:get_no2pc
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:7
COMMIT
+(3 rows)
+
+:get_with2pc
+ data
+----------------------------------------------------
BEGIN
- table public.test_prepared1: INSERT: id[integer]:4
+ table public.test_prepared2: INSERT: id[integer]:7
COMMIT
+(3 rows)
+
+:get_with2pc_nofilter
+ data
+----------------------------------------------------
BEGIN
table public.test_prepared2: INSERT: id[integer]:7
COMMIT
+(3 rows)
+
+COMMIT PREPARED 'test_prepared#3';
+:get_no2pc
+ data
+-------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:5
+ table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
+ COMMIT
+(4 rows)
+
+:get_with2pc
+ data
+-------------------------------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:5
table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
COMMIT
+(4 rows)
+
+:get_with2pc_nofilter
+ data
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#3'
+(1 row)
+
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (8);
+INSERT INTO test_prepared2 VALUES (9);
+:get_no2pc
+ data
+--------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
+ COMMIT
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:9
+ COMMIT
+(6 rows)
+
+:get_with2pc
+ data
+--------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
+ COMMIT
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:9
+ COMMIT
+(6 rows)
+
+:get_with2pc_nofilter
+ data
+--------------------------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
COMMIT
BEGIN
table public.test_prepared2: INSERT: id[integer]:9
COMMIT
-(22 rows)
+(6 rows)
+
+-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+BEGIN;
+insert into test_prepared2 values (12);
+PREPARE TRANSACTION 'test_prepared_lock2';
+COMMIT PREPARED 'test_prepared_lock2';
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'pg_class'::regclass;
+ relation | locktype | mode
+----------+----------+------
+(0 rows)
+
+-- Shouldn't see anything with 2pc decoding off
+:get_no2pc
+ data
+-----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:12
+ COMMIT
+(3 rows)
+
+-- Shouldn't timeout on 2pc decoding.
+SET statement_timeout = '1s';
+:get_with2pc
+ data
+-----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:12
+ PREPARE TRANSACTION 'test_prepared_lock2'
+ COMMIT PREPARED 'test_prepared_lock2'
+(4 rows)
+
+:get_with2pc_nofilter
+ data
+----------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
+ PREPARE TRANSACTION 'test_prepared_lock'
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:12
+ PREPARE TRANSACTION 'test_prepared_lock2'
+ COMMIT PREPARED 'test_prepared_lock2'
+(8 rows)
+
+RESET statement_timeout;
+COMMIT PREPARED 'test_prepared_lock';
+-- Both will work normally after we commit
+:get_no2pc
+ data
+----------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
+ COMMIT
+(4 rows)
+
+:get_with2pc
+ data
+----------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
+ COMMIT
+(4 rows)
+
+:get_with2pc_nofilter
+ data
+--------------------------------------
+ COMMIT PREPARED 'test_prepared_lock'
+(1 row)
+
+-- cleanup
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+-- show results
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
@@ -72,3 +357,15 @@ SELECT pg_drop_replication_slot('regression_slot');
(1 row)
+SELECT pg_drop_replication_slot('regression_slot_2pc');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT pg_drop_replication_slot('regression_slot_2pc_nofilter');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql
index e72639767e..4197766c50 100644
--- a/contrib/test_decoding/sql/prepared.sql
+++ b/contrib/test_decoding/sql/prepared.sql
@@ -1,22 +1,41 @@
-- predictability
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc_nofilter', 'test_decoding');
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+
+-- Reused queries
+\set get_no2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'');'
+\set get_with2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'');'
+\set get_with2pc_nofilter 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc_nofilter'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'', ''twophase-decode-with-catalog-changes'', ''1'');'
-- test simple successful use of a prepared xact
BEGIN;
INSERT INTO test_prepared1 VALUES (1);
PREPARE TRANSACTION 'test_prepared#1';
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
COMMIT PREPARED 'test_prepared#1';
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
INSERT INTO test_prepared1 VALUES (2);
-- test abort of a prepared xact
BEGIN;
INSERT INTO test_prepared1 VALUES (3);
PREPARE TRANSACTION 'test_prepared#2';
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
ROLLBACK PREPARED 'test_prepared#2';
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
INSERT INTO test_prepared1 VALUES (4);
@@ -27,18 +46,74 @@ ALTER TABLE test_prepared1 ADD COLUMN data text;
INSERT INTO test_prepared1 VALUES (6, 'frakbar');
PREPARE TRANSACTION 'test_prepared#3';
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'test_prepared1'::regclass;
+
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists. Our 2pc filter callback will skip decoding of xacts
+-- with catalog changes at PREPARE time, so we don't decode it now.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
INSERT INTO test_prepared2 VALUES (7);
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
COMMIT PREPARED 'test_prepared#3';
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
-- make sure stuff still works
INSERT INTO test_prepared1 VALUES (8);
INSERT INTO test_prepared2 VALUES (9);
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
+
+-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+
+BEGIN;
+insert into test_prepared2 values (12);
+PREPARE TRANSACTION 'test_prepared_lock2';
+COMMIT PREPARED 'test_prepared_lock2';
+
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'pg_class'::regclass;
+
+-- Shouldn't see anything with 2pc decoding off
+:get_no2pc
+
+-- Shouldn't timeout on 2pc decoding.
+SET statement_timeout = '1s';
+:get_with2pc
+:get_with2pc_nofilter
+RESET statement_timeout;
+
+COMMIT PREPARED 'test_prepared_lock';
+
+-- Both will work normally after we commit
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
-- cleanup
DROP TABLE test_prepared1;
@@ -48,3 +123,5 @@ DROP TABLE test_prepared2;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_drop_replication_slot('regression_slot_2pc');
+SELECT pg_drop_replication_slot('regression_slot_2pc_nofilter');
diff --git a/contrib/test_decoding/t/001_twophase.pl b/contrib/test_decoding/t/001_twophase.pl
new file mode 100644
index 0000000000..6722317c9f
--- /dev/null
+++ b/contrib/test_decoding/t/001_twophase.pl
@@ -0,0 +1,102 @@
+# logical replication of 2PC test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+# Initialize node
+my $node_logical = get_new_node('logical');
+$node_logical->init(allows_streaming => 'logical');
+$node_logical->append_conf(
+ 'postgresql.conf', qq(
+ max_prepared_transactions = 10
+));
+$node_logical->start;
+
+# Create some pre-existing content on logical
+$node_logical->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
+$node_logical->safe_psql('postgres',
+ "INSERT INTO tab SELECT generate_series(1,10)");
+$node_logical->safe_psql('postgres',
+ "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');");
+$node_logical->safe_psql('postgres',
+ "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding');");
+
+# This test is specifically for testing concurrent abort while logical decode is
+# ongoing. The decode-delay value will allow for each change decode to sleep for
+# those many seconds. We also hold the LogicalLockTransaction while we sleep.
+# We will fire off a ROLLBACK from another session when this delayed decode is
+# ongoing. Since we are holding the lock from the call above, this ROLLBACK
+# will wait for the logical backends to do a LogicalUnlockTransaction. We will
+# stop decoding immediately post this and the next pg_logical_slot_get_changes call
+# should show only a few records decoded from the entire two phase transaction
+#
+# We use two slots to test multiple decoding backends here
+
+$node_logical->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab VALUES (11);
+ INSERT INTO tab VALUES (12);
+ ALTER TABLE tab ADD COLUMN b INT;
+ PREPARE TRANSACTION 'test_prepared_tab';");
+
+# start decoding the above with decode-delay in the background.
+my $logical_connstr = $node_logical->connstr . ' dbname=postgres';
+
+# decode now, it should only decode 1 INSERT record and should include
+# an ABORT entry because of the ROLLBACK below
+system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1', 'decode-delay', '3');\" \&");
+
+system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1', 'decode-delay', '3');\" \&");
+
+# sleep for a little while (shorter than decode-delay)
+$node_logical->safe_psql('postgres', "select pg_sleep(1)");
+
+# rollback the prepared transaction whose first record is being decoded
+# after sleeping for decode-delay time
+$node_logical->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+
+# wait for decoding to stop
+$node_logical->psql('postgres', "select pg_sleep(4)");
+
+# consume any remaining changes
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1');");
+
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1');");
+
+# check for occurrence of log about waiting backends
+my $output_file = slurp_file($node_logical->logfile());
+my $waiting_str = "Waiting for backends to abort";
+like($output_file, qr/$waiting_str/, "Waiting log found in server log");
+
+# check for occurrence of log about stopping decoding
+my $abort_str = "stopping decoding of test_prepared_tab ";
+like($output_file, qr/$abort_str/, "ABORT found in server log");
+
+# Check that commit prepared is decoded properly on immediate restart
+$node_logical->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab VALUES (11);
+ INSERT INTO tab VALUES (12);
+ ALTER TABLE tab ADD COLUMN b INT;
+ INSERT INTO tab VALUES (13, 11);
+ PREPARE TRANSACTION 'test_prepared_tab';");
+# consume changes
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1');");
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1');");
+$node_logical->stop('immediate');
+$node_logical->start;
+
+# commit post the restart
+$node_logical->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1');");
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1', 'twophase-decode-with-catalog-changes', '1');");
+
+# check inserts are visible
+my $result = $node_logical->safe_psql('postgres', "SELECT count(*) FROM tab where a IN (11,12) OR b IN (11);");
+is($result, qq(3), 'Rows inserted via 2PC are visible on restart');
+
+$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot');");
+$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2');");
+$node_logical->stop('fast');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 0f18afa852..477c950b8d 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -24,6 +24,8 @@
#include "replication/message.h"
#include "replication/origin.h"
+#include "storage/procarray.h"
+
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -46,6 +48,9 @@ typedef struct
bool skip_empty_xacts;
bool xact_wrote_changes;
bool only_local;
+ bool twophase_decoding;
+ bool twophase_decode_with_catalog_changes;
+ int decode_delay; /* seconds to sleep after every change record */
} TestDecodingData;
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -59,6 +64,8 @@ static void pg_output_begin(LogicalDecodingContext *ctx,
bool last_write);
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pg_decode_abort_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr abort_lsn);
static void pg_decode_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation rel,
ReorderBufferChange *change);
@@ -68,6 +75,20 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static bool pg_filter_prepare(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ TransactionId xid, const char *gid);
+static bool pg_filter_decode_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
void
_PG_init(void)
@@ -85,9 +106,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->begin_cb = pg_decode_begin_txn;
cb->change_cb = pg_decode_change;
cb->commit_cb = pg_decode_commit_txn;
+ cb->abort_cb = pg_decode_abort_txn;
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
+ cb->filter_prepare_cb = pg_filter_prepare;
+ cb->filter_decode_txn_cb = pg_filter_decode_txn;
+ cb->prepare_cb = pg_decode_prepare_txn;
+ cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+ cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
}
@@ -107,6 +134,9 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->include_timestamp = false;
data->skip_empty_xacts = false;
data->only_local = false;
+ data->twophase_decoding = false;
+ data->twophase_decode_with_catalog_changes = false;
+ data->decode_delay = 0;
ctx->output_plugin_private = data;
@@ -156,7 +186,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
}
else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
{
-
if (elem->arg == NULL)
data->skip_empty_xacts = true;
else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
@@ -167,7 +196,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
}
else if (strcmp(elem->defname, "only-local") == 0)
{
-
if (elem->arg == NULL)
data->only_local = true;
else if (!parse_bool(strVal(elem->arg), &data->only_local))
@@ -176,6 +204,41 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "twophase-decoding") == 0)
+ {
+ if (elem->arg == NULL)
+ data->twophase_decoding = true;
+ else if (!parse_bool(strVal(elem->arg), &data->twophase_decoding))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
+ else if (strcmp(elem->defname, "twophase-decode-with-catalog-changes") == 0)
+ {
+ if (elem->arg == NULL)
+ data->twophase_decode_with_catalog_changes = true;
+ else if (!parse_bool(strVal(elem->arg), &data->twophase_decode_with_catalog_changes))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
+ else if (strcmp(elem->defname, "decode-delay") == 0)
+ {
+ if (elem->arg == NULL)
+ data->decode_delay = 2; /* default to 2 seconds */
+ else
+ data->decode_delay = pg_atoi(strVal(elem->arg),
+ sizeof(int), 0);
+
+ if (data->decode_delay <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("Specify positive value for parameter \"%s\","
+ " you specified \"%s\"",
+ elem->defname, strVal(elem->arg))));
+ }
else
{
ereport(ERROR,
@@ -244,6 +307,156 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
+/* ABORT callback */
+static void
+pg_decode_abort_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "ABORT %u", txn->xid);
+ else
+ appendStringInfoString(ctx->out, "ABORT");
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
+/* Filter out unnecessary two-phase transactions */
+static bool
+pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ TransactionId xid, const char *gid)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ /* treat all transactions as one-phase */
+ if (!data->twophase_decoding)
+ return true;
+
+ if (txn && txn_has_catalog_changes(txn) &&
+ !data->twophase_decode_with_catalog_changes)
+ return true;
+
+ /*
+ * even if txn is NULL, decode since twophase_decoding is set
+ */
+ return false;
+}
+
+/*
+ * Check if we should continue to decode this transaction.
+ *
+ * If it has aborted in the meanwhile, then there's no sense
+ * in decoding and sending the rest of the changes, we might
+ * as well ask the subscribers to abort immediately.
+ *
+ * This should be called if we are streaming a transaction
+ * before it's committed or if we are decoding a 2PC
+ * transaction. Otherwise we always decode committed
+ * transactions
+ *
+ * Additional checks can be added here, as needed
+ */
+static bool
+pg_filter_decode_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn)
+{
+ /*
+ * Due to caching, repeated TransactionIdDidAbort calls
+ * shouldn't be that expensive
+ */
+ if (txn != NULL &&
+ TransactionIdIsValid(txn->xid) &&
+ TransactionIdDidAbort(txn->xid))
+ return true;
+
+ /* if txn is NULL, filter it out */
+ return (txn != NULL)? false:true;
+}
+
+/* PREPARE callback */
+static void
+pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
+ quote_literal_cstr(txn->gid));
+
+ if (data->include_xids)
+ appendStringInfo(ctx->out, " %u", txn->xid);
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
+/* COMMIT PREPARED callback */
+static void
+pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ if (!data->twophase_decoding)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ appendStringInfo(ctx->out, "COMMIT PREPARED %s",
+ quote_literal_cstr(txn->gid));
+
+ if (data->include_xids)
+ appendStringInfo(ctx->out, " %u", txn->xid);
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
+/* ABORT PREPARED callback */
+static void
+pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ if (!data->twophase_decoding)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
+ quote_literal_cstr(txn->gid));
+
+ if (data->include_xids)
+ appendStringInfo(ctx->out, " %u", txn->xid);
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
static bool
pg_decode_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id)
@@ -409,8 +622,18 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
data->xact_wrote_changes = true;
+ if (!LogicalLockTransaction(txn))
+ return;
+ /* if decode_delay is specified, sleep with above lock held */
+ if (data->decode_delay > 0)
+ {
+ elog(LOG, "sleeping for %d seconds", data->decode_delay);
+ pg_usleep(data->decode_delay * 1000000L);
+ }
class_form = RelationGetForm(relation);
tupdesc = RelationGetDescr(relation);
+ LogicalUnlockTransaction(txn);
+
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 5501eed108..7edda72e5e 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -384,8 +384,14 @@ typedef struct OutputPluginCallbacks
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeCommitCB commit_cb;
+ LogicalDecodeAbortCB abort_cb;
LogicalDecodeMessageCB message_cb;
+ LogicalDecodeFilterPrepareCB filter_prepare_cb;
+ LogicalDecodePrepareCB prepare_cb;
+ LogicalDecodeCommitPreparedCB commit_prepared_cb;
+ LogicalDecodeAbortPreparedCB abort_prepared_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+ LogicalDecodeFilterDecodeTxnCB filter_decode_txn_cb;
LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;
@@ -454,7 +460,12 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
never get
decoded. Successful savepoints are
folded into the transaction containing them in the order they were
- executed within that transaction.
+ executed within that transaction. A transaction that is prepared for
+ a two-phase commit using PREPARE TRANSACTION will
+ also be decoded if the output plugin callbacks needed for decoding
+ them are provided. It is possible that the current transaction which
+ is being decoded is aborted concurrently via a ROLLBACK PREPARED
+ command. In that case, the logical decoding will be aborted midways.
@@ -550,6 +561,74 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
+
+ Transaction Prepare Callback
+
+
+ The optional prepare_cb callback is called whenever
+ a transaction which is prepared for two-phase commit has been
+ decoded. The change_cb callbacks for all modified
+ rows will have been called before this, if there have been any modified
+ rows.
+
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+
+
+
+
+ Commit Prepared Transaction Callback
+
+
+ The optional commit_prepared_cb callback is called whenever
+ a commit prepared transaction has been decoded. The gid field,
+ which is part of the txn parameter can be used in this
+ callback.
+
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+
+
+
+
+ Rollback Prepared Transaction Callback
+
+
+ The optional abort_prepared_cb callback is called whenever
+ a rollback prepared transaction has been decoded. The gid field,
+ which is part of the txn parameter can be used in this
+ callback.
+
+typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
+
+
+
+
+ Transaction Abort Callback
+
+
+ The required abort_cb callback is called whenever
+ a transaction abort has to be initiated. This can happen if we are
+ decoding a transaction that has been prepared for two-phase commit and
+ a concurrent rollback happens while we are decoding it. It might make
+ sense, even before we commence decoding, in such cases to check if the
+ rollback happened even before we start looking at the changes to
+ completely avoid the decoding of such transactions.
+
+typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
+
+
+
Change Callback
@@ -559,12 +638,30 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
an INSERT, UPDATE,
or DELETE. Even if the original command modified
several rows at once the callback will be called individually for each
- row.
+ row. The change_cb callback may access system or
+ user catalog tables to aid in the process of outputting the row
+ modification details. The change_cb call should invoke
+ LogicalLockTransaction function before such access of
+ system or user catalog tables. In case of decoding a prepared (but yet
+ uncommitted) transaction or decoding of an uncommitted transaction, this
+ function interlocks the decoding activity with simultaneous rollback by
+ another backend of this very same transaction. The
+ change_cb should invoke
+ LogicalUnlockTransaction function immediately after
+ the catalog tables access.
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
+
+ Here's an example of the use of LogicalLockTransaction
+ and LogicalUnlockTransaction in an output plugin:
+
+ if (!LogicalLockTransaction(txn))
+ return;
+ relation = RelationIdGetRelation(reloid);
+ LogicalUnlockTransaction(txn);
The ctx and txn parameters
have the same contents as for the begin_cb
@@ -614,6 +711,53 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
+
+ Decode Filter Callback
+
+
+ The optional filter_decode_txn_cb callback
+ is called to determine whether data that is part of the current
+ transaction should be continued to be decoded.
+
+typedef bool (*LogicalDecodeFilterDecodeTxnCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+ The ctx parameter has the same contents
+ as for the other callbacks. The txn parameter
+ contains meta information about the transaction, like its XID.
+ Note however that it can be NULL in some cases. To signal that decoding process
+ should terminate, return true; false otherwise.
+
+
+
+
+ Prepare Filter Callback
+
+
+ The optional filter_prepare_cb callback
+ is called to determine whether data that is part of the current
+ two-phase commit transaction should be considered for decode
+ at this prepare stage or as a regular one-phase transaction at
+ COMMIT PREPARED time later. To signal that
+ decoding should be skipped, return true; false otherwise.
+
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ TransactionId xid,
+ const char *gid);
+
+ The ctx parameter has the same contents
+ as for the other callbacks. The txn parameter
+ contains meta information about the transaction. The xid
+ contains the XID because txn can be NULL in some cases.
+ The gid is the identifier that later identifies this
+ transaction for COMMIT PREPARED or ROLLBACK PREPARED.
+ The callback has to provide the same static answer for a given combination of
+ xid and gid every time it is
+ called. To signal that decoding should be skipped, return true; false otherwise.
+
+
+
Generic Message Callback
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index e5eef9ea43..b3e2fc3036 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -102,6 +102,14 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
parsed->twophase_xid = xl_twophase->xid;
data += sizeof(xl_xact_twophase);
+
+ if (parsed->xinfo & XACT_XINFO_HAS_GID)
+ {
+ int gidlen;
+ strcpy(parsed->twophase_gid, data);
+ gidlen = strlen(parsed->twophase_gid) + 1;
+ data += MAXALIGN(gidlen);
+ }
}
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -139,6 +147,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
data += sizeof(xl_xact_xinfo);
}
+ if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+ {
+ xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+ parsed->dbId = xl_dbinfo->dbId;
+ parsed->tsId = xl_dbinfo->tsId;
+
+ data += sizeof(xl_xact_dbinfo);
+ }
+
if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
{
xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
@@ -168,6 +186,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
parsed->twophase_xid = xl_twophase->xid;
data += sizeof(xl_xact_twophase);
+
+ if (parsed->xinfo & XACT_XINFO_HAS_GID)
+ {
+ int gidlen;
+ strcpy(parsed->twophase_gid, data);
+ gidlen = strlen(parsed->twophase_gid) + 1;
+ data += MAXALIGN(gidlen);
+ }
+ }
+
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ xl_xact_origin xl_origin;
+
+ /* we're only guaranteed 4 byte alignment, so copy onto stack */
+ memcpy(&xl_origin, data, sizeof(xl_origin));
+
+ parsed->origin_lsn = xl_origin.origin_lsn;
+ parsed->origin_timestamp = xl_origin.origin_timestamp;
+
+ data += sizeof(xl_xact_origin);
}
}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index c479c4881b..97499707f7 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -144,11 +144,7 @@ int max_prepared_xacts = 0;
*
* typedef struct GlobalTransactionData *GlobalTransaction appears in
* twophase.h
- *
- * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
- * specified in TwoPhaseFileHeader.
*/
-#define GIDSIZE 200
typedef struct GlobalTransactionData
{
@@ -211,12 +207,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
RelFileNode *rels,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
- bool initfileinval);
+ bool initfileinval,
+ const char *gid);
static void RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
- RelFileNode *rels);
+ RelFileNode *rels,
+ const char *gid);
static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);
@@ -556,7 +554,7 @@ MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
* Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
*/
static GlobalTransaction
-LockGXact(const char *gid, Oid user)
+LockGXact(const char *gid, Oid user, bool missing_ok)
{
int i;
@@ -616,7 +614,8 @@ LockGXact(const char *gid, Oid user)
LWLockRelease(TwoPhaseStateLock);
- ereport(ERROR,
+ if (!missing_ok)
+ ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("prepared transaction with identifier \"%s\" does not exist",
gid)));
@@ -898,7 +897,7 @@ TwoPhaseGetDummyProc(TransactionId xid)
/*
* Header for a 2PC state file
*/
-#define TWOPHASE_MAGIC 0x57F94533 /* format identifier */
+#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
typedef struct TwoPhaseFileHeader
{
@@ -914,6 +913,8 @@ typedef struct TwoPhaseFileHeader
int32 ninvalmsgs; /* number of cache invalidation messages */
bool initfileinval; /* does relcache init file need invalidation? */
uint16 gidlen; /* length of the GID - GID follows the header */
+ XLogRecPtr origin_lsn; /* lsn of this record at origin node */
+ TimestampTz origin_timestamp; /* time of prepare at origin node */
} TwoPhaseFileHeader;
/*
@@ -1065,6 +1066,7 @@ EndPrepare(GlobalTransaction gxact)
{
TwoPhaseFileHeader *hdr;
StateFileChunk *record;
+ bool replorigin;
/* Add the end sentinel to the list of 2PC records */
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1075,6 +1077,21 @@ EndPrepare(GlobalTransaction gxact)
Assert(hdr->magic == TWOPHASE_MAGIC);
hdr->total_len = records.total_len + sizeof(pg_crc32c);
+ replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+ replorigin_session_origin != DoNotReplicateId);
+
+ if (replorigin)
+ {
+ Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
+ hdr->origin_lsn = replorigin_session_origin_lsn;
+ hdr->origin_timestamp = replorigin_session_origin_timestamp;
+ }
+ else
+ {
+ hdr->origin_lsn = InvalidXLogRecPtr;
+ hdr->origin_timestamp = 0;
+ }
+
/*
* If the data size exceeds MaxAllocSize, we won't be able to read it in
* ReadTwoPhaseFile. Check for that now, rather than fail in the case
@@ -1107,7 +1124,16 @@ EndPrepare(GlobalTransaction gxact)
XLogBeginInsert();
for (record = records.head; record != NULL; record = record->next)
XLogRegisterData(record->data, record->len);
+
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+
+ if (replorigin)
+ /* Move LSNs forward for this replication origin */
+ replorigin_session_advance(replorigin_session_origin_lsn,
+ gxact->prepare_end_lsn);
+
XLogFlush(gxact->prepare_end_lsn);
/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1283,6 +1309,43 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
return buf;
}
+/*
+ * ParsePrepareRecord
+ */
+void
+ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
+{
+ TwoPhaseFileHeader *hdr;
+ char *bufptr;
+
+ hdr = (TwoPhaseFileHeader *) xlrec;
+ bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
+
+ parsed->origin_lsn = hdr->origin_lsn;
+ parsed->origin_timestamp = hdr->origin_timestamp;
+ parsed->twophase_xid = hdr->xid;
+ parsed->dbId = hdr->database;
+ parsed->nsubxacts = hdr->nsubxacts;
+ parsed->ncommitrels = hdr->ncommitrels;
+ parsed->nabortrels = hdr->nabortrels;
+ parsed->nmsgs = hdr->ninvalmsgs;
+
+ strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
+ bufptr += MAXALIGN(hdr->gidlen);
+
+ parsed->subxacts = (TransactionId *) bufptr;
+ bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+
+ parsed->commitrels = (RelFileNode *) bufptr;
+ bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+
+ parsed->abortrels = (RelFileNode *) bufptr;
+ bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+ parsed->msgs = (SharedInvalidationMessage *) bufptr;
+ bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+}
+
/*
* Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1365,7 +1428,7 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
* FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
*/
void
-FinishPreparedTransaction(const char *gid, bool isCommit)
+FinishPreparedTransaction(const char *gid, bool isCommit, bool missing_ok)
{
GlobalTransaction gxact;
PGPROC *proc;
@@ -1386,8 +1449,20 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
/*
* Validate the GID, and lock the GXACT to ensure that two backends do not
* try to commit the same GID at once.
+ *
+ * During logical decoding, on the apply side, it's possible that a prepared
+ * transaction got aborted while decoding. In that case, we stop the
+ * decoding and abort the transaction immediately. However the ROLLBACK
+ * prepared processing still reaches the subscriber. In that case it's ok
+ * to have a missing gid
*/
- gxact = LockGXact(gid, GetUserId());
+ gxact = LockGXact(gid, GetUserId(), missing_ok);
+ if (gxact == NULL)
+ {
+ Assert(missing_ok && !isCommit);
+ return;
+ }
+
proc = &ProcGlobal->allProcs[gxact->pgprocno];
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
xid = pgxact->xid;
@@ -1435,13 +1510,19 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
hdr->nsubxacts, children,
hdr->ncommitrels, commitrels,
hdr->ninvalmsgs, invalmsgs,
- hdr->initfileinval);
+ hdr->initfileinval, gid);
else
RecordTransactionAbortPrepared(xid,
hdr->nsubxacts, children,
- hdr->nabortrels, abortrels);
+ hdr->nabortrels, abortrels,
+ gid);
ProcArrayRemove(proc, latestXid);
+ /*
+ * Tell logical decoding backends interested in this XID
+ * that this is going away
+ */
+ LogicalDecodeRemoveTransaction(proc, isCommit);
/*
* In case we fail while running the callbacks, mark the gxact invalid so
@@ -1752,7 +1833,8 @@ restoreTwoPhaseData(void)
if (buf == NULL)
continue;
- PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr);
+ PrepareRedoAdd(buf, InvalidXLogRecPtr,
+ InvalidXLogRecPtr, InvalidRepOriginId);
}
}
LWLockRelease(TwoPhaseStateLock);
@@ -2165,7 +2247,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
RelFileNode *rels,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
- bool initfileinval)
+ bool initfileinval,
+ const char *gid)
{
XLogRecPtr recptr;
TimestampTz committs = GetCurrentTimestamp();
@@ -2193,7 +2276,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
ninvalmsgs, invalmsgs,
initfileinval, false,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
- xid);
+ xid, gid);
if (replorigin)
@@ -2255,7 +2338,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
- RelFileNode *rels)
+ RelFileNode *rels,
+ const char *gid)
{
XLogRecPtr recptr;
@@ -2278,7 +2362,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
nchildren, children,
nrels, rels,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
- xid);
+ xid, gid);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
@@ -2309,7 +2393,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
* data, the entry is marked as located on disk.
*/
void
-PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
+PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
+ XLogRecPtr end_lsn, RepOriginId origin_id)
{
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
char *bufptr;
@@ -2358,6 +2443,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
+ if (origin_id != InvalidRepOriginId)
+ {
+ /* recover apply progress */
+ replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
+ false /* backward */ , false /* WAL */ );
+ }
+
elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index ea81f4b5de..e19fac4f7b 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1227,7 +1227,7 @@ RecordTransactionCommit(void)
nmsgs, invalMessages,
RelcacheInitFileInval, forceSyncCommit,
MyXactFlags,
- InvalidTransactionId /* plain commit */ );
+ InvalidTransactionId, NULL /* plain commit */ );
if (replorigin)
/* Move LSNs forward for this replication origin */
@@ -1579,7 +1579,8 @@ RecordTransactionAbort(bool isSubXact)
XactLogAbortRecord(xact_time,
nchildren, children,
nrels, rels,
- MyXactFlags, InvalidTransactionId);
+ MyXactFlags, InvalidTransactionId,
+ NULL);
/*
* Report the latest async abort LSN, so that the WAL writer knows to
@@ -5247,7 +5248,6 @@ xactGetCommittedChildren(TransactionId **ptr)
* XLOG support routines
*/
-
/*
* Log the commit record for a plain or twophase transaction commit.
*
@@ -5260,7 +5260,8 @@ XactLogCommitRecord(TimestampTz commit_time,
int nrels, RelFileNode *rels,
int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInval, bool forceSync,
- int xactflags, TransactionId twophase_xid)
+ int xactflags, TransactionId twophase_xid,
+ const char *twophase_gid)
{
xl_xact_commit xlrec;
xl_xact_xinfo xl_xinfo;
@@ -5272,6 +5273,7 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xact_origin xl_origin;
uint8 info;
+ int gidlen = 0;
Assert(CritSectionCount > 0);
@@ -5334,6 +5336,13 @@ XactLogCommitRecord(TimestampTz commit_time,
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
xl_twophase.xid = twophase_xid;
+ Assert(twophase_gid != NULL);
+
+ if (XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+ gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+ }
}
/* dump transaction origin information */
@@ -5384,8 +5393,19 @@ XactLogCommitRecord(TimestampTz commit_time,
}
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+ {
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+ {
+ static const char zeroes[MAXIMUM_ALIGNOF] = { 0 };
+ XLogRegisterData((char*) twophase_gid, gidlen);
+ if (MAXALIGN(gidlen) != gidlen)
+ XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen);
+ }
+ }
+
+
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
@@ -5405,15 +5425,19 @@ XLogRecPtr
XactLogAbortRecord(TimestampTz abort_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
- int xactflags, TransactionId twophase_xid)
+ int xactflags, TransactionId twophase_xid,
+ const char *twophase_gid)
{
xl_xact_abort xlrec;
xl_xact_xinfo xl_xinfo;
xl_xact_subxacts xl_subxacts;
xl_xact_relfilenodes xl_relfilenodes;
xl_xact_twophase xl_twophase;
+ xl_xact_dbinfo xl_dbinfo;
+ xl_xact_origin xl_origin;
uint8 info;
+ int gidlen = 0;
Assert(CritSectionCount > 0);
@@ -5449,6 +5473,31 @@ XactLogAbortRecord(TimestampTz abort_time,
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
xl_twophase.xid = twophase_xid;
+ Assert(twophase_gid != NULL);
+
+ if (XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+ gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+ }
+ }
+
+ if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+ xl_dbinfo.dbId = MyDatabaseId;
+ xl_dbinfo.tsId = MyDatabaseTableSpace;
+ }
+
+ /* dump transaction origin information only for abort prepared */
+ if ( (replorigin_session_origin != InvalidRepOriginId) &&
+ TransactionIdIsValid(twophase_xid) &&
+ XLogLogicalInfoActive())
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+
+ xl_origin.origin_lsn = replorigin_session_origin_lsn;
+ xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
}
if (xl_xinfo.xinfo != 0)
@@ -5463,6 +5512,9 @@ XactLogAbortRecord(TimestampTz abort_time,
if (xl_xinfo.xinfo != 0)
XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+ XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
{
XLogRegisterData((char *) (&xl_subxacts),
@@ -5480,7 +5532,23 @@ XactLogAbortRecord(TimestampTz abort_time,
}
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+ {
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+ {
+ static const char zeroes[MAXIMUM_ALIGNOF] = { 0 };
+ XLogRegisterData((char*) twophase_gid, gidlen);
+ if (MAXALIGN(gidlen) != gidlen)
+ XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen);
+ }
+ }
+
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+ XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+
+ if (TransactionIdIsValid(twophase_xid))
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
return XLogInsert(RM_XACT_ID, info);
}
@@ -5803,7 +5871,8 @@ xact_redo(XLogReaderState *record)
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
PrepareRedoAdd(XLogRecGetData(record),
record->ReadRecPtr,
- record->EndRecPtr);
+ record->EndRecPtr,
+ XLogRecGetOrigin(record));
LWLockRelease(TwoPhaseStateLock);
}
else if (info == XLOG_XACT_ASSIGNMENT)
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 6eb0d5527e..b45739d971 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,7 @@
#include "access/xlogutils.h"
#include "access/xlogreader.h"
#include "access/xlogrecord.h"
+#include "access/twophase.h"
#include "catalog/pg_control.h"
@@ -72,6 +73,8 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_commit *parsed, TransactionId xid);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_abort *parsed, TransactionId xid);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_prepare *parsed);
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -280,16 +283,33 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
}
case XLOG_XACT_PREPARE:
+ {
+ xl_xact_parsed_prepare parsed;
- /*
- * Currently decoding ignores PREPARE TRANSACTION and will just
- * decode the transaction when the COMMIT PREPARED is sent or
- * throw away the transaction's contents when a ROLLBACK PREPARED
- * is received. In the future we could add code to expose prepared
- * transactions in the changestream allowing for a kind of
- * distributed 2PC.
- */
- ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
+ /* check that output plugin is capable of twophase decoding */
+ if (!ctx->enable_twophase)
+ {
+ ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
+ break;
+ }
+
+ /* ok, parse it */
+ ParsePrepareRecord(XLogRecGetInfo(buf->record),
+ XLogRecGetData(buf->record), &parsed);
+
+ /* does output plugin want this particular transaction? */
+ if (ctx->callbacks.filter_prepare_cb &&
+ ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid,
+ parsed.twophase_gid))
+ {
+ ReorderBufferProcessXid(reorder, parsed.twophase_xid,
+ buf->origptr);
+ break;
+ }
+
+ DecodePrepare(ctx, buf, &parsed);
+ break;
+ }
break;
default:
elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
@@ -627,9 +647,71 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
buf->origptr, buf->endptr);
}
+ if (TransactionIdIsValid(parsed->twophase_xid) &&
+ ReorderBufferTxnIsPrepared(ctx->reorder,
+ parsed->twophase_xid, parsed->twophase_gid))
+ {
+ Assert(xid == parsed->twophase_xid);
+ /* we are processing COMMIT PREPARED */
+ ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+ commit_time, origin_id, origin_lsn, parsed->twophase_gid, true);
+ }
+ else
+ {
+ /* replay actions of all transaction + subtransactions in order */
+ ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+ commit_time, origin_id, origin_lsn);
+ }
+}
+
+/*
+ * Decode PREPARE record. Similar logic as in COMMIT
+ */
+static void
+DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_prepare *parsed)
+{
+ XLogRecPtr origin_lsn = parsed->origin_lsn;
+ TimestampTz commit_time = parsed->origin_timestamp;
+ XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
+ int i;
+ TransactionId xid = parsed->twophase_xid;
+
+ /*
+ * Process invalidation messages, even if we're not interested in the
+ * transaction's contents, since the various caches need to always be
+ * consistent.
+ */
+ if (parsed->nmsgs > 0)
+ {
+ ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
+ parsed->nmsgs, parsed->msgs);
+ ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+ }
+
+ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+ (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+ FilterByOrigin(ctx, origin_id))
+ {
+ for (i = 0; i < parsed->nsubxacts; i++)
+ {
+ ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
+ }
+ ReorderBufferForget(ctx->reorder, xid, buf->origptr);
+
+ return;
+ }
+
+ /* tell the reorderbuffer about the surviving subtransactions */
+ for (i = 0; i < parsed->nsubxacts; i++)
+ {
+ ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+ buf->origptr, buf->endptr);
+ }
+
/* replay actions of all transaction + subtransactions in order */
- ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
- commit_time, origin_id, origin_lsn);
+ ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr,
+ commit_time, origin_id, origin_lsn, parsed->twophase_gid);
}
/*
@@ -641,6 +723,30 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_abort *parsed, TransactionId xid)
{
int i;
+ XLogRecPtr origin_lsn = InvalidXLogRecPtr;
+ TimestampTz commit_time = 0;
+ XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
+
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ origin_lsn = parsed->origin_lsn;
+ commit_time = parsed->origin_timestamp;
+ }
+
+ /*
+ * If it's ROLLBACK PREPARED then handle it via callbacks.
+ */
+ if (TransactionIdIsValid(xid) &&
+ !SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) &&
+ parsed->dbId == ctx->slot->data.database &&
+ !FilterByOrigin(ctx, origin_id) &&
+ ReorderBufferTxnIsPrepared(ctx->reorder, xid, parsed->twophase_gid))
+ {
+ ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+ commit_time, origin_id, origin_lsn,
+ parsed->twophase_gid, false);
+ return;
+ }
for (i = 0; i < parsed->nsubxacts; i++)
{
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 7637efc32e..50c08acc34 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -60,6 +60,18 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
+static void abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+static bool filter_decode_txn_cb_wrapper(ReorderBuffer *cache,
+ ReorderBufferTXN *txn);
+static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ TransactionId xid, const char *gid);
+static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -125,6 +137,7 @@ StartupDecodingContext(List *output_plugin_options,
MemoryContext context,
old_context;
LogicalDecodingContext *ctx;
+ int twophase_callbacks;
/* shorter lines... */
slot = MyReplicationSlot;
@@ -184,8 +197,27 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->begin = begin_cb_wrapper;
ctx->reorder->apply_change = change_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
+ ctx->reorder->abort = abort_cb_wrapper;
+ ctx->reorder->filter_decode_txn = filter_decode_txn_cb_wrapper;
+ ctx->reorder->filter_prepare = filter_prepare_cb_wrapper;
+ ctx->reorder->prepare = prepare_cb_wrapper;
+ ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
+ ctx->reorder->abort_prepared = abort_prepared_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
+ /* check that plugin implements all callbacks necessary to perform 2PC */
+ twophase_callbacks = (ctx->callbacks.prepare_cb != NULL) +
+ (ctx->callbacks.commit_prepared_cb != NULL) +
+ (ctx->callbacks.abort_prepared_cb != NULL);
+
+ ctx->enable_twophase = (twophase_callbacks == 3);
+
+ if (twophase_callbacks != 3 && twophase_callbacks != 0)
+ ereport(WARNING,
+ (errmsg("Output plugin registered only %d twophase callbacks. "
+ "Twophase transactions will be decoded at commit time.",
+ twophase_callbacks)));
+
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
ctx->write = do_write;
@@ -693,6 +725,122 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "abort";
+ state.report_location = txn->final_lsn; /* beginning of abort record */
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+ /* do the actual work: call callback */
+ ctx->callbacks.abort_cb(ctx, txn, abort_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+ static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "prepare";
+ state.report_location = txn->final_lsn; /* beginning of commit record */
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+ /* do the actual work: call callback */
+ ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "commit_prepared";
+ state.report_location = txn->final_lsn; /* beginning of commit record */
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+ /* do the actual work: call callback */
+ ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
+static void
+abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "abort_prepared";
+ state.report_location = txn->final_lsn; /* beginning of commit record */
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+ /* do the actual work: call callback */
+ ctx->callbacks.abort_prepared_cb(ctx, txn, abort_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
static void
change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
@@ -730,6 +878,62 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static bool
+filter_decode_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+ bool ret;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "filter_decode_txn";
+ state.report_location = InvalidXLogRecPtr;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = false;
+
+ /* do the actual work: call callback */
+ ret = ctx->callbacks.filter_decode_txn_cb(ctx, txn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+ return ret;
+}
+static bool
+filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ TransactionId xid, const char *gid)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+ bool ret;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "filter_prepare";
+ state.report_location = InvalidXLogRecPtr;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = false;
+
+ /* do the actual work: call callback */
+ ret = ctx->callbacks.filter_prepare_cb(ctx, txn, xid, gid);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+ return ret;
+}
+
bool
filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
@@ -1013,3 +1217,164 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
SpinLockRelease(&MyReplicationSlot->mutex);
}
}
+
+bool
+LogicalLockTransaction(ReorderBufferTXN *txn)
+{
+ bool ok = false;
+
+ /*
+ * Prepared transactions and uncommitted transactions
+ * that have modified catalogs need to interlock with
+ * concurrent rollback to ensure that there are no
+ * issues while decoding
+ */
+
+ if (!txn_has_catalog_changes(txn))
+ return true;
+
+ /*
+ * Is it a prepared txn? Similar checks for uncommitted
+ * transactions when we start supporting them
+ */
+ if (!txn_prepared(txn))
+ return true;
+
+ /* check cached status */
+ if (txn_commit(txn))
+ return true;
+ if (txn_rollback(txn))
+ return false;
+
+ /*
+ * Find the PROC that is handling this XID and add ourself as a
+ * decodeGroupMember
+ */
+ if (MyProc->decodeGroupLeader == NULL)
+ {
+ PGPROC *proc = BecomeDecodeGroupLeader(txn->xid, txn_prepared(txn));
+
+ /*
+ * If decodeGroupLeader is NULL, then the only possibility
+ * is that the transaction completed and went away
+ */
+ if (proc == NULL)
+ {
+ Assert(!TransactionIdIsInProgress(txn->xid));
+ if (TransactionIdDidCommit(txn->xid))
+ {
+ txn->txn_flags |= TXN_COMMIT;
+ return true;
+ }
+ else
+ {
+ txn->txn_flags |= TXN_ROLLBACK;
+ return false;
+ }
+ }
+
+ /* Add ourself as a decodeGroupMember */
+ if (!BecomeDecodeGroupMember(proc, proc->pid, txn_prepared(txn)))
+ {
+ Assert(!TransactionIdIsInProgress(txn->xid));
+ if (TransactionIdDidCommit(txn->xid))
+ {
+ txn->txn_flags |= TXN_COMMIT;
+ return true;
+ }
+ else
+ {
+ txn->txn_flags |= TXN_ROLLBACK;
+ return false;
+ }
+ }
+ }
+
+ /*
+ * If we were able to add ourself, then Abort processing will
+ * interlock with us. Check if the transaction is still around
+ */
+ Assert(MyProc->decodeGroupLeader);
+
+ if (MyProc->decodeGroupLeader)
+ {
+ LWLock *leader_lwlock;
+
+ leader_lwlock = LockHashPartitionLockByProc(MyProc->decodeGroupLeader);
+ LWLockAcquire(leader_lwlock, LW_SHARED);
+ if (MyProc->decodeAbortPending)
+ {
+ /*
+ * Remove ourself from the decodeGroupMembership and return
+ * false so that the decoding plugin also initiates abort
+ * processing
+ */
+ LWLockRelease(leader_lwlock);
+ LWLockAcquire(leader_lwlock, LW_EXCLUSIVE);
+ RemoveDecodeGroupMemberLocked(MyProc->decodeGroupLeader);
+ /* reset the bool to let the leader know that we are going away */
+ MyProc->decodeAbortPending = false;
+ MyProc->decodeLocked = false;
+ txn->txn_flags |= TXN_ROLLBACK;
+ ok = false;
+ }
+ else
+ {
+ MyProc->decodeLocked = true;
+ ok = true;
+ }
+ LWLockRelease(leader_lwlock);
+ }
+ else
+ return false;
+
+ return ok;
+}
+
+void
+LogicalUnlockTransaction(ReorderBufferTXN *txn)
+{
+ LWLock *leader_lwlock;
+
+ /*
+ * Prepared transactions and uncommitted transactions
+ * that have modified catalogs need to interlock with
+ * concurrent rollback to ensure that there are no
+ * issues while decoding
+ */
+
+ if (!txn_has_catalog_changes(txn))
+ return;
+
+ /*
+ * Is it a prepared txn? Similar checks for uncommitted
+ * transactions when we start supporting them
+ */
+ if (!txn_prepared(txn))
+ return;
+
+ /* check cached status */
+ if (txn_commit(txn))
+ return;
+ if (txn_rollback(txn))
+ return;
+
+ Assert(MyProc->decodeGroupLeader);
+ leader_lwlock = LockHashPartitionLockByProc(MyProc->decodeGroupLeader);
+ LWLockAcquire(leader_lwlock, LW_SHARED);
+ if (MyProc->decodeAbortPending)
+ {
+ /*
+ * Remove ourself from the decodeGroupMembership
+ */
+ LWLockRelease(leader_lwlock);
+ LWLockAcquire(leader_lwlock, LW_EXCLUSIVE);
+ RemoveDecodeGroupMemberLocked(MyProc->decodeGroupLeader);
+ /* reset the bool to let the leader know that we are going away */
+ MyProc->decodeAbortPending = false;
+ txn->txn_flags |= TXN_ROLLBACK;
+ }
+ MyProc->decodeLocked = false;
+ LWLockRelease(leader_lwlock);
+ return;
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 948343e4ae..5d33931223 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -72,10 +72,11 @@ void
logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
- uint8 flags = 0;
+ uint8 flags = 0;
pq_sendbyte(out, 'C'); /* sending COMMIT */
+ flags |= LOGICALREP_IS_COMMIT;
/* send the flags field (unused for now) */
pq_sendbyte(out, flags);
@@ -86,21 +87,106 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
}
/*
- * Read transaction COMMIT from the stream.
+ * Write ABORT to the output stream.
+ */
+void
+logicalrep_write_abort(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, 'C'); /* sending ABORT flag below */
+
+ flags |= LOGICALREP_IS_ABORT;
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, abort_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->commit_time);
+}
+
+/*
+ * Read transaction COMMIT|ABORT from the stream.
*/
void
-logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
+logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data,
+ uint8 *flags)
{
- /* read flags (unused for now) */
- uint8 flags = pq_getmsgbyte(in);
+ /* read flags */
+ uint8 commit_flags = pq_getmsgbyte(in);
- if (flags != 0)
- elog(ERROR, "unrecognized flags %u in commit message", flags);
+ if (!(commit_flags & LOGICALREP_COMMIT_MASK))
+ elog(ERROR, "unrecognized flags %u in commit|abort message",
+ commit_flags);
/* read fields */
commit_data->commit_lsn = pq_getmsgint64(in);
commit_data->end_lsn = pq_getmsgint64(in);
commit_data->committime = pq_getmsgint64(in);
+
+ /* set gid to empty */
+ commit_data->gid[0] = '\0';
+
+ *flags = commit_flags;
+}
+
+/*
+ * Write PREPARE to the output stream.
+ */
+void
+logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, 'P'); /* sending PREPARE protocol */
+
+ if (txn->txn_flags & TXN_COMMIT_PREPARED)
+ flags |= LOGICALREP_IS_COMMIT_PREPARED;
+ else if (txn->txn_flags & TXN_ROLLBACK_PREPARED)
+ flags |= LOGICALREP_IS_ROLLBACK_PREPARED;
+ else if (txn->txn_flags & TXN_PREPARE)
+ flags |= LOGICALREP_IS_PREPARE;
+
+ if (flags == 0)
+ elog(ERROR, "unrecognized flags %u in [commit|rollback] prepare message", flags);
+
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, prepare_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->commit_time);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction PREPARE from the stream.
+ */
+void
+logicalrep_read_prepare(StringInfo in, LogicalRepCommitData *commit_data, uint8 *flags)
+{
+ /* read flags */
+ uint8 prep_flags = pq_getmsgbyte(in);
+
+ if (!(prep_flags & LOGICALREP_PREPARE_MASK))
+ elog(ERROR, "unrecognized flags %u in prepare message", prep_flags);
+
+ /* read fields */
+ commit_data->commit_lsn = pq_getmsgint64(in);
+ commit_data->end_lsn = pq_getmsgint64(in);
+ commit_data->committime = pq_getmsgint64(in);
+
+ /* read gid */
+ strcpy(commit_data->gid, pq_getmsgstring(in));
+
+ /* set flags */
+ *flags = prep_flags;
}
/*
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c72a611a39..0b45f4f20f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -623,7 +623,7 @@ AssertTXNLsnOrder(ReorderBuffer *rb)
if (prev_first_lsn != InvalidXLogRecPtr)
Assert(prev_first_lsn < cur_txn->first_lsn);
- Assert(!cur_txn->is_known_as_subxact);
+ Assert(!txn_is_subxact(cur_txn));
prev_first_lsn = cur_txn->first_lsn;
}
#endif
@@ -641,7 +641,7 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb)
txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
- Assert(!txn->is_known_as_subxact);
+ Assert(!txn_is_subxact(txn));
Assert(txn->first_lsn != InvalidXLogRecPtr);
return txn;
}
@@ -675,9 +675,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
dlist_push_tail(&txn->subtxns, &subtxn->node);
txn->nsubtxns++;
}
- else if (!subtxn->is_known_as_subxact)
+ else if (!txn_is_subxact(subtxn))
{
- subtxn->is_known_as_subxact = true;
+ subtxn->txn_flags |= TXN_IS_SUBXACT;
Assert(subtxn->nsubtxns == 0);
/* remove from lsn order list of top-level transactions */
@@ -738,9 +738,9 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
subtxn->final_lsn = commit_lsn;
subtxn->end_lsn = end_lsn;
- if (!subtxn->is_known_as_subxact)
+ if (!txn_is_subxact(subtxn))
{
- subtxn->is_known_as_subxact = true;
+ subtxn->txn_flags |= TXN_IS_SUBXACT;
Assert(subtxn->nsubtxns == 0);
/* remove from lsn order list of top-level transactions */
@@ -849,7 +849,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
ReorderBufferChange *cur_change;
- if (txn->serialized)
+ if (txn_is_serialized(txn))
{
/* serialize remaining changes */
ReorderBufferSerializeTXN(rb, txn);
@@ -878,7 +878,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
ReorderBufferChange *cur_change;
- if (cur_txn->serialized)
+ if (txn_is_serialized(cur_txn))
{
/* serialize remaining changes */
ReorderBufferSerializeTXN(rb, cur_txn);
@@ -1044,7 +1044,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
* they originally were happening inside another subtxn, so we won't
* ever recurse more than one level deep here.
*/
- Assert(subtxn->is_known_as_subxact);
+ Assert(txn_is_subxact(subtxn));
Assert(subtxn->nsubtxns == 0);
ReorderBufferCleanupTXN(rb, subtxn);
@@ -1083,7 +1083,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/*
* Remove TXN from its containing list.
*
- * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
+ * Note: if txn_is_subxact(), we are deleting the TXN from its
* parent's list of known subxacts; this leaves the parent's nsubxacts
* count too high, but we don't care. Otherwise, we are deleting the TXN
* from the LSN-ordered list of toplevel TXNs.
@@ -1098,7 +1098,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
Assert(found);
/* remove entries spilled to disk */
- if (txn->serialized)
+ if (txn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
/* deallocate */
@@ -1115,7 +1115,7 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_iter iter;
HASHCTL hash_ctl;
- if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
+ if (!txn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
return;
memset(&hash_ctl, 0, sizeof(hash_ctl));
@@ -1264,25 +1264,18 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
* the top and subtransactions (using a k-way merge) and replay the changes in
* lsn order.
*/
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+static void
+ReorderBufferCommitInternal(ReorderBufferTXN *txn,
+ ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn)
{
- ReorderBufferTXN *txn;
volatile Snapshot snapshot_now;
volatile CommandId command_id = FirstCommandId;
bool using_subtxn;
ReorderBufferIterTXNState *volatile iterstate = NULL;
- txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
- false);
-
- /* unknown transaction, nothing to replay */
- if (txn == NULL)
- return;
-
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
txn->commit_time = commit_time;
@@ -1326,20 +1319,62 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
{
ReorderBufferChange *change;
ReorderBufferChange *specinsert = NULL;
+ bool change_cleanup = false;
+ bool check_txn_status,
+ apply_started = false;
+ bool is_prepared = txn_prepared(txn);
+
+ /*
+ * check for the xid once to see if it's already
+ * committed. Otherwise we need to consult the
+ * decode_txn filter function to enquire if it's
+ * still ok for us to continue to decode this xid
+ *
+ * This is to handle cases of concurrent abort
+ * happening parallel to the decode activity
+ */
+ check_txn_status = TransactionIdDidCommit(txn->xid)?
+ false : true;
if (using_subtxn)
BeginInternalSubTransaction("replay");
else
StartTransactionCommand();
- rb->begin(rb, txn);
-
iterstate = ReorderBufferIterTXNInit(rb, txn);
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
{
Relation relation = NULL;
Oid reloid;
+ /*
+ * While decoding 2PC or while streaming uncommitted
+ * transactions, check if this transaction needs to
+ * be still decoded. If the transaction got aborted
+ * or if we were instructed to stop decoding, then
+ * bail out early.
+ */
+ if (check_txn_status && rb->filter_decode_txn(rb, txn))
+ {
+ elog(LOG, "%s decoding of %s (%u)",
+ apply_started? "stopping":"skipping",
+ is_prepared? txn->gid:"",
+ txn->xid);
+ change_cleanup = true;
+ goto change_cleanuptxn;
+ }
+
+ /*
+ * We have decided to apply changes based on the go
+ * ahead from the above decode filter, BEGIN the
+ * transaction on the other side
+ */
+ if (apply_started == false)
+ {
+ rb->begin(rb, txn);
+ apply_started = true;
+ }
+
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -1375,7 +1410,17 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
relpathperm(change->data.tp.relnode,
MAIN_FORKNUM));
+ /* Lock transaction before catalog access */
+ if (!LogicalLockTransaction(txn))
+ {
+ elog(LOG, "stopping decoding of %s (%u)",
+ is_prepared? txn->gid:"",
+ txn->xid);
+ change_cleanup = true;
+ goto change_cleanuptxn;
+ }
relation = RelationIdGetRelation(reloid);
+ LogicalUnlockTransaction(txn);
if (relation == NULL)
elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
@@ -1546,6 +1591,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
}
}
+change_cleanuptxn:
/*
* There's a speculative insertion remaining, just clean in up, it
* can't have been successful, otherwise we'd gotten a confirmation
@@ -1561,8 +1607,24 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
ReorderBufferIterTXNFinish(rb, iterstate);
iterstate = NULL;
- /* call commit callback */
- rb->commit(rb, txn, commit_lsn);
+ if (change_cleanup)
+ {
+ /* call abort if we have sent any changes */
+ if (apply_started)
+ rb->abort(rb, txn, commit_lsn);
+ }
+ else
+ {
+ /* call commit or prepare callback */
+ if (txn_prepared(txn))
+ rb->prepare(rb, txn, commit_lsn);
+ else
+ rb->commit(rb, txn, commit_lsn);
+ }
+
+ /* remove ourself from the decodeGroupLeader */
+ if (MyProc->decodeGroupLeader)
+ RemoveDecodeGroupMember(MyProc->decodeGroupLeader);
/* this is just a sanity check against bad output plugin behaviour */
if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
@@ -1589,7 +1651,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
if (snapshot_now->copied)
ReorderBufferFreeSnap(rb, snapshot_now);
- /* remove potential on-disk data, and deallocate */
+ /*
+ * remove potential on-disk data, and deallocate.
+ *
+ * We remove it even for prepared transactions.
+ * This is because the COMMIT PREPARED needs
+ * no data post the successful PREPARE
+ */
ReorderBufferCleanupTXN(rb, txn);
}
PG_CATCH();
@@ -1623,6 +1691,136 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
PG_END_TRY();
}
+/*
+ * Ask output plugin whether we want to skip this PREPARE and send
+ * this transaction as a regular commit later.
+ */
+bool
+ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, const char *gid)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+ return rb->filter_prepare(rb, txn, xid, gid);
+}
+
+
+/*
+ * Commit a transaction.
+ *
+ * See comments for ReorderBufferCommitInternal()
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown transaction, nothing to replay */
+ if (txn == NULL)
+ return;
+
+ ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+ commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Prepare a twophase transaction. It calls ReorderBufferCommitInternal()
+ * since all prepared transactions need to be decoded at PREPARE time.
+ */
+void
+ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn,
+ char *gid)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown transaction, nothing to replay */
+ if (txn == NULL)
+ return;
+
+ txn->txn_flags |= TXN_PREPARE;
+ strcpy(txn->gid, gid);
+
+ ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+ commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Check whether this transaction was sent as prepared to subscribers.
+ * Called while handling commit|abort prepared.
+ */
+bool
+ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
+ const char *gid)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /*
+ * Always call the prepare filter. It's the job of the prepare
+ * filter to give us the *same* response for a given xid
+ * across multiple calls (including ones on restart)
+ */
+ return !(rb->filter_prepare(rb, txn, xid, gid));
+}
+
+/*
+ * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.
+ */
+void
+ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn,
+ char *gid, bool is_commit)
+{
+ ReorderBufferTXN *txn;
+
+ /*
+ * The transaction may or may not exist (during restarts for
+ * example). Anyways, 2PC transactions do not contain any
+ * reorderbuffers. So allow it to be created below.
+ */
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn,
+ true);
+
+ txn->final_lsn = commit_lsn;
+ txn->end_lsn = end_lsn;
+ txn->commit_time = commit_time;
+ txn->origin_id = origin_id;
+ txn->origin_lsn = origin_lsn;
+ strcpy(txn->gid, gid);
+
+ if (is_commit)
+ {
+ txn->txn_flags |= TXN_COMMIT_PREPARED;
+ rb->commit_prepared(rb, txn, commit_lsn);
+ }
+ else
+ {
+ txn->txn_flags |= TXN_ROLLBACK_PREPARED;
+ rb->abort_prepared(rb, txn, commit_lsn);
+ }
+
+ /* cleanup: make sure there's no cache pollution */
+ ReorderBufferExecuteInvalidations(rb, txn);
+ ReorderBufferCleanupTXN(rb, txn);
+}
+
/*
* Abort a transaction that possibly has previous changes. Needs to be first
* called for subtransactions and then for the toplevel xid.
@@ -1688,7 +1886,7 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
* final_lsn to that of their last change; this causes
* ReorderBufferRestoreCleanup to do the right thing.
*/
- if (txn->serialized && txn->final_lsn == 0)
+ if (txn_is_serialized(txn) && txn->final_lsn == 0)
{
ReorderBufferChange *last =
dlist_tail_element(ReorderBufferChange, node, &txn->changes);
@@ -1934,7 +2132,7 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
- txn->has_catalog_changes = true;
+ txn->txn_flags |= TXN_HAS_CATALOG_CHANGES;
}
/*
@@ -1951,7 +2149,7 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
if (txn == NULL)
return false;
- return txn->has_catalog_changes;
+ return txn_has_catalog_changes(txn);
}
/*
@@ -2095,7 +2293,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
Assert(spilled == txn->nentries_mem);
Assert(dlist_is_empty(&txn->changes));
txn->nentries_mem = 0;
- txn->serialized = true;
+ txn->txn_flags |= TXN_SERIALIZED;
if (fd != -1)
CloseTransientFile(fd);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 83c69092ae..15048378d1 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -452,8 +452,9 @@ static void
apply_handle_commit(StringInfo s)
{
LogicalRepCommitData commit_data;
+ uint8 flags = 0;
- logicalrep_read_commit(s, &commit_data);
+ logicalrep_read_commit(s, &commit_data, &flags);
Assert(commit_data.commit_lsn == remote_final_lsn);
@@ -467,7 +468,11 @@ apply_handle_commit(StringInfo s)
replorigin_session_origin_lsn = commit_data.end_lsn;
replorigin_session_origin_timestamp = commit_data.committime;
- CommitTransactionCommand();
+ if (flags & LOGICALREP_IS_COMMIT)
+ CommitTransactionCommand();
+ else if (flags & LOGICALREP_IS_ABORT)
+ AbortCurrentTransaction();
+
pgstat_report_stat(false);
store_flush_position(commit_data.end_lsn);
@@ -487,6 +492,120 @@ apply_handle_commit(StringInfo s)
pgstat_report_activity(STATE_IDLE, NULL);
}
+static void
+apply_handle_prepare_txn(LogicalRepCommitData *commit_data)
+{
+ Assert(commit_data->commit_lsn == remote_final_lsn);
+ /* The synchronization worker runs in single transaction. */
+ if (IsTransactionState() && !am_tablesync_worker())
+ {
+ /* End the earlier transaction and start a new one */
+ BeginTransactionBlock();
+ CommitTransactionCommand();
+ StartTransactionCommand();
+ /*
+ * Update origin state so we can restart streaming from correct
+ * position in case of crash.
+ */
+ replorigin_session_origin_lsn = commit_data->end_lsn;
+ replorigin_session_origin_timestamp = commit_data->committime;
+
+ PrepareTransactionBlock(commit_data->gid);
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ store_flush_position(commit_data->end_lsn);
+ }
+ else
+ {
+ /* Process any invalidation messages that might have accumulated. */
+ AcceptInvalidationMessages();
+ maybe_reread_subscription();
+ }
+
+ in_remote_transaction = false;
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(commit_data->end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+static void
+apply_handle_commit_prepared_txn(LogicalRepCommitData *commit_data)
+{
+ /* there is no transaction when COMMIT PREPARED is called */
+ ensure_transaction();
+
+ /*
+ * Update origin state so we can restart streaming from correct
+ * position in case of crash.
+ */
+ replorigin_session_origin_lsn = commit_data->end_lsn;
+ replorigin_session_origin_timestamp = commit_data->committime;
+
+ FinishPreparedTransaction(commit_data->gid, true, false);
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ store_flush_position(commit_data->end_lsn);
+ in_remote_transaction = false;
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(commit_data->end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+static void
+apply_handle_rollback_prepared_txn(LogicalRepCommitData *commit_data)
+{
+ /* there is no transaction when ABORT/ROLLBACK PREPARED is called */
+ ensure_transaction();
+
+ /*
+ * Update origin state so we can restart streaming from correct
+ * position in case of crash.
+ */
+ replorigin_session_origin_lsn = commit_data->end_lsn;
+ replorigin_session_origin_timestamp = commit_data->committime;
+
+ FinishPreparedTransaction(commit_data->gid, false, true);
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ store_flush_position(commit_data->end_lsn);
+ in_remote_transaction = false;
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(commit_data->end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle PREPARE message.
+ */
+static void
+apply_handle_prepare(StringInfo s)
+{
+ LogicalRepCommitData commit_data;
+ uint8 flags = 0;
+
+ logicalrep_read_prepare(s, &commit_data, &flags);
+
+ if (flags & LOGICALREP_IS_PREPARE)
+ apply_handle_prepare_txn(&commit_data);
+ else if (flags & LOGICALREP_IS_COMMIT_PREPARED)
+ apply_handle_commit_prepared_txn(&commit_data);
+ else if (flags & LOGICALREP_IS_ROLLBACK_PREPARED)
+ apply_handle_rollback_prepared_txn(&commit_data);
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("wrong [commit|rollback] prepare message")));
+}
+
/*
* Handle ORIGIN message.
*
@@ -884,10 +1003,14 @@ apply_dispatch(StringInfo s)
case 'B':
apply_handle_begin(s);
break;
- /* COMMIT */
+ /* COMMIT|ABORT */
case 'C':
apply_handle_commit(s);
break;
+ /* [COMMIT|ROLLBACK] PREPARE */
+ case 'P':
+ apply_handle_prepare(s);
+ break;
/* INSERT */
case 'I':
apply_handle_insert(s);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 40a1ef3c1d..55bdee9abe 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -37,11 +37,23 @@ static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pgoutput_abort_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr abort_lsn);
static void pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation rel,
ReorderBufferChange *change);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
+static bool pgoutput_filter_prepare(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, TransactionId xid, const char *gid);
+static bool pgoutput_decode_txn_filter(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
static bool publications_valid;
@@ -79,7 +91,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->begin_cb = pgoutput_begin_txn;
cb->change_cb = pgoutput_change;
cb->commit_cb = pgoutput_commit_txn;
+ cb->abort_cb = pgoutput_abort_txn;
+
+ cb->filter_prepare_cb = pgoutput_filter_prepare;
+ cb->prepare_cb = pgoutput_prepare_txn;
+ cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
+ cb->abort_prepared_cb = pgoutput_abort_prepared_txn;
+
cb->filter_by_origin_cb = pgoutput_origin_filter;
+ cb->filter_decode_txn_cb = pgoutput_decode_txn_filter;
cb->shutdown_cb = pgoutput_shutdown;
}
@@ -251,6 +271,61 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
+/*
+ * ABORT callback
+ */
+static void
+pgoutput_abort_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn)
+{
+ OutputPluginUpdateProgress(ctx);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_abort(ctx->out, txn, abort_lsn);
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * PREPARE callback
+ */
+static void
+pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ OutputPluginUpdateProgress(ctx);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * COMMIT PREPARED callback
+ */
+static void
+pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ OutputPluginUpdateProgress(ctx);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+ OutputPluginWrite(ctx, true);
+}
+/*
+ * PREPARE callback
+ */
+static void
+pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ OutputPluginUpdateProgress(ctx);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+ OutputPluginWrite(ctx, true);
+}
+
/*
* Sends the decoded DML over wire.
*/
@@ -361,6 +436,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContextReset(data->context);
}
+/*
+ * Filter out unnecessary two-phase transactions.
+ *
+ * Currently, we forward all two-phase transactions
+ */
+static bool
+pgoutput_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ TransactionId xid, const char *gid)
+{
+ return false;
+}
+
/*
* Currently we always forward.
*/
@@ -371,6 +458,37 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
return false;
}
+/*
+ * Check if we should continue to decode this transaction.
+ *
+ * If it has aborted in the meanwhile, then there's no sense
+ * in decoding and sending the rest of the changes, we might
+ * as well ask the subscribers to abort immediately.
+ *
+ * This should be called if we are streaming a transaction
+ * before it's committed or if we are decoding a 2PC
+ * transaction. Otherwise we always decode committed
+ * transactions
+ *
+ * Additional checks can be added here, as needed
+ */
+static bool
+pgoutput_decode_txn_filter(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn)
+{
+ /*
+ * Due to caching, repeated TransactionIdDidAbort calls
+ * shouldn't be that expensive
+ */
+ if (txn != NULL &&
+ TransactionIdIsValid(txn->xid) &&
+ TransactionIdDidAbort(txn->xid))
+ return true;
+
+ /* if txn is NULL, filter it out */
+ return (txn != NULL)? false:true;
+}
+
/*
* Shutdown the output plugin.
*
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 1a00011adc..f6bb4e509f 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2432,6 +2432,45 @@ BackendXidGetPid(TransactionId xid)
return result;
}
+/*
+ * BackendXidGetProc -- get a backend's PGPROC given its XID
+ *
+ * Note that it is up to the caller to be sure that the question
+ * remains meaningful for long enough for the answer to be used ...
+ *
+ * Only main transaction Ids are considered.
+ *
+ */
+PGPROC *
+BackendXidGetProc(TransactionId xid)
+{
+ PGPROC *result = NULL;
+ ProcArrayStruct *arrayP = procArray;
+ int index;
+
+ if (xid == InvalidTransactionId) /* never match invalid xid */
+ return 0;
+
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ int pgprocno = arrayP->pgprocnos[index];
+ PGPROC *proc = &allProcs[pgprocno];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
+
+ if (pgxact->xid == xid)
+ {
+ result = proc;
+ break;
+ }
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ return result;
+}
+
/*
* IsBackendPid -- is a given pid a running backend
*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 6f30e082b2..26d35c7807 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -267,6 +267,11 @@ InitProcGlobal(void)
/* Initialize lockGroupMembers list. */
dlist_init(&procs[i].lockGroupMembers);
+
+ /* Initialize decodeGroupMembers list. */
+ dlist_init(&procs[i].decodeGroupMembers);
+ procs[i].decodeAbortPending = false;
+ procs[i].decodeLocked = false;
}
/*
@@ -406,6 +411,12 @@ InitProcess(void)
Assert(MyProc->lockGroupLeader == NULL);
Assert(dlist_is_empty(&MyProc->lockGroupMembers));
+ /* Check that group decode fields are in a proper initial state. */
+ Assert(MyProc->decodeGroupLeader == NULL);
+ Assert(dlist_is_empty(&MyProc->decodeGroupMembers));
+ MyProc->decodeAbortPending = false;
+ MyProc->decodeLocked = false;
+
/* Initialize wait event information. */
MyProc->wait_event_info = 0;
@@ -581,6 +592,12 @@ InitAuxiliaryProcess(void)
Assert(MyProc->lockGroupLeader == NULL);
Assert(dlist_is_empty(&MyProc->lockGroupMembers));
+ /* Check that group decode fields are in a proper initial state. */
+ Assert(MyProc->decodeGroupLeader == NULL);
+ Assert(dlist_is_empty(&MyProc->decodeGroupMembers));
+ MyProc->decodeAbortPending = false;
+ MyProc->decodeLocked = false;
+
/*
* We might be reusing a semaphore that belonged to a failed process. So
* be careful and reinitialize its value here. (This is not strictly
@@ -1887,3 +1904,268 @@ BecomeLockGroupMember(PGPROC *leader, int pid)
return ok;
}
+
+/*
+ * BecomeDecodeGroupLeader - designate process as decode group leader
+ *
+ * Once this function has returned, other processes can join the decode group
+ * by calling BecomeDecodeGroupMember.
+ */
+PGPROC *
+BecomeDecodeGroupLeader(TransactionId xid, bool is_prepared)
+{
+ PGPROC *proc = NULL;
+ int pid;
+ LWLock *leader_lwlock;
+
+ Assert(xid != InvalidTransactionId);
+
+
+ proc = BackendXidGetProc(xid);
+ if (proc)
+ pid = proc->pid;
+
+ /*
+ * This proc will become decodeGroupLeader if it's
+ * not already
+ */
+ if (proc && proc->decodeGroupLeader != proc)
+ {
+ volatile PGXACT *pgxact;
+ /* Create single-member group, containing this proc. */
+ leader_lwlock = LockHashPartitionLockByProc(proc);
+ LWLockAcquire(leader_lwlock, LW_EXCLUSIVE);
+ /* recheck we are still the same */
+ pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
+ if (proc->pid == pid && pgxact->xid == xid)
+ {
+ if (is_prepared)
+ Assert(pid == 0);
+ /* recheck if someone else did not already assign us */
+ if (proc->decodeGroupLeader != proc)
+ {
+ /* We had better not be a follower. */
+ Assert(proc->decodeGroupLeader == NULL);
+ proc->decodeGroupLeader = proc;
+ dlist_push_head(&proc->decodeGroupMembers,
+ &proc->decodeGroupLink);
+ }
+ }
+ else
+ {
+ /* proc entry is gone */
+ proc = NULL;
+ }
+ LWLockRelease(leader_lwlock);
+ }
+
+ elog(DEBUG1, "became group leader (%p)", proc);
+ return proc;
+}
+
+/*
+ * BecomeDecodeGroupMember - designate process as decode group member
+ *
+ * This is pretty straightforward except for the possibility that the leader
+ * whose group we're trying to join might exit before we manage to do so;
+ * and the PGPROC might get recycled for an unrelated process. To avoid
+ * that, we require the caller to pass the PID of the intended PGPROC as
+ * an interlock. Returns true if we successfully join the intended lock
+ * group, and false if not.
+ */
+bool
+BecomeDecodeGroupMember(PGPROC *leader, int pid, bool is_prepared)
+{
+ LWLock *leader_lwlock;
+ bool ok = false;
+
+ /* Group leader can't become member of group */
+ Assert(MyProc != leader);
+
+ /* Can't already be a member of a group */
+ Assert(MyProc->decodeGroupLeader == NULL);
+
+ /* PID must be valid OR this is a prepared transaction. */
+ Assert(pid != 0 || is_prepared);
+
+ /*
+ * Get lock protecting the group fields. Note LockHashPartitionLockByProc
+ * accesses leader->pgprocno in a PGPROC that might be free. This is safe
+ * because all PGPROCs' pgprocno fields are set during shared memory
+ * initialization and never change thereafter; so we will acquire the
+ * correct lock even if the leader PGPROC is in process of being recycled.
+ */
+ leader_lwlock = LockHashPartitionLockByProc(leader);
+ LWLockAcquire(leader_lwlock, LW_EXCLUSIVE);
+
+ /* Is this the leader we're looking for? */
+ if (leader->pid == pid && leader->decodeGroupLeader == leader)
+ {
+ if (is_prepared)
+ Assert(pid == 0);
+ /* is the leader going away? */
+ if (leader->decodeAbortPending)
+ ok = false;
+ else
+ {
+ /* OK, join the group */
+ ok = true;
+ MyProc->decodeGroupLeader = leader;
+ dlist_push_tail(&leader->decodeGroupMembers, &MyProc->decodeGroupLink);
+ }
+ }
+ else
+ MyProc->decodeGroupLeader = NULL;
+ LWLockRelease(leader_lwlock);
+
+ elog(DEBUG1, "became group member (%p) to (%p)", MyProc, leader);
+ return ok;
+}
+
+/*
+ * Remove a decodeGroupMember from the decodeGroupMembership of
+ * decodeGroupLeader
+ * Acquire lock
+ */
+void
+RemoveDecodeGroupMember(PGPROC *leader)
+{
+ LWLock *leader_lwlock;
+
+ leader_lwlock = LockHashPartitionLockByProc(leader);
+ LWLockAcquire(leader_lwlock, LW_EXCLUSIVE);
+ RemoveDecodeGroupMemberLocked(leader);
+ LWLockRelease(leader_lwlock);
+
+ return;
+}
+
+/*
+ * Remove a decodeGroupMember from the decodeGroupMembership of
+ * decodeGroupLeader
+ * Assumes that the caller is holding appropriate lock
+ */
+void
+RemoveDecodeGroupMemberLocked(PGPROC *leader)
+{
+ Assert(!dlist_is_empty(&leader->decodeGroupMembers));
+ dlist_delete(&MyProc->decodeGroupLink);
+ /* leader links to itself, so never empty */
+ Assert(!dlist_is_empty(&leader->decodeGroupMembers));
+ MyProc->decodeGroupLeader = NULL;
+ elog(DEBUG1, "removed group member (%p) from (%p)", MyProc, leader);
+
+ return;
+}
+
+/*
+ * Indicate to all decodeGroupMembers that this transaction is
+ * going away.
+ *
+ * Wait for all decodeGroupMembers to ack back before returning
+ * from here but only in case of aborts.
+ *
+ * This function should be called *after* the proc has been
+ * removed from the procArray.
+ *
+ * If the transaction is committing, it's ok for the
+ * decoders to continue merrily. When it tries to lock this
+ * proc, it won't find it and check for transaction status
+ * and cache the commit status for future calls in
+ * LogicalLockTransaction
+ */
+void
+LogicalDecodeRemoveTransaction(PGPROC *leader, bool isCommit)
+{
+ LWLock *leader_lwlock;
+ dlist_mutable_iter change_i;
+ dlist_iter iter;
+ PGPROC *proc;
+ bool do_wait;
+
+ leader_lwlock = LockHashPartitionLockByProc(leader);
+ LWLockAcquire(leader_lwlock, LW_EXCLUSIVE);
+ /* mark ourself as aborting */
+ if (!isCommit)
+ leader->decodeAbortPending = true;
+
+ if (leader->decodeGroupLeader == NULL)
+ {
+ Assert(dlist_is_empty(&leader->decodeGroupMembers));
+ LWLockRelease(leader_lwlock);
+ return;
+ }
+
+recheck:
+ do_wait = false;
+ Assert(leader->decodeGroupLeader == leader);
+ Assert(!dlist_is_empty(&leader->decodeGroupMembers));
+ if (!isCommit)
+ {
+ dlist_foreach(iter, &leader->decodeGroupMembers)
+ {
+ proc = dlist_container(PGPROC, decodeGroupLink, iter.cur);
+ /* mark the proc to indicate abort is pending */
+ if (proc == leader)
+ continue;
+ if (!proc->decodeAbortPending)
+ {
+ proc->decodeAbortPending = true;
+ elog(DEBUG1, "marking group member (%p) from (%p) for abort",
+ proc, leader);
+ }
+ /* if the proc is currently locked, wait */
+ if (proc->decodeLocked)
+ do_wait = true;
+ }
+
+ if (do_wait)
+ {
+ int rc;
+ LWLockRelease(leader_lwlock);
+
+ elog(LOG, "Waiting for backends to abort decoding");
+ /*
+ * Wait on our latch to allow decodeGroupMembers to
+ * go away soon
+ */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ 100L,
+ WAIT_EVENT_PG_SLEEP);
+ ResetLatch(MyLatch);
+
+ /* emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Recheck decodeGroupMembers */
+ LWLockAcquire(leader_lwlock, LW_EXCLUSIVE);
+ goto recheck;
+ }
+ }
+
+ /*
+ * All backends exited cleanly in case of aborts above,
+ * remove decodeGroupMembers now for both commit/abort cases
+ */
+ Assert(leader->decodeGroupLeader == leader);
+ Assert(!dlist_is_empty(&leader->decodeGroupMembers));
+ dlist_foreach_modify(change_i, &leader->decodeGroupMembers)
+ {
+ proc = dlist_container(PGPROC, decodeGroupLink, change_i.cur);
+ Assert(!proc->decodeLocked);
+ dlist_delete(&proc->decodeGroupLink);
+ elog(DEBUG1, "deleting group member (%p) from (%p)",
+ proc, leader);
+ proc->decodeGroupLeader = NULL;
+ }
+ Assert(dlist_is_empty(&leader->decodeGroupMembers));
+ leader->decodeGroupLeader = NULL;
+ leader->decodeAbortPending = false;
+ LWLockRelease(leader_lwlock);
+
+ return;
+}
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 3abe7d6155..8a6e0a1c2d 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -455,13 +455,13 @@ standard_ProcessUtility(PlannedStmt *pstmt,
case TRANS_STMT_COMMIT_PREPARED:
PreventTransactionChain(isTopLevel, "COMMIT PREPARED");
PreventCommandDuringRecovery("COMMIT PREPARED");
- FinishPreparedTransaction(stmt->gid, true);
+ FinishPreparedTransaction(stmt->gid, true, false);
break;
case TRANS_STMT_ROLLBACK_PREPARED:
PreventTransactionChain(isTopLevel, "ROLLBACK PREPARED");
PreventCommandDuringRecovery("ROLLBACK PREPARED");
- FinishPreparedTransaction(stmt->gid, false);
+ FinishPreparedTransaction(stmt->gid, false, false);
break;
case TRANS_STMT_ROLLBACK:
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 34d9470811..cbc63a18ad 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -15,6 +15,7 @@
#define TWOPHASE_H
#include "access/xlogdefs.h"
+#include "access/xact.h"
#include "datatype/timestamp.h"
#include "storage/lock.h"
@@ -46,15 +47,18 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
int *nxids_p);
+extern void ParsePrepareRecord(uint8 info, char *xlrec,
+ xl_xact_parsed_prepare *parsed);
extern void StandbyRecoverPreparedTransactions(void);
extern void RecoverPreparedTransactions(void);
extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
-extern void FinishPreparedTransaction(const char *gid, bool isCommit);
+extern void FinishPreparedTransaction(const char *gid, bool isCommit,
+ bool missing_ok);
extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
- XLogRecPtr end_lsn);
+ XLogRecPtr end_lsn, RepOriginId origin_id);
extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
extern void restoreTwoPhaseData(void);
#endif /* TWOPHASE_H */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 6445bbc46f..d2e104423d 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -21,6 +21,13 @@
#include "storage/sinval.h"
#include "utils/datetime.h"
+/*
+ * Maximum size of Global Transaction ID (including '\0').
+ *
+ * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
+ * specified in TwoPhaseFileHeader.
+ */
+#define GIDSIZE 200
/*
* Xact isolation levels
@@ -156,6 +163,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
#define XACT_XINFO_HAS_TWOPHASE (1U << 4)
#define XACT_XINFO_HAS_ORIGIN (1U << 5)
#define XACT_XINFO_HAS_AE_LOCKS (1U << 6)
+#define XACT_XINFO_HAS_GID (1U << 7)
/*
* Also stored in xinfo, these indicating a variety of additional actions that
@@ -302,13 +310,40 @@ typedef struct xl_xact_parsed_commit
SharedInvalidationMessage *msgs;
TransactionId twophase_xid; /* only for 2PC */
+ char twophase_gid[GIDSIZE];
XLogRecPtr origin_lsn;
TimestampTz origin_timestamp;
} xl_xact_parsed_commit;
+typedef struct xl_xact_parsed_prepare
+{
+ Oid dbId; /* MyDatabaseId */
+
+ int nsubxacts;
+ TransactionId *subxacts;
+
+ int ncommitrels;
+ RelFileNode *commitrels;
+
+ int nabortrels;
+ RelFileNode *abortrels;
+
+ int nmsgs;
+ SharedInvalidationMessage *msgs;
+
+ TransactionId twophase_xid;
+ char twophase_gid[GIDSIZE];
+
+ XLogRecPtr origin_lsn;
+ TimestampTz origin_timestamp;
+} xl_xact_parsed_prepare;
+
typedef struct xl_xact_parsed_abort
{
+ Oid dbId;
+ Oid tsId;
+
TimestampTz xact_time;
uint32 xinfo;
@@ -319,6 +354,10 @@ typedef struct xl_xact_parsed_abort
RelFileNode *xnodes;
TransactionId twophase_xid; /* only for 2PC */
+ char twophase_gid[GIDSIZE];
+
+ XLogRecPtr origin_lsn;
+ TimestampTz origin_timestamp;
} xl_xact_parsed_abort;
@@ -386,12 +425,13 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInval, bool forceSync,
int xactflags,
- TransactionId twophase_xid);
+ TransactionId twophase_xid, const char *twophase_gid);
extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
- int xactflags, TransactionId twophase_xid);
+ int xactflags, TransactionId twophase_xid,
+ const char *twophase_gid);
extern void xact_redo(XLogReaderState *record);
/* xactdesc.c */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 619c5f4d73..9dad4c997f 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -89,6 +89,11 @@ typedef struct LogicalDecodingContext
bool prepared_write;
XLogRecPtr write_location;
TransactionId write_xid;
+
+ /*
+ * Capabilities of the output plugin.
+ */
+ bool enable_twophase;
} LogicalDecodingContext;
@@ -117,6 +122,8 @@ extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin);
extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
+extern bool LogicalLockTransaction(ReorderBufferTXN *txn);
+extern void LogicalUnlockTransaction(ReorderBufferTXN *txn);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 0eb21057c5..886025f3aa 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -69,11 +69,20 @@ typedef struct LogicalRepBeginData
TransactionId xid;
} LogicalRepBeginData;
+#define LOGICALREP_IS_COMMIT 0x01
+#define LOGICALREP_IS_ABORT 0x02
+#define LOGICALREP_IS_PREPARE 0x04
+#define LOGICALREP_IS_COMMIT_PREPARED 0x08
+#define LOGICALREP_IS_ROLLBACK_PREPARED 0x10
+#define LOGICALREP_COMMIT_MASK (LOGICALREP_IS_COMMIT | LOGICALREP_IS_ABORT)
+#define LOGICALREP_PREPARE_MASK (LOGICALREP_IS_PREPARE | LOGICALREP_IS_COMMIT_PREPARED | LOGICALREP_IS_ROLLBACK_PREPARED)
typedef struct LogicalRepCommitData
{
+ uint8 flag;
XLogRecPtr commit_lsn;
XLogRecPtr end_lsn;
TimestampTz committime;
+ char gid[GIDSIZE];
} LogicalRepCommitData;
extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn);
@@ -81,8 +90,14 @@ extern void logicalrep_read_begin(StringInfo in,
LogicalRepBeginData *begin_data);
extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
+extern void logicalrep_write_abort(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
extern void logicalrep_read_commit(StringInfo in,
- LogicalRepCommitData *commit_data);
+ LogicalRepCommitData *commit_data, uint8 *flags);
+extern void logicalrep_read_prepare(StringInfo in,
+ LogicalRepCommitData *commit_data, uint8 *flags);
extern void logicalrep_write_origin(StringInfo out, const char *origin,
XLogRecPtr origin_lsn);
extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 78fd38bb16..61c5019adf 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -67,6 +67,46 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
+/*
+ * Called for an implicit ABORT of a transaction.
+ */
+typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
+ /*
+ * Called before decoding of PREPARE record to decide whether this
+ * transaction should be decoded with separate calls to prepare
+ * and commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED
+ * and sent as usual transaction.
+ */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ TransactionId xid,
+ const char *gid);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
/*
* Called for the generic logical decoding messages.
*/
@@ -84,6 +124,12 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
RepOriginId origin_id);
+/*
+ * Filter to check if we should continue to decode this transaction
+ */
+typedef bool (*LogicalDecodeFilterDecodeTxnCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
/*
* Called to shutdown an output plugin.
*/
@@ -98,8 +144,14 @@ typedef struct OutputPluginCallbacks
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeCommitCB commit_cb;
+ LogicalDecodeAbortCB abort_cb;
LogicalDecodeMessageCB message_cb;
+ LogicalDecodeFilterPrepareCB filter_prepare_cb;
+ LogicalDecodePrepareCB prepare_cb;
+ LogicalDecodeCommitPreparedCB commit_prepared_cb;
+ LogicalDecodeAbortPreparedCB abort_prepared_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+ LogicalDecodeFilterDecodeTxnCB filter_decode_txn_cb;
LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0970abca52..a43e941b25 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
#define REORDERBUFFER_H
#include "access/htup_details.h"
+#include "access/twophase.h"
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
@@ -137,20 +138,50 @@ typedef struct ReorderBufferChange
dlist_node node;
} ReorderBufferChange;
+/* ReorderBufferTXN flags */
+#define TXN_HAS_CATALOG_CHANGES 0x0001
+#define TXN_IS_SUBXACT 0x0002
+#define TXN_SERIALIZED 0x0004
+#define TXN_PREPARE 0x0008
+#define TXN_COMMIT_PREPARED 0x0010
+#define TXN_ROLLBACK_PREPARED 0x0020
+#define TXN_COMMIT 0x0040
+#define TXN_ROLLBACK 0x0080
+
+/* does the txn have catalog changes */
+#define txn_has_catalog_changes(txn) (txn->txn_flags & TXN_HAS_CATALOG_CHANGES)
+/* is the txn known as a subxact? */
+#define txn_is_subxact(txn) (txn->txn_flags & TXN_IS_SUBXACT)
+/*
+ * Has this transaction been spilled to disk? It's not always possible to
+ * deduce that fact by comparing nentries with nentries_mem, because e.g.
+ * subtransactions of a large transaction might get serialized together
+ * with the parent - if they're restored to memory they'd have
+ * nentries_mem == nentries.
+ */
+#define txn_is_serialized(txn) (txn->txn_flags & TXN_SERIALIZED)
+/* is this txn prepared? */
+#define txn_prepared(txn) (txn->txn_flags & TXN_PREPARE)
+/* was this prepared txn committed in the meanwhile? */
+#define txn_commit_prepared(txn) (txn->txn_flags & TXN_COMMIT_PREPARED)
+/* was this prepared txn aborted in the meanwhile? */
+#define txn_rollback_prepared(txn) (txn->txn_flags & TXN_ROLLBACK_PREPARED)
+/* was this txn committed in the meanwhile? */
+#define txn_commit(txn) (txn->txn_flags & TXN_COMMIT)
+/* was this prepared txn aborted in the meanwhile? */
+#define txn_rollback(txn) (txn->txn_flags & TXN_ROLLBACK)
+
typedef struct ReorderBufferTXN
{
+ int txn_flags;
+
/*
* The transactions transaction id, can be a toplevel or sub xid.
*/
TransactionId xid;
- /* did the TX have catalog changes */
- bool has_catalog_changes;
-
- /*
- * Do we know this is a subxact?
- */
- bool is_known_as_subxact;
+ /* In case of 2PC we need to pass GID to output plugin */
+ char gid[GIDSIZE];
/*
* LSN of the first data carrying, WAL record with knowledge about this
@@ -214,15 +245,6 @@ typedef struct ReorderBufferTXN
*/
uint64 nentries_mem;
- /*
- * Has this transaction been spilled to disk? It's not always possible to
- * deduce that fact by comparing nentries with nentries_mem, because e.g.
- * subtransactions of a large transaction might get serialized together
- * with the parent - if they're restored to memory they'd have
- * nentries_mem == nentries.
- */
- bool serialized;
-
/*
* List of ReorderBufferChange structs, including new Snapshots and new
* CommandIds
@@ -294,6 +316,40 @@ typedef void (*ReorderBufferCommitCB) (
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
+/* abort callback signature */
+typedef void (*ReorderBufferAbortCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
+typedef bool (*ReorderBufferFilterDecodeTxnCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn);
+
+typedef bool (*ReorderBufferFilterPrepareCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ TransactionId xid,
+ const char *gid);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/* abort prepared callback signature */
+typedef void (*ReorderBufferAbortPreparedCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
/* message callback signature */
typedef void (*ReorderBufferMessageCB) (
ReorderBuffer *rb,
@@ -329,6 +385,12 @@ struct ReorderBuffer
ReorderBufferBeginCB begin;
ReorderBufferApplyChangeCB apply_change;
ReorderBufferCommitCB commit;
+ ReorderBufferAbortCB abort;
+ ReorderBufferFilterDecodeTxnCB filter_decode_txn;
+ ReorderBufferFilterPrepareCB filter_prepare;
+ ReorderBufferPrepareCB prepare;
+ ReorderBufferCommitPreparedCB commit_prepared;
+ ReorderBufferAbortPreparedCB abort_prepared;
ReorderBufferMessageCB message;
/*
@@ -371,6 +433,11 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn,
+ char *gid, bool is_commit);
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
@@ -394,6 +461,15 @@ void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
+bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid,
+ const char *gid);
+bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
+ const char *gid);
+void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn,
+ char *gid);
ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 5c19a61dcf..fdfc582874 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -200,6 +200,26 @@ struct PGPROC
PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */
dlist_head lockGroupMembers; /* list of members, if I'm a leader */
dlist_node lockGroupLink; /* my member link, if I'm a member */
+
+ /*
+ * Support for decoding groups. Use LockHashPartitionLockByProc on the group
+ * leader to get the LWLock protecting these fields.
+ *
+ * For prepared and uncommitted transactions, decoding backends working on
+ * the same XID will link themselves up to the corresponding PGPROC
+ * entry (decodeGroupLeader).
+ *
+ * They will remove themselves when they are done decoding.
+ *
+ * If the prepared or uncommitted transaction decides to abort, then
+ * the decodeGroupLeader will set the decodeAbortPending flag allowing
+ * the decodeGroupMembers to abort their decoding appropriately
+ */
+ PGPROC *decodeGroupLeader; /* decode group leader, if I'm a member */
+ dlist_head decodeGroupMembers; /* list of members, if I'm a leader */
+ dlist_node decodeGroupLink; /* my member link, if I'm a member */
+ bool decodeLocked; /* is it currently locked by this proc? */
+ bool decodeAbortPending; /* is the decode group leader aborting? */
};
/* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
@@ -326,5 +346,10 @@ extern PGPROC *AuxiliaryPidGetProc(int pid);
extern void BecomeLockGroupLeader(void);
extern bool BecomeLockGroupMember(PGPROC *leader, int pid);
+extern PGPROC *BecomeDecodeGroupLeader(TransactionId xid, bool is_prepared);
+extern bool BecomeDecodeGroupMember(PGPROC *leader, int pid, bool is_prepared);
+extern void RemoveDecodeGroupMember(PGPROC *leader);
+extern void RemoveDecodeGroupMemberLocked(PGPROC *leader);
+extern void LogicalDecodeRemoveTransaction(PGPROC *leader, bool isCommit);
#endif /* PROC_H */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 75bab2985f..68173743ae 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -98,6 +98,7 @@ extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids
extern PGPROC *BackendPidGetProc(int pid);
extern PGPROC *BackendPidGetProcWithLock(int pid);
extern int BackendXidGetPid(TransactionId xid);
+extern PGPROC *BackendXidGetProc(TransactionId xid);
extern bool IsBackendPid(int pid);
extern VirtualTransactionId *GetCurrentVirtualXIDs(TransactionId limitXmin,
diff --git a/src/test/subscription/t/009_twophase.pl b/src/test/subscription/t/009_twophase.pl
new file mode 100644
index 0000000000..c7f373df93
--- /dev/null
+++ b/src/test/subscription/t/009_twophase.pl
@@ -0,0 +1,163 @@
+# logical replication of 2PC test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 12;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf(
+ 'postgresql.conf', qq(
+ max_prepared_transactions = 10
+ ));
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf(
+ 'postgresql.conf', qq(max_prepared_transactions = 10));
+$node_subscriber->start;
+
+# Create some pre-existing content on publisher
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_full SELECT generate_series(1,10)");
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres',
+"ALTER PUBLICATION tap_pub ADD TABLE tab_full, tab_full2"
+);
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+);
+
+# Wait for subscriber to finish initialization
+my $caughtup_query =
+"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# check that 2PC gets replicated to subscriber
+$node_publisher->safe_psql('postgres',
+ "BEGIN;INSERT INTO tab_full VALUES (11);PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+ is($result, qq(1), 'transaction is prepared on subscriber');
+
+# check that 2PC gets committed on subscriber
+$node_publisher->safe_psql('postgres',
+ "COMMIT PREPARED 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is committed on subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;");
+ is($result, qq(1), 'Row inserted via 2PC has committed on subscriber');
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+ is($result, qq(0), 'transaction is committed on subscriber');
+
+# check that 2PC gets replicated to subscriber
+$node_publisher->safe_psql('postgres',
+ "BEGIN;INSERT INTO tab_full VALUES (12);PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+ is($result, qq(1), 'transaction is prepared on subscriber');
+
+# check that 2PC gets aborted on subscriber
+$node_publisher->safe_psql('postgres',
+ "ROLLBACK PREPARED 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is aborted on subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;");
+ is($result, qq(0), 'Row inserted via 2PC is not present on subscriber');
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+ is($result, qq(0), 'transaction is aborted on subscriber');
+
+# Check that commit prepared is decoded properly on crash restart
+$node_publisher->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (12);
+ INSERT INTO tab_full VALUES (13);
+ PREPARE TRANSACTION 'test_prepared_tab';");
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+$node_publisher->start;
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+ or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (11,12);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+# TODO add test cases involving DDL. This can be added after we add functionality
+# to replicate DDL changes to subscriber.
+
+# check all the cleanup
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0),
+ 'check subscription relation status was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');