RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From wangw.fnst@fujitsu.com
Msg-id OS3PR01MB6275F145878B4A44586C46CE9E499@OS3PR01MB6275.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
On Thu, Sep 8, 2022 at 2:52 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Mon, Sep 5, 2022 at 6:34 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > Attach the correct patch set this time.
> >
> 
> Few comments on v28-0001*:

Thanks for your comments.

> 1.
> + /* Whether the worker is processing a transaction. */
> + bool in_use;
> 
> I think this same comment applies to in_parallel_apply_xact flag as
> well. How about: "Indicates whether the worker is available to be used
> for parallel apply transaction?"?
> 
> 2.
> + /*
> + * Set this flag in the leader instead of the parallel apply worker to
> + * avoid the race condition where the leader has already started waiting
> + * for the parallel apply worker to finish processing the transaction(set
> + * the in_parallel_apply_xact to false) while the child process has not yet
> + * processed the first STREAM_START and has not set the
> + * in_parallel_apply_xact to true.
> 
> I think part of this comment "(set the in_parallel_apply_xact to
> false)" is not necessary. It will be clear without that.
> 
> 3.
> + /* Create entry for requested transaction. */
> + entry = hash_search(ParallelApplyWorkersHash, &xid, HASH_ENTER, &found);
> + if (found)
> + elog(ERROR, "hash table corrupted");
> ...
> ...
> + hash_search(ParallelApplyWorkersHash, &xid, HASH_REMOVE, NULL);
> 
> It is better to have a similar elog for HASH_REMOVE case as well. We
> normally seem to have such elog for HASH_REMOVE.
> 
> 4.
> * Parallel apply is not supported when subscribing to a publisher which
> +     * cannot provide the abort_time, abort_lsn and the column information
> +     * used to verify the parallel apply safety.
> 
> 
> In this comment, which column information are you referring to?
> 
> 5.
> + /*
> + * Set in_parallel_apply_xact to true again as we only aborted the
> + * subtransaction and the top transaction is still in progress. No
> + * need to lock here because currently only the apply leader are
> + * accessing this flag.
> + */
> + winfo->shared->in_parallel_apply_xact = true;
> 
> This theory sounds good to me but I think it is better to update/read
> this flag under spinlock as the patch is doing at a few other places.
> I think that will make the code easier to follow without worrying too
> much about such special cases. There are a few asserts as well which
> read this without lock, it would be better to change those as well.
> 
> 6.
> + * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol
> + * version with support for streaming large transactions using parallel
> + * apply workers. Introduced in PG16.
> 
> How about changing it to something like:
> "LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol
> version where we support applying large streaming transactions in
> parallel. Introduced in PG16."
> 
> 7.
> + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> + bool write_abort_lsn = (data->protocol_version >=
> + LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM);
> 
>   /*
>   * The abort should happen outside streaming block, even for streamed
> @@ -1856,7 +1859,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
>   Assert(rbtxn_is_streamed(toptxn));
> 
>   OutputPluginPrepareWrite(ctx, true);
> - logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
> + logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn, abort_lsn,
> +   write_abort_lsn);
> 
> I think we need to send additional information if the client has used
> the parallel streaming option. Also, let's keep sending subxid as we
> were doing previously and add additional parameters required. It may
> be better to name write_abort_lsn as abort_info.
> 
> 8.
> + /*
> + * Check whether the publisher sends abort_lsn and abort_time.
> + *
> + * Note that the paralle apply worker is only started when the publisher
> + * sends abort_lsn and abort_time.
> + */
> + if (am_parallel_apply_worker() ||
> + walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000)
> + read_abort_lsn = true;
> +
> + logicalrep_read_stream_abort(s, &abort_data, read_abort_lsn);
> 
> This check should match with the check for the write operation where
> we are checking the protocol version as well. There is a typo as well
> in the comments (/paralle/parallel).

Improved as suggested.

Attached the new patch set.

Regards,
Wang wei
