Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | Peter Smith |
---|---|
Subject | Re: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | CAHut+Pvvjrd_Fd0aP-XsmBy--YbDL=h9FhgcQEWkXqtPgHE+rQ@mail.gmail.com |
In response to | RE: Perform streaming logical transactions by background workers and parallel apply ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>) |
Responses | Re: Perform streaming logical transactions by background workers and parallel apply; RE: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
Here are some review comments for patch v51-0002

======

1. GENERAL - terminology: spool/serialize and data/changes/message

The terminology seems to be used at random. IMO it might be worthwhile rechecking at least that terms are used consistently in all the comments. e.g. "serialize message data to disk" ... and later ... "apply the spooled messages".

Also for places where it says "Write the message to file" maybe consider using consistent terminology like "serialize the message to a file".

Also, try to standardize the way things are described by using consistent (if they really are the same) terminology for "writing data" VS "writing changes" VS "writing messages" etc. It is confusing trying to know whether the different wording has some intended meaning or is just random.

======

Commit message

2.

When the leader apply worker times out while sending a message to the parallel apply worker. Instead of erroring out, switch to partial serialize mode and let the leader serialize all remaining changes to the file and notify the parallel apply workers to read and apply them at the end of the transaction.

~

The first sentence seems incomplete.

SUGGESTION
In patch 0001 if the leader apply worker times out while attempting to send a message to the parallel apply worker it results in an ERROR.

This patch (0002) modifies that behaviour, so instead of erroring it will switch to "partial serialize" mode - in this mode the leader serializes all remaining changes to a file and notifies the parallel apply workers to read and apply them at the end of the transaction.

~~~

3.

This patch 0002 is called “Serialize partial changes to disk if the shm_mq buffer is full”, but the commit message says nothing about the buffer filling up. I think the commit message should mention something that makes the patch name more relevant. Otherwise change the patch name.

======

.../replication/logical/applyparallelworker.c

4.
File header comment

+ * timeout is exceeded, the LA will write to file and indicate PA-2 that it
+ * needs to read file for remaining messages. Then LA will start waiting for
+ * commit which will detect deadlock if any. (See pa_send_data() and typedef
+ * enum TransApplyAction)

"needs to read file for remaining messages" -> "needs to read that file for the remaining messages"

~~~

5. pa_free_worker

+ /*
+ * Stop the worker if there are enough workers in the pool.
+ *
+ * XXX we also need to stop the worker if the leader apply worker
+ * serialized part of the transaction data to a file due to send timeout.
+ * This is because the message could be partially written to the queue due
+ * to send timeout and there is no way to clean the queue other than
+ * resending the message until it succeeds. To avoid complexity, we
+ * directly stop the worker in this case.
+ */
+ if (winfo->serialize_changes ||
+ napplyworkers > (max_parallel_apply_workers_per_subscription / 2))

5a.

+ * XXX we also need to stop the worker if the leader apply worker
+ * serialized part of the transaction data to a file due to send timeout.

SUGGESTION
XXX The worker is also stopped if the leader apply worker needed to serialize part of the transaction data due to a send timeout.

~

5b.

+ /* Unlink the files with serialized changes. */
+ if (winfo->serialize_changes)
+ stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);

A better comment might be

SUGGESTION
Unlink any files that were needed to serialize partial changes.

~~~

6. pa_spooled_messages

/*
 * Replay the spooled messages in the parallel apply worker if leader apply
 * worker has finished serializing changes to the file.
 */
static void pa_spooled_messages(void)

6a.
IMO a better name for this function would be pa_apply_spooled_messages();

~

6b.
"if leader apply" -> "if the leader apply"

~

7.
+ /*
+ * Acquire the stream lock if the leader apply worker is serializing
+ * changes to the file, because the parallel apply worker will no longer
+ * have a chance to receive a STREAM_STOP and acquire the lock until the
+ * leader serialize all changes to the file.
+ */
+ if (fileset_state == LEADER_FILESET_BUSY)
+ {
+ pa_lock_stream(MyParallelShared->xid, AccessShareLock);
+ pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
+ }

SUGGESTION (rearranged comment - please check, I am not sure if I got this right)
If the leader apply worker is still (busy) serializing partial changes then the parallel apply worker acquires the stream lock now. Otherwise, it would not have a chance to receive a STREAM_STOP (and acquire the stream lock) until the leader had serialized all changes.

~~~

8. pa_send_data

+ *
+ * When sending data times out, data will be serialized to disk. And the
+ * current streaming transaction will enter PARTIAL_SERIALIZE mode, which means
+ * that subsequent data will also be serialized to disk.
 */
void pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)

SUGGESTION (minor comment change)
If the attempt to send data via shared memory times out, then we will switch to "PARTIAL_SERIALIZE mode" for the current transaction. This means that the current data and any subsequent data for this transaction will be serialized to disk.

~

9.

 Assert(!IsTransactionState());
+ Assert(!winfo->serialize_changes);

How about also asserting that this must be the LA worker?

~

10.

+ /*
+ * The parallel apply worker might be stuck for some reason, so
+ * stop sending data to parallel worker and start to serialize
+ * data to files.
+ */
+ winfo->serialize_changes = true;

SUGGESTION (minor reword)
The parallel apply worker might be stuck for some reason, so stop sending data directly to it and start to serialize data to files instead.

~

11.

+ /* Skip first byte and statistics fields. */
+ msg.cursor += SIZE_STATS_MESSAGE + 1;

IMO it would be better for the comment order and the code calculation order to be the same.

SUGGESTION
/* Skip first byte and statistics fields. */
msg.cursor += 1 + SIZE_STATS_MESSAGE;

~

12. pa_stream_abort

+ /*
+ * If the parallel apply worker is applying the spooled
+ * messages, we save the current file position and close the
+ * file to prevent the file from being accidentally closed on
+ * rollback.
+ */
+ if (stream_fd)
+ {
+ BufFileTell(stream_fd, &fileno, &offset);
+ BufFileClose(stream_fd);
+ reopen_stream_fd = true;
+ }
+
 RollbackToSavepoint(spname);
 CommitTransactionCommand();
 subxactlist = list_truncate(subxactlist, i + 1);
+
+ /*
+ * Reopen the file and set the file position to the saved
+ * position.
+ */
+ if (reopen_stream_fd)

It seems a bit vague to just refer to "close the file" and "reopen the file" in these comments. IMO it would be better to call this file by a name like "the message spool file" or similar. Please check all other similar comments.

~~~

13. pa_set_fileset_state

 /*
+ * Set the fileset_state flag for the given parallel apply worker. The
+ * stream_fileset of the leader apply worker will be written into the shared
+ * memory if the fileset_state is LEADER_FILESET_ACCESSIBLE.
+ */
+void
+pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
+ LeaderFileSetState fileset_state)
+{

13a.
It is an enum -- not a "flag", so:

"fileset_state flag" -> "fileset state"

~

13b.
It seemed strange to me that the comment/code says this state is only written to shm when it is "ACCESSIBLE".... IIUC this same fileset lingers around to be reused for other workers so I expected the state should *always* be written whenever the LA changes it. (I mean even if the PA is not needing to look at this member, I still think it should have the current/correct value in it).

======

src/backend/replication/logical/worker.c

14.
TRANS_LEADER_SEND_TO_PARALLEL

+ * TRANS_LEADER_PARTIAL_SERIALIZE:
+ * The action means that we are in the leader apply worker and have sent some
+ * changes to the parallel apply worker, but the remaining changes need to be
+ * serialized to disk due to timeout while sending data, and the parallel apply
+ * worker will apply these changes when the final commit arrives.
+ *
+ * One might think we can use LEADER_SERIALIZE directly. But in partial
+ * serialize mode, in addition to serializing changes to file, the leader
+ * worker needs to write the STREAM_XXX message to disk, and needs to wait for
+ * parallel apply worker to finish the transaction when processing the
+ * transaction finish command. So a new action was introduced to make the logic
+ * clearer.
+ *
 * TRANS_LEADER_SEND_TO_PARALLEL:

SUGGESTION (Minor wording changes)
The action means that we are in the leader apply worker and have sent some changes directly to the parallel apply worker, but due to a timeout while sending data, the remaining changes need to be serialized to disk. The parallel apply worker will apply these serialized changes when the final commit arrives.

LEADER_SERIALIZE could not be used for this case because, in addition to serializing changes, the leader worker also needs to write the STREAM_XXX message to disk, and wait for the parallel apply worker to finish the transaction when processing the transaction finish command. So this new action was introduced to make the logic clearer.

~

15.

 /* Actions for streaming transactions. */
 TRANS_LEADER_SERIALIZE,
+ TRANS_LEADER_PARTIAL_SERIALIZE,
 TRANS_LEADER_SEND_TO_PARALLEL,
 TRANS_PARALLEL_APPLY

Although it makes no difference I felt it would be better to put TRANS_LEADER_PARTIAL_SERIALIZE *after* TRANS_LEADER_SEND_TO_PARALLEL because that would be the order that these mode changes occur in the logic...

~~~

16.
@@ -375,7 +388,7 @@ typedef struct ApplySubXactData
 static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
-static inline void changes_filename(char *path, Oid subid, TransactionId xid);
+inline void changes_filename(char *path, Oid subid, TransactionId xid);

IIUC (see [1]) when this function was made non-static the "inline" should have been put into the header file.

~

17.

@@ -388,10 +401,9 @@ static inline void cleanup_subxact_info(void);
 /*
 * Serialize and deserialize changes for a toplevel transaction.
 */
-static void stream_cleanup_files(Oid subid, TransactionId xid);
 static void stream_open_file(Oid subid, TransactionId xid, bool first_segment);
-static void stream_write_change(char action, StringInfo s);
+static void stream_write_message(TransactionId xid, char action, StringInfo s);
 static void stream_close_file(void);

17a.
I felt just saying "file/files" is too vague. All the references to the file should be consistent, so IMO everything would be better named like:

"stream_cleanup_files" -> "stream_msg_spoolfile_cleanup()"
"stream_open_file" -> "stream_msg_spoolfile_open()"
"stream_close_file" -> "stream_msg_spoolfile_close()"
"stream_write_message" -> "stream_msg_spoolfile_write_msg()"

~

17b.
IMO there is not enough distinction here between function names stream_write_message and stream_write_change. e.g. You cannot really tell from their names what might be the difference.

~~~

18.
@@ -586,6 +595,7 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 TransactionId current_xid;
 ParallelApplyWorkerInfo *winfo;
 TransApplyAction apply_action;
+ StringInfoData original_msg;
 apply_action = get_transaction_apply_action(stream_xid, &winfo);
@@ -595,6 +605,8 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 Assert(TransactionIdIsValid(stream_xid));
+ original_msg = *s;
+
 /*
 * We should have received XID of the subxact as the first part of the
 * message, so extract it.
@@ -618,10 +630,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 stream_write_change(action, s);
 return true;
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
 case TRANS_LEADER_SEND_TO_PARALLEL:
 Assert(winfo);
- pa_send_data(winfo, s->len, s->data);
+ if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
+ pa_send_data(winfo, s->len, s->data);
+ else
+ stream_write_change(action, &original_msg);

The original_msg is not used except for TRANS_LEADER_PARTIAL_SERIALIZE case so I think it should only be declared/assigned in the scope of that 'else'

~~~

19. apply_handle_stream_prepare

@@ -1316,13 +1335,21 @@ apply_handle_stream_prepare(StringInfo s)
 pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
 /* Send STREAM PREPARE message to the parallel apply worker. */
- pa_send_data(winfo, s->len, s->data);
+ if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
+ pa_send_data(winfo, s->len, s->data);
+ else
+ stream_write_message(prepare_data.xid,
+ LOGICAL_REP_MSG_STREAM_PREPARE,
+ &original_msg);

The original_msg is not used except for TRANS_LEADER_PARTIAL_SERIALIZE case so I think it should only be declared/assigned in the scope of that 'else'

~

20.

+ /*
+ * Close the file before committing if the parallel apply is
+ * applying spooled changes.
+ */
+ if (stream_fd)
+ BufFileClose(stream_fd);

I found this a bit confusing because there is already a stream_close_file() wrapper function which does almost the same as this.
So either this code should be calling that function, or the comment here should be explaining why this code is NOT calling that function.

~~~

21. serialize_stream_start

+/*
+ * Initialize fileset (if not already done).
+ *
+ * Create a new file when first_segment is true, otherwise open the existing
+ * file.
+ */
+void
+serialize_stream_start(TransactionId xid, bool first_segment)

IMO this function should be called stream_msg_spoolfile_init() or stream_msg_spoolfile_begin() to match the pattern for function names of the message spool file that I previously suggested. (see review comment #17a)

~

22.

+ /*
+ * Initialize the worker's stream_fileset if we haven't yet. This will be
+ * used for the entire duration of the worker so create it in a permanent
+ * context. We create this on the very first streaming message from any
+ * transaction and then use it for this and other streaming transactions.
+ * Now, we could create a fileset at the start of the worker as well but
+ * then we won't be sure that it will ever be used.
+ */
+ if (!MyLogicalRepWorker->stream_fileset)

I assumed this is a typo "Now," --> "Note," ?

~~~

23. apply_handle_stream_start

@@ -1404,6 +1478,7 @@ apply_handle_stream_start(StringInfo s)
 bool first_segment;
 ParallelApplyWorkerInfo *winfo;
 TransApplyAction apply_action;
+ StringInfoData original_msg = *s;

The original_msg is not used except for TRANS_LEADER_PARTIAL_SERIALIZE case so I think it should only be declared/assigned in the scope of that 'else'

~

24.

 /*
- * Start a transaction on stream start, this transaction will be
- * committed on the stream stop unless it is a tablesync worker in
- * which case it will be committed after processing all the
- * messages. We need the transaction for handling the buffile,
- * used for serializing the streaming data and subxact info.
+ * serialize_stream_start will start a transaction, this
+ * transaction will be committed on the stream stop unless it is a
+ * tablesync worker in which case it will be committed after
+ * processing all the messages. We need the transaction for
+ * handling the buffile, used for serializing the streaming data
+ * and subxact info.
 */
- begin_replication_step();
+ serialize_stream_start(stream_xid, first_segment);
+ break;

Make the comment a bit more natural.

SUGGESTION
Function serialize_stream_start starts a transaction. This transaction will be committed on the stream stop unless it is a tablesync worker in which case it will be committed after processing all the messages. We need this transaction for handling the BufFile, used for serializing the streaming data and subxact info.

~

25.

+ case TRANS_LEADER_PARTIAL_SERIALIZE:
 /*
- * Initialize the worker's stream_fileset if we haven't yet. This
- * will be used for the entire duration of the worker so create it
- * in a permanent context. We create this on the very first
- * streaming message from any transaction and then use it for this
- * and other streaming transactions. Now, we could create a
- * fileset at the start of the worker as well but then we won't be
- * sure that it will ever be used.
+ * The file should have been created when entering
+ * PARTIAL_SERIALIZE mode so no need to create it again. The
+ * transaction started in serialize_stream_start will be committed
+ * on the stream stop.
 */
- if (!MyLogicalRepWorker->stream_fileset)

BEFORE
The file should have been created when entering PARTIAL_SERIALIZE mode so no need to create it again.

SUGGESTION
The message spool file was already created when entering PARTIAL_SERIALIZE mode.

~~~

26. serialize_stream_stop

 /*
+ * Update the information about subxacts and close the file.
+ *
+ * This function should be called when the serialize_stream_start function has
+ * been called.
+ */
+void
+serialize_stream_stop(TransactionId xid)

Maybe the 2nd part of that comment should be something more like

SUGGESTION
This function ends what was started by the function serialize_stream_start().

~

27.

+ /*
+ * Close the file with serialized changes, and serialize information about
+ * subxacts for the toplevel transaction.
+ */
+ subxact_info_write(MyLogicalRepWorker->subid, xid);
+ stream_close_file();

Should the comment and the code be in the same order?

SUGGESTION
Serialize information about subxacts for the toplevel transaction, then close the stream messages spool file.

~~~

28. handle_stream_abort

+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ Assert(winfo);
+
+ /*
+ * Parallel apply worker might have applied some changes, so write
+ * the STREAM_ABORT message so that the parallel apply worker can
+ * rollback the subtransaction if needed.
+ */
+ stream_write_message(xid, LOGICAL_REP_MSG_STREAM_ABORT,
+ &original_msg);
+

28a.
The original_msg is not used except for TRANS_LEADER_PARTIAL_SERIALIZE case so I think it should only be declared/assigned in the scope of that case.

~

28b.
"so that the parallel apply worker can" -> "so that it can"

~~~

29.
apply_spooled_messages

+void
+apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
+ XLogRecPtr lsn)
 {
 StringInfoData s2;
 int nchanges;
 char path[MAXPGPATH];
 char *buffer = NULL;
 MemoryContext oldcxt;
- BufFile *fd;
- maybe_start_skipping_changes(lsn);
+ if (!am_parallel_apply_worker())
+ maybe_start_skipping_changes(lsn);
 /* Make sure we have an open transaction */
 begin_replication_step();
@@ -1810,8 +1913,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
 changes_filename(path, MyLogicalRepWorker->subid, xid);
 elog(DEBUG1, "replaying changes from file \"%s\"", path);
- fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
- false);
+ stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
+ stream_xid = xid;

It seems strange to me that the fileset is passed as a parameter but then the resulting fd is always assigned to a single global variable (regardless of which fileset was passed).

~

30.

- BufFileClose(fd);
-
+ BufFileClose(stream_fd);
 pfree(buffer);
 pfree(s2.data);
+done:
+ stream_fd = NULL;
+ stream_xid = InvalidTransactionId;
+

This code fragment seems to be doing almost the same as what function stream_close_file() is doing. Should you just call that instead?

~~~

31. apply_handle_stream_commit

+ if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
+ pa_send_data(winfo, s->len, s->data);
+ else
+ stream_write_message(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
+ &original_msg);

The original_msg is not used except for TRANS_LEADER_PARTIAL_SERIALIZE case so I think it should only be declared/assigned in the scope of that 'else'

~

32.

 case TRANS_PARALLEL_APPLY:
+
+ /*
+ * Close the file before committing if the parallel apply is
+ * applying spooled changes.
+ */
+ if (stream_fd)
+ BufFileClose(stream_fd);

(Same as earlier review comment #20) IMO this is confusing because there is already a stream_close_file() wrapper function that does almost the same.
So either this code should be calling that function, or the comment here should explain why this code is NOT calling that function.

======

src/include/replication/worker_internal.h

33. LeaderFileSetState

+/* State of fileset in leader apply worker. */
+typedef enum LeaderFileSetState
+{
+ LEADER_FILESET_UNKNOWN,
+ LEADER_FILESET_BUSY,
+ LEADER_FILESET_ACCESSIBLE
+} LeaderFileSetState;

33a.
Missing from typedefs.list?

~

33b.
I thought some more explanatory comments for the meaning of BUSY/ACCESSIBLE should be here.

~

33c.
READY might be a better value than ACCESSIBLE

~

33d.
I'm not sure what usefulness the "LEADER_" and "Leader" prefixes give here. Maybe a name like PartialFileSetState is more meaningful? e.g. like this?

typedef enum PartialFileSetState
{
    FS_UNKNOWN,
    FS_BUSY,
    FS_READY
} PartialFileSetState;

~~~

34. ParallelApplyWorkerShared

+ /*
+ * The leader apply worker will serialize changes to the file after
+ * entering PARTIAL_SERIALIZE mode and share the fileset with the parallel
+ * apply worker when processing the transaction finish command. And then
+ * the parallel apply worker will apply all the spooled messages.
+ *
+ * Don't use SharedFileSet here as we need the fileset to survive after
+ * releasing the shared memory so that the leader apply worker can re-use
+ * the fileset for next streaming transaction.
+ */
+ LeaderFileSetState fileset_state;
+ FileSet fileset;

Minor rewording of that comment

SUGGESTION
After entering PARTIAL_SERIALIZE mode, the leader apply worker will serialize changes to the file, and share the fileset with the parallel apply worker when processing the transaction finish command. Then the parallel apply worker will apply all the spooled messages.

FileSet is used here instead of SharedFileSet because we need it to survive after releasing the shared memory so that the leader apply worker can re-use the same fileset for the next streaming transaction.

~~~

35.
globals

 /*
+ * Indicates whether the leader apply worker needs to serialize the
+ * remaining changes to disk due to timeout when sending data to the
+ * parallel apply worker.
+ */
+ bool serialize_changes;

35a.
I wonder if the comment would be better to also mention "via shared memory".

SUGGESTION
Indicates whether the leader apply worker needs to serialize the remaining changes to disk due to timeout when attempting to send data to the parallel apply worker via shared memory.

~

35b.
I wonder if a more informative variable name might be serialize_remaining_changes?

------
[1] https://stackoverflow.com/questions/17504316/what-happens-with-an-extern-inline-function

Kind Regards,
Peter Smith.
Fujitsu Australia
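To make the PARTIAL_SERIALIZE behaviour discussed in review comments 8, 10, 14, 15 and 35 easier to follow, here is a minimal, self-contained C sketch of the leader-side decision. It is a simplified model under stated assumptions, not PostgreSQL code: the enum and flag names mirror the patch, but WorkerInfo, try_send(), spool_write() and leader_handle_message() are hypothetical stand-ins. The "queue" is a byte counter (the real code uses a shm_mq send with a timeout), the "spool file" is an in-memory buffer (the real code uses BufFile), and a failed send here is all-or-nothing, whereas comment 5 notes that a real timed-out send may leave a partially written message.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical model of the apply actions; PARTIAL_SERIALIZE is placed
 * after SEND_TO_PARALLEL, following the ordering suggested in comment 15. */
typedef enum TransApplyAction
{
    TRANS_LEADER_SERIALIZE,
    TRANS_LEADER_SEND_TO_PARALLEL,
    TRANS_LEADER_PARTIAL_SERIALIZE,
    TRANS_PARALLEL_APPLY
} TransApplyAction;

/* Hypothetical stand-in for ParallelApplyWorkerInfo. */
typedef struct WorkerInfo
{
    bool   serialize_changes;  /* set once a send has "timed out" */
    size_t queue_used;         /* bytes already in the modelled queue */
    size_t queue_capacity;     /* models the shm_mq buffer size */
    char   spool[256];         /* models the message spool file */
    size_t spool_len;
} WorkerInfo;

/* Hypothetical send: fails (models a timeout) when the queue is full. */
static bool
try_send(WorkerInfo *winfo, size_t nbytes)
{
    if (winfo->queue_used + nbytes > winfo->queue_capacity)
        return false;
    winfo->queue_used += nbytes;
    return true;
}

/* Append one message to the in-memory "spool file". */
static void
spool_write(WorkerInfo *winfo, const char *data, size_t nbytes)
{
    assert(winfo->spool_len + nbytes <= sizeof(winfo->spool));
    memcpy(winfo->spool + winfo->spool_len, data, nbytes);
    winfo->spool_len += nbytes;
}

static TransApplyAction
get_apply_action(const WorkerInfo *winfo)
{
    return winfo->serialize_changes ? TRANS_LEADER_PARTIAL_SERIALIZE
                                    : TRANS_LEADER_SEND_TO_PARALLEL;
}

/* Leader-side handling of one streamed message. */
static void
leader_handle_message(WorkerInfo *winfo, const char *data, size_t nbytes)
{
    if (get_apply_action(winfo) == TRANS_LEADER_SEND_TO_PARALLEL)
    {
        if (try_send(winfo, nbytes))
            return;
        /* The send "timed out": stop sending directly to the worker and
         * serialize this message and every later one instead. */
        winfo->serialize_changes = true;
    }
    spool_write(winfo, data, nbytes);
}
```

For example, with an 8-byte queue, a 5-byte message is sent directly, the next 5-byte message does not fit and switches the transaction to TRANS_LEADER_PARTIAL_SERIALIZE, and a following 1-byte message is spooled even though it would now fit in the queue. That last step reflects the pa_send_data() comment that subsequent data for the transaction is also serialized to disk.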