From 2573d3fd6989d1a86f865d05a7b363209e8a8dd4 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Mon, 21 Nov 2022 20:16:17 +0800 Subject: [PATCH v51 4/4] Retry to apply streaming xact only in apply worker When the subscription parameter is set streaming=parallel, the logic tries to apply the streaming transaction using a parallel apply worker. If this fails the parallel worker exits with an error. In this case, retry applying the streaming transaction using the normal streaming=on mode. This is done to avoid getting caught in a loop of the same retry errors. A new flag field "subretry" has been introduced to catalog "pg_subscription". If there are any active parallel apply workers and the subscriber exits with an error, this flag will be set true, and whenever the transaction is applied successfully, this flag is reset false. Now, when deciding how to apply a streaming transaction, the logic can know if this transaction has previously failed or not (by checking the "subretry" field). --- doc/src/sgml/catalogs.sgml | 10 ++ doc/src/sgml/logical-replication.sgml | 11 +- doc/src/sgml/ref/create_subscription.sgml | 5 + src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 1 + .../replication/logical/applyparallelworker.c | 30 ++++ src/backend/replication/logical/worker.c | 182 +++++++++++++++------ src/bin/pg_dump/pg_dump.c | 5 +- src/include/catalog/pg_subscription.h | 7 + src/include/replication/worker_internal.h | 2 + src/test/subscription/t/015_stream.pl | 8 + .../subscription/t/032_streaming_parallel_apply.pl | 18 +- 13 files changed, 223 insertions(+), 59 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 130361f..6912571 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7940,6 +7940,16 @@ SCRAM-SHA-256$<iteration count>:&l + subretry bool + + + True if previous change failed to be applied while there were any + active parallel 
apply workers, necessitating a retry. + + + + + subconninfo text diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index f366710..1311f33 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1500,12 +1500,11 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER When the streaming mode is parallel, the finish LSN of - failed transactions may not be logged. In that case, it may be necessary to - change the streaming mode to on or off and - cause the same conflicts again so the finish LSN of the failed transaction will - be written to the server log. For the usage of finish LSN, please refer to ALTER SUBSCRIPTION ... - SKIP. + failed transactions may not be logged. In that case, the failed transaction + will be retried in streaming = on mode. If it fails + again, the finish LSN of the failed transaction will be written to the + server log. For the usage of finish LSN, please refer to + ALTER SUBSCRIPTION ... SKIP. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index f173a09..64d5802 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -251,6 +251,11 @@ CREATE SUBSCRIPTION subscription_nameparallel mode is disregarded when retrying; + instead the transaction will be applied using on + mode. 
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index a506fc3..723b141 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->retry = subform->subretry; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 2d8104b..c2c7d8a 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1301,7 +1301,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, - subslotname, subsynccommit, subpublications, suborigin) + subretry, subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 9e4b676..b03e37c 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -691,6 +691,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(false); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 7a406b8..3a65997 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ 
b/src/backend/replication/logical/applyparallelworker.c @@ -224,6 +224,17 @@ pa_can_start(TransactionId xid) if (!AllTablesyncsReady()) return false; + /* + * Don't use parallel apply workers for retries, because it is possible + * that a deadlock was detected the last time we tried to apply a + * transaction using a parallel apply worker. + */ + if (MySubscription->retry) + { + elog(DEBUG1, "parallel apply workers are not used for retries"); + return false; + } + return true; } @@ -1492,3 +1503,22 @@ pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode) UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid, PARALLEL_APPLY_LOCK_XACT, lockmode); } + +/* Check if there are any active parallel apply workers. */ +bool +pa_have_active_worker(void) +{ + ListCell *lc; + + foreach(lc, ParallelApplyWorkersList) + { + ParallelApplyWorkerInfo *tmp_winfo; + + tmp_winfo = (ParallelApplyWorkerInfo *) lfirst(lc); + + if (tmp_winfo->in_use) + return true; + } + + return false; +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 27c76c8..4e216eb 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -410,7 +410,7 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); static void store_flush_position(XLogRecPtr remote_lsn); -static void DisableSubscriptionAndExit(void); +static void DisableSubscriptionOnError(void); static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, @@ -447,6 +447,8 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); +static void set_subscription_retry(bool retry); + /* * Return the name of the logical replication worker. 
*/ @@ -1040,6 +1042,9 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); @@ -1149,6 +1154,9 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); @@ -1205,6 +1213,9 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn); in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); @@ -1266,6 +1277,9 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn); in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(rollback_data.rollback_end_lsn); @@ -1391,6 +1405,9 @@ apply_handle_stream_prepare(StringInfo s) break; } + /* Reset the retry flag. */ + set_subscription_retry(false); + pgstat_report_stat(false); /* Process any tables that are being synchronized in parallel. */ @@ -1880,6 +1897,10 @@ apply_handle_stream_abort(StringInfo s) break; } + /* Reset the retry flag. */ + if (toplevel_xact) + set_subscription_retry(false); + reset_apply_error_context_info(); } @@ -2123,6 +2144,9 @@ apply_handle_stream_commit(StringInfo s) break; } + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. 
*/ process_syncing_tables(commit_data.end_lsn); @@ -4191,20 +4215,28 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) } PG_CATCH(); { + /* + * Emit the error message, and recover from the error state to an idle + * state + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed during table synchronization */ + pgstat_report_subscription_error(MySubscription->oid, false); + if (MySubscription->disableonerr) - DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed during table synchronization. Abort - * the current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); + DisableSubscriptionOnError(); - PG_RE_THROW(); - } + /* Set the retry flag. */ + set_subscription_retry(true); + + apply_worker_clean_exit(); } PG_END_TRY(); @@ -4229,20 +4261,27 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + /* + * Emit the error message, and recover from the error state to an idle + * state + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed while applying changes */ + pgstat_report_subscription_error(MySubscription->oid, + !am_tablesync_worker()); + if (MySubscription->disableonerr) - DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed while applying changes. Abort the - * current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); + DisableSubscriptionOnError(); - PG_RE_THROW(); - } + /* Set the retry flag. 
*/ + set_subscription_retry(true); } PG_END_TRY(); } @@ -4530,39 +4569,20 @@ ApplyWorkerMain(Datum main_arg) } /* - * After error recovery, disable the subscription in a new transaction - * and exit cleanly. + * Disable the subscription in a new transaction. */ static void -DisableSubscriptionAndExit(void) +DisableSubscriptionOnError(void) { - /* - * Emit the error message, and recover from the error state to an idle - * state - */ - HOLD_INTERRUPTS(); - - EmitErrorReport(); - AbortOutOfAnyTransaction(); - FlushErrorState(); - - RESUME_INTERRUPTS(); - - /* Report the worker failed during either table synchronization or apply */ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); - /* Disable the subscription */ StartTransactionCommand(); DisableSubscription(MySubscription->oid); CommitTransactionCommand(); - /* Notify the subscription has been disabled and exit */ + /* Notify the subscription has been disabled */ ereport(LOG, errmsg("subscription \"%s\" has been disabled because of an error", MySubscription->name)); - - apply_worker_clean_exit(); } /* @@ -4858,3 +4878,71 @@ apply_worker_clean_exit(void) proc_exit(0); } + +/* + * Set subretry of pg_subscription catalog. + * + * If retry is true, subscriber is about to exit with an error. Otherwise, it + * means that the transaction was applied successfully. 
+ */ +static void +set_subscription_retry(bool retry) +{ + Relation rel; + HeapTuple tup; + bool started_tx = false; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + /* Fast path - if no state change then nothing to do */ + if (MySubscription->retry == retry) + return; + + /* Fast path - skip for parallel apply workers */ + if (am_parallel_apply_worker()) + return; + + /* Fast path - skip set retry if no active parallel apply workers */ + if (retry && !pa_have_active_worker()) + return; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + /* Look up the subscription in the catalog */ + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + ObjectIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, + AccessShareLock); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* Set subretry */ + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(retry); + replaces[Anum_pg_subscription_subretry - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + CatalogTupleUpdate(rel, &tup->t_self, tup); + + /* Cleanup. */ + heap_freetuple(tup); + table_close(rel, NoLock); + + if (started_tx) + CommitTransactionCommand(); +} diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 5141a66..76e135c 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4508,8 +4508,9 @@ getSubscriptions(Archive *fout) ntups = PQntuples(res); /* - * Get subscription fields. 
We don't include subskiplsn in the dump as - * after restoring the dump this value may no longer be relevant. + * Get subscription fields. We don't include subskiplsn and subretry in + * the dump as after restoring the dump these values may no longer be + * relevant. */ i_tableoid = PQfnumber(res, "tableoid"); i_oid = PQfnumber(res, "oid"); diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 016afbc..eb78a32 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -88,6 +88,11 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subdisableonerr; /* True if a worker error should cause the * subscription to be disabled */ + bool subretry BKI_DEFAULT(f); /* True if previous change failed + * to be applied while there were + * any active parallel apply + * workers */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -131,6 +136,8 @@ typedef struct Subscription bool disableonerr; /* Indicates if the subscription should be * automatically disabled if a worker error * occurs */ + bool retry; /* Indicates if previous change failed to be + * applied using a parallel apply worker */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 7250f7a..6a286c6 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -289,6 +289,8 @@ extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode); extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode); extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode); +extern bool pa_have_active_worker(void); + #define 
isParallelApplyWorker(worker) ((worker)->apply_leader_pid != InvalidPid) static inline bool diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index 65f43f0..64b2edb 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -87,8 +87,16 @@ sub test_streaming 'check extra columns contain local defaults'); # Test the streaming in binary mode + my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ); $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub SET (binary = on)"); + $node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; # Check the subscriber log from now on. $offset = -s $node_subscriber->logfile; diff --git a/src/test/subscription/t/032_streaming_parallel_apply.pl b/src/test/subscription/t/032_streaming_parallel_apply.pl index 10efd2d..d611dc2 100644 --- a/src/test/subscription/t/032_streaming_parallel_apply.pl +++ b/src/test/subscription/t/032_streaming_parallel_apply.pl @@ -21,9 +21,9 @@ $node_publisher->start; my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; -# Check if any streaming chunks are applied using the parallel apply worker. We -# have to look for the DEBUG1 log messages about that, so bump up the log -# verbosity. +# Check if any streaming chunks are applied using the parallel apply worker. +# And check if the parallel apply worker failed to start. We have to look for +# the DEBUG1 log messages about that, so bump up the log verbosity. $node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); $node_subscriber->start; @@ -89,6 +89,12 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? 
deadlock detected/, $offset); +# Check if the parallel apply worker is not started because the above +# transaction failed to be applied. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? parallel apply workers are not used for retries/, + $offset); + # Drop the unique index on the subscriber, now it works. $node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab"); @@ -136,6 +142,12 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/, $offset); +# Check if the parallel apply worker is not started because the above +# transaction failed to be applied. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? parallel apply workers are not used for retries/, + $offset); + # Drop the unique index on the subscriber, now it works. $node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab"); -- 2.7.2.windows.1