RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | houzj.fnst@fujitsu.com |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | OS0PR01MB57161603FC9CB886F2CEE53194159@OS0PR01MB5716.jpnprd01.prod.outlook.com |
In response to | Re: Perform streaming logical transactions by background workers and parallel apply (Amit Kapila <amit.kapila16@gmail.com>) |
Responses | RE: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
On Tuesday, November 29, 2022 8:34 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Tue, Nov 29, 2022 at 10:18 AM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > Attach the new version patch which addressed all comments.
> >
>
> Review comments on v53-0001*

Thanks for the comments!

> ==========================
> 1.
>  Subscription *MySubscription = NULL;
> -static bool MySubscriptionValid = false;
> +bool MySubscriptionValid = false;
>
> It seems still this variable is used in worker.c, so why it's scope changed?

I think it's not needed. Removed.

> 2.
>  /* fields valid only when processing streamed transaction */
> -static bool in_streamed_transaction = false;
> +bool in_streamed_transaction = false;
>
> Is it really required to change the scope of this variable? Can we think of
> exposing a macro or inline function to check it in applyparallelworker.c?

Introduced a new function.

> 3.
>  should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
>  {
>      if (am_tablesync_worker())
>          return MyLogicalRepWorker->relid == rel->localreloid;
> +    else if (am_parallel_apply_worker())
> +    {
> +        if (rel->state != SUBREL_STATE_READY)
> +            ereport(ERROR,
> +                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +                     errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
>
> Is this check sufficient? What if the rel->state is SUBREL_STATE_UNKNOWN? I
> think that will be possible when the refresh publication has not been yet
> performed after adding a new relation to the publication. If that is true then
> won't we need to simply ignore that change and continue instead of erroring
> out? Can you please once test and check this case?

You are right. Changed to not report an ERROR for SUBREL_STATE_UNKNOWN.

> 4.
> +
> +        case TRANS_PARALLEL_APPLY:
> +            list_free(subxactlist);
> +            subxactlist = NIL;
> +
> +            apply_handle_commit_internal(&commit_data);
>
> I don't think we need to retail pfree subxactlist as this is allocated in
> TopTransactionContext and will be freed at commit/prepare. This way freeing
> looks a bit adhoc to me and you need to expose this list outside
> applyparallelworker.c which doesn't seem like a good idea to me either.

Removed the list_free.

> 5.
> +            apply_handle_commit_internal(&commit_data);
> +
> +            pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
> +            pa_unlock_transaction(xid, AccessShareLock);
> +
> +            elog(DEBUG1, "finished processing the transaction finish command");
>
> I think in this and similar DEBUG logs, we can tell the exact command instead of
> writing 'finish'.

Changed.

> 6.
> apply_handle_stream_commit()
> {
> ...
> +        /*
> +         * After sending the data to the parallel apply worker, wait for
> +         * that worker to finish. This is necessary to maintain commit
> +         * order which avoids failures due to transaction dependencies and
> +         * deadlocks.
> +         */
> +        pa_wait_for_xact_finish(winfo);
> +
> +        pgstat_report_stat(false);
> +        store_flush_position(commit_data.end_lsn);
> +        stop_skipping_changes();
> +
> +        (void) pa_free_worker(winfo, xid);
> ...
> }
>
> apply_handle_stream_prepare(StringInfo s)
> {
> +
> +        /*
> +         * After sending the data to the parallel apply worker, wait for
> +         * that worker to finish. This is necessary to maintain commit
> +         * order which avoids failures due to transaction dependencies and
> +         * deadlocks.
> +         */
> +        pa_wait_for_xact_finish(winfo);
> +        (void) pa_free_worker(winfo, prepare_data.xid);
>
> -    /* unlink the files with serialized changes and subxact info. */
> -    stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
> +        in_remote_transaction = false;
> +
> +        store_flush_position(prepare_data.end_lsn);
>
>
> In both of the above functions, we should be consistent in calling
> pa_free_worker() function which I think should be immediately after
> pa_wait_for_xact_finish(). Is there a reason for not being consistent here?

Changed the order to make them consistent.

> 7.
> +        res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
> +
> +        /*
> +         * The leader will detach from the error queue and set it to NULL
> +         * before preparing to stop all parallel apply workers, so we don't
> +         * need to handle error messages anymore.
> +         */
> +        if (!winfo->error_mq_handle)
> +            continue;
>
> This check must be done before calling shm_mq_receive. So, changed it in the
> attached patch.

Thanks, merged.

> 8.
> @@ -2675,6 +3156,10 @@ store_flush_position(XLogRecPtr remote_lsn)
>  {
>      FlushPosition *flushpos;
>
> +    /* Skip for parallel apply workers. */
> +    if (am_parallel_apply_worker())
> +        return;
>
> It is okay to always update the flush position by leader apply worker but I think
> the leader won't have updated value for XactLastCommitEnd as the local
> transaction is committed by parallel apply worker.

I added a field in shared memory so that the parallel apply worker can pass
XactLastCommitEnd to the leader, which then stores it.

> 9.
> @@ -3831,11 +4366,11 @@ ApplyWorkerMain(Datum main_arg)
>
>      ereport(DEBUG1,
>              (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
> -                    MySubscription->name,
> -                    MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
> -                    MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
> -                    MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
> -                    "?")));
> +                    MySubscription->name,
> +                    MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
> +                    MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
> +                    MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
> +                    "?")));
>
> Is this change related to this patch?

I think it was accidentally changed by pgindent. Reverted.

> 10. What is the reason to expose ApplyErrorCallbackArg via worker_internal.h?

The parallel apply worker needs to set the origin name in it. I introduced
another function to set this.

> 11. The order to declare pa_set_stream_apply_worker() in worker_internal.h and
> define in applyparallelworker.c is not the same.
> Similarly, please check all other functions.

Changed.

> 12. Apart from the above, I have made a few changes in the comments and
> some other cosmetic changes in the attached patch.

Thanks, I have checked and merged them.

Attached is the new version of the patch set.

I haven't addressed comments #1 and #2 from [1] yet; I need to think about
them and will handle them soon. Also, I haven't yet renamed
serialize_stream_start/stop or finished the wording consistency check for
comments; I will handle those soon as well.

[1] https://www.postgresql.org/message-id/CAA4eK1LGKYUDFZ_jFPrU497wQf2HNvt5a%2BtCTpqSeWSG6kfpSA%40mail.gmail.com

Best regards,
Hou zj
Attachment
- v54-0001-Perform-streaming-logical-transactions-by-parall.patch
- v54-0002-Serialize-partial-changes-to-a-file-when-the-att.patch
- v54-0003-Test-streaming-parallel-option-in-tap-test.patch
- v54-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch
- v54-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch