From c91bb783235eac8d0648418553d7e1d3507717ac Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Sun, 11 Dec 2022 19:16:34 +0800 Subject: [PATCH v76 4/5] Retry to apply streaming xact only in apply worker When the subscription parameter is set streaming=parallel, the logic tries to apply the streaming transaction using a parallel apply worker. If this fails the parallel worker exits with an error. In this case, retry applying the streaming transaction using the normal streaming=on mode. This is done to avoid getting caught in a loop of the same retry errors. A new flag field "subretry" has been introduced to catalog "pg_subscription". If there are any active parallel apply workers and the subscriber exits with an error, this flag will be set true, and whenever the transaction is applied successfully, this flag is reset false. Now, when deciding how to apply a streaming transaction, the logic can know if this transaction has previously failed or not (by checking the "subretry" field). Note: Since we add a new field 'subretry' to catalog 'pg_subscription' has been expanded, we need bump catalog version. --- doc/src/sgml/catalogs.sgml | 10 ++ doc/src/sgml/logical-replication.sgml | 11 +- doc/src/sgml/ref/create_subscription.sgml | 5 + src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 1 + .../replication/logical/applyparallelworker.c | 30 ++++ src/backend/replication/logical/worker.c | 182 +++++++++++++++------ src/bin/pg_dump/pg_dump.c | 5 +- src/include/catalog/pg_subscription.h | 7 + src/include/replication/worker_internal.h | 2 + src/test/subscription/t/015_stream.pl | 63 +++++++ 12 files changed, 263 insertions(+), 56 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index c1e4048..d918327 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7950,6 +7950,16 @@ SCRAM-SHA-256$<iteration count>:&l + subretry bool + + + True if previous change failed to be applied while there were any + active parallel apply workers, necessitating a retry. + + + + + subconninfo text diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 54f48be..cb4f5c6 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1504,12 +1504,11 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER When the streaming mode is parallel, the finish LSN of - failed transactions may not be logged. In that case, it may be necessary to - change the streaming mode to on or off and - cause the same conflicts again so the finish LSN of the failed transaction will - be written to the server log. For the usage of finish LSN, please refer to ALTER SUBSCRIPTION ... - SKIP. + failed transactions may not be logged. In that case, the failed transaction + will be retried in streaming = on mode. If it fails + again, the finish LSN of the failed transaction will be written to the + server log. For the usage of finish LSN, please refer to + ALTER SUBSCRIPTION ... SKIP. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index eba72c6..decd554 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -251,6 +251,11 @@ CREATE SUBSCRIPTION subscription_nameparallel mode is disregarded when retrying; + instead the transaction will be applied using on + mode. diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index a56ae31..0eb5ffb 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->retry = subform->subretry; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 447c9b9..af07684 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1301,7 +1301,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, - subslotname, subsynccommit, subpublications, suborigin) + subretry, subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index baff00d..b2053f7 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -636,6 +636,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(false); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 1df18a7..a669842 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -315,6 +315,17 @@ pa_can_start(void) if (!AllTablesyncsReady()) return false; + /* + * Don't use parallel apply workers for retries, because it is possible + * that a deadlock was detected the last time we tried to apply a + * transaction using a parallel apply worker. + */ + if (MySubscription->retry) + { + elog(DEBUG1, "parallel apply workers are not used for retries"); + return false; + } + return true; } @@ -1678,3 +1689,22 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn) pa_free_worker(winfo); } + +/* Check if any active parallel apply workers. */ +bool +pa_have_active_worker(void) +{ + ListCell *lc; + + foreach(lc, ParallelApplyWorkerPool) + { + ParallelApplyWorkerInfo *tmp_winfo; + + tmp_winfo = (ParallelApplyWorkerInfo *) lfirst(lc); + + if (tmp_winfo->in_use) + return true; + } + + return false; +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 1db6fbd..6a47485 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -394,7 +394,7 @@ static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); -static void DisableSubscriptionAndExit(void); +static void DisableSubscriptionOnError(void); static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, @@ -431,6 +431,8 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); +static void set_subscription_retry(bool retry); + /* * Return the name of the logical replication worker. */ @@ -1046,6 +1048,9 @@ apply_handle_commit(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); + /* Reset the retry flag. */ + set_subscription_retry(false); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); } @@ -1152,6 +1157,9 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); @@ -1208,6 +1216,9 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); @@ -1269,6 +1280,9 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd); in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(rollback_data.rollback_end_lsn); @@ -1394,6 +1408,9 @@ apply_handle_stream_prepare(StringInfo s) break; } + /* Reset the retry flag. */ + set_subscription_retry(false); + pgstat_report_stat(false); /* Process any tables that are being synchronized in parallel. */ @@ -1964,6 +1981,10 @@ apply_handle_stream_abort(StringInfo s) break; } + /* Reset the retry flag. */ + if (toplevel_xact) + set_subscription_retry(false); + reset_apply_error_context_info(); } @@ -2246,6 +2267,9 @@ apply_handle_stream_commit(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); + /* Reset the retry flag. */ + set_subscription_retry(false); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); @@ -4342,20 +4366,28 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) } PG_CATCH(); { + /* + * Emit the error message, and recover from the error state to an idle + * state + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed during table synchronization */ + pgstat_report_subscription_error(MySubscription->oid, false); + if (MySubscription->disableonerr) - DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed during table synchronization. Abort - * the current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); + DisableSubscriptionOnError(); - PG_RE_THROW(); - } + /* Set the retry flag. */ + set_subscription_retry(true); + + proc_exit(0); } PG_END_TRY(); @@ -4380,20 +4412,27 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + /* + * Emit the error message, and recover from the error state to an idle + * state + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed while applying changes */ + pgstat_report_subscription_error(MySubscription->oid, + !am_tablesync_worker()); + if (MySubscription->disableonerr) - DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed while applying changes. Abort the - * current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); + DisableSubscriptionOnError(); - PG_RE_THROW(); - } + /* Set the retry flag. */ + set_subscription_retry(true); } PG_END_TRY(); } @@ -4669,39 +4708,20 @@ ApplyWorkerMain(Datum main_arg) } /* - * After error recovery, disable the subscription in a new transaction - * and exit cleanly. + * Disable the subscription in a new transaction. */ static void -DisableSubscriptionAndExit(void) +DisableSubscriptionOnError(void) { - /* - * Emit the error message, and recover from the error state to an idle - * state - */ - HOLD_INTERRUPTS(); - - EmitErrorReport(); - AbortOutOfAnyTransaction(); - FlushErrorState(); - - RESUME_INTERRUPTS(); - - /* Report the worker failed during either table synchronization or apply */ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); - /* Disable the subscription */ StartTransactionCommand(); DisableSubscription(MySubscription->oid); CommitTransactionCommand(); - /* Notify the subscription has been disabled and exit */ + /* Notify the subscription has been disabled */ ereport(LOG, errmsg("subscription \"%s\" has been disabled because of an error", MySubscription->name)); - - proc_exit(0); } /* @@ -5044,3 +5064,71 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) return TRANS_LEADER_SEND_TO_PARALLEL; } } + +/* + * Set subretry of pg_subscription catalog. + * + * If retry is true, subscriber is about to exit with an error. Otherwise, it + * means that the transaction was applied successfully. + */ +static void +set_subscription_retry(bool retry) +{ + Relation rel; + HeapTuple tup; + bool started_tx = false; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + /* Fast path - if no state change then nothing to do */ + if (MySubscription->retry == retry) + return; + + /* Fast path - skip for parallel apply workers */ + if (am_parallel_apply_worker()) + return; + + /* Fast path - skip set retry if no active parallel apply workers */ + if (retry && !pa_have_active_worker()) + return; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + /* Look up the subscription in the catalog */ + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + ObjectIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, + AccessShareLock); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* Set subretry */ + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(retry); + replaces[Anum_pg_subscription_subretry - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + CatalogTupleUpdate(rel, &tup->t_self, tup); + + /* Cleanup. */ + heap_freetuple(tup); + table_close(rel, NoLock); + + if (started_tx) + CommitTransactionCommand(); +} diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 5e800dc..931233f 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4560,8 +4560,9 @@ getSubscriptions(Archive *fout) ntups = PQntuples(res); /* - * Get subscription fields. We don't include subskiplsn in the dump as - * after restoring the dump this value may no longer be relevant. + * Get subscription fields. We don't include subskiplsn and subretry in + * the dump as after restoring the dump this value may no longer be + * relevant. */ i_tableoid = PQfnumber(res, "tableoid"); i_oid = PQfnumber(res, "oid"); diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index b0f2a17..8edbbca 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -88,6 +88,11 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subdisableonerr; /* True if a worker error should cause the * subscription to be disabled */ + bool subretry BKI_DEFAULT(f); /* True if previous change failed + * to be applied while there were + * any active parallel apply + * workers */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -131,6 +136,8 @@ typedef struct Subscription bool disableonerr; /* Indicates if the subscription should be * automatically disabled if a worker error * occurs */ + bool retry; /* Indicates if previous change failed to be + * applied using a parallel apply worker */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index f3b2f2d..42c33a4 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -308,6 +308,8 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +extern bool pa_have_active_worker(void); + #define isParallelApplyWorker(worker) ((worker)->apply_leader_pid != InvalidPid) static inline bool diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index 83d6956..4054a5f 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -74,8 +74,16 @@ sub test_streaming 'check extra columns contain local defaults'); # Test the streaming in binary mode + my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ); $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub SET (binary = on)"); + $node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; # Check the subscriber log from now on. $offset = -s $node_subscriber->logfile; @@ -450,6 +458,61 @@ $result = is($result, qq(5000), 'data replicated to subscriber by serializing messages to disk'); +# Clean up test data from the environment. +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2"); +$node_publisher->wait_for_catchup($appname); + +# ============================================================================ +# Test re-apply a failed streaming transaction using the leader apply +# worker and apply subsequent streaming transaction using the parallel apply +# worker after this retry succeeds. +# ============================================================================ + +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX idx_tab on test_tab_2(a)"); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', qq{ +BEGIN; +INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i); +INSERT INTO test_tab_2 values(1); +COMMIT;}); + +# Check if the parallel apply worker is not started because the above +# transaction failed to be applied. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? parallel apply workers are not used for retries/, + $offset); + +# Drop the unique index on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab"); + +# Wait for this streaming transaction to be applied in the apply worker. +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(5001), 'data replicated to subscriber after dropping index'); + +# After successfully retrying to apply a failed streaming transaction, apply +# the following streaming transaction using the parallel apply worker. + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_2 SELECT i FROM generate_series(5001, 10000) s(i)"); + +check_parallel_log($node_subscriber, $offset, 1, 'COMMIT'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(10001), + 'data replicated to subscriber using the parallel apply worker'); + $node_subscriber->stop; $node_publisher->stop; -- 2.7.2.windows.1