RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | wangw.fnst@fujitsu.com |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | OS3PR01MB6275EFC4B707650DAB9392859E4D9@OS3PR01MB6275.jpnprd01.prod.outlook.com |
In response to | Re: Perform streaming logical transactions by background workers and parallel apply (Amit Kapila <amit.kapila16@gmail.com>) |
Responses |
RE: Perform streaming logical transactions by background workers and parallel apply
Re: Perform streaming logical transactions by background workers and parallel apply Re: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
On Thu, Sep 15, 2022 at 19:40 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Thu, Sep 15, 2022 at 10:45 AM wangw.fnst@fujitsu.com
> <wangw.fnst@fujitsu.com> wrote:
> >
> > Attach the new patch set.
> >
>
> Review of v29-0001*

Thanks for your comments and patch!

> ==================
> 1.
> +parallel_apply_find_worker(TransactionId xid)
> {
> ...
> +    entry = hash_search(ParallelApplyWorkersHash, &xid, HASH_FIND, &found);
> +    if (found)
> +    {
> +        /* If any workers (or the postmaster) have died, we have failed. */
> +        if (entry->winfo->error_mq_handle == NULL)
> +            ereport(ERROR,
> +                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +                     errmsg("lost connection to parallel apply worker")));
> ...
> }
>
> I think the above comment is incorrect because if the postmaster would
> have died then you wouldn't have found the entry in the hash table.
> How about something like: "We can't proceed if the parallel streaming
> worker has already exited."

Fixed.

> 2.
> +/*
> + * Find the previously assigned worker for the given transaction, if any.
> + */
> +ParallelApplyWorkerInfo *
> +parallel_apply_find_worker(TransactionId xid)
>
> No need to use word 'previously' in the above sentence.

Improved.

> 3.
> + * We need one key to register the location of the header, and we need
> + * another key to track the location of the message queue.
> + */
> +    shm_toc_initialize_estimator(&e);
> +    shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
> +    shm_toc_estimate_chunk(&e, queue_size);
> +    shm_toc_estimate_chunk(&e, error_queue_size);
> +
> +    shm_toc_estimate_keys(&e, 3);
>
> Overall, three keys are used but the comment indicates two. You forgot
> to mention about error_queue.

Fixed.

> 4.
> +    if (launched)
> +        ParallelApplyWorkersList = lappend(ParallelApplyWorkersList, winfo);
> +    else
> +    {
> +        shm_mq_detach(winfo->mq_handle);
> +        shm_mq_detach(winfo->error_mq_handle);
> +        dsm_detach(winfo->dsm_seg);
> +        pfree(winfo);
> +
> +        winfo = NULL;
> +    }
>
> A. The code used in the else part to free worker info is the same as
> what is used in parallel_apply_free_worker. Can we move this to a
> separate function say parallel_apply_free_worker_info()?
> B. I think it will be better if you use {} for if branch to make it
> look consistent with else branch.

Improved.

> 5.
> + * case define a named savepoint, so that we are able to commit/rollback it
> + * separately later.
> + */
> +void
> +parallel_apply_subxact_info_add(TransactionId current_xid)
>
> I don't see the need of commit in the above message. So, we can
> slightly modify it to: "... so that we are able to rollback to it
> separately later."

Improved.

> 6.
> +    for (i = list_length(subxactlist) - 1; i >= 0; i--)
> +    {
> +        xid = list_nth_xid(subxactlist, i);
> ...
> ...
>
> +/*
> + * Return the TransactionId value contained in the n'th element of the
> + * specified list.
> + */
> +static inline TransactionId
> +list_nth_xid(const List *list, int n)
> +{
> +    Assert(IsA(list, XidList));
> +    return lfirst_xid(list_nth_cell(list, n));
> +}
>
> I am not really sure that we need a new list function to use for this
> place. Can't we directly use lfirst_xid(list_nth_cell) instead?

Improved.

> 7.
> +void
> +parallel_apply_replorigin_setup(void)
> +{
> +    RepOriginId originid;
> +    char        originname[NAMEDATALEN];
> +    bool        started_tx = false;
> +
> +    /* This function might be called inside or outside of transaction. */
> +    if (!IsTransactionState())
> +    {
> +        StartTransactionCommand();
> +        started_tx = true;
> +    }
>
> Is there a place in the patch where this function will be called
> without having an active transaction state? If so, then this coding is
> fine but if not, then I suggest keeping an assert for transaction
> state here. The same thing applies to
> parallel_apply_replorigin_reset() as well.

When using parallel apply, only the parallel apply worker is in a transaction
while the leader apply worker is not. So when invoking function
parallel_apply_replorigin_setup() in the leader apply worker, we need to start
a transaction block.

> 8.
> + *
> + * If write_abort_lsn is true, send the abort_lsn and abort_time fields,
> + * otherwise don't.
>   */
>  void
>  logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
> -                              TransactionId subxid)
> +                              TransactionId subxid, XLogRecPtr abort_lsn,
> +                              TimestampTz abort_time, bool abort_info)
>
> In the comment, the name of the variable needs to be updated.

Fixed.

> 9.
> +TransactionId stream_xid = InvalidTransactionId;
>
> -static TransactionId stream_xid = InvalidTransactionId;
> ...
> ...
> +void
> +parallel_apply_subxact_info_add(TransactionId current_xid)
> +{
> +    if (current_xid != stream_xid &&
> +        !list_member_xid(subxactlist, current_xid))
>
> It seems you have changed the scope of stream_xid to use it in
> parallel_apply_subxact_info_add(). Won't it be better to pass it as a
> parameter (say top_xid)?

Improved.

> 10.
> --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
> +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
> @@ -20,6 +20,7 @@
>  #include <sys/time.h>
>
>  #include "access/xlog.h"
> +#include "catalog/pg_subscription.h"
>  #include "catalog/pg_type.h"
>  #include "common/connect.h"
>  #include "funcapi.h"
> @@ -443,9 +444,14 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
>      appendStringInfo(&cmd, "proto_version '%u'",
>                       options->proto.logical.proto_version);
>
> -    if (options->proto.logical.streaming &&
> -        PQserverVersion(conn->streamConn) >= 140000)
> -        appendStringInfoString(&cmd, ", streaming 'on'");
> +    if (options->proto.logical.streaming != SUBSTREAM_OFF)
> +    {
> +        if (PQserverVersion(conn->streamConn) >= 160000 &&
> +            options->proto.logical.streaming == SUBSTREAM_PARALLEL)
> +            appendStringInfoString(&cmd, ", streaming 'parallel'");
> +        else if (PQserverVersion(conn->streamConn) >= 140000)
> +            appendStringInfoString(&cmd, ", streaming 'on'");
> +    }
>
> It doesn't seem like a good idea to expose subscription options here.
> Can we think of having char *streaming_option instead of the current
> streaming parameter which is filled by the caller and used here
> directly?

Improved.

> 11. The error message used in pgoutput_startup() seems to be better
> than the current messages used in that function but it is better to be
> consistent with other messages. There is a discussion in the email
> thread [1] on improving those messages, so kindly suggest there.

Okay, I will try to modify the two messages and share them in the thread you
mentioned.

> 12. In addition to the above, I have changed/added a few comments in
> the attached patch.

Improved as suggested.

Regards,
Wang wei
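For readers of this archive, the split suggested in comment 10 (the caller decides the option string, the receiver only appends it) might look roughly like the standalone sketch below. It is not the code from the v30 patch. The names StreamMode, choose_streaming_option and append_streaming_option are hypothetical, and a plain char buffer stands in for StringInfo. Only the version thresholds (140000, 160000) and the option strings 'on' and 'parallel' are taken from the quoted diff.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for the subscription's streaming setting. */
typedef enum
{
    STREAM_OFF,
    STREAM_ON,
    STREAM_PARALLEL
} StreamMode;

/*
 * Caller side (assumed): turn the subscription setting and the publisher's
 * version into the option string to send, or NULL to send nothing.
 */
static const char *
choose_streaming_option(StreamMode mode, int server_version)
{
    if (mode == STREAM_OFF)
        return NULL;
    if (mode == STREAM_PARALLEL && server_version >= 160000)
        return "parallel";
    if (server_version >= 140000)
        return "on";            /* older publisher: fall back to 'on' */
    return NULL;                /* publisher too old for streaming */
}

/*
 * Receiver side (assumed): append whatever string the caller supplied,
 * without knowing anything about subscription options.
 */
static void
append_streaming_option(char *cmd, size_t cmdsize, const char *streaming_option)
{
    size_t      len = strlen(cmd);

    if (streaming_option != NULL && len < cmdsize)
        snprintf(cmd + len, cmdsize - len, ", streaming '%s'",
                 streaming_option);
}
```

With this shape the receiver no longer needs to include the subscription catalog header, which is the concern the review comment raises.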
Attachment
- v30-0001-Perform-streaming-logical-transactions-by-parall.patch
- v30-0002-Test-streaming-parallel-option-in-tap-test.patch
- v30-0003-Add-some-checks-before-using-parallel-apply-work.patch
- v30-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch
- v30-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch