RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id OS0PR01MB57160D33CD7DBEF6D6206AE5942A9@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("kuroda.hayato@fujitsu.com" <kuroda.hayato@fujitsu.com>)
List pgsql-hackers
On Wednesday, October 19, 2022 8:50 PM Kuroda, Hayato/黒田 隼人 <kuroda.hayato@fujitsu.com> wrote:

Thanks for the comments.

> 03. applyparallelworker.c - LogicalParallelApplyLoop
> 
> ```
>             case SHM_MQ_WOULD_BLOCK:
>                 {
>                     int            rc;
> 
>                     if (!in_streamed_transaction)
>                     {
>                         /*
>                          * If we didn't get any transactions for a while there might be
>                          * unconsumed invalidation messages in the queue, consume them
>                          * now.
>                          */
>                         AcceptInvalidationMessages();
>                         maybe_reread_subscription();
>                     }
> 
>                     MemoryContextReset(ApplyMessageContext);
> ```
> 
> Is MemoryContextReset() needed? IIUC no one uses ApplyMessageContext if we reach here.

I was concerned that some code at a deeper level might allocate memory, since
many functions can be invoked in the loop (for example, the functions called
from ProcessInterrupts()). Although it might not matter in practice, I think it
is better to reset the context here to make the code robust. Besides, it keeps
the logic consistent with LogicalRepApplyLoop().
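
For reference, a simplified sketch of the per-message memory handling in
LogicalRepApplyLoop() that the reply refers to (not the exact patch code):

```
/*
 * Per-message memory handling, simplified from LogicalRepApplyLoop();
 * the parallel apply loop follows the same pattern.
 */
for (;;)
{
    MemoryContextSwitchTo(ApplyMessageContext);

    /*
     * Receive and handle one message.  Code reached from here (including
     * helpers such as ProcessInterrupts()) may allocate in
     * ApplyMessageContext.
     */

    /* Free per-message allocations before the next iteration. */
    MemoryContextReset(ApplyMessageContext);
}
```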

> 04. applyparallelworker.c - HandleParallelApplyMessages
> 
> ```
>         else if (res == SHM_MQ_SUCCESS)
>         {
>             StringInfoData msg;
> 
>             initStringInfo(&msg);
>             appendBinaryStringInfo(&msg, data, nbytes);
>             HandleParallelApplyMessage(winfo, &msg);
>             pfree(msg.data);
>         }
> ```
> 
> In LogicalParallelApplyLoop(), appendBinaryStringInfo() is not used; the
> StringInfoData is initialized directly instead. Why is there a difference? The
> function will do repalloc() and memcpy(), so it may be inefficient.

I think both styles are fine. The code in HandleParallelApplyMessages() keeps
it consistent with the similar function HandleParallelMessages(), and this is
not a performance-sensitive path.


> 05. applyparallelworker.c - parallel_apply_send_data
> 
> ```
>     if (result != SHM_MQ_SUCCESS)
>         ereport(ERROR,
>                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>                  errmsg("could not send data to shared-memory queue")));
> 
> ```
> 
> I checked the enumeration of shm_mq_result, and I think shm_mq_send(nowait
> = false) fails only when the opposite process has exited. How about adding a
> hint or detail message like "lost connection to parallel apply worker"?

Thanks for analyzing, but I am not sure "lost connection to xx worker" is an
appropriate errhint or errdetail. The current error message looks clear to me.
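
(For reference, the suggested variant would presumably look something like the
following; the errdetail wording here is only illustrative.)

```
ereport(ERROR,
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
         errmsg("could not send data to shared-memory queue"),
         errdetail("The parallel apply worker may have exited.")));
```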


> 07. worker.c - apply_handle_commit_internal
> 
> I think we can add an assertion like Assert(replorigin_session_origin_lsn !=
> InvalidXLogRecPtr && replorigin_session_origin != InvalidRepOriginId) to
> avoid a missing replorigin_session_setup(). Previously it was set at the entry
> point and never reset.

I feel adding the assert for replorigin_session_origin is fine here. For
replorigin_session_origin_lsn, I am not sure it is better to check it here, as
we would need to distinguish between the streaming=on and streaming=parallel
cases to do that.
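
For clarity, the check being discussed might look like the sketch below; only
the origin-id half seems straightforward, while the LSN half would first need
to distinguish the streaming cases:

```
/*
 * replorigin_session_setup() is called at worker start and the origin id
 * is never reset afterwards, so this part seems safe to assert.
 */
Assert(replorigin_session_origin != InvalidRepOriginId);

/*
 * Asserting the commit LSN here is less clear, since the streaming=on and
 * streaming=parallel paths would need to be distinguished first.
 */
#ifdef NOT_USED
Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
#endif
```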


> 10. general
> 
> IIUC parallel apply workers cannot detect a deadlock automatically,
> right? I thought we might be able to use a heartbeat protocol between the
> leader worker and the parallel workers.
>
> You have already implemented a mechanism to send and receive messages between
> workers. My idea is that each parallel apply worker records the timestamp at
> which it last got a message from the leader, and if a certain time (30s?) has
> passed it sends a heartbeat message like 'H'. The leader consumes 'H' and
> sends a reply like LOGICAL_REP_MSG_KEEPALIVE in HandleParallelApplyMessage().
> If the parallel apply worker does not receive any message for more than one
> minute, it regards the deadlock as having occurred, turns the retry flag on,
> and exits.
> 
> The above assumes that the leader cannot reply to the message while waiting
> for the lock. Moreover, it may have notable overhead and we must use a new
> logical replication message type.
> 
> What do you think? Have you already considered it?

Thanks for the suggestion. But we are trying to detect this kind of problem
before the problematic case happens and disallow parallelism in such cases by
checking the unique/constr/partitioned... in the 0003/0004 patches.

About the keepalive design: we could do that, but the leader could also be
blocked by some other user backend, so this design might cause the worker to
error out in some unexpected cases, which does not seem great.
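
For reference, the timeout bookkeeping being discussed might look roughly like
the sketch below. This is only an illustration of the idea, not code from the
patch set; last_leader_msg_time, send_heartbeat_to_leader() and
give_up_and_retry() are made-up names.

```
#include "postgres.h"
#include "utils/timestamp.h"

#define PA_HEARTBEAT_AFTER_MS   (30 * 1000)     /* 30s of silence from leader */
#define PA_DEADLOCK_AFTER_MS    (60 * 1000)     /* 1min of silence from leader */

/* Updated whenever a message arrives from the leader. */
static TimestampTz last_leader_msg_time;

static void
pa_check_leader_timeout(void)
{
    TimestampTz now = GetCurrentTimestamp();

    if (TimestampDifferenceExceeds(last_leader_msg_time, now,
                                   PA_DEADLOCK_AFTER_MS))
        give_up_and_retry();            /* set the retry flag and exit */
    else if (TimestampDifferenceExceeds(last_leader_msg_time, now,
                                        PA_HEARTBEAT_AFTER_MS))
        send_heartbeat_to_leader();     /* leader replies with a keepalive */
}
```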

Best regards,
Hou zj
