From 64b8d2a86e5001f5e10f2ada88be9ae16d7b16e4 Mon Sep 17 00:00:00 2001 From: "houzj.fnst" Date: Wed, 24 Aug 2022 21:07:44 +0800 Subject: [PATCH v24 4/5] Retry to apply streaming xact only in apply worker When the subscription parameter is set streaming=parallel, the logic tries to apply the streaming transaction using an apply background worker. If this fails the background worker exits with an error. In this case, retry applying the streaming transaction using the normal streaming=on mode. This is done to avoid getting caught in a loop of the same retry errors. A new flag field "subretry" has been introduced to catalog "pg_subscription". If the subscriber exits with an error, this flag will be set true, and whenever the transaction is applied successfully, this flag is reset false. Now, when deciding how to apply a streaming transaction, the logic can know if this transaction has previously failed or not (by checking the "subretry" field). --- doc/src/sgml/catalogs.sgml | 9 ++ doc/src/sgml/ref/create_subscription.sgml | 5 + src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 1 + src/backend/replication/logical/applybgworker.c | 16 +- src/backend/replication/logical/worker.c | 188 ++++++++++++++++++------ src/bin/pg_dump/pg_dump.c | 5 +- src/include/catalog/pg_subscription.h | 4 + src/test/subscription/t/032_streaming_apply.pl | 134 ++++++++--------- 10 files changed, 237 insertions(+), 128 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 673c812..35335e6 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7919,6 +7919,15 @@ SCRAM-SHA-256$<iteration count>:&l + subretry bool + + + True if the previous apply change failed, necessitating a retry + + + + + subconninfo text diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 47bcaa2..cb201a8 100644 --- 
a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -245,6 +245,11 @@ CREATE SUBSCRIPTION subscription_nameparallel mode is disregarded when retrying; + instead the transaction will be applied using on + mode. diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index a506fc3..723b141 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->retry = subform->subretry; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5a844b6..652d76f 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1299,7 +1299,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, - subslotname, subsynccommit, subpublications, suborigin) + subretry, subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3dc03dc..ac92cbb 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -689,6 +689,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(false); values[Anum_pg_subscription_subconninfo - 1] = 
CStringGetTextDatum(conninfo); if (opts.slot_name) diff --git a/src/backend/replication/logical/applybgworker.c b/src/backend/replication/logical/applybgworker.c index d35dd74..9d905c8 100644 --- a/src/backend/replication/logical/applybgworker.c +++ b/src/backend/replication/logical/applybgworker.c @@ -131,6 +131,18 @@ apply_bgworker_can_start(TransactionId xid) return false; /* + * Don't use apply background workers for retries, because it is possible + * that the last time we tried to apply a transaction using an apply + * background worker the checks failed (see function + * apply_bgworker_relation_check). + */ + if (MySubscription->retry) + { + elog(DEBUG1, "apply background workers are not used for retries"); + return false; + } + + /* * For streaming transactions that are being applied using apply background * worker, we cannot decide whether to apply the change for a relation that * is not in the READY state (see should_apply_changes_for_rel) as we won't @@ -1019,7 +1031,5 @@ apply_bgworker_relation_check(LogicalRepRelMapEntry *rel) "streaming = parallel"), errdetail("The unique column on subscriber is not the unique " "column on publisher or there is at least one " - "non-immutable function."), - errhint("Please change to use subscription parameter %s.", - "streaming = on"))); + "non-immutable function."))); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b1d899c..f05715c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -340,7 +340,7 @@ static void store_flush_position(XLogRecPtr remote_lsn); static void maybe_reread_subscription(void); -static void DisableSubscriptionAndExit(void); +static void DisableSubscriptionOnError(void); static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, @@ -377,6 +377,8 @@ static void clear_subscription_skip_lsn(XLogRecPtr 
finish_lsn); static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); static inline void reset_apply_error_context_info(void); +static void set_subscription_retry(bool retry); + /* * Should this worker apply changes for given relation. * @@ -930,6 +932,9 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); @@ -1039,6 +1044,9 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); @@ -1089,6 +1097,9 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn); in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); @@ -1144,6 +1155,9 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn); in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(rollback_data.rollback_end_lsn); @@ -1209,6 +1223,9 @@ apply_handle_stream_prepare(StringInfo s) apply_bgworker_free(winfo); store_flush_position(prepare_data.end_lsn); + + /* Reset the retry flag. */ + set_subscription_retry(false); } else { @@ -1227,6 +1244,9 @@ apply_handle_stream_prepare(StringInfo s) /* Unlink the files with serialized changes and subxact info. */ stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid); + + /* Reset the retry flag. 
*/ + set_subscription_retry(false); } pgstat_report_stat(false); @@ -1671,14 +1691,22 @@ apply_handle_stream_abort(StringInfo s) apply_bgworker_wait_for(winfo, APPLY_BGWORKER_FINISHED); apply_bgworker_free(winfo); } + + /* Reset the retry flag. */ + set_subscription_retry(false); } else + { /* * We are in main apply worker and the transaction has been * serialized to file. */ serialize_stream_abort(xid, subxid); + /* Reset the retry flag. */ + set_subscription_retry(false); + } + reset_apply_error_context_info(); } @@ -1859,6 +1887,9 @@ apply_handle_stream_commit(StringInfo s) * subskiplsn. */ clear_subscription_skip_lsn(commit_data.commit_lsn); + + /* Reset the retry flag. */ + set_subscription_retry(false); } else { @@ -1872,6 +1903,9 @@ apply_handle_stream_commit(StringInfo s) /* Unlink the files with serialized changes and subxact info. */ stream_cleanup_files(MyLogicalRepWorker->subid, xid); + + /* Reset the retry flag. */ + set_subscription_retry(false); } /* Process any tables that are being synchronized in parallel. */ @@ -3919,20 +3953,28 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) } PG_CATCH(); { + /* + * Emit the error message, and recover from the error state to an idle + * state + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed during table synchronization */ + pgstat_report_subscription_error(MySubscription->oid, false); + if (MySubscription->disableonerr) - DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed during table synchronization. Abort - * the current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); + DisableSubscriptionOnError(); - PG_RE_THROW(); - } + /* Set the retry flag. 
*/ + set_subscription_retry(true); + + proc_exit(0); } PG_END_TRY(); @@ -3957,20 +3999,27 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + /* + * Emit the error message, and recover from the error state to an idle + * state + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed while applying changes */ + pgstat_report_subscription_error(MySubscription->oid, + !am_tablesync_worker()); + if (MySubscription->disableonerr) - DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed while applying changes. Abort the - * current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); + DisableSubscriptionOnError(); - PG_RE_THROW(); - } + /* Set the retry flag. */ + set_subscription_retry(true); } PG_END_TRY(); } @@ -4232,39 +4281,20 @@ ApplyWorkerMain(Datum main_arg) } /* - * After error recovery, disable the subscription in a new transaction - * and exit cleanly. + * Disable the subscription in a new transaction. 
*/ static void -DisableSubscriptionAndExit(void) +DisableSubscriptionOnError(void) { - /* - * Emit the error message, and recover from the error state to an idle - * state - */ - HOLD_INTERRUPTS(); - - EmitErrorReport(); - AbortOutOfAnyTransaction(); - FlushErrorState(); - - RESUME_INTERRUPTS(); - - /* Report the worker failed during either table synchronization or apply */ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); - /* Disable the subscription */ StartTransactionCommand(); DisableSubscription(MySubscription->oid); CommitTransactionCommand(); - /* Notify the subscription has been disabled and exit */ + /* Notify the subscription has been disabled */ ereport(LOG, errmsg("logical replication subscription \"%s\" has been disabled due to an error", MySubscription->name)); - - proc_exit(0); } /* @@ -4499,3 +4529,67 @@ reset_apply_error_context_info(void) apply_error_callback_arg.remote_attnum = -1; set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr); } + +/* + * Set subretry of pg_subscription catalog. + * + * If retry is true, subscriber is about to exit with an error. Otherwise, it + * means that the transaction was applied successfully. 
+ */ +static void +set_subscription_retry(bool retry) +{ + Relation rel; + HeapTuple tup; + bool started_tx = false; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + /* Fast path - if no state change then nothing to do */ + if (MySubscription->retry == retry) + return; + + /* Fast path - skip for apply background workers */ + if (am_apply_bgworker()) + return; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + /* Look up the subscription in the catalog */ + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + ObjectIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, + AccessShareLock); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* Set subretry */ + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(retry); + replaces[Anum_pg_subscription_subretry - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + CatalogTupleUpdate(rel, &tup->t_self, tup); + + /* Cleanup. */ + heap_freetuple(tup); + table_close(rel, NoLock); + + if (started_tx) + CommitTransactionCommand(); +} diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 46a3d01..426cb9f 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4507,8 +4507,9 @@ getSubscriptions(Archive *fout) ntups = PQntuples(res); /* - * Get subscription fields. We don't include subskiplsn in the dump as - * after restoring the dump this value may no longer be relevant. + * Get subscription fields. 
We don't include subskiplsn and subretry in + the dump as after restoring the dump these values may no longer be + relevant. */ i_tableoid = PQfnumber(res, "tableoid"); i_oid = PQfnumber(res, "oid"); diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index f4e1e94..0969b9f 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -88,6 +88,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subdisableonerr; /* True if a worker error should cause the * subscription to be disabled */ + bool subretry BKI_DEFAULT(f); /* True if the previous apply change failed. */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -131,6 +133,8 @@ typedef struct Subscription bool disableonerr; /* Indicates if the subscription should be * automatically disabled if a worker error * occurs */ + bool retry; /* Indicates if the previous apply change + * failed. */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/test/subscription/t/032_streaming_apply.pl b/src/test/subscription/t/032_streaming_apply.pl index 05d3e86..01c231e 100644 --- a/src/test/subscription/t/032_streaming_apply.pl +++ b/src/test/subscription/t/032_streaming_apply.pl @@ -81,7 +81,8 @@ my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, # ============================================================================ # It is not allowed that the unique column in the relation on the # subscriber-side is not the unique column on the publisher-side. Check the -# error reported by background worker in this case. +# error reported by background worker in this case. And after retrying in +# apply worker, we check if the data is replicated successfully. 
# ============================================================================ # First we check the unique index on normal table. @@ -106,6 +107,10 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming = parallel/, $offset); +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? duplicate key value violates unique constraint "idx_tab1"/, + $offset); + # Drop the unique index on the subscriber, now it works. $node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab1"); @@ -114,7 +119,7 @@ $node_publisher->wait_for_catchup($appname); my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); -is($result, qq(5001), 'data replicated to subscriber after dropping index'); +is($result, qq(5001), 'data replicated to subscriber after dropping unique index to retry apply'); # Clean up test data from the environment. $node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab1"); @@ -135,19 +140,21 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming = parallel/, $offset); -# Drop the unique index on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_partition_idx"); - # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); -is($result, qq(5000), 'data replicated to subscriber after dropping index'); +is($result, qq(5000), 'data replicated to subscriber after retrying because of unique index'); + +# Drop the unique index on the subscriber. 
+$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_partition_idx"); # ============================================================================ # Triggers which execute non-immutable function are not allowed on the # subscriber side. Check the error reported by background worker in this case. +# And after retrying in apply worker, we check if the data is replicated +# successfully. # ============================================================================ # First we check the trigger function on normal table. @@ -168,17 +175,6 @@ CREATE TRIGGER tri_tab1_unsafe BEFORE INSERT ON public.test_tab1 FOR EACH ROW EXECUTE PROCEDURE trigger_func_tab1_unsafe(); ALTER TABLE test_tab1 ENABLE REPLICA TRIGGER tri_tab1_unsafe; - -CREATE FUNCTION trigger_func_tab1_safe() RETURNS TRIGGER AS \$\$ - BEGIN - RAISE NOTICE 'test for safe trigger function'; - RETURN NEW; - END -\$\$ language plpgsql; -ALTER FUNCTION trigger_func_tab1_safe IMMUTABLE; -CREATE TRIGGER tri_tab1_safe -BEFORE INSERT ON public.test_tab1 -FOR EACH ROW EXECUTE PROCEDURE trigger_func_tab1_safe(); }); # Check the subscriber log from now on. @@ -203,27 +199,25 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming = parallel/, $offset); -# Using trigger with immutable function, now it works. -$node_subscriber->safe_psql( - 'postgres', qq{ -ALTER TABLE test_tab1 ENABLE REPLICA TRIGGER tri_tab1_safe; -DROP TRIGGER tri_tab1_unsafe ON public.test_tab1; -}); +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? duplicate key value violates unique constraint "idx_tab2"/, + $offset); + +# Drop the unique index on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab2"); # Wait for this streaming transaction to be applied in the apply worker. 
$node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); -is($result, qq(2), 'data replicated to subscriber after using immutable expression'); +is($result, qq(2), 'data replicated to subscriber after retrying because of trigger'); # Clean up test data from the environment. $node_subscriber->safe_psql( 'postgres', qq{ -DROP INDEX idx_tab2; -DROP TRIGGER tri_tab1_safe ON public.test_tab1; +DROP TRIGGER tri_tab1_unsafe ON public.test_tab1; DROP function trigger_func_tab1_unsafe; -DROP function trigger_func_tab1_safe; }); $node_publisher->safe_psql( 'postgres', qq{ @@ -256,21 +250,22 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming = parallel/, $offset); -# Drop the trigger on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "DROP TRIGGER insert_trig ON test_tab_partition"); - # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); -is($result, qq(0), 'data replicated to subscriber after dropping trigger'); +is($result, qq(0), 'data replicated to subscriber after retrying because of trigger'); + +# Drop the trigger on the subscriber. +$node_subscriber->safe_psql('postgres', + "DROP TRIGGER insert_trig ON test_tab_partition"); # ============================================================================ # It is not allowed that column default value expression contains a # non-immutable function on the subscriber side. Check the error reported by -# background worker in this case. +# background worker in this case. And after retrying in apply worker, we check +# if the data is replicated successfully. 
# ============================================================================ # First we check the column default value expression on normal table. @@ -308,17 +303,13 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming = parallel/, $offset); -# Alter default values to immutable expression, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab1 ALTER COLUMN b SET DEFAULT 1"); - # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); is($result, qq(5001), - 'data replicated to subscriber after using immutable expression'); + 'data replicated to subscriber after retrying because of column default value'); # Clean up test data from the environment. $node_subscriber->safe_psql('postgres', "ALTER TABLE test_tab1 DROP COLUMN b"); @@ -344,22 +335,23 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming = parallel/, $offset); -# Drop default value on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab_partition ALTER COLUMN b DROP DEFAULT"); - # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); is($result, qq(5000), - 'data replicated to subscriber after dropping default value expression'); + 'data replicated to subscriber after retrying because of column default value'); + +# Drop default value on the subscriber. 
+$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition ALTER COLUMN b DROP DEFAULT"); # ============================================================================ # It is not allowed that domain constraint expression contains a non-immutable # function on the subscriber side. Check the error reported by background -# worker in this case. +# worker in this case. And after retrying in apply worker, we check if the +# data is replicated successfully. # ============================================================================ # Because the column type of the partition table must be the same as its parent @@ -400,17 +392,13 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming = parallel/, $offset); -# Drop domain constraint expression, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER DOMAIN tmp_domain DROP CONSTRAINT domain_check"); - # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); is($result, qq(5001), - 'data replicated to subscriber after using immutable expression' + 'data replicated to subscriber after retrying because of domain' ); # Clean up test data from the environment. @@ -424,9 +412,10 @@ TRUNCATE TABLE test_tab2; $node_publisher->wait_for_catchup($appname); # ============================================================================ -# It is not allowed that constraint expression contains a non-immutable function -# on the subscriber side. Check the error reported by background worker in this -# case. +# It is not allowed that constraint expression contains a non-immutable +# function on the subscriber side. Check the error reported by background +# worker in this case. And after retrying in apply worker, we check if the +# data is replicated successfully. 
# ============================================================================ # First we check the constraint expression on normal table. @@ -459,23 +448,16 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming = parallel/, $offset); -# Alter constraint expression to immutable expression, now it works. -$node_subscriber->safe_psql( - 'postgres', qq{ -ALTER TABLE test_tab1 DROP CONSTRAINT const_tab1_unsafe; -ALTER TABLE test_tab1 ADD CONSTRAINT const_tab1_safe CHECK(a >= 0); -}); - # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); is($result, qq(5001), - 'data replicated to subscriber after using immutable expression'); + 'data replicated to subscriber after retrying because of constraint'); # Clean up test data from the environment. -$node_subscriber->safe_psql('postgres', "ALTER TABLE test_tab1 DROP CONSTRAINT const_tab1_safe"); +$node_subscriber->safe_psql('postgres', "ALTER TABLE test_tab1 DROP CONSTRAINT const_tab1_unsafe"); $node_publisher->safe_psql( 'postgres', qq{ TRUNCATE TABLE test_tab1; @@ -498,21 +480,22 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming = parallel/, $offset); -# Drop constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_con"); - # Wait for this streaming transaction to be applied in the apply worker. 
$node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); is($result, qq(0), - 'data replicated to subscriber after dropping constraint expression'); + 'data replicated to subscriber after retrying because of constraint'); + +# Drop constraint on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_con"); # ============================================================================ # It is not allowed that foreign key on the subscriber side. Check the error -# reported by background worker in this case. +# reported by background worker in this case. And after retrying in apply +# worker, we check if the data is replicated successfully. # ============================================================================ # First we check the foreign key on normal table. @@ -553,8 +536,8 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming = parallel/, $offset); -# Drop the foreign key constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', "ALTER TABLE test_tab1 DROP CONSTRAINT test_tab1fk"); +# Insert dependent data on the publisher, now it works. +$node_subscriber->safe_psql('postgres', "INSERT INTO test_tab2 VALUES(1)"); # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); @@ -562,9 +545,10 @@ $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); is($result, qq(1), - 'data replicated to subscriber after dropping the foreign key'); + 'data replicated to subscriber after retrying because of foreign key'); # Clean up test data from the environment. 
+$node_subscriber->safe_psql('postgres', "ALTER TABLE test_tab1 DROP CONSTRAINT test_tab1fk"); $node_publisher->safe_psql( 'postgres', qq{ TRUNCATE TABLE test_tab1; @@ -590,17 +574,17 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming = parallel/, $offset); -# Drop the foreign key constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_patition_fk"); - # Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); is($result, qq(5000), - 'data replicated to subscriber after dropping the foreign key'); + 'data replicated to subscribers after retrying because of foreign key'); + +# Drop the foreign key constraint on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_patition_fk"); $node_subscriber->stop; $node_publisher->stop; -- 2.7.2.windows.1