Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From Peter Smith
Subject Re: Perform streaming logical transactions by background workers and parallel apply
Msg-id CAHut+PsJWHRoRzXtMrJ1RaxmkS2LkiMR_4S2pSionxXmYsyOww@mail.gmail.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
List pgsql-hackers
Here are my review comments for v40-0001.

======

src/backend/replication/logical/worker.c


1. should_apply_changes_for_rel

+ else if (am_parallel_apply_worker())
+ {
+ if (rel->state != SUBREL_STATE_READY)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+ MySubscription->name),
+ errdetail("Cannot handle streamed replication transaction using parallel "
+    "apply workers until all tables are synchronized.")));

1a.
"transaction" -> "transactions"

1b.
"are synchronized" -> "have been synchronized."

e.g. "Cannot handle streamed replication transactions using parallel
apply workers until all tables have been synchronized."

~~~

2. maybe_reread_subscription

+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription \"%s\" will "
+ "stop because the subscription was removed",
+ MySubscription->name)));
+ else
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will "
+ "stop because the subscription was removed",
+ MySubscription->name)));

Maybe there is an easier way to code this instead of if/else and
cut/paste message text:

SUGGESTION

ereport(LOG,
(errmsg("logical replication %s for subscription \"%s\" will stop because the subscription was removed",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MySubscription->name)));
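
To make the idea concrete outside the server, here is a minimal standalone sketch. It uses snprintf in place of ereport/errmsg, and both fake_is_parallel and am_parallel_apply_worker_stub() are hypothetical stand-ins for the real am_parallel_apply_worker(); only the ternary selection of the worker-type phrase is the point.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical flag standing in for the real worker state. */
static bool fake_is_parallel = false;

/* Hypothetical stub; NOT the real am_parallel_apply_worker(). */
static bool
am_parallel_apply_worker_stub(void)
{
	return fake_is_parallel;
}

/*
 * Format the "subscription was removed" message into buf.  A single
 * format string is used and the worker-type phrase is chosen with a
 * ternary, so the message text is not duplicated in an if/else.
 * Returns snprintf's result.
 */
static int
format_removed_msg(char *buf, size_t buflen, const char *subname)
{
	assert(buf != NULL && subname != NULL);

	return snprintf(buf, buflen,
					"logical replication %s for subscription \"%s\" will stop "
					"because the subscription was removed",
					am_parallel_apply_worker_stub() ? "parallel apply worker" : "apply worker",
					subname);
}
```

Calling format_removed_msg(buf, sizeof(buf), "mysub") with the flag unset produces the "apply worker" wording; with the flag set it produces the "parallel apply worker" wording.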
~~~

3.

+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription \"%s\" will "
+ "stop because the subscription was disabled",
+ MySubscription->name)));
+ else
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will "
+ "stop because the subscription was disabled",
+ MySubscription->name)));

These can be combined like comment #2 above

SUGGESTION

ereport(LOG,
(errmsg("logical replication %s for subscription \"%s\" will stop because the subscription was disabled",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MySubscription->name)));

~~~

4.

+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
+ MySubscription->name)));
+ else
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
+ MySubscription->name)));

These can be combined like comment #2 above

SUGGESTION

ereport(LOG,
(errmsg("logical replication %s for subscription \"%s\" will restart because of a parameter change",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MySubscription->name)));
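
Note that the two original messages in this hunk use different verbs: "will stop" for the parallel apply worker and "will restart" for the apply worker. If that difference is intentional, a combined message would need a second placeholder for the verb. A minimal standalone sketch, using snprintf in place of ereport/errmsg; the helper name format_param_change_msg and its is_parallel argument are hypothetical and only illustrate the shape:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper (not part of the patch): build the "parameter
 * change" message while keeping the original per-worker verb.
 * is_parallel stands in for the result of am_parallel_apply_worker().
 */
static int
format_param_change_msg(char *buf, size_t buflen,
						bool is_parallel, const char *subname)
{
	assert(buf != NULL && subname != NULL);

	return snprintf(buf, buflen,
					"logical replication %s for subscription \"%s\" will %s "
					"because of a parameter change",
					is_parallel ? "parallel apply worker" : "apply worker",
					subname,
					is_parallel ? "stop" : "restart");
}
```

Whether two run-time substitutions in one message are acceptable here is a separate question; the sketch only shows that both original wordings can be preserved.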

~~~

4. InitializeApplyWorker

+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription %u will not "
+ "start because the subscription was removed during startup",
+ MyLogicalRepWorker->subid)));
+ else
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription %u will not "
+ "start because the subscription was removed during startup",
+ MyLogicalRepWorker->subid)));

These can be combined like comment #2 above

SUGGESTION

ereport(LOG,
(errmsg("logical replication %s for subscription %u will not start because the subscription was removed during startup",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MyLogicalRepWorker->subid)));

~~~

5.

+ else if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription \"%s\" has started",
+ MySubscription->name)));
  else
  ereport(LOG,
  (errmsg("logical replication apply worker for subscription \"%s\" has started",
  MySubscription->name)));


The last if/else can be combined in the same way as comment #2 above.

SUGGESTION

  else
  ereport(LOG,
  (errmsg("logical replication %s for subscription \"%s\" has started",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MySubscription->name)));

~~~

6. IsLogicalParallelApplyWorker

+bool
+IsLogicalParallelApplyWorker(void)
+{
+ return IsLogicalWorker() && am_parallel_apply_worker();
+}

Patch v40 added the IsLogicalWorker() to the condition, but why is
that extra check necessary?
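
One possible reason, offered only as an assumption since the patch excerpt does not show the definitions: if am_parallel_apply_worker() dereferences a per-process pointer that is NULL outside logical replication workers, then the extra check acts as a NULL guard. The sketch below illustrates that pattern with hypothetical names (WorkerSlot, my_worker and the *_stub functions are not from the patch):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for the per-worker state. */
typedef struct WorkerSlot
{
	bool		is_parallel;	/* assumed flag; the real struct may differ */
} WorkerSlot;

/* Assumed to stay NULL in processes that are not logical rep workers. */
static WorkerSlot *my_worker = NULL;

/* Stand-in for IsLogicalWorker(): true only when the slot is attached. */
static bool
is_logical_worker_stub(void)
{
	return my_worker != NULL;
}

/* Dereferences my_worker, so it is only safe after the check above. */
static bool
am_parallel_apply_worker_stub(void)
{
	return my_worker->is_parallel;
}

/*
 * Short-circuit evaluation of && means the dereference is skipped
 * whenever my_worker is NULL.
 */
static bool
is_logical_parallel_apply_worker_stub(void)
{
	return is_logical_worker_stub() && am_parallel_apply_worker_stub();
}
```

If the real am_parallel_apply_worker() is already safe to call from any process, then the guard would indeed be redundant.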

======

7. src/include/replication/worker_internal.h

+typedef struct ParallelApplyWorkerInfo
+{
+ shm_mq_handle *mq_handle;
+
+ /*
+ * The queue used to transfer messages from the parallel apply worker to
+ * the leader apply worker.
+ */
+ shm_mq_handle *error_mq_handle;

In patch v40 the comment about the NULL error_mq_handle was removed,
but since the code still explicitly sets/checks NULL in different
places, isn't it still better to have some comment here to describe
what NULL means?

------
Kind Regards,
Peter Smith.
Fujitsu Australia


