diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index df67851ac0..75119183da 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1882,7 +1882,6 @@ apply_handle_stream_abort(StringInfo s) pa_unlock_stream(xid, AccessExclusiveLock); (void) pa_free_worker(winfo); } - break; } @@ -1927,8 +1926,8 @@ apply_handle_stream_abort(StringInfo s) * We need to wait after processing rollback to savepoint for the * next set of changes. * - * We have a race condition here when we can start waiting here - * when there are more chunk of streams in the queue. See + * We have a race condition here due to which we can start waiting + * here when there are more chunk of streams in the queue. See * apply_handle_stream_stop. */ if (!toplevel_xact) @@ -4242,8 +4241,8 @@ stream_write_change(char action, StringInfo s) * Serialize a message to a file for the given transaction. * * This function is similar to stream_write_change except that it will open the - * target file if not already before writing the message and close file at the - * end. + * target file if not already before writing the message and close the file at + * the end. */ static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s) @@ -4926,10 +4925,9 @@ set_apply_error_context_origin(char *originname) } /* - * Return the action to take for the given transaction. *winfo is assigned to - * the destination parallel worker info (if the action is - * TRANS_LEADER_SEND_TO_PARALLEL or TRANS_LEADER_PARTIAL_SERIALIZE), otherwise - * *winfo is assigned NULL. + * Return the action to be taken for the given transaction. *winfo is + * assigned to the destination parallel worker info when the given + * transaction is being processed using a parallel apply worker. */ static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index a362688fde..1a75686b79 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -58,10 +58,12 @@ typedef struct LogicalRepWorker /* * Used to create the changes and subxact files for the streaming - * transactions. Upon the arrival of the first streaming transaction, the - * fileset will be initialized, and it will be deleted when the worker - * exits. Under this, separate buffiles would be created for each - * transaction which will be deleted after the transaction is finished. + * transactions. Upon the arrival of the first streaming transaction or + * when the first-time leader apply worker times out while sending changes + * to the parallel apply worker, the fileset will be initialized, and it + * will be deleted when the worker exits. Under this, separate buffiles + * would be created for each transaction which will be deleted after the + * transaction is finished. */ FileSet *stream_fileset; @@ -96,12 +98,20 @@ typedef enum ParallelTransState } ParallelTransState; /* - * State of fileset in leader apply worker. + * State of fileset used to communicate changes from leader to parallel + * apply worker. * - * FS_SERIALIZE_IN_PROGRESS means that the leader is serializing changes to the - * file. FS_SERIALIZE_DONE means that the leader has serialized all changes to - * the file. FS_READY means that it is now ok for a parallel apply worker read + * FS_EMPTY indicates an initial state where the leader doesn't need to use + * the file to communicate with the parallel apply worker. + * + * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes + * to the file. + * + * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to * the file. + * + * FS_READY indicates that it is now ok for a parallel apply worker to + * read the file. */ typedef enum PartialFileSetState {