RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS0PR01MB57161E105EBD94EAACF2A40A94109@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
List pgsql-hackers
On Friday, November 25, 2022 10:54 AM Peter Smith <smithpb2250@gmail.com> wrote:
> 
> Here are some review comments for v51-0001.

Thanks for the comments!
> ======
> 
> .../replication/logical/applyparallelworker.c
> 
> 1. General - Error messages, get_worker_name()
> 
> I previously wrote a comment to ask if the get_worker_name() should be used
> in more places but the reply [1, #2b] was:
> 
> > 2b.
> > Consider if maybe all of these ought to be calling get_worker_name()
> > which is currently static in worker.c. Doing this means any future
> > changes to get_worker_name won't cause more inconsistencies.
> 
> Most of the error messages in applyparallelxx.c can only use "xx parallel worker",
> so I think it's fine not to call get_worker_name
> 
> ~
> 
> I thought the reply missed the point I was trying to make -- I meant if it was
> arranged now so *every* message would go via
> get_worker_name() then in future somebody wanted to change the names (e.g.
> from "logical replication parallel apply worker" to "LR PA
> worker") then it would only need to be changed in one central place instead of
> hunting down every hardwired error message.
> 

Thanks for the suggestion. I understand your point, but I feel that using
get_worker_name() in places where the worker type is already decided could make
developers think that any kind of worker can reach this code, and I am not sure
that is better. So I didn't change this.
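
To make the trade-off concrete, here is a minimal sketch of the two styles being discussed. It is an illustration only, not code from the patch: the WorkerKind enum, worker_name() and both format_* helpers are hypothetical names, and the plain apply worker string is an assumption. The real get_worker_name() is static in worker.c and takes no argument.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical worker kinds, used only for this illustration. */
typedef enum WorkerKind
{
    WORKER_APPLY,
    WORKER_PARALLEL_APPLY,
    WORKER_TABLESYNC
} WorkerKind;

/* One central place for the names, in the spirit of get_worker_name(). */
static const char *
worker_name(WorkerKind kind)
{
    switch (kind)
    {
        case WORKER_PARALLEL_APPLY:
            return "logical replication parallel apply worker";
        case WORKER_TABLESYNC:
            return "logical replication table synchronization worker";
        case WORKER_APPLY:
            break;
    }
    /* Assumed name for the plain apply worker. */
    return "logical replication apply worker";
}

/* Style 1: the message goes through the central lookup. */
static void
format_stop_message_central(char *buf, size_t len, WorkerKind kind,
                            const char *subname)
{
    snprintf(buf, len, "%s for subscription \"%s\" will stop",
             worker_name(kind), subname);
}

/* Style 2: the message is hardwired because the worker type is known. */
static void
format_stop_message_hardwired(char *buf, size_t len, const char *subname)
{
    snprintf(buf, len,
             "logical replication parallel apply worker for subscription \"%s\" will stop",
             subname);
}
```

Both styles produce the same text for a parallel apply worker. Style 1 makes a rename a one-line change, while style 2 makes it obvious at the call site that only one worker type can reach that code.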

> 
> 2. HandleParallelApplyMessage
> 
> + case 'X': /* Terminate, indicating clean exit. */
> + shm_mq_detach(winfo->error_mq_handle);
> + winfo->error_mq_handle = NULL;
> + break;
> + default:
> + elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
> + msgtype, msg->len);
> 
> The case 'X' code indentation is too much.

Changed.

> ======
> 
> src/backend/replication/logical/origin.c
> 
> 3. replorigin_session_setup(RepOriginId node, int acquired_by)
> 
> @@ -1075,12 +1075,20 @@ ReplicationOriginExitCleanup(int code, Datum arg)
>   * array doesn't have to be searched when calling
>   * replorigin_session_advance().
>   *
> - * Obviously only one such cached origin can exist per process and the current
> + * Normally only one such cached origin can exist per process and the current
>   * cached value can only be set again after the previous value is torn down
>   * with replorigin_session_reset().
> + *
> + * However, we do allow multiple processes to point to the same origin slot if
> + * requested by the caller by passing PID of the process that has already
> + * acquired it as acquired_by. This is to allow multiple parallel apply
> + * processes to use the same origin, provided they maintain commit order, for
> + * example, by allowing only one process to commit at a time. For the first
> + * process requesting this origin, the acquired_by parameter needs to be set to
> + * 0.
>   */
>  void
> -replorigin_session_setup(RepOriginId node)
> +replorigin_session_setup(RepOriginId node, int acquired_by)
> 
> I think the meaning of the acquired_by=0 is not fully described here:
> "For the first process requesting this origin, the acquired_by parameter needs
> to be set to 0."
> IMO that seems to be describing it only from POV that you are always going to
> want to allow multiple processes. But really this is an optional feature so you
> might pass acquired_by=0, not just because this is the first of multiple, but also
> because you *never* want to allow multiple at all. The comment does not
> convey this meaning.
> 
> Maybe something worded like below is better?
> 
> SUGGESTION
> Normally only one such cached origin can exist per process so the cached value
> can only be set again after the previous value is torn down with
> replorigin_session_reset(). For this normal case pass
> acquired_by=0 (meaning the slot is not allowed to be already acquired by
> another process).
> 
> However, sometimes multiple processes can safely re-use the same origin slot
> (for example, multiple parallel apply processes can safely use the same origin,
> provided they maintain commit order by allowing only one process to commit
> at a time). For this case the first process must pass acquired_by=0, and then the
> other processes sharing that same origin can pass acquired_by=PID of the first
> process.

Changed as suggested.
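
The acquired_by semantics described in the suggested comment can be sketched as a small model. This is a simplified illustration under stated assumptions, not the code in origin.c: OriginSlot and origin_slot_try_setup() are hypothetical names, and the real replorigin_session_setup() raises an error rather than returning false.

```c
#include <stdbool.h>

/* Hypothetical model of an origin slot: PID of the owner, or 0 if free. */
typedef struct OriginSlot
{
    int acquired_by;
} OriginSlot;

/*
 * Return true if the calling process (my_pid) may use the slot.
 *
 * acquired_by == 0: normal case, the slot must not already be acquired by
 * another process; on success the caller becomes the owner.
 *
 * acquired_by != 0: sharing case, the slot must already be owned by the
 * process whose PID is passed in.
 */
static bool
origin_slot_try_setup(OriginSlot *slot, int my_pid, int acquired_by)
{
    if (acquired_by == 0)
    {
        if (slot->acquired_by != 0)
            return false;       /* already acquired by another process */
        slot->acquired_by = my_pid;
        return true;
    }

    /* Sharing is allowed only with the process that actually owns the slot. */
    return slot->acquired_by == acquired_by;
}
```

In this model the first process (for example, a leader apply worker with PID 100) passes 0, and a parallel apply worker that wants to share the origin passes 100. A caller that never wants sharing always passes 0 and is refused if the slot is taken.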

> ======
> 
> src/backend/replication/logical/worker.c
> 
> 4. GENERAL - get_worker_name()
> 
> If you decide it is OK to hardwire some error messages instead of
> unconditionally calling the get_worker_name() -- see my #1 review comment in
> this post -- then there are some other messages in this file that also seem like
> they can be also hardwired because the type of worker is already known.
> 
> Here are some examples:
> 
> 4a.
> 
> + else if (am_parallel_apply_worker())
> + {
> + if (rel->state != SUBREL_STATE_READY)
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + /* translator: first %s is the name of logical replication worker */
> + errmsg("%s for subscription \"%s\" will stop", get_worker_name(),
> + MySubscription->name),
> + errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
> +
> + return true;
> + }
> 
> In the above code from should_apply_changes_for_rel we already know this is a
> parallel apply worker.
> 
> ~
> 
> 4b.
> 
> + if (am_parallel_apply_worker())
> + ereport(LOG,
> + /* translator: first %s is the name of logical replication worker */
> + (errmsg("%s for subscription \"%s\" will stop because of a parameter change",
> + get_worker_name(), MySubscription->name)));
> + else
> 
> In the above code from maybe_reread_subscription we already know this is a
> parallel apply worker.
> 
> 4c.
> 
>   if (am_tablesync_worker())
>   ereport(LOG,
> - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
> - MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
> + /* translator: first %s is the name of logical replication worker */
> + (errmsg("%s for subscription \"%s\", table \"%s\" has started",
> + get_worker_name(), MySubscription->name,
> + get_rel_name(MyLogicalRepWorker->relid))));
> 
> In the above code from ApplyWorkerMain we already know this is a tablesync
> worker

Thanks for checking these, changed.

> ~~~
> 
> 5. get_transaction_apply_action
> 
> +
> +/*
> + * Return the action to take for the given transaction. *winfo is assigned to
> + * the destination parallel worker info (if the action is
> + * TRANS_LEADER_SEND_TO_PARALLEL, otherwise *winfo is assigned NULL.
> + */
> +static TransApplyAction
> +get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
> 
> There is no closing ')' in the function comment.

Added.

> ~~~
> 
> 6. apply_worker_clean_exit
> 
> + /* Notify the leader apply worker that we have exited cleanly. */
> + if (am_parallel_apply_worker())
> + pq_putmessage('X', NULL, 0);
> 
> IMO the comment would be better inside the if block
> 
> SUGGESTION
> if (am_parallel_apply_worker())
> {
>     /* Notify the leader apply worker that we have exited cleanly. */
>     pq_putmessage('X', NULL, 0);
> }

Changed.
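
For readers following the 'X' message from comments #2 and #6, the two ends of the clean-exit handshake can be sketched together. This is a simplified model, not the patch's code: WorkerInfo, clean_exit_message() and handle_worker_message() are hypothetical names, and setting the handle to NULL stands in for the shm_mq_detach() call plus reset done in HandleParallelApplyMessage.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the leader's per-worker bookkeeping. */
typedef struct WorkerInfo
{
    void *error_mq_handle;      /* non-NULL while the error queue is attached */
} WorkerInfo;

/*
 * Worker side: return the message type to send on clean exit, or 0 if this
 * worker has no leader to notify.
 */
static char
clean_exit_message(bool is_parallel_apply_worker)
{
    if (is_parallel_apply_worker)
    {
        /* Notify the leader apply worker that we have exited cleanly. */
        return 'X';
    }
    return 0;
}

/*
 * Leader side: handle one message type from a parallel apply worker.
 * Returns false for an unrecognized type (the patch raises elog(ERROR)).
 */
static bool
handle_worker_message(WorkerInfo *winfo, char msgtype)
{
    switch (msgtype)
    {
        case 'X':               /* Terminate, indicating clean exit. */
            winfo->error_mq_handle = NULL;  /* models detach and reset */
            return true;
        default:
            return false;
    }
}
```

Only a parallel apply worker produces the 'X' byte, and the leader reacts to it by dropping its handle to that worker's error queue.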

Best regards,
Hou zj
