From 18491666729f9028cdaba611bd90874f75c2e132 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Mon, 16 Jan 2023 10:49:38 +0530 Subject: [PATCH v1] Fix the code to decide the apply action. The code missed to handle non-transactional messages and we didn't catch it in our testing as currently such messages are simply ignored by the apply worker. This was introduced by changes in commit 216a784829. While testing this, I noticed that we forgot to reset stream_xid after processing stream stop message which could also result in the wrong apply action after the fix for non-transactional messages. Reported-by: Tomas Vondra Discussion: https://postgr.es/m/984ff689-adde-9977-affe-cd6029e850be@enterprisedb.com --- src/backend/replication/logical/worker.c | 53 ++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 79cda39..7d1b1a8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -247,8 +247,10 @@ typedef struct ApplyErrorCallbackArg * The action to be taken for the changes in the transaction. * * TRANS_LEADER_APPLY: - * This action means that we are in the leader apply worker and changes of the - * transaction are applied directly by the worker. + * This action means that we are in the leader apply worker or table sync + * worker. The changes of the transaction are either directly applied or + * are read from temporary files (for streaming transactions) and then + * applied by the worker. * * TRANS_LEADER_SERIALIZE: * This action means that we are in the leader apply worker or table sync @@ -1301,7 +1303,7 @@ apply_handle_stream_prepare(StringInfo s) switch (apply_action) { - case TRANS_LEADER_SERIALIZE: + case TRANS_LEADER_APPLY: /* * The transaction has been serialized to file, so replay all the @@ -1710,6 +1712,7 @@ apply_handle_stream_stop(StringInfo s) } in_streamed_transaction = false; + stream_xid = InvalidTransactionId; /* * The parallel apply worker could be in a transaction in which case we @@ -1842,7 +1845,7 @@ apply_handle_stream_abort(StringInfo s) switch (apply_action) { - case TRANS_LEADER_SERIALIZE: + case TRANS_LEADER_APPLY: /* * We are in the leader apply worker and the transaction has been @@ -2164,7 +2167,7 @@ apply_handle_stream_commit(StringInfo s) switch (apply_action) { - case TRANS_LEADER_SERIALIZE: + case TRANS_LEADER_APPLY: /* * The transaction has been serialized to file, so replay all the @@ -4996,10 +4999,12 @@ set_apply_error_context_origin(char *originname) } /* - * Return the action to be taken for the given transaction. *winfo is - * assigned to the destination parallel worker info when the leader apply - * worker has to pass all the transaction's changes to the parallel apply - * worker. + * Return the action to be taken for the given transaction. See + * TransApplyAction for information on each of the actions. + * + * *winfo is assigned to the destination parallel worker info when the leader + * apply worker has to pass all the transaction's changes to the parallel + * apply worker. */ static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) @@ -5010,27 +5015,35 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) { return TRANS_PARALLEL_APPLY; } - else if (in_remote_transaction) - { - return TRANS_LEADER_APPLY; - } /* - * Check if we are processing this transaction using a parallel apply - * worker. + * If we are processing this transaction using a parallel apply worker then + * either we send the changes to the parallel worker or if the worker is busy + * then serialize the changes to the file which will later be processed by + * the parallel worker. */ *winfo = pa_find_worker(xid); - if (!*winfo) + if (*winfo && (*winfo)->serialize_changes) { - return TRANS_LEADER_SERIALIZE; + return TRANS_LEADER_PARTIAL_SERIALIZE; } - else if ((*winfo)->serialize_changes) + else if (*winfo) { - return TRANS_LEADER_PARTIAL_SERIALIZE; + return TRANS_LEADER_SEND_TO_PARALLEL; + } + + /* + * If there is no parallel worker involved to process this transaction then + * we either directly apply the change or serialize it to a file which will + * later be applied when the transaction finish message is processed. + */ + else if (in_streamed_transaction) + { + return TRANS_LEADER_SERIALIZE; } else { - return TRANS_LEADER_SEND_TO_PARALLEL; + return TRANS_LEADER_APPLY; } } -- 1.8.3.1