From 25d936d1190c4ba6ec4c6cacb72e96b73f85c598 Mon Sep 17 00:00:00 2001 From: wangw Date: Wed, 15 Jun 2022 11:02:08 +0800 Subject: [PATCH v15 4/4] Retry to apply streaming xact only in apply worker If the user sets the subscription_parameter "streaming" to "parallel", when applying a streaming transaction, we will try to apply this transaction in apply background worker. However, when the changes in this transaction cannot be applied in apply background worker, the background worker will exit with an error. In this case, we can retry applying this streaming transaction in "on" mode. In this way, we may avoid blocking logical replication here. So we introduce field "subretry" in catalog "pg_subscription". When the subscriber exit with an error, we will try to set this flag to true, and when the transaction is applied successfully, we will try to set this flag to false. Then when we try to apply a streaming transaction in apply background worker, we can see if this transaction has failed before based on the "subretry" field. --- doc/src/sgml/catalogs.sgml | 9 + doc/src/sgml/ref/create_subscription.sgml | 5 + src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 4 +- src/backend/commands/subscriptioncmds.c | 1 + .../replication/logical/applybgworker.c | 15 +- src/backend/replication/logical/worker.c | 89 ++++++++++ src/bin/pg_dump/pg_dump.c | 5 +- src/include/catalog/pg_subscription.h | 4 + .../subscription/t/032_streaming_apply.pl | 154 +++++++++++------- 10 files changed, 221 insertions(+), 66 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 815cae6082..28f3d121d9 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7907,6 +7907,15 @@ SCRAM-SHA-256$<iteration count>:&l + + + subretry bool + + + True if the previous apply change failed and a retry was required. + + + subconninfo text diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 270e3d382e..bd5361991e 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -244,6 +244,11 @@ CREATE SUBSCRIPTION subscription_nameparallel mode is disregarded when retrying; + instead the transaction will be applied using on + mode. diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 8856ce3b50..9b7f09653d 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->retry = subform->subretry; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fedaed533b..10f4dd6785 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1298,8 +1298,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, - subbinary, substream, subtwophasestate, subdisableonerr, subslotname, - subsynccommit, subpublications) + subbinary, substream, subtwophasestate, subdisableonerr, + subretry, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5f349067cc..33aef31b30 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -662,6 +662,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(false); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) diff --git a/src/backend/replication/logical/applybgworker.c b/src/backend/replication/logical/applybgworker.c index 92b4b4e4ed..9bcc8669fb 100644 --- a/src/backend/replication/logical/applybgworker.c +++ b/src/backend/replication/logical/applybgworker.c @@ -106,6 +106,18 @@ apply_bgworker_can_start(TransactionId xid) if (!XLogRecPtrIsInvalid(MySubscription->skiplsn)) return false; + /* + * Don't use apply background workers for retries, because it is possible + * that the last time we tried to apply a transaction using an apply + * background worker the checks failed (see function + * apply_bgworker_relation_check). + */ + if (MySubscription->retry) + { + elog(DEBUG1, "apply background workers are not used for retries"); + return false; + } + /* * For streaming transactions that are being applied in apply background * worker, we cannot decide whether to apply the change for a relation @@ -812,6 +824,5 @@ apply_bgworker_relation_check(LogicalRepRelMapEntry *rel) "mode", rel->remoterel.nspname, rel->remoterel.relname), errdetail("The unique column on subscriber is not the unique " "column on publisher or there is at least one " - "non-immutable function."), - errhint("Please change the streaming option to 'on' instead of 'parallel'."))); + "non-immutable function."))); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 81e0005d12..a4aa6de533 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -378,6 +378,8 @@ static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn); static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); static inline void reset_apply_error_context_info(void); +static void set_subscription_retry(bool retry); + /* * Should this worker apply changes for given relation. * @@ -904,6 +906,9 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Check the status of apply background worker if any. */ apply_bgworker_check_status(); @@ -1015,6 +1020,9 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Check the status of apply background worker if any. */ apply_bgworker_check_status(); @@ -1068,6 +1076,9 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn); in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); @@ -1123,6 +1134,9 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn); in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(rollback_data.rollback_end_lsn); @@ -1215,6 +1229,9 @@ apply_handle_stream_prepare(StringInfo s) /* Unlink the files with serialized changes and subxact info. */ stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid); } + + /* Reset the retry flag. */ + set_subscription_retry(false); } in_remote_transaction = false; @@ -1642,6 +1659,9 @@ apply_handle_stream_abort(StringInfo s) */ serialize_stream_abort(xid, subxid); } + + /* Reset the retry flag. */ + set_subscription_retry(false); } reset_apply_error_context_info(); @@ -1854,6 +1874,9 @@ apply_handle_stream_commit(StringInfo s) /* Unlink the files with serialized changes and subxact info. */ stream_cleanup_files(MyLogicalRepWorker->subid, xid); } + + /* Reset the retry flag. */ + set_subscription_retry(false); } /* Check the status of apply background worker if any. */ @@ -3897,6 +3920,9 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) } PG_CATCH(); { + /* Set the retry flag. */ + set_subscription_retry(true); + if (MySubscription->disableonerr) DisableSubscriptionAndExit(); else @@ -3935,6 +3961,9 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + /* Set the retry flag. */ + set_subscription_retry(true); + if (MySubscription->disableonerr) DisableSubscriptionAndExit(); else @@ -4461,3 +4490,63 @@ reset_apply_error_context_info(void) apply_error_callback_arg.remote_attnum = -1; set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr); } + +/* + * Set subretry of pg_subscription catalog. + * + * If retry is true, subscriber is about to exit with an error. Otherwise, it + * means that the transaction was applied successfully. + */ +static void +set_subscription_retry(bool retry) +{ + Relation rel; + HeapTuple tup; + bool started_tx = false; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + if (MySubscription->retry == retry || + am_apply_bgworker()) + return; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + /* Look up the subscription in the catalog */ + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + ObjectIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, + AccessShareLock); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* reset subretry */ + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(retry); + replaces[Anum_pg_subscription_subretry - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + CatalogTupleUpdate(rel, &tup->t_self, tup); + + /* Cleanup. */ + heap_freetuple(tup); + table_close(rel, NoLock); + + if (started_tx) + CommitTransactionCommand(); +} diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 24927641b9..9ce774fc39 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4471,8 +4471,9 @@ getSubscriptions(Archive *fout) ntups = PQntuples(res); /* - * Get subscription fields. We don't include subskiplsn in the dump as - * after restoring the dump this value may no longer be relevant. + * Get subscription fields. We don't include subskiplsn and subretry in + * the dump as after restoring the dump this value may no longer be + * relevant. */ i_tableoid = PQfnumber(res, "tableoid"); i_oid = PQfnumber(res, "oid"); diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index d54540f5f5..5f4e058ec1 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -76,6 +76,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subdisableonerr; /* True if a worker error should cause the * subscription to be disabled */ + bool subretry BKI_DEFAULT(f); /* True if the previous apply change failed. */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -116,6 +118,8 @@ typedef struct Subscription bool disableonerr; /* Indicates if the subscription should be * automatically disabled if a worker error * occurs */ + bool retry; /* Indicates if the previous apply change + * failed. */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/test/subscription/t/032_streaming_apply.pl b/src/test/subscription/t/032_streaming_apply.pl index eca4328676..1bcb4c65b7 100644 --- a/src/test/subscription/t/032_streaming_apply.pl +++ b/src/test/subscription/t/032_streaming_apply.pl @@ -57,8 +57,13 @@ $node_subscriber->safe_psql( $node_publisher->wait_for_catchup($appname); +# ============================================================================ # It is not allowed that the unique index on the publisher and the subscriber -# is different. Check the error reported by background worker in this case. +# is different. Check the error reported by background worker in this case. And +# after retrying in apply worker, we check if the data is replicated +# successfully. +# ============================================================================ + # First we check the unique index on normal table. $node_subscriber->safe_psql('postgres', "CREATE UNIQUE INDEX test_tab_b_idx ON test_tab (b)"); @@ -82,14 +87,15 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab" in parallel mode/, $offset); -# Drop the unique index on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_idx"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); -is($result, qq(5000), 'data replicated to subscriber after dropping index'); +is($result, qq(5000), 'data replicated to subscribers after retrying because of unique index'); + +# Drop the unique index on the subscriber. +$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_idx"); # Then we check the unique index on partition table. $node_subscriber->safe_psql('postgres', @@ -106,17 +112,20 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" in parallel mode/, $offset); -# Drop the unique index on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_partition_idx"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); -is($result, qq(5000), 'data replicated to subscriber after dropping index'); +is($result, qq(5000), 'data replicated to subscribers after retrying because of unique index'); + +# Drop the unique index on the subscriber. +$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_partition_idx"); # Triggers which execute non-immutable function are not allowed on the # subscriber side. Check the error reported by background worker in this case. +# And after retrying in apply worker, we check if the data is replicated +# successfully. # First we check the trigger function on normal table. $node_subscriber->safe_psql( 'postgres', qq{ @@ -140,15 +149,16 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab" in parallel mode/, $offset); -# Drop the trigger on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "DROP TRIGGER insert_trig ON test_tab"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); -is($result, qq(0), 'data replicated to subscriber after dropping trigger'); +is($result, qq(0), 'data replicated to subscribers after retrying because of trigger'); + +# Drop the trigger on the subscriber. +$node_subscriber->safe_psql('postgres', + "DROP TRIGGER insert_trig ON test_tab"); # Then we check the trigger function on partition table. $node_subscriber->safe_psql( @@ -168,19 +178,24 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" in parallel mode/, $offset); -# Drop the trigger on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "DROP TRIGGER insert_trig ON test_tab_partition"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); -is($result, qq(0), 'data replicated to subscriber after dropping trigger'); +is($result, qq(0), 'data replicated to subscribers after retrying because of trigger'); +# Drop the trigger on the subscriber. +$node_subscriber->safe_psql('postgres', + "DROP TRIGGER insert_trig ON test_tab_partition"); + +# ============================================================================ # It is not allowed that column default value expression contains a # non-immutable function on the subscriber side. Check the error reported by -# background worker in this case. +# background worker in this case. And after retrying in apply worker, we check +# if the data is replicated successfully. +# ============================================================================ + # First we check the column default value expression on normal table. $node_subscriber->safe_psql('postgres', "ALTER TABLE test_tab ALTER COLUMN b SET DEFAULT random()"); @@ -196,16 +211,17 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab" in parallel mode/, $offset); -# Drop default value on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab ALTER COLUMN b DROP DEFAULT"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); is($result, qq(5000), - 'data replicated to subscriber after dropping default value expression'); + 'data replicated to subscribers after retrying because of column default value'); + +# Drop default value on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab ALTER COLUMN b DROP DEFAULT"); # Then we check the column default value expression on partition table. $node_subscriber->safe_psql('postgres', @@ -222,20 +238,25 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" in parallel mode/, $offset); -# Drop default value on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab_partition ALTER COLUMN b DROP DEFAULT"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); is($result, qq(5000), - 'data replicated to subscriber after dropping default value expression'); + 'data replicated to subscribers after retrying because of column default value'); + +# Drop default value on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition ALTER COLUMN b DROP DEFAULT"); +# ============================================================================ # It is not allowed that domain constraint expression contains a non-immutable # function on the subscriber side. Check the error reported by background -# worker in this case. +# worker in this case. And after retrying in apply worker, we check if the data +# is replicated successfully. +# ============================================================================ + # Because the column type of the partition table must be the same as its parent # table, only test normal table here. $node_subscriber->safe_psql( @@ -253,21 +274,26 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab" in parallel mode/, $offset); -# Drop domain constraint expression on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab ALTER COLUMN a TYPE int"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); is($result, qq(0), - 'data replicated to subscriber after dropping domain constraint expression' + 'data replicated to subscribers after retrying because of domain' ); -# It is not allowed that constraint expression contains a non-immutable function -# on the subscriber side. Check the error reported by background worker in this -# case. +# Drop domain constraint expression on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab ALTER COLUMN a TYPE int"); + +# ============================================================================ +# It is not allowed that constraint expression contains a non-immutable +# function on the subscriber side. Check the error reported by background +# worker in this case. And after retrying in apply worker, we check if the data +# is replicated successfully. +# ============================================================================ + # First we check the constraint expression on normal table. $node_subscriber->safe_psql( 'postgres', qq{ @@ -285,16 +311,17 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab" in parallel mode/, $offset); -# Drop constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab DROP CONSTRAINT test_tab_con"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); is($result, qq(5000), - 'data replicated to subscriber after dropping constraint expression'); + 'data replicated to subscribers after retrying because of constraint'); + +# Drop constraint on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab DROP CONSTRAINT test_tab_con"); # Then we check the constraint expression on partition table. $node_subscriber->safe_psql( @@ -311,19 +338,24 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" in parallel mode/, $offset); -# Drop constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_con"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); is($result, qq(0), - 'data replicated to subscriber after dropping constraint expression'); + 'data replicated to subscribers after retrying because of constraint'); +# Drop constraint on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_con"); + +# ============================================================================ # It is not allowed that foreign key on the subscriber side. Check the error -# reported by background worker in this case. +# reported by background worker in this case. And after retrying in apply +# worker, we check if the data is replicated successfully. +# ============================================================================ + # First we check the foreign key on normal table. $node_publisher->safe_psql('postgres', "DELETE FROM test_tab"); $node_publisher->wait_for_catchup($appname); @@ -344,16 +376,17 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab" in parallel mode/, $offset); -# Drop the foreign key constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab DROP CONSTRAINT test_tabfk"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); is($result, qq(5000), - 'data replicated to subscriber after dropping the foreign key'); + 'data replicated to subscribers after retrying because of foreign key'); + +# Drop the foreign key constraint on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab DROP CONSTRAINT test_tabfk"); # Then we check the foreign key on partition table. $node_publisher->wait_for_catchup($appname); @@ -374,16 +407,17 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" in parallel mode/, $offset); -# Drop the foreign key constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_patition_fk"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); is($result, qq(5000), - 'data replicated to subscriber after dropping the foreign key'); + 'data replicated to subscribers after retrying because of foreign key'); + +# Drop the foreign key constraint on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_patition_fk"); $node_subscriber->stop; $node_publisher->stop; -- 2.23.0.windows.1