Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Masahiko Sawada
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id CAD21AoD_3ta_29sopuf_KJCExverP5808HjwDeqaHDrbqbxYww@mail.gmail.com
Whole thread Raw
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
List pgsql-hackers
On Sun, Dec 11, 2022 at 8:45 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
>
> On Friday, December 9, 2022 3:14 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Thu, Dec 8, 2022 at 12:37 PM houzj.fnst@fujitsu.com
> > <houzj.fnst@fujitsu.com> wrote:
> > >
> >
> > Review comments
>
> Thanks for the comments!
>
> > ==============
> > 1. Currently, we don't release the stream lock in LA (leade apply
> > worker) for "rollback to savepoint" and the reason is mentioned in comments of
> > apply_handle_stream_abort() in the patch. But, today, while testing, I found that
> > can lead to deadlock which otherwise, won't happen on the publisher. The key
> > point is rollback to savepoint releases the locks acquired by the particular
> > subtransaction, so parallel apply worker should also do the same. Consider the
> > following example where the transaction in session-1 is being performed by the
> > parallel apply worker and the transaction in session-2 is being performed by the
> > leader apply worker. I have simulated it by using GUC force_stream_mode.
> > Publisher
> > ==========
> > Session-1
> > postgres=# begin;
> > BEGIN
> > postgres=*# savepoint s1;
> > SAVEPOINT
> > postgres=*# truncate t1;
> > TRUNCATE TABLE
> >
> > Session-2
> > postgres=# begin;
> > BEGIN
> > postgres=*# insert into t1 values(4);
> >
> > Session-1
> > postgres=*# rollback to savepoint s1;
> > ROLLBACK
> >
> > Session-2
> > Commit;
> >
> > With or without commit of Session-2, this scenario will lead to deadlock on the
> > subscriber because PA (parallel apply worker) is waiting for LA to send the next
> > command, and LA is blocked by Exclusive of PA. There is no deadlock on the
> > publisher because rollback to savepoint will release the lock acquired by
> > truncate.
> >
> > To solve this, How about if we do three things before sending abort of
> > sub-transaction (a) unlock the stream lock, (b) increment pending_stream_count,
> > (c) take the stream lock again?
> >
> > Now, if the PA is not already waiting on the stop, it will not wait at stream_stop
> > but will wait after applying abort of sub-transaction and if it is already waiting at
> > stream_stop, the wait will be released. If this works then probably we should try
> > to do (b) before (a) to match the steps with stream_start.
>
> The solution works for me, I have changed the code as suggested.
>
>
> > 2. There seems to be another general problem in the way the patch waits for
> > stream_stop in PA (parallel apply worker). Currently, PA checks, if there are no
> > more pending streams then it tries to wait for the next stream by waiting on a
> > stream lock. However, it is possible after PA checks there is no pending stream
> > and before it actually starts waiting on a lock, the LA sends another stream for
> > which even stream_stop is sent, in this case, PA will start waiting for the next
> > stream whereas there is actually a pending stream available. In this case, it won't
> > lead to any problem apart from delay in applying the changes in such cases but
> > for the case mentioned in the previous point (Pont 1), it can lead to deadlock
> > even after we implement the solution proposed to solve it.
>
> Thanks for reporting, I have introduced another flag in shared memory and use it to
> prevent the leader from incrementing the pending_stream_count if the parallel
> apply worker is trying to lock the stream lock.
>
>
> > 3. The other point to consider is that for stream_commit/prepare/abort, in LA, we
> > release the stream lock after sending the message whereas for stream_start we
> > release it before sending the message. I think for the earlier cases
> > (stream_commit/prepare/abort), the patch has done like this because
> > pa_send_data() may need to require the lock again when it times out and start
> > serializing, so there will be no sense in first releasing it, then re-acquiring it, and
> > then again releasing it. Can't we also release the lock for stream_start after
> > pa_send_data() only if it is not switched to serialize mode?
>
> Changed.
>
> Attach the new version patch set which addressed above comments.

Here are comments on v59 0001, 0002 patches:

+void
+pa_increment_stream_block(ParallelApplyWorkerShared *wshared)
+{
+        while (1)
+        {
+                SpinLockAcquire(&wshared->mutex);
+
+                /*
+                 * Don't try to increment the count if the parallel
apply worker is
+                 * taking the stream lock. Otherwise, there would be
a race condition
+                 * that the parallel apply worker checks there is no
pending streaming
+                 * block and before it actually starts waiting on a
lock, the leader
+                 * sends another streaming block and take the stream
lock again. In
+                 * this case, the parallel apply worker will start
waiting for the next
+                 * streaming block whereas there is actually a
pending streaming block
+                 * available.
+                 */
+                if (!wshared->pa_wait_for_stream)
+                {
+                        wshared->pending_stream_count++;
+                        SpinLockRelease(&wshared->mutex);
+                        break;
+                }
+
+                SpinLockRelease(&wshared->mutex);
+        }
+}

I think we should add an assertion to check if we don't hold the stream lock.

