RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS0PR01MB57162BC6B0FB848F216410DE94559@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("kuroda.hayato@fujitsu.com" <kuroda.hayato@fujitsu.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
On Tuesday, September 27, 2022 2:32 PM Kuroda, Hayato/黒田 隼人 <kuroda.hayato@fujitsu.com> wrote:
> 
> Dear Wang,
> 
> Followings are comments for your patchset.

Thanks for the comments.

> ====
> 0001
> 
> 
> 01. launcher.c - logicalrep_worker_stop_internal()
> 
> ```
> +
> +       Assert(LWLockHeldByMe(LogicalRepWorkerLock));
> +
> ```

> I think it should be Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock,
> LW_SHARED)) because the lock is released and then reacquired in
> LW_SHARED mode.
> If a caller that holds the lock as LW_EXCLUSIVE calls
> logicalrep_worker_stop_internal(),
> its lock silently becomes weaker after the call.

Changed.

> 
> 02. launcher.c - apply_handle_stream_start()
> 
> ```
> ```
> +                       /*
> +                        * Notify handle methods we're processing a remote
> +                        * in-progress transaction.
> +                        */
> +                       in_streamed_transaction = true;
> 
> -               MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
> -               FileSetInit(MyLogicalRepWorker->stream_fileset);
> +                       /*
> +                        * Start a transaction on stream start, this transaction
> +                        * will be committed on the stream stop unless it is a
> +                        * tablesync worker in which case it will be committed
> +                        * after processing all the messages. We need the
> +                        * transaction for handling the buffile, used for
> +                        * serializing the streaming data and subxact info.
> +                        */
> +                       begin_replication_step();
> ```
> 
> Previously, in_streamed_transaction was set after begin_replication_step(),
> but the ordering has been changed here. Maybe we don't have to modify it
> if there is no particular reason.
> 
> 03. launcher.c - apply_handle_stream_stop()
> 
> ```
> +                       /* Commit the per-stream transaction */
> +                       CommitTransactionCommand();
> +
> +                       /* Reset per-stream context */
> +                       MemoryContextReset(LogicalStreamingContext);
> +
> +                       pgstat_report_activity(STATE_IDLE, NULL);
> +
> +                       in_streamed_transaction = false;
> ```
> 
> Previously, in_streamed_transaction was set after MemoryContextReset(),
> but the ordering has been changed here.
> Maybe we don't have to modify it if there is no particular reason.

I adjusted the position of these flag updates as part of some other improvements in this version.

> 
> 04. applyparallelworker.c - LogicalParallelApplyLoop()
> 
> ```
> +               shmq_res = shm_mq_receive(mqh, &len, &data, false);
> ...
> +               if (ConfigReloadPending)
> +               {
> +                       ConfigReloadPending = false;
> +                       ProcessConfigFile(PGC_SIGHUP);
> +               }
> ```
> 
> 
> Here the parallel apply worker blocks waiting to receive a message, and
> ProcessConfigFile() is called only after dispatching it.
> It means that the .conf file will not be read until the parallel apply worker
> receives new messages and applies them.
> 
> It may be problematic when users change log_min_messages to debugXXX for
> debugging but streamed transactions rarely arrive.
> They would expect the detailed output to appear in the log from the next
> streaming chunk, but it does not.
> 
> This does not occur in the leader worker while it waits for messages from the
> publisher, because it uses libpqrcv_receive(), which works asynchronously.
> 
> I'm not sure whether it should be documented that the evaluation of GUCs may
> be delayed. What do you think?

I changed the shm_mq_receive() call to non-blocking (asynchronous) mode, which is
also consistent with what we do for the Gather node when reading data from
parallel query workers.

> 
> ===
> 0004
> 
> 05. logical-replication.sgml
> 
> ```
> ...
> In that case, it may be necessary to change the streaming mode to on or off and
> cause the same conflicts again so the finish LSN of the failed transaction will be
> written to the server log.
>  ...
> ```
> 
> The above sentence is added by 0001 but is not modified by 0004.
> Such transactions will be retried in streaming=on mode, so some description
> of that behavior should be added.

Added.

Best regards,
Hou zj
