RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id OS0PR01MB571652B8593F8902B541680394009@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
List pgsql-hackers
On Tuesday, November 8, 2022 7:50 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Mon, Nov 7, 2022 at 6:49 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > On Friday, November 4, 2022 7:45 PM Amit Kapila
> <amit.kapila16@gmail.com> wrote:
> > > 3.
> > > apply_handle_stream_start(StringInfo s) { ...
> > > + if (!first_segment)
> > > + {
> > > + /*
> > > + * Unlock the shared object lock so that parallel apply worker
> > > + * can continue to receive and apply changes.
> > > + */
> > > + parallel_apply_unlock(winfo->shared->stream_lock_id);
> > > ...
> > > }
> > >
> > > Can we have an assert before this unlock call that the lock must be
> > > held? Similarly, if there are other places then we can have assert
> > > there as well.
> >
> > It seems we don't have a standard API that can be used without a transaction.
> > Maybe we can use the list ParallelApplyLockids to check that?
> >
> 
> Yeah, that occurred to me as well but I am not sure if it is a good
> idea to maintain this list just for assertion but if it turns out that
> we need to maintain it for a different purpose then we can probably
> use it for assert as well.
> 
> Few other comments/questions:
> =========================
> 1.
> apply_handle_stream_start(StringInfo s)
> {
> ...
> 
> + case TRANS_PARALLEL_APPLY:
> ...
> ...
> + /*
> + * Unlock the shared object lock so that the leader apply worker
> + * can continue to send changes.
> + */
> + parallel_apply_unlock(MyParallelShared->stream_lock_id,
> AccessShareLock);
> 
> As per the design in the email [1], this lock needs to be released by
> the leader worker during stream start which means it should be
> released under the state TRANS_LEADER_SEND_TO_PARALLEL. From the
> comments as well, it is not clear to me why at this time leader is
> supposed to be blocked. Is there a reason for doing differently than
> what is proposed in the original design?
> 2. Similar to above, it is not clear why the parallel worker needs to
> release the stream_lock_id lock at stream_commit and stream_prepare?

Sorry, these were oversights on my part. Changed.
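
To make the intended placement concrete, the stream-start path now looks
roughly like this (a simplified sketch reusing the names from the hunks above;
the leader's lock mode is my assumption based on the earlier
am_leader_apply_worker() logic, and details may differ slightly in the next
version):

static void
apply_handle_stream_start(StringInfo s)
{
	...
	switch (apply_action)
	{
		case TRANS_LEADER_SEND_TO_PARALLEL:
			...
			if (!first_segment)
			{
				/*
				 * Unlock the shared object lock so that the parallel apply
				 * worker can continue to receive and apply changes.
				 */
				parallel_apply_unlock(winfo->shared->stream_lock_id,
									  AccessExclusiveLock);
			}
			break;

		case TRANS_PARALLEL_APPLY:
			...
			/* no unlock of stream_lock_id in this branch anymore */
			break;
		...
	}
	...
}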

> 3. Am, I understanding correctly that you need to lock/unlock in
> apply_handle_stream_abort() for the parallel worker because after
> rollback to savepoint, there could be another set of stream or
> transaction end commands for which you want to wait? If so, maybe an
> additional comment would serve the purpose.

I think you are right. I will think this over in case I missed something and
add some comments in the next version.
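
Something along these lines is what I have in mind for the comment (wording
still to be polished):

	/*
	 * Rolling back to a savepoint does not end the streamed transaction,
	 * so another set of streaming blocks or a transaction end command may
	 * still arrive for it.  Take the stream lock again here so that we
	 * wait for the leader before processing them.
	 */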

> 4.
> The leader may have sent multiple streaming blocks in the queue
> + * When the child is processing a streaming block. So only try to
> + * lock if there is no message left in the queue.
> 
> Let's slightly reword this to: "By the time child is processing the
> changes in the current streaming block, the leader may have sent
> multiple streaming blocks. So, try to lock only if there is no message
> left in the queue."

Changed.

> 5.
> +parallel_apply_unlock(uint16 lockid, LOCKMODE lockmode)
> +{
> + if (!list_member_int(ParallelApplyLockids, lockid))
> + return;
> +
> + UnlockSharedObjectForSession(SubscriptionRelationId,
> MySubscription->oid,
> + lockid, am_leader_apply_worker() ?
> + AccessExclusiveLock:
> + AccessShareLock);
> 
> This function should use lockmode argument passed rather than deciding
> based on am_leader_apply_worker. I think this is anyway going to
> change if we start using a different locktag as discussed in one of
> the above emails.

Changed.
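
For reference, the function now simply honours the caller's lock mode, roughly
like the sketch below (the last line is bookkeeping for the lock id list and
may look slightly different in the patch):

void
parallel_apply_unlock(uint16 lockid, LOCKMODE lockmode)
{
	if (!list_member_int(ParallelApplyLockids, lockid))
		return;

	UnlockSharedObjectForSession(SubscriptionRelationId, MySubscription->oid,
								 lockid, lockmode);

	/* keep the session's list of held lock ids in sync */
	ParallelApplyLockids = list_delete_int(ParallelApplyLockids, lockid);
}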

> 6.
> +
>  /*
>   * Common spoolfile processing.
>   */
> -static void
> -apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
> +void
> +apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
> 
> Seems like a spurious line addition.

Removed.

> Fair point. I think if the user wants, she can join with pg_stat_subscription
> based on PID and find the corresponding subscription. However, if we want to
> identify everything via pg_locks then I think we should also mention classid
> or database id as field1. So, it would look like: field1: (pg_subscription's
> oid or current db id); field2: OID of subscription in pg_subscription;
> field3: local or remote xid; field4: 0/1 to differentiate between remote and
> local xid.

I tried to use the local xid to lock the transaction, but currently we can only
get the local xid after applying the first change. And it is possible that the
first change in a parallel apply worker is blocked by another parallel apply
worker, which means the parallel apply worker might not get a chance to share
the local xid with the leader.

To resolve this, I tried to use the remote xid for both the stream lock and the
transaction lock, and use field4 (0/1) to differentiate between them. Like:

field1: (current db id); field2: OID of subscription in pg_subscription;
field3: remote xid; field4: 0/1 to differentiate between stream_lock and
transaction_lock.
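
A hypothetical sketch of how the tag could be filled in (the helper name, the
two constants, and the locktag type below are placeholders until we settle on
the final tag; needs storage/lock.h and miscadmin.h):

#define PARALLEL_APPLY_LOCK_STREAM	0	/* field4 value for the stream lock */
#define PARALLEL_APPLY_LOCK_XACT	1	/* field4 value for the transaction lock */

static void
set_parallel_apply_lock_tag(LOCKTAG *tag, Oid subid, TransactionId remote_xid,
							uint16 lock_kind)
{
	tag->locktag_field1 = MyDatabaseId;	/* field1: current db id */
	tag->locktag_field2 = subid;		/* field2: subscription OID */
	tag->locktag_field3 = remote_xid;	/* field3: remote xid */
	tag->locktag_field4 = lock_kind;	/* field4: stream vs. transaction lock */
	tag->locktag_type = LOCKTAG_OBJECT;	/* placeholder lock tag type */
	tag->locktag_lockmethodid = DEFAULT_LOCKMETHOD;
}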


> IIUC, this handling is required for the case when we are not able to send a
> message to parallel apply worker and switch to serialize mode (write
> remaining data to file). Basically, it is possible that the message is only
> partially sent and there is no way to clean the queue. I feel we can directly
> free the worker in this case even if there is a space in the worker pool. The
> other idea could be that we detach from shm_mq and then invent a way to
> re-attach it after we try to reuse the same worker.

For now, I directly stop the worker in this case, but I will think more about
it.

Best regards,
Hou zj
