RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS0PR01MB57164A2F6DB3B24987F48571947B9@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
Responses Re: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
On Wednesday, August 31, 2022 5:56 PM houzj.fnst@fujitsu.com wrote:
> 
> On Tuesday, August 30, 2022 7:51 PM Amit Kapila <amit.kapila16@gmail.com>
> wrote:
> >
> > On Tue, Aug 30, 2022 at 12:12 PM Amit Kapila <amit.kapila16@gmail.com>
> > wrote:
> > >
> > > Few other comments on v25-0001*
> > > ============================
> > >
> >
> > Some more comments on v25-0001*:
> > =============================
> > 1.
> > +static void
> > +apply_handle_stream_abort(StringInfo s)
> > ...
> > ...
> > + else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
> > + {
> > + if (subxid == xid)
> > + parallel_apply_replorigin_reset();
> > +
> > + /* Send STREAM ABORT message to the apply parallel worker. */
> > + parallel_apply_send_data(winfo, s->len, s->data);
> > +
> > + /*
> > + * After sending the data to the apply parallel worker, wait for
> > + * that worker to finish. This is necessary to maintain commit
> > + * order which avoids failures due to transaction dependencies and
> > + * deadlocks.
> > + */
> > + if (subxid == xid)
> > + {
> > + parallel_apply_wait_for_free(winfo);
> > ...
> > ...
> >
> > From this code, it appears that we are waiting for rollbacks to finish
> > but not doing the same in the rollback to savepoint cases. Is there a
> > reason for the same? I think we need to wait for rollbacks to avoid
> > transaction dependency and deadlock issues. Consider the below case:
> >
> > Consider that table t1 (c1 primary key, c2, c3) has a row (1, 2, 3) on
> > both the publisher and the subscriber.
> >
> > Publisher
> > Session-1
> > ==========
> > Begin;
> > ...
> > Delete from t1 where c1 = 1;
> >
> > Session-2
> > Begin;
> > ...
> > insert into t1 values(1, 4, 5); --This will wait for Session-1's Delete to finish.
> >
> > Session-1
> > Rollback;
> >
> > Session-2
> > -- The wait will be finished and the insert will be successful.
> > Commit;
> >
> > Now, assume both these transactions get streamed and if we didn't wait
> > for rollback/rollback to savepoint, it is possible that the insert
> > gets executed before and leads to a constraint violation. This won't
> > happen in non-parallel mode, so we should wait for rollbacks to finish.
> 
> Agreed and changed.
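
To be specific, with this change the leader now waits for the parallel apply
worker in both the full rollback and the rollback-to-savepoint cases. A
simplified sketch of the STREAM ABORT handling (not the exact patch code;
error handling and the other apply actions are omitted):

else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
{
    if (subxid == xid)
        parallel_apply_replorigin_reset();

    /* Send STREAM ABORT message to the parallel apply worker. */
    parallel_apply_send_data(winfo, s->len, s->data);

    /*
     * Wait for the worker to apply the abort before continuing, regardless
     * of whether it is a full rollback or a rollback to a savepoint, so
     * that a later transaction cannot be applied ahead of this abort and
     * run into the constraint violation / deadlock scenario above.
     */
    parallel_apply_wait_for_free(winfo);
}
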
> 
> > 2. I think we don't need to wait at Rollback Prepared/Commit Prepared
> > because we wait for prepare to finish in *_stream_prepare function.
> > That will ensure all the operations in that transaction have happened
> > in the subscriber, so no concurrent transaction can create deadlock or
> > transaction dependency issues. If so, I think it is better to explain
> > this in the comments.
> 
> Added some comments about this.
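
The gist of the added comment is roughly the following (the exact wording in
the patch may differ):

/*
 * There is no need to wait for the parallel apply worker at COMMIT PREPARED
 * or ROLLBACK PREPARED.  The leader already waited for the worker to finish
 * while handling STREAM PREPARE, so all changes of this transaction have
 * been applied on the subscriber by now and no concurrent transaction can
 * create a transaction dependency or deadlock on them.
 */
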
> 
> > 3.
> > +/* What action to take for the transaction. */
> > +typedef enum
> >  {
> > - LogicalRepMsgType command; /* 0 if invalid */
> > - LogicalRepRelMapEntry *rel;
> > + /* The action for non-streaming transactions. */
> > + TA_APPLY_IN_LEADER_WORKER,
> >
> > - /* Remote node information */
> > - int remote_attnum; /* -1 if invalid */
> > - TransactionId remote_xid;
> > - XLogRecPtr finish_lsn;
> > - char    *origin_name;
> > -} ApplyErrorCallbackArg;
> > + /* Actions for streaming transactions. */
> > + TA_SERIALIZE_TO_FILE,
> > + TA_APPLY_IN_PARALLEL_WORKER,
> > + TA_SEND_TO_PARALLEL_WORKER
> > +} TransactionApplyAction;
> >
> > I think each action needs explanation atop this enum typedef.
> 
> Added.
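
For reference, each value now carries a short comment, along these lines
(simplified; the exact wording in the patch may differ):

/* What action to take for the transaction. */
typedef enum
{
    /* Non-streaming transactions: apply directly in the leader apply worker. */
    TA_APPLY_IN_LEADER_WORKER,

    /* Actions for streaming transactions. */

    /* No free parallel worker; serialize the changes to a file. */
    TA_SERIALIZE_TO_FILE,

    /* This process is a parallel apply worker and applies the changes itself. */
    TA_APPLY_IN_PARALLEL_WORKER,

    /* The leader passes the changes to a parallel apply worker. */
    TA_SEND_TO_PARALLEL_WORKER
} TransactionApplyAction;
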
> 
> > 4.
> > @@ -1149,24 +1315,14 @@ static void
> >  apply_handle_stream_start(StringInfo s)
> >  {
> > ...
> > + else if (apply_action == TA_SERIALIZE_TO_FILE)
> > + {
> > + /*
> > + * For the first stream start, check if there is any free apply
> > + * parallel worker we can use to process this transaction.
> > + */
> > + if (first_segment)
> > + winfo = parallel_apply_start_worker(stream_xid);
> >
> > - /* open the spool file for this transaction */
> > - stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
> > + if (winfo)
> > + {
> > + /*
> > + * If we have found a free worker, then we pass the data to that
> > + * worker.
> > + */
> > + parallel_apply_send_data(winfo, s->len, s->data);
> >
> > - /* if this is not the first segment, open existing subxact file */
> > - if (!first_segment)
> > - subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
> > + nchanges = 0;
> >
> > - pgstat_report_activity(STATE_RUNNING, NULL);
> > + /* Cache the apply parallel worker for this transaction. */
> > + stream_apply_worker = winfo;
> > + }
> > ...
> >
> > This looks odd to me in the sense that even if the action is
> > TA_SERIALIZE_TO_FILE, we still send the information to the parallel
> > worker. Won't it be better if we call parallel_apply_start_worker()
> > for first_segment before checking apply_action with
> > get_transaction_apply_action()? That way we can avoid this special
> > case handling.
> 
> Changed as suggested.
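
So apply_handle_stream_start() now looks roughly like the sketch below
(simplified; the signature of get_transaction_apply_action() used here is
assumed, and most of the function is omitted):

/*
 * For the first stream segment, try to assign a free parallel apply
 * worker to this transaction before deciding how to handle the data.
 */
if (first_segment)
    parallel_apply_start_worker(stream_xid);

apply_action = get_transaction_apply_action(stream_xid, &winfo);

if (apply_action == TA_SERIALIZE_TO_FILE)
{
    /* No free worker was found, so spool the changes to a file as before. */
    stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
}
else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
{
    /* Pass the data to the assigned parallel apply worker. */
    parallel_apply_send_data(winfo, s->len, s->data);
    nchanges = 0;

    /* Cache the parallel apply worker for this transaction. */
    stream_apply_worker = winfo;
}
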
> 
> > 5.
> > +/*
> > + * Struct for sharing information between the leader apply worker and the
> > + * apply parallel workers.
> > + */
> > +typedef struct ApplyParallelWorkerShared
> > +{
> > + slock_t mutex;
> > +
> > + bool in_use;
> > +
> > + /* Logical protocol version. */
> > + uint32 proto_version;
> > +
> > + TransactionId stream_xid;
> >
> > Are we using stream_xid passed by the leader in parallel worker? If
> > so, how? If not, then can we do without this?
> 
> No, it seems we don't need this. Removed.
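
After removing it, the shared struct is roughly as follows (the field comments
here are only my description, and other fields are omitted):

typedef struct ApplyParallelWorkerShared
{
    slock_t     mutex;

    /* Is this worker slot currently assigned to a streamed transaction? */
    bool        in_use;

    /* Logical protocol version used by this worker. */
    uint32      proto_version;
} ApplyParallelWorkerShared;
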
> 
> > 6.
> > +void
> > +HandleParallelApplyMessages(void)
> > {
> > ...
> > + /* OK to process messages.  Reset the flag saying there are more to do. */
> > + ParallelApplyMessagePending = false;
> >
> > I don't understand the meaning of the second part of the comment.
> > Shouldn't we say: "Reset the flag saying there is nothing more to
> > do."? I know you have copied from the other part of the code but there
> > also I am not sure if it is correct.
> 
> I feel the comment here is not very helpful, so I removed this.
> 
> > 7.
> > +static List *ApplyParallelWorkersFreeList = NIL;
> > +static List *ApplyParallelWorkersList = NIL;
> >
> > Do we really need to maintain two different workers' lists? If so,
> > what is the advantage? I think there won't be many parallel apply
> > workers, so even if we maintain one list and search it, there shouldn't
> > be any performance impact. I feel maintaining two lists for this
> > purpose is a bit complex and has more chances of bugs, so we should
> > try to avoid it if possible.
> 
> Agreed, I removed the ApplyParallelWorkersFreeList and reused
> ApplyParallelWorkersList in other places.
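
With a single list, finding a free worker is just a linear search over
ApplyParallelWorkersList, e.g. (illustrative sketch only; the helper name,
the ApplyParallelWorkerInfo type and the in_use check are assumptions, not
the exact patch code):

static ApplyParallelWorkerInfo *
parallel_apply_find_free_worker(void)
{
    ListCell   *lc;

    foreach(lc, ApplyParallelWorkersList)
    {
        ApplyParallelWorkerInfo *winfo = (ApplyParallelWorkerInfo *) lfirst(lc);

        /* Skip workers that are already assigned to a transaction. */
        if (!winfo->shared->in_use)
            return winfo;
    }

    return NULL;
}

Since there will not be many parallel apply workers per subscription, this
search is cheap.
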
> 
> Attached is the new version patch set, which addresses the above comments and
> the comments from [1].
> 
> [1]
> https://www.postgresql.org/message-id/CAA4eK1%2Be8JsiC8uMZPU25xQRyxNvVS24M4%3DZy-xD18jzX%2BvrmA%40mail.gmail.com

Attached is a new version patch set which fixes some typos and some cosmetic things.

Best regards,
Hou zj
