RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id OS0PR01MB5716D38323B07EB0A0D097F194E09@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Masahiko Sawada <sawada.mshk@gmail.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply  ("shiy.fnst@fujitsu.com" <shiy.fnst@fujitsu.com>)
Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
Re: Perform streaming logical transactions by background workers and parallel apply  (Masahiko Sawada <sawada.mshk@gmail.com>)
List pgsql-hackers
On Tuesday, December 13, 2022 11:25 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> 
> On Sun, Dec 11, 2022 at 8:45 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > On Friday, December 9, 2022 3:14 PM Amit Kapila
> <amit.kapila16@gmail.com> wrote:
> > >
> > > On Thu, Dec 8, 2022 at 12:37 PM houzj.fnst@fujitsu.com
> > > <houzj.fnst@fujitsu.com> wrote:
> > > >
> > >
> > > Review comments
> >
> > Thanks for the comments!
> >
> > > ==============
> > > 1. Currently, we don't release the stream lock in LA (leade apply
> > > worker) for "rollback to savepoint" and the reason is mentioned in
> > > comments of
> > > apply_handle_stream_abort() in the patch. But, today, while testing,
> > > I found that it can lead to a deadlock which otherwise won't happen on
> > > the publisher. The key point is rollback to savepoint releases the
> > > locks acquired by the particular subtransaction, so parallel apply
> > > worker should also do the same. Consider the following example where
> > > the transaction in session-1 is being performed by the parallel
> > > apply worker and the transaction in session-2 is being performed by the
> leader apply worker. I have simulated it by using the GUC force_stream_mode.
> > > Publisher
> > > ==========
> > > Session-1
> > > postgres=# begin;
> > > BEGIN
> > > postgres=*# savepoint s1;
> > > SAVEPOINT
> > > postgres=*# truncate t1;
> > > TRUNCATE TABLE
> > >
> > > Session-2
> > > postgres=# begin;
> > > BEGIN
> > > postgres=*# insert into t1 values(4);
> > >
> > > Session-1
> > > postgres=*# rollback to savepoint s1;
> > > ROLLBACK
> > >
> > > Session-2
> > > Commit;
> > >
> > > With or without commit of Session-2, this scenario will lead to
> > > deadlock on the subscriber because PA (parallel apply worker) is
> > > waiting for LA to send the next command, and LA is blocked by the
> > > exclusive lock held by PA. There is no deadlock on the publisher because
> > > rollback to savepoint will release the lock acquired by truncate.
> > >
> > > To solve this, how about if we do three things before sending the abort
> > > of the sub-transaction: (a) unlock the stream lock, (b) increment
> > > pending_stream_count,
> > > (c) take the stream lock again?
> > >
> > > Now, if the PA is not already waiting at stream_stop, it will not wait
> > > there but will wait after applying the abort of the sub-transaction,
> > > and if it is already waiting at stream_stop, the wait will be
> > > released. If this works then probably we should try to do (b) before (a) to
> match the steps with stream_start.
> >
> > The solution works for me, I have changed the code as suggested.
> >
> >
> > > 2. There seems to be another general problem in the way the patch
> > > waits for stream_stop in PA (parallel apply worker). Currently, PA
> > > checks if there are no more pending streams and then tries to wait
> > > for the next stream by waiting on a stream lock. However, it is
> > > possible after PA checks there is no pending stream and before it
> > > actually starts waiting on a lock, the LA sends another stream for
> > > which even stream_stop is sent, in this case, PA will start waiting
> > > for the next stream whereas there is actually a pending stream
> > > available. In this case, it won't lead to any problem apart from a
> > > delay in applying the changes, but for the case mentioned in
> the previous point (Point 1), it can lead to deadlock even after we implement the
> solution proposed to solve it.
> >
> > Thanks for reporting, I have introduced another flag in shared memory
> > and used it to prevent the leader from incrementing
> > pending_stream_count while the parallel apply worker is trying to take the
> stream lock.
> >
> >
> > > 3. The other point to consider is that for
> > > stream_commit/prepare/abort, in LA, we release the stream lock after
> > > sending the message whereas for stream_start we release it before
> > > sending the message. I think for the earlier cases
> > > (stream_commit/prepare/abort), the patch has done like this because
> > > pa_send_data() may need to reacquire the lock when it times out
> > > and starts serializing, so there would be no sense in first releasing
> > > it, then re-acquiring it, and then again releasing it. Can't we also
> > > release the lock for stream_start after
> > > pa_send_data() only if it is not switched to serialize mode?
> >
> > Changed.
> >
> > Attach the new version patch set which addressed above comments.
> 
> Here are comments on v59 0001, 0002 patches:

Thanks for the comments!

> +void
> +pa_increment_stream_block(ParallelApplyWorkerShared *wshared)
> +{
> +        while (1)
> +        {
> +                SpinLockAcquire(&wshared->mutex);
> +
> +                /*
> +                 * Don't try to increment the count if the parallel apply
> +                 * worker is taking the stream lock. Otherwise, there would
> +                 * be a race condition that the parallel apply worker checks
> +                 * there is no pending streaming block and before it actually
> +                 * starts waiting on a lock, the leader sends another
> +                 * streaming block and takes the stream lock again. In this
> +                 * case, the parallel apply worker will start waiting for the
> +                 * next streaming block whereas there is actually a pending
> +                 * streaming block available.
> +                 */
> +                if (!wshared->pa_wait_for_stream)
> +                {
> +                        wshared->pending_stream_count++;
> +                        SpinLockRelease(&wshared->mutex);
> +                        break;
> +                }
> +
> +                SpinLockRelease(&wshared->mutex);
> +        }
> +}
> 
> I think we should add an assertion to check that we don't hold the stream lock.
> 
> I think that waiting for pa_wait_for_stream to be false in a busy loop is not a
> good idea. It's not interruptible and there is no guarantee that we can break
> out of this loop in a short time. For instance, if PA executes
> pa_decr_and_wait_stream_block() a bit earlier than LA executes
> pa_increment_stream_block(), LA has to wait for PA to acquire and release the
> stream lock in a busy loop. It should not be long in normal cases but the
> duration LA needs to wait depends on PA, which could be long. Also,
> what if PA raises an error in pa_lock_stream() for some reason? I think LA
> won't be able to detect the failure.
> 
> I think we should at least make it interruptible and maybe need to add some
> sleep. Or perhaps we can use the condition variable for this case.

Thanks for the analysis, I will research this part.
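As one possibility, the wait could be made interruptible with a condition
variable, along the following lines (a rough sketch only; the stream_cv field
in ParallelApplyWorkerShared and the wait event name are assumptions, not
part of the current patch set):

void
pa_increment_stream_block(ParallelApplyWorkerShared *wshared)
{
        for (;;)
        {
                bool    pa_waiting;

                SpinLockAcquire(&wshared->mutex);
                pa_waiting = wshared->pa_wait_for_stream;
                if (!pa_waiting)
                        wshared->pending_stream_count++;
                SpinLockRelease(&wshared->mutex);

                if (!pa_waiting)
                        break;

                /*
                 * Sleep until the parallel apply worker clears
                 * pa_wait_for_stream and calls ConditionVariableBroadcast()
                 * on stream_cv after taking the stream lock.
                 * ConditionVariableSleep() checks for interrupts, so this
                 * wait is interruptible, unlike the busy loop.
                 */
                ConditionVariableSleep(&wshared->stream_cv,
                                       WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
        }

        ConditionVariableCancelSleep();
}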

> ---
> In worker.c, we have the following common pattern:
> 
> case TRANS_LEADER_PARTIAL_SERIALIZE:
>     write change to the file;
>     do some work;
>     break;
> 
> case TRANS_LEADER_SEND_TO_PARALLEL:
>     pa_send_data();
> 
>     if (winfo->serialize_changes)
>     {
>         do some work required after writing changes to the file.
>     }
>     :
>     break;
> 
> IIUC there are two different paths for partial serialization: (a) where
> apply_action is TRANS_LEADER_PARTIAL_SERIALIZE, and (b) where
> apply_action is TRANS_LEADER_SEND_TO_PARALLEL and
> winfo->serialize_changes became true. And we need to match what we do
> in (a) and (b). Rather than having two different paths for the same case, how
> about falling through to TRANS_LEADER_PARTIAL_SERIALIZE when we could not
> send the changes? That is, pa_send_data() just returns false when the timeout
> is exceeded and we need to switch to serializing changes, otherwise returns true. If it
> returns false, we prepare for switching to serialize changes, such as initializing
> fileset, and fall through TRANS_LEADER_PARTIAL_SERIALIZE case. The code
> would be like:
> 
> case TRANS_LEADER_SEND_TO_PARALLEL:
>     ret = pa_send_data();
> 
>     if (ret)
>     {
>         do work for sending changes to PA.
>         break;
>     }
> 
>     /* prepare for switching to serialize changes */
>     winfo->serialize_changes = true;
>     initialize fileset;
>     acquire stream lock if necessary;
> 
>     /* FALLTHROUGH */
> case TRANS_LEADER_PARTIAL_SERIALIZE:
>     do work for serializing changes;
>     break;

I think the suggestion is to extract the code that switches to serialize
mode out of pa_send_data(), and then we would need to add that logic to all the
functions that call pa_send_data(). I am not sure it looks better, as it
might introduce more code in each handling function.
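For example, with the fall-through approach, I think each handler that sends
changes would repeat a pattern like this (a rough sketch only; here
pa_switch_to_partial_serialize() is a hypothetical helper covering the
switching steps):

                case TRANS_LEADER_SEND_TO_PARALLEL:
                        Assert(winfo);

                        /* Returns false if it timed out and we must serialize. */
                        if (pa_send_data(winfo, s->len, s->data))
                                break;

                        /*
                         * Hypothetical helper: sets winfo->serialize_changes,
                         * initializes the fileset, and takes the stream lock
                         * if necessary.
                         */
                        pa_switch_to_partial_serialize(winfo);

                        /* FALLTHROUGH */
                case TRANS_LEADER_PARTIAL_SERIALIZE:
                        /* Write the change to the transaction's file. */
                        stream_write_change(action, s);
                        break;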

> ---
>                         /*
> -                        * Unlock the shared object lock so that parallel apply worker can
> -                        * continue to receive and apply changes.
> +                        * Parallel apply worker might have applied some changes, so write
> +                        * the STREAM_ABORT message so that it can rollback the
> +                        * subtransaction if needed.
>                          */
> -                       pa_unlock_stream(xid, AccessExclusiveLock);
> +                       stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
> +                                                    &original_msg);
> +
> +                       if (toplevel_xact)
> +                       {
> +                               pa_unlock_stream(xid, AccessExclusiveLock);
> +                               pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
> +                               (void) pa_free_worker(winfo, xid);
> +                       }
> 
> At every place except for the above code, we set the fileset state
> FS_SERIALIZE_DONE first then unlock the stream lock. Is there any reason for
> that?

No, I think we should make them consistent, will change this.
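That is, the toplevel_xact branch above would be reordered to match the other
places (a sketch of the planned change):

                        if (toplevel_xact)
                        {
                                pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
                                pa_unlock_stream(xid, AccessExclusiveLock);
                                (void) pa_free_worker(winfo, xid);
                        }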

> ---
> +               case TRANS_LEADER_SEND_TO_PARALLEL:
> +                       Assert(winfo);
> +
> +                       /*
> +                        * Unlock the shared object lock so that parallel apply
> +                        * worker can continue to receive and apply changes.
> +                        */
> +                       pa_unlock_stream(xid, AccessExclusiveLock);
> +
> +                       /*
> +                        * For the case of aborting the subtransaction, we increment
> +                        * the number of streaming blocks and take the lock again
> +                        * before sending the STREAM_ABORT to ensure that the parallel
> +                        * apply worker will wait on the lock for the next set of
> +                        * changes after processing the STREAM_ABORT message if it is
> +                        * not already waiting for STREAM_STOP message.
> +                        */
> +                       if (!toplevel_xact)
> +                       {
> +                               pa_increment_stream_block(winfo->shared);
> +                               pa_lock_stream(xid, AccessExclusiveLock);
> +                       }
> +
> +                       /* Send STREAM ABORT message to the parallel apply worker. */
> +                       pa_send_data(winfo, s->len, s->data);
> +
> +                       if (toplevel_xact)
> +                               (void) pa_free_worker(winfo, xid);
> +
> +                       break;
> 
> In apply_handle_stream_abort(), it's better to add a comment explaining why
> we don't need to wait for PA to finish.

Will add.

> 
> Also, given that we don't wait for PA to finish in this case, does it really make
> sense to call pa_free_worker() immediately after sending STREAM_ABORT?

I think it's possible that the PA finishes the ROLLBACK quickly, in which case
the LA can free the worker here in time.

> ---
> PA acquires the transaction lock in AccessShare mode whereas LA acquires it in
> AccessExclusive mode. Is it better to do the opposite?
> Just as a backend process acquires a lock on its own XID in Exclusive mode, we
> can have PA acquire the lock on its XID in Exclusive mode whereas others
> attempt to acquire it in Share mode to wait.

Agreed, will improve.
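So the idea would be roughly (a sketch only; pa_lock_transaction() and
pa_unlock_transaction() are assumed wrappers analogous to pa_lock_stream()
quoted below, using a separate lock key for the transaction):

        /* In the parallel apply worker, at transaction start: */
        pa_lock_transaction(xid, AccessExclusiveLock);

        /* In anyone that needs to wait for that transaction to finish: */
        pa_lock_transaction(xid, AccessShareLock);
        pa_unlock_transaction(xid, AccessShareLock);

This matches how a backend holds its own XID lock exclusively while waiters
block on a share-mode acquisition.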

> ---
> void
> pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
> {
>     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
>                                    PARALLEL_APPLY_LOCK_STREAM, lockmode);
> }
> 
> I think since we don't need to let the caller specify the lock mode but need
> only shared and exclusive modes, we can make it simpler by having a boolean
> argument, say "shared", instead of a lockmode.

I personally think passing the lockmode makes the code clearer than passing a
Boolean value.

Best regards,
Hou zj
