From 2f6508618ca69b909e447a1ef3b4998b63817a8d Mon Sep 17 00:00:00 2001
From: "houzj.fnst"
Date: Wed, 24 Aug 2022 21:07:44 +0800
Subject: [PATCH v36 4/5] Retry to apply streaming xact only in apply worker

When the subscription parameter streaming is set to parallel, the logic
tries to apply a streaming transaction using a parallel apply worker. If
this fails, the parallel worker exits with an error. In that case, retry
applying the streaming transaction using the normal streaming=on mode, so
that we do not get caught in a loop of the same errors.

A new flag column "subretry" has been introduced in the catalog
"pg_subscription". If the subscriber exits with an error, this flag is set
to true, and whenever a transaction is applied successfully, it is reset
to false. So, when deciding how to apply a streaming transaction, the
logic can tell whether the transaction has previously failed by checking
the "subretry" field.

In addition, when the parallel apply worker was introduced, some log
checks were added to the tests to ensure that streamed transactions are
applied by a parallel apply worker. Now that failed transactions are
retried in a different mode, we can no longer be sure that a streamed
transaction will be applied by a parallel apply worker, so these log
checks have been removed.
---
 doc/src/sgml/catalogs.sgml                         |   9 +
 doc/src/sgml/logical-replication.sgml              |  11 +-
 doc/src/sgml/ref/create_subscription.sgml          |   5 +
 src/backend/catalog/pg_subscription.c              |   1 +
 src/backend/catalog/system_views.sql               |   2 +-
 src/backend/commands/subscriptioncmds.c            |   1 +
 .../replication/logical/applyparallelworker.c      |  16 +-
 src/backend/replication/logical/worker.c           | 187 +++++++++++++++------
 src/bin/pg_dump/pg_dump.c                          |   5 +-
 src/include/catalog/pg_subscription.h              |   5 +
 src/test/subscription/t/015_stream.pl              |  56 +-----
 src/test/subscription/t/016_stream_subxact.pl      |  45 +----
 src/test/subscription/t/017_stream_ddl.pl          |  51 +-----
 .../subscription/t/018_stream_subxact_abort.pl     |  56 +-----
 .../subscription/t/019_stream_subxact_ddl_abort.pl |  46 +----
 src/test/subscription/t/022_twophase_cascade.pl    |  54 ------
 src/test/subscription/t/023_twophase_stream.pl     |  66 +-------
 .../t/032_streaming_parallel_safety.pl             | 150 ++++++++---------
 18 files changed, 274 insertions(+), 492 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c5769da..792749c 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7929,6 +7929,15 @@ SCRAM-SHA-256$<iteration count>:&l
+      subretry bool
+
+
+       True if the previous apply change failed, necessitating a retry
+
+
+
+
       subconninfo text

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 9e753fe..6229ceb 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1343,12 +1343,11 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
    When the streaming mode is parallel, the finish LSN of
-   failed transactions may not be logged. In that case, it may be necessary to
-   change the streaming mode to on or off and
-   cause the same conflicts again so the finish LSN of the failed transaction will
-   be written to the server log. For the usage of finish LSN, please refer to
-   ALTER SUBSCRIPTION ... SKIP.
+   failed transactions may not be logged. In that case, the failed transaction
+   will be retried in streaming = on mode. If it fails
+   again, the finish LSN of the failed transaction will be written to the
+   server log.
For the usage of finish LSN, please refer to + ALTER SUBSCRIPTION ... SKIP. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 79250ac..2b33e85 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -250,6 +250,11 @@ CREATE SUBSCRIPTION subscription_nameparallel mode is disregarded when retrying; + instead the transaction will be applied using on + mode. diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index a506fc3..723b141 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->retry = subform->subretry; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 55f7ec7..f4a0049 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1298,7 +1298,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, - subslotname, subsynccommit, subpublications, suborigin) + subretry, subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 1f82206..c7fd730 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -635,6 +635,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(false); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 46ab3b0..44818ac 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -147,6 +147,18 @@ parallel_apply_can_start(TransactionId xid) return false; /* + * Don't use parallel apply workers for retries, because it is possible + * that the last time we tried to apply a transaction using a parallel + * apply worker the checks failed (see function + * parallel_apply_relation_check). 
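+	 * Retrying with a parallel apply worker would most likely fail those
+	 * same checks again, so fall back to applying the transaction in the
+	 * leader apply worker, as if streaming = on were used. If that attempt
+	 * fails too, the finish LSN of the failed transaction is written to
+	 * the server log.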
+ */ + if (MySubscription->retry) + { + elog(DEBUG1, "parallel apply workers are not used for retries"); + return false; + } + + /* * For streaming transactions that are being applied using a parallel * apply worker, we cannot decide whether to apply the change for a * relation that is not in the READY state (see @@ -1144,7 +1156,5 @@ parallel_apply_relation_check(LogicalRepRelMapEntry *rel) "streaming = parallel"), errdetail("The unique column on subscriber is not the unique " "column on publisher or there is at least one " - "non-immutable function."), - errhint("Please change to use subscription parameter %s.", - "streaming = on"))); + "non-immutable function."))); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 66a79f2..a81e0d4 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -395,7 +395,7 @@ static void store_flush_position(XLogRecPtr remote_lsn); static void maybe_reread_subscription(void); -static void DisableSubscriptionAndExit(void); +static void DisableSubscriptionOnError(void); static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, @@ -435,6 +435,8 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); +static void set_subscription_retry(bool retry); + /* * Should this worker apply changes for given relation. * @@ -980,6 +982,9 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); @@ -1089,6 +1094,9 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); @@ -1145,6 +1153,9 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn); in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); @@ -1206,6 +1217,9 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn); in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(rollback_data.rollback_end_lsn); @@ -1260,6 +1274,9 @@ apply_handle_stream_prepare(StringInfo s) /* Unlink the files with serialized changes and subxact info. */ stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid); + + /* Reset the retry flag. */ + set_subscription_retry(false); break; case TRANS_LEADER_SEND_TO_PARALLEL: @@ -1287,6 +1304,9 @@ apply_handle_stream_prepare(StringInfo s) in_remote_transaction = false; store_flush_position(prepare_data.end_lsn); + + /* Reset the retry flag. */ + set_subscription_retry(false); break; case TRANS_PARALLEL_APPLY: @@ -1652,6 +1672,10 @@ apply_handle_stream_abort(StringInfo s) * serialized to file. */ serialize_stream_abort(xid, subxid); + + /* Reset the retry flag. 
*/ + if (subxid == xid) + set_subscription_retry(false); break; case TRANS_LEADER_SEND_TO_PARALLEL: @@ -1681,6 +1705,9 @@ apply_handle_stream_abort(StringInfo s) { parallel_apply_replorigin_setup(); parallel_apply_free_worker(winfo, xid); + + /* Reset the retry flag. */ + set_subscription_retry(false); } else { @@ -1861,6 +1888,9 @@ apply_handle_stream_commit(StringInfo s) /* Unlink the files with serialized changes and subxact info. */ stream_cleanup_files(MyLogicalRepWorker->subid, xid); + + /* Reset the retry flag. */ + set_subscription_retry(false); break; case TRANS_LEADER_SEND_TO_PARALLEL: @@ -1897,6 +1927,9 @@ apply_handle_stream_commit(StringInfo s) * subskiplsn. */ clear_subscription_skip_lsn(commit_data.commit_lsn); + + /* Reset the retry flag. */ + set_subscription_retry(false); break; case TRANS_PARALLEL_APPLY: @@ -3964,20 +3997,28 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) } PG_CATCH(); { + /* + * Emit the error message, and recover from the error state to an idle + * state + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed during table synchronization */ + pgstat_report_subscription_error(MySubscription->oid, false); + if (MySubscription->disableonerr) - DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed during table synchronization. Abort - * the current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); + DisableSubscriptionOnError(); - PG_RE_THROW(); - } + /* Set the retry flag. */ + set_subscription_retry(true); + + proc_exit(0); } PG_END_TRY(); @@ -4002,20 +4043,27 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + /* + * Emit the error message, and recover from the error state to an idle + * state + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed while applying changes */ + pgstat_report_subscription_error(MySubscription->oid, + !am_tablesync_worker()); + if (MySubscription->disableonerr) - DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed while applying changes. Abort the - * current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); + DisableSubscriptionOnError(); - PG_RE_THROW(); - } + /* Set the retry flag. */ + set_subscription_retry(true); } PG_END_TRY(); } @@ -4297,39 +4345,20 @@ ApplyWorkerMain(Datum main_arg) } /* - * After error recovery, disable the subscription in a new transaction - * and exit cleanly. + * Disable the subscription in a new transaction. 
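+ * Unlike the DisableSubscriptionAndExit() this replaces, error recovery,
+ * stats reporting, and the decision to exit are left to the callers.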
 */
 static void
-DisableSubscriptionAndExit(void)
+DisableSubscriptionOnError(void)
 {
-	/*
-	 * Emit the error message, and recover from the error state to an idle
-	 * state
-	 */
-	HOLD_INTERRUPTS();
-
-	EmitErrorReport();
-	AbortOutOfAnyTransaction();
-	FlushErrorState();
-
-	RESUME_INTERRUPTS();
-
-	/* Report the worker failed during either table synchronization or apply */
-	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
-									 !am_tablesync_worker());
-
 	/* Disable the subscription */
 	StartTransactionCommand();
 	DisableSubscription(MySubscription->oid);
 	CommitTransactionCommand();

-	/* Notify the subscription has been disabled and exit */
+	/* Notify that the subscription has been disabled */
 	ereport(LOG,
 			errmsg("subscription \"%s\" has been disabled because of an error",
 				   MySubscription->name));
-
-	proc_exit(0);
 }

 /*
@@ -4605,3 +4634,67 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 		return TRANS_LEADER_SERIALIZE;
 	}
 }
+
+/*
+ * Set the subretry field of the pg_subscription catalog.
+ *
+ * If retry is true, the subscriber is about to exit with an error.
+ * Otherwise, the transaction was applied successfully.
+ */
+static void
+set_subscription_retry(bool retry)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	bool		started_tx = false;
+	bool		nulls[Natts_pg_subscription];
+	bool		replaces[Natts_pg_subscription];
+	Datum		values[Natts_pg_subscription];
+
+	/* Fast path - if no state change then nothing to do */
+	if (MySubscription->retry == retry)
+		return;
+
+	/* Fast path - skip for parallel apply workers */
+	if (am_parallel_apply_worker())
+		return;
+
+	if (!IsTransactionState())
+	{
+		StartTransactionCommand();
+		started_tx = true;
+	}
+
+	/* Look up the subscription in the catalog */
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
+							  ObjectIdGetDatum(MySubscription->oid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
+
+	LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+					 AccessShareLock);
+
+	/* Form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Set subretry */
+	values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(retry);
+	replaces[Anum_pg_subscription_subretry - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	/* Cleanup. */
+	heap_freetuple(tup);
+	table_close(rel, NoLock);
+
+	if (started_tx)
+		CommitTransactionCommand();
+}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 01b2b60..1a4b4a7 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4508,8 +4508,9 @@ getSubscriptions(Archive *fout)
 	ntups = PQntuples(res);

 	/*
-	 * Get subscription fields. We don't include subskiplsn in the dump as
-	 * after restoring the dump this value may no longer be relevant.
+	 * Get subscription fields. We don't include subskiplsn and subretry in
+	 * the dump as after restoring the dump these values may no longer be
+	 * relevant.
*/ i_tableoid = PQfnumber(res, "tableoid"); i_oid = PQfnumber(res, "oid"); diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 016afbc..36c1801 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -88,6 +88,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subdisableonerr; /* True if a worker error should cause the * subscription to be disabled */ + bool subretry BKI_DEFAULT(f); /* True if the previous apply + * change failed. */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -131,6 +134,8 @@ typedef struct Subscription bool disableonerr; /* Indicates if the subscription should be * automatically disabled if a worker error * occurs */ + bool retry; /* Indicates if the previous apply change + * failed. */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index 6d1ff8d..742bce0 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -8,52 +8,22 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; -# Check the log that the streamed transaction was completed successfully -# reported by parallel apply worker. -sub check_parallel_log -{ - my ($node_subscriber, $offset, $is_parallel) = @_; - my $parallel_message = - 'finished processing the transaction finish command'; - - if ($is_parallel) - { - $node_subscriber->wait_for_log(qr/$parallel_message/, $offset); - } -} - # Encapsulate all the common test steps which are related to "streaming" # parameter so the same code can be run both for the streaming=on and # streaming=parallel cases. sub test_streaming { - my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + my ($node_publisher, $node_subscriber, $appname) = @_; # Interleave a pair of transactions, each exceeding the 64kB limit. my $in = ''; my $out = ''; - my $offset = 0; - my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, on_error_stop => 0); - # If "streaming" parameter is specified as "parallel", we need to check - # that streamed transaction was applied using a parallel apply worker. - # We have to look for the DEBUG1 log messages about that, so bump up the - # log verbosity. - if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = debug1"); - $node_subscriber->reload; - } - - # Check the subscriber log from now on. - $offset = -s $node_subscriber->logfile; - $in .= q{ BEGIN; INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); @@ -78,8 +48,6 @@ sub test_streaming $node_publisher->wait_for_catchup($appname); - check_parallel_log($node_subscriber, $offset, $is_parallel); - my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); @@ -90,9 +58,6 @@ sub test_streaming $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub SET (binary = on)"); - # Check the subscriber log from now on. - $offset = -s $node_subscriber->logfile; - # Insert, update and delete enough rows to exceed the 64kB limit. 
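+# (The publisher is configured with logical_decoding_work_mem = 64kB in this
+# test, so exceeding that amount forces the transaction to be streamed.)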
$node_publisher->safe_psql( 'postgres', q{ @@ -105,8 +70,6 @@ sub test_streaming $node_publisher->wait_for_catchup($appname); - check_parallel_log($node_subscriber, $offset, $is_parallel); - $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); @@ -121,16 +84,11 @@ sub test_streaming "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'" ); - # Check the subscriber log from now on. - $offset = -s $node_subscriber->logfile; - $node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)"); $node_publisher->wait_for_catchup($appname); - check_parallel_log($node_subscriber, $offset, $is_parallel); - $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab" ); @@ -141,14 +99,6 @@ sub test_streaming $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE (a > 2)"); $node_publisher->wait_for_catchup($appname); - - # Reset the log verbosity. - if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = warning"); - $node_subscriber->reload; - } } # Create publisher node @@ -196,7 +146,7 @@ my $result = "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); -test_streaming($node_publisher, $node_subscriber, $appname, 0); +test_streaming($node_publisher, $node_subscriber, $appname); ###################################### # Test using streaming mode 'parallel' @@ -219,7 +169,7 @@ $node_publisher->poll_query_until('postgres', or die "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; -test_streaming($node_publisher, $node_subscriber, $appname, 1); +test_streaming($node_publisher, $node_subscriber, $appname); $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/016_stream_subxact.pl b/src/test/subscription/t/016_stream_subxact.pl index 0efa41d..2beb680 100644 --- a/src/test/subscription/t/016_stream_subxact.pl +++ b/src/test/subscription/t/016_stream_subxact.pl @@ -8,42 +8,12 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; -# Check the log that the streamed transaction was completed successfully -# reported by parallel apply worker. -sub check_parallel_log -{ - my ($node_subscriber, $offset, $is_parallel) = @_; - my $parallel_message = - 'finished processing the transaction finish command'; - - if ($is_parallel) - { - $node_subscriber->wait_for_log(qr/$parallel_message/, $offset); - } -} - # Encapsulate all the common test steps which are related to "streaming" # parameter so the same code can be run both for the streaming=on and # streaming=parallel cases. sub test_streaming { - my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; - - my $offset = 0; - - # If "streaming" parameter is specified as "parallel", we need to check - # that streamed transaction was applied using a parallel apply worker. - # We have to look for the DEBUG1 log messages about that, so bump up the - # log verbosity. - if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = debug1"); - $node_subscriber->reload; - } - - # Check the subscriber log from now on. - $offset = -s $node_subscriber->logfile; + my ($node_publisher, $node_subscriber, $appname) = @_; # Insert, update and delete enough rows to exceed 64kB limit. 
$node_publisher->safe_psql( @@ -73,8 +43,6 @@ sub test_streaming $node_publisher->wait_for_catchup($appname); - check_parallel_log($node_subscriber, $offset, $is_parallel); - my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); @@ -87,13 +55,6 @@ sub test_streaming "DELETE FROM test_tab WHERE (a > 2)"); $node_publisher->wait_for_catchup($appname); - # Reset the log verbosity. - if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = warning"); - $node_subscriber->reload; - } } # Create publisher node @@ -141,7 +102,7 @@ my $result = "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); -test_streaming($node_publisher, $node_subscriber, $appname, 0); +test_streaming($node_publisher, $node_subscriber, $appname); ###################################### # Test using streaming mode 'parallel' @@ -164,7 +125,7 @@ $node_publisher->poll_query_until('postgres', or die "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; -test_streaming($node_publisher, $node_subscriber, $appname, 1); +test_streaming($node_publisher, $node_subscriber, $appname); $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/017_stream_ddl.pl b/src/test/subscription/t/017_stream_ddl.pl index d2cc46d..43f9ca3 100644 --- a/src/test/subscription/t/017_stream_ddl.pl +++ b/src/test/subscription/t/017_stream_ddl.pl @@ -8,28 +8,12 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; -# Check the log that the streamed transaction was completed successfully -# reported by parallel apply worker. -sub check_parallel_log -{ - my ($node_subscriber, $offset, $is_parallel) = @_; - my $parallel_message = - 'finished processing the transaction finish command'; - - if ($is_parallel) - { - $node_subscriber->wait_for_log(qr/$parallel_message/, $offset); - } -} - # Encapsulate all the common test steps which are related to "streaming" # parameter so the same code can be run both for the streaming=on and # streaming=parallel cases. sub test_streaming { - my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; - - my $offset = 0; + my ($node_publisher, $node_subscriber, $appname) = @_; # a small (non-streamed) transaction with DDL and DML $node_publisher->safe_psql( @@ -42,20 +26,6 @@ sub test_streaming COMMIT; }); - # If "streaming" parameter is specified as "parallel", we need to check - # that streamed transaction was applied using a parallel apply worker. - # We have to look for the DEBUG1 log messages about that, so bump up the - # log verbosity. - if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = debug1"); - $node_subscriber->reload; - } - - # Check the subscriber log from now on. - $offset = -s $node_subscriber->logfile; - # large (streamed) transaction with DDL and DML $node_publisher->safe_psql( 'postgres', q{ @@ -80,8 +50,6 @@ sub test_streaming $node_publisher->wait_for_catchup($appname); - check_parallel_log($node_subscriber, $offset, $is_parallel); - my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d), count(e) FROM test_tab"); @@ -89,9 +57,6 @@ sub test_streaming 'check data was copied to subscriber in streaming mode and extra columns contain local defaults' ); - # Check the subscriber log from now on. 
- $offset = -s $node_subscriber->logfile; - # A large (streamed) transaction with DDL and DML. One of the DDL is performed # after DML to ensure that we invalidate the schema sent for test_tab so that # the next transaction has to send the schema again. @@ -114,8 +79,6 @@ sub test_streaming $node_publisher->wait_for_catchup($appname); - check_parallel_log($node_subscriber, $offset, $is_parallel); - $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d), count(e), count(f) FROM test_tab" ); @@ -130,14 +93,6 @@ sub test_streaming ALTER TABLE test_tab DROP COLUMN c, DROP COLUMN d, DROP COLUMN e, DROP COLUMN f; }); $node_publisher->wait_for_catchup($appname); - - # Reset the log verbosity. - if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = warning"); - $node_subscriber->reload; - } } # Create publisher node @@ -185,7 +140,7 @@ my $result = "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|0|0), 'check initial data was copied to subscriber'); -test_streaming($node_publisher, $node_subscriber, $appname, 0); +test_streaming($node_publisher, $node_subscriber, $appname); ###################################### # Test using streaming mode 'parallel' @@ -203,7 +158,7 @@ $node_publisher->poll_query_until('postgres', or die "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; -test_streaming($node_publisher, $node_subscriber, $appname, 1); +test_streaming($node_publisher, $node_subscriber, $appname); $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl index 752e79a..67c778d 100644 --- a/src/test/subscription/t/018_stream_subxact_abort.pl +++ b/src/test/subscription/t/018_stream_subxact_abort.pl @@ -8,42 +8,12 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; -# Check the log that the streamed transaction was completed successfully -# reported by parallel apply worker. -sub check_parallel_log -{ - my ($node_subscriber, $offset, $is_parallel) = @_; - my $parallel_message = - 'finished processing the transaction finish command'; - - if ($is_parallel) - { - $node_subscriber->wait_for_log(qr/$parallel_message/, $offset); - } -} - # Encapsulate all the common test steps which are related to "streaming" # parameter so the same code can be run both for the streaming=on and # streaming=parallel cases. sub test_streaming { - my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; - - my $offset = 0; - - # If "streaming" parameter is specified as "parallel", we need to check - # that streamed transaction was applied using a parallel apply worker. - # We have to look for the DEBUG1 log messages about that, so bump up the - # log verbosity. - if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = debug1"); - $node_subscriber->reload; - } - - # Check the subscriber log from now on. 
- $offset = -s $node_subscriber->logfile; + my ($node_publisher, $node_subscriber, $appname) = @_; # large (streamed) transaction with DDL, DML and ROLLBACKs $node_publisher->safe_psql( @@ -69,8 +39,6 @@ sub test_streaming $node_publisher->wait_for_catchup($appname); - check_parallel_log($node_subscriber, $offset, $is_parallel); - my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); @@ -78,9 +46,6 @@ sub test_streaming 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults' ); - # Check the subscriber log from now on. - $offset = -s $node_subscriber->logfile; - # large (streamed) transaction with subscriber receiving out of order # subtransaction ROLLBACKs $node_publisher->safe_psql( @@ -101,17 +66,12 @@ sub test_streaming $node_publisher->wait_for_catchup($appname); - check_parallel_log($node_subscriber, $offset, $is_parallel); - $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); is($result, qq(2500|0), 'check rollback to savepoint was reflected on subscriber'); - # Check the subscriber log from now on. - $offset = -s $node_subscriber->logfile; - # large (streamed) transaction with subscriber receiving rollback $node_publisher->safe_psql( 'postgres', q{ @@ -126,8 +86,6 @@ sub test_streaming $node_publisher->wait_for_catchup($appname); - check_parallel_log($node_subscriber, $offset, $is_parallel); - $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); @@ -137,14 +95,6 @@ sub test_streaming $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE (a > 2)"); $node_publisher->wait_for_catchup($appname); - - # Reset the log verbosity. - if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = warning"); - $node_subscriber->reload; - } } # Create publisher node @@ -191,7 +141,7 @@ my $result = "SELECT count(*), count(c) FROM test_tab"); is($result, qq(2|0), 'check initial data was copied to subscriber'); -test_streaming($node_publisher, $node_subscriber, $appname, 0); +test_streaming($node_publisher, $node_subscriber, $appname); ###################################### # Test using streaming mode 'parallel' @@ -209,7 +159,7 @@ $node_publisher->poll_query_until('postgres', or die "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; -test_streaming($node_publisher, $node_subscriber, $appname, 1); +test_streaming($node_publisher, $node_subscriber, $appname); $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl index 9d678b3..41ec1e8 100644 --- a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl +++ b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl @@ -9,42 +9,12 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; -# Check the log that the streamed transaction was completed successfully -# reported by parallel apply worker. -sub check_parallel_log -{ - my ($node_subscriber, $offset, $is_parallel) = @_; - my $parallel_message = - 'finished processing the transaction finish command'; - - if ($is_parallel) - { - $node_subscriber->wait_for_log(qr/$parallel_message/, $offset); - } -} - # Encapsulate all the common test steps which are related to "streaming" # parameter so the same code can be run both for the streaming=on and # streaming=parallel cases. 
sub test_streaming { - my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; - - my $offset = 0; - - # If "streaming" parameter is specified as "parallel", we need to check - # that streamed transaction was applied using a parallel apply worker. - # We have to look for the DEBUG1 log messages about that, so bump up the - # log verbosity. - if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = debug1"); - $node_subscriber->reload; - } - - # Check the subscriber log from now on. - $offset = -s $node_subscriber->logfile; + my ($node_publisher, $node_subscriber, $appname) = @_; # large (streamed) transaction with DDL, DML and ROLLBACKs $node_publisher->safe_psql( @@ -68,8 +38,6 @@ sub test_streaming $node_publisher->wait_for_catchup($appname); - check_parallel_log($node_subscriber, $offset, $is_parallel); - my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); @@ -84,14 +52,6 @@ sub test_streaming ALTER TABLE test_tab DROP COLUMN c; }); $node_publisher->wait_for_catchup($appname); - - # Reset the log verbosity. - if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = warning"); - $node_subscriber->reload; - } } # Create publisher node @@ -138,7 +98,7 @@ my $result = "SELECT count(*), count(c) FROM test_tab"); is($result, qq(2|0), 'check initial data was copied to subscriber'); -test_streaming($node_publisher, $node_subscriber, $appname, 0); +test_streaming($node_publisher, $node_subscriber, $appname); ###################################### # Test using streaming mode 'parallel' @@ -156,7 +116,7 @@ $node_publisher->poll_query_until('postgres', or die "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; -test_streaming($node_publisher, $node_subscriber, $appname, 1); +test_streaming($node_publisher, $node_subscriber, $appname); $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl index 893faaa..6ea7fc1 100644 --- a/src/test/subscription/t/022_twophase_cascade.pl +++ b/src/test/subscription/t/022_twophase_cascade.pl @@ -11,20 +11,6 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; -# Check the log that the streamed transaction was completed successfully -# reported by parallel apply worker. -sub check_parallel_log -{ - my ($node_subscriber, $offset, $streaming_mode) = @_; - my $parallel_message = - 'finished processing the transaction finish command'; - - if ($streaming_mode eq 'parallel') - { - $node_subscriber->wait_for_log(qr/$parallel_message/, $offset); - } -} - # Encapsulate all the common test steps which are related to "streaming" parameter # so the same code can be run both for the streaming=on and streaming=parallel # cases. @@ -33,9 +19,6 @@ sub test_streaming my ($node_A, $node_B, $node_C, $appname_B, $appname_C, $streaming_mode) = @_; - my $offset_B = 0; - my $offset_C = 0; - my $oldpid_B = $node_A->safe_psql( 'postgres', " SELECT pid FROM pg_stat_replication @@ -77,23 +60,6 @@ sub test_streaming # Expect all data is replicated on subscriber(s) after the commit. ############################### - # If "streaming" parameter is specified as "parallel", we need to check - # that streamed transaction was prepared using a parallel apply worker. - # We have to look for the DEBUG1 log messages about that, so bump up the - # log verbosity. 
- if ($streaming_mode eq 'parallel') - { - $node_B->append_conf('postgresql.conf', "log_min_messages = debug1"); - $node_B->reload; - - $node_C->append_conf('postgresql.conf', "log_min_messages = debug1"); - $node_C->reload; - } - - # Check the subscriber log from now on. - $offset_B = -s $node_B->logfile; - $offset_C = -s $node_C->logfile; - # Insert, update and delete enough rows to exceed the 64kB limit. # Then 2PC PREPARE $node_A->safe_psql( @@ -107,9 +73,6 @@ sub test_streaming $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_C); - check_parallel_log($node_B, $offset_B, $streaming_mode); - check_parallel_log($node_C, $offset_C, $streaming_mode); - # check the transaction state is prepared on subscriber(s) my $result = $node_B->safe_psql('postgres', @@ -163,10 +126,6 @@ sub test_streaming # First, delete the data except for 2 rows (delete will be replicated) $node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - # Check the subscriber log from now on. - $offset_B = -s $node_B->logfile; - $offset_C = -s $node_C->logfile; - # 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT $node_A->safe_psql( 'postgres', " @@ -183,9 +142,6 @@ sub test_streaming $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_C); - check_parallel_log($node_B, $offset_B, $streaming_mode); - check_parallel_log($node_C, $offset_C, $streaming_mode); - # check the transaction state prepared on subscriber(s) $result = $node_B->safe_psql('postgres', @@ -232,16 +188,6 @@ sub test_streaming $node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_C); - - # Reset the log verbosity. - if ($streaming_mode eq 'parallel') - { - $node_B->append_conf('postgresql.conf', "log_min_messages = warning"); - $node_B->reload; - - $node_C->append_conf('postgresql.conf', "log_min_messages = warning"); - $node_C->reload; - } } ############################### diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl index 2cd453c..c49cb10 100644 --- a/src/test/subscription/t/023_twophase_stream.pl +++ b/src/test/subscription/t/023_twophase_stream.pl @@ -8,28 +8,12 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; -# Check the log that the streamed transaction was completed successfully -# reported by parallel apply worker. -sub check_parallel_log -{ - my ($node_subscriber, $offset, $is_parallel) = @_; - my $parallel_message = - 'finished processing the transaction finish command'; - - if ($is_parallel) - { - $node_subscriber->wait_for_log(qr/$parallel_message/, $offset); - } -} - # Encapsulate all the common test steps which are related to "streaming" # parameter so the same code can be run both for the streaming=on and # streaming=parallel cases. sub test_streaming { - my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; - - my $offset = 0; + my ($node_publisher, $node_subscriber, $appname) = @_; ############################### # Test 2PC PREPARE / COMMIT PREPARED. @@ -39,20 +23,6 @@ sub test_streaming # Expect all data is replicated on subscriber side after the commit. ############################### - # If "streaming" parameter is specified as "parallel", we need to check - # that streamed transaction was prepared using a parallel apply worker. - # We have to look for the DEBUG1 log messages about that, so bump up the - # log verbosity. 
- if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = debug1"); - $node_subscriber->reload; - } - - # Check the subscriber log from now on. - $offset = -s $node_subscriber->logfile; - # check that 2PC gets replicated to subscriber # Insert, update and delete enough rows to exceed the 64kB limit. $node_publisher->safe_psql( @@ -65,8 +35,6 @@ sub test_streaming $node_publisher->wait_for_catchup($appname); - check_parallel_log($node_subscriber, $offset, $is_parallel); - # check that transaction is in prepared state on subscriber my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); @@ -101,9 +69,6 @@ sub test_streaming $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - # Check the subscriber log from now on. - $offset = -s $node_subscriber->logfile; - # Then insert, update and delete enough rows to exceed the 64kB limit. $node_publisher->safe_psql( 'postgres', q{ @@ -115,8 +80,6 @@ sub test_streaming $node_publisher->wait_for_catchup($appname); - check_parallel_log($node_subscriber, $offset, $is_parallel); - # check that transaction is in prepared state on subscriber $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); @@ -149,9 +112,6 @@ sub test_streaming # Note: both publisher and subscriber do crash/restart. ############################### - # Check the subscriber log from now on. - $offset = -s $node_subscriber->logfile; - $node_publisher->safe_psql( 'postgres', q{ BEGIN; @@ -166,8 +126,6 @@ sub test_streaming $node_publisher->start; $node_subscriber->start; - check_parallel_log($node_subscriber, $offset, $is_parallel); - # commit post the restart $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); @@ -195,9 +153,6 @@ sub test_streaming $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - # Check the subscriber log from now on. - $offset = -s $node_subscriber->logfile; - # Then insert, update and delete enough rows to exceed the 64kB limit. $node_publisher->safe_psql( 'postgres', q{ @@ -209,8 +164,6 @@ sub test_streaming $node_publisher->wait_for_catchup($appname); - check_parallel_log($node_subscriber, $offset, $is_parallel); - # check that transaction is in prepared state on subscriber $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); @@ -253,9 +206,6 @@ sub test_streaming $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - # Check the subscriber log from now on. - $offset = -s $node_subscriber->logfile; - # Then insert, update and delete enough rows to exceed the 64kB limit. $node_publisher->safe_psql( 'postgres', q{ @@ -267,8 +217,6 @@ sub test_streaming $node_publisher->wait_for_catchup($appname); - check_parallel_log($node_subscriber, $offset, $is_parallel); - # check that transaction is in prepared state on subscriber $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); @@ -300,14 +248,6 @@ sub test_streaming $node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); $node_publisher->wait_for_catchup($appname); - - # Reset the log verbosity. 
- if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = warning"); - $node_subscriber->reload; - } } ############################### @@ -375,7 +315,7 @@ my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); -test_streaming($node_publisher, $node_subscriber, $appname, 0); +test_streaming($node_publisher, $node_subscriber, $appname); ###################################### # Test using streaming mode 'parallel' @@ -398,7 +338,7 @@ $node_publisher->poll_query_until('postgres', or die "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; -test_streaming($node_publisher, $node_subscriber, $appname, 1); +test_streaming($node_publisher, $node_subscriber, $appname); ############################### # check all the cleanup diff --git a/src/test/subscription/t/032_streaming_parallel_safety.pl b/src/test/subscription/t/032_streaming_parallel_safety.pl index 3aaa699..b87dc53 100644 --- a/src/test/subscription/t/032_streaming_parallel_safety.pl +++ b/src/test/subscription/t/032_streaming_parallel_safety.pl @@ -78,7 +78,8 @@ my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, # ============================================================================ # It is not allowed that the unique column in the relation on the # subscriber-side is not the unique column on the publisher-side. Check the -# error reported by parallel worker in this case. +# error reported by parallel worker in this case. And after retrying in +# apply worker, we check if the data is replicated successfully. # ============================================================================ # First we check the unique index on normal table. @@ -103,6 +104,10 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming = parallel/, $offset); +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? duplicate key value violates unique constraint "idx_tab1"/, + $offset); + # Drop the unique index on the subscriber, now it works. $node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab1"); @@ -111,7 +116,9 @@ $node_publisher->wait_for_catchup($appname); my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); -is($result, qq(5001), 'data replicated to subscriber after dropping index'); +is($result, qq(5001), + 'data replicated to subscriber after dropping unique index to retry apply' +); # Clean up test data from the environment. $node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab1"); @@ -132,21 +139,24 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming = parallel/, $offset); -# Drop the unique index on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "DROP INDEX test_tab_b_partition_idx"); - # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); -is($result, qq(5000), 'data replicated to subscriber after dropping index'); +is($result, qq(5000), + 'data replicated to subscriber after retrying because of unique index'); + +# Drop the unique index on the subscriber. 
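+# (Cleanup only: the retry in the leader apply worker succeeded with the
+# index still in place, since the streamed rows do not violate it.)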
+$node_subscriber->safe_psql('postgres', + "DROP INDEX test_tab_b_partition_idx"); # ============================================================================ # Triggers which execute non-immutable function are not allowed on the # subscriber side. Check the error reported by parallel worker in this case. +# And after retrying in apply worker, we check if the data is replicated +# successfully. # ============================================================================ # First we check the trigger function on normal table. @@ -167,17 +177,6 @@ CREATE TRIGGER tri_tab1_unsafe BEFORE INSERT ON public.test_tab1 FOR EACH ROW EXECUTE PROCEDURE trigger_func_tab1_unsafe(); ALTER TABLE test_tab1 ENABLE REPLICA TRIGGER tri_tab1_unsafe; - -CREATE FUNCTION trigger_func_tab1_safe() RETURNS TRIGGER AS \$\$ - BEGIN - RAISE NOTICE 'test for safe trigger function'; - RETURN NEW; - END -\$\$ language plpgsql; -ALTER FUNCTION trigger_func_tab1_safe IMMUTABLE; -CREATE TRIGGER tri_tab1_safe -BEFORE INSERT ON public.test_tab1 -FOR EACH ROW EXECUTE PROCEDURE trigger_func_tab1_safe(); }); # Check the subscriber log from now on. @@ -202,12 +201,12 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming = parallel/, $offset); -# Using trigger with immutable function, now it works. -$node_subscriber->safe_psql( - 'postgres', qq{ -ALTER TABLE test_tab1 ENABLE REPLICA TRIGGER tri_tab1_safe; -DROP TRIGGER tri_tab1_unsafe ON public.test_tab1; -}); +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? duplicate key value violates unique constraint "idx_tab2"/, + $offset); + +# Drop the unique index on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab2"); # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); @@ -215,15 +214,13 @@ $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); is($result, qq(2), - 'data replicated to subscriber after using immutable expression'); + 'data replicated to subscriber after retrying because of trigger'); # Clean up test data from the environment. $node_subscriber->safe_psql( 'postgres', qq{ -DROP INDEX idx_tab2; -DROP TRIGGER tri_tab1_safe ON public.test_tab1; +DROP TRIGGER tri_tab1_unsafe ON public.test_tab1; DROP function trigger_func_tab1_unsafe; -DROP function trigger_func_tab1_safe; }); $node_publisher->safe_psql( 'postgres', qq{ @@ -256,22 +253,24 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming = parallel/, $offset); -# Drop the trigger on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "DROP TRIGGER insert_trig ON test_tab_partition"); - # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); -is($result, qq(0), 'data replicated to subscriber after dropping trigger'); +is($result, qq(0), + 'data replicated to subscriber after retrying because of trigger'); + +# Drop the trigger on the subscriber. 
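+# (Cleanup only: the retry in the leader apply worker succeeded with the
+# trigger still enabled; the expected count of 0 above reflects the trigger
+# suppressing the streamed inserts.)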
+$node_subscriber->safe_psql('postgres', + "DROP TRIGGER insert_trig ON test_tab_partition"); # ============================================================================ # It is not allowed that column default value expression contains a # non-immutable function on the subscriber side. Check the error reported by -# parallel worker in this case. +# parallel worker in this case. And after retrying in apply worker, we check +# if the data is replicated successfully. # ============================================================================ # First we check the column default value expression on normal table. @@ -309,17 +308,14 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming = parallel/, $offset); -# Alter default values to immutable expression, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab1 ALTER COLUMN b SET DEFAULT 1"); - # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); is($result, qq(5001), - 'data replicated to subscriber after using immutable expression'); + 'data replicated to subscriber after retrying because of column default value' +); # Clean up test data from the environment. $node_subscriber->safe_psql('postgres', @@ -346,10 +342,6 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming = parallel/, $offset); -# Drop default value on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab_partition ALTER COLUMN b DROP DEFAULT"); - # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); @@ -357,12 +349,18 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); is($result, qq(5000), - 'data replicated to subscriber after dropping default value expression'); + 'data replicated to subscriber after retrying because of column default value' +); + +# Drop default value on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition ALTER COLUMN b DROP DEFAULT"); # ============================================================================ # It is not allowed that domain constraint expression contains a non-immutable # function on the subscriber side. Check the error reported by parallel -# worker in this case. +# worker in this case. And after retrying in apply worker, we check if the +# data is replicated successfully. # ============================================================================ # Because the column type of the partition table must be the same as its parent @@ -403,17 +401,13 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming = parallel/, $offset); -# Drop domain constraint expression, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER DOMAIN tmp_domain DROP CONSTRAINT domain_check"); - # Wait for this streaming transaction to be applied in the apply worker. 
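+# (The retry in the leader apply worker succeeds with the domain constraint
+# still in place: only the parallel-safety check rejects the non-immutable
+# function, and the replicated rows all satisfy the constraint.)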
$node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); is($result, qq(5001), - 'data replicated to subscriber after using immutable expression'); + 'data replicated to subscriber after retrying because of domain'); # Clean up test data from the environment. $node_subscriber->safe_psql('postgres', @@ -426,9 +420,10 @@ TRUNCATE TABLE test_tab2; $node_publisher->wait_for_catchup($appname); # ============================================================================ -# It is not allowed that constraint expression contains a non-immutable function -# on the subscriber side. Check the error reported by parallel worker in this -# case. +# It is not allowed that constraint expression contains a non-immutable +# function on the subscriber side. Check the error reported by parallel +# worker in this case. And after retrying in apply worker, we check if the +# data is replicated successfully. # ============================================================================ # First we check the constraint expression on normal table. @@ -461,24 +456,17 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming = parallel/, $offset); -# Alter constraint expression to immutable expression, now it works. -$node_subscriber->safe_psql( - 'postgres', qq{ -ALTER TABLE test_tab1 DROP CONSTRAINT const_tab1_unsafe; -ALTER TABLE test_tab1 ADD CONSTRAINT const_tab1_safe CHECK(a >= 0); -}); - # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); is($result, qq(5001), - 'data replicated to subscriber after using immutable expression'); + 'data replicated to subscriber after retrying because of constraint'); # Clean up test data from the environment. $node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab1 DROP CONSTRAINT const_tab1_safe"); + "ALTER TABLE test_tab1 DROP CONSTRAINT const_tab1_unsafe"); $node_publisher->safe_psql( 'postgres', qq{ TRUNCATE TABLE test_tab1; @@ -501,10 +489,6 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming = parallel/, $offset); -# Drop constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_con"); - # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); @@ -512,11 +496,16 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); is($result, qq(0), - 'data replicated to subscriber after dropping constraint expression'); + 'data replicated to subscriber after retrying because of constraint'); + +# Drop constraint on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_con"); # ============================================================================ # It is not allowed that foreign key on the subscriber side. Check the error -# reported by parallel worker in this case. +# reported by parallel worker in this case. And after retrying in apply +# worker, we check if the data is replicated successfully. 
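+# (The retry alone cannot succeed here until the referenced row exists, so
+# rather than dropping the constraint, the test inserts the dependent row on
+# the subscriber below.)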
# ============================================================================

# First we check the foreign key on normal table.
@@ -558,9 +547,13 @@ $node_subscriber->wait_for_log(
 	qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming = parallel/,
 	$offset);

-# Drop the foreign key constraint on the subscriber, now it works.
-$node_subscriber->safe_psql('postgres',
-	"ALTER TABLE test_tab1 DROP CONSTRAINT test_tab1fk");
+# Wait for the error log to make sure the retry in the apply worker fails
+# because the referenced data has been deleted.
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? insert or update on table "test_tab1" violates foreign key constraint "test_tab1fk"/,
+	$offset);
+
+# Insert the dependent data on the subscriber, now it works.
+$node_subscriber->safe_psql('postgres', "INSERT INTO test_tab2 VALUES(1)");

 # Wait for this streaming transaction to be applied in the apply worker.
 $node_publisher->wait_for_catchup($appname);
@@ -568,9 +561,11 @@ $node_publisher->wait_for_catchup($appname);
 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1");
 is($result, qq(1),
-	'data replicated to subscriber after dropping the foreign key');
+	'data replicated to subscriber after retrying because of foreign key');

 # Clean up test data from the environment.
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_tab1 DROP CONSTRAINT test_tab1fk");
 $node_publisher->safe_psql(
 	'postgres', qq{
 TRUNCATE TABLE test_tab1;
@@ -582,6 +577,7 @@ $node_publisher->wait_for_catchup($appname);
 $node_subscriber->safe_psql(
 	'postgres', qq{
 CREATE TABLE test_tab_partition_f (a int primary key);
+INSERT INTO test_tab_partition_f SELECT i FROM generate_series(1, 5000) s(i);
 ALTER TABLE test_tab_partition ADD CONSTRAINT test_tab_patition_fk FOREIGN KEY(a) REFERENCES test_tab_partition_f(a);
 SELECT 'ALTER TABLE test_tab_partition ENABLE REPLICA TRIGGER "' || tgname || '"' FROM pg_trigger WHERE tgrelid = 'test_tab_partition'::regclass::oid \\gexec
 });
@@ -597,10 +593,6 @@ $node_subscriber->wait_for_log(
 	qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming = parallel/,
 	$offset);

-# Drop the foreign key constraint on the subscriber, now it works.
-$node_subscriber->safe_psql('postgres',
-	"ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_patition_fk");
-
 # Wait for this streaming transaction to be applied in the apply worker.
 $node_publisher->wait_for_catchup($appname);
@@ -608,7 +600,11 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM test_tab_partitioned");

 is($result, qq(5000),
-	'data replicated to subscriber after dropping the foreign key');
+	'data replicated to subscriber after retrying because of foreign key');
+
+# Drop the foreign key constraint on the subscriber.
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_patition_fk");

 $node_subscriber->stop;
 $node_publisher->stop;
--
2.7.2.windows.1