RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From: houzj.fnst@fujitsu.com
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id: OS0PR01MB5716546863340CA19EC5F8C394789@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to: Re: Perform streaming logical transactions by background workers and parallel apply (Amit Kapila <amit.kapila16@gmail.com>)
Responses: RE: Perform streaming logical transactions by background workers and parallel apply
List: pgsql-hackers
On Tuesday, August 30, 2022 7:51 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Tue, Aug 30, 2022 at 12:12 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > Few other comments on v25-0001*
> > ============================
>
> Some more comments on v25-0001*:
> =============================
> 1.
> +static void
> +apply_handle_stream_abort(StringInfo s)
> ...
> ...
> + else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
> + {
> + if (subxid == xid)
> + parallel_apply_replorigin_reset();
> +
> + /* Send STREAM ABORT message to the apply parallel worker. */
> + parallel_apply_send_data(winfo, s->len, s->data);
> +
> + /*
> + * After sending the data to the apply parallel worker, wait for
> + * that worker to finish. This is necessary to maintain commit
> + * order which avoids failures due to transaction dependencies and
> + * deadlocks.
> + */
> + if (subxid == xid)
> + {
> + parallel_apply_wait_for_free(winfo);
> ...
> ...
>
> From this code, it appears that we are waiting for rollbacks to finish but not
> doing the same in the rollback to savepoint cases. Is there a reason for the
> same? I think we need to wait for rollbacks to avoid transaction dependency
> and deadlock issues. Consider the below case:
>
> Consider table t1 (c1 primary key, c2, c3) has a row (1, 2, 3) on both
> publisher and subscriber.
>
> Publisher
> Session-1
> ==========
> Begin;
> ...
> Delete from t1 where c1 = 1;
>
> Session-2
> Begin;
> ...
> insert into t1 values(1, 4, 5); -- This will wait for Session-1's Delete to finish.
>
> Session-1
> Rollback;
>
> Session-2
> -- The wait will be finished and the insert will be successful.
> Commit;
>
> Now, assume both these transactions get streamed and if we didn't wait for
> rollback/rollback to savepoint, it is possible that the insert gets executed
> before and leads to a constraint violation. This won't happen in non-parallel
> mode, so we should wait for rollbacks to finish.

Agreed and changed.

> 2.
> I think we don't need to wait at Rollback Prepared/Commit Prepared
> because we wait for prepare to finish in the *_stream_prepare function.
> That will ensure all the operations in that transaction have happened on the
> subscriber, so no concurrent transaction can create deadlock or transaction
> dependency issues. If so, I think it is better to explain this in the comments.

Added some comments about this.

> 3.
> +/* What action to take for the transaction. */
> typedef enum
> {
> - LogicalRepMsgType command; /* 0 if invalid */
> - LogicalRepRelMapEntry *rel;
> + /* The action for non-streaming transactions. */
> + TA_APPLY_IN_LEADER_WORKER,
>
> - /* Remote node information */
> - int remote_attnum; /* -1 if invalid */
> - TransactionId remote_xid;
> - XLogRecPtr finish_lsn;
> - char *origin_name;
> -} ApplyErrorCallbackArg;
> + /* Actions for streaming transactions. */
> + TA_SERIALIZE_TO_FILE,
> + TA_APPLY_IN_PARALLEL_WORKER,
> + TA_SEND_TO_PARALLEL_WORKER
> +} TransactionApplyAction;
>
> I think each action needs explanation atop this enum typedef.

Added.

> 4.
> @@ -1149,24 +1315,14 @@ static void
> apply_handle_stream_start(StringInfo s)
> {
> ...
> + else if (apply_action == TA_SERIALIZE_TO_FILE)
> + {
> + /*
> + * For the first stream start, check if there is any free apply
> + * parallel worker we can use to process this transaction.
> + */
> + if (first_segment)
> + winfo = parallel_apply_start_worker(stream_xid);
>
> - /* open the spool file for this transaction */
> - stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
> + if (winfo)
> + {
> + /*
> + * If we have found a free worker, then we pass the data to that
> + * worker.
> + */
> + parallel_apply_send_data(winfo, s->len, s->data);
>
> - /* if this is not the first segment, open existing subxact file */
> - if (!first_segment)
> - subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
> + nchanges = 0;
>
> - pgstat_report_activity(STATE_RUNNING, NULL);
> + /* Cache the apply parallel worker for this transaction. */
> + stream_apply_worker = winfo;
> + }
> ...
>
> This looks odd to me in the sense that even if the action is
> TA_SERIALIZE_TO_FILE, we still send the information to the parallel
> worker. Won't it be better if we call parallel_apply_start_worker()
> for first_segment before checking apply_action with
> get_transaction_apply_action()? That way we can avoid this special
> case handling.

Changed as suggested.

> 5.
> +/*
> + * Struct for sharing information between the leader apply worker and apply
> + * parallel workers.
> + */
> +typedef struct ApplyParallelWorkerShared
> +{
> + slock_t mutex;
> +
> + bool in_use;
> +
> + /* Logical protocol version. */
> + uint32 proto_version;
> +
> + TransactionId stream_xid;
>
> Are we using the stream_xid passed by the leader in the parallel worker? If
> so, how? If not, then can we do without this?

No, it seems we don't need this. Removed.

> 6.
> +void
> +HandleParallelApplyMessages(void)
> {
> ...
> + /* OK to process messages. Reset the flag saying there are more to do. */
> + ParallelApplyMessagePending = false;
>
> I don't understand the meaning of the second part of the comment.
> Shouldn't we say: "Reset the flag saying there is nothing more to
> do."? I know you have copied it from another part of the code, but there
> too I am not sure if it is correct.

I feel the comment here is not very helpful, so I removed it.

> 7.
> +static List *ApplyParallelWorkersFreeList = NIL;
> +static List *ApplyParallelWorkersList = NIL;
>
> Do we really need to maintain two different workers' lists? If so,
> what is the advantage? I think there won't be many parallel apply
> workers, so even if we maintain one list and search it, there shouldn't
> be any performance impact. I feel maintaining two lists for this
> purpose is a bit complex and has more chances of bugs, so we should
> try to avoid it if possible.

Agreed. I removed the ApplyParallelWorkersFreeList and reused ApplyParallelWorkersList in those places instead.

Attached is the new version patch set, which addresses the above comments and the comments from [1].

[1] https://www.postgresql.org/message-id/CAA4eK1%2Be8JsiC8uMZPU25xQRyxNvVS24M4%3DZy-xD18jzX%2BvrmA%40mail.gmail.com

Best regards,
Hou zj
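For readers following the thread: the TransactionApplyAction enum in comment 3 drives a per-message dispatch in the leader. Below is a minimal, self-contained sketch of how such a selection could work. The enum member names come from the quoted patch, but the signature and logic of this `get_transaction_apply_action` are assumptions for illustration, not the patch's actual code.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Possible ways to apply an incoming logical replication message
 * (names taken from the v26 patch's TransactionApplyAction enum).
 */
typedef enum
{
    TA_APPLY_IN_LEADER_WORKER,   /* non-streaming xact: apply directly */
    TA_SERIALIZE_TO_FILE,        /* streaming xact, no free worker: spool */
    TA_APPLY_IN_PARALLEL_WORKER, /* we are the parallel worker itself */
    TA_SEND_TO_PARALLEL_WORKER   /* leader: forward the message */
} TransactionApplyAction;

/*
 * Hypothetical selection logic: non-streamed transactions are applied
 * in place; streamed ones go to a parallel worker when one is
 * available, and are spooled to a file otherwise.
 */
static TransactionApplyAction
get_transaction_apply_action(bool in_streamed_xact,
                             bool am_parallel_worker,
                             bool have_free_worker)
{
    if (!in_streamed_xact)
        return TA_APPLY_IN_LEADER_WORKER;
    if (am_parallel_worker)
        return TA_APPLY_IN_PARALLEL_WORKER;
    return have_free_worker ? TA_SEND_TO_PARALLEL_WORKER
                            : TA_SERIALIZE_TO_FILE;
}
```

With such a helper, the leader can compute the action once per message and branch on it, which is exactly the restructuring suggested for comment 4: start the worker first, then let the action value reflect whether one was found.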
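Comment 7 argues that, with few workers, a linear scan of one list is simpler and no slower than keeping a separate free list. A toy sketch of that idea (the `WorkerInfo` struct and `find_free_worker` are hypothetical stand-ins, not the patch's data structures, which use PostgreSQL's `List`):

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-worker bookkeeping entry. */
typedef struct WorkerInfo
{
    bool in_use; /* currently assigned to a streamed transaction? */
    int  id;
} WorkerInfo;

/*
 * Scan the single workers array for a free slot and claim it.
 * This is O(n), but n (max parallel apply workers) is small, so a
 * dedicated free list buys little and doubles the bookkeeping.
 * Returns NULL when every worker is busy, in which case the caller
 * would fall back to serializing the transaction to a file.
 */
static WorkerInfo *
find_free_worker(WorkerInfo *workers, size_t nworkers)
{
    for (size_t i = 0; i < nworkers; i++)
    {
        if (!workers[i].in_use)
        {
            workers[i].in_use = true;
            return &workers[i];
        }
    }
    return NULL;
}
```

The design trade-off mirrors the review comment: one list means one invariant to maintain when workers are claimed and released, at the cost of a scan that is negligible for the expected list sizes.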
Attachment
- v26-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch
- v26-0001-Perform-streaming-logical-transactions-by-parall.patch
- v26-0002-Test-streaming-parallel-option-in-tap-test.patch
- v26-0003-Add-some-checks-before-using-parallel-apply-work.patch
- v26-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch