Re: Time delayed LR (WAS Re: logical replication restrictions) - Mailing list pgsql-hackers

From shveta malik
Subject Re: Time delayed LR (WAS Re: logical replication restrictions)
Date
Msg-id CAJpy0uDcuA0TaejZ=xVaHucPR-CQp0A_Ccra=YaP3dwD4vCw0Q@mail.gmail.com
In response to RE: Time delayed LR (WAS Re: logical replication restrictions)  ("Takamichi Osumi (Fujitsu)" <osumi.takamichi@fujitsu.com>)
Responses Re: Time delayed LR (WAS Re: logical replication restrictions)
RE: Time delayed LR (WAS Re: logical replication restrictions)
List pgsql-hackers
On Thu, Jan 19, 2023 at 12:42 PM Takamichi Osumi (Fujitsu)
<osumi.takamichi@fujitsu.com> wrote:
>
> On Wednesday, January 18, 2023 4:06 PM Peter Smith <smithpb2250@gmail.com> wrote:
> >  Here are my review comments for the latest patch v16-0001. (excluding the
> > test code)
> Hi, thank you for your review !
>
> > ======
> >
> > General
> >
> > 1.
> >
> > Since the value of min_apply_delay cannot be < 0,  I was thinking probably it
> > should have been declared everywhere in this patch as a
> > uint64 instead of an int64, right?
> No, we won't be able to adopt this idea.
>
> It seems that we are not able to use an unsigned integer type for a catalog column.
> So we can't apply it to the pg_subscription.h definitions, nor to the
> variables that are passed to Int64GetDatum to store the catalog values.
>
> Plus, a value of type Interval can be negative, so we are not able
> to change the int64 variable that receives the return value of
> interval2ms().
>
> > ======
> >
> > Commit message
> >
> > 2.
> >
> > If the subscription sets min_apply_delay parameter, the logical replication
> > worker will delay the transaction commit for min_apply_delay milliseconds.
> >
> > ~
> >
> > IMO there should be another sentence before this just to say that a new
> > parameter is being added:
> >
> > e.g.
> > This patch implements a new subscription parameter called
> > 'min_apply_delay'.
> Added.
>
>
> > ======
> >
> > doc/src/sgml/config.sgml
> >
> > 3.
> >
> > +      <para>
> > +       For time-delayed logical replication, the apply worker sends a Standby
> > +       Status Update message to the corresponding publisher per the indicated
> > +       time of this parameter. Therefore, if this parameter is longer than
> > +       <literal>wal_sender_timeout</literal> on the publisher, then the
> > +       walsender doesn't get any update message during the delay and repeatedly
> > +       terminates due to the timeout errors. Hence, make sure this parameter is
> > +       shorter than the <literal>wal_sender_timeout</literal> of the publisher.
> > +       If this parameter is set to zero with time-delayed replication, the
> > +       apply worker doesn't send any feedback messages during the
> > +       <literal>min_apply_delay</literal>.
> > +      </para>
> >
> >
> > This paragraph seemed confusing. I think it needs to be reworded to change all
> > of the "this parameter" references because there are at least 3 different
> > parameters mentioned in this paragraph. e.g. maybe just change them to
> > explicitly name the parameter you are talking about.
> >
> > I also think it needs to mention the ‘min_apply_delay’ subscription parameter
> > up-front and then refer to it appropriately.
> >
> > The end result might be something like I wrote below (this is just my guess;
> > probably you can word it better).
> >
> > SUGGESTION
> > For time-delayed logical replication (i.e. when the subscription is created with
> > parameter min_apply_delay > 0), the apply worker sends a Standby Status
> > Update message to the publisher with a period of wal_receiver_status_interval .
> > Make sure to set wal_receiver_status_interval less than the
> > wal_sender_timeout on the publisher, otherwise, the walsender will repeatedly
> > terminate due to the timeout errors. If wal_receiver_status_interval is set to zero,
> > the apply worker doesn't send any feedback messages during the subscriber’s
> > min_apply_delay period.
> Applied. Also, I added one reference for min_apply_delay parameter
> at the end of this description.
>
>
> > ======
> >
> > doc/src/sgml/ref/create_subscription.sgml
> >
> > 4.
> >
> > +         <para>
> > +          By default, the subscriber applies changes as soon as possible. As
> > +          with the physical replication feature
> > +          (<xref linkend="guc-recovery-min-apply-delay"/>), it can be useful to
> > +          have a time-delayed logical replica. This parameter lets the user to
> > +          delay the application of changes by a specified amount of time. If this
> > +          value is specified without units, it is taken as milliseconds. The
> > +          default is zero(no delay).
> > +         </para>
> >
> > 4a.
> > As with the physical replication feature (recovery_min_apply_delay), it can be
> > useful to have a time-delayed logical replica.
> >
> > IMO not sure that the above sentence is necessary. It seems only to be saying
> > that this parameter can be useful. Why do we need to say that?
> Removed the sentence.
>
>
> > ~
> >
> > 4b.
> > "This parameter lets the user to delay" -> "This parameter lets the user delay"
> > OR
> > "This parameter lets the user to delay" -> "This parameter allows the user to
> > delay"
> Fixed.
>
>
> > ~
> >
> > 4c.
> > "If this value is specified without units" -> "If the value is specified without
> > units"
> Fixed.
>
> > ~
> >
> > 4d.
> > "zero(no delay)." -> "zero (no delay)."
> Fixed.
>
> > ----
> >
> > 5.
> >
> > +         <para>
> > +          The delay occurs only on WAL records for transaction begins and after
> > +          the initial table synchronization. It is possible that the
> > +          replication delay between publisher and subscriber exceeds the value
> > +          of this parameter, in which case no delay is added. Note that the
> > +          delay is calculated between the WAL time stamp as written on
> > +          publisher and the current time on the subscriber. Time spent in logical
> > +          decoding and in transferring the transaction may reduce the actual wait
> > +          time. If the system clocks on publisher and subscriber are not
> > +          synchronized, this may lead to apply changes earlier than expected,
> > +          but this is not a major issue because this parameter is typically much
> > +          larger than the time deviations between servers. Note that if this
> > +          parameter is set to a long delay, the replication will stop if the
> > +          replication slot falls behind the current LSN by more than
> > +          <link linkend="guc-max-slot-wal-keep-size"><literal>max_slot_wal_keep_size</literal></link>.
> > +         </para>
> >
> > I think the first part can be reworded slightly. See what you think about the
> > suggestion below.
> >
> > SUGGESTION
> > Any delay occurs only on WAL records for transaction begins after all initial
> > table synchronization has finished.  The delay is calculated between the WAL
> > timestamp as written on the publisher and the current time on the subscriber.
> > Any overhead of time spent in logical decoding and in transferring the
> > transaction may reduce the actual wait time.
> > It is also possible that the overhead already exceeds the requested
> > 'min_apply_delay' value, in which case no additional wait is necessary. If the
> > system clocks...
> Addressed.
>
>
> > ----
> >
> > 6.
> >
> > +  <para>
> > +   Setting streaming to <literal>parallel</literal> mode and <literal>min_apply_delay</literal>
> > +   simultaneously is not supported.
> > +  </para>
> >
> > SUGGESTION
> > A non-zero min_apply_delay parameter is not allowed when streaming in
> > parallel mode.
> Applied.
>
>
> > ======
> >
> > src/backend/commands/subscriptioncmds.c
> >
> > 7. parse_subscription_options
> >
> > @@ -404,6 +445,17 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
> >   "slot_name = NONE", "create_slot = false")));
> >   }
> >   }
> > +
> > + /* Test the combination of streaming mode and min_apply_delay */
> > + if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) &&
> > +     opts->min_apply_delay > 0)
> > + {
> > +     if (opts->streaming == LOGICALREP_STREAM_PARALLEL)
> > +         ereport(ERROR,
> > +                 errcode(ERRCODE_SYNTAX_ERROR),
> > +                 errmsg("%s and %s are mutually exclusive options",
> > +                        "min_apply_delay > 0", "streaming = parallel"));
> > + }
> >
> > SUGGESTION (comment)
> > The combination of parallel streaming mode and min_apply_delay is not
> > allowed.
> Fixed.
>
>
> > ~~~
> >
> > 8. AlterSubscription (general)
> >
> > I observed during testing there are 3 different errors….
> >
> > At subscription CREATE time you can get this error:
> > ERROR:  min_apply_delay > 0 and streaming = parallel are mutually exclusive
> > options
> >
> > If you try to ALTER the min_apply_delay when already streaming = parallel you
> > can get this error:
> > ERROR:  cannot enable min_apply_delay for subscription in streaming =
> > parallel mode
> >
> > If you try to ALTER the streaming to be parallel if there is already a
> > min_apply_delay > 0 then you can get this error:
> > ERROR:  cannot enable streaming = parallel mode for subscription with
> > min_apply_delay
> Yes. This follows the existing error message styles
> in parse_subscription_options and AlterSubscription.
>
> The former uses "mutually exclusive" messages consistently,
> while the latter uses "cannot enable ..." ones.
> > ~
> >
> > IMO there is no need to have 3 different error message texts.  I think all these
> > cases are explained by just the first text (ERROR:
> > min_apply_delay > 0 and streaming = parallel are mutually exclusive
> > options)
> So we followed these existing formats.
>
>
> > ~~~
> >
> > 9. AlterSubscription
> >
> > @@ -1098,6 +1152,18 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
> >
> >   if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
> >   {
> > + /*
> > +  * Test the combination of streaming mode and
> > +  * min_apply_delay
> > +  */
> > + if (opts.streaming == LOGICALREP_STREAM_PARALLEL)
> > +     if ((IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) && opts.min_apply_delay > 0) ||
> > +         (!IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) && sub->minapplydelay > 0))
> > +         ereport(ERROR,
> > +                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> > +                 errmsg("cannot enable %s mode for subscription with %s",
> > +                        "streaming = parallel", "min_apply_delay"));
> > +
> >
> > 9a.
> > SUGGESTION (comment)
> > The combination of parallel streaming mode and min_apply_delay is not
> > allowed.
> Fixed.
>
>
> > ~
> >
> > 9b.
> > (see AlterSubscription general review comment #8 above) Here you can use the
> > same comment error message that says min_apply_delay > 0 and streaming =
> > parallel are mutually exclusive options.
> As described above, we followed the current style in the existing functions.
>
>
> > ~~~
> >
> > 10. AlterSubscription
> >
> > @@ -1111,6 +1177,25 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
> >   = true;
> >   }
> >
> > + if (IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY))
> > + {
> > +     /*
> > +      * Test the combination of streaming mode and
> > +      * min_apply_delay
> > +      */
> > +     if (opts.min_apply_delay > 0)
> > +         if ((IsSet(opts.specified_opts, SUBOPT_STREAMING) && opts.streaming == LOGICALREP_STREAM_PARALLEL) ||
> > +             (!IsSet(opts.specified_opts, SUBOPT_STREAMING) && sub->stream == LOGICALREP_STREAM_PARALLEL))
> > +             ereport(ERROR,
> > +                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> > +                     errmsg("cannot enable %s for subscription in %s mode",
> > +                            "min_apply_delay", "streaming = parallel"));
> > +
> > +     values[Anum_pg_subscription_subminapplydelay - 1] =
> > +         Int64GetDatum(opts.min_apply_delay);
> > +     replaces[Anum_pg_subscription_subminapplydelay - 1] = true;
> > + }
> >
> > 10a.
> > SUGGESTION (comment)
> > The combination of parallel streaming mode and min_apply_delay is not
> > allowed.
> Fixed.
>
>
> > ~
> >
> > 10b.
> > (see AlterSubscription general review comment #8 above) Here you can use the
> > same comment error message that says min_apply_delay > 0 and streaming =
> > parallel are mutually exclusive options.
> Same as 9b.
>
> > ======
> >
> > .../replication/logical/applyparallelworker.c
> >
> > 11.
> >
> > @@ -704,7 +704,8 @@ pa_process_spooled_messages_if_required(void)
> >   {
> >   apply_spooled_messages(&MyParallelShared->fileset,
> >      MyParallelShared->xid,
> > -    InvalidXLogRecPtr);
> > +    InvalidXLogRecPtr,
> > +    0);
> >
> > IMO this passing of 0 is a bit strange because it is currently acting like a dummy
> > value since the apply_spooled_messages will never make use of the 'finish_ts'
> > anyway (since this call is from a parallel apply worker).
> >
> > I think a better way to code this might be to pass the 0 (same as you are doing
> > here) but inside the apply_spooled_messages change the code:
> >
> > FROM
> > if (!am_parallel_apply_worker())
> > maybe_delay_apply(finish_ts);
> >
> > TO
> > if (finish_ts)
> > maybe_delay_apply(finish_ts);
> >
> > That does 2 things.
> > - It makes the passed-in 0 have some meaning
> > - It simplifies the apply_spooled_messages code
> Adopted.
>
>
> > ======
> >
> > src/backend/replication/logical/worker.c
> >
> > 12.
> >
> > @@ -318,6 +318,17 @@ static List *on_commit_wakeup_workers_subids = NIL;
> >  bool in_remote_transaction = false;
> >  static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
> >
> > +/*
> > + * In order to avoid walsender's timeout during time-delayed replication,
> > + * it's necessary to keep sending feedback messages during the delay from the
> > + * worker process. Meanwhile, the feature delays the apply before starting the
> > + * transaction and thus we don't write WALs for the suspended changes during
> > + * the wait. Hence, in the case the worker process sends a feedback message
> > + * during the delay, we should not make positions of the flushed and apply LSN
> > + * overwritten by the last received latest LSN. See send_feedback() for details.
> > + */
> > +static XLogRecPtr last_received = InvalidXLogRecPtr;
> >
> > 12a.
> > Suggest a small change to the first sentence of the comment.
> >
> > BEFORE
> > In order to avoid walsender's timeout during time-delayed replication, it's
> > necessary to keep sending feedback messages during the delay from the
> > worker process.
> >
> > AFTER
> > In order to avoid walsender timeout for time-delayed replication the worker
> > process keeps sending feedback messages during the delay period.
> Fixed.
>
>
> > ~
> >
> > 12b.
> > "Hence, in the case" -> "When"
> Fixed.
>
>
> > ~~~
> >
> > 13. forward declare
> >
> > -static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
> > +static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply,
> > +   bool in_delaying_apply);
> >
> > Change the param name:
> >
> > "in_delaying_apply" -> "in_delayed_apply" (??)
> Changed. The initial intention to append the "in_"
> prefix is to make the variable name aligned with
> some other variables such as "in_remote_transaction" and
> "in_streamed_transaction" that mean the current status
> for the transaction. So, until there is a better name proposed,
> we can keep it.
>
>
> > ~~~
> >
> > 14. maybe_delay_apply
> >
> > + /* Nothing to do if no delay set */
> > + if (MySubscription->minapplydelay <= 0)
> > +     return;
> >
> > IIUC min_apply_delay cannot be < 0 so this condition could simply be:
> >
> > if (!MySubscription->minapplydelay)
> > return;
> Fixed.
>
>
> > ~~~
> >
> > 15. maybe_delay_apply
> >
> > + /*
> > +  * The min_apply_delay parameter is ignored until all tablesync workers
> > +  * have reached READY state. If we allow the delay during the catchup
> > +  * phase, once we reach the limit of tablesync workers, it will impose a
> > +  * delay for each subsequent worker. It means it will take a long time to
> > +  * finish the initial table synchronization.
> > +  */
> > + if (!AllTablesyncsReady())
> > +     return;
> >
> > SUGGESTION (slight rewording)
> > The min_apply_delay parameter is ignored until all tablesync workers have
> > reached READY state. This is because if we allowed the delay during the
> > catchup phase, then once we reached the limit of tablesync workers it would
> > impose a delay for each subsequent worker. That would cause initial table
> > synchronization completion to take a long time.
> Fixed.
>
>
> > ~~~
> >
> > 16. maybe_delay_apply
> >
> > + while (true)
> > + {
> > + long diffms;
> > +
> > + ResetLatch(MyLatch);
> > +
> > + CHECK_FOR_INTERRUPTS();
> >
> > IMO there should be some small explanatory comment here at the top of the
> > while loop.
> Added.
>
>
> > ~~~
> >
> > 17. apply_spooled_messages
> >
> > @@ -2024,6 +2141,21 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
> >   int fileno;
> >   off_t offset;
> >
> > + /*
> > +  * Should we delay the current transaction?
> > +  *
> > +  * Unlike the regular (non-streamed) cases, the delay is applied in a
> > +  * STREAM COMMIT/STREAM PREPARE message for streamed transactions. The
> > +  * STREAM START message does not contain a commit/prepare time (it will be
> > +  * available when the in-progress transaction finishes). Hence, it's not
> > +  * appropriate to apply a delay at that time.
> > +  *
> > +  * It's not allowed to execute time-delayed replication with parallel
> > +  * apply feature.
> > +  */
> > + if (!am_parallel_apply_worker())
> > +     maybe_delay_apply(finish_ts);
> >
> > That whole comment part "Unlike the regular (non-streamed) cases"
> > seems misplaced here.  Perhaps this part of the comment is better put into
> > the function header where the meaning of 'finish_ts' is explained?
> Moved it to the header comment for maybe_delay_apply.
>
>
> > ~~~
> >
> > 18. apply_spooled_messages
> >
> > + * It's not allowed to execute time-delayed replication with parallel
> > + * apply feature.
> > + */
> > + if (!am_parallel_apply_worker())
> > + maybe_delay_apply(finish_ts);
> >
> > As was mentioned in comment #11 above this code could be changed like
> >
> > if (finish_ts)
> > maybe_delay_apply(finish_ts);
> > then you don't even need to make mention of "parallel apply" at all here.
> >
> > OTOH if you want to still have the parallel apply comment then maybe reword it
> > like this:
> > "It is not allowed to combine time-delayed replication with the parallel apply
> > feature."
> Changed and now I don't mention the parallel apply feature.
>
> > ~~~
> >
> > 19. apply_spooled_messages
> >
> > If you chose not to do my suggestion from comment #11, then there are
> > 2 identical conditions (!am_parallel_apply_worker()); In this case, I was
> > wondering if it would be better to refactor to use a single condition instead.
> I applied #11 comment. Now, the conditions are not identical.
>
> > ~~~
> >
> > 20. send_feedback
> > (same as comment #13)
> >
> > Maybe change the new param name to “in_delayed_apply”?
> Changed.
>
>
> > ~~~
> >
> > 21.
> >
> > @@ -3737,8 +3869,15 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
> >   /*
> >   * No outstanding transactions to flush, we can report the latest received
> >   * position. This is important for synchronous replication.
> > + *
> > + * During the delay of time-delayed replication, do not tell the publisher
> > + * that the received latest LSN is already applied and flushed at this
> > + * stage, since we don't apply the transaction yet. If we do so, it leads
> > + * to a wrong assumption of logical replication progress on the publisher
> > + * side. Here, we just send a feedback message to avoid publisher's
> > + * timeout during the delay.
> >   */
> >
> > Minor rewording of the comment
> >
> > SUGGESTION
> > If the subscriber side apply is delayed (because of time-delayed
> > replication) then do not tell the publisher that the received latest LSN is already
> > applied and flushed, otherwise, it leads to the publisher side making a wrong
> > assumption of logical replication progress. Instead, we just send a feedback
> > message to avoid a publisher timeout during the delay.
> Adopted.
>
>
> > ======
> >
> >
> > src/bin/pg_dump/pg_dump.c
> >
> > 22.
> >
> > @@ -4546,9 +4547,14 @@ getSubscriptions(Archive *fout)
> >     LOGICALREP_TWOPHASE_STATE_DISABLED);
> >
> >   if (fout->remoteVersion >= 160000)
> > - appendPQExpBufferStr(query, " s.suborigin\n");
> > + appendPQExpBufferStr(query,
> > + " s.suborigin,\n"
> > + " s.subminapplydelay\n");
> >   else
> > - appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
> > + {
> > + appendPQExpBuffer(query, " '%s' AS suborigin,\n",
> > +                   LOGICALREP_ORIGIN_ANY);
> > + appendPQExpBufferStr(query, " 0 AS subminapplydelay\n");
> > + }
> >
> > Can't those appends in the else part be combined into a single
> > appendPQExpBuffer call?
> >
> > appendPQExpBuffer(query,
> > " '%s' AS suborigin,\n"
> > " 0 AS subminapplydelay\n",
> > LOGICALREP_ORIGIN_ANY);
> Adopted.
>
>
> > ======
> >
> > src/include/catalog/pg_subscription.h
> >
> > 23.
> >
> > @@ -70,6 +70,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId)
> > BKI_SHARED_RELATION BKI_ROW
> >   XLogRecPtr subskiplsn; /* All changes finished at this LSN are
> >   * skipped */
> >
> > + int64 subminapplydelay; /* Replication apply delay */
> > +
> >   NameData subname; /* Name of the subscription */
> >
> >   Oid subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */
> >
> > SUGGESTION (for comment)
> > Replication apply delay (ms)
> Fixed.
>
> > ~~
> >
> > 24.
> >
> > @@ -120,6 +122,7 @@ typedef struct Subscription
> >   * in */
> >   XLogRecPtr skiplsn; /* All changes finished at this LSN are
> >   * skipped */
> > + int64 minapplydelay; /* Replication apply delay */
> >
> > SUGGESTION (for comment)
> > Replication apply delay (ms)
> Fixed.
>
>
> Kindly have a look at the latest v17 patch in [1].
>
>
> [1] - https://www.postgresql.org/message-id/TYCPR01MB8373F5162C7A0E6224670CF0EDC49%40TYCPR01MB8373.jpnprd01.prod.outlook.com
>
> Best Regards,
>         Takamichi Osumi
>

1)
I tried different variations of altering 'min_apply_delay'. All of them
passed except the last one below:

postgres=# alter subscription mysubnew set (min_apply_delay = '10.9min 1ms');
ALTER SUBSCRIPTION
postgres=# alter subscription mysubnew set (min_apply_delay = '10.9min 2s 1ms');
ALTER SUBSCRIPTION
-- very similar to the above, but this one fails:
postgres=# alter subscription mysubnew set (min_apply_delay = '10.9s 1ms');
ERROR:  invalid input syntax for type interval: "10.9s 1ms"
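
One possible explanation, which is my assumption rather than something confirmed in this thread: the interval input code may track which fields have already been given with a bitmask, and treat a fractional seconds value as also occupying the sub-second fields, so that a later "1ms" counts as a duplicate. The following is only a simplified model of that rule in plain C. The F_* bits, UnitToken and tokens_accepted() are hypothetical names for illustration, not PostgreSQL's actual macros or functions.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical field bits for this sketch; not PostgreSQL's real macros. */
enum
{
    F_MINUTE = 1 << 0,
    F_SECOND = 1 << 1,
    F_MILLISEC = 1 << 2,
    F_MICROSEC = 1 << 3
};

/* One "<number><unit>" token of an interval string, already split. */
typedef struct
{
    int  unit;          /* one of the F_* bits above */
    bool has_fraction;  /* true for e.g. "10.9", false for "10" */
} UnitToken;

/*
 * Return true if the token sequence is accepted under the assumed rule:
 * every token claims a set of fields, and claiming a field twice is an
 * error. Fractional seconds are assumed to claim the millisecond and
 * microsecond fields too; fractional minutes claim only the minute field.
 */
static bool
tokens_accepted(const UnitToken *tokens, size_t ntokens)
{
    int claimed = 0;

    for (size_t i = 0; i < ntokens; i++)
    {
        int mask = tokens[i].unit;

        if (tokens[i].unit == F_SECOND && tokens[i].has_fraction)
            mask |= F_MILLISEC | F_MICROSEC;

        if (claimed & mask)
            return false;   /* same field specified twice */
        claimed |= mask;
    }
    return true;
}
```

Under this model, '10.9min 1ms' and '10.9min 2s 1ms' are accepted, while '10.9s 1ms' is rejected, which matches the three results above. If that is the cause, the error comes from interval input in general and not from the min_apply_delay code.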


2)
Logging:
2023-01-19 17:33:16.202 IST [404797] DEBUG:  logical replication apply delay: 19979 ms
2023-01-19 17:33:26.212 IST [404797] DEBUG:  logical replication apply delay: 9969 ms
2023-01-19 17:34:25.730 IST [404962] DEBUG:  logical replication apply delay: 179988 ms  --> previous wait over, started for next txn
2023-01-19 17:34:35.737 IST [404962] DEBUG:  logical replication apply delay: 169981 ms
2023-01-19 17:34:45.746 IST [404962] DEBUG:  logical replication apply delay: 159972 ms

Is there a way to tell which transaction each of these log lines belongs to?
Maybe we could log the xid along with the remaining delay?
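
To illustrate the idea, here is a minimal sketch in plain C. It is not the patch's code. The helper names remaining_delay_ms() and format_delay_log(), and the "(xid ...)" suffix, are hypothetical. The arithmetic assumes the remaining wait is the publisher's commit timestamp plus min_apply_delay minus the current time on the subscriber, as the patch documentation describes.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Remaining wait in milliseconds, assuming the target apply time is
 * commit timestamp + min_apply_delay. Returns 0 once that time has passed.
 */
static int64_t
remaining_delay_ms(int64_t commit_ts_ms, int64_t min_apply_delay_ms,
                   int64_t now_ms)
{
    int64_t diff = commit_ts_ms + min_apply_delay_ms - now_ms;

    return diff > 0 ? diff : 0;
}

/*
 * Format the debug message with the transaction id appended, so that
 * consecutive waits for different transactions can be told apart.
 * Returns the value of snprintf().
 */
static int
format_delay_log(char *buf, size_t buflen, uint32_t xid, int64_t remaining_ms)
{
    return snprintf(buf, buflen,
                    "logical replication apply delay: %lld ms (xid %u)",
                    (long long) remaining_ms, (unsigned int) xid);
}
```

With something like this, each of the lines above would carry the xid of the delayed transaction, and the switch from one wait to the next would be visible without relying on the jump in the remaining time.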

thanks
Shveta


