RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS0PR01MB5716546863340CA19EC5F8C394789@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
On Tuesday, August 30, 2022 7:51 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> 
> On Tue, Aug 30, 2022 at 12:12 PM Amit Kapila <amit.kapila16@gmail.com>
> wrote:
> >
> > Few other comments on v25-0001*
> > ============================
> >
> 
> Some more comments on v25-0001*:
> =============================
> 1.
> +static void
> +apply_handle_stream_abort(StringInfo s)
> ...
> ...
> + else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
> + {
> + if (subxid == xid)
> + parallel_apply_replorigin_reset();
> +
> + /* Send STREAM ABORT message to the apply parallel worker. */
> + parallel_apply_send_data(winfo, s->len, s->data);
> +
> + /*
> + * After sending the data to the apply parallel worker, wait for
> + * that worker to finish. This is necessary to maintain commit
> + * order which avoids failures due to transaction dependencies and
> + * deadlocks.
> + */
> + if (subxid == xid)
> + {
> + parallel_apply_wait_for_free(winfo);
> ...
> ...
> 
> From this code, it appears that we wait for rollbacks to finish but do not
> do the same in the rollback to savepoint case. Is there a reason for that?
> I think we need to wait for rollbacks to avoid transaction dependency
> and deadlock issues. Consider the case below:
> 
> Suppose table t1 (c1 primary key, c2, c3) has a row (1, 2, 3) on both the
> publisher and the subscriber.
> 
> Publisher
> Session-1
> ==========
> Begin;
> ...
> Delete from t1 where c1 = 1;
> 
> Session-2
> Begin;
> ...
> insert into t1 values(1, 4, 5); --This will wait for Session-1's Delete to finish.
> 
> Session-1
> Rollback;
> 
> Session-2
> -- The wait will be finished and the insert will be successful.
> Commit;
> 
> Now, assume both of these transactions get streamed. If we don't wait for
> rollback/rollback to savepoint, it is possible that the insert gets executed
> before the rollback has been applied, which leads to a constraint violation.
> This won't happen in non-parallel mode, so we should wait for rollbacks to
> finish.

Agreed and changed.

> 2. I think we don't need to wait at Rollback Prepared/Commit Prepared
> because we wait for prepare to finish in *_stream_prepare function.
> That will ensure all the operations in that transaction have happened in the
> subscriber, so no concurrent transaction can create deadlock or transaction
> dependency issues. If so, I think it is better to explain this in the comments.

Added some comments about this.

> 3.
> +/* What action to take for the transaction. */
> +typedef enum
>  {
> - LogicalRepMsgType command; /* 0 if invalid */
> - LogicalRepRelMapEntry *rel;
> + /* The action for non-streaming transactions. */
> + TA_APPLY_IN_LEADER_WORKER,
> 
> - /* Remote node information */
> - int remote_attnum; /* -1 if invalid */
> - TransactionId remote_xid;
> - XLogRecPtr finish_lsn;
> - char    *origin_name;
> -} ApplyErrorCallbackArg;
> + /* Actions for streaming transactions. */
> + TA_SERIALIZE_TO_FILE,
> + TA_APPLY_IN_PARALLEL_WORKER,
> + TA_SEND_TO_PARALLEL_WORKER
> +} TransactionApplyAction;
> 
> I think each action needs explanation atop this enum typedef.

Added.
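
For illustration only, here is a minimal sketch of what per-action comments on this enum could look like. The member names come from the quoted patch. The descriptions are inferred from those names and the surrounding code, so treat them as assumptions. apply_action_name() is a hypothetical helper and is not part of the patch.

```c
#include <assert.h>
#include <string.h>

/*
 * What action to take for the transaction.
 *
 * Member names are from the quoted patch; the per-action descriptions
 * are this sketch's reading of them, not the patch's wording.
 */
typedef enum
{
	/* Non-streaming transaction: the leader apply worker applies it itself. */
	TA_APPLY_IN_LEADER_WORKER,

	/* Streaming transaction, no parallel worker in use: spool changes to a file. */
	TA_SERIALIZE_TO_FILE,

	/* Streaming transaction, and this process is the parallel worker applying it. */
	TA_APPLY_IN_PARALLEL_WORKER,

	/* Streaming transaction, leader side: forward changes to a parallel worker. */
	TA_SEND_TO_PARALLEL_WORKER
} TransactionApplyAction;

/* Hypothetical helper (not in the patch): printable name, e.g. for logging. */
static const char *
apply_action_name(TransactionApplyAction action)
{
	switch (action)
	{
		case TA_APPLY_IN_LEADER_WORKER:
			return "apply in leader worker";
		case TA_SERIALIZE_TO_FILE:
			return "serialize to file";
		case TA_APPLY_IN_PARALLEL_WORKER:
			return "apply in parallel worker";
		case TA_SEND_TO_PARALLEL_WORKER:
			return "send to parallel worker";
	}
	return "unknown";
}
```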

> 4.
> @@ -1149,24 +1315,14 @@ static void
>  apply_handle_stream_start(StringInfo s)
>  {
> ...
> + else if (apply_action == TA_SERIALIZE_TO_FILE)
> + {
> + /*
> + * For the first stream start, check if there is any free apply
> + * parallel worker we can use to process this transaction.
> + */
> + if (first_segment)
> + winfo = parallel_apply_start_worker(stream_xid);
> 
> - /* open the spool file for this transaction */
> - stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
> + if (winfo)
> + {
> + /*
> + * If we have found a free worker, then we pass the data to that
> + * worker.
> + */
> + parallel_apply_send_data(winfo, s->len, s->data);
> 
> - /* if this is not the first segment, open existing subxact file */
> - if (!first_segment)
> - subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
> + nchanges = 0;
> 
> - pgstat_report_activity(STATE_RUNNING, NULL);
> + /* Cache the apply parallel worker for this transaction. */
> + stream_apply_worker = winfo;
> + }
> ...
> 
> This looks odd to me in the sense that even if the action is
> TA_SERIALIZE_TO_FILE, we still send the information to the parallel
> worker. Wouldn't it be better to call parallel_apply_start_worker()
> for first_segment before checking apply_action with
> get_transaction_apply_action()? That way we can avoid this special
> case handling.

Changed as suggested.
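
A minimal sketch of the reordered flow, for illustration only. It assumes the action can be decided purely from whether a worker has been cached for the transaction. WorkerInfo, StreamAction, try_start_worker(), action_for() and handle_stream_start() are hypothetical stand-ins and do not reproduce the patch's functions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in for the leader's per-worker bookkeeping (hypothetical). */
typedef struct WorkerInfo
{
	int			id;
} WorkerInfo;

/* Only the two leader-side streaming actions matter for this sketch. */
typedef enum
{
	SKETCH_SERIALIZE_TO_FILE,
	SKETCH_SEND_TO_PARALLEL_WORKER
} StreamAction;

/*
 * Stand-in for parallel_apply_start_worker(): the caller passes the free
 * worker, or NULL when none is available.
 */
static WorkerInfo *
try_start_worker(WorkerInfo *free_worker)
{
	return free_worker;
}

/* Stand-in for get_transaction_apply_action(): decide from the cached worker. */
static StreamAction
action_for(const WorkerInfo *winfo)
{
	return winfo ? SKETCH_SEND_TO_PARALLEL_WORKER : SKETCH_SERIALIZE_TO_FILE;
}

/*
 * On the first segment, try to get a worker BEFORE computing the action,
 * so the serialize branch never has to send data to a worker.
 */
static StreamAction
handle_stream_start(bool first_segment, WorkerInfo *free_worker,
					WorkerInfo **cached)
{
	if (first_segment)
		*cached = try_start_worker(free_worker);

	return action_for(*cached);
}
```

With this ordering, later segments of the same transaction reuse the cached worker, and a transaction that found no worker on its first segment keeps serializing to a file.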

> 5.
> +/*
> + * Struct for sharing information between apply leader apply worker and apply
> + * parallel workers.
> + */
> +typedef struct ApplyParallelWorkerShared
> +{
> + slock_t mutex;
> +
> + bool in_use;
> +
> + /* Logical protocol version. */
> + uint32 proto_version;
> +
> + TransactionId stream_xid;
> 
> Are we using stream_xid passed by the leader in parallel worker? If
> so, how? If not, then can we do without this?

No, it seems we don't need this. Removed.

> 6.
> +void
> +HandleParallelApplyMessages(void)
> {
> ...
> + /* OK to process messages.  Reset the flag saying there are more to do. */
> + ParallelApplyMessagePending = false;
> 
> I don't understand the meaning of the second part of the comment.
> Shouldn't we say: "Reset the flag saying there is nothing more to
> do."? I know you have copied from the other part of the code but there
> also I am not sure if it is correct.

I feel the comment here is not very helpful, so I removed it.

> 7.
> +static List *ApplyParallelWorkersFreeList = NIL;
> +static List *ApplyParallelWorkersList = NIL;
> 
> Do we really need to maintain two different workers' lists? If so,
> what is the advantage? I think there won't be many parallel apply
> workers, so even if we maintain one list and search it, there shouldn't
> be any performance impact. I feel maintaining two lists for this
> purpose is a bit complex and has more chances of bugs, so we should
> try to avoid it if possible.

Agreed. I removed ApplyParallelWorkersFreeList and reused
ApplyParallelWorkersList in other places.
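
To illustrate the single-list idea, here is a minimal sketch that keeps every worker in one array and finds a free one with a linear search on an in_use flag. WorkerEntry, find_free_worker() and release_worker() are hypothetical names, and the array stands in for the patch's List; this is not the patch's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* One entry per parallel apply worker (hypothetical stand-in). */
typedef struct WorkerEntry
{
	int			id;
	bool		in_use;
} WorkerEntry;

/*
 * Scan the single list for a worker that is not in use, mark it as used
 * and return it. Returns NULL when every worker is busy.
 */
static WorkerEntry *
find_free_worker(WorkerEntry *workers, size_t nworkers)
{
	for (size_t i = 0; i < nworkers; i++)
	{
		if (!workers[i].in_use)
		{
			workers[i].in_use = true;
			return &workers[i];
		}
	}
	return NULL;
}

/* Return a worker to the pool by clearing its flag; no list surgery needed. */
static void
release_worker(WorkerEntry *worker)
{
	worker->in_use = false;
}
```

The search is linear, which should be acceptable under the assumption stated in the review that there are only a few parallel apply workers.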

Attached is the new version of the patch set, which addresses the above
comments and the comments from [1].

[1] https://www.postgresql.org/message-id/CAA4eK1%2Be8JsiC8uMZPU25xQRyxNvVS24M4%3DZy-xD18jzX%2BvrmA%40mail.gmail.com

Best regards,
Hou zj