I think that waiting for pa_wait_for_stream to be false in a busy loop
is not a good idea. It's not interruptible and there is not guarantee
that we can break from this loop in a short time. For instance, if PA
executes pa_decr_and_wait_stream_block() a bit earlier than LA
executes pa_increment_stream_block(), LA has to wait for PA to acquire
and release the stream lock in a busy loop. It should not be long in
normal cases but the duration LA needs to wait for PA depends on PA,
which could be long. Also what if PA raises an error in
pa_lock_stream() due to some reasons? I think LA won't be able to
detect the failure.

I think we should at least make it interruptible and maybe need to add
some sleep. Or perhaps we can use the condition variable for this
case.

---
In worker.c, we have the following common pattern:

case TRANS_LEADER_PARTIAL_SERIALIZE:
    write change to the file;
    do some work;
    break;

case TRANS_LEADER_SEND_TO_PARALLEL:
    pa_send_data();

    if (winfo->serialize_changes)
    {
        do some worker required after writing changes to the file.
    }
    :
    break;

IIUC there are two different paths for partial serialization: (a)
where apply_action is TRANS_LEADER_PARTIAL_SERIALIZE, and (b) where
apply_action is TRANS_LEADER_PARTIAL_SERIALIZE and
winfo->serialize_changes became true. And we need to match what we do
in (a) and (b). Rather than having two different paths for the same
case, how about falling through TRANS_LEADER_PARTIAL_SERIALIZE when we
could not send the changes? That is, pa_send_data() just returns false
when the timeout exceeds and we need to switch to serialize changes,
otherwise returns true. If it returns false, we prepare for switching
to serialize changes such as initializing fileset, and fall through
TRANS_LEADER_PARTIAL_SERIALIZE case. The code would be like:

case TRANS_LEADER_SEND_TO_PARALLEL:
    ret = pa_send_data();

    if (ret)
    {
        do work for sending changes to PA.
        break;
    }

    /* prepare for switching to serialize changes */
    winfo->serialize_changes = true;
    initialize fileset;
    acquire stream lock if necessary;

    /* FALLTHROUGH */
case TRANS_LEADER_PARTIAL_SERIALIZE:
    do work for serializing changes;
    break;

---
/*
-                        * Unlock the shared object lock so that
parallel apply worker can
-                        * continue to receive and apply changes.
+                        * Parallel apply worker might have applied
some changes, so write
+                        * the STREAM_ABORT message so that it can rollback the
+                        * subtransaction if needed.
 */
-                       pa_unlock_stream(xid, AccessExclusiveLock);
+                       stream_open_and_write_change(xid,
LOGICAL_REP_MSG_STREAM_ABORT,
+
          &original_msg);
+
+                       if (toplevel_xact)
+                       {
+                               pa_unlock_stream(xid, AccessExclusiveLock);
+                               pa_set_fileset_state(winfo->shared,
FS_SERIALIZE_DONE);
+                               (void) pa_free_worker(winfo, xid);
+                       }

At every place except for the above code, we set the fileset state
FS_SERIALIZE_DONE first then unlock the stream lock. Is there any
reason for that?

---
+               case TRANS_LEADER_SEND_TO_PARALLEL:
+                       Assert(winfo);
+
+                       /*
+                        * Unlock the shared object lock so that
parallel apply worker can
+                        * continue to receive and apply changes.
+                        */
+                       pa_unlock_stream(xid, AccessExclusiveLock);
+
+                       /*
+                        * For the case of aborting the
subtransaction, we increment the
+                        * number of streaming blocks and take the
lock again before
+                        * sending the STREAM_ABORT to ensure that the
parallel apply
+                        * worker will wait on the lock for the next
set of changes after
+                        * processing the STREAM_ABORT message if it
is not already waiting
+                        * for STREAM_STOP message.
+                        */
+                       if (!toplevel_xact)
+                       {
+                               pa_increment_stream_block(winfo->shared);
+                               pa_lock_stream(xid, AccessExclusiveLock);
+                       }
+
+                       /* Send STREAM ABORT message to the parallel
apply worker. */
+                       pa_send_data(winfo, s->len, s->data);
+
+                       if (toplevel_xact)
+                               (void) pa_free_worker(winfo, xid);
+
+                       break;

In apply_handle_stream_abort(), it's better to add the comment why we
don't need to wait for PA to finish.


Also, given that we don't wait for PA to finish in this case, does it
really make sense to call pa_free_worker() immediately after sending
STREAM_ABORT?

---
PA acquires the transaction lock in AccessShare mode whereas LA
acquires it in AccessExclusiveMode. Is it better to do the opposite?
Like a backend process acquires a lock on its XID in Exclusive mode,
we can have PA acquire the lock on its XID in Exclusive mode whereas
other attempts to acquire it in Share mode to wait.

---
 void
pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
{
    LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
                                   PARALLEL_APPLY_LOCK_STREAM, lockmode);
}

I think since we don't need to let the caller to specify the lock mode
but need only shared and exclusive modes, we can make it simple by
having a boolean argument say shared instead of lockmode.

Regards,

-- 
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com



pgsql-hackers by date:

Previous
From: Tom Lane
Date:
Subject: Re: Ordering behavior for aggregates
Next
From: Tom Lane
Date:
Subject: Re: Use get_call_result_type() more widely