RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From wangw.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Msg-id OS3PR01MB6275D0A65E4E85AC4F829E029E9F9@OS3PR01MB6275.jpnprd01.prod.outlook.com
In response to Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
List pgsql-hackers
On Wed, Jul 27, 2022 at 16:03 PM Peter Smith <smithpb2250@gmail.com> wrote:
> Here are some review comments for the patch v19-0004:

Thanks for your kind review and comments.
To avoid making this thread too long, I will reply to all of your comments
(0001-patch ~ 0004-patch) in this email.
In addition, to avoid confusing the replies, I added the following serial
numbers above your comments on the 0004-patch:
```
4.2 && 4.3
4.4
4.5
```

> 1.6 src/backend/replication/logical/applybgworker.c - LogicalApplyBgwLoop
> 
> +/* Apply Background Worker main loop */
> +static void
> +LogicalApplyBgwLoop(shm_mq_handle *mqh, volatile ApplyBgworkerShared *shared)
> 
> 'shared' seems a very vague param name. Maybe can be 'bgw_shared' or
> 'parallel_shared' or something better?
> 
> ~~~
> 
> 1.7 src/backend/replication/logical/applybgworker.c - ApplyBgworkerMain
> 
> +/*
> + * Apply Background Worker entry point
> + */
> +void
> +ApplyBgworkerMain(Datum main_arg)
> +{
> + volatile ApplyBgworkerShared *shared;
> 
> 'shared' seems a very vague var name. Maybe can be 'bgw_shared' or
> 'parallel_shared' or something better?
> 
> ~~~
> 
> 1.8 src/backend/replication/logical/applybgworker.c - apply_bgworker_setup_dsm
> 
> +static void
> +apply_bgworker_setup_dsm(ApplyBgworkerState *wstate)
> +{
> + shm_toc_estimator e;
> + Size segsize;
> + dsm_segment *seg;
> + shm_toc    *toc;
> + ApplyBgworkerShared *shared;
> + shm_mq    *mq;
> 
> 'shared' seems a very vague var name. Maybe can be 'bgw_shared' or
> 'parallel_shared' or something better?
> 
> ~~~

Not sure about this.

> 3.3 .../replication/logical/applybgworker.c
> 
> @@ -800,3 +800,47 @@ apply_bgworker_subxact_info_add(TransactionId current_xid)
>   MemoryContextSwitchTo(oldctx);
>   }
>  }
> +
> +/*
> + * Check if changes on this relation can be applied by an apply background
> + * worker.
> + *
> + * Although the commit order is maintained only allowing one process to commit
> + * at a time, the access order to the relation has changed. This could cause
> + * unexpected problems if the unique column on the replicated table is
> + * inconsistent with the publisher-side or contains non-immutable functions
> + * when applying transactions in the apply background worker.
> + */
> +void
> +apply_bgworker_relation_check(LogicalRepRelMapEntry *rel)
> 
> "only allowing" -> "by only allowing" (I think you mean this, right?)

Since I'm not a native English speaker, I'm not quite sure which of the two
descriptions you suggested is better (see #3.4 in [1]). I have now replaced
your earlier suggestion with the one you made this time.

> 3.4
> 
> + /*
> + * Return if changes on this relation can be applied by an apply background
> + * worker.
> + */
> + if (rel->parallel_apply == PARALLEL_APPLY_SAFE)
> + return;
> +
> + /* We are in error mode and should give user correct error. */
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("cannot replicate target relation \"%s.%s\" using "
> + "subscription parameter streaming=parallel",
> + rel->remoterel.nspname, rel->remoterel.relname),
> + errdetail("The unique column on subscriber is not the unique "
> +    "column on publisher or there is at least one "
> +    "non-immutable function."),
> + errhint("Please change to use subscription parameter "
> + "streaming=on.")));
> 
> 3.4a.
> Of course, the code should give the user the "correct error" if there
> is an error (!), but having a comment explicitly saying so does not
> serve any purpose.
> 
> 3.4b.
> The logic might be simplified if it was written differently like:
> 
> + if (rel->parallel_apply != PARALLEL_APPLY_SAFE)
> + ereport(ERROR, ...

I kept it this way just to stay consistent with the style of the function
apply_bgworker_relation_check.

> 3.8
> 
> + /* Initialize the flag. */
> + entry->parallel_apply = PARALLEL_APPLY_SAFE;
> 
> I previously suggested [1] (#3.6b) to move this. Consider, that you
> cannot logically flag the entry as "safe" until you are certain that
> it is safe. And you cannot be sure of that until you've passed all the
> checks this function is doing. Therefore IMO the assignment to
> PARALLEL_APPLY_SAFE should be the last line of the function.

Not sure about this.

> 3.11 src/backend/utils/cache/typcache.c - GetDomainConstraints
> 
> @@ -2540,6 +2540,23 @@ compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2)
>   return 0;
>  }
> 
> +/*
> + * GetDomainConstraints --- get DomainConstraintState list of specified domain type
> + */
> +List *
> +GetDomainConstraints(Oid type_id)
> +{
> + TypeCacheEntry *typentry;
> + List    *constraints = NIL;
> +
> + typentry = lookup_type_cache(type_id, TYPECACHE_DOMAIN_CONSTR_INFO);
> +
> + if(typentry->domainData != NULL)
> + constraints = typentry->domainData->constraints;
> +
> + return constraints;
> +}
> 
> This function can be simplified (if you want). e.g.
> 
> List *
> GetDomainConstraints(Oid type_id)
> {
> TypeCacheEntry *typentry;
> 
> typentry = lookup_type_cache(type_id, TYPECACHE_DOMAIN_CONSTR_INFO);
> 
> return typentry->domainData ? typentry->domainData->constraints : NIL;
> }

I just think the former version is clearer.

4.2 && 4.3
> 2. src/backend/replication/logical/worker.c - start_table_sync
> 
> @@ -3902,20 +3925,28 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
>   }
>   PG_CATCH();
>   {
> + /*
> + * Emit the error message, and recover from the error state to an idle
> + * state
> + */
> + HOLD_INTERRUPTS();
> +
> + EmitErrorReport();
> + AbortOutOfAnyTransaction();
> + FlushErrorState();
> +
> + RESUME_INTERRUPTS();
> +
> + /* Report the worker failed during table synchronization */
> + pgstat_report_subscription_error(MySubscription->oid, false);
> +
> + /* Set the retry flag. */
> + set_subscription_retry(true);
> +
>   if (MySubscription->disableonerr)
>   DisableSubscriptionAndExit();
> - else
> - {
> - /*
> - * Report the worker failed during table synchronization. Abort
> - * the current transaction so that the stats message is sent in an
> - * idle state.
> - */
> - AbortOutOfAnyTransaction();
> - pgstat_report_subscription_error(MySubscription->oid, false);
> 
> - PG_RE_THROW();
> - }
> + proc_exit(0);
>   }
> 
> But is it correct to set the 'retry' flag even if the
> MySubscription->disableonerr is true? Won’t that mean even after the
> user corrects the problem and then re-enables the subscription it
> still won't let the streaming=parallel work, because that retry flag
> is set?
> 
> Also, something seems wrong to me here - IIUC the patch changed this
> code because of the potential risk of an error within the
> set_subscription_retry function, but now if such an error happens the
> current code will bypass even getting to DisableSubscriptionAndExit,
> so the subscription won't have a chance to get disabled as the user
> might have wanted.
> 3. src/backend/replication/logical/worker.c - start_apply
> 
> @@ -3940,20 +3971,27 @@ start_apply(XLogRecPtr origin_startpos)
>   }
>   PG_CATCH();
>   {
> + /*
> + * Emit the error message, and recover from the error state to an idle
> + * state
> + */
> + HOLD_INTERRUPTS();
> +
> + EmitErrorReport();
> + AbortOutOfAnyTransaction();
> + FlushErrorState();
> +
> + RESUME_INTERRUPTS();
> +
> + /* Report the worker failed while applying changes */
> + pgstat_report_subscription_error(MySubscription->oid,
> + !am_tablesync_worker());
> +
> + /* Set the retry flag. */
> + set_subscription_retry(true);
> +
>   if (MySubscription->disableonerr)
>   DisableSubscriptionAndExit();
> - else
> - {
> - /*
> - * Report the worker failed while applying changes. Abort the
> - * current transaction so that the stats message is sent in an
> - * idle state.
> - */
> - AbortOutOfAnyTransaction();
> - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
> -
> - PG_RE_THROW();
> - }
>   }
> 
> (Same as previous review comment #2)
> 
> But is it correct to set the 'retry' flag even if the
> MySubscription->disableonerr is true? Won’t that mean even after the
> user corrects the problem and then re-enables the subscription it
> still won't let the streaming=parallel work, because that retry flag
> is set?
> 
> Also, something seems wrong to me here - IIUC the patch changed this
> code because of the potential risk of an error within the
> set_subscription_retry function, but now if such an error happens the
> current code will bypass even getting to DisableSubscriptionAndExit,
> so the subscription won't have a chance to get disabled as the user
> might have wanted.

=>4.2.a
=>4.3.a
I think this is the expected behavior.

=>4.2.b
=>4.3.b
Fixed. The function set_subscription_retry is now invoked after handling the
"disableonerr" parameter.

4.4
> 4. src/backend/replication/logical/worker.c - DisableSubscriptionAndExit
> 
>  /*
> - * After error recovery, disable the subscription in a new transaction
> - * and exit cleanly.
> + * Disable the subscription in a new transaction.
>   */
>  static void
>  DisableSubscriptionAndExit(void)
>  {
> - /*
> - * Emit the error message, and recover from the error state to an idle
> - * state
> - */
> - HOLD_INTERRUPTS();
> -
> - EmitErrorReport();
> - AbortOutOfAnyTransaction();
> - FlushErrorState();
> -
> - RESUME_INTERRUPTS();
> -
> - /* Report the worker failed during either table synchronization or apply */
> - pgstat_report_subscription_error(MyLogicalRepWorker->subid,
> - !am_tablesync_worker());
> -
>   /* Disable the subscription */
>   StartTransactionCommand();
>   DisableSubscription(MySubscription->oid);
> @@ -4231,8 +4252,6 @@ DisableSubscriptionAndExit(void)
>   ereport(LOG,
>   errmsg("logical replication subscription \"%s\" has been disabled due to an error",
>      MySubscription->name));
> -
> - proc_exit(0);
>  }
> 
> 4a.
> Hmm,  I think it is a bad idea to remove the "exiting" code from the
> function but still leave the function name the same as before saying
> "AndExit".
> 
> 4b.
> Also, now the patch is unconditionally doing proc_exit(0) in the
> calling code where previously it would do PG_RE_THROW. So it's a
> subtle difference from the path the code used to take for worker
> errors.

=>4.a
Fixed as suggested.

=>4.b
As I understand it, PG_RE_THROW would eventually report the error and exit
(see the function StartBackgroundWorker). Since the error has already been
reported at the beginning of the PG_CATCH block, I think it is fine to call
proc_exit at the end to exit.

4.5
> 5. src/backend/replication/logical/worker.c - set_subscription_retry
> 
> @@ -4467,3 +4486,63 @@ reset_apply_error_context_info(void)
>   apply_error_callback_arg.remote_attnum = -1;
>   set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
>  }
> +
> +/*
> + * Set subretry of pg_subscription catalog.
> + *
> + * If retry is true, subscriber is about to exit with an error. Otherwise, it
> + * means that the transaction was applied successfully.
> + */
> +static void
> +set_subscription_retry(bool retry)
> +{
> + Relation rel;
> + HeapTuple tup;
> + bool started_tx = false;
> + bool nulls[Natts_pg_subscription];
> + bool replaces[Natts_pg_subscription];
> + Datum values[Natts_pg_subscription];
> +
> + if (MySubscription->retry == retry ||
> + am_apply_bgworker())
> + return;
> 
> Currently, I think this new 'subretry' field is only used to decide
> whether a retry can use an apply background worker or not. I think all
> this logic is *only* used when streaming=parallel. But AFAICT the
> logic for setting/clearing the retry flag is executed *always*
> regardless of the streaming mode.
> 
> So for all the times when the user did not ask for streaming=parallel
> doesn't this just cause unnecessary overhead for every transaction?

I think it is fine, because for a given transaction the catalog
pg_subscription is actually modified only twice: the first time the
transaction fails to apply, and the first time it is successfully retried.

The remaining comments have been addressed as suggested.
The new patches are attached in [2].

[1] - https://www.postgresql.org/message-id/CAHut%2BPtRNAOwFtBp_TnDWdC7UpcTxPJzQnrm%3DNytN7cVBt5zRQ%40mail.gmail.com
[2] - https://www.postgresql.org/message-id/OS3PR01MB6275D64BE7726B0221B15F389E9F9%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Regards,
Wang wei
