RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id OS0PR01MB571680391393F3CB63469F3E940A9@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
List pgsql-hackers
On Monday, November 21, 2022 2:26 PM Peter Smith <smithpb2250@gmail.com> wrote:
> On Fri, Nov 18, 2022 at 6:03 PM Peter Smith <smithpb2250@gmail.com>
> wrote:
> >
> > Here are some review comments for v47-0001
> >
> > (This review is a WIP - I will post more comments for this patch next
> > week)
> >
> 
> Here are the rest of my comments for v47-0001

Thanks for the comments!

> ======
> 
> doc/src/sgml/monitoring.
> 
> 1.
> 
> @@ -1851,6 +1851,11 @@ postgres   27093  0.0  0.0  30096  2752 ?
>  Ss   11:34   0:00 postgres: ser
>        <entry>Waiting to acquire an advisory user lock.</entry>
>       </row>
>       <row>
> +      <entry><literal>applytransaction</literal></entry>
> +      <entry>Waiting to acquire acquire a lock on a remote transaction being
> +      applied on the subscriber side.</entry>
> +     </row>
> +     <row>
> 
> 1a.
> Typo "acquire acquire"

Fixed.

> ~
> 
> 1b.
> Maybe "on the subscriber side" does not mean much without any context.
> Maybe better to word it as below.
> 
> SUGGESTION
> Waiting to acquire a lock on a remote transaction being applied by a logical
> replication subscriber.

Changed.

> ======
> 
> doc/src/sgml/system-views.sgml
> 
> 2.
> 
> @@ -1361,8 +1361,9 @@
>         <literal>virtualxid</literal>,
>         <literal>spectoken</literal>,
>         <literal>object</literal>,
> -       <literal>userlock</literal>, or
> -       <literal>advisory</literal>.
> +       <literal>userlock</literal>,
> +       <literal>advisory</literal> or
> +       <literal>applytransaction</literal>.
> 
> This change removed the Oxford comma that was there before. I assume it was
> unintended.

Changed.

> ======
> 
> .../replication/logical/applyparallelworker.c
> 
> 3. globals
> 
> The parallel_apply_XXX functions were all shortened to pa_XXX.
> 
> I wondered if the same simplification should be done also to the global
> statics...
> 
> e.g.
> ParallelApplyWorkersHash -> PAWorkerHash ParallelApplyWorkersList ->
> PAWorkerList ParallelApplyMessagePending -> PAMessagePending etc...

I personally feel these names look fine.

> ~~~
> 
> 4. pa_get_free_worker
> 
> + foreach(lc, active_workers)
> + {
> + ParallelApplyWorkerInfo *winfo = NULL;
> +
> + winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
> 
> No need to assign NULL because the next line just overwrites that anyhow.

Changed.

> ~
> 
> 5.
> 
> + /*
> + * Try to free the worker first, because we don't wait for the rollback
> + * command to finish so the worker may not be freed at the end of the
> + * transaction.
> + */
> + if (pa_free_worker(winfo, winfo->shared->xid)) continue;
> +
> + if (!winfo->in_use)
> + return winfo;
> 
> Shouldn't the (!winfo->in_use) check be done first as well -- e.g. why are we
> trying to free a worker which is maybe not even in_use?
> 
> SUGGESTION (this will need some comment to explain what it is doing)
> if (!winfo->in_use || !pa_free_worker(winfo, winfo->shared->xid) &&
>     !winfo->in_use)
>     return winfo;

Since pa_free_worker() checks the in_use flag as well, and the current style
looks clean to me, I didn't change this.

But it seems we need to first call pa_free_worker() for every worker and only
then choose a free one; otherwise a stopped worker's info (shared memory or ...)
might be left around for a long time. I will think about this and try to fix it
in the next version.
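A minimal sketch of that two-pass idea, assuming a simplified, hypothetical WorkerSlot struct in place of ParallelApplyWorkerInfo and a hypothetical reclaim_slot() in place of pa_free_worker() (the real function also maintains the hash table and may stop the worker). Every slot is reclaimed first and only then is a free one chosen, so an early return can no longer leave later finished slots unreclaimed.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for ParallelApplyWorkerInfo. */
typedef struct WorkerSlot
{
    bool        in_use;     /* assigned to a remote transaction */
    bool        finished;   /* that transaction has ended */
} WorkerSlot;

/* Hypothetical stand-in for pa_free_worker(): release a finished slot. */
static bool
reclaim_slot(WorkerSlot *slot)
{
    if (slot->in_use && slot->finished)
    {
        slot->in_use = false;
        slot->finished = false;
        return true;
    }
    return false;
}

/*
 * Pass 1 reclaims every finished slot; pass 2 returns the first free one.
 * Returns NULL if all slots are busy.
 */
static WorkerSlot *
get_free_slot(WorkerSlot *slots, size_t nslots)
{
    for (size_t i = 0; i < nslots; i++)
        (void) reclaim_slot(&slots[i]);

    for (size_t i = 0; i < nslots; i++)
    {
        if (!slots[i].in_use)
            return &slots[i];
    }

    return NULL;
}
```

With a single loop that returns as soon as it finds a free slot, any finished slots after that position would stay allocated until a later call happens to reach them.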

> ~~~
> 
> 6. pa_free_worker
> 
> +/*
> + * Remove the parallel apply worker entry from the hash table. Stop the work if
> + * there are enough workers in the pool.
> + *
> 
> Typo? "work" -> "worker"
> 

Fixed.

> 
> 7.
> 
> + /* Are there enough workers in the pool? */
> + if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
> + {
> 
> IMO that comment should be something more like "Don't detach/stop the
> worker unless..."
> 

Improved.
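For reference, the quoted pool-size condition as a standalone function with a hypothetical name. Because of the integer division, a limit of 5 gives a threshold of 2, so a worker is stopped only once napplyworkers is 3 or more.

```c
#include <stdbool.h>

/*
 * Returns true if a parallel apply worker whose transaction has finished
 * should be stopped rather than kept in the pool, i.e. only when
 * napplyworkers exceeds half of the per-subscription limit (integer division).
 */
static bool
should_stop_worker(int napplyworkers,
                   int max_parallel_apply_workers_per_subscription)
{
    return napplyworkers > (max_parallel_apply_workers_per_subscription / 2);
}
```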

> 
> 8. pa_send_data
> 
> + /*
> + * Retry after 1s to reduce the cost of getting the system time and
> + * calculating the time difference.
> + */
> + (void) WaitLatch(MyLatch,
> + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 1000L,
> + WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
> 
> 8a.
> I am not sure you need to explain the reason in the comment. Just saying "Wait
> before retrying." seems sufficient to me.

Changed.

> ~
> 
> 8b.
> Instead of the hardwired "1s" in the comment, and 1000L in the code, maybe
> better to just have another constant.
> 
> SUGGESTION
> #define SHM_SEND_RETRY_INTERVAL_MS 1000
> #define SHM_SEND_TIMEOUT_MS 10000

Changed.

> ~
> 
> 9.
> 
> + if (startTime == 0)
> + startTime = GetCurrentTimestamp();
> + else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
> 
> IMO the initial startTime should be at top of the function otherwise the timeout
> calculation seems wrong.

Setting startTime at the beginning would add unnecessary cost when no retry is
needed, and counting from the first failure looks fine to me.
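A minimal sketch of the retry loop discussed in 8b and 9, assuming hypothetical callbacks (try_send, now_ms, sleep_ms) in place of the shared-memory queue, GetCurrentTimestamp() and WaitLatch(), plus a small simulated environment so it runs standalone. It uses the two suggested constants and reads the clock only after the first failed attempt, so the common no-retry path pays nothing for timing. The timeout test uses >=, as TimestampDifferenceExceeds() does.

```c
#include <stdbool.h>
#include <stdint.h>

#define SHM_SEND_RETRY_INTERVAL_MS 1000
#define SHM_SEND_TIMEOUT_MS 10000

/* Hypothetical hooks standing in for the queue, the clock and WaitLatch(). */
typedef bool (*try_send_fn) (void *ctx);
typedef int64_t (*now_ms_fn) (void *ctx);
typedef void (*sleep_ms_fn) (void *ctx, long ms);

/*
 * Returns true once the message is sent, or false after retrying for
 * SHM_SEND_TIMEOUT_MS.  Timing starts at the first failure.
 */
static bool
send_with_retry(try_send_fn try_send, now_ms_fn now_ms,
                sleep_ms_fn sleep_ms, void *ctx)
{
    bool        timing = false;
    int64_t     start_ms = 0;

    for (;;)
    {
        if (try_send(ctx))
            return true;

        if (!timing)
        {
            start_ms = now_ms(ctx);
            timing = true;
        }
        else if (now_ms(ctx) - start_ms >= SHM_SEND_TIMEOUT_MS)
            return false;

        /* Wait before retrying. */
        sleep_ms(ctx, SHM_SEND_RETRY_INTERVAL_MS);
    }
}

/* Simulated environment, for demonstration only. */
typedef struct SimEnv
{
    int         failures_left;  /* sends that fail before one succeeds */
    int64_t     clock_ms;       /* simulated clock */
    int         clock_reads;    /* how often now_ms() was called */
} SimEnv;

static bool
sim_try_send(void *ctx)
{
    SimEnv     *env = ctx;

    if (env->failures_left > 0)
    {
        env->failures_left--;
        return false;
    }
    return true;
}

static int64_t
sim_now_ms(void *ctx)
{
    SimEnv     *env = ctx;

    env->clock_reads++;
    return env->clock_ms;
}

static void
sim_sleep_ms(void *ctx, long ms)
{
    SimEnv     *env = ctx;

    env->clock_ms += ms;
}
```

A sender that succeeds on the first try never touches the clock; one that keeps failing gives up after the timeout has elapsed on the simulated clock.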

> ======
> 
> src/backend/replication/logical/worker.c
> 
> 10. handle_streamed_transaction
> 
> + * In streaming case (receiving a block of streamed transaction), for
> + * SUBSTREAM_ON mode, simply redirect it to a file for the proper toplevel
> + * transaction, and for SUBSTREAM_PARALLEL mode, send the changes to parallel
> + * apply workers (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE changes
> + * will be applied by both leader apply worker and parallel apply workers).
> 
> I'm not sure this function comment should be referring to SUBSTREAM_ON
> and SUBSTREAM_PARALLEL because the function body does not use those
> anywhere in the logic.

Improved.
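A small sketch of the routing described in that function comment, with hypothetical enum and flag names. It models only where each streamed change goes, not the serialization or queueing itself, and for the file mode it assumes the change is only spooled, as "simply redirect it to a file" suggests.

```c
/*
 * Hypothetical names throughout.  Destinations are returned as a bitmask
 * because, in parallel mode, some messages go to more than one place.
 */
typedef enum StreamMode
{
    STREAM_MODE_ON,         /* serialize changes to a file */
    STREAM_MODE_PARALLEL    /* hand changes to a parallel apply worker */
} StreamMode;

typedef enum StreamedMsg
{
    STREAMED_RELATION,
    STREAMED_TYPE,
    STREAMED_INSERT,
    STREAMED_UPDATE,
    STREAMED_DELETE
} StreamedMsg;

#define DEST_FILE             0x1u  /* spooled for the toplevel transaction */
#define DEST_PARALLEL_WORKER  0x2u  /* sent to the parallel apply worker */
#define DEST_LEADER           0x4u  /* also applied by the leader itself */

static unsigned
route_streamed_change(StreamMode mode, StreamedMsg msg)
{
    unsigned    dest;

    if (mode == STREAM_MODE_ON)
        return DEST_FILE;

    dest = DEST_PARALLEL_WORKER;

    /* RELATION and TYPE are applied by both the leader and the worker. */
    if (msg == STREAMED_RELATION || msg == STREAMED_TYPE)
        dest |= DEST_LEADER;

    return dest;
}
```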

> ~~~
> 
> 11. apply_handle_stream_start
> 
> + /*
> + * Increment the number of messages waiting to be processed by
> + * parallel apply worker.
> + */
> + pg_atomic_add_fetch_u32(&(winfo->shared->pending_message_count), 1);
> +
> 
> The &() parens are not needed. Just write
> &winfo->shared->pending_message_count.
> 
> Also, search/replace others like this -- there are a few of them.

Changed.

> ~~~
> 
> 12. apply_handle_stream_stop
> 
> + if (!abort_toplevel_transaction &&
> + pg_atomic_sub_fetch_u32(&(MyParallelShared->pending_message_count), 1) == 0)
> + {
> + pa_lock_stream(MyParallelShared->xid, AccessShareLock);
> + pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
> + }
> 
> That lock/unlock seems like it is done just as a way of testing/waiting for an
> exclusive lock held on the xid to be released.
> But the code is too tricky -- IMO it needs a big comment saying how this trick
> works, or maybe better to have a wrapper function for this for clarity. e.g.
> pa_wait_nolock_stream(xid); (or some better name)

I think the comments atop applyparallelworker.c already explain the usage of
the stream/transaction lock:

```
...
 * In order for lmgr to detect this, we have LA acquire a session lock on the
 * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
 * trying to receive messages. In other words, LA acquires the lock before
 * sending STREAM_STOP and releases it if already acquired before sending
 * STREAM_START, STREAM_ABORT(for toplevel transaction), STREAM_PREPARE and
 * STREAM_COMMIT. For PA, it always needs to acquire the lock after processing
 * STREAM_STOP and then release immediately after acquiring it. That way, when
 * PA is waiting for LA, we can have a wait-edge from PA to LA in lmgr, which
 * will make a deadlock in lmgr like:
...
```
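To show the trick in isolation, a minimal sketch that uses a POSIX mutex as a stand-in for the lmgr stream lock and a C11 atomic for pending_message_count. The struct and function names are hypothetical; the wrapper follows the pa_wait_nolock_stream() idea from the review. Acquiring and immediately releasing the lock just blocks until the leader, which holds it while the worker must wait, lets go. A mutex gives no deadlock detection, which is precisely why the patch uses lmgr locks; this only illustrates the wait.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical shared state for one streamed transaction. */
typedef struct StreamShared
{
    pthread_mutex_t stream_lock;            /* stand-in for pa_lock_stream() */
    atomic_uint     pending_message_count;  /* streamed chunks not yet applied */
} StreamShared;

/* Block until the current holder of the stream lock releases it. */
static void
wait_for_stream_lock_release(StreamShared *shared)
{
    pthread_mutex_lock(&shared->stream_lock);   /* waits while LA holds it */
    pthread_mutex_unlock(&shared->stream_lock); /* do not keep it */
}

/* Leader side: one more chunk has been sent to the worker. */
static void
leader_note_chunk_sent(StreamShared *shared)
{
    atomic_fetch_add(&shared->pending_message_count, 1);
}

/*
 * Worker side, after processing STREAM_STOP.  atomic_fetch_sub() returns
 * the old value, so "== 1" means the count has just dropped to zero.
 * Returns true if the worker waited on the stream lock.
 */
static bool
worker_after_stream_stop(StreamShared *shared, bool abort_toplevel_transaction)
{
    if (!abort_toplevel_transaction &&
        atomic_fetch_sub(&shared->pending_message_count, 1) == 1)
    {
        wait_for_stream_lock_release(shared);
        return true;
    }
    return false;
}
```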

> ~~~
> 
> 13. apply_handle_stream_abort
> 
> + if (abort_toplevel_transaction)
> + {
> + (void) pa_free_worker(winfo, xid);
> + }
> 
> Unnecessary { }

Removed.

> ~~~
> 
> 14. maybe_reread_subscription
> 
> @@ -3083,8 +3563,9 @@ maybe_reread_subscription(void)
>   if (!newsub)
>   {
>   ereport(LOG,
> - (errmsg("logical replication apply worker for subscription \"%s\" will "
> - "stop because the subscription was removed",
> + /* translator: first %s is the name of logical replication worker */
> + (errmsg("%s for subscription \"%s\" will stop because the "
> + "subscription was removed", get_worker_name(),
>   MySubscription->name)));
> 
>   proc_exit(0);
> @@ -3094,8 +3575,9 @@ maybe_reread_subscription(void)
>   if (!newsub->enabled)
>   {
>   ereport(LOG,
> - (errmsg("logical replication apply worker for subscription \"%s\" will "
> - "stop because the subscription was disabled",
> + /* translator: first %s is the name of logical replication worker */
> + (errmsg("%s for subscription \"%s\" will stop because the "
> + "subscription was disabled", get_worker_name(),
>   MySubscription->name)));
> 
> IMO better to avoid splitting the string literals over multiple line like this.
> 
> Please check the rest of the patch too -- there may be many more just like this.

Changed.

> ~~~
> 
> 15. ApplyWorkerMain
> 
> @@ -3726,7 +4236,7 @@ ApplyWorkerMain(Datum main_arg)
>   }
>   else
>   {
> - /* This is main apply worker */
> + /* This is leader apply worker */
>   RepOriginId originid;
> "This is leader" -> "This is the leader"

Changed.

> ======
> 
> src/bin/psql/describe.c
> 
> 16. describeSubscriptions
> 
> + if (pset.sversion >= 160000)
> + appendPQExpBuffer(&buf,
> +   ", (CASE substream\n"
> +   "    WHEN 'f' THEN 'off'\n"
> +   "    WHEN 't' THEN 'on'\n"
> +   "    WHEN 'p' THEN 'parallel'\n"
> +   "   END) AS \"%s\"\n",
> +   gettext_noop("Streaming"));
> + else
> + appendPQExpBuffer(&buf,
> +   ", substream AS \"%s\"\n",
> +   gettext_noop("Streaming"));
> 
> I'm not sure it is an improvement to change the output "t/f/p" to
> "on/off/parallel"
> 
> IMO "t/f/parallel" would be better. Then the t/f is consistent with
> - how it used to display, and
> - all the other boolean fields

I think the current style is consistent with the "Synchronous commit" column,
which also shows "on/off/remote_apply/...", so I didn't change this.

Name | ... | Synchronous commit
------+-----+-------------------
sub  | ... | on    

> ======
> 
> src/include/replication/worker_internal.h
> 
> 17. ParallelTransState
> 
> +/*
> + * State of the transaction in parallel apply worker.
> + *
> + * These enum values are ordered by the order of transaction state changes in
> + * parallel apply worker.
> + */
> +typedef enum ParallelTransState
> 
> "ordered by the order" ??
> 
> SUGGESTION
> The enum values must have the same order as the transaction state transitions.

Changed.
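A minimal sketch of why the ordering matters, with hypothetical member names (the patch's actual members are not shown in this thread). When the values follow the transitions, "has the transaction reached at least state X?" is a single comparison, and a backwards transition can be caught with an assertion.

```c
#include <assert.h>
#include <stdbool.h>

/* Declared in the order a transaction moves through them. */
typedef enum TransStateSketch
{
    TRANS_STATE_UNKNOWN,
    TRANS_STATE_STARTED,
    TRANS_STATE_FINISHED
} TransStateSketch;

/* True if the transaction has reached (or passed) the target state. */
static bool
trans_state_reached(TransStateSketch current, TransStateSketch target)
{
    return current >= target;
}

/* Move to the next state; the ordering lets us reject going backwards. */
static TransStateSketch
trans_state_advance(TransStateSketch current, TransStateSketch next)
{
    assert(next >= current);
    return next;
}
```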

> ======
> 
> src/include/storage/lock.h
> 
> 18.
> 
> @@ -149,10 +149,12 @@ typedef enum LockTagType
>   LOCKTAG_SPECULATIVE_TOKEN, /* speculative insertion Xid and token */
>   LOCKTAG_OBJECT, /* non-relation database object */
>   LOCKTAG_USERLOCK, /* reserved for old contrib/userlock code */
> - LOCKTAG_ADVISORY /* advisory user locks */
> + LOCKTAG_ADVISORY, /* advisory user locks */
> + LOCKTAG_APPLY_TRANSACTION /* transaction being applied on the subscriber side */
>  } LockTagType;
> 
> -#define LOCKTAG_LAST_TYPE LOCKTAG_ADVISORY
> +#define LOCKTAG_LAST_TYPE LOCKTAG_APPLY_TRANSACTION
> 
>  extern PGDLLIMPORT const char *const LockTagTypeNames[];
> 
> @@ -278,6 +280,17 @@ typedef struct LOCKTAG
>   (locktag).locktag_type = LOCKTAG_ADVISORY, \
>   (locktag).locktag_lockmethodid = USER_LOCKMETHOD)
> 
> +/*
> + * ID info for a remote transaction on the subscriber side is:
> + * DB OID + SUBSCRIPTION OID + TRANSACTION ID + OBJID
> + */
> +#define SET_LOCKTAG_APPLY_TRANSACTION(locktag,dboid,suboid,xid,objid) \
> + ((locktag).locktag_field1 = (dboid), \
> + (locktag).locktag_field2 = (suboid), \
> + (locktag).locktag_field3 = (xid), \
> + (locktag).locktag_field4 = (objid), \
> + (locktag).locktag_type = LOCKTAG_APPLY_TRANSACTION, \
> + (locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD)
> 
> Maybe "on the subscriber side" (2 places above) has no meaning here because
> there is no context this is talking about logical replication.
> Maybe those comments need to say something more like  "on a logical
> replication subscriber"
> 
Changed.
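For illustration, a self-contained sketch of how the macro fills in a tag, using a simplified copy of the LOCKTAG layout (three 32-bit fields, one 16-bit field, then the type and lock-method bytes). The numeric type and lock-method values, and the objid values used here to tell a stream lock from a transaction lock, are hypothetical placeholders.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified copy of the LOCKTAG layout. */
typedef struct LockTagSketch
{
    uint32_t    locktag_field1;
    uint32_t    locktag_field2;
    uint32_t    locktag_field3;
    uint16_t    locktag_field4;
    uint8_t     locktag_type;
    uint8_t     locktag_lockmethodid;
} LockTagSketch;

/* Placeholder values for the sketch. */
#define SKETCH_LOCKTAG_APPLY_TRANSACTION 11
#define SKETCH_DEFAULT_LOCKMETHOD 1
#define SKETCH_OBJID_STREAM 0       /* hypothetical */
#define SKETCH_OBJID_TRANSACTION 1  /* hypothetical */

/* Same shape as SET_LOCKTAG_APPLY_TRANSACTION in the patch. */
#define SET_LOCKTAG_APPLY_TRANSACTION(locktag,dboid,suboid,xid,objid) \
    ((locktag).locktag_field1 = (dboid), \
     (locktag).locktag_field2 = (suboid), \
     (locktag).locktag_field3 = (xid), \
     (locktag).locktag_field4 = (objid), \
     (locktag).locktag_type = SKETCH_LOCKTAG_APPLY_TRANSACTION, \
     (locktag).locktag_lockmethodid = SKETCH_DEFAULT_LOCKMETHOD)

/* Two tags name the same lock only if every field matches. */
static bool
locktag_equal(const LockTagSketch *a, const LockTagSketch *b)
{
    return a->locktag_field1 == b->locktag_field1 &&
        a->locktag_field2 == b->locktag_field2 &&
        a->locktag_field3 == b->locktag_field3 &&
        a->locktag_field4 == b->locktag_field4 &&
        a->locktag_type == b->locktag_type &&
        a->locktag_lockmethodid == b->locktag_lockmethodid;
}
```

Because objid is part of the tag, the same remote xid can carry two independent locks that never conflict with each other.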

I also addressed all the comments from [1].

[1] https://www.postgresql.org/message-id/CAHut%2BPs7TzqqDnuH8r_ct1W_zSBCnuo3wodMt4Y8_Gw7rSRAaw%40mail.gmail.com

Best regards,
Hou zj
