RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | houzj.fnst@fujitsu.com |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | OS0PR01MB57162186AD61A1FDA107A89494019@OS0PR01MB5716.jpnprd01.prod.outlook.com |
In response to | Re: Perform streaming logical transactions by background workers and parallel apply (Masahiko Sawada <sawada.mshk@gmail.com>) |
Responses | Re: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
On Monday, November 7, 2022 6:18 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> On Thu, Nov 3, 2022 at 10:06 PM houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com> wrote:
> >
> > On Wednesday, November 2, 2022 10:50 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> > >
> > > On Mon, Oct 24, 2022 at 8:42 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> > > >
> > > > On Wed, Oct 12, 2022 at 3:04 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > > > >
> > > > > On Tue, Oct 11, 2022 at 5:52 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> > > > > >
> > > > > > On Fri, Oct 7, 2022 at 2:00 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > > > > > >
> > > > > > > About your point that having different partition structures for
> > > > > > > publisher and subscriber, I don't know how common it will be once we
> > > > > > > have DDL replication. Also, the default value of
> > > > > > > publish_via_partition_root is false which doesn't seem to indicate
> > > > > > > that this is a quite common case.
> > > > > >
> > > > > > So how can we consider these concurrent issues that could happen only
> > > > > > when streaming = 'parallel'? Can we restrict some use cases to avoid
> > > > > > the problem or can we have a safeguard against these conflicts?
> > > > > >
> > > > >
> > > > > Yeah, right now the strategy is to disallow parallel apply for such
> > > > > cases as you can see in *0003* patch.
> > > >
> > > > Tightening the restrictions could work in some cases but there might
> > > > still be corner cases and it could reduce the usability. I'm not really
> > > > sure that we can ensure such a deadlock won't happen with the current
> > > > restrictions. I think we need some safeguard just in case. For
> > > > example, if the leader apply worker is waiting for a lock acquired by
> > > > its parallel worker, it cancels the parallel worker's transaction,
> > > > commits its transaction, and restarts logical replication.
> > > > Or the leader can log the deadlock to let the user know.
> > > >
> > >
> > > As another direction, we could make the parallel apply feature robust
> > > if we can detect deadlocks that happen among the leader worker and
> > > parallel workers. I'd like to summarize the idea discussed off-list
> > > (with Amit, Hou-San, and Kuroda-San) for discussion. The basic idea is
> > > that when the leader worker or parallel worker needs to wait for
> > > something (e.g. transaction completion, messages) we use lmgr
> > > functionality so that we can create wait-for edges and detect
> > > deadlocks in lmgr.
> > >
> > > For example, a scenario where a deadlock occurs is the following:
> > >
> > > [Publisher]
> > > create table tab1(a int);
> > > create publication pub for table tab1;
> > >
> > > [Subscriber]
> > > create table tab1(a int primary key);
> > > create subscription sub connection 'port=10000 dbname=postgres'
> > > publication pub with (streaming = parallel);
> > >
> > > TX-1:
> > > BEGIN;
> > > INSERT INTO tab1 SELECT i FROM generate_series(1, 5000) s(i); -- streamed
> > >     TX-2:
> > >     BEGIN;
> > >     INSERT INTO tab1 SELECT i FROM generate_series(1, 5000) s(i); -- streamed
> > >     COMMIT;
> > > COMMIT;
> > >
> > > Suppose a parallel apply worker (PA-1) is executing TX-1 and the
> > > leader apply worker (LA) is executing TX-2 concurrently on the
> > > subscriber. Now, LA is waiting for PA-1 because of the unique key of
> > > tab1 while PA-1 is waiting for LA to send further messages. There is a
> > > deadlock between PA-1 and LA but lmgr cannot detect it.
> > >
> > > One idea to resolve this issue is that we have LA acquire a session
> > > lock on a shared object (by LockSharedObjectForSession()) and have
> > > PA-1 wait on the lock before trying to receive messages. IOW, LA
> > > acquires the lock before sending STREAM_STOP and releases it if
> > > already acquired before sending STREAM_START, STREAM_PREPARE and
> > > STREAM_COMMIT.
> > > For PA-1, it always needs to acquire the lock after
> > > processing STREAM_STOP and then release it immediately after acquiring
> > > it. That way, when PA-1 is waiting for LA, we can have a wait-edge
> > > from PA-1 to LA in lmgr, which will make a deadlock in lmgr like:
> > >
> > > LA (waiting to acquire lock) -> PA-1 (waiting to acquire the shared
> > > object) -> LA
> > >
> > > We would need the shared objects per parallel apply worker.
> > >
> > > After detecting a deadlock, we can restart logical replication while
> > > temporarily disabling the parallel apply, which is done by 0005 patch.
> > >
> > > Another scenario is similar to the previous case but TX-1 and TX-2 are
> > > executed by two parallel apply workers (PA-1 and PA-2 respectively).
> > > In this scenario, PA-2 is waiting for PA-1 to complete its transaction
> > > while PA-1 is waiting for subsequent input from LA. Also, LA is
> > > waiting for PA-2 to complete its transaction in order to preserve the
> > > commit order. There is a deadlock among three processes but it cannot
> > > be detected in lmgr because the fact that LA is waiting for PA-2 to
> > > complete its transaction doesn't appear in lmgr (see
> > > parallel_apply_wait_for_xact_finish()). To fix it, we can use
> > > XactLockTableWait() instead.
> > >
> > > However, since XactLockTableWait() considers PREPARED TRANSACTION as
> > > still in progress, probably we need a similar trick as above in the case
> > > where a transaction is prepared. For example, suppose that TX-2 was
> > > prepared instead of committed in the above scenario, PA-2 acquires
> > > another shared lock at STREAM_START and releases it at
> > > STREAM_COMMIT/PREPARE. LA can wait on the lock.
> > >
> > > Yet another scenario where LA has to wait is the case where the shm_mq
> > > buffer is full. In the above scenario (i.e.
> > > PA-1 and PA-2 are executing
> > > transactions concurrently), if the shm_mq buffer between LA and PA-2
> > > is full, LA has to wait to send messages, and this wait doesn't appear
> > > in lmgr. To fix it, probably we have to use non-blocking write and
> > > wait with a timeout. If the timeout is exceeded, the LA will write to a file
> > > and indicate to PA-2 that it needs to read the file for remaining messages.
> > > Then LA will start waiting for commit which will detect deadlock if
> > > any.
> > >
> > > If we can detect deadlocks by having such a functionality or some
> > > other way then we don't need to tighten the restrictions of subscribed
> > > tables' schemas etc.
> >
> > Thanks for the analysis and summary!
> >
> > I tried to implement the above idea and here is the patch set. I have done some
> > basic tests for the new code and it works fine.
>
> Thank you for updating the patches!
>
> Here are comments on v42-0001:

Thanks for the comments.

> We have the following three similarly named functions related to
> starting a new parallel apply worker:
>
> parallel_apply_start_worker()
> parallel_apply_setup_worker()
> parallel_apply_setup_dsm()
>
> It seems to me that we can somewhat merge them since
> parallel_apply_setup_worker() and parallel_apply_setup_dsm() have only
> one caller.

Since these functions do different tasks (external function, launch, DSM), I
personally feel it's OK to keep them split. But if others also feel the split
is unnecessary, I will merge them.

> ---
> +/*
> + * Extract the streaming mode value from a DefElem. This is like
> + * defGetBoolean() but also accepts the special value of "parallel".
> + */
> +char
> +defGetStreamingMode(DefElem *def)
>
> It's a bit unnatural to have this function in define.c since other
> functions in this file are for primitive data types. How about having it
> in subscription.c?

Changed.

> ---
>      /*
>       * Exit if any parameter that affects the remote connection was changed.
> -     * The launcher will start a new worker.
> +     * The launcher will start a new worker, but note that the parallel apply
> +     * worker may or may not restart depending on the value of the streaming
> +     * option and whether there will be a streaming transaction.
>
> In which case does the parallel apply worker not restart even if the
> streaming option has been changed?
>
> ---
> I think we should explain somewhere the idea of using locks for
> synchronization between leader and worker. Maybe can we do that with
> sample workload in new README file?

Having a README sounds like a good idea. Besides the lock design, we might also
need to move some other existing design comments atop worker.c into it. So,
maybe it is better to do that as a separate patch? For now, I added comments
atop applyparallelworker.c.

> ---
> in parallel_apply_send_data():
>
> +        result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
> +
> +        if (result == SHM_MQ_SUCCESS)
> +            break;
> +        else if (result == SHM_MQ_DETACHED)
> +            ereport(ERROR,
> +                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +                     errmsg("could not send data to shared-memory queue")));
> +
> +        Assert(result == SHM_MQ_WOULD_BLOCK);
> +
> +        if (++retry >= CHANGES_THRESHOLD)
> +        {
> +            MemoryContext oldcontext;
> +            StringInfoData msg;
> +            TimestampTz now = GetCurrentTimestamp();
> +
> +            if (startTime == 0)
> +                startTime = now;
> +
> +            if (!TimestampDifferenceExceeds(startTime, now, SHM_SEND_TIMEOUT_MS))
> +                continue;
>
> IIUC since the parallel worker retries to send data without waits the
> 'retry' will get larger than CHANGES_THRESHOLD in a very short time.
> But the worker waits at least for SHM_SEND_TIMEOUT_MS to spool data
> regardless of 'retry' count. Don't we need to nap somewhat and why do
> we need CHANGES_THRESHOLD?

Oh, I intended to check for the timeout only after continuously retrying XX
times, to reduce the cost of getting the system time and calculating the time
difference. I added some comments in the code.
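For illustration only, here is a minimal standalone sketch of that intent: count failed send attempts cheaply and read the clock only once per batch of retries. It is not the patch code. `FakeQueue`, `fake_try_send()`, `fake_now_ms()`, `RETRY_THRESHOLD` and `SEND_TIMEOUT_MS` are hypothetical stand-ins (for the shm_mq send, the timestamp call, CHANGES_THRESHOLD and SHM_SEND_TIMEOUT_MS), and resetting the counter after each clock read is an assumption of this sketch rather than something taken from the patch.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for CHANGES_THRESHOLD and SHM_SEND_TIMEOUT_MS. */
#define RETRY_THRESHOLD 100
#define SEND_TIMEOUT_MS 10000

typedef enum { SEND_DONE, SEND_GAVE_UP } SendOutcome;

/* Fake queue and clock, so the sketch runs without PostgreSQL. */
typedef struct
{
    int     failures_left;   /* sends that will still report "would block" */
    int64_t now_ms;          /* fake current time */
    int64_t ms_per_sample;   /* how far the fake clock moves per read */
    int     clock_samples;   /* how many times the clock was read */
} FakeQueue;

static bool
fake_try_send(FakeQueue *q)
{
    if (q->failures_left > 0)
    {
        q->failures_left--;
        return false;        /* queue full, caller must retry */
    }
    return true;
}

static int64_t
fake_now_ms(FakeQueue *q)
{
    q->clock_samples++;
    q->now_ms += q->ms_per_sample;
    return q->now_ms;
}

static SendOutcome
send_with_timeout(FakeQueue *q)
{
    int     retry = 0;
    int64_t start_ms = -1;   /* -1: clock not read yet */

    for (;;)
    {
        int64_t now;

        if (fake_try_send(q))
            return SEND_DONE;

        /* Cheap path: only count failed attempts until the threshold. */
        if (++retry < RETRY_THRESHOLD)
            continue;

        /* Expensive path, once per RETRY_THRESHOLD failures: read the clock. */
        retry = 0;
        now = fake_now_ms(q);
        if (start_ms < 0)
            start_ms = now;
        if (now - start_ms >= SEND_TIMEOUT_MS)
            return SEND_GAVE_UP;    /* a real caller would spool to a file */
    }
}
```

In this sketch, 250 blocked attempts cost only two clock reads before the send succeeds, while a queue that never drains is abandoned once the fake clock has advanced by the timeout.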
> ---
> +/*
> + * Wait until the parallel apply worker's xact_state flag becomes
> + * the same as in_xact.
> + */
> +static void
> +parallel_apply_wait_for_in_xact(ParallelApplyWorkerShared *wshared,
> +                                ParallelTransState xact_state)
> +{
> +    for (;;)
> +    {
> +        /* Stop if the flag becomes the same as in_xact. */
>
> What do you mean by 'in_xact' here?

Changed.

> ---
> I got the error "ERROR: invalid logical replication message type ""
> with the following scenario:
>
> 1. Stop the PA by sending SIGSTOP signal.
> 2. Stream a large transaction so that the LA spools changes to the file for PA.
> 3. Resume the PA by sending SIGCONT signal.
> 4. Stream another large transaction.
>
> ---
> * On publisher (with logical_decoding_work_mem = 64kB)
> begin;
> insert into t select generate_series(1, 1000);
> rollback;
> begin;
> insert into t select generate_series(1, 1000);
> rollback;
>
> I got the following error:
>
> ERROR: hash table corrupted
> CONTEXT: processing remote data for replication origin "pg_16393"
> during message type "STREAM START" in transaction 734

Thanks! I think I have fixed them in the new version.

> ---
> IIUC the changes for worker.c in 0001 patch includes both changes:
>
> 1. apply worker takes action based on the apply_action returned by
> get_transaction_apply_action() per message (or streamed chunk).
> 2. apply worker supports handling parallel apply workers.
>
> It seems to me that (1) is a rather refactoring patch, so probably we
> can do that in a separate patch so that we can make the patches
> smaller.

I tried it, but the code size of the apply_action part turns out to be quite
small, because we only have two actions (LEADER_APPLY/LEADER_SERIALIZE) on the
HEAD branch and only handle_streamed_transaction uses them. I will think about
whether there are other ways to split the patch.
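As a rough aid for the deadlock scenarios in the quoted summary above, the toy model below shows why the extra locks matter: a detector can only find a cycle if every wait is recorded as an edge. This is not how lmgr is implemented. `WaitGraph`, `add_wait_edge()` and `in_deadlock()` are hypothetical names, and the model assumes each process waits on at most one other process.

```c
#include <stdbool.h>

#define MAX_PROCS 8

/* Process ids for the leader apply worker and two parallel apply workers. */
enum { LA = 0, PA1 = 1, PA2 = 2 };

/* waits_for[i] is the process that i is blocked on, or -1 if i is not waiting. */
typedef struct
{
    int waits_for[MAX_PROCS];
} WaitGraph;

static void
wait_graph_init(WaitGraph *g)
{
    for (int i = 0; i < MAX_PROCS; i++)
        g->waits_for[i] = -1;
}

/* Record a wait that the detector is able to see. */
static void
add_wait_edge(WaitGraph *g, int waiter, int holder)
{
    g->waits_for[waiter] = holder;
}

/* Follow wait-for edges from 'start'; a deadlock exists if we come back to it. */
static bool
in_deadlock(const WaitGraph *g, int start)
{
    int cur = g->waits_for[start];

    /* A cycle through 'start' has at most MAX_PROCS edges, so bound the walk. */
    for (int steps = 0; cur != -1 && steps < MAX_PROCS; steps++)
    {
        if (cur == start)
            return true;
        cur = g->waits_for[cur];
    }
    return false;
}
```

In the first scenario only the edge LA -> PA-1 (the unique-key wait) is visible, so nothing is detected until PA-1's wait for LA is also recorded as PA-1 -> LA. In the second scenario, PA-2 -> PA-1 and PA-1 -> LA are not enough; the cycle closes only once LA's wait for PA-2 is recorded as well.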
> ---
> postgres(1:2831190)=# \dRs+ test_sub1
> List of subscriptions
> -[ RECORD 1 ]------+--------------------------
> Name               | test_sub1
> Owner              | masahiko
> Enabled            | t
> Publication        | {test_pub1}
> Binary             | f
> Streaming          | p
> Two-phase commit   | d
> Disable on error   | f
> Origin             | any
> Synchronous commit | off
> Conninfo           | port=5551 dbname=postgres
> Skip LSN           | 0/0
>
> It's better to show 'on', 'off' or 'streaming' rather than one character.

Changed.

Best regards,
Hou zj