RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From: houzj.fnst@fujitsu.com
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date:
Msg-id: OS0PR01MB57164A2F6DB3B24987F48571947B9@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to: RE: Perform streaming logical transactions by background workers and parallel apply ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
Responses: Re: Perform streaming logical transactions by background workers and parallel apply
List: pgsql-hackers
On Wednesday, August 31, 2022 5:56 PM houzj.fnst@fujitsu.com wrote:
>
> On Tuesday, August 30, 2022 7:51 PM Amit Kapila <amit.kapila16@gmail.com>
> wrote:
> >
> > On Tue, Aug 30, 2022 at 12:12 PM Amit Kapila <amit.kapila16@gmail.com>
> > wrote:
> > >
> > > Few other comments on v25-0001*
> > > ============================
> >
> > Some more comments on v25-0001*:
> > =============================
> > 1.
> > +static void
> > +apply_handle_stream_abort(StringInfo s)
> > ...
> > ...
> > + else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
> > + {
> > + if (subxid == xid)
> > + parallel_apply_replorigin_reset();
> > +
> > + /* Send STREAM ABORT message to the apply parallel worker. */
> > + parallel_apply_send_data(winfo, s->len, s->data);
> > +
> > + /*
> > + * After sending the data to the apply parallel worker, wait for
> > + * that worker to finish. This is necessary to maintain commit
> > + * order which avoids failures due to transaction dependencies and
> > + * deadlocks.
> > + */
> > + if (subxid == xid)
> > + {
> > + parallel_apply_wait_for_free(winfo);
> > ...
> > ...
> >
> > From this code, it appears that we are waiting for rollbacks to finish
> > but not doing the same in the rollback to savepoint cases. Is there a
> > reason for the same? I think we need to wait for rollbacks to avoid
> > transaction dependency and deadlock issues. Consider the below case:
> >
> > Consider table t1 (c1 primary key, c2, c3) has a row (1, 2, 3) on both
> > publisher and subscriber.
> >
> > Publisher
> > Session-1
> > ==========
> > Begin;
> > ...
> > Delete from t1 where c1 = 1;
> >
> > Session-2
> > Begin;
> > ...
> > insert into t1 values(1, 4, 5); --This will wait for Session-1's Delete to finish.
> >
> > Session-1
> > Rollback;
> >
> > Session-2
> > -- The wait will be finished and the insert will be successful.
> > Commit;
> >
> > Now, assume both these transactions get streamed and if we didn't wait
> > for rollback/rollback to savepoint, it is possible that the insert
> > gets executed before and leads to a constraint violation. This won't
> > happen in non-parallel mode, so we should wait for rollbacks to finish.
>
> Agreed and changed.
>
> > 2. I think we don't need to wait at Rollback Prepared/Commit Prepared
> > because we wait for prepare to finish in *_stream_prepare function.
> > That will ensure all the operations in that transaction have happened
> > in the subscriber, so no concurrent transaction can create deadlock or
> > transaction dependency issues. If so, I think it is better to explain
> > this in the comments.
>
> Added some comments about this.
>
> > 3.
> > +/* What action to take for the transaction. */
> > typedef enum
> > {
> > - LogicalRepMsgType command; /* 0 if invalid */
> > - LogicalRepRelMapEntry *rel;
> > + /* The action for non-streaming transactions. */
> > + TA_APPLY_IN_LEADER_WORKER,
> >
> > - /* Remote node information */
> > - int remote_attnum; /* -1 if invalid */
> > - TransactionId remote_xid;
> > - XLogRecPtr finish_lsn;
> > - char *origin_name;
> > -} ApplyErrorCallbackArg;
> > + /* Actions for streaming transactions. */
> > + TA_SERIALIZE_TO_FILE,
> > + TA_APPLY_IN_PARALLEL_WORKER,
> > + TA_SEND_TO_PARALLEL_WORKER
> > +} TransactionApplyAction;
> >
> > I think each action needs explanation atop this enum typedef.
>
> Added.
>
> > 4.
> > @@ -1149,24 +1315,14 @@ static void
> > apply_handle_stream_start(StringInfo s)
> > {
> > ...
> > + else if (apply_action == TA_SERIALIZE_TO_FILE)
> > + {
> > + /*
> > + * For the first stream start, check if there is any free apply
> > + * parallel worker we can use to process this transaction.
> > + */
> > + if (first_segment)
> > + winfo = parallel_apply_start_worker(stream_xid);
> >
> > - /* open the spool file for this transaction */
> > - stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
> > + if (winfo)
> > + {
> > + /*
> > + * If we have found a free worker, then we pass the data to that
> > + * worker.
> > + */
> > + parallel_apply_send_data(winfo, s->len, s->data);
> >
> > - /* if this is not the first segment, open existing subxact file */
> > - if (!first_segment)
> > - subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
> > + nchanges = 0;
> >
> > - pgstat_report_activity(STATE_RUNNING, NULL);
> > + /* Cache the apply parallel worker for this transaction. */
> > + stream_apply_worker = winfo;
> > + }
> > ...
> >
> > This looks odd to me in the sense that even if the action is
> > TA_SERIALIZE_TO_FILE, we still send the information to the parallel
> > worker. Won't it be better if we call parallel_apply_start_worker()
> > for first_segment before checking apply_action with
> > get_transaction_apply_action()? That way we can avoid this special
> > case handling.
>
> Changed as suggested.
>
> > 5.
> > +/*
> > + * Struct for sharing information between apply leader apply worker and apply
> > + * parallel workers.
> > + */
> > +typedef struct ApplyParallelWorkerShared
> > +{
> > + slock_t mutex;
> > +
> > + bool in_use;
> > +
> > + /* Logical protocol version. */
> > + uint32 proto_version;
> > +
> > + TransactionId stream_xid;
> >
> > Are we using stream_xid passed by the leader in parallel worker? If
> > so, how? If not, then can we do without this?
>
> No, it seems we don't need this. Removed.
>
> > 6.
> > +void
> > +HandleParallelApplyMessages(void)
> > {
> > ...
> > + /* OK to process messages. Reset the flag saying there are more to
> > + do. */
> > + ParallelApplyMessagePending = false;
> >
> > I don't understand the meaning of the second part of the comment.
> > Shouldn't we say: "Reset the flag saying there is nothing more to
> > do."? I know you have copied from the other part of the code but there
> > also I am not sure if it is correct.
>
> I feel the comment here is not very helpful, so I removed this.
>
> > 7.
> > +static List *ApplyParallelWorkersFreeList = NIL;
> > +static List *ApplyParallelWorkersList = NIL;
> >
> > Do we really need to maintain two different workers' lists? If so,
> > what is the advantage? I think there won't be many parallel apply
> > workers, so even if we maintain one list and search it, there shouldn't
> > be any performance impact. I feel maintaining two lists for this
> > purpose is a bit complex and has more chances of bugs, so we should
> > try to avoid it if possible.
>
> Agreed, I removed the ApplyParallelWorkersFreeList and reused
> ApplyParallelWorkersList in other places.
>
> Attach the new version patch set which addressed above comments and
> comments from [1].
>
> [1] https://www.postgresql.org/message-id/CAA4eK1%2Be8JsiC8uMZPU25xQRyxNvVS24M4%3DZy-xD18jzX%2BvrmA%40mail.gmail.com

Attach a new version patch set which fixes some typos and some cosmetic things.

Best regards,
Hou zj
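[Editorial illustration] The TransactionApplyAction enum discussed in comments 3 and 4 can be sketched in Python for readers following along. This is not the patch's C code: the dispatch conditions in get_transaction_apply_action() below are assumptions inferred from the action names, and the per-action comments paraphrase the explanations the review asks for.

```python
# Illustrative sketch only -- not the patch's actual C implementation.
# The dispatch logic below is an assumption based on the action names.
from enum import Enum, auto

class TransactionApplyAction(Enum):
    # Non-streaming transaction: apply changes directly in the leader
    # apply worker.
    TA_APPLY_IN_LEADER_WORKER = auto()
    # Streaming transaction with no free parallel worker: serialize
    # changes to a file and apply them at stream end.
    TA_SERIALIZE_TO_FILE = auto()
    # Streaming transaction being applied by this process, which is
    # itself a parallel apply worker.
    TA_APPLY_IN_PARALLEL_WORKER = auto()
    # Streaming transaction in the leader: forward each change to the
    # parallel apply worker assigned to this transaction.
    TA_SEND_TO_PARALLEL_WORKER = auto()

def get_transaction_apply_action(in_streamed_txn, am_parallel_worker,
                                 assigned_worker):
    """Hypothetical dispatch mirroring the patch's helper of the same name."""
    if not in_streamed_txn:
        return TransactionApplyAction.TA_APPLY_IN_LEADER_WORKER
    if am_parallel_worker:
        return TransactionApplyAction.TA_APPLY_IN_PARALLEL_WORKER
    if assigned_worker is not None:
        return TransactionApplyAction.TA_SEND_TO_PARALLEL_WORKER
    return TransactionApplyAction.TA_SERIALIZE_TO_FILE

# A streamed transaction with no worker assigned falls back to the file.
print(get_transaction_apply_action(True, False, None).name)
# -> TA_SERIALIZE_TO_FILE
```

This framing also shows why comment 4's suggestion helps: assigning the worker for the first segment before computing the action means the serialize-to-file branch never needs a parallel-worker special case.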
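[Editorial illustration] Comment 7's resolution, one worker list with an in_use flag searched linearly instead of separate "all" and "free" lists, can be sketched as follows. This is illustrative Python, not PostgreSQL source; the names (start_worker, free_worker) are hypothetical stand-ins for the patch's functions.

```python
# Illustrative sketch of the single-list worker pool from comment 7.
# Not PostgreSQL code; all names here are hypothetical.
from dataclasses import dataclass
from typing import Optional

@dataclass
class ApplyParallelWorker:
    pid: int
    in_use: bool = False
    xid: Optional[int] = None

# One list of all launched workers, analogous to ApplyParallelWorkersList.
workers = [ApplyParallelWorker(pid=100 + i) for i in range(3)]

def start_worker(xid):
    """Linear search for a free worker; cheap because the pool is small."""
    for w in workers:
        if not w.in_use:
            w.in_use = True
            w.xid = xid
            return w
    return None  # no free worker: caller serializes the stream to a file

def free_worker(w):
    """Return a worker to the pool instead of moving it between lists."""
    w.in_use = False
    w.xid = None

w1 = start_worker(xid=724)
w2 = start_worker(xid=725)
free_worker(w1)
w3 = start_worker(xid=726)  # reuses the worker w1 released
```

Because the pool holds at most a handful of workers, the O(n) scan costs nothing measurable, and a single list removes the bookkeeping (and bug surface) of keeping two lists consistent.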
Attachment
- v27-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch
- v27-0001-Perform-streaming-logical-transactions-by-parall.patch
- v27-0002-Test-streaming-parallel-option-in-tap-test.patch
- v27-0003-Add-some-checks-before-using-parallel-apply-work.patch
- v27-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch