RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From wangw.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS3PR01MB6275EFC4B707650DAB9392859E4D9@OS3PR01MB6275.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply
Re: Perform streaming logical transactions by background workers and parallel apply
Re: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
On Thu, Sep 15, 2022 at 19:40 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Thu, Sep 15, 2022 at 10:45 AM wangw.fnst@fujitsu.com
> <wangw.fnst@fujitsu.com> wrote:
> >
> > Attach the new patch set.
> >
> 
> Review of v29-0001*

Thanks for your comments and patch!

> ==================
> 1.
> +parallel_apply_find_worker(TransactionId xid)
> {
> ...
> + entry = hash_search(ParallelApplyWorkersHash, &xid, HASH_FIND, &found);
> + if (found)
> + {
> + /* If any workers (or the postmaster) have died, we have failed. */
> + if (entry->winfo->error_mq_handle == NULL)
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("lost connection to parallel apply worker")));
> ...
> }
> 
> I think the above comment is incorrect because if the postmaster would
> have died then you wouldn't have found the entry in the hash table.
> How about something like: "We can't proceed if the parallel streaming
> worker has already exited."

Fixed.
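For illustration only, here is a standalone sketch of the lookup behaviour discussed in point 1. It assumes a small array in place of ParallelApplyWorkersHash and an error return in place of ereport(ERROR). `Entry`, `find_worker()` and the `FIND_*` codes are hypothetical names, not part of the patch:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;            /* hypothetical stand-in for TransactionId */

typedef struct
{
    Xid   xid;
    void *error_mq_handle;       /* NULL once the worker has exited */
} Entry;

typedef enum { FIND_NONE, FIND_OK, FIND_WORKER_EXITED } FindResult;

/* Find the worker assigned to the given transaction, if any. */
static FindResult
find_worker(const Entry *entries, size_t n, Xid xid, const Entry **out)
{
    *out = NULL;
    for (size_t i = 0; i < n; i++)
    {
        if (entries[i].xid != xid)
            continue;

        /* We can't proceed if the parallel apply worker has already exited. */
        if (entries[i].error_mq_handle == NULL)
            return FIND_WORKER_EXITED;   /* real code: ereport(ERROR, ...) */

        *out = &entries[i];
        return FIND_OK;
    }
    return FIND_NONE;                    /* no worker assigned to this xid */
}
```

A missing entry simply means no worker was assigned; only an entry whose worker has gone away is treated as an error.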

> 2.
> +/*
> + * Find the previously assigned worker for the given transaction, if any.
> + */
> +ParallelApplyWorkerInfo *
> +parallel_apply_find_worker(TransactionId xid)
> 
> No need to use word 'previously' in the above sentence.

Improved.

> 3.
> + * We need one key to register the location of the header, and we need
> + * another key to track the location of the message queue.
> + */
> + shm_toc_initialize_estimator(&e);
> + shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
> + shm_toc_estimate_chunk(&e, queue_size);
> + shm_toc_estimate_chunk(&e, error_queue_size);
> +
> + shm_toc_estimate_keys(&e, 3);
> 
> Overall, three keys are used but the comment indicates two. You forgot
> to mention about error_queue.

Fixed.

> 4.
> + if (launched)
> + ParallelApplyWorkersList = lappend(ParallelApplyWorkersList, winfo);
> + else
> + {
> + shm_mq_detach(winfo->mq_handle);
> + shm_mq_detach(winfo->error_mq_handle);
> + dsm_detach(winfo->dsm_seg);
> + pfree(winfo);
> +
> + winfo = NULL;
> + }
> 
> A. The code used in the else part to free worker info is the same as
> what is used in parallel_apply_free_worker. Can we move this to a
> separate function say parallel_apply_free_worker_info()?
> B. I think it will be better if you use {} for if branch to make it
> look consistent with else branch.

Improved.
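A minimal standalone sketch of the refactoring suggested in point 4. It assumes hypothetical stand-ins: `FakeHandle`, `fake_detach()`, `WorkerInfo`, `worker_free_info()` and `worker_start()` replace shm_mq_handle/dsm_segment, shm_mq_detach()/dsm_detach(), ParallelApplyWorkerInfo, parallel_apply_free_worker_info() and the launch code, and free() replaces pfree(). That way the example compiles outside the PostgreSQL tree. The launch-failure path and the normal free path both call one helper, and the if branch uses braces like the else branch:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for shm_mq_handle / dsm_segment. */
typedef struct { bool attached; } FakeHandle;

typedef struct WorkerInfo
{
    FakeHandle *mq_handle;
    FakeHandle *error_mq_handle;
    FakeHandle *dsm_seg;
} WorkerInfo;

static int detach_count = 0;    /* makes the cleanup observable */

static void
fake_detach(FakeHandle *h)
{
    if (h != NULL && h->attached)
    {
        h->attached = false;
        detach_count++;
    }
}

/*
 * Release everything owned by a worker info entry.  Shared by the
 * launch-failure path and the normal "free worker" path, so the cleanup
 * sequence lives in exactly one place.
 */
static void
worker_free_info(WorkerInfo *winfo)
{
    assert(winfo != NULL);
    fake_detach(winfo->mq_handle);
    fake_detach(winfo->error_mq_handle);
    fake_detach(winfo->dsm_seg);
    free(winfo);                /* pfree() in the real code */
}

/* Launch path: keep the entry on success, clean it up on failure. */
static WorkerInfo *
worker_start(WorkerInfo *winfo, bool launched)
{
    if (launched)
    {
        return winfo;           /* real code would lappend() it to the list */
    }
    else
    {
        worker_free_info(winfo);
        return NULL;
    }
}
```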

> 5.
> + * case define a named savepoint, so that we are able to commit/rollback it
> + * separately later.
> + */
> +void
> +parallel_apply_subxact_info_add(TransactionId current_xid)
> 
> I don't see the need of commit in the above message. So, we can
> slightly modify it to: "... so that we are able to rollback to it
> separately later."

Improved.

> 6.
> + for (i = list_length(subxactlist) - 1; i >= 0; i--)
> + {
> + xid = list_nth_xid(subxactlist, i);
> ...
> ...
> 
> +/*
> + * Return the TransactionId value contained in the n'th element of the
> + * specified list.
> + */
> +static inline TransactionId
> +list_nth_xid(const List *list, int n)
> +{
> + Assert(IsA(list, XidList));
> + return lfirst_xid(list_nth_cell(list, n));
> +}
> 
> I am not really sure that we need a new list function to use for this
> place. Can't we directly use lfirst_xid(list_nth_cell) instead?

Improved.
 
> 7.
> +void
> +parallel_apply_replorigin_setup(void)
> +{
> + RepOriginId originid;
> + char originname[NAMEDATALEN];
> + bool started_tx = false;
> +
> + /* This function might be called inside or outside of transaction. */
> + if (!IsTransactionState())
> + {
> + StartTransactionCommand();
> + started_tx = true;
> + }
> 
> Is there a place in the patch where this function will be called
> without having an active transaction state? If so, then this coding is
> fine but if not, then I suggest keeping an assert for transaction
> state here. The same thing applies to
> parallel_apply_replorigin_reset() as well.

When parallel apply is used, only the parallel apply worker is inside a
transaction; the leader apply worker is not. So when
parallel_apply_replorigin_setup() is invoked in the leader apply worker, it
needs to start a transaction itself.
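The pattern described above can be sketched standalone: start a transaction only when the caller is not already in one, and commit only a transaction this function started. This assumes the real function commits at the end only when started_tx is true. `in_transaction`, `start_tx()`, `commit_tx()` and `replorigin_setup()` are hypothetical stand-ins for IsTransactionState(), StartTransactionCommand(), CommitTransactionCommand() and parallel_apply_replorigin_setup(), and the origin lookup itself is omitted:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical transaction state standing in for IsTransactionState() etc. */
static bool in_transaction = false;
static int  tx_started = 0;
static int  tx_committed = 0;

static void start_tx(void)  { assert(!in_transaction); in_transaction = true;  tx_started++; }
static void commit_tx(void) { assert(in_transaction);  in_transaction = false; tx_committed++; }

/*
 * Callable from the leader (not in a transaction) and from a parallel
 * apply worker (already in a transaction).
 */
static void
replorigin_setup(void)
{
    bool started_tx = false;

    if (!in_transaction)
    {
        start_tx();
        started_tx = true;
    }

    /* ... catalog lookups that need a transaction would go here ... */
    assert(in_transaction);

    /* Only end the transaction if this function started it. */
    if (started_tx)
        commit_tx();
}
```

Called from the leader, the sketch opens and commits its own transaction; called from inside an existing transaction, it leaves that transaction alone for the caller to finish.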

> 8.
> + *
> + * If write_abort_lsn is true, send the abort_lsn and abort_time fields,
> + * otherwise don't.
>   */
>  void
>  logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
> -   TransactionId subxid)
> +   TransactionId subxid, XLogRecPtr abort_lsn,
> +   TimestampTz abort_time, bool abort_info)
> 
> In the comment, the name of the variable needs to be updated.

Fixed.
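To illustrate the corrected comment in point 8, here is a standalone sketch of a writer that appends abort_lsn and abort_time only when abort_info is true. `Buf`, `put_u32()`, `put_u64()` and `write_stream_abort()` are hypothetical; `Buf` stands in for StringInfo, and the byte layout is illustrative rather than the exact protocol encoding:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical fixed-size output buffer standing in for StringInfo. */
typedef struct
{
    uint8_t data[64];
    size_t  len;
} Buf;

static void
put_u32(Buf *b, uint32_t v)
{
    for (int i = 3; i >= 0; i--)            /* big-endian byte order */
        b->data[b->len++] = (uint8_t) (v >> (i * 8));
}

static void
put_u64(Buf *b, uint64_t v)
{
    for (int i = 7; i >= 0; i--)
        b->data[b->len++] = (uint8_t) (v >> (i * 8));
}

/*
 * Write a stream-abort message.  If abort_info is true, send the
 * abort_lsn and abort_time fields, otherwise don't.
 */
static void
write_stream_abort(Buf *out, uint32_t xid, uint32_t subxid,
                   uint64_t abort_lsn, int64_t abort_time, bool abort_info)
{
    out->data[out->len++] = 'A';            /* message type byte (illustrative) */
    put_u32(out, xid);
    put_u32(out, subxid);

    if (abort_info)
    {
        put_u64(out, abort_lsn);
        put_u64(out, (uint64_t) abort_time);
    }
}
```

Without the extra fields the message is 9 bytes; with them it grows by 16 bytes.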

> 9.
> +TransactionId stream_xid = InvalidTransactionId;
> 
> -static TransactionId stream_xid = InvalidTransactionId;
> ...
> ...
> +void
> +parallel_apply_subxact_info_add(TransactionId current_xid)
> +{
> + if (current_xid != stream_xid &&
> + !list_member_xid(subxactlist, current_xid))
> 
> It seems you have changed the scope of stream_xid to use it in
> parallel_apply_subxact_info_add(). Won't it be better to pass it as a
> parameter (say top_xid)?

Improved.
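A standalone sketch of point 9: the top-level xid is passed in as a parameter (top_xid) instead of being read from a file-scope stream_xid. `Xid`, `subxacts`, `subxact_member()` and `subxact_info_add()` are hypothetical; a fixed array replaces the XidList, and the named savepoint from point 5 is only indicated by a comment:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Xid;            /* stand-in for TransactionId */

#define MAX_SUBXACTS 16

/* Hypothetical stand-in for the worker's subxactlist. */
static Xid subxacts[MAX_SUBXACTS];
static int nsubxacts = 0;

static bool
subxact_member(Xid xid)
{
    for (int i = 0; i < nsubxacts; i++)
        if (subxacts[i] == xid)
            return true;
    return false;
}

/*
 * Remember a new subtransaction of the streamed transaction.  The caller
 * supplies top_xid, so no global is needed.  The real code would also
 * define a named savepoint here, so that it can later roll back to it.
 */
static void
subxact_info_add(Xid top_xid, Xid current_xid)
{
    if (current_xid != top_xid && !subxact_member(current_xid))
    {
        assert(nsubxacts < MAX_SUBXACTS);
        subxacts[nsubxacts++] = current_xid;
    }
}
```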

> 10.
> --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
> +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
> @@ -20,6 +20,7 @@
>  #include <sys/time.h>
> 
>  #include "access/xlog.h"
> +#include "catalog/pg_subscription.h"
>  #include "catalog/pg_type.h"
>  #include "common/connect.h"
>  #include "funcapi.h"
> @@ -443,9 +444,14 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
>   appendStringInfo(&cmd, "proto_version '%u'",
>   options->proto.logical.proto_version);
> 
> - if (options->proto.logical.streaming &&
> - PQserverVersion(conn->streamConn) >= 140000)
> - appendStringInfoString(&cmd, ", streaming 'on'");
> + if (options->proto.logical.streaming != SUBSTREAM_OFF)
> + {
> + if (PQserverVersion(conn->streamConn) >= 160000 &&
> + options->proto.logical.streaming == SUBSTREAM_PARALLEL)
> + appendStringInfoString(&cmd, ", streaming 'parallel'");
> + else if (PQserverVersion(conn->streamConn) >= 140000)
> + appendStringInfoString(&cmd, ", streaming 'on'");
> + }
> 
> It doesn't seem like a good idea to expose subscription options here.
> Can we think of having char *streaming_option instead of the current
> streaming parameter which is filled by the caller and used here
> directly?

Improved.
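A standalone sketch of the approach suggested in point 10. The caller translates the subscription setting into a streaming option string, and the walreceiver code only appends whatever string it is given. `StreamMode`, `choose_streaming_option()` and `build_start_cmd()` are hypothetical names. The version checks mirror the quoted hunk: 'parallel' requires a 16+ publisher, otherwise 'on' requires 14+:

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for the subscription's streaming setting. */
typedef enum { STREAM_OFF, STREAM_ON, STREAM_PARALLEL } StreamMode;

/*
 * Caller side: pick the option value to send for the given publisher
 * version.  Returns NULL when no streaming option should be sent.
 */
static const char *
choose_streaming_option(StreamMode mode, int server_version)
{
    if (mode == STREAM_OFF)
        return NULL;
    if (mode == STREAM_PARALLEL && server_version >= 160000)
        return "parallel";
    if (server_version >= 140000)
        return "on";
    return NULL;
}

/*
 * Walreceiver side: knows nothing about subscription options; it just
 * appends the string filled in by the caller.
 */
static void
build_start_cmd(char *buf, size_t size, unsigned proto_version,
                const char *streaming_option)
{
    int n = snprintf(buf, size, "proto_version '%u'", proto_version);

    if (streaming_option != NULL && n >= 0 && (size_t) n < size)
        snprintf(buf + n, size - n, ", streaming '%s'", streaming_option);
}
```

With this split, libpqwalreceiver.c would not need to include catalog/pg_subscription.h.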

> 11. The error message used in pgoutput_startup() seems to be better
> than the current messages used in that function but it is better to be
> consistent with other messages. There is a discussion in the email
> thread [1] on improving those messages, so kindly suggest there.

Okay, I will try to modify the two messages and share them in the thread you
mentioned.

> 12. In addition to the above, I have changed/added a few comments in
> the attached patch.

Improved as suggested.

Regards,
Wang wei

