Fwd: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Peter Smith
Subject Fwd: Perform streaming logical transactions by background workers and parallel apply
Msg-id CAHut+Pvi_icVpgbYaMbpcLMMbq9DmyVSK=0qcQKEVwGzTGSnmg@mail.gmail.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
List pgsql-hackers
(Resending this because somehow my previous post did not appear in the
mail archives)

---------- Forwarded message ---------
From: Peter Smith <smithpb2250@gmail.com>
Date: Fri, Dec 2, 2022 at 7:59 PM
Subject: Re: Perform streaming logical transactions by background
workers and parallel apply
To: houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com>
Cc: Amit Kapila <amit.kapila16@gmail.com>, Masahiko Sawada
<sawada.mshk@gmail.com>, wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com>, Dilip Kumar <dilipbalaut@gmail.com>,
shiy.fnst@fujitsu.com <shiy.fnst@fujitsu.com>, PostgreSQL Hackers
<pgsql-hackers@lists.postgresql.org>


Here are my review comments for patch v54-0001.

======

FILE: .../replication/logical/applyparallelworker.c

1. File header comment

1a.

+ * This file contains the code to launch, set up, and teardown parallel apply
+ * worker which receives the changes from the leader worker and invokes routines
+ * to apply those on the subscriber database.

"parallel apply worker" -> "a parallel apply worker"

~

1b.

+ *
+ * This file contains routines that are intended to support setting up, using
+ * and tearing down a ParallelApplyWorkerInfo which is required to communicate
+ * among leader and parallel apply workers.

"that are intended to support" -> "for"

"required to communicate among leader and parallel apply workers." ->
"required so the leader worker and parallel apply workers can
communicate with each other."

~

1c.

+ *
+ * The parallel apply workers are assigned (if available) as soon as xact's
+ * first stream is received for subscriptions that have set their 'streaming'
+ * option as parallel. The leader apply worker will send changes to this new
+ * worker via shared memory. We keep this worker assigned till the transaction
+ * commit is received and also wait for the worker to finish at commit. This
+ * preserves commit ordering and avoid file I/O in most cases, although we
+ * still need to spill to a file if there is no worker available. See comments
+ * atop logical/worker to know more about streamed xacts whose changes are
+ * spilled to disk. It is important to maintain commit order to avoid failures
+ * due to (a) transaction dependencies, say if we insert a row in the first
+ * transaction and update it in the second transaction on publisher then
+ * allowing the subscriber to apply both in parallel can lead to failure in the
+ * update. (b) deadlocks, allowing transactions that update the same set of
+ * rows/tables in the opposite order to be applied in parallel can lead to
+ * deadlocks.

"due to (a)" -> "due to: "

"(a) transaction dependencies, " -> "(a) transaction dependencies - "

". (b) deadlocks, " => "; (b) deadlocks - "

~

1d.

+ *
+ * We maintain a worker pool to avoid restarting workers for each streaming
+ * transaction. We maintain each worker's information in the
+ * ParallelApplyWorkersList. After successfully launching a new worker, its
+ * information is added to the ParallelApplyWorkersList. Once the worker
+ * finishes applying the transaction, we mark it available for re-use. Now,
+ * before starting a new worker to apply the streaming transaction, we check
+ * the list for any available worker. Note that we maintain a maximum of half
+ * the max_parallel_apply_workers_per_subscription workers in the pool and
+ * after that, we simply exit the worker after applying the transaction.
+ *

"We maintain a worker pool" -> "A worker pool is used"

"We maintain each worker's information" -> "We maintain each worker's
information (ParallelApplyWorkerInfo)"

"we mark it available for re-use" -> "it is marked as available for re-use"

"Note that we maintain a maximum of half" -> "Note that we retain a
maximum of half"
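The retention rule described in 1d can be modelled in a few lines of standalone C. This is a minimal sketch, not the patch's code: Worker, pool_allocate, pool_release and MAX_WORKERS are hypothetical names, the pool is a plain array of pointers rather than a PostgreSQL List, and "launching" a worker just allocates a struct. It assumes the check made when a worker finishes its transaction is "pool size greater than half of max_parallel_apply_workers_per_subscription".

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for max_parallel_apply_workers_per_subscription. */
#define MAX_WORKERS 4

typedef struct Worker
{
    int  id;
    bool in_use;
} Worker;

static Worker *pool[MAX_WORKERS];   /* the pool: every live worker */
static int     npool = 0;
static int     next_id = 1;

/* Return a pooled worker that is not in use (marking it busy), else NULL. */
static Worker *
pool_get_available(void)
{
    for (int i = 0; i < npool; i++)
    {
        if (!pool[i]->in_use)
        {
            pool[i]->in_use = true;
            return pool[i];
        }
    }
    return NULL;
}

/* "Launch" a new worker and add it to the pool; NULL if none can be started. */
static Worker *
pool_launch(void)
{
    Worker *w;

    if (npool >= MAX_WORKERS)
        return NULL;            /* caller would fall back to spilling to a file */
    w = malloc(sizeof(Worker));
    if (w == NULL)
        return NULL;
    w->id = next_id++;
    w->in_use = true;
    pool[npool++] = w;
    return w;
}

/* Try the pool first; start a new worker only if none is available. */
static Worker *
pool_allocate(void)
{
    Worker *w = pool_get_available();

    return w != NULL ? w : pool_launch();
}

/*
 * Called when a worker finishes its transaction.  Returns true if the worker
 * is kept for re-use, false if it is removed from the pool (and would exit).
 */
static bool
pool_release(Worker *w)
{
    int i = 0;

    while (i < npool && pool[i] != w)
        i++;
    assert(i < npool);
    if (i == npool)
        return false;           /* not a pooled worker; nothing to do */

    if (npool > MAX_WORKERS / 2)
    {
        /* Too many workers retained: remove this one, keeping order. */
        for (; i < npool - 1; i++)
            pool[i] = pool[i + 1];
        npool--;
        free(w);
        return false;
    }

    w->in_use = false;          /* mark as available for re-use */
    return true;
}
```

With MAX_WORKERS at 4, the first of three busy workers to finish is removed from the pool, while the next one to finish is kept and handed out again by pool_allocate() instead of a new worker being launched.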

~

1e.

+ * XXX This worker pool threshold is a bit arbitrary and we can provide a GUC
+ * variable for this in the future if required.

"a bit arbitrary" -> "arbitrary"

~

1f.

+ *
+ * The leader apply worker will create a separate dynamic shared memory segment
+ * when each parallel apply worker starts. The reason for this design is that
+ * we cannot count how many workers will be started. It may be possible to
+ * allocate enough shared memory in one segment based on the maximum number of
+ * parallel apply workers (max_parallel_apply_workers_per_subscription), but
+ * this would waste memory if no process is actually started.
+ *

"we cannot count how many workers will be started." -> "we cannot
predict how many workers will be needed."

~

1g.

+ * The dynamic shared memory segment will contain (a) a shm_mq that is used to
+ * send changes in the transaction from leader apply worker to parallel apply
+ * worker (b) another shm_mq that is used to send errors (and other messages
+ * reported via elog/ereport) from the parallel apply worker to leader apply
+ * worker (c) necessary information to be shared among parallel apply workers
+ * and leader apply worker (i.e. members of ParallelApplyWorkerShared).

"will contain (a)" => "contains: (a)"

"worker (b)" -> "worker; (b)

"worker (c)" -> "worker; (c)"

"and leader apply worker" -> "and the leader apply worker"

~

1h.

+ *
+ * Locking Considerations
+ * ----------------------
+ * Since the database structure (schema of subscription tables, constraints,
+ * etc.) of the publisher and subscriber could be different, applying
+ * transactions in parallel mode on the subscriber side can cause some
+ * deadlocks that do not occur on the publisher side which is expected and can
+ * happen even without parallel mode. In order to detect the deadlocks among
+ * leader and parallel apply workers, we need to ensure that we wait using lmgr
+ * locks, otherwise, such deadlocks won't be detected. The other approach was
+ * to not allow parallelism when the schema of tables is different between the
+ * publisher and subscriber but that would be too restrictive and would require
+ * the publisher to send much more information than it is currently sending.
+ *

"side which is expected and can happen even without parallel mode." =>
"side. This can happen even without parallel mode."

", otherwise, such deadlocks won't be detected." -> remove this
because the beginning of the sentence says the same thing.

"The other approach was to not allow" -> "An alternative approach
could be to not allow"

~

1i.

+ *
+ * 4) Lock types
+ *
+ * Both the stream lock and the transaction lock mentioned above are
+ * session-level locks because both locks could be acquired outside the
+ * transaction, and the stream lock in the leader need to persist across
+ * transaction boundaries i.e. until the end of the streaming transaction.
+ *-------------------------------------------------------------------------
+ */

"need to persist" -> "needs to persist"

~~~

2. ParallelApplyWorkersList

+/*
+ * A list to maintain the active parallel apply workers. The information for
+ * the new worker is added to the list after successfully launching it. The
+ * list entry is removed if there are already enough workers in the worker
+ * pool either at the end of the transaction or while trying to find a free
+ * worker for applying the transaction. For more information about the worker
+ * pool, see comments atop this file.
+ */
+static List *ParallelApplyWorkersList = NIL;

"A list to maintain the active parallel apply workers." -> "A list
(pool) of active parallel apply workers."

~~~

3. pa_setup_dsm

+/*
+ * Set up a dynamic shared memory segment.
+ *
+ * We set up a control region that contains a fixed-size worker info
+ * (ParallelApplyWorkerShared), a message queue, and an error queue.
+ *
+ * Returns true on success, false on failure.
+ */
+static bool
+pa_setup_dsm(ParallelApplyWorkerInfo *winfo)

IMO it's confusing to say "fixed-size worker info" when it's
referring to the ParallelApplyWorkerShared structure and not to
ParallelApplyWorkerInfo.

Might be better to say:

"a fixed-size worker info (ParallelApplyWorkerShared)" -> "a
fixed-size struct (ParallelApplyWorkerShared)"
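To illustrate the layout being described (one fixed-size struct plus two message queues in a single segment), here is a sketch of only the size and offset arithmetic. PostgreSQL code would normally do this through the shm_toc estimator and MAXALIGN; here SharedInfo, SegmentLayout, compute_layout and ALIGN8 are hypothetical, the alignment is assumed to be 8 bytes, and the queue sizes are arbitrary example values.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Round up to a multiple of 8, in the spirit of MAXALIGN (8 is an assumption). */
#define ALIGN8(len) (((size_t) (len) + 7) & ~(size_t) 7)

/* Hypothetical stand-in for ParallelApplyWorkerShared: a fixed-size struct. */
typedef struct SharedInfo
{
    uint32_t xid;
    int      state;
    char     other_fields[20];
} SharedInfo;

typedef struct SegmentLayout
{
    size_t shared_off;      /* fixed-size shared struct */
    size_t change_mq_off;   /* leader -> parallel apply worker changes */
    size_t error_mq_off;    /* parallel apply worker -> leader errors */
    size_t total_size;      /* size to request for the whole segment */
} SegmentLayout;

/* Place the three regions back to back, each starting on an aligned offset. */
static SegmentLayout
compute_layout(size_t change_mq_size, size_t error_mq_size)
{
    SegmentLayout l;

    l.shared_off = 0;
    l.change_mq_off = ALIGN8(sizeof(SharedInfo));
    l.error_mq_off = l.change_mq_off + ALIGN8(change_mq_size);
    l.total_size = l.error_mq_off + ALIGN8(error_mq_size);
    return l;
}
```

Because each parallel apply worker gets its own segment, the leader would compute this once per worker it starts rather than sizing one segment for max_parallel_apply_workers_per_subscription workers up front.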

~~~

4. pa_init_and_launch_worker

+ /*
+ * The worker info can be used for the entire duration of the worker so
+ * create it in a permanent context.
+ */
+ oldcontext = MemoryContextSwitchTo(ApplyContext);

SUGGESTION
The worker info can be used for the lifetime of the worker process, so
create it in a permanent context.

~~~

5. pa_allocate_worker

+ /*
+ * First, try to get a parallel apply worker from the pool, if available.
+ * Otherwise, try to start a new parallel apply worker.
+ */
+ winfo = pa_get_available_worker();
+ if (!winfo)
+ {
+ winfo = pa_init_and_launch_worker();
+ if (!winfo)
+ return;
+ }

SUGGESTION
Try to get a parallel apply worker from the pool. If none is available
then start a new one.

~~~

6. pa_free_worker_info

+ /*
+ * Ensure this worker information won't be reused during worker
+ * allocation.
+ */
+ ParallelApplyWorkersList = list_delete_ptr(ParallelApplyWorkersList,
+    winfo);

SUGGESTION 1
Removing from the worker pool ensures this information won't be reused
during worker allocation.

SUGGESTION 2 (more simply)
Remove from the worker pool.

~~~

7. HandleParallelApplyMessage

+ /*
+ * The actual error must have been reported by the parallel
+ * apply worker.
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication parallel apply worker exited abnormally"),
+ errcontext("%s", edata.context)));

Maybe it's better to remove the comment, but replace it with an
errhint that tells the user "For the cause of this error see the error
logged by the logical replication parallel apply worker."

~

8.

+ case 'X':
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication parallel apply worker exited because of subscription information change")));
+ break; /* Silence compiler warning. */
+ default:

Add a blank line before the default:

~

9.

+ /*
+ * Don't need to do anything about NoticeResponse and
+ * NotifyResponse as the logical replication worker doesn't need
+ * to send messages to the client.
+ */
+ case 'N':
+ case 'A':
+ break;
+
+ /*
+ * Restart replication if a parallel apply worker exited because
+ * of subscription information change.
+ */
+ case 'X':


IMO the comments describing the logic for each case should be
*inside* the case. The comment above it (if any) should only say what
the message type means.

SUGGESTION

/* NoticeResponse, NotifyResponse. */
case 'N':
case 'A':
/*
* Don't need to do anything about these message types as the logical replication
* worker doesn't need to send messages to the client.
*/
break;

/* Parallel apply worker exited because of subscription information change. */
case 'X':
/* Restart replication */
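As a compilable illustration of the suggested comment placement, here is the same switch reduced to a standalone function. It is only a sketch: MsgAction and classify_message are hypothetical, and the real handler raises an ERROR via ereport for 'X' (and handles other message types) rather than returning a value.

```c
#include <assert.h>

typedef enum MsgAction
{
    MSG_IGNORE,     /* nothing to do */
    MSG_RESTART,    /* report the exit and restart replication */
    MSG_UNKNOWN     /* unexpected message type */
} MsgAction;

static MsgAction
classify_message(char msgtype)
{
    switch (msgtype)
    {
        /* NoticeResponse, NotifyResponse. */
        case 'N':
        case 'A':
            /*
             * Nothing to do: the logical replication worker doesn't need
             * to send these messages to the client.
             */
            return MSG_IGNORE;

        /*
         * Parallel apply worker exited because of subscription
         * information change.
         */
        case 'X':
            /* Restart replication. */
            return MSG_RESTART;

        default:
            return MSG_UNKNOWN;
    }
}
```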

~~~

10. pa_send_data

+ /*
+ * If the attempt to send data via shared memory times out, we restart
+ * the logical replication to prevent possible deadlocks with another
+ * parallel apply worker. Refer to the comments atop
+ * applyparallelworker.c for details.
+ */
+ if (startTime == 0)
+ startTime = GetCurrentTimestamp();

Sometimes (like here) you say "Refer to the comments atop
applyparallelworker.c". In other places, the comments say "Refer to
the comments atop this file.". IMO the wording should be consistent
everywhere.
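For reference, the timeout bookkeeping in the quoted code (start the clock on the first failed send, give up once the timeout has elapsed) can be sketched in isolation. This is a model under assumptions, not the patch: SendTimer, send_timed_out, send_timer_reset and SEND_TIMEOUT_MS are hypothetical, times are plain milliseconds instead of GetCurrentTimestamp() values, and an explicit started flag replaces the startTime == 0 sentinel.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SEND_TIMEOUT_MS 10000   /* hypothetical timeout value */

typedef struct SendTimer
{
    bool    started;    /* has a send attempt failed yet? */
    int64_t start_ms;   /* time of the first failed attempt */
} SendTimer;

/*
 * Call after each failed non-blocking send.  The first failure starts the
 * clock; returns true once SEND_TIMEOUT_MS has elapsed, at which point the
 * leader would restart logical replication instead of waiting forever on a
 * possibly deadlocked parallel apply worker.
 */
static bool
send_timed_out(SendTimer *t, int64_t now_ms)
{
    if (!t->started)
    {
        t->started = true;
        t->start_ms = now_ms;
        return false;
    }
    return now_ms - t->start_ms >= SEND_TIMEOUT_MS;
}

/* Start afresh for the next send (the quoted code uses a local startTime). */
static void
send_timer_reset(SendTimer *t)
{
    t->started = false;
    t->start_ms = 0;
}
```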

~~~

11. pa_set_stream_apply_worker

+/*
+ * Set the worker that required for applying the current streaming transaction.
+ */
+void
+pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
+{
+ stream_apply_worker = winfo;
+}

"the worker that required for" ?? English ??

~~~

12. pa_clean_subtrans

+/* Reset the list that maintains subtransactions. */
+void
+pa_clean_subtrans(void)
+{
+ subxactlist = NIL;
+}

Maybe a more informative function name would be pa_reset_subxactlist()?

~~~

13. pa_stream_abort

+ subxactlist = NIL;

Since you created a new function pa_clean_subtrans which does exactly
this same NIL assignment, I was not expecting to see this global being
explicitly set like this in other code. It's confusing to have
multiple ways to do the same thing.

Please check the rest of the patch in case the same is done elsewhere.
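A small sketch of the pattern being asked for, where every reset goes through the one helper. The names are illustrative only: SubXact and stream_abort_model are hypothetical, pa_reset_subxactlist is the name suggested in #12, and a plain linked list stands in for the patch's List.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the patch's subxactlist (a List in PostgreSQL). */
typedef struct SubXact
{
    unsigned int    xid;
    struct SubXact *next;
} SubXact;

static SubXact *subxactlist = NULL;

/* The single place that resets the list. */
static void
pa_reset_subxactlist(void)
{
    subxactlist = NULL;     /* this model does not own or free the nodes */
}

/* Hypothetical abort path: call the helper rather than assigning the global. */
static void
stream_abort_model(void)
{
    /* ... the rollback itself would happen here ... */
    pa_reset_subxactlist();
}
```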

======

FILE: src/backend/replication/logical/launcher.c

14. logicalrep_worker_detach

+ /*
+ * Detach from the error_mq_handle for all parallel apply workers
+ * before terminating them to prevent the leader apply worker from
+ * receiving the worker termination messages and sending it to logs
+ * when the same is already done by individual parallel worker.
+ */
+ pa_detach_all_error_mq();

"before terminating them to prevent" -> "before terminating them. This prevents"

"termination messages" -> "termination message"

"by individual" -> "by the"

======

FILE: src/backend/replication/logical/worker.c

15. File header comment

+ * 1) Write to temporary files and apply when the final commit arrives
+ *
+ * This approach is used when user has set subscription's streaming option as
+ * on.

"when user has set" -> "when the user has set the"

~

16.

+ * 2) Parallel apply workers.
+ *
+ * This approach is used when user has set subscription's streaming option as
+ * parallel. See logical/applyparallelworker.c for information about this
+ * approach.

"when user has set" -> "when the user has set the "


~~~

17. apply_handle_stream_stop

+ case TRANS_PARALLEL_APPLY:
+ elog(DEBUG1, "applied %u changes in the streaming chunk",
+ parallel_stream_nchanges);
+
+ /*
+ * By the time parallel apply worker is processing the changes in
+ * the current streaming block, the leader apply worker may have
+ * sent multiple streaming blocks. This can lead to parallel apply
+ * worker start waiting even when there are more chunk of streams
+ * in the queue. So, try to lock only if there is no message left
+ * in the queue. See Locking Considerations atop
+ * applyparallelworker.c.
+ */

SUGGESTION (minor rewording)

By the time the parallel apply worker is processing the changes in the
current streaming block, the leader apply worker may have sent
multiple streaming blocks. To prevent the parallel apply worker from
waiting unnecessarily, try to lock only if there is no message left in
the queue. See Locking Considerations atop applyparallelworker.c.

~~~

18. apply_handle_stream_abort

+ case TRANS_PARALLEL_APPLY:
+ pa_stream_abort(&abort_data);
+
+ /*
+ * We need to wait after processing rollback to savepoint for the
+ * next set of changes.
+ *
+ * By the time parallel apply worker is processing the changes in
+ * the current streaming block, the leader apply worker may have
+ * sent multiple streaming blocks. This can lead to parallel apply
+ * worker start waiting even when there are more chunk of streams
+ * in the queue. So, try to lock only if there is no message left
+ * in the queue. See Locking Considerations atop
+ * applyparallelworker.c.
+ */

Second paragraph ("By the time...") same review comment as the
previous one (#17)

~~~

19. store_flush_position

+ /*
+ * Skip for parallel apply workers. The leader apply worker will ensure to
+ * update it as the lsn_mapping is maintained by it.
+ */
+ if (am_parallel_apply_worker())
+ return;

SUGGESTION (the multiple uses of "it" in the comment were confusing)
Skip for parallel apply workers, because the lsn_mapping is maintained
by the leader apply worker.

~~~

20. set_apply_error_context_origin

+
+/* Set the origin name of apply error callback. */
+void
+set_apply_error_context_origin(char *originname)
+{
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+    originname);
+}

IMO that "Allocate ..." comment should just replace the function header comment.

~~~

21. apply_worker_clean_exit

I wasn't sure if calling this a 'clean' exit meant anything much.

How about:
- apply_worker_proc_exit, or
- apply_worker_exit

~

22.

+apply_worker_clean_exit(bool on_subinfo_change)
+{
+ if (am_parallel_apply_worker() && on_subinfo_change)
+ {
+ /*
+ * If a parallel apply worker exits due to the subscription
+ * information change, we notify the leader apply worker so that the
+ * leader can report more meaningful message in time and restart the
+ * logical replication.
+ */
+ pq_putmessage('X', NULL, 0);
+ }
+
+ proc_exit(0);
+}

SUGGESTION (for comment)
If this is a parallel apply worker exiting due to a subscription
information change, we notify the leader apply worker so that it can
report a more meaningful message before restarting the logical
replication.

======

FILE: src/include/commands/subscriptioncmds.h

23. externs

@@ -26,4 +26,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
 extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
 extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);

+extern char defGetStreamingMode(DefElem *def);

The extern is not in the same order as the functions of subscriptioncmds.c

======

FILE: src/include/replication/worker_internal.h

24. externs

24a.

+extern void apply_dispatch(StringInfo s);
+
+extern void InitializeApplyWorker(void);
+
+extern void maybe_reread_subscription(void);

The above externs are not in the same order as the functions of worker.c

~

24b.

+extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
+extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
+
+extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
+extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);

The above externs are not in the same order as the functions of
applyparallelworker.c

------
Kind Regards,
Peter Smith.
Fujitsu Australia


