From baf4ff2650b967113990f8d14a374419d82b4317 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 30 Nov 2022 20:52:29 +0800 Subject: [PATCH v58 6/7] Retry to apply streaming xact only in apply worker When the subscription parameter is set to streaming=parallel, the logic tries to apply the streaming transaction using a parallel apply worker. If this fails, the parallel apply worker exits with an error. In this case, retry applying the streaming transaction using the normal streaming=on mode. This is done to avoid getting caught in a loop of the same retry errors. A new flag field "subretry" has been introduced to catalog "pg_subscription". If there are any active parallel apply workers and the subscriber exits with an error, this flag will be set to true, and whenever the transaction is applied successfully, this flag is reset to false. Now, when deciding how to apply a streaming transaction, the logic can know if this transaction has previously failed or not (by checking the "subretry" field). --- doc/src/sgml/catalogs.sgml | 10 + doc/src/sgml/logical-replication.sgml | 11 +- doc/src/sgml/ref/create_subscription.sgml | 5 + src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 1 + .../replication/logical/applyparallelworker.c | 30 +++ src/backend/replication/logical/worker.c | 182 +++++++++++++----- src/bin/pg_dump/pg_dump.c | 5 +- src/include/catalog/pg_subscription.h | 7 + src/include/replication/worker_internal.h | 2 + src/test/subscription/t/015_stream.pl | 8 + .../t/032_stream_parallel_conflict.pl | 69 ++++++- 13 files changed, 274 insertions(+), 59 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 130361f31a..6912571e2d 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7938,6 +7938,16 @@ SCRAM-SHA-256$<iteration count>:&l + + + subretry bool + + + True if previous change failed to be applied while there were any + active parallel 
apply workers, necessitating a retry. + + + subconninfo text diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index f3667101e2..1311f3354f 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1500,12 +1500,11 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER When the streaming mode is parallel, the finish LSN of - failed transactions may not be logged. In that case, it may be necessary to - change the streaming mode to on or off and - cause the same conflicts again so the finish LSN of the failed transaction will - be written to the server log. For the usage of finish LSN, please refer to ALTER SUBSCRIPTION ... - SKIP. + failed transactions may not be logged. In that case, the failed transaction + will be retried in streaming = on mode. If it fails + again, the finish LSN of the failed transaction will be written to the + server log. For the usage of finish LSN, please refer to + ALTER SUBSCRIPTION ... SKIP. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index f173a09037..64d5802add 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -251,6 +251,11 @@ CREATE SUBSCRIPTION subscription_nameparallel mode is disregarded when retrying; + instead the transaction will be applied using on + mode. 
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index a506fc3ec8..723b141c74 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->retry = subform->subretry; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 2d8104b090..c2c7d8a8b7 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1301,7 +1301,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, - subslotname, subsynccommit, subpublications, suborigin) + subretry, subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 0cecbc7390..f7227aad3e 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -634,6 +634,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(false); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index a49d08c70a..12f0243030 100644 --- 
a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -298,6 +298,17 @@ pa_can_start(TransactionId xid) if (!AllTablesyncsReady()) return false; + /* + * Don't use parallel apply workers for retries, because it is possible + * that a deadlock was detected the last time we tried to apply a + * transaction using a parallel apply worker. + */ + if (MySubscription->retry) + { + elog(DEBUG1, "parallel apply workers are not used for retries"); + return false; + } + return true; } @@ -1582,3 +1593,22 @@ pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode) UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid, PARALLEL_APPLY_LOCK_XACT, lockmode); } + +/* Check if there are any active parallel apply workers. */ +bool +pa_have_active_worker(void) +{ + ListCell *lc; + + foreach(lc, ParallelApplyWorkerPool) + { + ParallelApplyWorkerInfo *tmp_winfo; + + tmp_winfo = (ParallelApplyWorkerInfo *) lfirst(lc); + + if (tmp_winfo->in_use) + return true; + } + + return false; +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 81f21fbd58..485de65333 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -393,7 +393,7 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); static void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn); -static void DisableSubscriptionAndExit(void); +static void DisableSubscriptionOnError(void); static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, @@ -430,6 +430,8 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); +static void set_subscription_retry(bool retry); + /* * Return the name of the logical replication worker. 
*/ @@ -1036,6 +1038,9 @@ apply_handle_commit(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); + /* Reset the retry flag. */ + set_subscription_retry(false); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); } @@ -1142,6 +1147,9 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); @@ -1198,6 +1206,9 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); @@ -1259,6 +1270,9 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd); in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(rollback_data.rollback_end_lsn); @@ -1418,6 +1432,9 @@ apply_handle_stream_prepare(StringInfo s) break; } + /* Reset the retry flag. */ + set_subscription_retry(false); + pgstat_report_stat(false); /* Process any tables that are being synchronized in parallel. */ @@ -1941,6 +1958,10 @@ apply_handle_stream_abort(StringInfo s) break; } + /* Reset the retry flag. */ + if (toplevel_xact) + set_subscription_retry(false); + reset_apply_error_context_info(); } @@ -2250,6 +2271,9 @@ apply_handle_stream_commit(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); + /* Reset the retry flag. 
*/ + set_subscription_retry(false); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); @@ -4324,20 +4348,28 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) } PG_CATCH(); { + /* + * Emit the error message, and recover from the error state to an idle + * state + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed during table synchronization */ + pgstat_report_subscription_error(MySubscription->oid, false); + if (MySubscription->disableonerr) - DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed during table synchronization. Abort - * the current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); + DisableSubscriptionOnError(); - PG_RE_THROW(); - } + /* Set the retry flag. */ + set_subscription_retry(true); + + apply_worker_exit(false); } PG_END_TRY(); @@ -4362,20 +4394,27 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + /* + * Emit the error message, and recover from the error state to an idle + * state + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed while applying changes */ + pgstat_report_subscription_error(MySubscription->oid, + !am_tablesync_worker()); + if (MySubscription->disableonerr) - DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed while applying changes. Abort the - * current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); + DisableSubscriptionOnError(); - PG_RE_THROW(); - } + /* Set the retry flag. 
*/ + set_subscription_retry(true); } PG_END_TRY(); } @@ -4656,39 +4695,20 @@ ApplyWorkerMain(Datum main_arg) } /* - * After error recovery, disable the subscription in a new transaction - * and exit cleanly. + * Disable the subscription in a new transaction. */ static void -DisableSubscriptionAndExit(void) +DisableSubscriptionOnError(void) { - /* - * Emit the error message, and recover from the error state to an idle - * state - */ - HOLD_INTERRUPTS(); - - EmitErrorReport(); - AbortOutOfAnyTransaction(); - FlushErrorState(); - - RESUME_INTERRUPTS(); - - /* Report the worker failed during either table synchronization or apply */ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); - /* Disable the subscription */ StartTransactionCommand(); DisableSubscription(MySubscription->oid); CommitTransactionCommand(); - /* Notify the subscription has been disabled and exit */ + /* Notify the subscription has been disabled */ ereport(LOG, errmsg("subscription \"%s\" has been disabled because of an error", MySubscription->name)); - - apply_worker_exit(false); } /* @@ -5008,3 +5028,71 @@ apply_worker_exit(bool on_subinfo_change) proc_exit(0); } + +/* + * Set subretry of pg_subscription catalog. + * + * If retry is true, subscriber is about to exit with an error. Otherwise, it + * means that the transaction was applied successfully. 
+ */ +static void +set_subscription_retry(bool retry) +{ + Relation rel; + HeapTuple tup; + bool started_tx = false; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + /* Fast path - if no state change then nothing to do */ + if (MySubscription->retry == retry) + return; + + /* Fast path - skip for parallel apply workers */ + if (am_parallel_apply_worker()) + return; + + /* Fast path - skip set retry if no active parallel apply workers */ + if (retry && !pa_have_active_worker()) + return; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + /* Look up the subscription in the catalog */ + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + ObjectIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, + AccessShareLock); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* Set subretry */ + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(retry); + replaces[Anum_pg_subscription_subretry - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + CatalogTupleUpdate(rel, &tup->t_self, tup); + + /* Cleanup. */ + heap_freetuple(tup); + table_close(rel, NoLock); + + if (started_tx) + CommitTransactionCommand(); +} diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index fec6d21c29..8c2cc78a2d 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4543,8 +4543,9 @@ getSubscriptions(Archive *fout) ntups = PQntuples(res); /* - * Get subscription fields. 
We don't include subskiplsn in the dump as - * after restoring the dump this value may no longer be relevant. + * Get subscription fields. We don't include subskiplsn and subretry in + * the dump as after restoring the dump these values may no longer be + * relevant. */ i_tableoid = PQfnumber(res, "tableoid"); i_oid = PQfnumber(res, "oid"); diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 5b9e5a399a..d3b158f0c0 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -88,6 +88,11 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subdisableonerr; /* True if a worker error should cause the * subscription to be disabled */ + bool subretry BKI_DEFAULT(f); /* True if previous change failed + * to be applied while there were + * any active parallel apply + * workers */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -131,6 +136,8 @@ typedef struct Subscription bool disableonerr; /* Indicates if the subscription should be * automatically disabled if a worker error * occurs */ + bool retry; /* Indicates if previous change failed to be + * applied using a parallel apply worker */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 9bdc664546..a90b819b63 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -290,6 +290,8 @@ extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode); extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode); extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode); +extern bool pa_have_active_worker(void); + #define 
isParallelApplyWorker(worker) ((worker)->apply_leader_pid != InvalidPid) static inline bool diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index 38941ffc01..890fa88d29 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -87,8 +87,16 @@ sub test_streaming 'check extra columns contain local defaults'); # Test the streaming in binary mode + my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ); $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub SET (binary = on)"); + $node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; # Check the subscriber log from now on. $offset = -s $node_subscriber->logfile; diff --git a/src/test/subscription/t/032_stream_parallel_conflict.pl b/src/test/subscription/t/032_stream_parallel_conflict.pl index 62d58cec3c..d2bb86d8b1 100644 --- a/src/test/subscription/t/032_stream_parallel_conflict.pl +++ b/src/test/subscription/t/032_stream_parallel_conflict.pl @@ -10,6 +10,20 @@ use Test::More; my $offset = 0; +# Check the log that the streamed transaction was completed successfully +# reported by parallel apply worker. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? 
finished processing the STREAM $type command/, + $offset); + } +} + # Create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); @@ -21,9 +35,9 @@ $node_publisher->start; my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init; -# Check if any streaming chunks are applied using the parallel apply worker. We -# have to look for the DEBUG1 log messages about that, so bump up the log -# verbosity. +# Check if any streaming chunks are applied using the parallel apply worker. +# And check if the parallel apply worker failed to start. We have to look for +# the DEBUG1 log messages about that, so bump up the log verbosity. $node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); $node_subscriber->start; @@ -175,6 +189,55 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); is($result, qq(5000), 'data replicated to subscriber by serializing messages to disk'); +# Clean up test data from the environment. +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab"); +$node_publisher->wait_for_catchup($appname); + +# ============================================================================ +# Test re-apply a failed streaming transaction using the leader apply +# worker and apply subsequent streaming transaction using the parallel apply +# worker after this retry succeeds. +# ============================================================================ + +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX idx_tab on test_tab(a)"); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', qq{ +BEGIN; +INSERT INTO test_tab SELECT i FROM generate_series(1, 5000) s(i); +INSERT INTO test_tab values(1); +COMMIT;}); + +# Check if the parallel apply worker is not started because the above +# transaction failed to be applied. 
+$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? parallel apply workers are not used for retries/, + $offset); + +# Drop the unique index on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab"); + +# Wait for this streaming transaction to be applied in the apply worker. +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(5001), 'data replicated to subscriber after dropping index'); + +# After successfully retrying to apply a failed streaming transaction, apply +# the following streaming transaction using the parallel apply worker. +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab SELECT i FROM generate_series(5001, 10000) s(i)"); + +check_parallel_log($node_subscriber, $offset, 1, 'COMMIT'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(10001), 'data replicated to subscriber using the parallel apply worker'); + $node_subscriber->stop; $node_publisher->stop; -- 2.23.0.windows.1