Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Peter Smith
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id CAHut+PsL=qpOYGbEqZ7pN26mPdmSefTD1=G9njSdqNJkGZD7Aw@mail.gmail.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
On Fri, Nov 18, 2022 at 6:03 PM Peter Smith <smithpb2250@gmail.com> wrote:
>
> Here are some review comments for v47-0001
>
> (This review is a WIP - I will post more comments for this patch next week)
>

Here are the rest of my comments for v47-0001

======

doc/src/sgml/monitoring.

1.

@@ -1851,6 +1851,11 @@ postgres   27093  0.0  0.0  30096  2752 ?  Ss   11:34   0:00 postgres: ser
       <entry>Waiting to acquire an advisory user lock.</entry>
      </row>
      <row>
+      <entry><literal>applytransaction</literal></entry>
+      <entry>Waiting to acquire acquire a lock on a remote transaction being
+      applied on the subscriber side.</entry>
+     </row>
+     <row>

1a.
Typo "acquire acquire"

~

1b.
Maybe "on the subscriber side" does not mean much without any context.
Maybe better to word it as below.

SUGGESTION
Waiting to acquire a lock on a remote transaction being applied by a
logical replication subscriber.

======

doc/src/sgml/system-views.sgml

2.

@@ -1361,8 +1361,9 @@
        <literal>virtualxid</literal>,
        <literal>spectoken</literal>,
        <literal>object</literal>,
-       <literal>userlock</literal>, or
-       <literal>advisory</literal>.
+       <literal>userlock</literal>,
+       <literal>advisory</literal> or
+       <literal>applytransaction</literal>.

This change removed the Oxford comma that was there before. I assume
it was unintended.

======

.../replication/logical/applyparallelworker.c

3. globals

The parallel_apply_XXX functions were all shortened to pa_XXX.

I wondered if the same simplification should be done also to the
global statics...

e.g.
ParallelApplyWorkersHash -> PAWorkerHash
ParallelApplyWorkersList -> PAWorkerList
ParallelApplyMessagePending -> PAMessagePending
etc...

~~~

4. pa_get_free_worker

+ foreach(lc, active_workers)
+ {
+ ParallelApplyWorkerInfo *winfo = NULL;
+
+ winfo = (ParallelApplyWorkerInfo *) lfirst(lc);

No need to assign NULL because the next line just overwrites that anyhow.

~

5.

+ /*
+ * Try to free the worker first, because we don't wait for the rollback
+ * command to finish so the worker may not be freed at the end of the
+ * transaction.
+ */
+ if (pa_free_worker(winfo, winfo->shared->xid))
+ continue;
+
+ if (!winfo->in_use)
+ return winfo;

Shouldn't the (!winfo->in_use) check be done first as well -- e.g. why
are we trying to free a worker which is maybe not even in_use?

SUGGESTION (this will need some comment to explain what it is doing)
if (!winfo->in_use ||
    (!pa_free_worker(winfo, winfo->shared->xid) && !winfo->in_use))
    return winfo;
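
To make the intended control flow concrete, here is a minimal standalone sketch of the loop. Everything in it is a hypothetical stand-in, not the patch code: SketchWorker replaces ParallelApplyWorkerInfo, sketch_free_worker() replaces pa_free_worker(), and I am assuming the free routine returns true only when the worker was actually stopped and removed from the pool.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for ParallelApplyWorkerInfo. */
typedef struct SketchWorker
{
    bool in_use;        /* worker is assigned to a transaction */
    bool xact_finished; /* assumption: its transaction has ended */
    bool stopped;       /* assumption: worker was stopped and removed */
} SketchWorker;

/*
 * Stand-in for pa_free_worker(). Assumed contract: returns true only if
 * the worker was stopped; otherwise it stays in the pool, possibly now
 * marked as not in use.
 */
static bool
sketch_free_worker(SketchWorker *w, bool stop_when_freed)
{
    if (!w->xact_finished)
        return false;           /* still busy, nothing to free */

    w->in_use = false;
    if (stop_when_freed)
    {
        w->stopped = true;
        return true;
    }
    return false;
}

static SketchWorker *
sketch_get_free_worker(SketchWorker *workers, size_t n, bool stop_when_freed)
{
    for (size_t i = 0; i < n; i++)
    {
        SketchWorker *w = &workers[i];

        if (w->stopped)
            continue;

        /* Already idle: reuse it without trying to free it first. */
        if (!w->in_use)
            return w;

        /* Marked busy: try to free it, and skip it if it was stopped. */
        if (sketch_free_worker(w, stop_when_freed))
            continue;

        /* The free attempt may have returned it to the pool. */
        if (!w->in_use)
            return w;
    }

    return NULL;
}
```

Written as three separate checks like this, each step can carry its own comment, which may be easier to read than the single combined condition.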

~~~

6. pa_free_worker

+/*
+ * Remove the parallel apply worker entry from the hash table. Stop the work if
+ * there are enough workers in the pool.
+ *

Typo? "work" -> "worker"

~

7.

+ /* Are there enough workers in the pool? */
+ if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
+ {

IMO that comment should be something more like "Don't detach/stop the
worker unless..."

~~~

8. pa_send_data

+ /*
+ * Retry after 1s to reduce the cost of getting the system time and
+ * calculating the time difference.
+ */
+ (void) WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ 1000L,
+ WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);

8a.
I am not sure you need to explain the reason in the comment. Just
saying "Wait before retrying." seems sufficient to me.

~

8b.
Instead of the hardwired "1s" in the comment, and 1000L in the code,
maybe better to just have another constant.

SUGGESTION
#define SHM_SEND_RETRY_INTERVAL_MS 1000
#define SHM_SEND_TIMEOUT_MS 10000

~

9.

+ if (startTime == 0)
+ startTime = GetCurrentTimestamp();
+ else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),

IMO the initial startTime should be set at the top of the function;
otherwise the timeout calculation seems wrong.
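
Putting 8a, 8b and 9 together, the retry loop might look roughly like the sketch below. It is standalone and uses hypothetical callbacks (try_send, now_ms, wait_ms) in place of the real send call, GetCurrentTimestamp() and WaitLatch(). The FakeEnv helpers are only there so the loop can be exercised without a server.

```c
#include <stdbool.h>
#include <stdint.h>

#define SHM_SEND_RETRY_INTERVAL_MS 1000
#define SHM_SEND_TIMEOUT_MS 10000

/* Hypothetical callbacks standing in for the send, the clock and the wait. */
typedef bool (*try_send_fn) (void *arg);
typedef int64_t (*now_ms_fn) (void *arg);
typedef void (*wait_ms_fn) (void *arg, int64_t ms);

static bool
send_with_timeout(try_send_fn try_send, now_ms_fn now_ms,
                  wait_ms_fn wait_ms, void *arg)
{
    /* Capture the start time before the first attempt. */
    int64_t start = now_ms(arg);

    for (;;)
    {
        if (try_send(arg))
            return true;

        if (now_ms(arg) - start >= SHM_SEND_TIMEOUT_MS)
            return false;

        /* Wait before retrying. */
        wait_ms(arg, SHM_SEND_RETRY_INTERVAL_MS);
    }
}

/* Fake environment with a simulated clock, for trying the loop out. */
typedef struct FakeEnv
{
    int64_t clock_ms;   /* simulated current time */
    int     attempts;   /* send attempts made so far */
    int     succeed_on; /* attempt number that succeeds; 0 means never */
} FakeEnv;

static bool
fake_try_send(void *arg)
{
    FakeEnv *env = arg;

    env->attempts++;
    return env->succeed_on != 0 && env->attempts >= env->succeed_on;
}

static int64_t
fake_now_ms(void *arg)
{
    return ((FakeEnv *) arg)->clock_ms;
}

static void
fake_wait_ms(void *arg, int64_t ms)
{
    ((FakeEnv *) arg)->clock_ms += ms;
}
```

With the start time taken up front, the elapsed time covers the whole send attempt rather than starting from the first failed retry.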

======

src/backend/replication/logical/worker.c

10. handle_streamed_transaction

+ * In streaming case (receiving a block of streamed transaction), for
+ * SUBSTREAM_ON mode, simply redirect it to a file for the proper toplevel
+ * transaction, and for SUBSTREAM_PARALLEL mode, send the changes to parallel
+ * apply workers (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE changes
+ * will be applied by both leader apply worker and parallel apply workers).

I'm not sure this function comment should be referring to SUBSTREAM_ON
and SUBSTREAM_PARALLEL because the function body does not use those
anywhere in the logic.

~~~

11. apply_handle_stream_start

+ /*
+ * Increment the number of messages waiting to be processed by
+ * parallel apply worker.
+ */
+ pg_atomic_add_fetch_u32(&(winfo->shared->pending_message_count), 1);
+

The &() parens are not needed. Just write &winfo->shared->pending_message_count.

Also, search/replace others like this -- there are a few of them.
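
For reference, a tiny standalone check of why the parens are redundant: the -> operator binds tighter than unary &, so both spellings yield the same address. The struct names here are hypothetical stand-ins for the patch's types.

```c
#include <stdint.h>

/* Hypothetical stand-ins for the shared state and the worker info. */
typedef struct SketchShared
{
    uint32_t pending_message_count;
} SketchShared;

typedef struct SketchWinfo
{
    SketchShared *shared;
} SketchWinfo;

/* Spelling used in the patch. */
static uint32_t *
count_ptr_with_parens(SketchWinfo *winfo)
{
    return &(winfo->shared->pending_message_count);
}

/* Suggested spelling; parsed identically because -> binds before &. */
static uint32_t *
count_ptr_without_parens(SketchWinfo *winfo)
{
    return &winfo->shared->pending_message_count;
}
```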

~~~

12. apply_handle_stream_stop

+ if (!abort_toplevel_transaction &&
+ pg_atomic_sub_fetch_u32(&(MyParallelShared->pending_message_count), 1) == 0)
+ {
+ pa_lock_stream(MyParallelShared->xid, AccessShareLock);
+ pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
+ }

That lock/unlock seems like it is done just as a way of
testing/waiting for an exclusive lock held on the xid to be released.
But the code is too tricky -- IMO it needs a big comment saying how
this trick works, or maybe better to have a wrapper function for this
for clarity. e.g. pa_wait_nolock_stream(xid); (or some better name)
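
Something along these lines is what I had in mind, assuming my reading of the trick is right. This is a standalone sketch: the typedefs, the AccessShareLock value and the two stub functions are stand-ins that only record calls, and pa_wait_nolock_stream is just the placeholder name from above.

```c
#include <stdint.h>

/* Stand-in types and constant so the sketch compiles on its own. */
typedef uint32_t TransactionId;
typedef int LOCKMODE;
#define AccessShareLock 1

/* Call counters used by the stubs below. */
static int stub_lock_calls;
static int stub_unlock_calls;
static TransactionId stub_last_xid;

/* Stub: the real function would block while a conflicting lock is held. */
static void
pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
{
    (void) lockmode;
    stub_lock_calls++;
    stub_last_xid = xid;
}

/* Stub: the real function would release the lock taken above. */
static void
pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
{
    (void) xid;
    (void) lockmode;
    stub_unlock_calls++;
}

/*
 * Wait until nobody holds a conflicting (exclusive) stream lock on xid.
 *
 * The shared lock is not needed for its own sake. Acquiring it blocks for
 * as long as another process holds the lock exclusively, so lock followed
 * by an immediate unlock acts as "wait for the exclusive holder to finish".
 */
static void
pa_wait_nolock_stream(TransactionId xid)
{
    pa_lock_stream(xid, AccessShareLock);
    pa_unlock_stream(xid, AccessShareLock);
}
```

The caller in apply_handle_stream_stop would then reduce to a single pa_wait_nolock_stream(MyParallelShared->xid) call, and the explanation lives in one place.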

~~~

13. apply_handle_stream_abort

+ if (abort_toplevel_transaction)
+ {
+ (void) pa_free_worker(winfo, xid);
+ }

Unnecessary { }

~~~

14. maybe_reread_subscription

@@ -3083,8 +3563,9 @@ maybe_reread_subscription(void)
  if (!newsub)
  {
  ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will "
- "stop because the subscription was removed",
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" will stop because the "
+ "subscription was removed", get_worker_name(),
  MySubscription->name)));

  proc_exit(0);
@@ -3094,8 +3575,9 @@ maybe_reread_subscription(void)
  if (!newsub->enabled)
  {
  ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will "
- "stop because the subscription was disabled",
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" will stop because the "
+ "subscription was disabled", get_worker_name(),
  MySubscription->name)));

IMO better to avoid splitting the string literals over multiple lines like this.

Please check the rest of the patch too -- there may be many more just like this.

~~~

15. ApplyWorkerMain

@@ -3726,7 +4236,7 @@ ApplyWorkerMain(Datum main_arg)
  }
  else
  {
- /* This is main apply worker */
+ /* This is leader apply worker */
  RepOriginId originid;

"This is leader" -> "This is the leader"

======

src/bin/psql/describe.c

16. describeSubscriptions

+ if (pset.sversion >= 160000)
+ appendPQExpBuffer(&buf,
+   ", (CASE substream\n"
+   "    WHEN 'f' THEN 'off'\n"
+   "    WHEN 't' THEN 'on'\n"
+   "    WHEN 'p' THEN 'parallel'\n"
+   "   END) AS \"%s\"\n",
+   gettext_noop("Streaming"));
+ else
+ appendPQExpBuffer(&buf,
+   ", substream AS \"%s\"\n",
+   gettext_noop("Streaming"));

I'm not sure it is an improvement to change the output "t/f/p" to
"on/off/parallel".

IMO "t/f/parallel" would be better. Then the t/f is consistent with
- how it used to display, and
- all the other boolean fields

======

src/include/replication/worker_internal.h

17. ParallelTransState

+/*
+ * State of the transaction in parallel apply worker.
+ *
+ * These enum values are ordered by the order of transaction state changes in
+ * parallel apply worker.
+ */
+typedef enum ParallelTransState

"ordered by the order" ??

SUGGESTION
The enum values must have the same order as the transaction state transitions.
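
One plausible reason the ordering matters (this is my assumption; the quoted hunk does not show how the enum is used) is that code can then test "has the transaction reached at least state X" with a plain comparison. A standalone sketch with hypothetical names:

```c
#include <stdbool.h>

/*
 * Hypothetical states, declared in the same order as the transitions
 * unknown -> started -> finished.
 */
typedef enum SketchTransState
{
    SKETCH_TRANS_UNKNOWN,
    SKETCH_TRANS_STARTED,
    SKETCH_TRANS_FINISHED
} SketchTransState;

/* Valid only because the declaration order matches the transition order. */
static bool
trans_state_reached(SketchTransState current, SketchTransState target)
{
    return current >= target;
}
```

If a new state were ever added out of order, comparisons like this would silently give wrong answers, so the comment should state the requirement firmly.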

======

src/include/storage/lock.h

18.

@@ -149,10 +149,12 @@ typedef enum LockTagType
  LOCKTAG_SPECULATIVE_TOKEN, /* speculative insertion Xid and token */
  LOCKTAG_OBJECT, /* non-relation database object */
  LOCKTAG_USERLOCK, /* reserved for old contrib/userlock code */
- LOCKTAG_ADVISORY /* advisory user locks */
+ LOCKTAG_ADVISORY, /* advisory user locks */
+ LOCKTAG_APPLY_TRANSACTION /* transaction being applied on the subscriber
+ * side */
 } LockTagType;

-#define LOCKTAG_LAST_TYPE LOCKTAG_ADVISORY
+#define LOCKTAG_LAST_TYPE LOCKTAG_APPLY_TRANSACTION

 extern PGDLLIMPORT const char *const LockTagTypeNames[];

@@ -278,6 +280,17 @@ typedef struct LOCKTAG
  (locktag).locktag_type = LOCKTAG_ADVISORY, \
  (locktag).locktag_lockmethodid = USER_LOCKMETHOD)

+/*
+ * ID info for a remote transaction on the subscriber side is:
+ * DB OID + SUBSCRIPTION OID + TRANSACTION ID + OBJID
+ */
+#define SET_LOCKTAG_APPLY_TRANSACTION(locktag,dboid,suboid,xid,objid) \
+ ((locktag).locktag_field1 = (dboid), \
+ (locktag).locktag_field2 = (suboid), \
+ (locktag).locktag_field3 = (xid), \
+ (locktag).locktag_field4 = (objid), \
+ (locktag).locktag_type = LOCKTAG_APPLY_TRANSACTION, \
+ (locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD)

Maybe "on the subscriber side" (2 places above) has no meaning here
because nothing in the context indicates that this is about logical
replication. Maybe those comments need to say something more like "on a
logical replication subscriber".

------
Kind Regards,
Peter Smith.
Fujitsu Australia


