RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id OS0PR01MB57169AEA399C6DC370EAF23B94649@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
List pgsql-hackers
On Wednesday, August 10, 2022 5:40 PM Peter Smith <smithpb2250@gmail.com> wrote:
> 
> Here are some review comments for the patch v20-0001:
> ======
> 
> 1. doc/src/sgml/catalogs.sgml
> 
> +       <literal>p</literal> = apply changes directly using a background
> +       worker, if available, otherwise, it behaves the same as 't'
> 
> The different char values 'f','t','p' are separated by comma (,) in
> the list, which is normal for the pgdocs AFAIK. However, because of
> this I don't think it is a good idea to use those other commas within
> the description for  'p', I suggest you remove those ones to avoid
> ambiguity with the separators.

Changed.

> ======
> 
> 2. doc/src/sgml/protocol.sgml
> 
> @@ -3096,7 +3096,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
>       <listitem>
>        <para>
>         Protocol version. Currently versions <literal>1</literal>, <literal>2</literal>,
> -       and <literal>3</literal> are supported.
> +       <literal>3</literal> and <literal>4</literal> are supported.
>        </para>
> 
> Put a comma after the penultimate value like it had before.
> 


Changed.

> ======
> 
> 3. src/backend/replication/logical/applybgworker.c - <general>
> 
> There are multiple function comments and other code comments in this
> file that are missing a terminating period (.)
> 
> ======
> 

Changed.

> 4. src/backend/replication/logical/applybgworker.c - apply_bgworker_start
> 
> +/*
> + * Try to get a free apply background worker.
> + *
> + * If there is at least one worker in the free list, then take one. Otherwise,
> + * try to start a new apply background worker. If successful, cache it in
> + * ApplyBgworkersHash keyed by the specified xid.
> + */
> +ApplyBgworkerState *
> +apply_bgworker_start(TransactionId xid)
> 
> SUGGESTION (for function comment)
> Return the apply background worker that will be used for the specified xid.
> 
> If an apply background worker is found in the free list then re-use
> it, otherwise start a fresh one. Cache the worker ApplyBgworkersHash
> keyed by the specified xid.
> 
> ~~~
> 

Changed.

> 5.
> 
> + /* Try to get a free apply background worker */
> + if (list_length(ApplyBgworkersFreeList) > 0)
> 
> if (list_length(ApplyBgworkersFreeList) > 0)
> 
> AFAIK a non-empty list is guaranteed to be not NIL, and an empty list
> is guaranteed to be NIL. So if you want to, the above can simply be
> written as:
> 
> if (ApplyBgworkersFreeList)
> 

Both ways are fine with me, so I kept the current style.
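
For reference, a minimal sketch of why the two checks give the same answer. It assumes the convention described in the comment above (an empty list is always NIL, and the length of NIL is 0). DemoList and the demo_* names are hypothetical stand-ins, not the real pg_list.h API.

```c
#include <stddef.h>

/* Hypothetical stand-in for PostgreSQL's List; not the real pg_list.h. */
typedef struct DemoList
{
    int length;     /* number of elements; never 0 for an allocated list */
} DemoList;

#define DEMO_NIL ((DemoList *) NULL)

/* Follows the convention that the length of NIL is 0. */
static inline int
demo_list_length(const DemoList *l)
{
    return l ? l->length : 0;
}

/* Style kept in the patch: compare the length against zero. */
static int
has_free_worker_by_length(const DemoList *free_list)
{
    return demo_list_length(free_list) > 0;
}

/* Style suggested in the review: test the pointer directly. */
static int
has_free_worker_by_pointer(const DemoList *free_list)
{
    return free_list != DEMO_NIL;
}
```

Under that convention both helpers return the same result for every list, so the choice is purely stylistic.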

> ~~~
> 
> 6. src/backend/replication/logical/applybgworker.c - apply_bgworker_find
> 
> +/*
> + * Try to look up worker assigned before (see function apply_bgworker_get_free)
> + * inside ApplyBgworkersHash for requested xid.
> + */
> +ApplyBgworkerState *
> +apply_bgworker_find(TransactionId xid)
> 
> SUGGESTION (for function comment)
> Find the worker previously assigned/cached for this xid. (see function
> apply_bgworker_start)
> 

Changed.

> ~~~
> 
> 7.
> 
> + Assert(status == APPLY_BGWORKER_BUSY);
> +
> + return entry->wstate;
> + }
> + else
> + return NULL;
> 
> IMO here it is better to just remove that 'else' and unconditionally
> return NULL at the end of this function.
> 

Changed.
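
The resulting shape is roughly the following. This is only a sketch: the Demo* types and the linear demo_lookup() are hypothetical stand-ins for the hash table entry and lookup in the patch.

```c
#include <stddef.h>

/* Hypothetical stand-ins; the real types live in the patch. */
typedef struct DemoWorkerState
{
    int worker_id;
} DemoWorkerState;

typedef struct DemoEntry
{
    unsigned int     xid;
    DemoWorkerState *wstate;
} DemoEntry;

/* Linear search standing in for the hash table lookup. */
static DemoEntry *
demo_lookup(DemoEntry *entries, size_t nentries, unsigned int xid)
{
    for (size_t i = 0; i < nentries; i++)
    {
        if (entries[i].xid == xid)
            return &entries[i];
    }
    return NULL;
}

static DemoWorkerState *
demo_bgworker_find(DemoEntry *entries, size_t nentries, unsigned int xid)
{
    DemoEntry *entry = demo_lookup(entries, nentries, xid);

    if (entry)
        return entry->wstate;

    /* No 'else' branch: reaching this point means nothing was cached. */
    return NULL;
}
```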

> ~~~
> 
> 8. src/backend/replication/logical/applybgworker.c - apply_bgworker_subxact_info_add
> 
> + * Inside apply background worker we can figure out that new subtransaction was
> + * started if new change arrived with different xid. In that case we can define
> + * named savepoint, so that we were able to commit/rollback it separately
> + * later.
> + * Special case is if the first change comes from subtransaction, then
> + * we check that current_xid differs from stream_xid.
> + */
> +void
> +apply_bgworker_subxact_info_add(TransactionId current_xid)
> 
> It is not quite English. Can you improve it a bit?
> 
> SUGGESTION (maybe like this?)
> The apply background worker can figure out if a new subtransaction was
> started by checking if the new change arrived with different xid. In
> that case define a named savepoint, so that we are able to
> commit/rollback it separately later. A special case is when the first
> change comes from subtransaction – this is determined by checking if
> the current_xid differs from stream_xid.
> 

Changed.

> ======
> 
> 9. src/backend/replication/logical/launcher.c - WaitForReplicationWorkerAttach
> 
> + *
> + * Return false if the attach fails. Otherwise return true.
>   */
> -static void
> +static bool
>  WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
> 
> Why not just say "Return whether the attach was successful."
> 

Changed.

> ~~~
> 
> 10. src/backend/replication/logical/launcher.c - logicalrep_worker_stop
> 
> + /* Found the main worker, then try to stop it. */
> + if (worker)
> + logicalrep_worker_stop_internal(worker);
> 
> IMO the comment is kind of pointless because it only says what the
> code is clearly doing. If you really wanted to reinforce this worker
> is a main apply worker then you can do that with code like:
> 
> if (worker)
> {
> Assert(!worker->subworker);
> logicalrep_worker_stop_internal(worker);
> }
> 

Changed.

> ~~~
> 
> 11. src/backend/replication/logical/launcher.c - logicalrep_worker_detach
> 
> @@ -599,6 +632,29 @@ logicalrep_worker_attach(int slot)
>  static void
>  logicalrep_worker_detach(void)
>  {
> + /*
> + * This is the main apply worker, stop all the apply background workers we
> + * started before.
> + */
> + if (!MyLogicalRepWorker->subworker)
> 
> SUGGESTION (for comment)
> This is the main apply worker. Stop all apply background workers
> previously started from here.
> 

Changed.

> ~~~
> 
> 12. src/backend/replication/logical/launcher.c - logicalrep_apply_bgworker_count
> 
> +/*
> + * Count the number of registered (not necessarily running) apply background
> + * workers for a subscription.
> + */
> +int
> +logicalrep_apply_bgworker_count(Oid subid)
> 
> SUGGESTION
> Count the number of registered (but not necessarily running) apply
> background workers for a subscription.
> 

Changed.

> ~~~
> 
> 13.
> 
> + /* Search for attached worker for a given subscription id. */
> + for (i = 0; i < max_logical_replication_workers; i++)
> 
> SUGGESTION
> Scan all attached apply background workers, only counting those which
> have the given subscription id.
> 

Changed.

> ======
> 
> 14. src/backend/replication/logical/worker.c - apply_error_callback
> 
> + {
> + if (errarg->remote_attnum < 0)
> + {
> + if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
> +    errarg->origin_name,
> +    logicalrep_message_type(errarg->command),
> +    errarg->rel->remoterel.nspname,
> +    errarg->rel->remoterel.relname,
> +    errarg->remote_xid);
> + else
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
> +    errarg->origin_name,
> +    logicalrep_message_type(errarg->command),
> +    errarg->rel->remoterel.nspname,
> +    errarg->rel->remoterel.relname,
> +    errarg->remote_xid,
> +    LSN_FORMAT_ARGS(errarg->finish_lsn));
> + }
> + else
> + {
> + if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
> +    errarg->origin_name,
> +    logicalrep_message_type(errarg->command),
> +    errarg->rel->remoterel.nspname,
> +    errarg->rel->remoterel.relname,
> +    errarg->rel->remoterel.attnames[errarg->remote_attnum],
> +    errarg->remote_xid);
> + else
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
> +    errarg->origin_name,
> +    logicalrep_message_type(errarg->command),
> +    errarg->rel->remoterel.nspname,
> +    errarg->rel->remoterel.relname,
> +    errarg->rel->remoterel.attnames[errarg->remote_attnum],
> +    errarg->remote_xid,
> +    LSN_FORMAT_ARGS(errarg->finish_lsn));
> + }
> + }
> 
> There is quite a lot of common code here:
> 
> "processing remote data for replication origin \"%s\" during \"%s\"
> for replication target relation \"%s.%s\"
> 
>    errarg->origin_name,
>    logicalrep_message_type(errarg->command),
>    errarg->rel->remoterel.nspname,
>    errarg->rel->remoterel.relname,
> 
> Is it worth trying to extract that common part to keep this code
> shorter? E.g. It could be easily done just with some #defines
> 

I am not sure whether we have a clean way to change this. Any suggestions?
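
For illustration, one way the #define idea could look. This is only a sketch outside the PostgreSQL tree: it uses snprintf() instead of errcontext(), passes the LSN as two plain integers, and every DEMO_* macro as well as demo_format_context() is a name made up here.

```c
#include <stdio.h>

/* Leading part shared by all four messages. */
#define DEMO_CTX_PREFIX \
    "processing remote data for replication origin \"%s\" during \"%s\" " \
    "for replication target relation \"%s.%s\""

/* Optional pieces, appended by string literal concatenation. */
#define DEMO_CTX_COLUMN " column \"%s\""
#define DEMO_CTX_XID    " in transaction %u"
#define DEMO_CTX_LSN    " finished at %X/%X"

/*
 * Format the context line into buf. attname is NULL when no column is
 * involved; have_lsn is 0 when the finish LSN is not known yet.
 */
static int
demo_format_context(char *buf, size_t size,
                    const char *origin, const char *command,
                    const char *nspname, const char *relname,
                    const char *attname, unsigned int xid,
                    int have_lsn, unsigned int lsn_hi, unsigned int lsn_lo)
{
    if (attname == NULL)
    {
        if (!have_lsn)
            return snprintf(buf, size, DEMO_CTX_PREFIX DEMO_CTX_XID,
                            origin, command, nspname, relname, xid);
        return snprintf(buf, size,
                        DEMO_CTX_PREFIX DEMO_CTX_XID DEMO_CTX_LSN,
                        origin, command, nspname, relname, xid,
                        lsn_hi, lsn_lo);
    }

    if (!have_lsn)
        return snprintf(buf, size,
                        DEMO_CTX_PREFIX DEMO_CTX_COLUMN DEMO_CTX_XID,
                        origin, command, nspname, relname, attname, xid);
    return snprintf(buf, size,
                    DEMO_CTX_PREFIX DEMO_CTX_COLUMN DEMO_CTX_XID DEMO_CTX_LSN,
                    origin, command, nspname, relname, attname, xid,
                    lsn_hi, lsn_lo);
}
```

The format strings get shorter, but the argument lists still repeat. A possible drawback, as far as I know: xgettext does not expand preprocessor macros, so format strings assembled from #defines may not be extracted for translation the way complete literals are.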

> ======
> 
> 15. src/include/replication/worker_internal.h
> 
> + /* proto version of publisher. */
> + uint32 proto_version;
> 
> SUGGESTION
> Protocol version of publisher
> 
> ~~~
> 

Changed.

> 16.
> 
> + /* id of apply background worker */
> + uint32 worker_id;
> 
> Uppercase comment
> 

Changed.

> 
> 17.
> 
> +/*
> + * Struct for maintaining an apply background worker.
> + */
> +typedef struct ApplyBgworkerState
> 
> I'm not sure what this comment means. Perhaps there are some words missing?
> 

I renamed the struct to ApplyBgworkerInfo, which sounds better to me, and changed the comments.

Best regards,
Hou zj
