RE: Time delayed LR (WAS Re: logical replication restrictions) - Mailing list pgsql-hackers

From Hayato Kuroda (Fujitsu)
Subject RE: Time delayed LR (WAS Re: logical replication restrictions)
Date
Msg-id TYAPR01MB5866F8DE90C45B4EFAFD726BF5B29@TYAPR01MB5866.jpnprd01.prod.outlook.com
Whole thread Raw
In response to Re: Time delayed LR (WAS Re: logical replication restrictions)  (vignesh C <vignesh21@gmail.com>)
List pgsql-hackers
Dear Vignesh,

Thank you for reviewing! New version can be available at [1].

> 1) Currently we have added the delay during the decode of commit,
> while decoding the commit walsender process will stop decoding any
> further transaction until delay is completed. There might be a
> possibility that a lot of transactions will happen in parallel and
> there will be a lot of transactions to be decoded after the delay is
> completed.
> Will it be possible to decode the WAL if any WAL is generated instead
> of staying idle in the meantime, I'm not sure if this is feasible just
> throwing my thought to see if it might be possible.
> --- a/src/backend/replication/logical/decode.c
> +++ b/src/backend/replication/logical/decode.c
> @@ -676,6 +676,15 @@ DecodeCommit(LogicalDecodingContext *ctx,
> XLogRecordBuffer *buf,
> 
> buf->origptr, buf->endptr);
>         }
> 
> +       /*
> +        * Delay sending the changes if required. For streaming transactions,
> +        * this means a delay in sending the last stream but that is OK
> +        * because on the downstream the changes will be applied only after
> +        * receiving the last stream.
> +        */
> +       if (ctx->min_send_delay > 0 && ctx->delay_send)
> +               ctx->delay_send(ctx, xid, commit_time);
> +

I see your point, but I think that extension can be done in future version if needed.
This is because we must change some parts and introduce some complexities.

If we have decoded but have not wanted to send changes yet, we must store them in
the memory one and skip sending. In order to do that we must add new data structure,
and we must add another path in DecodeCommit, DecodePrepare not to send changes
and in WalSndLoop() and other functions to send pending changes. These may not be sufficient. 

I'm now thinking aboves are not needed, we can modify later if the overhead of
decoding is quite large and we must do them very efficiently.

> 2) Generally single line comments are not terminated by ".", The
> comment "/* Sleep until appropriate time. */" can be changed
> appropriately:
> +
> +               /* Sleep until appropriate time. */
> +               timeout_sleeptime_ms = WalSndComputeSleeptime(now);
> +
> +               elog(DEBUG2, "time-delayed replication for txid %u,
> delay_time = %d ms, remaining wait time: %ld ms",
> +                        xid, (int) ctx->min_send_delay,
> remaining_wait_time_ms);
> +
> +               /* Sleep until we get reply from worker or we time out */
> +               WalSndWait(WL_SOCKET_READABLE,

Right, removed.

> 3) In some places we mention as min_send_delay and in some places we
> mention it as time-delayed replication, we can keep the comment
> consistent by using the similar wordings.
> +-- fail - specifying streaming = parallel with time-delayed replication is not
> +-- supported
> +CREATE SUBSCRIPTION regress_testsub CONNECTION
> 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect =
> false, streaming = parallel, min_send_delay = 123);
> 
> +-- fail - alter subscription with streaming = parallel should fail when
> +-- time-delayed replication is set
> +ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
> 
> +-- fail - alter subscription with min_send_delay should fail when
> +-- streaming = parallel is set

"time-delayed replication" was removed.

> 4) Since the value is stored in ms, we need not add ms again as the
> default value is in ms:
> @@ -4686,6 +4694,9 @@ dumpSubscription(Archive *fout, const
> SubscriptionInfo *subinfo)
>         if (strcmp(subinfo->subsynccommit, "off") != 0)
>                 appendPQExpBuffer(query, ", synchronous_commit = %s",
> fmtId(subinfo->subsynccommit));
> 
> +       if (subinfo->subminsenddelay > 0)
> +               appendPQExpBuffer(query, ", min_send_delay = '%d ms'",
> subinfo->subminsenddelay);

Right, fixed.

> 5) we can use the new error reporting style:
> 5.a) brackets around errcode can be removed
> +               ereport(ERROR,
> +
> (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> +                                errmsg("invalid value for parameter
> \"%s\": \"%s\"",
> +                                               "min_send_delay",
> input_string),
> +                                hintmsg ? errhint("%s", _(hintmsg)) : 0));
> 
> 5.b) Similarly here too;
> +       if (result < 0 || result > PG_INT32_MAX)
> +               ereport(ERROR,
> +
> (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> +                                errmsg("%d ms is outside the valid
> range for parameter \"%s\" (%d .. %d)",
> +                                               result,
> +                                               "min_send_delay",
> +                                               0, PG_INT32_MAX)));
> 
> 5.c) Similarly here too;
> +                       delay_val = strtoul(strVal(defel->arg), &endptr, 10);
> +                       if (errno != 0 || *endptr != '\0')
> +                               ereport(ERROR,
> +
> (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> +                                                errmsg("invalid
> min_send_delay")));
> 
> 
> 5.d) Similarly here too;
> +                       if (delay_val > PG_INT32_MAX)
> +                               ereport(ERROR,
> +
> (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> +
> errmsg("min_send_delay \"%s\" out of range",
> +
> strVal(defel->arg))));

All of them are fixed.

> 6) This can be changed to a single line comment:
> +       /*
> +        * Parse given string as parameter which has millisecond unit
> +        */
> +       if (!parse_int(input_string, &result, GUC_UNIT_MS, &hintmsg))
> +               ereport(ERROR,

Changed. I grepped ereport() in the patch and I thought there were no similar one.

> 7) In the expect we have specifically mention "for non-streaming
> transaction", is the behavior different for streaming transaction, if
> not we can change the message accordingly
> +# The publisher waits for the replication to complete
> +$node_publisher->wait_for_catchup('tap_sub_renamed');
> +
> +# This test is successful only if at least the configured delay has elapsed.
> +ok( time() - $publisher_insert_time >= $delay,
> +       "subscriber applies changes only after replication delay for
> non-streaming transaction"
> +);

There is no difference, both of normal and streamed transaction could be delayed to apply.
So removed.

[1]:
https://www.postgresql.org/message-id/flat/TYAPR01MB586606CF3B585B6F8BE13A9CF5B29%40TYAPR01MB5866.jpnprd01.prod.outlook.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED


pgsql-hackers by date:

Previous
From: "Hayato Kuroda (Fujitsu)"
Date:
Subject: RE: Time delayed LR (WAS Re: logical replication restrictions)
Next
From: "Hayato Kuroda (Fujitsu)"
Date:
Subject: RE: Time delayed LR (WAS Re: logical replication restrictions)