RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id OS0PR01MB57169DAA9A2A6E68EE5E05F094E19@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
Re: Perform streaming logical transactions by background workers and parallel apply  (Masahiko Sawada <sawada.mshk@gmail.com>)
List pgsql-hackers
On Wednesday, December 14, 2022 2:49 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

> 
> On Wed, Dec 14, 2022 at 9:50 AM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > On Tuesday, December 13, 2022 11:25 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> > >
> > > Here are comments on v59 0001, 0002 patches:
> >
> > Thanks for the comments!
> >
> > > +void
> > > > +pa_increment_stream_block(ParallelApplyWorkerShared *wshared)
> > > > +{
> > > +        while (1)
> > > +        {
> > > +                SpinLockAcquire(&wshared->mutex);
> > > +
> > > +                /*
> > > +                 * Don't try to increment the count if the parallel
> > > apply worker is
> > > +                 * taking the stream lock. Otherwise, there would
> > > + be
> > > a race condition
> > > +                 * that the parallel apply worker checks there is
> > > + no
> > > pending streaming
> > > +                 * block and before it actually starts waiting on a
> > > lock, the leader
> > > +                 * sends another streaming block and take the
> > > + stream
> > > lock again. In
> > > +                 * this case, the parallel apply worker will start
> > > waiting for the next
> > > +                 * streaming block whereas there is actually a
> > > pending streaming block
> > > +                 * available.
> > > +                 */
> > > +                if (!wshared->pa_wait_for_stream)
> > > +                {
> > > +                        wshared->pending_stream_count++;
> > > +                        SpinLockRelease(&wshared->mutex);
> > > +                        break;
> > > +                }
> > > +
> > > +                SpinLockRelease(&wshared->mutex);
> > > +        }
> > > +}
> > >
> > > I think we should add an assertion to check if we don't hold the stream lock.
> > >
> > > I think that waiting for pa_wait_for_stream to be false in a busy
> > > > loop is not a good idea. It's not interruptible and there is no
> > > guarantee that we can break from this loop in a short time. For
> > > instance, if PA executes
> > > pa_decr_and_wait_stream_block() a bit earlier than LA executes
> > > pa_increment_stream_block(), LA has to wait for PA to acquire and
> > > release the stream lock in a busy loop. It should not be long in
> > > normal cases but the duration LA needs to wait for PA depends on PA,
> > > which could be long. Also what if PA raises an error in
> > > pa_lock_stream() due to some reasons? I think LA won't be able to
> > > detect the failure.
> > >
> > > I think we should at least make it interruptible and maybe need to
> > > add some sleep. Or perhaps we can use the condition variable for this case.
> >
> 
> Or we can leave out this while (true) logic altogether for the first version and have a
> comment to explain this race. Anyway, after restarting, it will probably be
> solved. We can always change this part of the code later if this really turns out
> to be problematic.

Agreed, and reverted this part.

> 
> > Thanks for the analysis, I will research this part.
> >
> > > ---
> > > In worker.c, we have the following common pattern:
> > >
> > > case TRANS_LEADER_PARTIAL_SERIALIZE:
> > >     write change to the file;
> > >     do some work;
> > >     break;
> > >
> > > case TRANS_LEADER_SEND_TO_PARALLEL:
> > >     pa_send_data();
> > >
> > >     if (winfo->serialize_changes)
> > >     {
> > > >         do some work required after writing changes to the file.
> > >     }
> > >     :
> > >     break;
> > >
> > > IIUC there are two different paths for partial serialization: (a)
> > > > where apply_action is TRANS_LEADER_PARTIAL_SERIALIZE, and (b) where
> > > > apply_action is TRANS_LEADER_SEND_TO_PARALLEL and
> > > > winfo->serialize_changes became true. And we need to match what we do
> > > > in (a) and (b). Rather than having two different paths for the same
> > > case, how about falling through TRANS_LEADER_PARTIAL_SERIALIZE when
> > > we could not send the changes? That is, pa_send_data() just returns
> > > false when the timeout exceeds and we need to switch to serialize
> > > changes, otherwise returns true. If it returns false, we prepare for
> > > switching to serialize changes such as initializing fileset, and
> > > > fall through TRANS_LEADER_PARTIAL_SERIALIZE case. The code would be like:
> > >
> > > case TRANS_LEADER_SEND_TO_PARALLEL:
> > >     ret = pa_send_data();
> > >
> > >     if (ret)
> > >     {
> > >         do work for sending changes to PA.
> > >         break;
> > >     }
> > >
> > >     /* prepare for switching to serialize changes */
> > >     winfo->serialize_changes = true;
> > >     initialize fileset;
> > >     acquire stream lock if necessary;
> > >
> > >     /* FALLTHROUGH */
> > > case TRANS_LEADER_PARTIAL_SERIALIZE:
> > >     do work for serializing changes;
> > >     break;
> >
> > > I think that the suggestion is to extract the code that switches to
> > > serialize mode out of pa_send_data(), and then we need to add that
> > > logic in all the functions which call pa_send_data(). I am not sure if
> > > it looks better, as it might introduce more code in each handling
> > > function.
> >
> 
> How about extracting the common code from apply_handle_stream_commit
> and apply_handle_stream_prepare to a separate function say
> pa_xact_finish_common()? I see there is a lot of common code (unlock the
> stream, wait for the finish, store flush location, free worker
> info) in both the functions for TRANS_LEADER_PARTIAL_SERIALIZE and
> TRANS_LEADER_SEND_TO_PARALLEL cases.

Agreed, changed. I also addressed Sawada-san's comment by extracting the
code that switches to serialize mode out of pa_send_data().
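
As a rough illustration of the fall-through shape discussed above (a sketch only, not the code in the attached patches): the send helper just reports whether the change was sent, and on failure the caller prepares for serialization once and falls through to the serialize case. DemoWorkerInfo, demo_send_data() and the other demo_* names are hypothetical, and the counters merely stand in for the real work.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum
{
    DEMO_LEADER_PARTIAL_SERIALIZE,
    DEMO_LEADER_SEND_TO_PARALLEL
} DemoApplyAction;

/* Hypothetical stand-in for the leader's per-worker state. */
typedef struct
{
    bool serialize_changes;  /* set once the leader gives up on sending */
    bool queue_full;         /* assumption: models the send timing out */
    int  sent;               /* changes handed to the parallel apply worker */
    int  serialized;         /* changes written to the file */
} DemoWorkerInfo;

/* Returns false when the change could not be sent before the timeout. */
static bool
demo_send_data(DemoWorkerInfo *winfo)
{
    if (winfo->queue_full)
        return false;
    winfo->sent++;
    return true;
}

/* One-time preparation, e.g. initializing the fileset. */
static void
demo_switch_to_serialize(DemoWorkerInfo *winfo)
{
    assert(!winfo->serialize_changes);
    winfo->serialize_changes = true;
}

static DemoApplyAction
demo_get_action(const DemoWorkerInfo *winfo)
{
    return winfo->serialize_changes ? DEMO_LEADER_PARTIAL_SERIALIZE
                                    : DEMO_LEADER_SEND_TO_PARALLEL;
}

static void
demo_handle_change(DemoWorkerInfo *winfo)
{
    switch (demo_get_action(winfo))
    {
        case DEMO_LEADER_SEND_TO_PARALLEL:
            if (demo_send_data(winfo))
                break;          /* sent to the parallel apply worker */

            /* Could not send: prepare for serializing, then share the path. */
            demo_switch_to_serialize(winfo);
            /* FALLTHROUGH */

        case DEMO_LEADER_PARTIAL_SERIALIZE:
            winfo->serialized++;    /* stands in for writing the change */
            break;
    }
}
```

With this shape there is a single serialize path, so the two cases cannot drift apart; the cost is that each caller of the send helper has to handle the switch itself.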

> >
> > > ---
> > > >  void
> > > > pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
> > > > {
> > > >     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
> > > >                                    PARALLEL_APPLY_LOCK_STREAM, lockmode);
> > > > }
> > >
> > > > I think since we don't need to let the caller specify the lock
> > > > mode but need only shared and exclusive modes, we can make it simple
> > > > by having a boolean argument, say shared, instead of lockmode.
> >
> > I personally think passing the lockmode would make the code more clear
> > than passing a Boolean value.
> >
> 
> +1.
> 
> I have made a few changes in the newly added comments and function name in
> the attached patch. Kindly include this if you find the changes okay.

Thanks, I have checked and merged it.
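
On the pa_lock_stream() signature discussed above, the readability argument for keeping a lock-mode parameter can be seen in a toy model like the one below. Everything here is hypothetical (DemoLockMode, DemoStreamLock, the demo_* functions) and it is a non-blocking try-lock rather than the real lock manager; it only contrasts how the two call styles read.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical two-mode enum; the real code passes a LOCKMODE. */
typedef enum { DEMO_SHARED, DEMO_EXCLUSIVE } DemoLockMode;

typedef struct
{
    int  shared_holders;
    bool exclusive_held;
} DemoStreamLock;

/* Try to take the lock in the given mode; returns false on conflict. */
static bool
demo_lock_stream(DemoStreamLock *lock, DemoLockMode mode)
{
    switch (mode)
    {
        case DEMO_SHARED:
            if (lock->exclusive_held)
                return false;
            lock->shared_holders++;
            return true;

        case DEMO_EXCLUSIVE:
            if (lock->exclusive_held || lock->shared_holders > 0)
                return false;
            lock->exclusive_held = true;
            return true;
    }
    return false;
}

/* Boolean variant: the meaning of true/false is invisible at call sites. */
static bool
demo_lock_stream_bool(DemoStreamLock *lock, bool shared)
{
    return demo_lock_stream(lock, shared ? DEMO_SHARED : DEMO_EXCLUSIVE);
}
```

A call such as demo_lock_stream(&lock, DEMO_EXCLUSIVE) documents itself, whereas demo_lock_stream_bool(&lock, false) needs the reader to look up what the flag means.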

Attached is the new version of the patch set, which addresses all comments so far.

Best regards,
Hou zj

