Re: Time delayed LR (WAS Re: logical replication restrictions) - Mailing list pgsql-hackers
From | vignesh C |
---|---|
Subject | Re: Time delayed LR (WAS Re: logical replication restrictions) |
Date | |
Msg-id | CALDaNm242g=Y-33__jC1CabPprKEsCLkAYtbHJomFcOfHa2iJQ@mail.gmail.com Whole thread Raw |
In response to | RE: Time delayed LR (WAS Re: logical replication restrictions) ("Hayato Kuroda (Fujitsu)" <kuroda.hayato@fujitsu.com>) |
Responses |
RE: Time delayed LR (WAS Re: logical replication restrictions)
|
List | pgsql-hackers |
On Tue, 28 Feb 2023 at 21:21, Hayato Kuroda (Fujitsu) <kuroda.hayato@fujitsu.com> wrote: > > Dear Amit, > > > Few comments: > > Thank you for reviewing! PSA new version. Thanks for the updated patch, few comments: 1) Currently we have added the delay during the decode of commit, while decoding the commit walsender process will stop decoding any further transaction until delay is completed. There might be a possibility that a lot of transactions will happen in parallel and there will be a lot of transactions to be decoded after the delay is completed. Will it be possible to decode the WAL if any WAL is generated instead of staying idle in the meantime, I'm not sure if this is feasible just throwing my thought to see if it might be possible. --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -676,6 +676,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, buf->origptr, buf->endptr); } + /* + * Delay sending the changes if required. For streaming transactions, + * this means a delay in sending the last stream but that is OK + * because on the downstream the changes will be applied only after + * receiving the last stream. + */ + if (ctx->min_send_delay > 0 && ctx->delay_send) + ctx->delay_send(ctx, xid, commit_time); + 2) Generally single line comments are not terminated by ".", The comment "/* Sleep until appropriate time. */" can be changed appropriately: + + /* Sleep until appropriate time. */ + timeout_sleeptime_ms = WalSndComputeSleeptime(now); + + elog(DEBUG2, "time-delayed replication for txid %u, delay_time = %d ms, remaining wait time: %ld ms", + xid, (int) ctx->min_send_delay, remaining_wait_time_ms); + + /* Sleep until we get reply from worker or we time out */ + WalSndWait(WL_SOCKET_READABLE, 3) In some places we mention as min_send_delay and in some places we mention it as time-delayed replication, we can keep the comment consistent by using the similar wordings. +-- fail - specifying streaming = parallel with time-delayed replication is not +-- supported +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_send_delay = 123); +-- fail - alter subscription with streaming = parallel should fail when +-- time-delayed replication is set +ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); +-- fail - alter subscription with min_send_delay should fail when +-- streaming = parallel is set 4) Since the value is stored in ms, we need not add ms again as the default value is in ms: @@ -4686,6 +4694,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); + if (subinfo->subminsenddelay > 0) + appendPQExpBuffer(query, ", min_send_delay = '%d ms'", subinfo->subminsenddelay); + 5) we can use the new error reporting style: 5.a) brackets around errcode can be removed + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for parameter \"%s\": \"%s\"", + "min_send_delay", input_string), + hintmsg ? errhint("%s", _(hintmsg)) : 0)); 5.b) Similarly here too; + if (result < 0 || result > PG_INT32_MAX) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("%d ms is outside the valid range for parameter \"%s\" (%d .. %d)", + result, + "min_send_delay", + 0, PG_INT32_MAX))); 5.c) Similarly here too; + delay_val = strtoul(strVal(defel->arg), &endptr, 10); + if (errno != 0 || *endptr != '\0') + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid min_send_delay"))); 5.d) Similarly here too; + if (delay_val > PG_INT32_MAX) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("min_send_delay \"%s\" out of range", + strVal(defel->arg)))); 6) This can be changed to a single line comment: + /* + * Parse given string as parameter which has millisecond unit + */ + if (!parse_int(input_string, &result, GUC_UNIT_MS, &hintmsg)) + ereport(ERROR, 7) In the expect we have specifically mention "for non-streaming transaction", is the behavior different for streaming transaction, if not we can change the message accordingly +# The publisher waits for the replication to complete +$node_publisher->wait_for_catchup('tap_sub_renamed'); + +# This test is successful only if at least the configured delay has elapsed. +ok( time() - $publisher_insert_time >= $delay, + "subscriber applies changes only after replication delay for non-streaming transaction" +); Regards, Vignesh
pgsql-hackers by date: