RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From houzj.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id OS0PR01MB57162232BF51A09F4BD13C7594249@OS0PR01MB5716.jpnprd01.prod.outlook.com
Whole thread Raw
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
List pgsql-hackers
On Wed, Oct 12, 2022 at 18:11 PM Peter Smith <smithpb2250@gmail.com> wrote:
> Here are some review comments for v36-0001.

Thanks for your comments.

> ======
> 
> 1. GENERAL
> 
> Houzj wrote ([1] #3a):
> The word 'streaming' is the same as the actual option name, so 
> personally I think it's fine. But if others also agreed that the name 
> can be improved, I can change it.
> 
> ~
> 
> Sure, I was not really complaining that the name is "wrong". Only I 
> did not think it was a good idea to have multiple struct members 
> called 'streaming' when they don't have the same meaning. e.g. one is 
> the internal character mode equivalent of the parameter, and one is 
> the parameter value as a string. That's why I thought they should be 
> different names. e.g. Make the 2nd one 'streaming_valstr' or 
> something.

Changed.

> ======
> 
> 2. doc/src/sgml/config.sgml
> 
> Previously I suggested there should be xrefsto the "Configuration 
> Settings" page but Houzj wrote ([1] #4):
> Not sure about this as we don't have similar thing in the document of 
> max_logical_replication_workers and max_sync_workers_per_subscription.
> 
> ~
> 
> Fair enough, but IMO perhaps all those others should also xref to the 
> "Configuration Settings" chapter. So if such a change does not belong 
> in this patch, then how about if I make another independent thread to 
> post this suggestion?

Sure, I feel it would be better to do it in a separate thread.

> ======
> 
> .../replication/logical/applyparallelworker.c
> 
> 
> 3. parallel_apply_find_worker
> 
> +parallel_apply_find_worker(TransactionId xid) {  bool found;  
> +ParallelApplyWorkerEntry *entry = NULL;
> +
> + if (!TransactionIdIsValid(xid))
> + return NULL;
> +
> + if (ParallelApplyWorkersHash == NULL) return NULL;
> +
> + /* Return the cached parallel apply worker if valid. */ if 
> + (stream_apply_worker != NULL) return stream_apply_worker;
> +
> + /*
> + * Find entry for requested transaction.
> + */
> + entry = hash_search(ParallelApplyWorkersHash, &xid, HASH_FIND, 
> + &found);
> 
> In function parallel_apply_start_worker() you removed the entry 
> assignment to NULL because it is never needed. Can do the same here 
> too.

Changed.

> 4. parallel_apply_free_worker
> 
> +/*
> + * Remove the parallel apply worker entry from the hash table. And 
> +stop the
> + * worker if there are enough workers in the pool. For more 
> +information about
> + * the worker pool, see comments atop worker.c.
> + */
> +void
> +parallel_apply_free_worker(ParallelApplyWorkerInfo *winfo, 
> +TransactionId
> xid)
> 
> "And stop" -> "Stop"

Changed.

> 5. parallel_apply_free_worker
> 
> + * Although some error messages may be lost in rare scenarios, but
> + * since the parallel apply worker has finished processing the
> + * transaction, and error messages may be lost even if we detach the
> + * error queue after terminating the process. So it should be ok.
> + */
> 
> SUGGESTION (minor rewording)
> Some error messages may be lost in rare scenarios, but it should be OK 
> because the parallel apply worker has finished processing the 
> transaction, and error messages may be lost even if we detached the 
> error queue after terminating the process.

Changed.


> ~~~
> 
> 7. LogicalParallelApplyLoop
> 
> Previous I suggested maybe the name (e.g. the 2nd param) should be 
> changed to "ParallelApplyMessageContext"? Houzj wrote ([1] #13): Not 
> sure about this, because ApplyMessageContext is used in both worker.c 
> and applyparallelworker.c.
> 
> ~
> 
> But I thought those are completely independent ApplyMessageContext's 
> in different processes that happen to have the same name. Shouldn't 
> they have a name appropriate to who owns them?

ApplyMessageContext is used by the begin_replication_step() function which will
be invoked in both leader and parallel apply worker. So, we need to name the
memory context the same as ApplyMessageContext, otherwise we would need to
modify the logic of begin_replication_step() to use another memory context if
in parallel apply worker.


> ~~~
> 
> 8. ParallelApplyWorkerMain
> 
> + /*
> + * Allocate the origin name in a long-lived context for error context
> + * message.
> + */
> + snprintf(originname, sizeof(originname), "pg_%u", 
> + MySubscription->oid);
> 
> Now that ReplicationOriginNameForLogicalRep patch is pushed [2] please 
> make use of this common function.

Changed.

> ~~~
> 
> 9. HandleParallelApplyMessage
> 
> + case 'X': /* Terminate, indicating clean exit */ { 
> + shm_mq_detach(winfo->error_mq_handle);
> + winfo->error_mq_handle = NULL;
> + break;
> + }
> +
> + /*
> + * Don't need to do anything about NoticeResponse and
> + * NotifyResponse as the logical replication worker doesn't need
> + * to send messages to the client.
> + */
> + case 'N':
> + case 'A':
> + break;
> + default:
> + {
> + elog(ERROR, "unrecognized message type received from parallel apply
> worker: %c (message length %d bytes)",
> + msgtype, msg->len);
> + }
> 
> 9a. case 'X':
> There are no variable declarations here so the statement block {} is 
> not needed
> 
> ~
> 
> 9b. default:
> There are no variable declarations here so the statement block {} is 
> not needed

Changed.

> ~~~
> 
> 10. parallel_apply_stream_abort
> 
> + int i;
> + bool found = false;
> + char spname[MAXPGPATH];
> +
> + parallel_apply_savepoint_name(MySubscription->oid, subxid, spname,
> +   sizeof(spname));
> 
> I posted about using NAMEDATALEN in a previous review ([3] #21) but I 
> think only one place was fixed and this one was missed.

Changed.

> ======
> 
> src/backend/replication/logical/launcher.c
> 
> 12. logicalrep_worker_launch
> 
> Previously I suggested may the apply process name should change
> 
> FROM
> "logical replication worker for subscription %u"
> TO
> "logical replication apply worker for subscription %u"
> 
> and Houz wrote ([1] #13)
> I am not sure if it's a good idea to change existing process description.
> 
> ~
> 
> But that seems inconsistent to me because elsewhere this patch is 
> already exposing the name to the user (like when it says "logical 
> replication apply worker for subscription \"%s\" has started".
> Shouldn’t the process name match these logs?

Changed.

> ======
> 
> src/backend/replication/logical/worker.c
> 
> 13. apply_handle_stream_start
> 
> + *
> + * XXX We can avoid sending pairs of the START messages to the 
> + parallel
> worker
> + * because unlike apply worker it will process only one transaction-at-a-time.
> + * However, it is not clear whether that is worth the effort because 
> + it is sent
> + * after logical_decoding_work_mem changes.
>   */
>  static void
>  apply_handle_stream_start(StringInfo s)
> 
> 13a.
> "transaction-at-a-time." -> "transaction at a time."
> 
> ~
> 
> 13b.
> I was not sure what does that last sentence mean? Does it mean something like:
> "However, it is not clear whether doing this is worth the effort 
> because pairs of START messages occur only after 
> logical_decoding_work_mem changes."

=>13a.
Changed.

> ~~~
> 
> 14. apply_handle_stream_start
> 
> + ParallelApplyWorkerInfo *winfo = NULL;
> 
> The declaration *winfo assignment to NULL is not needed because 
> get_transaction_apply_action will always do this anyway.

Changed.

> ~~~
> 
> 15. apply_handle_stream_start
> 
> +
> + case TRANS_PARALLEL_APPLY:
> + break;
> 
> I had previously suggested this include a comment explaining why there 
> is nothing to do ([3] #44), but I think there was no reply.

The parallel apply worker doesn't need special handling for STREAM START,
it only needs to run some common code path that is shared by leader.
I added a small comment about this.

> ~~~
> 
> 20. ApplyWorkerMain
> 
> + /*
> + * We don't currently need any ResourceOwner in a walreceiver 
> + process, but
> + * if we did, we could call CreateAuxProcessResourceOwner here.
> + */
> 
> Previously I suggested prefixing this as "XXX" and Houzj replied ([1] #48):
> I am not sure as this comment is just a reminder.
> 
> ~
> 
> OK, then maybe since it is a reminder "Note" then it should be changed:
> "We don't currently..." -> "Note: We don't currently..."

I feel it's fine to leave the comment as that's the existing comment
in ApplyWorkerMain().

> ~~~
> 
> 21. ApplyWorkerMain
> 
> + if (server_version >= 160000 &&
> + MySubscription->stream == SUBSTREAM_PARALLEL)
> + {
> + options.proto.logical.streaming = pstrdup("parallel");
> + MyLogicalRepWorker->parallel_apply = true;
> + }
> + else if (server_version >= 140000 &&
> + MySubscription->stream != SUBSTREAM_OFF)
> + {
> + options.proto.logical.streaming = pstrdup("on");
> + MyLogicalRepWorker->parallel_apply = false;
> + }
> + else
> + {
> + options.proto.logical.streaming = NULL;
> + MyLogicalRepWorker->parallel_apply = false;
> + }
> 
> I think the block of if/else is only for assigning the 
> streaming/parallel members so should have some comment to say that:
> 
> SUGGESTION
> Assign the appropriate streaming flag according to the 'streaming'
> mode and the publisher's ability to support that mode.

Added the comments as suggested.

> ~~~
> 
> 22. get_transaction_apply_action
> 
> +static TransApplyAction
> +get_transaction_apply_action(TransactionId xid,
> ParallelApplyWorkerInfo **winfo)
> +{
> + *winfo = NULL;
> +
> + if (am_parallel_apply_worker())
> + {
> + return TRANS_PARALLEL_APPLY;
> + }
> + else if (in_remote_transaction)
> + {
> + return TRANS_LEADER_APPLY;
> + }
> +
> + /*
> + * Check if we are processing this transaction using a parallel apply
> + * worker and if so, send the changes to that worker.
> + */
> + else if ((*winfo = parallel_apply_find_worker(xid)))  {  return 
> +TRANS_LEADER_SEND_TO_PARALLEL;  }  else  {  return 
> +TRANS_LEADER_SERIALIZE;  } }
> 
> 22b.
> Also previously I had suggested
> 
> > Can a tablesync worker ever get here? It might be better to 
> > Assert(!am_tablesync_worker()); at top of this function?
> 
> and Houzj ([1] #52b) replied:
> Not sure if it's necessary or not.
> 
> ~
> 
> OTOH you could say no Assert is ever really necessary, but IMO adding 
> one here would at least be a sanity check and help to document the 
> function better.

get_transaction_apply_action might also be invoked in table sync worker in some
rare cases when some streaming transaction comes while doing the table sync.
And the function works fine in that case, so I don't think we should add the
Assert() here.

Best regards,
Hou zj


pgsql-hackers by date:

Previous
From: Alvaro Herrera
Date:
Subject: Re: Fix error message for MERGE foreign tables
Next
From: Bharath Rupireddy
Date:
Subject: Re: Improve description of XLOG_RUNNING_XACTS