RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id OS0PR01MB5716B400CD81565E868616DB945F9@OS0PR01MB5716.jpnprd01.prod.outlook.com
Whole thread Raw
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply
Re: Perform streaming logical transactions by background workers and parallel apply
List pgsql-hackers
On Friday, September 30, 2022 4:27 PM Peter Smith <smithpb2250@gmail.com> wrote:
> 
> Here are my review comments for the v35-0001 patch:

Thanks for the comments.


> 3. GENERAL
> I found the mixed use of the same member names having different meanings to be quite confusing.
> 
> e.g.1
> PGOutputData 'streaming' is now a single char internal representation the subscription parameter streaming mode
('f','t','p')
> - bool streaming;
> + char streaming;
> 
> e.g.2
> WalRcvStreamOptions 'streaming' is a C string version of the subscription streaming mode ("on", "parallel")
> - bool streaming; /* Streaming of large transactions */
> + char    *streaming; /* Streaming of large transactions */
> 
> e.g.3
> SubOpts 'streaming' is again like the first example - a single char for the mode.
> - bool streaming;
> + char streaming;
> 
> 
> IMO everything would become much simpler if you did:
> 
> 3a.
> Rename "char streaming;" -> "char streaming_mode;"

The word 'streaming' is the same as the actual option name, so personally I think it's fine.
But if others also agreed that the name can be improved, I can change it.

> 
> 3b. Re-designed the "char *streaming;" code to also use the single char
> notation, then also call that member 'streaming_mode'. Then everything will
> be > consistent.

If we use single byte(char) here we would need to compare it with the standard
streaming option value in libpqwalreceiver.c which was suggested not to do[1].


> 4. - max_parallel_apply_workers_per_subscription
> +       </para>
> +       <para>
> +        The parallel apply workers are taken from the pool defined by
> +        <varname>max_logical_replication_workers</varname>.
> +       </para>
> +       <para>
> +        The default value is 2. This parameter can only be set in the
> +        <filename>postgresql.conf</filename> file or on the server command
> +        line.
> +       </para>
> +      </listitem>
> +     </varlistentry>
> 
> I felt that maybe this should also xref to the
> doc/src/sgml/logical-replication.sgml section where you say about
> "max_logical_replication_workers should be increased according to the
> desired number of parallel apply workers."

Not sure about this as we don't have similar thing in the document of
max_logical_replication_workers and max_sync_workers_per_subscription.


> ======
> 
> 7. src/backend/access/transam/xact.c - RecordTransactionAbort
> 
> 
> + /*
> + * Are we using the replication origins feature?  Or, in other words, 
> + are
> + * we replaying remote actions?
> + */
> + replorigin = (replorigin_session_origin != InvalidRepOriginId &&
> +   replorigin_session_origin != DoNotReplicateId);
> 
> "Or, in other words," -> "In other words,"

I think it is better to keep consistent with the comments in function
RecordTransactionCommit.


> 10b.
> IMO this flag might be better to be called 'parallel_apply_enabled' or something similar.
> (see also review comment #55b.)

Not sure about this.

> 12. - parallel_apply_free_worker
> 
> + SpinLockAcquire(&winfo->shared->mutex);
> + slot_no = winfo->shared->logicalrep_worker_slot_no;
> + generation = winfo->shared->logicalrep_worker_generation;
> + SpinLockRelease(&winfo->shared->mutex);
> 
> I know there are not many places doing this, but do you think it might be
> worth introducing some new set/get function to encapsulate the set/get of the
> >generation/slot so it does the mutex spin-locks in common code?

Not sure about this.

> 13. - LogicalParallelApplyLoop
> 
> + /*
> + * Init the ApplyMessageContext which we clean up after each 
> + replication
> + * protocol message.
> + */
> + ApplyMessageContext = AllocSetContextCreate(ApplyContext,
> + "ApplyMessageContext",
> + ALLOCSET_DEFAULT_SIZES);
> 
> Because this is in the parallel apply worker should the name (e.g. the 2nd
> param) be changed to "ParallelApplyMessageContext"?

Not sure about this, because ApplyMessageContext is used in both worker.c and
applyparallelworker.c.


> + else if (is_subworker)
> + snprintf(bgw.bgw_name, BGW_MAXLEN,
> + "logical replication parallel apply worker for subscription %u", 
> + subid);
>   else
>   snprintf(bgw.bgw_name, BGW_MAXLEN,
>   "logical replication worker for subscription %u", subid);
> 
> I think that *last* text now be changed like below:
> 
> BEFORE
> "logical replication worker for subscription %u"
> AFTER
> "logical replication apply worker for subscription %u"

I am not sure if it's a good idea to change existing process description.


> 36 - should_apply_changes_for_rel
>  should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)  {
>   if (am_tablesync_worker())
>   return MyLogicalRepWorker->relid == rel->localreloid;
> + else if (am_parallel_apply_worker())
> + {
> + if (rel->state != SUBREL_STATE_READY)
> + ereport(ERROR,
> + (errmsg("logical replication apply workers for subscription \"%s\"
> will restart",
> + MySubscription->name),
> + errdetail("Cannot handle streamed replication transaction using parallel "
> +    "apply workers until all tables are synchronized.")));
> +
> + return true;
> + }
>   else
>   return (rel->state == SUBREL_STATE_READY ||
>   (rel->state == SUBREL_STATE_SYNCDONE && @@ -427,43 +519,87 @@ end_replication_step(void)
> 
> This function can be made tidier just by removing all the 'else' ...

I feel the current style looks better.


> 40. - apply_handle_stream_prepare
> 
> + case TRANS_LEADER_SERIALIZE:
> 
> - /* Mark the transaction as prepared. */
> - apply_handle_prepare_internal(&prepare_data);
> + /*
> + * The transaction has been serialized to file, so replay all the
> + * spooled operations.
> + */
> 
> Spurious blank line after the 'case'.

Personally, I think this style is fine.


> 48. - ApplyWorkerMain
> 
> +/* Logical Replication Apply worker entry point */ void 
> +ApplyWorkerMain(Datum main_arg)
> 
> "Apply worker" -> "apply worker"

Since it's the existing comment, I feel we can leave this.


> + /*
> + * We don't currently need any ResourceOwner in a walreceiver process, 
> + but
> + * if we did, we could call CreateAuxProcessResourceOwner here.
> + */
> 
> I think this comment should have "XXX" prefix.

I am not sure as this comment is just a reminder.


> 50.
> 
> + if (server_version >= 160000 &&
> + MySubscription->stream == SUBSTREAM_PARALLEL)
> + {
> + options.proto.logical.streaming = pstrdup("parallel");
> + MyLogicalRepWorker->parallel_apply = true;
> + }
> + else if (server_version >= 140000 &&
> + MySubscription->stream != SUBSTREAM_OFF)
> + options.proto.logical.streaming = pstrdup("on"); else 
> + options.proto.logical.streaming = NULL;
> 
> IMO it might make more sense for these conditions to be checking the
> 'options.proto.logical.proto_version' here instead of checking the hardwired
> server > versions. Also, I suggest may be better (for clarity) to always
> assign the parallel_apply member.

Currently, the proto_version is only checked at publisher, I am not sure if
it's a good idea to check it here.

> 52. - get_transaction_apply_action
> 
> + /*
> + * Check if we are processing this transaction using a parallel apply
> + * worker and if so, send the changes to that worker.
> + */
> + else if ((*winfo = parallel_apply_find_worker(xid)))  {  return 
> +TRANS_LEADER_SEND_TO_PARALLEL;  }  else  {  return 
> +TRANS_LEADER_SERIALIZE;  } }
> 
> 52a.
> All these if/else and code blocks seem excessive. It can be simplified as follows:

I feel this style is fine.

> 52b.
> Can a tablesync worker ever get here? It might be better to
> Assert(!am_tablesync_worker()); at top of this function?

Not sure if it's necessary or not.


> 55b.
> IMO this member name should be named slightly different to give a better feel
> for what it really means.
> 
> Maybe something like one of:
> "parallel_apply_ok"
> "parallel_apply_enabled"
> "use_parallel_apply"
> etc?

I feel the current name is fine. But if others also feel the same, I can try to
rename it.

> 57. - am_leader_apply_worker
> 
> +static inline bool
> +am_leader_apply_worker(void)
> +{
> + return (!OidIsValid(MyLogicalRepWorker->relid) &&  
> +!isParallelApplyWorker(MyLogicalRepWorker));
> +}
> 
> I wondered if it would be tidier/easier to define this function like below.
> The others are inline functions anyhow so it should end up as the same >
> thing, right?
> 
> static inline bool
> am_leader_apply_worker(void)
> {
> return (!am_tablesync_worker() && !am_parallel_apply_worker); }

I feel the current style is fine.

>--- fail - streaming must be boolean
>+-- fail - streaming must be boolean or 'parallel'
> CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect =
>false,streaming = foo);
 
>
>I think there are tests already for explicitly create/set the subscription
>parameter streaming = on/off/parallel
>
>But what about when there is no value explicitly specified? Shouldn't there
>also be tests like below to check that *implied* boolean true still works for
>this enum?

I didn't find similar tests for no value explicitly specified cases, so I didn't add this
for now.

Attach the new version patch set which addressed most of the comments.

[1] https://www.postgresql.org/message-id/CAA4eK1LMVdS6uM7Tw7ANL0BetAd76TKkmAXNNQa0haTe2tax6g%40mail.gmail.com

Best regards,
Hou zj

Attachment

pgsql-hackers by date:

Previous
From: "houzj.fnst@fujitsu.com"
Date:
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Next
From: Etsuro Fujita
Date:
Subject: Re: Fast COPY FROM based on batch insert