RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | houzj.fnst@fujitsu.com |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | OS0PR01MB5716B400CD81565E868616DB945F9@OS0PR01MB5716.jpnprd01.prod.outlook.com |
In response to | Re: Perform streaming logical transactions by background workers and parallel apply (Peter Smith <smithpb2250@gmail.com>) |
Responses |
RE: Perform streaming logical transactions by background workers and parallel apply
Re: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
On Friday, September 30, 2022 4:27 PM Peter Smith <smithpb2250@gmail.com> wrote:
>
> Here are my review comments for the v35-0001 patch:

Thanks for the comments.

> 3. GENERAL
> I found the mixed use of the same member names having different meanings to be quite confusing.
>
> e.g.1
> PGOutputData 'streaming' is now a single char internal representation the subscription parameter streaming mode ('f','t','p')
> - bool streaming;
> + char streaming;
>
> e.g.2
> WalRcvStreamOptions 'streaming' is a C string version of the subscription streaming mode ("on", "parallel")
> - bool streaming; /* Streaming of large transactions */
> + char *streaming; /* Streaming of large transactions */
>
> e.g.3
> SubOpts 'streaming' is again like the first example - a single char for the mode.
> - bool streaming;
> + char streaming;
>
>
> IMO everything would become much simpler if you did:
>
> 3a.
> Rename "char streaming;" -> "char streaming_mode;"

The word 'streaming' is the same as the actual option name, so personally I think it's fine. But if others also agree that the name can be improved, I can change it.

>
> 3b. Re-designed the "char *streaming;" code to also use the single char
> notation, then also call that member 'streaming_mode'. Then everything will be
> consistent.

If we use a single byte (char) here, we would need to compare it with the standard streaming option value in libpqwalreceiver.c, which was advised against [1].

> 4. - max_parallel_apply_workers_per_subscription
> + </para>
> + <para>
> + The parallel apply workers are taken from the pool defined by
> + <varname>max_logical_replication_workers</varname>.
> + </para>
> + <para>
> + The default value is 2. This parameter can only be set in the
> + <filename>postgresql.conf</filename> file or on the server command
> + line.
> + </para>
> + </listitem>
> + </varlistentry>
>
> I felt that maybe this should also xref to the
> doc/src/sgml/logical-replication.sgml section where you say about
> "max_logical_replication_workers should be increased according to the
> desired number of parallel apply workers."

Not sure about this, as we don't have a similar thing in the documentation of max_logical_replication_workers and max_sync_workers_per_subscription.

> ======
>
> 7. src/backend/access/transam/xact.c - RecordTransactionAbort
>
>
> + /*
> + * Are we using the replication origins feature? Or, in other words, are
> + * we replaying remote actions?
> + */
> + replorigin = (replorigin_session_origin != InvalidRepOriginId &&
> + replorigin_session_origin != DoNotReplicateId);
>
> "Or, in other words," -> "In other words,"

I think it is better to keep it consistent with the comments in function RecordTransactionCommit.

> 10b.
> IMO this flag might be better to be called 'parallel_apply_enabled' or something similar.
> (see also review comment #55b.)

Not sure about this.

> 12. - parallel_apply_free_worker
>
> + SpinLockAcquire(&winfo->shared->mutex);
> + slot_no = winfo->shared->logicalrep_worker_slot_no;
> + generation = winfo->shared->logicalrep_worker_generation;
> + SpinLockRelease(&winfo->shared->mutex);
>
> I know there are not many places doing this, but do you think it might be
> worth introducing some new set/get function to encapsulate the set/get of the
> generation/slot so it does the mutex spin-locks in common code?

Not sure about this.

> 13. - LogicalParallelApplyLoop
>
> + /*
> + * Init the ApplyMessageContext which we clean up after each replication
> + * protocol message.
> + */
> + ApplyMessageContext = AllocSetContextCreate(ApplyContext,
> + "ApplyMessageContext",
> + ALLOCSET_DEFAULT_SIZES);
>
> Because this is in the parallel apply worker should the name (e.g. the 2nd
> param) be changed to "ParallelApplyMessageContext"?
Not sure about this, because ApplyMessageContext is used in both worker.c and applyparallelworker.c.

> + else if (is_subworker)
> + snprintf(bgw.bgw_name, BGW_MAXLEN,
> + "logical replication parallel apply worker for subscription %u",
> + subid);
> else
> snprintf(bgw.bgw_name, BGW_MAXLEN,
> "logical replication worker for subscription %u", subid);
>
> I think that *last* text now be changed like below:
>
> BEFORE
> "logical replication worker for subscription %u"
> AFTER
> "logical replication apply worker for subscription %u"

I am not sure if it's a good idea to change the existing process description.

> 36 - should_apply_changes_for_rel
> should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
> {
> if (am_tablesync_worker())
> return MyLogicalRepWorker->relid == rel->localreloid;
> + else if (am_parallel_apply_worker())
> + {
> + if (rel->state != SUBREL_STATE_READY)
> + ereport(ERROR,
> + (errmsg("logical replication apply workers for subscription \"%s\" will restart",
> + MySubscription->name),
> + errdetail("Cannot handle streamed replication transaction using parallel "
> + "apply workers until all tables are synchronized.")));
> +
> + return true;
> + }
> else
> return (rel->state == SUBREL_STATE_READY ||
> (rel->state == SUBREL_STATE_SYNCDONE &&
> @@ -427,43 +519,87 @@ end_replication_step(void)
>
> This function can be made tidier just by removing all the 'else' ...

I feel the current style looks better.

> 40. - apply_handle_stream_prepare
>
> + case TRANS_LEADER_SERIALIZE:
>
> - /* Mark the transaction as prepared. */
> - apply_handle_prepare_internal(&prepare_data);
> + /*
> + * The transaction has been serialized to file, so replay all the
> + * spooled operations.
> + */
>
> Spurious blank line after the 'case'.

Personally, I think this style is fine.

> 48. - ApplyWorkerMain
>
> +/* Logical Replication Apply worker entry point */
> void
> +ApplyWorkerMain(Datum main_arg)
>
> "Apply worker" -> "apply worker"

Since it's the existing comment, I feel we can leave this.

> + /*
> + * We don't currently need any ResourceOwner in a walreceiver process, but
> + * if we did, we could call CreateAuxProcessResourceOwner here.
> + */
>
> I think this comment should have "XXX" prefix.

I am not sure, as this comment is just a reminder.

> 50.
>
> + if (server_version >= 160000 &&
> + MySubscription->stream == SUBSTREAM_PARALLEL)
> + {
> + options.proto.logical.streaming = pstrdup("parallel");
> + MyLogicalRepWorker->parallel_apply = true;
> + }
> + else if (server_version >= 140000 &&
> + MySubscription->stream != SUBSTREAM_OFF)
> + options.proto.logical.streaming = pstrdup("on");
> else
> + options.proto.logical.streaming = NULL;
>
> IMO it might make more sense for these conditions to be checking the
> 'options.proto.logical.proto_version' here instead of checking the hardwired
> server versions. Also, I suggest may be better (for clarity) to always
> assign the parallel_apply member.

Currently, the proto_version is only checked on the publisher, so I am not sure if it's a good idea to check it here.

> 52. - get_transaction_apply_action
>
> + /*
> + * Check if we are processing this transaction using a parallel apply
> + * worker and if so, send the changes to that worker.
> + */
> + else if ((*winfo = parallel_apply_find_worker(xid)))
> + {
> + return TRANS_LEADER_SEND_TO_PARALLEL;
> + }
> + else
> + {
> + return TRANS_LEADER_SERIALIZE;
> + }
> + }
>
> 52a.
> All these if/else and code blocks seem excessive. It can be simplified as follows:

I feel this style is fine.

> 52b.
> Can a tablesync worker ever get here? It might be better to
> Assert(!am_tablesync_worker()); at top of this function?

Not sure if it's necessary or not.

> 55b.
> IMO this member name should be named slightly different to give a better feel
> for what it really means.
>
> Maybe something like one of:
> "parallel_apply_ok"
> "parallel_apply_enabled"
> "use_parallel_apply"
> etc?

I feel the current name is fine. But if others share your view, I can try to rename it.

> 57. - am_leader_apply_worker
>
> +static inline bool
> +am_leader_apply_worker(void)
> +{
> + return (!OidIsValid(MyLogicalRepWorker->relid) &&
> + !isParallelApplyWorker(MyLogicalRepWorker));
> +}
>
> I wondered if it would be tidier/easier to define this function like below.
> The others are inline functions anyhow so it should end up as the same
> thing, right?
>
> static inline bool
> am_leader_apply_worker(void)
> {
> return (!am_tablesync_worker() && !am_parallel_apply_worker);
> }

I feel the current style is fine.

> --- fail - streaming must be boolean
> +-- fail - streaming must be boolean or 'parallel'
> CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
>
> I think there are tests already for explicitly create/set the subscription
> parameter streaming = on/off/parallel
>
> But what about when there is no value explicitly specified? Shouldn't there
> also be tests like below to check that *implied* boolean true still works for
> this enum?

I didn't find similar tests for cases where no value is explicitly specified, so I didn't add this for now.

Attached is the new version of the patch set, which addresses most of the comments.

[1] https://www.postgresql.org/message-id/CAA4eK1LMVdS6uM7Tw7ANL0BetAd76TKkmAXNNQa0haTe2tax6g%40mail.gmail.com

Best regards,
Hou zj
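
For readers following comment 50, the version-based fallback quoted there can be sketched as a standalone function. This is only an illustration under stated assumptions, not code from the patch: choose_streaming_option, streaming_option_equals and streaming_option_self_check are hypothetical names, and the SUBSTREAM_* macros below are local stand-ins for the 'f', 't', 'p' mode characters mentioned in comment 3.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Local stand-ins (assumption) for the streaming modes 'f', 't', 'p'. */
#define SUBSTREAM_OFF      'f'
#define SUBSTREAM_ON       't'
#define SUBSTREAM_PARALLEL 'p'

/*
 * Hypothetical helper, not part of the patch: pick the streaming option
 * string to request from the publisher, mirroring the quoted conditions.
 * Returns NULL when no streaming should be requested.
 */
const char *
choose_streaming_option(int server_version, char substream)
{
    /* "parallel" is only requested from servers of version 16 or later. */
    if (server_version >= 160000 && substream == SUBSTREAM_PARALLEL)
        return "parallel";

    /* Any non-off mode falls back to plain "on" for version 14 or later. */
    if (server_version >= 140000 && substream != SUBSTREAM_OFF)
        return "on";

    return NULL;
}

/* NULL-safe comparison of two option strings (hypothetical helper). */
bool
streaming_option_equals(const char *a, const char *b)
{
    if (a == NULL || b == NULL)
        return a == b;
    return strcmp(a, b) == 0;
}

/* Usage example: walks through the cases covered by the quoted conditions. */
void
streaming_option_self_check(void)
{
    assert(streaming_option_equals(
        choose_streaming_option(160000, SUBSTREAM_PARALLEL), "parallel"));
    /* An older publisher downgrades a "parallel" subscription to "on". */
    assert(streaming_option_equals(
        choose_streaming_option(150000, SUBSTREAM_PARALLEL), "on"));
    assert(choose_streaming_option(130000, SUBSTREAM_ON) == NULL);
    assert(choose_streaming_option(160000, SUBSTREAM_OFF) == NULL);
}
```

The sketch returns string literals instead of pstrdup'd copies so that it compiles outside the PostgreSQL tree; it also leaves out the parallel_apply flag assignment that the quoted code performs in the first branch.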
Attachment
- v36-0002-Test-streaming-parallel-option-in-tap-test.patch
- v36-0003-Add-some-checks-before-using-parallel-apply-work.patch
- v36-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch
- v36-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch
- v36-0001-Perform-streaming-logical-transactions-by-parall.patch