Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | Peter Smith |
---|---|
Subject | Re: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | CAHut+PvFENKb5fcMko5HHtNEAaZyNwGhu3PASrcBt+HFoFL=Fw@mail.gmail.com |
In response to | RE: Perform streaming logical transactions by background workers and parallel apply ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>) |
Responses |
Re: Perform streaming logical transactions by background workers and parallel apply
RE: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
Here are my review comments for the v35-0001 patch:

======

1. Commit message

Currently, for large transactions, the publisher sends the data in multiple streams (changes divided into chunks depending upon logical_decoding_work_mem), and then on the subscriber-side, the apply worker writes the changes into temporary files and once it receives the commit, it reads from the file and applies the entire transaction.

~

There is a mix of plural and singular.

"reads from the file" -> "reads from those files" ?

~~~

2.

This preserves commit ordering and avoids writing to and reading from file in most cases. We still need to spill if there is no worker available.

2a.
"file" -> "files"

2b.
"in most cases. We still need to spill" -> "in most cases, although we still need to spill"

======

3. GENERAL

(this comment was written after I wrote all the other ones below so there might be some unintended overlaps...)

I found the mixed use of the same member names having different meanings to be quite confusing.

e.g.1
PGOutputData 'streaming' is now a single char internal representation of the subscription parameter streaming mode ('f','t','p')
- bool streaming;
+ char streaming;

e.g.2
WalRcvStreamOptions 'streaming' is a C string version of the subscription streaming mode ("on", "parallel")
- bool streaming; /* Streaming of large transactions */
+ char *streaming; /* Streaming of large transactions */

e.g.3
SubOpts 'streaming' is again like the first example - a single char for the mode.
- bool streaming;
+ char streaming;

IMO everything would become much simpler if you did:

3a.
Rename "char streaming;" -> "char streaming_mode;"

3b.
Redesign the "char *streaming;" code to also use the single char notation, then also call that member 'streaming_mode'. Then everything will be consistent.

======

doc/src/sgml/config.sgml

4.
- max_parallel_apply_workers_per_subscription

+ <varlistentry id="guc-max-parallel-apply-workers-per-subscription" xreflabel="max_parallel_apply_workers_per_subscription">
+ <term><varname>max_parallel_apply_workers_per_subscription</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>max_parallel_apply_workers_per_subscription</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum number of parallel apply workers per subscription. This
+ parameter controls the amount of parallelism for streaming of
+ in-progress transactions with subscription parameter
+ <literal>streaming = parallel</literal>.
+ </para>
+ <para>
+ The parallel apply workers are taken from the pool defined by
+ <varname>max_logical_replication_workers</varname>.
+ </para>
+ <para>
+ The default value is 2. This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ </listitem>
+ </varlistentry>

I felt that maybe this should also xref to the doc/src/sgml/logical-replication.sgml section where you say that "max_logical_replication_workers should be increased according to the desired number of parallel apply workers."

======

5. doc/src/sgml/protocol.sgml

+ <para>
+ Version <literal>4</literal> is supported only for server version 16
+ and above, and it allows applying streams of large in-progress
+ transactions in parallel.
+ </para>

SUGGESTION
... and it allows streams of large in-progress transactions to be applied in parallel.

======

6. doc/src/sgml/ref/create_subscription.sgml

+ <para>
+ If set to <literal>parallel</literal>, incoming changes are directly
+ applied via one of the parallel apply workers, if available. If no
+ parallel worker is free to handle streaming transactions then the
+ changes are written to temporary files and applied after the
+ transaction is committed.
Note that if an error happens when
+ applying changes in a parallel worker, the finish LSN of the
+ remote transaction might not be reported in the server log. </para>

6a.
"parallel worker is free" -> "parallel apply worker is free"

~

6b.
"Note that if an error happens when applying changes in a parallel worker," -> "Note that if an error happens in a parallel apply worker,"

======

7. src/backend/access/transam/xact.c - RecordTransactionAbort

+ /*
+ * Are we using the replication origins feature? Or, in other words, are
+ * we replaying remote actions?
+ */
+ replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+ replorigin_session_origin != DoNotReplicateId);

"Or, in other words," -> "In other words,"

======

src/backend/replication/logical/applyparallelworker.c

8. - file header comment

+ * Refer to the comments in file header of logical/worker.c to see more
+ * information about parallel apply worker.

8a.
"in file header" -> "in the file header"

~

8b.
"about parallel apply worker." -> "about parallel apply workers."

~~~

9. - parallel_apply_can_start

+/*
+ * Returns true, if it is allowed to start a parallel apply worker, false,
+ * otherwise.
+ */
+static bool
+parallel_apply_can_start(TransactionId xid)

(The commas are strange)

SUGGESTION
Returns true if it is OK to start a parallel apply worker, false otherwise.

or just

SUGGESTION
Returns true if it is OK to start a parallel apply worker.

~~~

10.

+ /*
+ * Don't start a new parallel worker if not in parallel streaming mode or
+ * the publisher does not support parallel apply.
+ */
+ if (!MyLogicalRepWorker->parallel_apply)
+ return false;

10a.
SUGGESTION
Don't start a new parallel apply worker if the subscription is not using parallel streaming mode, or if the publisher does not support parallel apply.

~

10b.
IMO this flag might be better to be called 'parallel_apply_enabled' or something similar. (see also review comment #55b.)

~~~

11.
- parallel_apply_start_worker

+ /* Try to start a new parallel apply worker. */
+ if (winfo == NULL)
+ winfo = parallel_apply_setup_worker();
+
+ /* Failed to start a new parallel apply worker. */
+ if (winfo == NULL)
+ return;

IMO it might be cleaner to write that code like below. And now the 2nd comment is not really adding anything so it can be removed too.

SUGGESTION
if (winfo == NULL)
{
	/* Try to start a new parallel apply worker. */
	winfo = parallel_apply_setup_worker();
	if (winfo == NULL)
		return;
}

~~~

12. - parallel_apply_free_worker

+ SpinLockAcquire(&winfo->shared->mutex);
+ slot_no = winfo->shared->logicalrep_worker_slot_no;
+ generation = winfo->shared->logicalrep_worker_generation;
+ SpinLockRelease(&winfo->shared->mutex);

I know there are not many places doing this, but do you think it might be worth introducing some new set/get function to encapsulate the set/get of the generation/slot so it does the mutex spin-locks in common code?

~~~

13. - LogicalParallelApplyLoop

+ /*
+ * Init the ApplyMessageContext which we clean up after each replication
+ * protocol message.
+ */
+ ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+ "ApplyMessageContext",
+ ALLOCSET_DEFAULT_SIZES);

Because this is in the parallel apply worker should the name (e.g. the 2nd param) be changed to "ParallelApplyMessageContext"?

~~~

14.

+ else if (shmq_res == SHM_MQ_DETACHED)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("lost connection to the leader apply worker")));
+ }
+ /* SHM_MQ_WOULD_BLOCK is purposefully ignored */

Instead of that comment sort of floating in space I wonder if this code would be better written as a switch, so then you can write this comment in the 'default' case.

OR, maybe the "else if (shmq_res == SHM_MQ_DETACHED)" should be changed to

SUGGESTION
else if (shmq_res != SHM_MQ_WOULD_BLOCK)

OR, just having an empty code block would be better than just a code comment all by itself.
SUGGESTION
else
{
	/* SHM_MQ_WOULD_BLOCK is purposefully ignored */
}

~~~

15. - ParallelApplyWorkerMain

+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);

15a.
"in long-lived" -> "in a long-lived"

~

15b.
Please watch my other thread [1] where I am hoping to push a patch that will replace these snprintf's with a common function to do the same. If/when my patch is pushed then this code needs to be changed to call that new function.

~~~

16. - HandleParallelApplyMessages

+ res = shm_mq_receive(winfo->error_mq_handle, &nbytes,
+ &data, true);

Seems to have unnecessary wrapping.

~~~

17. - parallel_apply_setup_dsm

+/*
+ * Set up a dynamic shared memory segment.
+ *
+ * We set up a control region that contains a fixed worker info
+ * (ParallelApplyWorkerShared), a message queue, and an error queue.
+ *
+ * Returns true on success, false on failure.
+ */
+static bool
+parallel_apply_setup_dsm(ParallelApplyWorkerInfo *winfo)

"fixed worker info" -> "fixed size worker info" ?

~~~

18.

+ * We need one key to register the location of the header, and we need two
+ * other keys to track the locations of the message queue and the error
+ * message queue.

"and we need two other" -> "and two other"

~~~

19. - parallel_apply_wait_for_xact_finish

+void
+parallel_apply_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
+{
+ for (;;)
+ {
+ if (!parallel_apply_get_in_xact(winfo->shared))
+ break;

Should that condition have a comment? All the others do.

~~~

20. - parallel_apply_savepoint_name

The only callers that I could find are from parallel_apply_start_subtrans and parallel_apply_stream_abort so...

20a.
Why is there an extern in worker_internal.h?

~

20b.
Why is this not declared static?

~~~

21.

The callers of parallel_apply_start_subtrans are both allocating a name buffer size like:

char spname[MAXPGPATH];

Is that right? I thought that PG names were limited by NAMEDATALEN.
~~~

22. - parallel_apply_replorigin_setup

+ snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);

Please watch my other thread [1] where I am hoping to push a patch that will replace these snprintf's with a common function to do the same. If/when my patch is pushed then this code needs to be changed to call that new function.

======

src/backend/replication/logical/launcher.c

23. - GUCs

@@ -54,6 +54,7 @@
int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
+int max_parallel_apply_workers_per_subscription = 2;

Please watch my other thread [2] where I am hoping to push a patch to clean up some of these GUC C variable declarations. It is not really recommended to assign default values to the C variable like this - they are kind of misleading because they will be overwritten by the GUC default value when the GUC mechanism starts up.

~~~

24. - logicalrep_worker_launch

+ /* Sanity check: we don't support table sync in subworker. */
+ Assert(!(is_subworker && OidIsValid(relid)));

IMO "we don't support" makes it sound like this is something that maybe is intended for the future. In fact, I think this combination is just not possible, so it is a plain sanity check. I think it might be better to just say something like below

/* Sanity check - tablesync worker cannot be a subworker */

~~~

25.

+ worker->parallel_apply = is_subworker;

It seems kind of strange to assign one boolean to another when they have completely different names. I wondered if 'is_subworker' should be called 'is_parallel_apply_worker'?

~~~

26.
if (OidIsValid(relid))
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u sync %u",
subid, relid);
+ else if (is_subworker)
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication parallel apply worker for subscription %u", subid);
else
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u", subid);

I think that the *last* text should now be changed like below:

BEFORE
"logical replication worker for subscription %u"

AFTER
"logical replication apply worker for subscription %u"

~~~

27. - logicalrep_worker_stop_internal

+/*
+ * Workhorse for logicalrep_worker_stop(), logicalrep_worker_detach() and
+ * logicalrep_worker_stop_by_slot(). Stop the worker and wait for it to die.
+ */
+static void
+logicalrep_worker_stop_internal(LogicalRepWorker *worker)

IMO it would be better to define this static function *before* all the callers of it.

~~~

28. - logicalrep_worker_detach

+ /* Stop the parallel apply workers. */
+ if (am_leader_apply_worker())
+ {

Should that comment instead say something like below?

/* If this is the leader apply worker then stop all of its parallel apply workers. */

~~~

29. - pg_stat_get_subscription

+ /* Skip if this is parallel apply worker */
+ if (worker.apply_leader_pid != InvalidPid)
+ continue;

29a.
"is parallel apply" -> "is a parallel apply"

~

29b.
IMO this condition should be using your macro isParallelApplyWorker(worker).

======

30. src/backend/replication/logical/proto.c - logicalrep_read_stream_abort

+ *
+ * If read_abort_info is true, try to read the abort_lsn and abort_time fields,
+ * otherwise don't.
*/
void
-logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
- TransactionId *subxid)
+logicalrep_read_stream_abort(StringInfo in,
+ LogicalRepStreamAbortData *abort_data,
+ bool read_abort_info)

"try to read" -> "read"

======

31.
src/backend/replication/logical/tablesync.c - process_syncing_tables

