
From: Masahiko Sawada
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date:
Msg-id: CAD21AoDScLvLT8JBfu5WaGCPQs_qhxsybMT+sMXJ=QrDMTyr9w@mail.gmail.com
In response to: RE: Perform streaming logical transactions by background workers and parallel apply ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
List: pgsql-hackers
On Tue, Nov 15, 2022 at 8:57 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
>
> On Saturday, November 12, 2022 7:06 PM Amit Kapila <amit.kapila16@gmail.com>
> >
> > On Fri, Nov 11, 2022 at 2:12 PM houzj.fnst@fujitsu.com
> > <houzj.fnst@fujitsu.com> wrote:
> > >
> >
> > Few comments on v46-0001:
> > ======================
> >
>
> Thanks for the comments.
>
> > 1.
> > +static void
> > +apply_handle_stream_abort(StringInfo s)
> > {
> > ...
> > + /* Send STREAM ABORT message to the parallel apply worker. */
> > + parallel_apply_send_data(winfo, s->len, s->data);
> > +
> > + if (abort_toplevel_transaction)
> > + {
> > + parallel_apply_unlock_stream(xid, AccessExclusiveLock);
> >
> > Shouldn't we release this lock before sending the message, as
> > we are doing for stream_prepare and stream_commit? If there is a
> > reason for doing it differently here then let's add some comments for
> > the same.
>
> Changed.
>
> > 2. It seems once the patch makes the file state as busy
> > (LEADER_FILESET_BUSY), it will only be accessible after the leader
> > apply worker receives a transaction end message like stream_commit. Is
> > my understanding correct? If yes, then why can't we make it accessible
> > after the stream_stop message? Are you worried about the concurrency
> > handling for reading and writing the file? If so, we can probably deal
> > with it via some lock for reading and writing to file for each change.
> > I think after this we may not need additional stream level lock/unlock
> > in parallel_apply_spooled_messages. I understand that you probably
> > want to keep the code simple so I am not suggesting changing it
> > immediately but just wanted to know whether you have considered
> > alternatives here.
>
> I thought about this, but the current buffile design doesn't allow two
> processes to open the same buffile at the same time (refer to the comment
> atop BufFileOpenFileSet()). This means the LA needs to make sure the PA has
> closed the buffile before writing more changes into it. Although we could
> let the LA wait for that, it could cause another kind of deadlock. Suppose
> the PA opened the file and is blocked while applying the change it just
> read, and the LA starts to wait when trying to write the next set of
> streaming changes into the file because the file is still open in the PA.
> Then the lock edge is:
>
> LA (waits for the file to be closed) -> PA1 (waits for a unique lock held
> by PA2) -> PA2 (waits for the stream lock held by the LA)
>
> We could introduce another lock for this, but that seems not very great as
> we already have two kinds of locks here.
>
> Another solution, which doesn't need a new lock, could be to create a
> different filename for each streaming block so that the leader doesn't need
> to reopen the same file after writing changes into it, but that would
> largely increase the number of temp files and looks a bit hacky. Or we
> could let the PA open, read, and close the file for each change, but that
> would add the overhead of repeatedly opening and closing the file.
>
> Based on the above, how about keeping the current approach? (i.e., the PA
> will open the file only after the leader apply worker receives a
> transaction end message like stream_commit). Ideally, it will enter partial
> serialize mode only when a PA is blocked by a backend or another PA, which
> seems uncommon.

+1. We can improve this area later in a separate patch.
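
For readers following the thread, a minimal sketch of the handover this
implies (function and variable names are hypothetical; per the comment atop
BufFileOpenFileSet(), a fileset-backed BufFile must not be open in two
processes at once):

    #include "postgres.h"

    #include <fcntl.h>

    #include "storage/buffile.h"
    #include "storage/fileset.h"

    /*
     * LA side: append one chunk of streamed changes, then close the
     * file so that a PA may open it later.
     */
    static void
    la_append_chunk(FileSet *fileset, const char *path, void *data, size_t len)
    {
        BufFile    *file = BufFileOpenFileSet(fileset, path, O_RDWR, false);

        BufFileSeek(file, 0, 0, SEEK_END);  /* append after earlier chunks */
        BufFileWrite(file, data, len);
        BufFileClose(file);                 /* LA closes before any PA opens */
    }

    /*
     * PA side: under the current approach this runs only after the LA
     * has seen a transaction end message such as stream_commit, so the
     * file is guaranteed to be closed in the LA by then.
     */
    static BufFile *
    pa_open_spooled_file(FileSet *fileset, const char *path)
    {
        return BufFileOpenFileSet(fileset, path, O_RDONLY, false);
    }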

Here are review comments on v47-0001 and v47-0002 patches:

When the parallel apply worker exited, I got the following server log.
I think this log is not appropriate, since the worker was not
terminated by an administrator command but exited by itself. Also, it
should probably exit with exit code 0?

FATAL:  terminating logical replication worker due to administrator command
LOG:  background worker "logical replication parallel worker" (PID 3594918) exited with exit code 1
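
For illustration, something along these lines (helper name hypothetical)
would avoid both issues: log a normal exit and call proc_exit(0), so the
postmaster doesn't report exit code 1.

    #include "postgres.h"

    #include "storage/ipc.h"

    /*
     * Hypothetical shutdown path for the parallel apply worker: since
     * the worker exits by itself, report a normal exit at LOG level
     * instead of FATAL "terminating ... due to administrator command".
     */
    static void
    parallel_apply_worker_exit(void)
    {
        ereport(LOG,
                (errmsg("logical replication parallel apply worker exiting")));
        proc_exit(0);           /* exit code 0: normal termination */
    }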

---
/*
 * Stop the worker if there are enough workers in the pool or the leader
 * apply worker serialized part of the transaction data to a file due to
 * send timeout.
 */
if (winfo->serialize_changes ||
    napplyworkers > (max_parallel_apply_workers_per_subscription / 2))

Why do we need to stop the worker if the leader serializes changes?

---
+        /*
+         * Release all session level locks that could be held in parallel apply
+         * mode.
+         */
+        LockReleaseAll(DEFAULT_LOCKMETHOD, true);
+

I think we call LockReleaseAll() at process exit (in ProcKill()), but
do we really need to call LockReleaseAll() here too?

---

+                elog(ERROR, "could not find replication state slot for replication"
+                         "origin with OID %u which was acquired by %d", node, acquired_by);

Let's not break the error log message in the middle, so that the user
can easily search for the message with grep.
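
In other words, keep the user-visible text in a single string literal, even
if the line gets long. (Note that, as quoted, the adjacent literals also
concatenate without a space, producing "replicationorigin".) A sketch:

    elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
         node, acquired_by);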

---
+        {
+                {"max_parallel_apply_workers_per_subscription",
+                        PGC_SIGHUP,
+                        REPLICATION_SUBSCRIBERS,
+                        gettext_noop("Maximum number of parallel apply workers per subscription."),
+                        NULL,
+                },
+                &max_parallel_apply_workers_per_subscription,
+                2, 0, MAX_BACKENDS,
+                NULL, NULL, NULL
+        },
+

I think we should use MAX_PARALLEL_WORKER_LIMIT as the max value
instead. MAX_BACKENDS is too high.
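
Concretely, only the max value would change from the quoted hunk
(MAX_PARALLEL_WORKER_LIMIT is defined as 1024 in pg_config_manual.h):

    {
        {"max_parallel_apply_workers_per_subscription",
            PGC_SIGHUP,
            REPLICATION_SUBSCRIBERS,
            gettext_noop("Maximum number of parallel apply workers per subscription."),
            NULL,
        },
        &max_parallel_apply_workers_per_subscription,
        2, 0, MAX_PARALLEL_WORKER_LIMIT,    /* instead of MAX_BACKENDS */
        NULL, NULL, NULL
    },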

---
+        /*
+         * Indicates whether there are pending messages in the queue. The
+         * parallel apply worker will check it before starting to wait.
+         */
+        pg_atomic_uint32       pending_message_count;

The "pending messages" sounds like individual logical replication
messages such as LOGICAL_REP_MSG_INSERT. But IIUC what this value
actually shows is how many streamed chunks are pending to process,
right?
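
If that's the case, renaming it along these lines (name hypothetical)
would be clearer:

    /*
     * Indicates how many streamed chunks (stream_start .. stream_stop)
     * are still pending in the queue. The parallel apply worker will
     * check it before starting to wait.
     */
    pg_atomic_uint32    pending_stream_count;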

---
The "streaming" option gains a new value, "parallel", to enable parallel
apply. That fits so far, but I think the parallel apply feature doesn't
necessarily need to be tied to streaming. For example, we might want to
support parallel apply also for non-streamed transactions in the future.
It might be better to have another option, say "parallel", to control the
parallel apply behavior. The "parallel" option could be a boolean option,
and setting parallel = on would require streaming = on.

Another variant is to have a new subscription parameter, for example
"parallel_workers", that specifies the number of parallel workers. That
way, users could specify the number of parallel workers per subscription.

---
When the parallel apply worker raises an error, I got the same error
twice, once from the leader worker and once from the parallel apply
worker, as follows. Can we suppress either one?

2022-11-17 17:30:23.490 JST [3814552] LOG:  logical replication parallel apply worker for subscription "test_sub1" has started
2022-11-17 17:30:23.490 JST [3814552] ERROR:  duplicate key value violates unique constraint "test1_c_idx"
2022-11-17 17:30:23.490 JST [3814552] DETAIL:  Key (c)=(1) already exists.
2022-11-17 17:30:23.490 JST [3814552] CONTEXT:  processing remote data for replication origin "pg_16390" during message type "INSERT" for replication target relation "public.test1" in transaction 731
2022-11-17 17:30:23.490 JST [3814550] ERROR:  duplicate key value violates unique constraint "test1_c_idx"
2022-11-17 17:30:23.490 JST [3814550] DETAIL:  Key (c)=(1) already exists.
2022-11-17 17:30:23.490 JST [3814550] CONTEXT:  processing remote data for replication origin "pg_16390" during message type "INSERT" for replication target relation "public.test1" in transaction 731
        parallel apply worker

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com


