Re: Time delayed LR (WAS Re: logical replication restrictions) - Mailing list pgsql-hackers
From | shveta malik |
---|---|
Subject | Re: Time delayed LR (WAS Re: logical replication restrictions) |
Date | |
Msg-id | CAJpy0uDcuA0TaejZ=xVaHucPR-CQp0A_Ccra=YaP3dwD4vCw0Q@mail.gmail.com |
In response to | RE: Time delayed LR (WAS Re: logical replication restrictions) ("Takamichi Osumi (Fujitsu)" <osumi.takamichi@fujitsu.com>) |
Responses | Re: Time delayed LR (WAS Re: logical replication restrictions); RE: Time delayed LR (WAS Re: logical replication restrictions) |
List | pgsql-hackers |
On Thu, Jan 19, 2023 at 12:42 PM Takamichi Osumi (Fujitsu) <osumi.takamichi@fujitsu.com> wrote:
>
> On Wednesday, January 18, 2023 4:06 PM Peter Smith <smithpb2250@gmail.com> wrote:
> > Here are my review comments for the latest patch v16-0001. (excluding the
> > test code)
> Hi, thank you for your review !
>
> > ======
> >
> > General
> >
> > 1.
> >
> > Since the value of min_apply_delay cannot be < 0, I was thinking probably it
> > should have been declared everywhere in this patch as a
> > uint64 instead of an int64, right?
> No, we won't be able to adopt this idea.
>
> It seems that we are not able to use uint for catalog type.
> So, can't applying it to the pg_subscription.h definitions
> and then similarly Int64GetDatum to store catalog variables
> and the argument variable of Int64GetDatum.
>
> Plus, there is a possibility that type Interval becomes negative value,
> then we are not able to change the int64 variable to get
> the return value of interval2ms().
>
> > ======
> >
> > Commit message
> >
> > 2.
> >
> > If the subscription sets min_apply_delay parameter, the logical replication
> > worker will delay the transaction commit for min_apply_delay milliseconds.
> >
> > ~
> >
> > IMO there should be another sentence before this just to say that a new
> > parameter is being added:
> >
> > e.g.
> > This patch implements a new subscription parameter called
> > 'min_apply_delay'.
> Added.
> >
> > ======
> >
> > doc/src/sgml/config.sgml
> >
> > 3.
> >
> > + <para>
> > + For time-delayed logical replication, the apply worker sends a Standby
> > + Status Update message to the corresponding publisher per the
> > indicated
> > + time of this parameter. Therefore, if this parameter is longer than
> > + <literal>wal_sender_timeout</literal> on the publisher, then the
> > + walsender doesn't get any update message during the delay and
> > repeatedly
> > + terminates due to the timeout errors.
> > Hence, make sure this parameter
> > is
> > + shorter than the <literal>wal_sender_timeout</literal> of the
> > publisher.
> > + If this parameter is set to zero with time-delayed replication, the
> > + apply worker doesn't send any feedback messages during the
> > + <literal>min_apply_delay</literal>.
> > + </para>
> >
> >
> > This paragraph seemed confusing. I think it needs to be reworded to change all
> > of the "this parameter" references because there are at least 3 different
> > parameters mentioned in this paragraph. e.g. maybe just change them to
> > explicitly name the parameter you are talking about.
> >
> > I also think it needs to mention the ‘min_apply_delay’ subscription parameter
> > up-front and then refer to it appropriately.
> >
> > The end result might be something like I wrote below (this is just my guess -
> > probably you can word it better).
> >
> > SUGGESTION
> > For time-delayed logical replication (i.e. when the subscription is created with
> > parameter min_apply_delay > 0), the apply worker sends a Standby Status
> > Update message to the publisher with a period of wal_receiver_status_interval .
> > Make sure to set wal_receiver_status_interval less than the
> > wal_sender_timeout on the publisher, otherwise, the walsender will repeatedly
> > terminate due to the timeout errors. If wal_receiver_status_interval is set to zero,
> > the apply worker doesn't send any feedback messages during the subscriber’s
> > min_apply_delay period.
> Applied. Also, I added one reference for min_apply_delay parameter
> at the end of this description.
> >
> > ======
> >
> > doc/src/sgml/ref/create_subscription.sgml
> >
> > 4.
> >
> > + <para>
> > + By default, the subscriber applies changes as soon as possible. As
> > + with the physical replication feature
> > + (<xref linkend="guc-recovery-min-apply-delay"/>), it can be
> > useful to
> > + have a time-delayed logical replica.
> > This parameter lets the user to
> > + delay the application of changes by a specified amount of
> > time. If this
> > + value is specified without units, it is taken as milliseconds. The
> > + default is zero(no delay).
> > + </para>
> >
> > 4a.
> > As with the physical replication feature (recovery_min_apply_delay), it can be
> > useful to have a time-delayed logical replica.
> >
> > IMO not sure that the above sentence is necessary. It seems only to be saying
> > that this parameter can be useful. Why do we need to say that?
> Removed the sentence.
> >
> > ~
> >
> > 4b.
> > "This parameter lets the user to delay" -> "This parameter lets the user delay"
> > OR
> > "This parameter lets the user to delay" -> "This parameter allows the user to
> > delay"
> Fixed.
> >
> > ~
> >
> > 4c.
> > "If this value is specified without units" -> "If the value is specified without
> > units"
> Fixed.
>
> > ~
> >
> > 4d.
> > "zero(no delay)." -> "zero (no delay)."
> Fixed.
>
> > ----
> >
> > 5.
> >
> > + <para>
> > + The delay occurs only on WAL records for transaction begins and
> > after
> > + the initial table synchronization. It is possible that the
> > + replication delay between publisher and subscriber exceeds the
> > value
> > + of this parameter, in which case no delay is added. Note that the
> > + delay is calculated between the WAL time stamp as written on
> > + publisher and the current time on the subscriber. Time
> > spent in logical
> > + decoding and in transferring the transaction may reduce the
> > actual wait
> > + time. If the system clocks on publisher and subscriber are not
> > + synchronized, this may lead to apply changes earlier than
> > expected,
> > + but this is not a major issue because this parameter is
> > typically much
> > + larger than the time deviations between servers.
> > Note that if this
> > + parameter is set to a long delay, the replication will stop if the
> > + replication slot falls behind the current LSN by more than
> > + <link
> > linkend="guc-max-slot-wal-keep-size"><literal>max_slot_wal_keep_size</
> > literal></link>.
> > + </para>
> >
> > I think the first part can be reworded slightly. See what you think about the
> > suggestion below.
> >
> > SUGGESTION
> > Any delay occurs only on WAL records for transaction begins after all initial
> > table synchronization has finished. The delay is calculated between the WAL
> > timestamp as written on the publisher and the current time on the subscriber.
> > Any overhead of time spent in logical decoding and in transferring the
> > transaction may reduce the actual wait time.
> > It is also possible that the overhead already exceeds the requested
> > 'min_apply_delay' value, in which case no additional wait is necessary. If the
> > system clocks...
> Addressed.
> >
> > ----
> >
> > 6.
> >
> > + <para>
> > + Setting streaming to <literal>parallel</literal> mode and
> > <literal>min_apply_delay</literal>
> > + simultaneously is not supported.
> > + </para>
> >
> > SUGGESTION
> > A non-zero min_apply_delay parameter is not allowed when streaming in
> > parallel mode.
> Applied.
> >
> > ======
> >
> > src/backend/commands/subscriptioncmds.c
> >
> > 7.
> > parse_subscription_options
> >
> > @@ -404,6 +445,17 @@ parse_subscription_options(ParseState *pstate, List
> > *stmt_options,
> > "slot_name = NONE", "create_slot = false")));
> > }
> > }
> > +
> > + /* Test the combination of streaming mode and min_apply_delay */ if
> > + (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) &&
> > + opts->min_apply_delay > 0)
> > + {
> > + if (opts->streaming == LOGICALREP_STREAM_PARALLEL)
> > ereport(ERROR,
> > + errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually
> > + exclusive options",
> > + "min_apply_delay > 0", "streaming = parallel")); }
> >
> > SUGGESTION (comment)
> > The combination of parallel streaming mode and min_apply_delay is not
> > allowed.
> Fixed.
> >
> > ~~~
> >
> > 8. AlterSubscription (general)
> >
> > I observed during testing there are 3 different errors….
> >
> > At subscription CREATE time you can get this error:
> > ERROR: min_apply_delay > 0 and streaming = parallel are mutually exclusive
> > options
> >
> > If you try to ALTER the min_apply_delay when already streaming = parallel you
> > can get this error:
> > ERROR: cannot enable min_apply_delay for subscription in streaming =
> > parallel mode
> >
> > If you try to ALTER the streaming to be parallel if there is already a
> > min_apply_delay > 0 then you can get this error:
> > ERROR: cannot enable streaming = parallel mode for subscription with
> > min_apply_delay
> Yes. This is because the existing error message styles
> in AlterSubscription and parse_subscription_options.
>
> The former uses "mutually exclusive" messages consistently,
> while the latter does "cannot enable ..." ones.
> > ~
> >
> > IMO there is no need to have 3 different error message texts. I think all these
> > cases are explained by just the first text (ERROR:
> > min_apply_delay > 0 and streaming = parallel are mutually exclusive
> > options)
> Then, we followed this kind of formats.
> >
> > ~~~
> >
> > 9.
> > AlterSubscription
> >
> > @@ -1098,6 +1152,18 @@ AlterSubscription(ParseState *pstate,
> > AlterSubscriptionStmt *stmt,
> >
> > if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
> > {
> > + /*
> > + * Test the combination of streaming mode and
> > + * min_apply_delay
> > + */
> > + if (opts.streaming == LOGICALREP_STREAM_PARALLEL) if
> > + ((IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) &&
> > opts.min_apply_delay > 0) ||
> > + (!IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) &&
> > sub->minapplydelay > 0))
> > + ereport(ERROR,
> > + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> > + errmsg("cannot enable %s mode for subscription with %s",
> > + "streaming = parallel", "min_apply_delay"));
> > +
> >
> > 9a.
> > SUGGESTION (comment)
> > The combination of parallel streaming mode and min_apply_delay is not
> > allowed.
> Fixed.
> >
> > ~
> >
> > 9b.
> > (see AlterSubscription general review comment #8 above) Here you can use the
> > same comment error message that says min_apply_delay > 0 and streaming =
> > parallel are mutually exclusive options.
> As described above, we followed the current style in the existing functions.
> >
> > ~~~
> >
> > 10.
> > AlterSubscription
> >
> > @@ -1111,6 +1177,25 @@ AlterSubscription(ParseState *pstate,
> > AlterSubscriptionStmt *stmt,
> > = true;
> > }
> >
> > + if (IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY)) {
> > + /*
> > + * Test the combination of streaming mode and
> > + * min_apply_delay
> > + */
> > + if (opts.min_apply_delay > 0)
> > + if ((IsSet(opts.specified_opts, SUBOPT_STREAMING) && opts.streaming
> > == LOGICALREP_STREAM_PARALLEL) ||
> > + (!IsSet(opts.specified_opts, SUBOPT_STREAMING) && sub->stream ==
> > LOGICALREP_STREAM_PARALLEL))
> > + ereport(ERROR,
> > + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> > + errmsg("cannot enable %s for subscription in %s mode",
> > + "min_apply_delay", "streaming = parallel"));
> > +
> > + values[Anum_pg_subscription_subminapplydelay - 1] =
> > + Int64GetDatum(opts.min_apply_delay);
> > + replaces[Anum_pg_subscription_subminapplydelay - 1] = true; }
> >
> > 10a.
> > SUGGESTION (comment)
> > The combination of parallel streaming mode and min_apply_delay is not
> > allowed.
> Fixed.
> >
> > ~
> >
> > 10b.
> > (see AlterSubscription general review comment #8 above) Here you can use the
> > same comment error message that says min_apply_delay > 0 and streaming =
> > parallel are mutually exclusive options.
> Same as 9b.
>
> > ======
> >
> > .../replication/logical/applyparallelworker.c
> >
> > 11.
> >
> > @@ -704,7 +704,8 @@ pa_process_spooled_messages_if_required(void)
> > {
> > apply_spooled_messages(&MyParallelShared->fileset,
> > MyParallelShared->xid,
> > - InvalidXLogRecPtr);
> > + InvalidXLogRecPtr,
> > + 0);
> >
> > IMO this passing of 0 is a bit strange because it is currently acting like a dummy
> > value since the apply_spooled_messages will never make use of the 'finish_ts'
> > anyway (since this call is from a parallel apply worker).
> >
> > I think a better way to code this might be to pass the 0 (same as you are doing
> > here) but inside the apply_spooled_messages change the code:
> >
> > FROM
> > if (!am_parallel_apply_worker())
> > maybe_delay_apply(finish_ts);
> >
> > TO
> > if (finish_ts)
> > maybe_delay_apply(finish_ts);
> >
> > That does 2 things.
> > - It makes the passed-in 0 have some meaning
> > - It simplifies the apply_spooled_messages code
> Adopted.
> >
> > ======
> >
> > src/backend/replication/logical/worker.c
> >
> > 12.
> >
> > @@ -318,6 +318,17 @@ static List *on_commit_wakeup_workers_subids =
> > NIL; bool in_remote_transaction = false; static XLogRecPtr
> > remote_final_lsn = InvalidXLogRecPtr;
> >
> > +/*
> > + * In order to avoid walsender's timeout during time-delayed
> > +replication,
> > + * it's necessary to keep sending feedback messages during the delay
> > +from the
> > + * worker process. Meanwhile, the feature delays the apply before
> > +starting the
> > + * transaction and thus we don't write WALs for the suspended changes
> > +during
> > + * the wait. Hence, in the case the worker process sends a feedback
> > +message
> > + * during the delay, we should not make positions of the flushed and
> > +apply LSN
> > + * overwritten by the last received latest LSN. See send_feedback()
> > for details.
> > + */
> > +static XLogRecPtr last_received = InvalidXLogRecPtr;
> >
> > 12a.
> > Suggest a small change to the first sentence of the comment.
> >
> > BEFORE
> > In order to avoid walsender's timeout during time-delayed replication, it's
> > necessary to keep sending feedback messages during the delay from the
> > worker process.
> >
> > AFTER
> > In order to avoid walsender timeout for time-delayed replication the worker
> > process keeps sending feedback messages during the delay period.
> Fixed.
> >
> > ~
> >
> > 12b.
> > "Hence, in the case" -> "When"
> Fixed.
> >
> > ~~~
> >
> > 13.
> > forward declare
> >
> > -static void send_feedback(XLogRecPtr recvpos, bool force, bool
> > requestReply);
> > +static void send_feedback(XLogRecPtr recvpos, bool force, bool
> > requestReply,
> > + bool in_delaying_apply);
> >
> > Change the param name:
> >
> > "in_delaying_apply" -> "in_delayed_apply” (??)
> Changed. The initial intention to append the "in_"
> prefix is to make the variable name aligned with
> some other variables such as "in_remote_transaction" and
> "in_streamed_transaction" that mean the current status
> for the transaction. So, until there is a better name proposed,
> we can keep it.
> >
> > ~~~
> >
> > 14. maybe_delay_apply
> >
> > + /* Nothing to do if no delay set */
> > + if (MySubscription->minapplydelay <= 0) return;
> >
> > IIUC min_apply_delay cannot be < 0 so this condition could simply be:
> >
> > if (!MySubscription->minapplydelay)
> > return;
> Fixed.
> >
> > ~~~
> >
> > 15. maybe_delay_apply
> >
> > + /*
> > + * The min_apply_delay parameter is ignored until all tablesync workers
> > + * have reached READY state. If we allow the delay during the catchup
> > + * phase, once we reach the limit of tablesync workers, it will impose
> > + a
> > + * delay for each subsequent worker. It means it will take a long time
> > + to
> > + * finish the initial table synchronization.
> > + */
> > + if (!AllTablesyncsReady())
> > + return;
> >
> > SUGGESTION (slight rewording)
> > The min_apply_delay parameter is ignored until all tablesync workers have
> > reached READY state. This is because if we allowed the delay during the
> > catchup phase, then once we reached the limit of tablesync workers it would
> > impose a delay for each subsequent worker. That would cause initial table
> > synchronization completion to take a long time.
> Fixed.
> >
> > ~~~
> >
> > 16.
> > maybe_delay_apply
> >
> > + while (true)
> > + {
> > + long diffms;
> > +
> > + ResetLatch(MyLatch);
> > +
> > + CHECK_FOR_INTERRUPTS();
> >
> > IMO there should be some small explanatory comment here at the top of the
> > while loop.
> Added.
> >
> > ~~~
> >
> > 17. apply_spooled_messages
> >
> > @@ -2024,6 +2141,21 @@ apply_spooled_messages(FileSet *stream_fileset,
> > TransactionId xid,
> > int fileno;
> > off_t offset;
> >
> > + /*
> > + * Should we delay the current transaction?
> > + *
> > + * Unlike the regular (non-streamed) cases, the delay is applied in a
> > + * STREAM COMMIT/STREAM PREPARE message for streamed transactions.
> > The
> > + * STREAM START message does not contain a commit/prepare time (it will
> > + be
> > + * available when the in-progress transaction finishes). Hence, it's
> > + not
> > + * appropriate to apply a delay at that time.
> > + *
> > + * It's not allowed to execute time-delayed replication with parallel
> > + * apply feature.
> > + */
> > + if (!am_parallel_apply_worker())
> > + maybe_delay_apply(finish_ts);
> >
> > That whole comment part "Unlike the regular (non-streamed) cases"
> > seems misplaced here. Perhaps this part of the comment is better put into
> > the function header where the meaning of 'finish_ts' is explained?
> Moved it to the header comment for maybe_delay_apply.
> >
> > ~~~
> >
> > 18. apply_spooled_messages
> >
> > + * It's not allowed to execute time-delayed replication with parallel
> > + * apply feature.
> > + */
> > + if (!am_parallel_apply_worker())
> > + maybe_delay_apply(finish_ts);
> >
> > As was mentioned in comment #11 above this code could be changed like
> >
> > if (finish_ts)
> > maybe_delay_apply(finish_ts);
> > then you don't even need to make mention of "parallel apply" at all here.
> >
> > OTOH if you want to still have the parallel apply comment then maybe reword it
> > like this:
> > "It is not allowed to combine time-delayed replication with the parallel apply
> > feature."
> Changed and now I don't mention the parallel apply feature.
>
> > ~~~
> >
> > 19. apply_spooled_messages
> >
> > If you chose not to do my suggestion from comment #11, then there are
> > 2 identical conditions (!am_parallel_apply_worker()); In this case, I was
> > wondering if it would be better to refactor to use a single condition instead.
> I applied #11 comment. Now, the conditions are not identical.
>
> > ~~~
> >
> > 20. send_feedback
> > (same as comment #13)
> >
> > Maybe change the new param name to “in_delayed_apply”?
> Changed.
> >
> > ~~~
> >
> > 21.
> >
> > @@ -3737,8 +3869,15 @@ send_feedback(XLogRecPtr recvpos, bool force,
> > bool requestReply)
> > /*
> > * No outstanding transactions to flush, we can report the latest received
> > * position. This is important for synchronous replication.
> > + *
> > + * During the delay of time-delayed replication, do not tell the
> > + publisher
> > + * that the received latest LSN is already applied and flushed at this
> > + * stage, since we don't apply the transaction yet. If we do so, it
> > + leads
> > + * to a wrong assumption of logical replication progress on the
> > + publisher
> > + * side. Here, we just send a feedback message to avoid publisher's
> > + * timeout during the delay.
> > */
> >
> > Minor rewording of the comment
> >
> > SUGGESTION
> > If the subscriber side apply is delayed (because of time-delayed
> > replication) then do not tell the publisher that the received latest LSN is already
> > applied and flushed, otherwise, it leads to the publisher side making a wrong
> > assumption of logical replication progress. Instead, we just send a feedback
> > message to avoid a publisher timeout during the delay.
> Adopted.
> >
> > ======
> >
> >
> > src/bin/pg_dump/pg_dump.c
> >
> > 22.
> >
> > @@ -4546,9 +4547,14 @@ getSubscriptions(Archive *fout)
> > LOGICALREP_TWOPHASE_STATE_DISABLED);
> >
> > if (fout->remoteVersion >= 160000)
> > - appendPQExpBufferStr(query, " s.suborigin\n");
> > + appendPQExpBufferStr(query,
> > + " s.suborigin,\n"
> > + " s.subminapplydelay\n");
> > else
> > - appendPQExpBuffer(query, " '%s' AS suborigin\n",
> > LOGICALREP_ORIGIN_ANY);
> > + {
> > + appendPQExpBuffer(query, " '%s' AS suborigin,\n",
> > + LOGICALREP_ORIGIN_ANY); appendPQExpBufferStr(query, " 0 AS
> > + subminapplydelay\n"); }
> >
> > Can’t those appends in the else part be combined into a single
> > appendPQExpBuffer
> >
> > appendPQExpBuffer(query,
> > " '%s' AS suborigin,\n"
> > " 0 AS subminapplydelay\n",
> > LOGICALREP_ORIGIN_ANY);
> Adopted.
> >
> > ======
> >
> > src/include/catalog/pg_subscription.h
> >
> > 23.
> >
> > @@ -70,6 +70,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId)
> > BKI_SHARED_RELATION BKI_ROW
> > XLogRecPtr subskiplsn; /* All changes finished at this LSN are
> > * skipped */
> >
> > + int64 subminapplydelay; /* Replication apply delay */
> > +
> > NameData subname; /* Name of the subscription */
> >
> > Oid subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */
> >
> > SUGGESTION (for comment)
> > Replication apply delay (ms)
> Fixed.
>
> > ~~
> >
> > 24.
> >
> > @@ -120,6 +122,7 @@ typedef struct Subscription
> > * in */
> > XLogRecPtr skiplsn; /* All changes finished at this LSN are
> > * skipped */
> > + int64 minapplydelay; /* Replication apply delay */
> >
> > SUGGESTION (for comment)
> > Replication apply delay (ms)
> Fixed.
>
>
> Kindly have a look at the latest v17 patch in [1].
>
>
> [1] - https://www.postgresql.org/message-id/TYCPR01MB8373F5162C7A0E6224670CF0EDC49%40TYCPR01MB8373.jpnprd01.prod.outlook.com
>
> Best Regards,
> Takamichi Osumi
>

1) Tried different variations of altering 'min_apply_delay'. All passed except the one below:

postgres=# alter subscription mysubnew set (min_apply_delay = '10.9min 1ms');
ALTER SUBSCRIPTION
postgres=# alter subscription mysubnew set (min_apply_delay = '10.9min 2s 1ms');
ALTER SUBSCRIPTION

-- very similar to the above, but fails:
postgres=# alter subscription mysubnew set (min_apply_delay = '10.9s 1ms');
ERROR: invalid input syntax for type interval: "10.9s 1ms"

2) Logging:

2023-01-19 17:33:16.202 IST [404797] DEBUG: logical replication apply delay: 19979 ms
2023-01-19 17:33:26.212 IST [404797] DEBUG: logical replication apply delay: 9969 ms
2023-01-19 17:34:25.730 IST [404962] DEBUG: logical replication apply delay: 179988 ms --> previous wait over, started for the next txn
2023-01-19 17:34:35.737 IST [404962] DEBUG: logical replication apply delay: 169981 ms
2023-01-19 17:34:45.746 IST [404962] DEBUG: logical replication apply delay: 159972 ms

Is there a way to distinguish between these logs? Maybe dump the xids along with them?

thanks
Shveta
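The DEBUG lines in point 2 appear roughly every 10 seconds, with the remaining delay shrinking by about that much each time. That is consistent with the apply worker waking once per wal_receiver_status_interval to send a Standby Status Update, as described in the config.sgml text discussed in comment #3. Below is a minimal standalone C sketch of such a loop that also prints the remote xid on each line, which is one possible way to tell consecutive transactions apart. It is not the patch's code: plan_delay_waits(), its parameters, the "(remote xid %u)" suffix and the xid values used with it are all hypothetical, and the 10000 ms interval assumes the default wal_receiver_status_interval of 10s.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical sketch, not code from the patch: split the remaining apply
 * delay into waits no longer than the status interval (the apply worker is
 * described as sending a Standby Status Update once per
 * wal_receiver_status_interval), and print one DEBUG-style line per wakeup
 * that also carries the remote transaction's xid.
 *
 * Stores each planned wait in waits[] and returns how many were stored
 * (never more than max_waits).
 */
static int
plan_delay_waits(uint32_t xid, long remaining_ms, long status_interval_ms,
                 long waits[], int max_waits)
{
    int         n = 0;

    assert(status_interval_ms >= 0);

    while (remaining_ms > 0 && n < max_waits)
    {
        /* An interval of 0 means no feedback during the delay: one long wait. */
        long        wait_ms = remaining_ms;

        if (status_interval_ms > 0 && status_interval_ms < remaining_ms)
            wait_ms = status_interval_ms;

        /* Including the xid lets lines from consecutive transactions be told apart. */
        printf("DEBUG:  logical replication apply delay: %ld ms (remote xid %u)\n",
               remaining_ms, (unsigned int) xid);

        /* A real worker would sleep for wait_ms here and then send feedback. */
        waits[n++] = wait_ms;
        remaining_ms -= wait_ms;
    }
    return n;
}
```

With remaining_ms = 19979 and a 10000 ms interval the sketch plans two waits (10000 ms, then 9979 ms). The real log shows 9969 ms on the second line, presumably because the worker recomputes the remaining time from the clock after each wakeup instead of subtracting the planned wait.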