From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS0PR01MB5716339FF7CB759E751492CB940D9@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply
Re: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
On Tuesday, November 22, 2022 1:20 PM Peter Smith <smithpb2250@gmail.com> wrote:
> Thanks for addressing my review comments on v47-0001.
> 
> Here are my review comments for v49-0001.

Thanks for your comments.

> ======
> 
> src/backend/replication/logical/applyparallelworker.c
> 
> 1. GENERAL - NULL checks
> 
> There is inconsistent NULL checking in the patch.
> 
> Sometimes it is like (!winfo)
> Sometimes explicit NULL checks like  (winfo->mq_handle != NULL)
> 
> (That is just one example -- there are differences in many places)
> 
> It would be better to use a consistent style everywhere.

Changed.

> ~
> 
> 2. GENERAL - Error message worker name
> 
> 2a.
> In worker.c all the messages are now "logical replication apply 
> worker" or "logical replication parallel apply worker" etc, but in the 
> applyparallel.c sometimes the "logical replication" part is missing.
> IMO all the messages in this patch should be consistently worded.
> 
> I've reported some of them in the following comment below, but please 
> search the whole patch for any I might have missed.

Renamed LA and PA in the messages as follows:
```
LA -> logical replication apply worker
PA -> logical replication parallel apply worker
```

> 2b.
> Consider if maybe all of these ought to be calling get_worker_name() 
> which is currently static in worker.c. Doing this means any future 
> changes to get_worker_name won't cause more inconsistencies.

Most of the error messages in applyparallelworker.c can only refer to the
parallel apply worker, so I think it's fine not to call get_worker_name() there.
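
For reference, roughly what get_worker_name() in worker.c looks like (sketched
from memory, not quoted from the patch):
```
static const char *
get_worker_name(void)
{
	if (am_tablesync_worker())
		return _("logical replication table synchronization worker");
	else if (am_parallel_apply_worker())
		return _("logical replication parallel apply worker");
	else
		return _("logical replication apply worker");
}
```
In applyparallelworker.c the middle branch would always be taken, so hardcoding
the string there loses nothing.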

> ~~~
> 
> 3. File header comment
> 
> + * IDENTIFICATION src/backend/replication/logical/applyparallelworker.c
> 
> The word "IDENTIFICATION" should be on a separate line (for 
> consistency with every other PG source file)

Fixed.

> ~
> 
> 4.
> 
> + * In order for lmgr to detect this, we have LA acquire a session lock on the
> + * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
> + * trying to receive messages. In other words, LA acquires the lock before
> + * sending STREAM_STOP and releases it if already acquired before sending
> + * STREAM_START, STREAM_ABORT(for toplevel transaction), STREAM_PREPARE and
> + * STREAM_COMMIT. For PA, it always needs to acquire the lock after processing
> + * STREAM_STOP and STREAM_ABORT(for subtransaction) and then release
> + * immediately after acquiring it. That way, when PA is waiting for LA, we can
> + * have a wait-edge from PA to LA in lmgr, which will make a deadlock in lmgr
> + * like:
> 
> Missing spaces before '(' deliberate?

Added.

> ~~~
> 
> 5. globals
> 
> +/*
> + * Is there a message sent by parallel apply worker which the leader apply
> + * worker need to receive?
> + */
> +volatile sig_atomic_t ParallelApplyMessagePending = false;
> 
> SUGGESTION
> Is there a message sent by a parallel apply worker that the leader 
> apply worker needs to receive?

Changed.

> ~~~
> 
> 6. pa_get_available_worker
> 
> +/*
> + * get an available parallel apply worker from the worker pool.
> + */
> +static ParallelApplyWorkerInfo *
> +pa_get_available_worker(void)
> 
> Uppercase comment

Changed.

> ~
> 
> 7.
> 
> + /*
> + * We first try to free the worker to improve our chances of getting
> + * the worker. Normally, we free the worker after ensuring that the
> + * transaction is committed by parallel worker but for rollbacks, we
> + * don't wait for the transaction to finish so can't free the worker
> + * information immediately.
> + */
> 
> 7a.
> "We first try to free the worker to improve our chances of getting the worker."
> 
> SUGGESTION
> We first try to free the worker to improve our chances of finding one 
> that is not in use.
> 
> ~
> 
> 7b.
> "parallel worker" -> "the parallel worker"

Changed.

> ~~~
> 
> 8. pa_allocate_worker
> 
> + /* Try to get a free parallel apply worker. */
> + winfo = pa_get_available_worker();
> +
> 
> SUGGESTION
> First, try to get a parallel apply worker from the pool.

Changed.

> ~~~
> 
> 9. pa_free_worker
> 
> + * This removes the parallel apply worker entry from the hash table so that it
> + * can't be used. This either stops the worker and free the corresponding info,
> + * if there are enough workers in the pool or just marks it available for
> + * reuse.
> 
> BEFORE
> This either stops the worker and free the corresponding info, if there 
> are enough workers in the pool or just marks it available for reuse.
> 
> SUGGESTION
> If there are enough workers in the pool it stops the worker and frees 
> the corresponding info, otherwise it just marks the worker as 
> available for reuse.

Changed.

> ~
> 
> 10.
> 
> + /* Free the corresponding info if the worker exited cleanly. */
> + if (winfo->error_mq_handle == NULL)
> + {
> + pa_free_worker_info(winfo);
> + return true;
> + }
> 
> Is it correct that this bypasses the removal from the hash table?

I rethought this; it seems unnecessary to free the information here because we
don't expect the worker to stop unless the leader asks it to. So, I have
temporarily removed this and will think about it again for the next version.

> ~
> 
> 14.
> 
> + case 'X': /* Terminate, indicating clean exit. */
> + {
> + shm_mq_detach(winfo->error_mq_handle);
> + winfo->error_mq_handle = NULL;
> + break;
> + }
> + default:
> 
> 
> No need for the { } here.

Changed.

> ~~~
> 
> 16. pa_init_and_launch_worker
> 
> + /* Setup shared memory. */
> + if (!pa_setup_dsm(winfo))
> + {
> + MemoryContextSwitchTo(oldcontext);
> + pfree(winfo);
> + return NULL;
> + }
> 
> 
> Wouldn't it be better to do the pfree before switching back to the oldcontext?

I think either style is fine here: pfree() finds the chunk's owning context
from its header, so the result doesn't depend on CurrentMemoryContext.
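
To illustrate why the order doesn't matter (the same code, just annotated):
```
/*
 * pfree() releases the chunk in whatever context it was allocated in,
 * independent of CurrentMemoryContext, so switching back first is safe.
 */
MemoryContextSwitchTo(oldcontext);
pfree(winfo);
return NULL;
```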

> ~~~
> 
> 17. pa_send_data
> 
> + /* Wait before retrying. */
> + rc = WaitLatch(MyLatch,
> +    WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
> +    SHM_SEND_RETRY_INTERVAL_MS,
> +    WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
> +
> + if (rc & WL_LATCH_SET)
> + {
> + ResetLatch(MyLatch);
> + CHECK_FOR_INTERRUPTS();
> + }
> 
> 
> Instead of CHECK_FOR_INTERRUPTS, should this be calling your new 
> function ProcessParallelApplyInterrupts?

I think ProcessParallelApplyInterrupts() is intended to be invoked only in the
main loop (LogicalParallelApplyLoop) to make the parallel apply worker exit cleanly.
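
For context, the shape I have in mind is roughly the following (a simplified
sketch; 'mqh' and the wait-event name are placeholders, not necessarily the
exact identifiers in the patch):
```
for (;;)
{
	void	   *data;
	Size		len;
	shm_mq_result res;

	/* Handles SIGHUP, shutdown requests, etc. */
	ProcessParallelApplyInterrupts();

	/* Non-blocking receive from the leader's message queue. */
	res = shm_mq_receive(mqh, &len, &data, true);

	if (res == SHM_MQ_SUCCESS)
	{
		/* wrap 'data' in a StringInfo and pass it to apply_dispatch() */
	}
	else if (res == SHM_MQ_WOULD_BLOCK)
	{
		int			rc = WaitLatch(MyLatch,
								   WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
								   -1L,
								   WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);

		if (rc & WL_LATCH_SET)
			ResetLatch(MyLatch);
	}
	else
		break;				/* queue detached: the leader has gone away */
}
```
pa_send_data(), by contrast, runs in the leader apply worker (as I understand
it), so plain CHECK_FOR_INTERRUPTS() is the appropriate call there.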

> ~
> 
> 18.
> 
> + if (startTime == 0)
> + startTime = GetCurrentTimestamp();
> + else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
> + SHM_SEND_TIMEOUT_MS))
> + ereport(ERROR,
> + (errcode(ERRCODE_CONNECTION_FAILURE),
> + errmsg("terminating logical replication parallel apply worker due to timeout")));
> 
> 
> I'd previously commented that the timeout calculation seemed wrong.
> Hou-san replied [1,#9] "start counting from the first failure looks 
> fine to me." but I am not so sure - e.g. If the timeout is 10s then I 
> expect it to fail ~10s after the function is called, not 11s after. I 
> know it's pedantic, but where's the harm in making the calculation 
> right instead of just nearly right?
> 
> IMO probably an easy fix for this is like:
> 
> #define SHM_SEND_RETRY_INTERVAL_MS 1000
> #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)

OK. I moved the setting of startTime to before the WaitLatch call.
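
The loop now looks roughly like this (a sketch under the names discussed above;
the function name and the detached-queue error wording are mine, not the
patch's):
```
static void
pa_send_data_sketch(ParallelApplyWorkerInfo *winfo, Size nbytes,
					const void *data)
{
	TimestampTz startTime = 0;

	for (;;)
	{
		shm_mq_result result;
		int			rc;

		/* Non-blocking send to the parallel apply worker's queue. */
		result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);

		if (result == SHM_MQ_SUCCESS)
			return;
		else if (result == SHM_MQ_DETACHED)
			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("could not send data to shared-memory queue")));

		/* Start the clock before the first wait so the timeout covers it. */
		if (startTime == 0)
			startTime = GetCurrentTimestamp();
		else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
											SHM_SEND_TIMEOUT_MS))
			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("terminating logical replication parallel apply worker due to timeout")));

		/* Wait before retrying. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   SHM_SEND_RETRY_INTERVAL_MS,
					   WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}
	}
}
```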

> ~~~
> 
> 20. pa_savepoint_name
> 
> +static void
> +pa_savepoint_name(Oid suboid, TransactionId xid, char *spname,
> +   Size szsp)
> 
> Unnecessary wrapping?

Changed.

> ======
> 
> src/backend/replication/logical/origin.c
> 
> 21. replorigin_session_setup
> 
> + * However, we do allow multiple processes to point to the same origin slot
> + * if requested by the caller by passing PID of the process that has already
> + * acquired it. This is to allow using the same origin by multiple parallel
> + * apply processes the provided they maintain commit order, for example, by
> + * allowing only one process to commit at a time.
> 
> 21a.
> I thought the comment should mention this is optional and the special 
> value acquired_by=0 means don't do this.

Added.
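
With the new parameter the call sites look roughly like this (a sketch;
'leader_pid' stands for the leader apply worker's PID, whatever variable
actually carries it):
```
/* Ordinary apply worker: nobody else may have acquired the origin. */
replorigin_session_setup(originid, 0);

/* Parallel apply worker: share the origin already acquired by the leader. */
replorigin_session_setup(originid, leader_pid);
```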

> ~
> 
> 21b.
> "the provided they" ?? typo?

Changed.

> ======
> 
> src/backend/replication/logical/tablesync.c
> 
> 22. process_syncing_tables
> 
>  process_syncing_tables(XLogRecPtr current_lsn)
>  {
> + /*
> + * Skip for parallel apply workers as they don't operate on tables that
> + * are not in ready state. See pa_can_start() and
> + * should_apply_changes_for_rel().
> + */
> + if (am_parallel_apply_worker())
> + return;
> 
> SUGGESTION (remove the double negative)
> Skip for parallel apply workers because they only operate on tables that
> are in a READY state. See pa_can_start() and should_apply_changes_for_rel().

Changed.

> ======
> 
> src/backend/replication/logical/worker.c
> 
> 23. apply_handle_stream_stop
> 
> 
> Previously I suggested that this lock/unlock seems too tricky and 
> needed a comment. The reply [1,#12] was that this is already described 
> atop parallelapplyworker.c. OK, but in that case maybe here the 
> comment can just refer to that explanation:
> 
> SUGGESTION
> Refer to the comments atop applyparallelworker.c for what this lock 
> and immediate unlock is doing.
> 
> ~~~
> 
> 24. apply_handle_stream_abort
> 
> + if (pg_atomic_sub_fetch_u32(&(MyParallelShared->pending_stream_count), 1) == 0)
> + {
> + pa_lock_stream(MyParallelShared->xid, AccessShareLock);
> + pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
> + }
> 
> ditto comment #23

I feel that atop the definitions of the pa_lock_xxx functions is a better place
for these comments, so I added them there. Readers can then find the
explanation while reading the lock functions.
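
For reference, the pattern those comments describe (a sketch; the lock mode
used by LA is my recollection, not quoted from the patch):
```
/* Leader (LA): hold the session lock across the STREAM_STOP...STREAM_START gap. */
pa_lock_stream(xid, AccessExclusiveLock);	/* before sending STREAM_STOP */
/* ... */
pa_unlock_stream(xid, AccessExclusiveLock); /* before sending STREAM_START etc. */

/*
 * Parallel apply worker (PA), after processing STREAM_STOP: block until LA
 * releases the lock, which creates the PA -> LA wait-edge lmgr needs for
 * deadlock detection, then release immediately.
 */
pa_lock_stream(MyParallelShared->xid, AccessShareLock);
pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
```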

> ~~~
> 
> 25. apply_worker_clean_exit
> 
> +void
> +apply_worker_clean_exit(void)
> +{
> + /* Notify the leader apply worker that we have exited cleanly. */
> + if (am_parallel_apply_worker())
> + pq_putmessage('X', NULL, 0);
> +
> + proc_exit(0);
> +}
> 
> Somehow it doesn't seem right that the PA worker sending 'X' is here 
> in worker.c, while the LA worker receipt of this 'X' is in the other 
> applyparallelworker.c module. Maybe that other function 
> HandleParallelApplyMessage should also be here in worker.c?

I thought apply_worker_clean_exit() is widely used in worker.c and is common to
both the leader and parallel apply workers, so I put it in worker.c. But
HandleParallelApplyMessage() is only used for the parallel apply worker case,
so it seems better to keep it in applyparallelworker.c.
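
So the two halves of the protocol each live next to their callers; the leader's
side is roughly the following (the variable name 'msgtype' is my assumption):
```
/* In HandleParallelApplyMessage() (applyparallelworker.c), leader side. */
switch (msgtype)
{
	case 'X':				/* parallel apply worker exited cleanly */
		shm_mq_detach(winfo->error_mq_handle);
		winfo->error_mq_handle = NULL;
		break;

		/* ... other message types ... */
}
```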

> ======
> 
> src/backend/utils/misc/guc_tables.c
> 
> 26.
> 
> @@ -2957,6 +2957,18 @@ struct config_int ConfigureNamesInt[] =
>   NULL,
>   },
>   &max_sync_workers_per_subscription,
> + 2, 0, MAX_PARALLEL_WORKER_LIMIT,
> + NULL, NULL, NULL
> + },
> +
> + {
> + {"max_parallel_apply_workers_per_subscription",
> + PGC_SIGHUP,
> + REPLICATION_SUBSCRIBERS,
> + gettext_noop("Maximum number of parallel apply workers per 
> + subscription."), NULL, }, 
> + &max_parallel_apply_workers_per_subscription,
>   2, 0, MAX_BACKENDS,
>   NULL, NULL, NULL
> 
> Is this correct? Did you mean to change max_sync_workers_per_subscription?
> My 1st impression is that there has been some mixup with the
> MAX_PARALLEL_WORKER_LIMIT and MAX_BACKENDS or that this change was
> accidentally made to the wrong GUC.

Fixed.
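
The corrected entry should read roughly as follows, with
max_sync_workers_per_subscription keeping its MAX_BACKENDS cap:
```
{
	{"max_parallel_apply_workers_per_subscription",
		PGC_SIGHUP,
		REPLICATION_SUBSCRIBERS,
		gettext_noop("Maximum number of parallel apply workers per subscription."),
		NULL,
	},
	&max_parallel_apply_workers_per_subscription,
	2, 0, MAX_PARALLEL_WORKER_LIMIT,
	NULL, NULL, NULL
},
```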

> ======
> 
> src/include/replication/worker_internal.h
> 
> 27. ParallelApplyWorkerShared
> 
> + /*
> + * Indicates whether there are pending streaming blocks in the queue. The
> + * parallel apply worker will check it before starting to wait.
> + */
> + pg_atomic_uint32 pending_stream_count;
> 
> A better name might be 'n_pending_stream_blocks'.

I am not sure the suggested name looks better, so I didn't change this.

> ~
> 
> 28. function names
> 
>  extern void logicalrep_worker_stop(Oid subid, Oid relid);
> +extern void logicalrep_parallel_apply_worker_stop(int slot_no, uint16 generation);
>  extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
>  extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
> 
>  extern int logicalrep_sync_worker_count(Oid subid);
> +extern int logicalrep_parallel_apply_worker_count(Oid subid);
> 
> Would it be better to call those new functions using similar shorter 
> names as done elsewhere?
> 
> logicalrep_parallel_apply_worker_stop -> logicalrep_pa_worker_stop 
> logicalrep_parallel_apply_worker_count -> logicalrep_pa_worker_count

Changed.
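
The declarations in worker_internal.h now read:
```
extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation);
extern int	logicalrep_pa_worker_count(Oid subid);
```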

Attached is a new version of the patch, which also fixes an invalid shared
memory access bug in the 0002 patch, reported by Kuroda-San offlist.

Best regards,
Hou zj

