Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Peter Smith
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Msg-id CAHut+Pvvjrd_Fd0aP-XsmBy--YbDL=h9FhgcQEWkXqtPgHE+rQ@mail.gmail.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply
RE: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
Here are some review comments for patch v51-0002.

======

1.

GENERAL - terminology:  spool/serialize and data/changes/message

The terminology seems to be used at random. IMO it might be worthwhile
at least rechecking that the terms are used consistently in all the
comments. e.g. "serialize message data to disk" ... and later ...
"apply the spooled messages".

Also for places where it says "Write the message to file" maybe
consider using consistent terminology like "serialize the message to a
file".

Also, try to standardize the way things are described by using
consistent (if they really are the same) terminology for "writing
data" VS "writing changes" VS "writing messages" etc. It is confusing
trying to know if the different wording has some intended meaning or
if it is just random.

======

Commit message

2.
When the leader apply worker times out while sending a message to the parallel
apply worker. Instead of erroring out, switch to partial serialize mode and let
the leader serialize all remaining changes to the file and notify the parallel
apply workers to read and apply them at the end of the transaction.

~

The first sentence seems incomplete.

SUGGESTION
In patch 0001 if the leader apply worker times out while attempting to
send a message to the parallel apply worker it results in an ERROR.

This patch (0002) modifies that behaviour, so instead of erroring it
will switch to "partial serialize" mode. In this mode the leader
serializes all remaining changes to a file and notifies the parallel
apply workers to read and apply them at the end of the transaction.

~~~

3.

This patch 0002 is called “Serialize partial changes to disk if the
shm_mq buffer is full”, but the commit message says nothing about
the buffer filling up. I think the commit message should mention
something that makes the patch name more relevant. Otherwise,
change the patch name.

======

.../replication/logical/applyparallelworker.c

4. File header comment

+ * timeout is exceeded, the LA will write to file and indicate PA-2 that it
+ * needs to read file for remaining messages. Then LA will start waiting for
+ * commit which will detect deadlock if any. (See pa_send_data() and typedef
+ * enum TransApplyAction)

"needs to read file for remaining messages" -> "needs to read that
file for the remaining messages"

~~~

5. pa_free_worker

+ /*
+ * Stop the worker if there are enough workers in the pool.
+ *
+ * XXX we also need to stop the worker if the leader apply worker
+ * serialized part of the transaction data to a file due to send timeout.
+ * This is because the message could be partially written to the queue due
+ * to send timeout and there is no way to clean the queue other than
+ * resending the message until it succeeds. To avoid complexity, we
+ * directly stop the worker in this case.
+ */
+ if (winfo->serialize_changes ||
+ napplyworkers > (max_parallel_apply_workers_per_subscription / 2))

5a.

+ * XXX we also need to stop the worker if the leader apply worker
+ * serialized part of the transaction data to a file due to send timeout.

SUGGESTION
XXX The worker is also stopped if the leader apply worker needed to
serialize part of the transaction data due to a send timeout.

~

5b.

+ /* Unlink the files with serialized changes. */
+ if (winfo->serialize_changes)
+ stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);

A better comment might be

SUGGESTION
Unlink any files that were needed to serialize partial changes.

~~~

6. pa_spooled_messages

/*
 * Replay the spooled messages in the parallel apply worker if leader apply
 * worker has finished serializing changes to the file.
 */
static void
pa_spooled_messages(void)

6a.
IMO a better name for this function would be pa_apply_spooled_messages();

~

6b.
"if leader apply" -> "if the leader apply"

~

7.

+ /*
+ * Acquire the stream lock if the leader apply worker is serializing
+ * changes to the file, because the parallel apply worker will no longer
+ * have a chance to receive a STREAM_STOP and acquire the lock until the
+ * leader serialize all changes to the file.
+ */
+ if (fileset_state == LEADER_FILESET_BUSY)
+ {
+ pa_lock_stream(MyParallelShared->xid, AccessShareLock);
+ pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
+ }

SUGGESTION (rearranged comment - please check, I am not sure if I got
this right)

If the leader apply worker is still (busy) serializing partial changes
then the parallel apply worker acquires the stream lock now.
Otherwise, it would not have a chance to receive a STREAM_STOP (and
acquire the stream lock) until the leader had serialized all changes.

~~~

8. pa_send_data

+ *
+ * When sending data times out, data will be serialized to disk. And the
+ * current streaming transaction will enter PARTIAL_SERIALIZE mode, which means
+ * that subsequent data will also be serialized to disk.
  */
 void
 pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)

SUGGESTION (minor comment change)

If the attempt to send data via shared memory times out, then we will
switch to "PARTIAL_SERIALIZE mode" for the current transaction. This
means that the current data and any subsequent data for this
transaction will be serialized to disk.

~

9.

  Assert(!IsTransactionState());
+ Assert(!winfo->serialize_changes);

How about also asserting that this must be the LA worker?

~

10.

+ /*
+ * The parallel apply worker might be stuck for some reason, so
+ * stop sending data to parallel worker and start to serialize
+ * data to files.
+ */
+ winfo->serialize_changes = true;

SUGGESTION (minor reword)
The parallel apply worker might be stuck for some reason, so stop
sending data directly to it and start to serialize data to files
instead.

~

11.
+ /* Skip first byte and statistics fields. */
+ msg.cursor += SIZE_STATS_MESSAGE + 1;

IMO it would be better for the comment order and the code calculation
order to be the same.

SUGGESTION
/* Skip first byte and statistics fields. */
msg.cursor += 1 + SIZE_STATS_MESSAGE;

~

12. pa_stream_abort

+ /*
+ * If the parallel apply worker is applying the spooled
+ * messages, we save the current file position and close the
+ * file to prevent the file from being accidentally closed on
+ * rollback.
+ */
+ if (stream_fd)
+ {
+ BufFileTell(stream_fd, &fileno, &offset);
+ BufFileClose(stream_fd);
+ reopen_stream_fd = true;
+ }
+
  RollbackToSavepoint(spname);
  CommitTransactionCommand();
  subxactlist = list_truncate(subxactlist, i + 1);
+
+ /*
+ * Reopen the file and set the file position to the saved
+ * position.
+ */
+ if (reopen_stream_fd)

It seems a bit vague to just refer to "close the file" and "reopen the
file" in these comments. IMO it would be better to call this file by a
name like "the message spool file" or similar. Please check all other
similar comments.

~~~

13. pa_set_fileset_state

 /*
+ * Set the fileset_state flag for the given parallel apply worker. The
+ * stream_fileset of the leader apply worker will be written into the shared
+ * memory if the fileset_state is LEADER_FILESET_ACCESSIBLE.
+ */
+void
+pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
+ LeaderFileSetState fileset_state)
+{

13a.

It is an enum -- not a "flag", so:

"fileset_state flag" -> "fileset state"

~~

13b.

It seemed strange to me that the comment/code says this state is only
written to shm when it is "ACCESSIBLE"... IIUC this same fileset
lingers around to be reused for other workers so I expected the state
should *always* be written whenever the LA changes it. (I mean even if
the PA does not need to look at this member, I still think it should
have the current/correct value in it).

======

src/backend/replication/logical/worker.c

14. TRANS_LEADER_SEND_TO_PARALLEL

+ * TRANS_LEADER_PARTIAL_SERIALIZE:
+ * The action means that we are in the leader apply worker and have sent some
+ * changes to the parallel apply worker, but the remaining changes need to be
+ * serialized to disk due to timeout while sending data, and the parallel apply
+ * worker will apply these changes when the final commit arrives.
+ *
+ * One might think we can use LEADER_SERIALIZE directly. But in partial
+ * serialize mode, in addition to serializing changes to file, the leader
+ * worker needs to write the STREAM_XXX message to disk, and needs to wait for
+ * parallel apply worker to finish the transaction when processing the
+ * transaction finish command. So a new action was introduced to make the logic
+ * clearer.
+ *
  * TRANS_LEADER_SEND_TO_PARALLEL:


SUGGESTION (Minor wording changes)
The action means that we are in the leader apply worker and have sent
some changes directly to the parallel apply worker, but due to a
timeout while sending data, the remaining changes need to be
serialized to disk. The parallel apply worker will apply these
serialized changes when the final commit arrives.

LEADER_SERIALIZE could not be used for this case because, in addition
to serializing changes, the leader worker also needs to write the
STREAM_XXX message to disk, and wait for the parallel apply worker to
finish the transaction when processing the transaction finish command.
So this new action was introduced to make the logic clearer.

~

15.
  /* Actions for streaming transactions. */
  TRANS_LEADER_SERIALIZE,
+ TRANS_LEADER_PARTIAL_SERIALIZE,
  TRANS_LEADER_SEND_TO_PARALLEL,
  TRANS_PARALLEL_APPLY

Although it makes no difference I felt it would be better to put
TRANS_LEADER_PARTIAL_SERIALIZE *after* TRANS_LEADER_SEND_TO_PARALLEL
because that would be the order that these mode changes occur in the
logic...

~~~

16.

@@ -375,7 +388,7 @@ typedef struct ApplySubXactData
 static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};

 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