process_syncing_tables(XLogRecPtr current_lsn)
{
+ if (am_parallel_apply_worker())
+ return;
+

Maybe there should be some comment here like:

/* Skip for parallel apply workers. */

======

src/backend/replication/logical/worker.c

32. - file header comment

+ * the list for any available worker. Note that we maintain a maximum of half
+ * the max_parallel_apply_workers_per_subscription workers in the pool and
+ * after that, we simply exit the worker after applying the transaction. This
+ * worker pool threshold is a bit arbitrary and we can provide a guc for this
+ * in the future if required.

IMO that sentence beginning with "This worker pool" should be written as an XXX-style comment.

Also "guc" -> "GUC variable"

e.g.

* the list for any available worker. Note that we maintain a maximum of half
* the max_parallel_apply_workers_per_subscription workers in the pool and
* after that, we simply exit the worker after applying the transaction.
*
* XXX This worker pool threshold is a bit arbitrary and we can provide a GUC
* variable for this in the future if required.

~~~

33.

* we cannot count how many workers will be started. It may be possible to
* allocate enough shared memory in one segment based on the maximum number of
* parallel apply workers (max_parallel_apply_workers_per_subscription), but this
* may waste some memory if no process is actually started.

"may waste some memory" -> "would waste memory"

~~~

34.

+ * In case, no worker is available to handle the streamed transaction, we
+ * follow approach 2.

SUGGESTION
If no parallel apply worker is available to handle the streamed transaction we follow approach 2.

~~~

35. - TransApplyAction

+ * TRANS_LEADER_SERIALIZE means that we are in leader apply worker and changes
+ * are written to temporary files and then applied when the final commit
+ * arrives.
"in leader apply" -> "in the leader apply"

~~~

36. - should_apply_changes_for_rel

should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
{
if (am_tablesync_worker())
return MyLogicalRepWorker->relid == rel->localreloid;
+ else if (am_parallel_apply_worker())
+ {
+ if (rel->state != SUBREL_STATE_READY)
+ ereport(ERROR,
+ (errmsg("logical replication apply workers for subscription \"%s\" will restart",
+ MySubscription->name),
+ errdetail("Cannot handle streamed replication transaction using parallel "
+ "apply workers until all tables are synchronized.")));
+
+ return true;
+ }
else
return (rel->state == SUBREL_STATE_READY ||
(rel->state == SUBREL_STATE_SYNCDONE &&
@@ -427,43 +519,87 @@ end_replication_step(void)

This function can be made tidier just by removing all the 'else' ...

SUGGESTION
if (am_tablesync_worker())
	return ...
if (am_parallel_apply_worker())
{
	...
	return true;
}

Assert(am_leader_apply_worker());
return ...

~~~

37. - handle_streamed_transaction

+ /*
+ * XXX The publisher side doesn't always send relation/type update
+ * messages after the streaming transaction, so also update the
+ * relation/type in leader apply worker here. See function
+ * cleanup_rel_sync_cache.
+ */
+ if (action == LOGICAL_REP_MSG_RELATION ||
+ action == LOGICAL_REP_MSG_TYPE)
+ return false;
+ return true;

37.
"so also update the relation/type in leader apply worker here"

Is that comment worded correctly? There is nothing being updated "here".

~

37.
That code is the same as:

return (action != LOGICAL_REP_MSG_RELATION &&
		action != LOGICAL_REP_MSG_TYPE);

~~~

38. - apply_handle_commit_prepared

+ *
+ * Note that we don't need to wait here if the transaction was prepared in a
+ * parallel apply worker. Because we have already waited for the prepare to
+ * finish in apply_handle_stream_prepare() which will ensure all the operations
+ * in that transaction have happened in the subscriber and no concurrent
+ * transaction can create deadlock or transaction dependency issues.
*/
static void
apply_handle_commit_prepared(StringInfo s)

"worker. Because" -> "worker because"

~~~

39. - apply_handle_rollback_prepared

+ *
+ * Note that we don't need to wait here if the transaction was prepared in a
+ * parallel apply worker. Because we have already waited for the prepare to
+ * finish in apply_handle_stream_prepare() which will ensure all the operations
+ * in that transaction have happened in the subscriber and no concurrent
+ * transaction can create deadlock or transaction dependency issues.
*/
static void
apply_handle_rollback_prepared(StringInfo s)

See previous review comment #38 above.

~~~

40. - apply_handle_stream_prepare

+ case TRANS_LEADER_SERIALIZE:
- /* Mark the transaction as prepared. */
- apply_handle_prepare_internal(&prepare_data);
+ /*
+ * The transaction has been serialized to file, so replay all the
+ * spooled operations.
+ */

Spurious blank line after the 'case'.

FYI - this same blank line is also in all the other switch/case statements that look like this one, so if you fix it then please check all those other places too...

~~~

41. - apply_handle_stream_start

+ *
+ * XXX We can avoid sending pair of the START/STOP messages to the parallel
+ * worker because unlike apply worker it will process only one
+ * transaction-at-a-time. However, it is not clear whether that is worth the
+ * effort because it is sent after logical_decoding_work_mem changes.
*/
static void
apply_handle_stream_start(StringInfo s)

"sending pair" -> "sending pairs"

~~~

42.

- /* notify handle methods we're processing a remote transaction */
+ /* Notify handle methods we're processing a remote transaction. */
in_streamed_transaction = true;

Changing this comment seemed unrelated to this patch, so maybe don't do this.

~~~

43.

/*
- * Initialize the worker's stream_fileset if we haven't yet. This will be
- * used for the entire duration of the worker so create it in a permanent
- * context.
We create this on the very first streaming message from any
- * transaction and then use it for this and other streaming transactions.
- * Now, we could create a fileset at the start of the worker as well but
- * then we won't be sure that it will ever be used.
+ * For the first stream start, check if there is any free parallel apply
+ * worker we can use to process this transaction.
*/
- if (MyLogicalRepWorker->stream_fileset == NULL)
+ if (first_segment)
+ parallel_apply_start_worker(stream_xid);

This comment update seems misleading. The parallel_apply_start_worker() isn't just checking if there is a free worker. All that free worker logic stuff is *inside* the parallel_apply_start_worker() function, so maybe there is no need to mention it here at the caller.

~~~

44.

+ case TRANS_PARALLEL_APPLY:
+ break;

Should this include a comment explaining why there is nothing to do?

~~~

39. - apply_handle_stream_abort

+ /* We receive abort information only when we can apply in parallel. */
+ if (MyLogicalRepWorker->parallel_apply)
+ read_abort_info = true;

44a.
SUGGESTION
We receive abort information only when the publisher can support parallel apply.

~

44b.
Why not remove the assignment in the declaration, and just write this code as:

read_abort_info = MyLogicalRepWorker->parallel_apply;

~~~

45.

+ /*
+ * We are in leader apply worker and the transaction has been
+ * serialized to file.
+ */
+ serialize_stream_abort(xid, subxid);

"in leader apply worker" -> "in the leader apply worker"

~~~

46. - store_flush_position

/* Skip if not the leader apply worker */
if (am_parallel_apply_worker())
return;

I previously wrote something about this and Hou-san gave a reason [3] why not to change the condition. But the comment still does not match the code, because a tablesync worker would get past here. Maybe the comment is wrong?

~~~

47. - InitializeApplyWorker

+/*
+ * The common initialization for leader apply worker and parallel apply worker.
+ *
+ * Initialize the database connection, in-memory subscription and necessary
+ * config options.
+ */
void
-ApplyWorkerMain(Datum main_arg)
+InitializeApplyWorker(void)

"The common initialization" -> "Common initialization"

~~~

48. - ApplyWorkerMain

+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)

"Apply worker" -> "apply worker"

~~~

49.

+ /*
+ * We don't currently need any ResourceOwner in a walreceiver process, but
+ * if we did, we could call CreateAuxProcessResourceOwner here.
+ */

I think this comment should have an "XXX" prefix.

~~~

50.

+ if (server_version >= 160000 &&
+ MySubscription->stream == SUBSTREAM_PARALLEL)
+ {
+ options.proto.logical.streaming = pstrdup("parallel");
+ MyLogicalRepWorker->parallel_apply = true;
+ }
+ else if (server_version >= 140000 &&
+ MySubscription->stream != SUBSTREAM_OFF)
+ options.proto.logical.streaming = pstrdup("on");
+ else
+ options.proto.logical.streaming = NULL;

IMO it might make more sense for these conditions to be checking the 'options.proto.logical.proto_version' here instead of checking the hardwired server versions. Also, I suggest it may be better (for clarity) to always assign the parallel_apply member.

SUGGESTION
if (options.proto.logical.proto_version >= LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM &&
	MySubscription->stream == SUBSTREAM_PARALLEL)
{
	options.proto.logical.streaming = pstrdup("parallel");
	MyLogicalRepWorker->parallel_apply = true;
}
else if (options.proto.logical.proto_version >= LOGICALREP_PROTO_STREAM_VERSION_NUM &&
		 MySubscription->stream != SUBSTREAM_OFF)
{
	options.proto.logical.streaming = pstrdup("on");
	MyLogicalRepWorker->parallel_apply = false;
}
else
{
	options.proto.logical.streaming = NULL;
	MyLogicalRepWorker->parallel_apply = false;
}

~~~

51. - clear_subscription_skip_lsn

- if (likely(XLogRecPtrIsInvalid(myskiplsn)))
+ if (likely(XLogRecPtrIsInvalid(myskiplsn)) ||
+ am_parallel_apply_worker())
return;

Unnecessary wrapping.

~~~

52.
- get_transaction_apply_action

+static TransApplyAction
+get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
+{
+ *winfo = NULL;
+
+ if (am_parallel_apply_worker())
+ {
+ return TRANS_PARALLEL_APPLY;
+ }
+ else if (in_remote_transaction)
+ {
+ return TRANS_LEADER_APPLY;
+ }
+
+ /*
+ * Check if we are processing this transaction using a parallel apply
+ * worker and if so, send the changes to that worker.
+ */
+ else if ((*winfo = parallel_apply_find_worker(xid)))
+ {
+ return TRANS_LEADER_SEND_TO_PARALLEL;
+ }
+ else
+ {
+ return TRANS_LEADER_SERIALIZE;
+ }
+}

52a.
All these if/else and code blocks seem excessive. It can be simplified as follows:

SUGGESTION
static TransApplyAction
get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
{
	*winfo = NULL;

	if (am_parallel_apply_worker())
		return TRANS_PARALLEL_APPLY;

	if (in_remote_transaction)
		return TRANS_LEADER_APPLY;

	/*
	 * Check if we are processing this transaction using a parallel apply
	 * worker and if so, send the changes to that worker.
	 */
	if ((*winfo = parallel_apply_find_worker(xid)))
		return TRANS_LEADER_SEND_TO_PARALLEL;

	return TRANS_LEADER_SERIALIZE;
}

~

52b.
Can a tablesync worker ever get here? It might be better to add Assert(!am_tablesync_worker()); at the top of this function?

======

src/backend/replication/pgoutput/pgoutput.c

53.
- pgoutput_startup

ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("requested proto_version=%d does not support streaming, need %d or higher",
data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
+ else if (data->streaming == SUBSTREAM_PARALLEL &&
+ data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("requested proto_version=%d does not support parallel streaming mode, need %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));

The previous error message just says "streaming", not "streaming mode", so for consistency it is better to remove that word "mode" IMO.

~~~

54. - pgoutput_stream_abort

- logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
+ logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn, txn->xact_time.abort_time, write_abort_info);
+

Wrapping is needed here.

======

src/include/replication/worker_internal.h

55. - LogicalRepWorker

+ /* Indicates whether apply can be performed parallelly. */
+ bool parallel_apply;
+

55a.
"parallelly" - ?? is there a better way to phrase this? IMO that is an uncommon word.

~

55b.
IMO this member should be named slightly differently to give a better feel for what it really means. Maybe something like one of:
"parallel_apply_ok"
"parallel_apply_enabled"
"use_parallel_apply"
etc?

~~~

56. - ParallelApplyWorkerInfo

+ /*
+ * Indicates whether the worker is available to be used for parallel apply
+ * transaction?
+ */
+ bool in_use;

As previously posted [4], this member comment is describing the opposite of the member name. (e.g. the comment would be correct if the member was called 'is_available', but it isn't)

SUGGESTION
True if the worker is being used to process a parallel apply transaction. False indicates this worker is available for re-use.

~~~

57.
- am_leader_apply_worker

+static inline bool
+am_leader_apply_worker(void)
+{
+ return (!OidIsValid(MyLogicalRepWorker->relid) &&
+ !isParallelApplyWorker(MyLogicalRepWorker));
+}

I wondered if it would be tidier/easier to define this function like below. The others are inline functions anyhow so it should end up as the same thing, right?

static inline bool
am_leader_apply_worker(void)
{
	return (!am_tablesync_worker() && !am_parallel_apply_worker());
}

======

58.

--- fail - streaming must be boolean
+-- fail - streaming must be boolean or 'parallel'
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);

I think there are tests already for explicitly creating/setting the subscription parameter streaming = on/off/parallel

But what about when there is no value explicitly specified? Shouldn't there also be tests like below to check that the *implied* boolean true still works for this enum?

CREATE SUBSCRIPTION ... WITH (streaming)
ALTER SUBSCRIPTION ... SET (streaming)

------

[1] My patch snprintfs - https://www.postgresql.org/message-id/flat/CAHut%2BPsB9hEEU-JHqTUBL3bv--vesUvThYr1-95ZyG5PkF9PQQ%40mail.gmail.com#17abe65e826f48d3d5a1cf5b83ce5271
[2] My patch GUC C vars - https://www.postgresql.org/message-id/flat/CAHut%2BPsWxJgmrAvPsw9smFVAvAoyWstO7ttAkAq8NKDhsVNa3Q%40mail.gmail.com#1526a180383a3374ae4d701f25799926
[3] Houz reply comment #41 - https://www.postgresql.org/message-id/OS0PR01MB5716E7E5798625AE9437CD6F94439%40OS0PR01MB5716.jpnprd01.prod.outlook.com
[4] Previous review comment #13 - https://www.postgresql.org/message-id/CAHut%2BPuVjRgGr4saN7qwq0oB8DANHVR7UfDiciB1Q3cYN54F6A%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia