From a5c90734a099a4f64cdd5e60f86ea11383904fd9 Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Tue, 15 Jun 2021 17:58:09 +1000 Subject: [PATCH v86] Add prepare API support for streaming transactions. * Permits the combination of "streaming" and "two_phase" subscription options. * Adds the prepare API for streaming transactions which will apply the changes accumulated in the spool-file at prepare time. * Adds new subscription TAP tests, and new subscription.sql regression tests. * Updates PG documentation. --- doc/src/sgml/protocol.sgml | 68 +++- doc/src/sgml/ref/create_subscription.sgml | 10 - src/backend/commands/subscriptioncmds.c | 21 - src/backend/replication/logical/proto.c | 60 +++ src/backend/replication/logical/worker.c | 135 +++++- src/backend/replication/pgoutput/pgoutput.c | 33 +- src/include/replication/logicalproto.h | 10 +- src/test/regress/expected/subscription.out | 24 +- src/test/regress/sql/subscription.sql | 12 +- src/test/subscription/t/023_twophase_stream.pl | 453 +++++++++++++++++++++ .../subscription/t/024_twophase_cascade_stream.pl | 271 ++++++++++++ 11 files changed, 1018 insertions(+), 79 deletions(-) create mode 100644 src/test/subscription/t/023_twophase_stream.pl create mode 100644 src/test/subscription/t/024_twophase_cascade_stream.pl diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index f812976..9cc7192 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2881,7 +2881,7 @@ The commands accepted in replication mode are: Begin Prepare and Prepare messages belong to the same transaction. It also sends changes of large in-progress transactions between a pair of Stream Start and Stream Stop messages. The last stream of such a transaction - contains a Stream Commit or Stream Abort message. + contains a Stream Prepare or Stream Commit or Stream Abort message. 
@@ -7386,7 +7386,7 @@ Stream Abort -The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared) +The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared, Stream Prepare) are available since protocol version 3. @@ -7649,6 +7649,70 @@ are available since protocol version 3. + + +Stream Prepare + + + + + + +Byte1('p') + + Identifies the message as a two-phase stream prepare message. + + + + +Int8 + + Flags; currently unused (must be 0). + + + + +Int64 + + The LSN of the prepare. + + + + +Int64 + + The end LSN of the prepared transaction. + + + + +Int64 + + Prepare timestamp of the transaction. The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). + + + + +Int32 + + Xid of the transaction. + + + + +String + + The user defined GID of the two-phase transaction. + + + + + + + + + diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 3bcef78..4238baa 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -238,11 +238,6 @@ CREATE SUBSCRIPTION subscription_name - - The streaming option cannot be used with the - two_phase option. - - @@ -269,11 +264,6 @@ CREATE SUBSCRIPTION subscription_name - - The two_phase option cannot be used with the - streaming option. - - diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f8826fb..894a1b3 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -314,21 +314,6 @@ parse_subscription_options(List *options, errmsg("subscription with %s must also set %s", "slot_name = NONE", "create_slot = false"))); } - - /* - * Do additional checking for the disallowed combination of two_phase and - * streaming. While streaming and two_phase can theoretically be - * supported, it needs more analysis to allow them together. 
- */ - if (twophase && *twophase_given && *twophase) - { - if (streaming && *streaming_given && *streaming) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("%s and %s are mutually exclusive options", - "two_phase = true", "streaming = true"))); - } - } /* @@ -924,12 +909,6 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) if (streaming_given) { - if ((sub->twophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED) && streaming) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("cannot set %s for two-phase enabled subscription", - "streaming = true"))); - values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming); replaces[Anum_pg_subscription_substream - 1] = true; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 13c8c3b..8e03006 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -318,6 +318,66 @@ logicalrep_read_rollback_prepared(StringInfo in, } /* + * Write STREAM PREPARE to the output stream. + */ +void +logicalrep_write_stream_prepare(StringInfo out, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_PREPARE); + + /* + * This should only ever happen for two-phase commit transactions, in + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + + Assert(rbtxn_prepared(txn)); + Assert(TransactionIdIsValid(txn->xid)); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->xact_time.prepare_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read STREAM PREPARE from the input stream. 
+ */ +TransactionId +logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + uint8 flags; + + /* read flags */ + flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in stream prepare message", flags); + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + prepare_data->end_lsn = pq_getmsgint64(in); + prepare_data->prepare_time = pq_getmsgint64(in); + prepare_data->xid = pq_getmsgint(in, 4); + + /* read gid (bounded copy into the pre-allocated buffer; never overrun it) */ + strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid)); + + return prepare_data->xid; +} + +/* * Write ORIGIN to the output stream. */ void diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ee95ac8..684fec2 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -331,7 +331,7 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, CmdType operation); /* Compute GID for two_phase transactions */ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); - +static int apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); /* * Should this worker apply changes for given relation. @@ -1057,6 +1057,87 @@ apply_handle_rollback_prepared(StringInfo s) } /* + * Handle STREAM PREPARE. + * + * Logic is in two parts: + * 1. Replay all the spooled operations + * 2. Mark the transaction as prepared + */ +static void +apply_handle_stream_prepare(StringInfo s) +{ + int nchanges = 0; + LogicalRepPreparedTxnData prepare_data; + TransactionId xid; + char gid[GIDSIZE]; + + if (in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("STREAM PREPARE message without STREAM STOP"))); + + /* Tablesync should never receive prepare. 
*/ + Assert(!am_tablesync_worker()); + + xid = logicalrep_read_stream_prepare(s, &prepare_data); + elog(DEBUG1, "received prepare for streamed transaction %u", xid); + + /* + * Compute unique GID for two_phase transactions. We don't use GID of + * prepared transaction sent by server as that can lead to deadlock when + * we have multiple subscriptions from same node point to publications on + * the same node. See comments atop worker.c + */ + TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, + gid, sizeof(gid)); + + /* + * 1. Replay all the spooled operations - Similar code as for + * apply_handle_stream_commit (i.e. non two-phase stream commit) + */ + + nchanges = apply_spooled_messages(xid, prepare_data.prepare_lsn); + + /* + * 2. Mark the transaction as prepared. - Similar code as for + * apply_handle_prepare (i.e. two-phase non-streamed prepare) + */ + + /* + * BeginTransactionBlock is necessary to balance the EndTransactionBlock + * called within the PrepareTransactionBlock below. + */ + BeginTransactionBlock(); + CommitTransactionCommand(); /* Completes the preceding Begin command. */ + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.prepare_time; + + PrepareTransactionBlock(gid); + CommitTransactionCommand(); + + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + + elog(DEBUG1, "apply_handle_stream_prepare: replayed %d (all) changes.", nchanges); + + in_remote_transaction = false; + + /* unlink the files with serialized changes and subxact info */ + stream_cleanup_files(MyLogicalRepWorker->subid, xid); + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* * Handle ORIGIN message. 
* * TODO, support tracking of multiple origins @@ -1272,30 +1353,20 @@ apply_handle_stream_abort(StringInfo s) } /* - * Handle STREAM COMMIT message. + * Common spoolfile processing. + * Returns how many changes were applied. */ -static void -apply_handle_stream_commit(StringInfo s) +static int +apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) { - TransactionId xid; StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; - LogicalRepCommitData commit_data; StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; - if (in_streamed_transaction) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("STREAM COMMIT message without STREAM STOP"))); - - xid = logicalrep_read_stream_commit(s, &commit_data); - - elog(DEBUG1, "received commit for streamed transaction %u", xid); - /* Make sure we have an open transaction */ begin_replication_step(); @@ -1306,7 +1377,7 @@ apply_handle_stream_commit(StringInfo s) */ oldcxt = MemoryContextSwitchTo(TopTransactionContext); - /* open the spool file for the committed transaction */ + /* Open the spool file for the committed/prepared transaction */ changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); @@ -1327,7 +1398,7 @@ apply_handle_stream_commit(StringInfo s) MemoryContextSwitchTo(oldcxt); - remote_final_lsn = commit_data.commit_lsn; + remote_final_lsn = lsn; /* * Make sure the handle apply_dispatch methods are aware we're in a remote @@ -1406,6 +1477,32 @@ apply_handle_stream_commit(StringInfo s) elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); + return nchanges; +} + +/* + * Handle STREAM COMMIT message. 
+ */ +static void +apply_handle_stream_commit(StringInfo s) +{ + TransactionId xid; + LogicalRepCommitData commit_data; + int nchanges = 0; + + if (in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("STREAM COMMIT message without STREAM STOP"))); + + xid = logicalrep_read_stream_commit(s, &commit_data); + + elog(DEBUG1, "received commit for streamed transaction %u", xid); + + nchanges = apply_spooled_messages(xid, commit_data.commit_lsn); + + elog(DEBUG1, "apply_handle_stream_commit: replayed %d (all) changes.", nchanges); + apply_handle_commit_internal(s, &commit_data); /* unlink the files with serialized changes and subxact info */ @@ -2351,6 +2448,10 @@ apply_dispatch(StringInfo s) case LOGICAL_REP_MSG_ROLLBACK_PREPARED: apply_handle_rollback_prepared(s); return; + + case LOGICAL_REP_MSG_STREAM_PREPARE: + apply_handle_stream_prepare(s); + return; } ereport(ERROR, diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 7a1d42a..d5a284d 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -71,6 +71,8 @@ static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static bool publications_valid; static bool in_streaming; @@ -175,7 +177,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_message_cb = pgoutput_message; cb->stream_truncate_cb = pgoutput_truncate; /* transaction streaming - two-phase commit */ - cb->stream_prepare_cb = NULL; + cb->stream_prepare_cb = pgoutput_stream_prepare_txn; } static void @@ -280,17 +282,6 @@ parse_output_parameters(List *options, PGOutputData *data) } else elog(ERROR, "unrecognized pgoutput option: %s", 
defel->defname); - - /* - * Do additional checking for the disallowed combination of two_phase and - * streaming. While streaming and two_phase can theoretically be - * supported, it needs more analysis to allow them together. - */ - if (data->two_phase && data->streaming) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("%s and %s are mutually exclusive options", - "two_phase", "streaming"))); } } @@ -1030,6 +1021,24 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, } /* + * PREPARE callback (for streaming two-phase commit). + * + * Notify the downstream to prepare the transaction. + */ +static void +pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + Assert(rbtxn_is_streamed(txn)); + + OutputPluginUpdateProgress(ctx); + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* * Initialize the relation schema sync cache for a decoding session. * * The hash table is destroyed at the end of a decoding session. 
While diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index e20f2da..7a4804f 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -67,7 +67,8 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_END = 'E', LOGICAL_REP_MSG_STREAM_COMMIT = 'c', - LOGICAL_REP_MSG_STREAM_ABORT = 'A' + LOGICAL_REP_MSG_STREAM_ABORT = 'A', + LOGICAL_REP_MSG_STREAM_PREPARE = 'p' } LogicalRepMsgType; /* @@ -124,6 +125,7 @@ typedef struct LogicalRepBeginData TransactionId xid; } LogicalRepBeginData; +/* Commit (and abort) information */ typedef struct LogicalRepCommitData { XLogRecPtr commit_lsn; @@ -243,4 +245,10 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid); +extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +extern TransactionId logicalrep_read_stream_prepare(StringInfo in, + LogicalRepPreparedTxnData *prepare_data); + + #endif /* LOGICAL_PROTO_H */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index f054ac8..81d27f3 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -279,27 +279,29 @@ WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION .. --fail - alter of two_phase option not supported. 
ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); ERROR: unrecognized subscription parameter: "two_phase" ---fail - cannot set streaming when two_phase enabled +-- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); -ERROR: cannot set streaming = true for two-phase enabled subscription -ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo -----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist (1 row) +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; --- fail - two_phase and streaming are mutually exclusive. -CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true); -ERROR: two_phase = true and streaming = true are mutually exclusive options +-- two_phase and streaming are compatible. +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo -------+-------+---------+-------------+--------+-----------+------------------+--------------------+---------- -(0 rows) + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist +(1 row) +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index b732871..e304852 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -209,23 +209,25 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); \dRs+ + --fail - alter of two_phase option not supported. ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); ---fail - cannot set streaming when two_phase enabled +-- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); -ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); - \dRs+ +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; --- fail - two_phase and streaming are mutually exclusive. 
-CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true); +-- two_phase and streaming are compatible. +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true); \dRs+ +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl new file mode 100644 index 0000000..c90e3f6 --- /dev/null +++ b/src/test/subscription/t/023_twophase_stream.pl @@ -0,0 +1,453 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# logical replication of 2PC test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 27; + +############################### +# Setup +############################### + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', qq(max_prepared_transactions = 10)); +$node_publisher->append_conf('postgresql.conf', qq(logical_decoding_work_mem = 64kB)); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_subscriber->start; + +# Create some pre-existing content on publisher (uses same DDL as 015_stream test) +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber (columns a and b are compatible with same table name on publisher) 
+$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); + +# Setup logical replication (streaming = on) +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub + WITH (streaming = on, two_phase = on)"); + +# Wait for subscriber to finish initialization +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Also wait for two-phase to be enabled +my $twophase_query = + "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');"; +$node_subscriber->poll_query_until('postgres', $twophase_query) + or die "Timed out while waiting for subscriber to enable twophase"; + +############################### +# Check initial data was copied to subscriber +############################### +my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +############################### +# Test 2PC PREPARE / COMMIT PREPARED +# 1. Data is streamed as a 2PC transaction. +# 2. Then do commit prepared. +# +# Expect all data is replicated on subscriber side after the commit. +############################### + +# check that 2PC gets replicated to subscriber +# Insert, update and delete enough rows to exceed the 64kB limit. 
+$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# 2PC transaction gets committed +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'); +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber'); + +############################### +# Test 2PC PREPARE / ROLLBACK PREPARED. +# 1. Table is deleted back to 2 rows which are replicated on subscriber. +# 2. Data is streamed using 2PC +# 3. Do rollback prepared. +# +# Expect data rolls back leaving only the original 2 rows. +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); + +# Then insert, update and delete enough rows to exceed the 64kB limit. 
+$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# 2PC transaction gets aborted +$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is aborted on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is aborted on subscriber'); + +############################### +# Check that 2PC ROLLBACK PREPARED is decoded properly on crash restart. +# 1. insert, update and delete enough rows to exceed the 64kB limit. +# 2. Then server crashes before the 2PC transaction is rolled back. +# 3. After servers are restarted the pending transaction is rolled back. +# +# Expect all inserted data is gone. 
+# (Note: both publisher and subscriber crash/restart) +############################### + +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_subscriber->stop('immediate'); +$node_publisher->stop('immediate'); + +$node_publisher->start; +$node_subscriber->start; + +# rollback post the restart +$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';"); +$node_publisher->wait_for_catchup($appname); + +# check inserts are rolled back +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows'); + +############################### +# Check that 2PC COMMIT PREPARED is decoded properly on crash restart. +# 1. insert, update and delete enough rows to exceed the 64kB limit. +# 2. Then server crashes before the 2PC transaction is committed. +# 3. After servers are restarted the pending transaction is committed. +# +# Expect all data is replicated on subscriber side after the commit. 
+# (Note: both publisher and subscriber crash/restart) +############################### + +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_subscriber->stop('immediate'); +$node_publisher->stop('immediate'); + +$node_publisher->start; +$node_subscriber->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->wait_for_catchup($appname); + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'); + +############################### +# Check that 2PC COMMIT PREPARED is decoded properly on crash restart. +# 1. insert, update and delete enough rows to exceed the 64kB limit. +# 2. Then 1 server crashes before the 2PC transaction is committed. +# 3. After servers are restarted the pending transaction is committed. +# +# Expect all data is replicated on subscriber side after the commit. 
+# (Note: only subscriber crashes) +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); + +# insert, update, delete enough data to cause streaming +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_subscriber->stop('immediate'); +$node_subscriber->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->wait_for_catchup($appname); + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber'); + +############################### +# Check that 2PC COMMIT PREPARED is decoded properly on crash restart. +# 1. insert, update and delete enough rows to exceed the 64kB limit. +# 2. Then 1 server crashes before the 2PC transaction is committed. +# 3. After servers are restarted the pending transaction is committed. +# +# Expect all data is replicated on subscriber side after the commit. 
+# (Note: only publisher crashes) +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); + +# insert, update, delete enough data to cause streaming +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->stop('immediate'); +$node_publisher->start; + +# commit post the restart +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); +$node_publisher->wait_for_catchup($appname); + +# check inserts are visible +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber'); + +############################### +# Do INSERT after the PREPARE but before ROLLBACK PREPARED. +# 1. Table is deleted back to 2 rows which are replicated on subscriber. +# 2. Data is streamed using 2PC. +# 3. A single row INSERT is done which is after the PREPARE +# 4. Then do a ROLLBACK PREPARED. +# +# Expect the 2PC data rolls back leaving only 3 rows on the subscriber. +# (the original 2 + inserted 1) +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); + +# Then insert, update and delete enough rows to exceed the 64kB limit. 
+$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# Insert a different record (now we are outside of the 2PC transaction) +# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (99999, 'foobar')"); + +# 2PC transaction gets aborted +$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is aborted on subscriber, +# but the extra INSERT outside of the 2PC still was replicated +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3|3|3), 'check the outside insert was copied to subscriber'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is aborted on subscriber'); + +############################### +# Do INSERT after the PREPARE but before COMMIT PREPARED. +# 1. Table is deleted back to 2 rows which are replicated on subscriber. +# 2. Data is streamed using 2PC. +# 3. A single row INSERT is done which is after the PREPARE. +# 4. Then do a COMMIT PREPARED. +# +# Expect 2PC data + the extra row are on the subscriber. 
+# (the 3334 + inserted 1 = 3335) +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); + +# Then insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# Insert a different record (now we are outside of the 2PC transaction) +# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (99999, 'foobar')"); + +# 2PC transaction gets committed +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3335|3335|3335), 'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber'); + +############################### +# Do DELETE after PREPARE but before COMMIT PREPARED. +# 1. Table is deleted back to 2 rows which are replicated on subscriber. +# 2. Data is streamed using 2PC. +# 3.
A single row DELETE is done for one of the records that was inserted by the 2PC transaction. +# 4. Then there is a COMMIT PREPARED. +# +# Expect all the 2PC data rows on the subscriber (since in fact the DELETE at step 3 does nothing +# because that record was not yet committed at the time of the delete). +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); + +# Then insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# DELETE one of the prepared 2PC records before they get committed (we are outside of the 2PC transaction) +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a = 5"); + +# 2PC transaction gets committed +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber.
Nothing was deleted'); + +# confirm the "deleted" row was in fact not deleted +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab WHERE a = 5"); +is($result, qq(1), 'The row we deleted before the commit still exists on subscriber.'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber'); + +############################### +# Test that a 2PC transaction works using an empty GID literal +############################### + +# First, delete the data except for 2 rows (will be replicated) +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); + +# Then insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION '';}); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# 2PC transaction gets committed +$node_publisher->safe_psql('postgres', "COMMIT PREPARED '';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is committed on subscriber +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber'); + +############################### +# check all the cleanup +############################### + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was
dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), 'check subscription relation status was dropped on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/024_twophase_cascade_stream.pl b/src/test/subscription/t/024_twophase_cascade_stream.pl new file mode 100644 index 0000000..3a0be82 --- /dev/null +++ b/src/test/subscription/t/024_twophase_cascade_stream.pl @@ -0,0 +1,271 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test cascading logical replication of streamed 2PC transactions. +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 31; + +############################### +# Setup a cascade of pub/sub nodes.
+# node_A -> node_B -> node_C +############################### + +# Initialize nodes +# node_A +my $node_A = get_new_node('node_A'); +$node_A->init(allows_streaming => 'logical'); +$node_A->append_conf('postgresql.conf', qq(max_prepared_transactions = 10)); +$node_A->append_conf('postgresql.conf', qq(logical_decoding_work_mem = 64kB)); +$node_A->start; +# node_B +my $node_B = get_new_node('node_B'); +$node_B->init(allows_streaming => 'logical'); +$node_B->append_conf('postgresql.conf', qq(max_prepared_transactions = 10)); +$node_B->append_conf('postgresql.conf', qq(logical_decoding_work_mem = 64kB)); +$node_B->start; +# node_C +my $node_C = get_new_node('node_C'); +$node_C->init(allows_streaming => 'logical'); +$node_C->append_conf('postgresql.conf', qq(max_prepared_transactions = 10)); +$node_C->append_conf('postgresql.conf', qq(logical_decoding_work_mem = 64kB)); +$node_C->start; + +# Create some pre-existing content on node_A (uses same DDL as 015_stream.pl) +$node_A->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_A->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Create the same tables on node_B and node_C +# columns a and b are compatible with the table of the same name on node_A +$node_B->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); +$node_C->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); + +# Setup logical replication (streaming = on) + +# node_A (pub) -> node_B (sub) +my $node_A_connstr = $node_A->connstr .
' dbname=postgres'; +$node_A->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_A FOR TABLE test_tab"); +my $appname_B = 'tap_sub_B'; +$node_B->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub_B + CONNECTION '$node_A_connstr application_name=$appname_B' + PUBLICATION tap_pub_A + WITH (streaming = on, two_phase = on)"); + +# node_B (pub) -> node_C (sub) +my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; +$node_B->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_B FOR TABLE test_tab"); +my $appname_C = 'tap_sub_C'; +$node_C->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub_C + CONNECTION '$node_B_connstr application_name=$appname_C' + PUBLICATION tap_pub_B + WITH (streaming = on, two_phase = on)"); + +# Wait for subscribers to finish initialization +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# Also wait for two-phase to be enabled (no subscription left with subtwophasestate other than 'e') +my $twophase_query = "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');"; +$node_B->poll_query_until('postgres', $twophase_query) + or die "Timed out while waiting for subscriber to enable twophase"; +$node_C->poll_query_until('postgres', $twophase_query) + or die "Timed out while waiting for subscriber to enable twophase"; + +is(1,1, "Cascade setup is complete"); + +my $result; + +############################### +# Check initial data was copied to subscriber(s) +############################### +$result = $node_B->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber C'); + +############################### +# Test 2PC PREPARE / COMMIT PREPARED +# 1. Data is streamed as a 2PC transaction. +# 2. Then do commit prepared.
+# Expect all data is replicated on subscriber(s) after the commit. +############################### + +# Insert, update and delete enough rows to exceed the 64kB limit. +# Then 2PC PREPARE +$node_A->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state is prepared on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber C'); + +# 2PC COMMIT PREPARED +$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check that transaction was committed on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber B, and extra columns have local defaults'); +$result = $node_C->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber C, and extra columns have local defaults'); + +# check the transaction state is ended on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber C'); + +############################### +# Test 2PC
PREPARE / ROLLBACK PREPARED. +# 1. Table is deleted back to 2 rows which are replicated on subscriber. +# 2. Data is streamed using 2PC. +# 3. Do rollback prepared. +# Expect data rolls back leaving only the original 2 rows. +############################### + +# First, delete the data except for 2 rows (delete will be replicated) +$node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); + +# Insert, update and delete enough rows to exceed the 64kB limit. +# Then 2PC PREPARE +$node_A->safe_psql('postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state is prepared on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber C'); + +# 2PC ROLLBACK PREPARED +$node_A->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check that transaction is aborted on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'Row inserted by 2PC is not present. Only initial data remains on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'Row inserted by 2PC is not present.
Only initial data remains on subscriber C'); + +# check the transaction state is ended on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber C'); + +############################### +# Test 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT. +# 0. There are 2 rows only in the table (from previous test) +# 1. Insert one more row +# 2. Record a SAVEPOINT +# 3. Data is streamed using 2PC +# 4. Do rollback to SAVEPOINT prior to the streamed inserts +# 5. Then COMMIT PREPARED +# Expect data after the SAVEPOINT is aborted leaving only 3 rows (= 2 original + 1 from step 1) +############################### + +# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT +$node_A->safe_psql('postgres', " + BEGIN; + INSERT INTO test_tab VALUES (9999, 'foobar'); + SAVEPOINT sp_inner; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + ROLLBACK TO SAVEPOINT sp_inner; + PREPARE TRANSACTION 'outer'; + "); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state is prepared on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber C'); + +# 2PC COMMIT PREPARED +$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state is ended on subscriber(s) +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM
pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber C'); + +# check inserts are visible at subscriber(s). +# All the streamed data (after the SAVEPOINT) should be rolled back. +# (9999, 'foobar') should be committed. +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab where b = 'foobar';"); +is($result, qq(1), 'Rows committed are present on subscriber B'); +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab;"); +is($result, qq(3), 'Rows rolled back are not present on subscriber B'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab where b = 'foobar';"); +is($result, qq(1), 'Rows committed are present on subscriber C'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab;"); +is($result, qq(3), 'Rows rolled back are not present on subscriber C'); + +############################### +# check all the cleanup +############################### + +# cleanup the node_B => node_C pub/sub +$node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C"); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber node C'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), 'check subscription relation status was dropped on subscriber node C'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber node C'); +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher node B'); + +# cleanup the node_A => node_B pub/sub +$node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B");
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber node B'); +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), 'check subscription relation status was dropped on subscriber node B'); +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber node B'); +$result = $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher node A'); + +# shutdown all nodes (node_C, then node_B, then node_A) +$node_C->stop('fast'); +$node_B->stop('fast'); +$node_A->stop('fast'); -- 1.8.3.1