-static inline void changes_filename(char *path, Oid subid, TransactionId xid);
+inline void changes_filename(char *path, Oid subid, TransactionId xid);

IIUC (see [1]) when this function was made non-static the "inline"
should have been put into the header file.

~

17.
@@ -388,10 +401,9 @@ static inline void cleanup_subxact_info(void);
 /*
  * Serialize and deserialize changes for a toplevel transaction.
  */
-static void stream_cleanup_files(Oid subid, TransactionId xid);
 static void stream_open_file(Oid subid, TransactionId xid,
  bool first_segment);
-static void stream_write_change(char action, StringInfo s);
+static void stream_write_message(TransactionId xid, char action, StringInfo s);
 static void stream_close_file(void);

17a.

I felt just saying "file/files" is too vague. All the references to
the file should be consistent, so IMO everything would be better named
like:

"stream_cleanup_files" -> "stream_msg_spoolfile_cleanup()"
"stream_open_file" ->  "stream_msg_spoolfile_open()"
"stream_close_file" -> "stream_msg_spoolfile_close()"
"stream_write_message" -> "stream_msg_spoolfile_write_msg()"

~

17b.
IMO there is not enough distinction here between the function names
stream_write_message and stream_write_change. For example, you cannot
really tell from their names what the difference between them might be.

~~~

18.

@@ -586,6 +595,7 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
  TransactionId current_xid;
  ParallelApplyWorkerInfo *winfo;
  TransApplyAction apply_action;
+ StringInfoData original_msg;

  apply_action = get_transaction_apply_action(stream_xid, &winfo);

@@ -595,6 +605,8 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)

  Assert(TransactionIdIsValid(stream_xid));

+ original_msg = *s;
+
  /*
  * We should have received XID of the subxact as the first part of the
  * message, so extract it.
@@ -618,10 +630,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
  stream_write_change(action, s);
  return true;

+ case TRANS_LEADER_PARTIAL_SERIALIZE:
  case TRANS_LEADER_SEND_TO_PARALLEL:
  Assert(winfo);

- pa_send_data(winfo, s->len, s->data);
+ if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
+ pa_send_data(winfo, s->len, s->data);
+ else
+ stream_write_change(action, &original_msg);

The original_msg is not used except for TRANS_LEADER_PARTIAL_SERIALIZE
case so I think it should only be declared/assigned in the scope of
that 'else'

~~

19. apply_handle_stream_prepare

@@ -1316,13 +1335,21 @@ apply_handle_stream_prepare(StringInfo s)
  pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);

  /* Send STREAM PREPARE message to the parallel apply worker. */
- pa_send_data(winfo, s->len, s->data);
+ if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
+ pa_send_data(winfo, s->len, s->data);
+ else
+ stream_write_message(prepare_data.xid,
+ LOGICAL_REP_MSG_STREAM_PREPARE,
+ &original_msg);


The original_msg is not used except for TRANS_LEADER_PARTIAL_SERIALIZE
case so I think it should only be declared/assigned in the scope of
that 'else'

~

20.

+ /*
+ * Close the file before committing if the parallel apply is
+ * applying spooled changes.
+ */
+ if (stream_fd)
+ BufFileClose(stream_fd);

I found this a bit confusing because there is already a
stream_close_file() wrapper function which does almost the same as
this. So either this code should be calling that function, or the
comment here should be explaining why this code is NOT calling that
function.

~~~

21. serialize_stream_start

+/*
+ * Initialize fileset (if not already done).
+ *
+ * Create a new file when first_segment is true, otherwise open the existing
+ * file.
+ */
+void
+serialize_stream_start(TransactionId xid, bool first_segment)

IMO this function should be called stream_msg_spoolfile_init() or
stream_msg_spoolfile_begin() to match the pattern for function names
of the message spool file that I previously suggested. (see review
comment #17a)

~

22.

+ /*
+ * Initialize the worker's stream_fileset if we haven't yet. This will be
+ * used for the entire duration of the worker so create it in a permanent
+ * context. We create this on the very first streaming message from any
+ * transaction and then use it for this and other streaming transactions.
+ * Now, we could create a fileset at the start of the worker as well but
+ * then we won't be sure that it will ever be used.
+ */
+ if (!MyLogicalRepWorker->stream_fileset)

I assumed this is a typo "Now," --> "Note," ?

~~~

23. apply_handle_stream_start

@@ -1404,6 +1478,7 @@ apply_handle_stream_start(StringInfo s)
  bool first_segment;
  ParallelApplyWorkerInfo *winfo;
  TransApplyAction apply_action;
+ StringInfoData original_msg = *s;

The original_msg is not used except for TRANS_LEADER_PARTIAL_SERIALIZE
case so I think it should only be declared/assigned in the scope of
that 'else'

~

24.

  /*
- * Start a transaction on stream start, this transaction will be
- * committed on the stream stop unless it is a tablesync worker in
- * which case it will be committed after processing all the
- * messages. We need the transaction for handling the buffile,
- * used for serializing the streaming data and subxact info.
+ * serialize_stream_start will start a transaction, this
+ * transaction will be committed on the stream stop unless it is a
+ * tablesync worker in which case it will be committed after
+ * processing all the messages. We need the transaction for
+ * handling the buffile, used for serializing the streaming data
+ * and subxact info.
  */
- begin_replication_step();
+ serialize_stream_start(stream_xid, first_segment);
+ break;

Make the comment a bit more natural.

SUGGESTION

Function serialize_stream_start starts a transaction. This transaction
will be committed on the stream stop unless it is a tablesync worker
in which case it will be committed after processing all the messages.
We need this transaction for handling the BufFile, used for
serializing the streaming data and subxact info.

~

25.

+ case TRANS_LEADER_PARTIAL_SERIALIZE:
  /*
- * Initialize the worker's stream_fileset if we haven't yet. This
- * will be used for the entire duration of the worker so create it
- * in a permanent context. We create this on the very first
- * streaming message from any transaction and then use it for this
- * and other streaming transactions. Now, we could create a
- * fileset at the start of the worker as well but then we won't be
- * sure that it will ever be used.
+ * The file should have been created when entering
+ * PARTIAL_SERIALIZE mode so no need to create it again. The
+ * transaction started in serialize_stream_start will be committed
+ * on the stream stop.
  */
- if (!MyLogicalRepWorker->stream_fileset)

BEFORE
The file should have been created when entering PARTIAL_SERIALIZE mode
so no need to create it again.

SUGGESTION
The message spool file was already created when entering PARTIAL_SERIALIZE mode.

~~~

26. serialize_stream_stop

 /*
+ * Update the information about subxacts and close the file.
+ *
+ * This function should be called when the serialize_stream_start function has
+ * been called.
+ */
+void
+serialize_stream_stop(TransactionId xid)

Maybe 2nd part of that comment should be something more like

SUGGESTION
This function ends what was started by the function serialize_stream_start().

~

27.

+ /*
+ * Close the file with serialized changes, and serialize information about
+ * subxacts for the toplevel transaction.
+ */
+ subxact_info_write(MyLogicalRepWorker->subid, xid);
+ stream_close_file();

Should the comment and the code be in the same order?

SUGGESTION
Serialize information about subxacts for the toplevel transaction,
then close the stream messages spool file.

~~~

28. handle_stream_abort

+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ Assert(winfo);
+
+ /*
+ * Parallel apply worker might have applied some changes, so write
+ * the STREAM_ABORT message so that the parallel apply worker can
+ * rollback the subtransaction if needed.
+ */
+ stream_write_message(xid, LOGICAL_REP_MSG_STREAM_ABORT,
+ &original_msg);
+

28a.
The original_msg is not used except for TRANS_LEADER_PARTIAL_SERIALIZE
case so I think it should only be declared/assigned in the scope of
that case.

~

28b.
"so that the parallel apply worker can" -> "so that it can"


~~~

29. apply_spooled_messages

+void
+apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
+    XLogRecPtr lsn)
 {
  StringInfoData s2;
  int nchanges;
  char path[MAXPGPATH];
  char    *buffer = NULL;
  MemoryContext oldcxt;
- BufFile    *fd;

- maybe_start_skipping_changes(lsn);
+ if (!am_parallel_apply_worker())
+ maybe_start_skipping_changes(lsn);

  /* Make sure we have an open transaction */
  begin_replication_step();
@@ -1810,8 +1913,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
  changes_filename(path, MyLogicalRepWorker->subid, xid);
  elog(DEBUG1, "replaying changes from file \"%s\"", path);

- fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
- false);
+ stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
+ stream_xid = xid;

It seems strange to me that the fileset is passed as a parameter
but then the resulting fd is always assigned to a single global
variable (regardless of which fileset was passed).

~

30.

- BufFileClose(fd);
-
+ BufFileClose(stream_fd);
  pfree(buffer);
  pfree(s2.data);

+done:
+ stream_fd = NULL;
+ stream_xid = InvalidTransactionId;
+

This code fragment seems to be doing almost the same as what function
stream_close_file() is doing. Should you just call that instead?

~~~

31. apply_handle_stream_commit

+ if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
+ pa_send_data(winfo, s->len, s->data);
+ else
+ stream_write_message(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
+ &original_msg);

The original_msg is not used except for TRANS_LEADER_PARTIAL_SERIALIZE
case so I think it should only be declared/assigned in the scope of
that 'else'

~

32.

  case TRANS_PARALLEL_APPLY:
+
+ /*
+ * Close the file before committing if the parallel apply is
+ * applying spooled changes.
+ */
+ if (stream_fd)
+ BufFileClose(stream_fd);

(Same as earlier review comment #20)

IMO this is confusing because there is already a stream_close_file()
wrapper function that does almost the same. So either this code should
be calling that function, or the comment here should explain why this
code is NOT calling that function.


======

src/include/replication/worker_internal.h

33. LeaderFileSetState

+/* State of fileset in leader apply worker. */
+typedef enum LeaderFileSetState
+{
+ LEADER_FILESET_UNKNOWN,
+ LEADER_FILESET_BUSY,
+ LEADER_FILESET_ACCESSIBLE
+} LeaderFileSetState;

33a.

Missing from typedefs.list?

~

33b.

I think some more explanatory comments about the meaning of
BUSY/ACCESSIBLE should be added here.

~

33c.

READY might be a better value than ACCESSIBLE

~

33d.
I'm not sure what usefulness the "LEADER_" and "Leader" prefixes
give here. Maybe a name like PartialFileSetState is more meaningful?

e.g. like this?

typedef enum PartialFileSetState
{
FS_UNKNOWN,
FS_BUSY,
FS_READY
} PartialFileSetState;

~


~~~

34. ParallelApplyWorkerShared

+ /*
+ * The leader apply worker will serialize changes to the file after
+ * entering PARTIAL_SERIALIZE mode and share the fileset with the parallel
+ * apply worker when processing the transaction finish command. And then
+ * the parallel apply worker will apply all the spooled messages.
+ *
+ * Don't use SharedFileSet here as we need the fileset to survive after
+ * releasing the shared memory so that the leader apply worker can re-use
+ * the fileset for next streaming transaction.
+ */
+ LeaderFileSetState fileset_state;
+ FileSet fileset;

Minor rewording of that comment

SUGGESTION
After entering PARTIAL_SERIALIZE mode, the leader apply worker will
serialize changes to the file, and share the fileset with the parallel
apply worker when processing the transaction finish command. Then the
parallel apply worker will apply all the spooled messages.

FileSet is used here instead of SharedFileSet because we need it to
survive after releasing the shared memory so that the leader apply
worker can re-use the same fileset for the next streaming transaction.

~~~

35. globals

  /*
+ * Indicates whether the leader apply worker needs to serialize the
+ * remaining changes to disk due to timeout when sending data to the
+ * parallel apply worker.
+ */
+ bool serialize_changes;

35a.
I wonder if the comment would be better to also mention "via shared memory".

SUGGESTION

Indicates whether the leader apply worker needs to serialize the
remaining changes to disk due to timeout when attempting to send data
to the parallel apply worker via shared memory.

~

35b.
I wonder if a more informative variable name might be
serialize_remaining_changes?

------
[1] https://stackoverflow.com/questions/17504316/what-happens-with-an-extern-inline-function

Kind Regards,
Peter Smith.
Fujitsu Australia


