RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS0PR01MB57161603FC9CB886F2CEE53194159@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
On Tuesday, November 29, 2022 8:34 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> 
> On Tue, Nov 29, 2022 at 10:18 AM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > Attach the new version patch which addressed all comments.
> >
> 
> Review comments on v53-0001*

Thanks for the comments!
> ==========================
> 1.
>  Subscription *MySubscription = NULL;
> -static bool MySubscriptionValid = false;
> +bool MySubscriptionValid = false;
> 
> It seems still this variable is used in worker.c, so why it's scope changed?

I think it's not needed. Removed.

> 2.
> /* fields valid only when processing streamed transaction */
> -static bool in_streamed_transaction = false;
> +bool in_streamed_transaction = false;
> 
> Is it really required to change the scope of this variable? Can we think of
> exposing a macro or inline function to check it in applyparallelworker.c?

Introduced a new function.
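
As a rough illustration of the idea (not the patch's actual code), the flag can stay file-local while other files query it through a small accessor. The names is_in_streamed_transaction() and set_in_streamed_transaction() are hypothetical and chosen only for this sketch.

```c
#include <stdbool.h>

/* Stays private to the defining file (worker.c in the patch). */
static bool in_streamed_transaction = false;

/*
 * Hypothetical read-only accessor: other files (for example
 * applyparallelworker.c) call this instead of touching the variable.
 */
bool
is_in_streamed_transaction(void)
{
	return in_streamed_transaction;
}

/* Hypothetical setter, present only so the sketch can change the flag. */
void
set_in_streamed_transaction(bool value)
{
	in_streamed_transaction = value;
}
```

With this shape the variable keeps its static scope, and only the accessor's prototype would need to appear in a shared header.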

> 3.
> should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
> {
>   if (am_tablesync_worker())
>   return MyLogicalRepWorker->relid == rel->localreloid;
> + else if (am_parallel_apply_worker())
> + {
> + if (rel->state != SUBREL_STATE_READY)
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
> 
> Is this check sufficient? What if the rel->state is SUBREL_STATE_UNKNOWN? I
> think that will be possible when the refresh publication has not been yet
> performed after adding a new relation to the publication. If that is true then
> won't we need to simply ignore that change and continue instead of erroring
> out? Can you please once test and check this case?

You are right. Changed to not report an ERROR for SUBREL_STATE_UNKNOWN.
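
The resulting three-way decision in the parallel apply worker can be sketched as below. This is a simplified, self-contained model under my own assumptions: the RelState and ApplyDecision enums and the function name are stand-ins invented for the example, not the SUBREL_STATE_* definitions or the real should_apply_changes_for_rel().

```c
/* Stand-ins for the SUBREL_STATE_* values; only three are modeled. */
typedef enum
{
	REL_STATE_UNKNOWN,			/* relation not yet known to the subscription */
	REL_STATE_SYNCING,			/* any state between unknown and ready */
	REL_STATE_READY				/* initial sync finished */
} RelState;

/* Hypothetical result type for this sketch. */
typedef enum
{
	APPLY_CHANGE,
	SKIP_CHANGE,
	RAISE_ERROR
} ApplyDecision;

ApplyDecision
parallel_apply_decision(RelState state)
{
	if (state == REL_STATE_READY)
		return APPLY_CHANGE;

	/* No REFRESH PUBLICATION yet: ignore the change and continue. */
	if (state == REL_STATE_UNKNOWN)
		return SKIP_CHANGE;

	/* Sync still in progress: the worker cannot apply this safely. */
	return RAISE_ERROR;
}
```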

> 4.
> +
> + case TRANS_PARALLEL_APPLY:
> + list_free(subxactlist);
> + subxactlist = NIL;
> +
> + apply_handle_commit_internal(&commit_data);
> 
> I don't think we need to retail pfree subxactlist as this is allocated in
> TopTransactionContext and will be freed at commit/prepare. This way freeing
> looks a bit adhoc to me and you need to expose this list outside
> applyparallelworker.c which doesn't seem like a good idea to me either.

Removed the list_free.

> 5.
> + apply_handle_commit_internal(&commit_data);
> +
> + pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
> + pa_unlock_transaction(xid, AccessShareLock);
> +
> + elog(DEBUG1, "finished processing the transaction finish command");
> 
> I think in this and similar DEBUG logs, we can tell the exact command instead of
> writing 'finish'.

Changed.

> 6.
> apply_handle_stream_commit()
> {
> ...
> + /*
> + * After sending the data to the parallel apply worker, wait for
> + * that worker to finish. This is necessary to maintain commit
> + * order which avoids failures due to transaction dependencies and
> + * deadlocks.
> + */
> + pa_wait_for_xact_finish(winfo);
> +
> + pgstat_report_stat(false);
> + store_flush_position(commit_data.end_lsn);
> + stop_skipping_changes();
> +
> + (void) pa_free_worker(winfo, xid);
> ...
> }
> 
> apply_handle_stream_prepare(StringInfo s)
> {
> +
> + /*
> + * After sending the data to the parallel apply worker, wait for
> + * that worker to finish. This is necessary to maintain commit
> + * order which avoids failures due to transaction dependencies and
> + * deadlocks.
> + */
> + pa_wait_for_xact_finish(winfo);
> + (void) pa_free_worker(winfo, prepare_data.xid);
> 
> - /* unlink the files with serialized changes and subxact info. */
> - stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
> + in_remote_transaction = false;
> +
> + store_flush_position(prepare_data.end_lsn);
> 
> 
> In both of the above functions, we should be consistent in calling
> pa_free_worker() function which I think should be immediately after
> pa_wait_for_xact_finish(). Is there a reason for not being consistent here?

Changed the order to make them consistent.

> 7.
> + res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
> +
> + /*
> + * The leader will detach from the error queue and set it to NULL
> + * before preparing to stop all parallel apply workers, so we don't
> + * need to handle error messages anymore.
> + */
> + if (!winfo->error_mq_handle)
> + continue;
> 
> This check must be done before calling shm_mq_receive. So, changed it in the
> attached patch.

Thanks, merged.
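
The ordering point in this comment can be sketched as follows: test the handle first, and only then call the receive routine. The WorkerInfo struct here is a reduced stand-in, and fake_receive() is a hypothetical placeholder for shm_mq_receive() so that the example is self-contained.

```c
#include <stddef.h>

/* Reduced stand-in for the per-worker info struct. */
typedef struct WorkerInfo
{
	void	   *error_mq_handle;	/* NULL once the leader has detached */
} WorkerInfo;

/* Hypothetical placeholder for shm_mq_receive(); it only counts calls. */
static int	receive_calls = 0;

static void
fake_receive(void *handle)
{
	(void) handle;
	receive_calls++;
}

/* Poll each worker's error queue; returns how many queues were polled. */
int
poll_error_queues(WorkerInfo *workers, size_t nworkers)
{
	int			polled = 0;

	for (size_t i = 0; i < nworkers; i++)
	{
		/* Check the handle BEFORE receiving, never after. */
		if (workers[i].error_mq_handle == NULL)
			continue;

		fake_receive(workers[i].error_mq_handle);
		polled++;
	}
	return polled;
}
```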

> 8.
> @@ -2675,6 +3156,10 @@ store_flush_position(XLogRecPtr remote_lsn)
>  {
>   FlushPosition *flushpos;
> 
> + /* Skip for parallel apply workers. */
> + if (am_parallel_apply_worker())
> + return;
> 
> It is okay to always update the flush position by leader apply worker but I think
> the leader won't have updated value for XactLastCommitEnd as the local
> transaction is committed by parallel apply worker.

I added a field in shared memory so that the parallel apply worker can pass
its XactLastCommitEnd to the leader, which then stores it.
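
A minimal sketch of that handoff, under stated assumptions: the struct, the field name last_commit_end, and both function names are hypothetical, Lsn stands in for XLogRecPtr, and C11 atomics are used here only to keep the example self-contained (the patch may protect the field differently, for example with a lock).

```c
#include <stdatomic.h>
#include <stdint.h>

typedef uint64_t Lsn;			/* stand-in for XLogRecPtr */

/* Reduced stand-in for the struct shared by leader and worker. */
typedef struct SharedState
{
	_Atomic Lsn last_commit_end;	/* hypothetical field name */
} SharedState;

/* Parallel apply worker: publish the end of its local commit record. */
void
worker_publish_commit_end(SharedState *shared, Lsn local_commit_end)
{
	atomic_store(&shared->last_commit_end, local_commit_end);
}

/* Leader: read the value once the worker has finished the transaction. */
Lsn
leader_read_commit_end(SharedState *shared)
{
	return atomic_load(&shared->last_commit_end);
}
```

The leader would call the read side only after waiting for the worker to finish, and would then record the value as the local flush position for that remote transaction.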

> 9.
> @@ -3831,11 +4366,11 @@ ApplyWorkerMain(Datum main_arg)
> 
>   ereport(DEBUG1,
>   (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
> - MySubscription->name,
> - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
> - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
> - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
> - "?")));
> + MySubscription->name,
> + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
> + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
> + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
> + "?")));
> 
> Is this change related to this patch?

I think this was changed accidentally by pgindent. Reverted.

> 10. What is the reason to expose ApplyErrorCallbackArg via worker_internal.h?

The parallel apply worker needs to set the origin name in it. I introduced
another function to set this, so the struct no longer has to be exposed.
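
For illustration only, such a setter could look like the sketch below. The struct is a reduced stand-in with a fixed-size buffer, and the names set_error_context_origin() and get_error_context_origin() are hypothetical; the real code would allocate the string in a suitable memory context instead.

```c
#include <stdio.h>

/* Reduced stand-in for ApplyErrorCallbackArg; stays private to this file. */
typedef struct ErrorCallbackArg
{
	char		origin_name[64];
} ErrorCallbackArg;

static ErrorCallbackArg apply_error_callback_arg;

/* Hypothetical setter: copies the name, truncating if it is too long. */
void
set_error_context_origin(const char *origin_name)
{
	snprintf(apply_error_callback_arg.origin_name,
			 sizeof(apply_error_callback_arg.origin_name),
			 "%s", origin_name);
}

/* Hypothetical getter, present only so the sketch can be checked. */
const char *
get_error_context_origin(void)
{
	return apply_error_callback_arg.origin_name;
}
```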

> 11. The order to declare pa_set_stream_apply_worker() in worker_internal.h and
> define in applyparallelworker.c is not the same.
> Similarly, please check all other functions.

Changed.

> 12. Apart from the above, I have made a few changes in the comments and
> some other cosmetic changes in the attached patch.

Thanks, I have checked and merged them.

Attached is the new version of the patch set.

I haven't addressed comments #1 and #2 from [1] yet; I need to think about
them and will handle them soon. I also haven't renamed
serialize_stream_start/stop or finished the wording consistency check for
comments; I will handle those soon as well.

[1] https://www.postgresql.org/message-id/CAA4eK1LGKYUDFZ_jFPrU497wQf2HNvt5a%2BtCTpqSeWSG6kfpSA%40mail.gmail.com

Best regards,
Hou zj

