From 83171e0e6306ed58136670f120db03a7009db6b4 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Thu, 4 Dec 2025 20:55:26 +0900 Subject: [PATCH v9 6/9] Track dependencies for streamed transactions This commit allows tracking dependencies of streamed transactions. Regarding the streaming=on case, dependency tracking is enabled while applying spooled changes from files. In the streaming=parallel case, dependency tracking is performed when the leader sends changes to parallel workers. Unlike non-streamed transactions, the leader waits for parallel workers until the assigned transactions are finished at COMMIT/PREPARE/ABORT; thus, the XID of streamed transactions is not cached as the last handled one. Also, streamed transactions are not recorded as parallelized transactions because upcoming workers do not have to wait for them. --- .../replication/logical/applyparallelworker.c | 19 +++++- src/backend/replication/logical/worker.c | 66 +++++++++++++++++-- src/include/replication/worker_internal.h | 2 +- src/test/subscription/t/050_parallel_apply.pl | 47 +++++++++++++ 4 files changed, 126 insertions(+), 8 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 5c8280bb182..a1766bddd93 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -168,7 +168,14 @@ * key) as another ongoing transaction (see handle_dependency_on_change for * details). If so, the leader sends a list of dependent transaction IDs to the * parallel worker, indicating that the parallel apply worker must wait for - * these transactions to commit before proceeding. + * these transactions to commit before proceeding. If transactions are streamed + * but leader decides not to assign parallel apply workers, dependencies are + * verified when the transaction is committed. 
+ * + * Non-streaming transactions + * ====================== + * The handling is similar to streaming transactions, but with a few + * differences: * * Commit order * ------------ @@ -1636,6 +1643,12 @@ pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo) stream_apply_worker = winfo; } +bool +pa_stream_apply_worker_is_null(void) +{ + return stream_apply_worker == NULL; +} + /* * Form a unique savepoint name for the streaming transaction. * @@ -1721,6 +1734,10 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data) TransactionId xid = abort_data->xid; TransactionId subxid = abort_data->subxid; + /* Streamed transactions won't be registered */ + Assert(!dshash_find(parallelized_txns, &xid, false) && + !dshash_find(parallelized_txns, &subxid, false)); + /* * Update origin state so we can restart streaming from correct position * in case of crash. diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 6d6e8cf95b2..73d3c64414c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -972,13 +972,26 @@ check_dependency_on_replica_identity(Oid relid, &rientry->remote_xid, new_depended_xid); + /* + * Remove the entry if it is registered for the streamed transactions. We + * do not have to register an entry for them; the leader worker always + * waits until the parallel worker finishes handling streamed + * transactions, thus no need to consider the possibility that upcoming + * parallel workers would go ahead. + */ + if (TransactionIdIsValid(stream_xid) && !found) + { + free_replica_identity_key(rientry->keydata); + replica_identity_delete_item(replica_identity_table, rientry); + } + /* * Update the new depended xid into the entry if valid, the new xid could * be invalid if the transaction will be applied by the leader itself * which means all the changes will be committed before processing next * transaction, so no need to be depended on. 
*/ - if (TransactionIdIsValid(new_depended_xid)) + else if (TransactionIdIsValid(new_depended_xid)) rientry->remote_xid = new_depended_xid; /* @@ -1092,8 +1105,11 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, */ StringInfoData change = *s; - /* Compute dependency only for non-streaming transaction */ - if (in_streamed_transaction || (winfo && winfo->stream_txn)) + /* + * Skip if we are handling streaming transactions but changes are not + * applied yet. + */ + if (pa_stream_apply_worker_is_null() && in_streamed_transaction) return; /* Only the leader checks dependencies and schedules the parallel apply */ @@ -1453,7 +1469,18 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); - handle_dependency_on_change(action, s, current_xid, winfo); + /* + * Check dependencies related to the received change. The XID of the top + * transaction is always used to avoid detecting false-positive + * dependencies between top and sub transactions. Sub-transactions can be + * replicated for streamed transactions, and they won't be marked as + * parallelized so that parallel workers won't wait for rolled-back + * sub-transactions. + */ + handle_dependency_on_change(action, s, + in_streamed_transaction + ? stream_xid : remote_xid, + winfo); /* * Re-fetch the latest apply action as it might have been changed during @@ -2594,6 +2621,10 @@ apply_handle_stream_prepare(StringInfo s) apply_spooled_messages(MyLogicalRepWorker->stream_fileset, prepare_data.xid, prepare_data.prepare_lsn); + /* Wait until the last transaction finishes */ + if (TransactionIdIsValid(last_remote_xid)) + pa_wait_for_depended_transaction(last_remote_xid); + /* Mark the transaction as prepared. 
*/ apply_handle_prepare_internal(&prepare_data); @@ -2617,7 +2648,8 @@ apply_handle_stream_prepare(StringInfo s) case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); - if (pa_send_data(winfo, s->len, s->data)) + if (build_dependency_with_last_committed_txn(winfo) && + pa_send_data(winfo, s->len, s->data)) { /* Finish processing the streaming transaction. */ pa_xact_finish(winfo, prepare_data.end_lsn); @@ -2683,6 +2715,11 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); + /* + * No need to update the last_remote_xid here because the leader worker + * always waits until streamed transactions finish. + */ + /* * Process any tables that are being synchronized in parallel, as well as * any newly added tables or sequences. @@ -3467,6 +3504,10 @@ apply_handle_stream_commit(StringInfo s) apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, commit_data.commit_lsn); + /* Wait until the last transaction finishes */ + if (TransactionIdIsValid(last_remote_xid)) + pa_wait_for_depended_transaction(last_remote_xid); + apply_handle_commit_internal(&commit_data); /* Unlink the files with serialized changes and subxact info. */ @@ -3478,7 +3519,20 @@ apply_handle_stream_commit(StringInfo s) case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); - if (pa_send_data(winfo, s->len, s->data)) + /* + * Unlike the non-streaming case, no need to mark this transaction + * as parallelized, because the leader waits until the streamed + * transaction is committed; thus commit ordering is always + * preserved. + */ + + /* + * Build a dependency between this transaction and the last + * committed transaction to preserve the commit order. Then try to + * send a COMMIT message if that succeeds. + */ + if (build_dependency_with_last_committed_txn(winfo) && + pa_send_data(winfo, s->len, s->data)) { /* Finish processing the streaming transaction. 
*/ pa_xact_finish(winfo, commit_data.end_lsn); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 40039b2fc1b..de95c71a8e5 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -355,7 +355,7 @@ extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state); extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo); - +extern bool pa_stream_apply_worker_is_null(void); extern void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid); extern void pa_reset_subtrans(void); diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl index 15973f7d0e0..9254b85d350 100644 --- a/src/test/subscription/t/050_parallel_apply.pl +++ b/src/test/subscription/t/050_parallel_apply.pl @@ -187,4 +187,51 @@ $node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'regress_prepare';"); $node_publisher->wait_for_catchup('regress_sub'); +# Ensure streamed transactions wait for the previous transaction + +$node_publisher->append_conf('postgresql.conf', + "logical_decoding_work_mem = 64kB"); +$node_publisher->reload; +# Run a query to make sure that the reload has taken effect. +$node_publisher->safe_psql('postgres', "SELECT 1"); + +# Attach the injection_point again +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-commit','wait');" +); + +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(71, 80), 'test');"); + +# Wait until the parallel worker enters the injection point. 
+$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-commit'); + +# Run a transaction which would be streamed +my $h = $node_publisher->background_psql('postgres', on_error_stop => 0); + +$offset = -s $node_subscriber->logfile; + +$h->query_safe( + q{ +BEGIN; +UPDATE regress_tab SET value = 'streamed-updated' WHERE id BETWEEN 71 AND 80; +INSERT INTO regress_tab VALUES (generate_series(100, 5100), 'streamed'); +}); + +# Verify the parallel worker waits for the transaction +$str = $node_subscriber->wait_for_log(qr/wait for depended xid ([1-9][0-9]+)/, $offset); +$xid = $str =~ /wait for depended xid ([1-9][0-9]+)/; + +# Wakeup the parallel worker +$node_subscriber->safe_psql('postgres', qq[ + SELECT injection_points_detach('parallel-worker-before-commit'); + SELECT injection_points_wakeup('parallel-worker-before-commit'); +]); + +# Verify the streamed transaction can be applied +$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset); + +$h->query_safe("COMMIT;"); + done_testing(); -- 2.47.3