Re: Time delayed LR (WAS Re: logical replication restrictions) - Mailing list pgsql-hackers

From vignesh C
Subject Re: Time delayed LR (WAS Re: logical replication restrictions)
Date
Msg-id CALDaNm242g=Y-33__jC1CabPprKEsCLkAYtbHJomFcOfHa2iJQ@mail.gmail.com
Whole thread Raw
In response to RE: Time delayed LR (WAS Re: logical replication restrictions)  ("Hayato Kuroda (Fujitsu)" <kuroda.hayato@fujitsu.com>)
Responses RE: Time delayed LR (WAS Re: logical replication restrictions)  ("Hayato Kuroda (Fujitsu)" <kuroda.hayato@fujitsu.com>)
List pgsql-hackers
On Tue, 28 Feb 2023 at 21:21, Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
>
> Dear Amit,
>
> > Few comments:
>
> Thank you for reviewing! PSA new version.

Thanks for the updated patch, few comments:
1) Currently we have added the delay during the decode of commit,
while decoding the commit walsender process will stop decoding any
further transaction until delay is completed. There might be a
possibility that a lot of transactions will happen in parallel and
there will be a lot of transactions to be decoded after the delay is
completed.
Will it be possible to decode the WAL if any WAL is generated instead
of staying idle in the meantime, I'm not sure if this is feasible just
throwing my thought to see if it might be possible.
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -676,6 +676,15 @@ DecodeCommit(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf,

buf->origptr, buf->endptr);
        }

+       /*
+        * Delay sending the changes if required. For streaming transactions,
+        * this means a delay in sending the last stream but that is OK
+        * because on the downstream the changes will be applied only after
+        * receiving the last stream.
+        */
+       if (ctx->min_send_delay > 0 && ctx->delay_send)
+               ctx->delay_send(ctx, xid, commit_time);
+

2) Generally single line comments are not terminated by ".", The
comment "/* Sleep until appropriate time. */" can be changed
appropriately:
+
+               /* Sleep until appropriate time. */
+               timeout_sleeptime_ms = WalSndComputeSleeptime(now);
+
+               elog(DEBUG2, "time-delayed replication for txid %u,
delay_time = %d ms, remaining wait time: %ld ms",
+                        xid, (int) ctx->min_send_delay,
remaining_wait_time_ms);
+
+               /* Sleep until we get reply from worker or we time out */
+               WalSndWait(WL_SOCKET_READABLE,

3) In some places we mention as min_send_delay and in some places we
mention it as time-delayed replication, we can keep the comment
consistent by using the similar wordings.
+-- fail - specifying streaming = parallel with time-delayed replication is not
+-- supported
+CREATE SUBSCRIPTION regress_testsub CONNECTION
'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect =
false, streaming = parallel, min_send_delay = 123);

+-- fail - alter subscription with streaming = parallel should fail when
+-- time-delayed replication is set
+ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);

+-- fail - alter subscription with min_send_delay should fail when
+-- streaming = parallel is set

4) Since the value is stored in ms, we need not add ms again as the
default value is in ms:
@@ -4686,6 +4694,9 @@ dumpSubscription(Archive *fout, const
SubscriptionInfo *subinfo)
        if (strcmp(subinfo->subsynccommit, "off") != 0)
                appendPQExpBuffer(query, ", synchronous_commit = %s",
fmtId(subinfo->subsynccommit));

+       if (subinfo->subminsenddelay > 0)
+               appendPQExpBuffer(query, ", min_send_delay = '%d ms'",
subinfo->subminsenddelay);
+

5) we can use the new error reporting style:
5.a) brackets around errcode can be removed
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("invalid value for parameter
\"%s\": \"%s\"",
+                                               "min_send_delay", input_string),
+                                hintmsg ? errhint("%s", _(hintmsg)) : 0));

5.b) Similarly here too;
+       if (result < 0 || result > PG_INT32_MAX)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("%d ms is outside the valid
range for parameter \"%s\" (%d .. %d)",
+                                               result,
+                                               "min_send_delay",
+                                               0, PG_INT32_MAX)));

5.c) Similarly here too;
+                       delay_val = strtoul(strVal(defel->arg), &endptr, 10);
+                       if (errno != 0 || *endptr != '\0')
+                               ereport(ERROR,
+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("invalid
min_send_delay")));


5.d) Similarly here too;
+                       if (delay_val > PG_INT32_MAX)
+                               ereport(ERROR,
+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+
errmsg("min_send_delay \"%s\" out of range",
+
strVal(defel->arg))));


6) This can be changed to a single line comment:
+       /*
+        * Parse given string as parameter which has millisecond unit
+        */
+       if (!parse_int(input_string, &result, GUC_UNIT_MS, &hintmsg))
+               ereport(ERROR,

7) In the expect we have specifically mention "for non-streaming
transaction", is the behavior different for streaming transaction, if
not we can change the message accordingly
+# The publisher waits for the replication to complete
+$node_publisher->wait_for_catchup('tap_sub_renamed');
+
+# This test is successful only if at least the configured delay has elapsed.
+ok( time() - $publisher_insert_time >= $delay,
+       "subscriber applies changes only after replication delay for
non-streaming transaction"
+);

Regards,
Vignesh



pgsql-hackers by date:

Previous
From: Tomas Vondra
Date:
Subject: Re: Add LZ4 compression in pg_dump
Next
From: Tomas Vondra
Date:
Subject: Re: Add LZ4 compression in pg_dump