Thread: Force streaming every change in logical decoding

Force streaming every change in logical decoding

From
"shiy.fnst@fujitsu.com"
Date:
Hi hackers,

In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
sent to the output plugin in streaming mode. But there is a restriction that the
minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
allow sending every change to the output plugin without waiting until
logical_decoding_work_mem is exceeded.

This helps to test streaming mode. For example, testing "Avoid streaming the
transaction which are skipped" [1] needs many XLOG_XACT_INVALIDATIONS
messages; with the new option, it can be tested with fewer changes and in less
time. Also, this new option helps to test more scenarios for "Perform streaming
logical transactions by background workers" [2].

[1] https://www.postgresql.org/message-id/CAFiTN-tHK=7LzfrPs8fbT2ksrOJGQbzywcgXst2bM9-rJJAAUg@mail.gmail.com
[2] https://www.postgresql.org/message-id/flat/CAA4eK1%2BwyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw%40mail.gmail.com
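
The idea, in essence, is a check like the following at the top of
ReorderBufferCheckMemoryLimit() (a rough sketch only; the GUC name is
tentative):

/* Sketch: stream immediately when the (tentative) GUC is set and the
 * output plugin supports streaming; otherwise keep the existing
 * logical_decoding_work_mem check. */
bool    force_stream = (force_stream_mode && ReorderBufferCanStream(rb));

if (!force_stream && rb->size < logical_decoding_work_mem * 1024L)
    return;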

Regards,
Shi yu

Attachment

Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:
>
> Hi hackers,
>
> In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
> sent to output plugin in streaming mode. But there is a restriction that the
> minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
> allow sending every change to output plugin without waiting until
> logical_decoding_work_mem is exceeded.
>
> This helps to test streaming mode. For example, to test "Avoid streaming the
> transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
> messages. With the new option, it can be tested with fewer changes and in less
> time. Also, this new option helps to test more scenarios for "Perform streaming
> logical transactions by background workers" [2].
>

Yeah, I think this can also help in reducing the time for various
tests in test_decoding/stream and
src/test/subscription/t/*_stream_*.pl by reducing the number of
changes required to invoke streaming mode. Can we think of making this
GUC extensible to test even more options on the server side (publisher)
and client side (subscriber)? For example, we can have something
like logical_replication_mode with the following valid values: (a)
server_serialize: this will serialize each change to a file on the
publisher and then, on commit, restore and send all changes; (b)
server_stream: this will stream each change as currently proposed in
your patch. Then if we want to extend it for subscriber-side testing,
we can introduce new options like client_serialize for the case
being discussed in the email [1].

Thoughts?

[1] - https://www.postgresql.org/message-id/CAD21AoAVUfDrm4-%3DykihNAmR7bTX-KpHXM9jc42RbHePJv5k1w%40mail.gmail.com

-- 
With Regards,
Amit Kapila.



Re: Force streaming every change in logical decoding

From
Masahiko Sawada
Date:
On Tue, Dec 6, 2022 at 7:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
> <shiy.fnst@fujitsu.com> wrote:
> >
> > Hi hackers,
> >
> > In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
> > sent to output plugin in streaming mode. But there is a restriction that the
> > minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
> > allow sending every change to output plugin without waiting until
> > logical_decoding_work_mem is exceeded.
> >
> > This helps to test streaming mode. For example, to test "Avoid streaming the
> > transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
> > messages. With the new option, it can be tested with fewer changes and in less
> > time. Also, this new option helps to test more scenarios for "Perform streaming
> > logical transactions by background workers" [2].
> >
>
> Yeah, I think this can also help in reducing the time for various
> tests in test_decoding/stream and
> src/test/subscription/t/*_stream_*.pl file by reducing the number of
> changes required to invoke streaming mode.

+1

> Can we think of making this
> GUC extendible to even test more options on server-side (publisher)
> and client-side (subscriber) cases? For example, we can have something
> like logical_replication_mode with the following valid values: (a)
> server_serialize: this will serialize each change to file on
> publishers and then on commit restore and send all changes; (b)
> server_stream: this will stream each change as currently proposed in
> your patch. Then if we want to extend it for subscriber-side testing
> then we can introduce new options like client_serialize for the case
> being discussed in the email [1].

Does setting logical_replication_mode = 'client_serialize' imply that the
publisher behaves as server_stream? Or do you mean we can set something like
logical_replication_mode = 'server_stream, client_serialize'?

Regards,

-- 
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com



Re: Force streaming every change in logical decoding

From
Peter Smith
Date:
On Tue, Dec 6, 2022 at 9:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
> <shiy.fnst@fujitsu.com> wrote:
> >
> > Hi hackers,
> >
> > In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
> > sent to output plugin in streaming mode. But there is a restriction that the
> > minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
> > allow sending every change to output plugin without waiting until
> > logical_decoding_work_mem is exceeded.
> >
> > This helps to test streaming mode. For example, to test "Avoid streaming the
> > transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
> > messages. With the new option, it can be tested with fewer changes and in less
> > time. Also, this new option helps to test more scenarios for "Perform streaming
> > logical transactions by background workers" [2].
> >

+1

>
> Yeah, I think this can also help in reducing the time for various
> tests in test_decoding/stream and
> src/test/subscription/t/*_stream_*.pl file by reducing the number of
> changes required to invoke streaming mode. Can we think of making this
> GUC extendible to even test more options on server-side (publisher)
> and client-side (subscriber) cases? For example, we can have something
> like logical_replication_mode with the following valid values: (a)
> server_serialize: this will serialize each change to file on
> publishers and then on commit restore and send all changes; (b)
> server_stream: this will stream each change as currently proposed in
> your patch. Then if we want to extend it for subscriber-side testing
> then we can introduce new options like client_serialize for the case
> being discussed in the email [1].
>
> Thoughts?

There is potential for lots of developer GUCs for testing/debugging in
the area of logical replication but IMO it might be better to keep
them all separated. Putting everything into a single
'logical_replication_mode' might cause difficulties later when/if you
want combinations of the different modes.

For example, instead of

logical_replication_mode = XXX/YYY/ZZZ

maybe something like below will give more flexibility.

logical_replication_dev_XXX = true/false
logical_replication_dev_YYY = true/false
logical_replication_dev_ZZZ = true/false

------
Kind Regards,
Peter Smith.
Fujitsu Australia



Re: Force streaming every change in logical decoding

From
Peter Smith
Date:
On Tue, Dec 6, 2022 at 5:23 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:
>
> Hi hackers,
>
> In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
> sent to output plugin in streaming mode. But there is a restriction that the
> minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
> allow sending every change to output plugin without waiting until
> logical_decoding_work_mem is exceeded.
>

Some review comments for patch v1-0001.

1. Typos

In several places "Wheather/wheather" -> "Whether/whether"

======

src/backend/replication/logical/reorderbuffer.c

2. ReorderBufferCheckMemoryLimit

 {
  ReorderBufferTXN *txn;

- /* bail out if we haven't exceeded the memory limit */
- if (rb->size < logical_decoding_work_mem * 1024L)
+ /*
+ * Stream the changes immediately if force_stream_mode is on and the output
+ * plugin supports streaming. Otherwise wait until size exceeds
+ * logical_decoding_work_mem.
+ */
+ bool force_stream = (force_stream_mode && ReorderBufferCanStream(rb));
+
+ /* bail out if force_stream is false and we haven't exceeded the memory limit */
+ if (!force_stream && rb->size < logical_decoding_work_mem * 1024L)
  return;

  /*
- * Loop until we reach under the memory limit.  One might think that just
- * by evicting the largest (sub)transaction we will come under the memory
- * limit based on assumption that the selected transaction is at least as
- * large as the most recent change (which caused us to go over the memory
- * limit). However, that is not true because a user can reduce the
+ * If force_stream is true, loop until there's no change. Otherwise, loop
+ * until we reach under the memory limit. One might think that just by
+ * evicting the largest (sub)transaction we will come under the memory limit
+ * based on assumption that the selected transaction is at least as large as
+ * the most recent change (which caused us to go over the memory limit).
+ * However, that is not true because a user can reduce the
  * logical_decoding_work_mem to a smaller value before the most recent
  * change.
  */
- while (rb->size >= logical_decoding_work_mem * 1024L)
+ while ((!force_stream && rb->size >= logical_decoding_work_mem * 1024L) ||
+    (force_stream && rb->size > 0))
  {
  /*
  * Pick the largest transaction (or subtransaction) and evict it from

IIUC this logic can be simplified quite a lot just by shifting that
"bail out" condition into the loop.

Something like:

while (true)
{
    /* keep evicting while force-streaming with pending changes, or while
     * the memory limit is exceeded; otherwise we are done */
    if (!((force_stream && rb->size > 0) ||
          rb->size >= logical_decoding_work_mem * 1024L))
        break;
    ...
}

------

Kind Regards,
Peter Smith.
Fujitsu Australia



Re: Force streaming every change in logical decoding

From
Masahiko Sawada
Date:
On Wed, Dec 7, 2022 at 8:46 AM Peter Smith <smithpb2250@gmail.com> wrote:
>
> On Tue, Dec 6, 2022 at 9:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
> > <shiy.fnst@fujitsu.com> wrote:
> > >
> > > Hi hackers,
> > >
> > > In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
> > > sent to output plugin in streaming mode. But there is a restriction that the
> > > minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
> > > allow sending every change to output plugin without waiting until
> > > logical_decoding_work_mem is exceeded.
> > >
> > > This helps to test streaming mode. For example, to test "Avoid streaming the
> > > transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
> > > messages. With the new option, it can be tested with fewer changes and in less
> > > time. Also, this new option helps to test more scenarios for "Perform streaming
> > > logical transactions by background workers" [2].
> > >
>
> +1
>
> >
> > Yeah, I think this can also help in reducing the time for various
> > tests in test_decoding/stream and
> > src/test/subscription/t/*_stream_*.pl file by reducing the number of
> > changes required to invoke streaming mode. Can we think of making this
> > GUC extendible to even test more options on server-side (publisher)
> > and client-side (subscriber) cases? For example, we can have something
> > like logical_replication_mode with the following valid values: (a)
> > server_serialize: this will serialize each change to file on
> > publishers and then on commit restore and send all changes; (b)
> > server_stream: this will stream each change as currently proposed in
> > your patch. Then if we want to extend it for subscriber-side testing
> > then we can introduce new options like client_serialize for the case
> > being discussed in the email [1].
> >
> > Thoughts?
>
> There is potential for lots of developer GUCs for testing/debugging in
> the area of logical replication but IMO it might be better to keep
> them all separated. Putting everything into a single
> 'logical_replication_mode' might cause difficulties later when/if you
> want combinations of the different modes.

I think we want the developer option that forces streaming changes
during logical decoding to be PGC_USERSET, but the developer option
for testing the parallel apply feature would probably be PGC_SIGHUP.
Also, since streaming changes is not specific to logical replication
but to logical decoding, I'm not sure logical_replication_XXX is a
good name. IMO, having force_stream_mode plus a different GUC for
testing the parallel apply feature makes sense.
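
For reference, a PGC_USERSET boolean developer option would be registered
in the backend's GUC tables roughly as below (a sketch only; the name and
description are placeholders, not final):

{
    {"force_stream_mode", PGC_USERSET, DEVELOPER_OPTIONS,
        gettext_noop("Forces streaming of each change in logical decoding."),
        NULL,
        GUC_NOT_IN_SAMPLE
    },
    &force_stream_mode,
    false,
    NULL, NULL, NULL
},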

Regards,

-- 
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Tue, Dec 6, 2022 at 7:18 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> On Tue, Dec 6, 2022 at 7:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
> > <shiy.fnst@fujitsu.com> wrote:
> > >
> > > Hi hackers,
> > >
> > > In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
> > > sent to output plugin in streaming mode. But there is a restriction that the
> > > minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
> > > allow sending every change to output plugin without waiting until
> > > logical_decoding_work_mem is exceeded.
> > >
> > > This helps to test streaming mode. For example, to test "Avoid streaming the
> > > transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
> > > messages. With the new option, it can be tested with fewer changes and in less
> > > time. Also, this new option helps to test more scenarios for "Perform streaming
> > > logical transactions by background workers" [2].
> > >
> >
> > Yeah, I think this can also help in reducing the time for various
> > tests in test_decoding/stream and
> > src/test/subscription/t/*_stream_*.pl file by reducing the number of
> > changes required to invoke streaming mode.
>
> +1
>
> > Can we think of making this
> > GUC extendible to even test more options on server-side (publisher)
> > and client-side (subscriber) cases? For example, we can have something
> > like logical_replication_mode with the following valid values: (a)
> > server_serialize: this will serialize each change to file on
> > publishers and then on commit restore and send all changes; (b)
> > server_stream: this will stream each change as currently proposed in
> > your patch. Then if we want to extend it for subscriber-side testing
> > then we can introduce new options like client_serialize for the case
> > being discussed in the email [1].
>
> Setting logical_replication_mode = 'client_serialize' implies that the
> publisher behaves as server_stream? or do you mean we can set like
> logical_replication_mode = 'server_stream, client_serialize'?
>

The latter one (logical_replication_mode = 'server_stream,
client_serialize'). The idea is to cover more options with one GUC and
each option can be used individually as well as in combination with
others.

-- 
With Regards,
Amit Kapila.



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Wed, Dec 7, 2022 at 7:31 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> On Wed, Dec 7, 2022 at 8:46 AM Peter Smith <smithpb2250@gmail.com> wrote:
> >
> > >
> > > Yeah, I think this can also help in reducing the time for various
> > > tests in test_decoding/stream and
> > > src/test/subscription/t/*_stream_*.pl file by reducing the number of
> > > changes required to invoke streaming mode. Can we think of making this
> > > GUC extendible to even test more options on server-side (publisher)
> > > and client-side (subscriber) cases? For example, we can have something
> > > like logical_replication_mode with the following valid values: (a)
> > > server_serialize: this will serialize each change to file on
> > > publishers and then on commit restore and send all changes; (b)
> > > server_stream: this will stream each change as currently proposed in
> > > your patch. Then if we want to extend it for subscriber-side testing
> > > then we can introduce new options like client_serialize for the case
> > > being discussed in the email [1].
> > >
> > > Thoughts?
> >
> > There is potential for lots of developer GUCs for testing/debugging in
> > the area of logical replication but IMO it might be better to keep
> > them all separated. Putting everything into a single
> > 'logical_replication_mode' might cause difficulties later when/if you
> > want combinations of the different modes.
>
> I think we want the developer option that forces streaming changes
> during logical decoding to be PGC_USERSET but probably the developer
> option for testing the parallel apply feature would be PGC_SIGHUP.
>

Ideally, that is true but if we want to combine the multiple modes in
one parameter, is there a harm in keeping it as PGC_SIGHUP?

> Also, since streaming changes is not specific to logical replication
> but to logical decoding, I'm not sure logical_replication_XXX is a
> good name. IMO having force_stream_mode and a different GUC for
> testing the parallel apply feature makes sense to me.
>

But if we want to have a separate variable for testing/debugging
streaming like force_stream_mode, why not for serializing as well? And
if we want one for both, then we can even think of combining them in one
variable, say logical_decoding_mode, with values 'stream' and
'serialize'. The first one specified would be given preference. Also,
the name force_stream_mode doesn't seem to convey that it is for
logical decoding. We can probably have a separate variable for the
subscriber side.

On one hand, having separate GUCs for publisher and subscriber seems to
give better flexibility, but having more GUCs also sometimes makes them
less usable. Here, my thought was to have a single GUC, or as few as
possible, which can be extended by providing multiple values instead
of having different GUCs. I was trying to map this to the existing
string parameters in the developer options.

-- 
With Regards,
Amit Kapila.



Re: Force streaming every change in logical decoding

From
Masahiko Sawada
Date:
On Wed, Dec 7, 2022 at 12:55 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Wed, Dec 7, 2022 at 7:31 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> >
> > On Wed, Dec 7, 2022 at 8:46 AM Peter Smith <smithpb2250@gmail.com> wrote:
> > >
> > > >
> > > > Yeah, I think this can also help in reducing the time for various
> > > > tests in test_decoding/stream and
> > > > src/test/subscription/t/*_stream_*.pl file by reducing the number of
> > > > changes required to invoke streaming mode. Can we think of making this
> > > > GUC extendible to even test more options on server-side (publisher)
> > > > and client-side (subscriber) cases? For example, we can have something
> > > > like logical_replication_mode with the following valid values: (a)
> > > > server_serialize: this will serialize each change to file on
> > > > publishers and then on commit restore and send all changes; (b)
> > > > server_stream: this will stream each change as currently proposed in
> > > > your patch. Then if we want to extend it for subscriber-side testing
> > > > then we can introduce new options like client_serialize for the case
> > > > being discussed in the email [1].
> > > >
> > > > Thoughts?
> > >
> > > There is potential for lots of developer GUCs for testing/debugging in
> > > the area of logical replication but IMO it might be better to keep
> > > them all separated. Putting everything into a single
> > > 'logical_replication_mode' might cause difficulties later when/if you
> > > want combinations of the different modes.
> >
> > I think we want the developer option that forces streaming changes
> > during logical decoding to be PGC_USERSET but probably the developer
> > option for testing the parallel apply feature would be PGC_SIGHUP.
> >
>
> Ideally, that is true but if we want to combine the multiple modes in
> one parameter, is there a harm in keeping it as PGC_SIGHUP?

It's not a big harm but we will end up doing ALTER SYSTEM and
pg_reload_conf() even in regression tests (e.g. in
test_decoding/stream.sql).

>
> > Also, since streaming changes is not specific to logical replication
> > but to logical decoding, I'm not sure logical_replication_XXX is a
> > good name. IMO having force_stream_mode and a different GUC for
> > testing the parallel apply feature makes sense to me.
> >
>
> But if we want to have a separate variable for testing/debugging
> streaming like force_stream_mode, why not for serializing as well? And
> if we want for both then we can even think of combining them in one
> variable as logical_decoding_mode with values as 'stream' and
> 'serialize'.

Making it an enum makes sense to me.

> The first one specified would be given preference. Also,
> the name force_stream_mode doesn't seem to convey that it is for
> logical decoding.

Agreed.

> On one side having separate GUCs for publisher and subscriber seems to
> give better flexibility but having more GUCs also sometimes makes them
> less usable. Here, my thought was to have a single or as few GUCs as
> possible which can be extendible by providing multiple values instead
> of having different GUCs. I was trying to map this with the existing
> string parameters in developer options.

I see your point. On the other hand, I'm not sure it's a good idea to
control different features by one GUC in general. The developer option
could be an exception?

Regards,

-- 
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Wed, Dec 7, 2022 at 10:55 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> On Wed, Dec 7, 2022 at 12:55 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
>
> > On one side having separate GUCs for publisher and subscriber seems to
> > give better flexibility but having more GUCs also sometimes makes them
> > less usable. Here, my thought was to have a single or as few GUCs as
> > possible which can be extendible by providing multiple values instead
> > of having different GUCs. I was trying to map this with the existing
> > string parameters in developer options.
>
> I see your point. On the other hand, I'm not sure it's a good idea to
> control different features by one GUC in general. The developer option
> could be an exception?
>

I am not sure what would be best if this were proposed as a
non-developer option, but it seems to me that having a single parameter
for publisher/subscriber can, in this case, serve our need for
testing/debugging. BTW, even though it is not a very good example,
we use max_replication_slots for different purposes on the publisher
(the limit for slots) and the subscriber (the limit for origins).

-- 
With Regards,
Amit Kapila.



Re: Force streaming every change in logical decoding

From
Dilip Kumar
Date:
On Wed, Dec 7, 2022 at 5:16 AM Peter Smith <smithpb2250@gmail.com> wrote:

+1 for the idea

>
> There is potential for lots of developer GUCs for testing/debugging in
> the area of logical replication but IMO it might be better to keep
> them all separated. Putting everything into a single
> 'logical_replication_mode' might cause difficulties later when/if you
> want combinations of the different modes.
>
> For example, instead of
>
> logical_replication_mode = XXX/YYY/ZZZ
>
> maybe something like below will give more flexibility.
>
> logical_replication_dev_XXX = true/false
> logical_replication_dev_YYY = true/false
> logical_replication_dev_ZZZ = true/false
>

I also agree that, usability-wise, keeping them independent is better.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: Force streaming every change in logical decoding

From
Dilip Kumar
Date:
On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:
>
> Hi hackers,
>
> In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
> sent to output plugin in streaming mode. But there is a restriction that the
> minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
> allow sending every change to output plugin without waiting until
> logical_decoding_work_mem is exceeded.
>
> This helps to test streaming mode. For example, to test "Avoid streaming the
> transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
> messages. With the new option, it can be tested with fewer changes and in less
> time. Also, this new option helps to test more scenarios for "Perform streaming
> logical transactions by background workers" [2].

Some comments on the patch

1. Can you add a test case using this option?

2. +     <varlistentry id="guc-force-stream-mode" xreflabel="force_stream_mode">
+      <term><varname>force_stream_mode</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>force_stream_mode</varname> configuration parameter</primary>
+      </indexterm>
+      </term>

This GUC name "force_stream_mode" somehow appears like we are forcing
this streaming mode irrespective of whether the subscriber has requested
this mode or not. But actually it is not that; it just streams each
change if it is enabled. So we might need to think about the name (at
least we should avoid using *mode* in the name, IMHO).

3.
-    while (rb->size >= logical_decoding_work_mem * 1024L)
+    while ((!force_stream && rb->size >= logical_decoding_work_mem * 1024L) ||
+           (force_stream && rb->size > 0))
     {

It seems like if force_stream is on, then indirectly it is enabling
forced serialization as well, because once we enter the loop based on
"force_stream" it will either stream or serialize. But I guess we do
not want to force serialization based on this parameter.
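
For context, on HEAD the body of that loop roughly does the following (a
simplified paraphrase; asserts, statistics, and some details omitted), so
entering the loop with force_stream set can fall into the serialize branch
whenever streaming cannot start:

/* evict the largest transaction, either by streaming it or by
 * spilling it to disk */
if (ReorderBufferCanStartStreaming(rb) &&
    (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
    ReorderBufferStreamTXN(rb, txn);
else
{
    txn = ReorderBufferLargestTXN(rb);
    ReorderBufferSerializeTXN(rb, txn);
}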

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: Force streaming every change in logical decoding

From
Peter Smith
Date:
On Sat, Dec 10, 2022 at 5:03 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
> <shiy.fnst@fujitsu.com> wrote:
> >
> > Hi hackers,
> >
> > In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
> > sent to output plugin in streaming mode. But there is a restriction that the
> > minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
> > allow sending every change to output plugin without waiting until
> > logical_decoding_work_mem is exceeded.
> >
> > This helps to test streaming mode. For example, to test "Avoid streaming the
> > transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
> > messages. With the new option, it can be tested with fewer changes and in less
> > time. Also, this new option helps to test more scenarios for "Perform streaming
> > logical transactions by background workers" [2].
>
> Some comments on the patch
>
...
> This GUC name "force_stream_mode" somehow appears like we are forcing
> this streaming mode irrespective of whether the
> subscriber has requested for this mode or not.  But actually it is not
> that, it is just streaming each change if
> it is enabled.  So we might need to think on the name (at least we
> should avoid using *mode* in the name IMHO).
>

I thought the same. Names like those shown below might be more appropriate:
stream_checks_work_mem = true/false
stream_mode_checks_size = true/false
stream_for_large_tx_only = true/false
... etc.

The GUC name length could get a bit unwieldy but isn't it better for
it to have the correct meaning than to have a shorter but slightly
ambiguous name? Anyway, it is a developer option so I guess longer
names are less of a problem.

------
Kind Regards,
Peter Smith.
Fujitsu Australia



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Sat, Dec 10, 2022 at 11:18 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Wed, Dec 7, 2022 at 5:16 AM Peter Smith <smithpb2250@gmail.com> wrote:
>
> +1 for the idea
>
> >
> > There is potential for lots of developer GUCs for testing/debugging in
> > the area of logical replication but IMO it might be better to keep
> > them all separated. Putting everything into a single
> > 'logical_replication_mode' might cause difficulties later when/if you
> > want combinations of the different modes.
> >
> > For example, instead of
> >
> > logical_replication_mode = XXX/YYY/ZZZ
> >
> > maybe something like below will give more flexibility.
> >
> > logical_replication_dev_XXX = true/false
> > logical_replication_dev_YYY = true/false
> > logical_replication_dev_ZZZ = true/false
> >
>
> Even I agree that usability wise keeping them independent is better.
>

But OTOH, doesn't introducing multiple GUCs (one to allow streaming
each change, another to allow serialization, and probably a third one to
test subscriber-side work) just for the purpose of testing and
debugging logical replication code sound like a bit too much?

-- 
With Regards,
Amit Kapila.



Re: Force streaming every change in logical decoding

From
Peter Smith
Date:
On Tue, Dec 6, 2022 at 5:23 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:
>
> Hi hackers,
>
> In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
> sent to output plugin in streaming mode. But there is a restriction that the
> minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
> allow sending every change to output plugin without waiting until
> logical_decoding_work_mem is exceeded.
>
> This helps to test streaming mode. For example, to test "Avoid streaming the
> transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
> messages. With the new option, it can be tested with fewer changes and in less
> time. Also, this new option helps to test more scenarios for "Perform streaming
> logical transactions by background workers" [2].
>
> [1] https://www.postgresql.org/message-id/CAFiTN-tHK=7LzfrPs8fbT2ksrOJGQbzywcgXst2bM9-rJJAAUg@mail.gmail.com
> [2] https://www.postgresql.org/message-id/flat/CAA4eK1%2BwyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw%40mail.gmail.com
>

Hi, I've been doing some testing that makes use of this new developer
GUC `force_stream_mode`.

IIUC this GUC is used by the walsender in the logic of
ReorderBufferCheckMemoryLimit(). Also, AFAIK the only way that the
walsender is going to know this GUC value is by inheriting it from the
parent publisher at the time the walsender process gets launched.

I may be overthinking this. Isn't there potential for this to become
quite confusing depending on the timing of when this GUC is modified?

E.g.1 When the walsender is launched, it will use whatever is the
current value of this GUC.
E.g.2 But if the GUC is changed after the walsender is already
launched, then that will have no effect on the already running
walsender.

Is that understanding correct?

------
Kind Regards,
Peter Smith.
Fujitsu Australia.



Re: Force streaming every change in logical decoding

From
Peter Smith
Date:
On Tue, Dec 13, 2022 at 2:33 PM Peter Smith <smithpb2250@gmail.com> wrote:
>
> On Tue, Dec 6, 2022 at 5:23 PM shiy.fnst@fujitsu.com
> <shiy.fnst@fujitsu.com> wrote:
> >
> > Hi hackers,
> >
> > In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
> > sent to output plugin in streaming mode. But there is a restriction that the
> > minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
> > allow sending every change to output plugin without waiting until
> > logical_decoding_work_mem is exceeded.
> >
> > This helps to test streaming mode. For example, to test "Avoid streaming the
> > transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
> > messages. With the new option, it can be tested with fewer changes and in less
> > time. Also, this new option helps to test more scenarios for "Perform streaming
> > logical transactions by background workers" [2].
> >
> > [1] https://www.postgresql.org/message-id/CAFiTN-tHK=7LzfrPs8fbT2ksrOJGQbzywcgXst2bM9-rJJAAUg@mail.gmail.com
> > [2] https://www.postgresql.org/message-id/flat/CAA4eK1%2BwyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw%40mail.gmail.com
> >
>
> Hi, I've been doing some testing that makes use of this new developer
> GUC `force_stream_mode`.
>
> IIUC this GUC is used by the walsender during the logic of the
> ReorderBufferCheckMemoryLimit(). Also, AFAIK the only way that the
> walsender is going to know this GUC value is by inheritance from the
> parent publisher at the time the walsender process gets launched.
>
> I may be overthinking this. Isn't there potential for this to become
> quite confusing depending on the timing of when this GUC is modified?
>
> E.g.1 When the walsender is launched, it will use whatever is the
> current value of this GUC.
> E.g.2 But if the GUC is changed after the walsender is already
> launched, then that will have no effect on the already running
> walsender.
>
> Is that understanding correct?
>

I think I was mistaken above. It looks like even the already-launched
walsender gets the updated GUC value via a SIGHUP on the parent
publisher.

2022-12-13 16:31:33.453 AEDT [1902] LOG:  received SIGHUP, reloading
configuration files
2022-12-13 16:31:33.455 AEDT [1902] LOG:  parameter
"force_stream_mode" changed to "true"

Sorry for the noise.

------
Kind Regards,
Peter Smith.
Fujitsu Australia



RE: Force streaming every change in logical decoding

From
"shiy.fnst@fujitsu.com"
Date:
On Sat, Dec 10, 2022 2:03 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> 
> On Tue, Dec 6, 2022 at 11:53 AM shiy.fnst@fujitsu.com
> <shiy.fnst@fujitsu.com> wrote:
> >
> > Hi hackers,
> >
> > In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
> > sent to output plugin in streaming mode. But there is a restriction that the
> > minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
> > allow sending every change to output plugin without waiting until
> > logical_decoding_work_mem is exceeded.
> >
> > This helps to test streaming mode. For example, to test "Avoid streaming the
> > transaction which are skipped" [1], it needs many XLOG_XACT_INVALIDATIONS
> > messages. With the new option, it can be tested with fewer changes and in less
> > time. Also, this new option helps to test more scenarios for "Perform streaming
> > logical transactions by background workers" [2].
> 
> Some comments on the patch
> 

Thanks for your comments.

> 1. Can you add one test case using this option
> 

I added a simple test to confirm the new option works.

> 2. +     <varlistentry id="guc-force-stream-mode" xreflabel="force_stream_mode">
> +      <term><varname>force_stream_mode</varname> (<type>boolean</type>)
> +      <indexterm>
> +       <primary><varname>force_stream_mode</varname> configuration parameter</primary>
> +      </indexterm>
> +      </term>
> 
> This GUC name "force_stream_mode" somehow appears like we are forcing
> this streaming mode irrespective of whether the
> subscriber has requested for this mode or not.  But actually it is not
> that, it is just streaming each change if
> it is enabled.  So we might need to think on the name (at least we
> should avoid using *mode* in the name IMHO).
> 

I think a similar GUC is force_parallel_mode, and if the query is parallel
unsafe or max_worker_processes is exceeded, force_parallel_mode will not work.
This is similar to what we do in this patch. So, maybe it's OK to use "mode". I
didn't change it in the new version of the patch. What do you think?

> 3.
> -    while (rb->size >= logical_decoding_work_mem * 1024L)
> +    while ((!force_stream && rb->size >= logical_decoding_work_mem *
> 1024L) ||
> +           (force_stream && rb->size > 0))
>      {
> 
> It seems like if force_stream is on then indirectly it is enabling
> force serialization as well.  Because once we enter into the loop
> based on "force_stream" then it will either stream or serialize but I
> guess we do not want to force serialize based on this parameter.
> 

Agreed, I refactored the code and modified this point.

Please see the attached patch. I also fixed Peter's comments [1]. The GUC name and
design are still under discussion, so I didn't modify them.

By the way, I noticed that the comment for ReorderBufferCheckMemoryLimit() on
HEAD was missing something. I fixed that in this patch, too.

[1] https://www.postgresql.org/message-id/CAHut%2BPtOjZ_e-KLf26i1XLH2ffPEZGOmGSKy0wDjwyB_uvzxBQ%40mail.gmail.com

Regards,
Shi yu

Attachment

Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Wed, Dec 14, 2022 at 2:15 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:
>
> Please see the attached patch. I also fix Peter's comments[1]. The GUC name and
> design are still under discussion, so I didn't modify them.
>

Let me summarize the discussion on the name and design till now. As per my
understanding, we have three requirements: (a) On the publisher, stream
each change of a transaction instead of waiting until
logical_decoding_work_mem is exceeded or the transaction commits; this will
help us reduce the test timings of current and future tests for replication
of in-progress transactions; (b) On the publisher, serialize each change
instead of waiting until logical_decoding_work_mem is exceeded; this can
help reduce the test time of tests related to serialization of changes in
logical decoding; (c) On the subscriber, during parallel apply of
in-progress transactions (a new feature being discussed at [1]), allow
the system to switch to serialize mode (when there is no more space in the
shared memory queue between the leader and a parallel apply worker, either
because the parallel worker is busy or is waiting on some lock) while
sending changes.

Having a GUC that controls these actions/features will allow us to
write tests with shorter duration and better predictability, as otherwise
they require a lot of changes. Apart from tests, these also help in
easily debugging the relevant code. So they fit the Developer Options
category of GUCs [2].

We have discussed three different ways to provide a GUC for these
features. (1) Have a separate GUC for each of these, like
force_server_stream_mode, force_server_serialize_mode, and
force_client_serialize_mode (we can use different names for these);
(2) Have two sets of GUCs for server and client. We can have
logical_decoding_mode with values 'stream' and 'serialize' for the
server, and then logical_apply_serialize = true/false for the client.
(3) Have one GUC like logical_replication_mode with values
'server_stream', 'server_serialize', and 'client_serialize'.

The names used here are tentative mainly to explain each of the
options, we can use different names once we decide among the above.
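
For illustration, the server-side GUC in option (2) could be an enum declared
roughly as below (a sketch; the identifiers and the default value are
placeholders, not final names):

/* sketch of the enum values for a server-side logical_decoding_mode GUC */
static const struct config_enum_entry logical_decoding_mode_options[] = {
    {"buffered", LOGICAL_DECODING_MODE_BUFFERED, false},   /* default */
    {"stream", LOGICAL_DECODING_MODE_STREAM, false},
    {"serialize", LOGICAL_DECODING_MODE_SERIALIZE, false},
    {NULL, 0, false}
};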

Thoughts?

[1] -
https://www.postgresql.org/message-id/flat/CAA4eK1%2BwyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw%40mail.gmail.com
[2] - https://www.postgresql.org/docs/devel/runtime-config-developer.html

-- 
With Regards,
Amit Kapila.



Re: Force streaming every change in logical decoding

From
Dilip Kumar
Date:
On Wed, Dec 14, 2022 at 2:15 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:

> > -    while (rb->size >= logical_decoding_work_mem * 1024L)
> > +    while ((!force_stream && rb->size >= logical_decoding_work_mem *
> > 1024L) ||
> > +           (force_stream && rb->size > 0))
> >      {
> >
> > It seems like if force_stream is on then indirectly it is enabling
> > force serialization as well.  Because once we enter into the loop
> > based on "force_stream" then it will either stream or serialize but I
> > guess we do not want to force serialize based on this parameter.
> >
>
> Agreed, I refactored the code and modified this point.

After thinking more on this, I feel the previous behavior made more
sense. Because without this patch, if we cross the work_mem limit we try
to stream, and if we cannot stream for some reason (e.g. a partial
change) then we serialize. And I feel your previous patch was mimicking
the same behavior for each change. Now in the new patch, we will try to
stream and if we cannot we will queue the change, so I feel we are
creating a new code path that doesn't actually exist without the force
mode.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: Force streaming every change in logical decoding

From
Dilip Kumar
Date:
On Wed, Dec 14, 2022 at 5:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Wed, Dec 14, 2022 at 2:15 PM shiy.fnst@fujitsu.com
> <shiy.fnst@fujitsu.com> wrote:
> >
> > Please see the attached patch. I also fix Peter's comments[1]. The GUC name and
> > design are still under discussion, so I didn't modify them.
> >
>
> Let me summarize the discussion on name and design till now. As per my
> understanding, we have three requirements: (a) In publisher, stream
> each change of transaction instead of waiting till
> logical_decoding_work_mem or commit; this will help us to reduce the
> test timings of current and future tests for replication of
> in-progress transactions; (b) In publisher, serialize each change
> instead of waiting till logical_decoding_work_mem; this can help
> reduce the test time of tests related to serialization of changes in
> logical decoding; (c) In subscriber, during parallel apply for
> in-progress transactions (a new feature being discussed at [1]) allow
> the system to switch to serialize mode (no more space in shared memory
> queue between leader and parallel apply worker either due to a
> parallel worker being busy or waiting on some lock) while sending
> changes.
>
> Having a GUC that controls these actions/features will allow us to
> write tests with shorter duration and better predictability as
> otherwise, they require a lot of changes. Apart from tests, these also
> help to easily debug the required code. So they fit the Developer
> Options category of GUC [2].
>
> We have discussed three different ways to provide GUC for these
> features. (1) Have separate GUCs like force_server_stream_mode,
> force_server_serialize_mode, force_client_serialize_mode (we can use
> different names for these) for each of these; (2) Have two sets of
> GUCs for server and client. We can have logical_decoding_mode with
> values as 'stream' and 'serialize' for the server and then
> logical_apply_serialize = true/false for the client. (3) Have one GUC
> like logical_replication_mode with values as 'server_stream',
> 'server_serialize', 'client_serialize'.
>
> The names used here are tentative mainly to explain each of the
> options, we can use different names once we decide among the above.
>
> Thoughts?

I think option 2 makes sense because stream/serialize are two related
options and both depend on the same parameter
(logical_decoding_work_mem), so having a single knob is logical. And
another GUC for serializing logical apply.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Tue, Dec 20, 2022 at 10:52 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Wed, Dec 14, 2022 at 5:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Wed, Dec 14, 2022 at 2:15 PM shiy.fnst@fujitsu.com
> > <shiy.fnst@fujitsu.com> wrote:
> > >
> > > Please see the attached patch. I also fix Peter's comments[1]. The GUC name and
> > > design are still under discussion, so I didn't modify them.
> > >
> >
> > Let me summarize the discussion on name and design till now. As per my
> > understanding, we have three requirements: (a) In publisher, stream
> > each change of transaction instead of waiting till
> > logical_decoding_work_mem or commit; this will help us to reduce the
> > test timings of current and future tests for replication of
> > in-progress transactions; (b) In publisher, serialize each change
> > instead of waiting till logical_decoding_work_mem; this can help
> > reduce the test time of tests related to serialization of changes in
> > logical decoding; (c) In subscriber, during parallel apply for
> > in-progress transactions (a new feature being discussed at [1]) allow
> > the system to switch to serialize mode (no more space in shared memory
> > queue between leader and parallel apply worker either due to a
> > parallel worker being busy or waiting on some lock) while sending
> > changes.
> >
> > Having a GUC that controls these actions/features will allow us to
> > write tests with shorter duration and better predictability as
> > otherwise, they require a lot of changes. Apart from tests, these also
> > help to easily debug the required code. So they fit the Developer
> > Options category of GUC [2].
> >
> > We have discussed three different ways to provide GUC for these
> > features. (1) Have separate GUCs like force_server_stream_mode,
> > force_server_serialize_mode, force_client_serialize_mode (we can use
> > different names for these) for each of these; (2) Have two sets of
> > GUCs for server and client. We can have logical_decoding_mode with
> > values as 'stream' and 'serialize' for the server and then
> > logical_apply_serialize = true/false for the client. (3) Have one GUC
> > like logical_replication_mode with values as 'server_stream',
> > 'server_serialize', 'client_serialize'.
> >
> > The names used here are tentative mainly to explain each of the
> > options, we can use different names once we decide among the above.
> >
> > Thoughts?
>
> I think option 2 makes sense because stream/serialize are two related
> options and both are dependent on the same parameter
> (logical_decoding_work_mem) so having a single know is logical.  And
> another GUC for serializing logical apply.
>

Thanks for your input. Sawada-San, others, any preferences/suggestions?

-- 
With Regards,
Amit Kapila.



RE: Force streaming every change in logical decoding

From
"Hayato Kuroda (Fujitsu)"
Date:
Dear hackers,

> We have discussed three different ways to provide GUC for these
> features. (1) Have separate GUCs like force_server_stream_mode,
> force_server_serialize_mode, force_client_serialize_mode (we can use
> different names for these) for each of these; (2) Have two sets of
> GUCs for server and client. We can have logical_decoding_mode with
> values as 'stream' and 'serialize' for the server and then
> logical_apply_serialize = true/false for the client. (3) Have one GUC
> like logical_replication_mode with values as 'server_stream',
> 'server_serialize', 'client_serialize'.

I also agree with adding new GUC parameters (and I have already done so partially
in parallel apply [1]), and basically option 2 makes sense to me. But is it OK
that we can choose "serialize" mode even if the subscriber requires streaming?

Currently, reorder buffer transactions are serialized on the publisher only when
there is no streamable transaction. So what happens if
logical_decoding_mode = "serialize" but the subscription's streaming option is on?
If we break the first rule and serialize changes on the publisher anyway, it may
not be suitable for testing normal operation.

Therefore, I came up with a variant of (2): logical_decoding_mode can be
"normal" or "immediate".

"normal" is the default value, which is the same as current HEAD: changes are
streamed or serialized when the buffered size exceeds logical_decoding_work_mem.

When users set it to "immediate", the walsender starts to stream or serialize all
changes. The choice depends on the subscription option.


In short: +1 for (2) family.

[1]:
https://www.postgresql.org/message-id/TYAPR01MB5866160DE81FA2D88B8F22DEF5159%40TYAPR01MB5866.jpnprd01.prod.outlook.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED


Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Tue, Dec 20, 2022 at 2:46 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
>
> Dear hackers,
>
> > We have discussed three different ways to provide GUC for these
> > features. (1) Have separate GUCs like force_server_stream_mode,
> > force_server_serialize_mode, force_client_serialize_mode (we can use
> > different names for these) for each of these; (2) Have two sets of
> > GUCs for server and client. We can have logical_decoding_mode with
> > values as 'stream' and 'serialize' for the server and then
> > logical_apply_serialize = true/false for the client. (3) Have one GUC
> > like logical_replication_mode with values as 'server_stream',
> > 'server_serialize', 'client_serialize'.
>
> I also agreed for adding new GUC parameters (and I have already done partially
> in parallel apply[1]), and basically options 2 made sense for me. But is it OK
> that we can choose "serialize" mode even if subscribers require streaming?
>
> Currently the reorder buffer transactions are serialized on publisher only when
> the there are no streamable transaction. So what happen if the
> logical_decoding_mode = "serialize" but streaming option streaming is on? If we
> break the first one and serialize changes on publisher anyway, it may be not
> suitable for testing the normal operation.
>

I think the change will be streamed as soon as the next change is
processed even if we serialize based on this option. See
ReorderBufferProcessPartialChange. However, I see your point that when
the streaming option is given, the value 'serialize' for this GUC may
not make much sense.

> Therefore, I came up with the variant of (2): logical_decoding_mode can be
> "normal" or "immediate".
>
> "normal" is a default value, which is same as current HEAD. Changes are streamed
> or serialized when the buffered size exceeds logical_decoding_work_mem.
>
> When users set to "immediate", the walsenders starts to stream or serialize all
> changes. The choice is depends on the subscription option.
>

The other possibility to achieve what you are saying is that we allow
a minimum value of 0 for logical_decoding_work_mem, which would mean
stream or serialize each change depending on whether the streaming
option is enabled. I think we normally don't allow a minimum value
below a certain threshold for other *_work_mem parameters (like
maintenance_work_mem, work_mem), so we have followed the same here.
And, I think it makes sense from the user's perspective because below
a certain threshold it will just add overhead by either writing small
changes to the disk or by sending them over the network. However, it
can be quite useful for testing/debugging. So I am not sure if we should
restrict setting logical_decoding_work_mem below a certain threshold.
What do you think?
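
To illustrate the first point, with a 0 minimum the existing check in
ReorderBufferCheckMemoryLimit() would already fire for every buffered change
(shown only for illustration):

/* existing check on HEAD */
if (rb->size < logical_decoding_work_mem * 1024L)   /* 0 * 1024L == 0 */
    return;             /* never taken once any change has been queued */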

-- 
With Regards,
Amit Kapila.



RE: Force streaming every change in logical decoding

From
"Hayato Kuroda (Fujitsu)"
Date:
Dear Amit,

> The other possibility to achieve what you are saying is that we allow
> a minimum value of logical_decoding_work_mem as 0 which would mean
> stream or serialize each change depending on whether the streaming
> option is enabled.

I understand that logical_decoding_work_mem could double as both a normal option
and a developer option. I think yours is smarter because we can reduce the number
of GUCs.

> I think we normally don't allow a minimum value
> below a certain threshold for other *_work_mem parameters (like
> maintenance_work_mem, work_mem), so we have followed the same here.
> And, I think it makes sense from the user's perspective because below
> a certain threshold it will just add overhead by either writing small
> changes to the disk or by sending those over the network. However, it
> can be quite useful for testing/debugging. So, not sure, if we should
> restrict setting logical_decoding_work_mem below a certain threshold.
> What do you think?

You mean to say that there is a possibility that users may set a small value without
much consideration, right? If so, how about using an approach like autovacuum_work_mem?

autovacuum_work_mem has a range of [-1, MAX_KILOBYTES], and -1 means that it follows
maintenance_work_mem. If it is set to a small value like 5kB, its working memory is
rounded up to 1024kB. See check_autovacuum_work_mem().

Based on that, I suggest the following. Could this solve the problem you mentioned?

* If logical_decoding_work_mem is set to 0, all transactions are streamed or serialized
  on the publisher.
* If logical_decoding_work_mem is set within [1kB, 63kB], the value is rounded up or an
  ERROR is raised.
* If logical_decoding_work_mem is set to 64kB or more, the set value
  is used.
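
A check hook in the style of check_autovacuum_work_mem() could implement
this, roughly as below (a hypothetical sketch; the 0-means-immediate
semantics and the rounding are the suggestion above, not existing behavior):

/* hypothetical check hook for logical_decoding_work_mem (values in kB):
 * 0 = stream/serialize every change; small values rounded up to 64kB */
static bool
check_logical_decoding_work_mem(int *newval, void **extra, GucSource source)
{
    if (*newval == 0)
        return true;        /* immediate mode */

    if (*newval < 64)
        *newval = 64;       /* round up to the current 64kB minimum */

    return true;
}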

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
> Amit Kapila.

Re: Force streaming every change in logical decoding

From
shveta malik
Date:

Going with 'logical_decoding_work_mem' seems a reasonable solution, but since we are mixing
the functionality of a developer GUC and a production GUC, there is a slight risk that
customers/DBAs may end up setting it to 0, forgetting about it, and thus hampering the
system's performance. I have seen many such cases in my previous organization.

Adding a new developer parameter seems slightly safer, considering we already have one
such category supported in Postgres. It can be on the same lines as 'force_parallel_mode'.
It will be a purely developer GUC; plus, if we want to extend something in future to add/automate
heavier test cases or any other streaming-related dev option, we can extend the same parameter
without disturbing the production one (see force_parallel_mode=regress for reference).

thanks
Shveta



Re: Force streaming every change in logical decoding

From
Masahiko Sawada
Date:
On Tue, Dec 20, 2022 at 7:49 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Tue, Dec 20, 2022 at 2:46 PM Hayato Kuroda (Fujitsu)
> <kuroda.hayato@fujitsu.com> wrote:
> >
> > Dear hackers,
> >
> > > We have discussed three different ways to provide GUC for these
> > > features. (1) Have separate GUCs like force_server_stream_mode,
> > > force_server_serialize_mode, force_client_serialize_mode (we can use
> > > different names for these) for each of these; (2) Have two sets of
> > > GUCs for server and client. We can have logical_decoding_mode with
> > > values as 'stream' and 'serialize' for the server and then
> > > logical_apply_serialize = true/false for the client. (3) Have one GUC
> > > like logical_replication_mode with values as 'server_stream',
> > > 'server_serialize', 'client_serialize'.
> >
> > I also agreed for adding new GUC parameters (and I have already done partially
> > in parallel apply[1]), and basically options 2 made sense for me. But is it OK
> > that we can choose "serialize" mode even if subscribers require streaming?
> >
> > Currently the reorder buffer transactions are serialized on publisher only when
> > the there are no streamable transaction. So what happen if the
> > logical_decoding_mode = "serialize" but streaming option streaming is on? If we
> > break the first one and serialize changes on publisher anyway, it may be not
> > suitable for testing the normal operation.
> >
>
> I think the change will be streamed as soon as the next change is
> processed even if we serialize based on this option. See
> ReorderBufferProcessPartialChange. However, I see your point that when
> the streaming option is given, the value 'serialize' for this GUC may
> not make much sense.
>
> > Therefore, I came up with the variant of (2): logical_decoding_mode can be
> > "normal" or "immediate".
> >
> > "normal" is a default value, which is same as current HEAD. Changes are streamed
> > or serialized when the buffered size exceeds logical_decoding_work_mem.
> >
> > When users set to "immediate", the walsenders starts to stream or serialize all
> > changes. The choice is depends on the subscription option.
> >
>
> The other possibility to achieve what you are saying is that we allow
> a minimum value of logical_decoding_work_mem as 0 which would mean
> stream or serialize each change depending on whether the streaming
> option is enabled. I think we normally don't allow a minimum value
> below a certain threshold for other *_work_mem parameters (like
> maintenance_work_mem, work_mem), so we have followed the same here.
> And, I think it makes sense from the user's perspective because below
> a certain threshold it will just add overhead by either writing small
> changes to the disk or by sending those over the network. However, it
> can be quite useful for testing/debugging. So, not sure, if we should
> restrict setting logical_decoding_work_mem below a certain threshold.
> What do you think?

I agree with (2), having separate GUCs for publisher side and
subscriber side. Also, on the publisher side, Amit's idea, controlling
the logical decoding behavior by changing logical_decoding_work_mem,
seems like a good idea.

But I'm not sure it's a good idea to lower the minimum value of
logical_decoding_work_mem to 0. I agree it's helpful for testing and
debugging, but setting logical_decoding_work_mem = 0 doesn't benefit
users at all and rather brings risks.

I prefer the idea Kuroda-san previously proposed; setting
logical_decoding_mode = 'immediate' means setting
logical_decoding_work_mem = 0. We might not need to have it as an enum
parameter since it has only two values, though.

Regards,

-- 
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com



Re: Force streaming every change in logical decoding

From
Peter Smith
Date:
Here are some review comments for patch v2.

Since the GUC is still under design maybe these comments can be
ignored for now, but I guess similar comments will apply in future
anyhow (just with some name changes).

======

1. Commit message

Add a new GUC force_stream_mode, when it is set on, send the change to
output plugin immediately in streaming mode. Otherwise, send until
logical_decoding_work_mem is exceeded.

~

Is that quite right? I thought it was more like shown below:

SUGGESTION
Add a new GUC 'force_stream_mode' which modifies behavior when
streaming mode is enabled. If force_stream_mode is on, the changes are
sent to the output plugin immediately. Otherwise (when
force_stream_mode is off), changes are written to memory until
logical_decoding_work_mem is exceeded.

======

2. doc/src/sgml/config.sgml

+       <para>
+        Specifies whether to force sending the changes to output plugin
+        immediately in streaming mode. If set to <literal>off</literal> (the
+        default), send until <varname>logical_decoding_work_mem</varname> is
+        exceeded.
+       </para>

Suggest slight rewording like #1.

SUGGESTION
This modifies the behavior when streaming mode is enabled. If set to
<literal>on</literal>, the changes are sent to the output plugin
immediately. If set to <literal>off</literal> (the default), changes are
written to memory until <varname>logical_decoding_work_mem</varname>
is exceeded.

======

3. More docs.

It might be helpful if this developer option is referenced also on the
page "31.10.1 Logical Replication > Configuration Settings >
Publishers" [1]

======

src/backend/replication/logical/reorderbuffer.c

4. GUCs

+/*
+ * Whether to send the change to output plugin immediately in streaming mode.
+ * When it is off, wait until logical_decoding_work_mem is exceeded.
+ */
+bool force_stream_mode;

4a.
"to output plugin" -> "to the output plugin"

~

4b.
By convention (see [2]), IIUC there should be some indication that
these (this and 'logical_decoding_work_mem') are GUC variables, e.g.
they should be refactored to be grouped together without the other
static var in between. And add a comment for them both like:

/* GUC variable. */

~

4c.
Also, (see [2]) it makes the code more readable to give the GUC an
explicit initial value.

SUGGESTION
bool force_stream_mode = false;

~~~

5. ReorderBufferCheckMemoryLimit

+ /* we know there has to be one, because the size is not zero */

Uppercase the comment. Although it is not strictly added by this patch, you might
as well make this change while adjusting the indentation.

======

src/backend/utils/misc/guc_tables.c

6.

+ {
+ {"force_stream_mode", PGC_USERSET, DEVELOPER_OPTIONS,
+ gettext_noop("Force sending the changes to output plugin immediately
if streaming is supported, without waiting till
logical_decoding_work_mem."),
+ NULL,
+ GUC_NOT_IN_SAMPLE
+ },
+ &force_stream_mode,
+ false,
+ NULL, NULL, NULL
+ },

"without waiting till logical_decoding_work_mem.".... seem like an
incomplete sentence

SUGGESTION
Force sending any streaming changes to the output plugin immediately
without waiting until logical_decoding_work_mem is exceeded."),

------
[1] https://www.postgresql.org/docs/devel/logical-replication-config.html
[2] GUC declarations -
https://github.com/postgres/postgres/commit/d9d873bac67047cfacc9f5ef96ee488f2cb0f1c3

Kind Regards,
Peter Smith.
Fujitsu Australia



Re: Force streaming every change in logical decoding

From
Peter Smith
Date:
On Wed, Dec 21, 2022 at 6:22 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> On Tue, Dec 20, 2022 at 7:49 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Tue, Dec 20, 2022 at 2:46 PM Hayato Kuroda (Fujitsu)
> > <kuroda.hayato@fujitsu.com> wrote:
> > >
> > > Dear hackers,
> > >
> > > > We have discussed three different ways to provide GUC for these
> > > > features. (1) Have separate GUCs like force_server_stream_mode,
> > > > force_server_serialize_mode, force_client_serialize_mode (we can use
> > > > different names for these) for each of these; (2) Have two sets of
> > > > GUCs for server and client. We can have logical_decoding_mode with
> > > > values as 'stream' and 'serialize' for the server and then
> > > > logical_apply_serialize = true/false for the client. (3) Have one GUC
> > > > like logical_replication_mode with values as 'server_stream',
> > > > 'server_serialize', 'client_serialize'.
> > >
> > > I also agreed for adding new GUC parameters (and I have already done partially
> > > in parallel apply[1]), and basically options 2 made sense for me. But is it OK
> > > that we can choose "serialize" mode even if subscribers require streaming?
> > >
> > > Currently the reorder buffer transactions are serialized on publisher only when
> > > the there are no streamable transaction. So what happen if the
> > > logical_decoding_mode = "serialize" but streaming option streaming is on? If we
> > > break the first one and serialize changes on publisher anyway, it may be not
> > > suitable for testing the normal operation.
> > >
> >
> > I think the change will be streamed as soon as the next change is
> > processed even if we serialize based on this option. See
> > ReorderBufferProcessPartialChange. However, I see your point that when
> > the streaming option is given, the value 'serialize' for this GUC may
> > not make much sense.
> >
> > > Therefore, I came up with the variant of (2): logical_decoding_mode can be
> > > "normal" or "immediate".
> > >
> > > "normal" is a default value, which is same as current HEAD. Changes are streamed
> > > or serialized when the buffered size exceeds logical_decoding_work_mem.
> > >
> > > When users set to "immediate", the walsenders starts to stream or serialize all
> > > changes. The choice is depends on the subscription option.
> > >
> >
> > The other possibility to achieve what you are saying is that we allow
> > a minimum value of logical_decoding_work_mem as 0 which would mean
> > stream or serialize each change depending on whether the streaming
> > option is enabled. I think we normally don't allow a minimum value
> > below a certain threshold for other *_work_mem parameters (like
> > maintenance_work_mem, work_mem), so we have followed the same here.
> > And, I think it makes sense from the user's perspective because below
> > a certain threshold it will just add overhead by either writing small
> > changes to the disk or by sending those over the network. However, it
> > can be quite useful for testing/debugging. So, not sure, if we should
> > restrict setting logical_decoding_work_mem below a certain threshold.
> > What do you think?
>
> I agree with (2), having separate GUCs for publisher side and
> subscriber side. Also, on the publisher side, Amit's idea, controlling
> the logical decoding behavior by changing logical_decoding_work_mem,
> seems like a good idea.
>
> But I'm not sure it's a good idea if we lower the minimum value of
> logical_decoding_work_mem to 0. I agree it's helpful for testing and
> debugging but setting logical_decoding_work_mem = 0 doesn't benefit
> users at all, rather brings risks.
>
> I prefer the idea Kuroda-san previously proposed; setting
> logical_decoding_mode = 'immediate' means setting
> logical_decoding_work_mem = 0. We might not need to have it as an enum
> parameter since it has only two values, though.

Did you mean one GUC (logical_decoding_mode) will cause a side-effect
implicit value change on another GUC value
(logical_decoding_work_mem)?

If so, then that seems like a potential source of confusion IMO.
- e.g. the actual value is invisibly set differently from what the user
sees in the conf file
- e.g. will it depend on the order in which they get assigned?

Are there any GUC precedents for something like that?

------
Kind Regards,
Peter Smith.
Fujitsu Australia



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Wed, Dec 21, 2022 at 1:55 PM Peter Smith <smithpb2250@gmail.com> wrote:
>
> On Wed, Dec 21, 2022 at 6:22 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> >
> > On Tue, Dec 20, 2022 at 7:49 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > >
> > > On Tue, Dec 20, 2022 at 2:46 PM Hayato Kuroda (Fujitsu)
> > > <kuroda.hayato@fujitsu.com> wrote:
> > > >
> > > > Dear hackers,
> > > >
> > > > > We have discussed three different ways to provide GUC for these
> > > > > features. (1) Have separate GUCs like force_server_stream_mode,
> > > > > force_server_serialize_mode, force_client_serialize_mode (we can use
> > > > > different names for these) for each of these; (2) Have two sets of
> > > > > GUCs for server and client. We can have logical_decoding_mode with
> > > > > values as 'stream' and 'serialize' for the server and then
> > > > > logical_apply_serialize = true/false for the client. (3) Have one GUC
> > > > > like logical_replication_mode with values as 'server_stream',
> > > > > 'server_serialize', 'client_serialize'.
> > > >
> > > > I also agreed for adding new GUC parameters (and I have already done partially
> > > > in parallel apply[1]), and basically options 2 made sense for me. But is it OK
> > > > that we can choose "serialize" mode even if subscribers require streaming?
> > > >
> > > > Currently the reorder buffer transactions are serialized on publisher only when
> > > > the there are no streamable transaction. So what happen if the
> > > > logical_decoding_mode = "serialize" but streaming option streaming is on? If we
> > > > break the first one and serialize changes on publisher anyway, it may be not
> > > > suitable for testing the normal operation.
> > > >
> > >
> > > I think the change will be streamed as soon as the next change is
> > > processed even if we serialize based on this option. See
> > > ReorderBufferProcessPartialChange. However, I see your point that when
> > > the streaming option is given, the value 'serialize' for this GUC may
> > > not make much sense.
> > >
> > > > Therefore, I came up with the variant of (2): logical_decoding_mode can be
> > > > "normal" or "immediate".
> > > >
> > > > "normal" is a default value, which is same as current HEAD. Changes are streamed
> > > > or serialized when the buffered size exceeds logical_decoding_work_mem.
> > > >
> > > > When users set to "immediate", the walsenders starts to stream or serialize all
> > > > changes. The choice is depends on the subscription option.
> > > >
> > >
> > > The other possibility to achieve what you are saying is that we allow
> > > a minimum value of logical_decoding_work_mem as 0 which would mean
> > > stream or serialize each change depending on whether the streaming
> > > option is enabled. I think we normally don't allow a minimum value
> > > below a certain threshold for other *_work_mem parameters (like
> > > maintenance_work_mem, work_mem), so we have followed the same here.
> > > And, I think it makes sense from the user's perspective because below
> > > a certain threshold it will just add overhead by either writing small
> > > changes to the disk or by sending those over the network. However, it
> > > can be quite useful for testing/debugging. So, not sure, if we should
> > > restrict setting logical_decoding_work_mem below a certain threshold.
> > > What do you think?
> >
> > I agree with (2), having separate GUCs for publisher side and
> > subscriber side. Also, on the publisher side, Amit's idea, controlling
> > the logical decoding behavior by changing logical_decoding_work_mem,
> > seems like a good idea.
> >
> > But I'm not sure it's a good idea if we lower the minimum value of
> > logical_decoding_work_mem to 0. I agree it's helpful for testing and
> > debugging but setting logical_decoding_work_mem = 0 doesn't benefit
> > users at all, rather brings risks.
> >
> > I prefer the idea Kuroda-san previously proposed; setting
> > logical_decoding_mode = 'immediate' means setting
> > logical_decoding_work_mem = 0. We might not need to have it as an enum
> > parameter since it has only two values, though.
>
> Did you mean one GUC (logical_decoding_mode) will cause a side-effect
> implicit value change on another GUC value
> (logical_decoding_work_mem)?
>

I don't think that is required. The only value that can be allowed for
logical_decoding_mode will be "immediate", something like we do for
recovery_target. The default will be "". The "immediate" value will
mean that we stream each change if the "streaming" option is enabled
("on" or "parallel"); if "streaming" is not enabled, it will mean
serializing each change.
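
To illustrate the shape being described here, a rough sketch of such an enum GUC follows. This is only an illustration of the proposal at this point in the thread (names and the default were still under discussion), not the actual patch.

```
#include "postgres.h"
#include "utils/guc.h"

/* Sketch only: possible values for a logical_decoding_mode enum GUC. */
typedef enum
{
	LOGICAL_DECODING_MODE_DEFAULT,	/* "" - honor logical_decoding_work_mem */
	LOGICAL_DECODING_MODE_IMMEDIATE /* stream/serialize each change */
} LogicalDecodingMode;

static const struct config_enum_entry logical_decoding_mode_options[] = {
	{"", LOGICAL_DECODING_MODE_DEFAULT, false},
	{"immediate", LOGICAL_DECODING_MODE_IMMEDIATE, false},
	{NULL, 0, false}
};
```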

-- 
With Regards,
Amit Kapila.



RE: Force streaming every change in logical decoding

From
"Hayato Kuroda (Fujitsu)"
Date:
Dear Shveta, other hackers

> Going with ' logical_decoding_work_mem' seems a reasonable solution, but since we are mixing
> the functionality of developer and production GUC, there is a slight risk that customer/DBAs may end
> up setting it to 0 and forget about it and thus hampering system's performance.
> Have seen many such cases in previous org.

That never crossed my mind at all. Indeed, DBAs may be confused; this proposal may be over-optimized.
I can withdraw it and we can go another way, maybe my previous approach.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED


RE: Force streaming every change in logical decoding

From
"shiy.fnst@fujitsu.com"
Date:
On Wed, Dec 21, 2022 4:54 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> 
> On Wed, Dec 21, 2022 at 1:55 PM Peter Smith <smithpb2250@gmail.com>
> wrote:
> >
> > On Wed, Dec 21, 2022 at 6:22 PM Masahiko Sawada
> <sawada.mshk@gmail.com> wrote:
> > >
> > > On Tue, Dec 20, 2022 at 7:49 PM Amit Kapila <amit.kapila16@gmail.com>
> wrote:
> > > >
> > > > On Tue, Dec 20, 2022 at 2:46 PM Hayato Kuroda (Fujitsu)
> > > > <kuroda.hayato@fujitsu.com> wrote:
> > > > >
> > > > > Dear hackers,
> > > > >
> > > > > > We have discussed three different ways to provide GUC for these
> > > > > > features. (1) Have separate GUCs like force_server_stream_mode,
> > > > > > force_server_serialize_mode, force_client_serialize_mode (we can
> use
> > > > > > different names for these) for each of these; (2) Have two sets of
> > > > > > GUCs for server and client. We can have logical_decoding_mode with
> > > > > > values as 'stream' and 'serialize' for the server and then
> > > > > > logical_apply_serialize = true/false for the client. (3) Have one GUC
> > > > > > like logical_replication_mode with values as 'server_stream',
> > > > > > 'server_serialize', 'client_serialize'.
> > > > >
> > > > > I also agreed for adding new GUC parameters (and I have already done
> partially
> > > > > in parallel apply[1]), and basically options 2 made sense for me. But is
> it OK
> > > > > that we can choose "serialize" mode even if subscribers require
> streaming?
> > > > >
> > > > > Currently the reorder buffer transactions are serialized on publisher
> only when
> > > > > the there are no streamable transaction. So what happen if the
> > > > > logical_decoding_mode = "serialize" but streaming option streaming is
> on? If we
> > > > > break the first one and serialize changes on publisher anyway, it may
> be not
> > > > > suitable for testing the normal operation.
> > > > >
> > > >
> > > > I think the change will be streamed as soon as the next change is
> > > > processed even if we serialize based on this option. See
> > > > ReorderBufferProcessPartialChange. However, I see your point that
> when
> > > > the streaming option is given, the value 'serialize' for this GUC may
> > > > not make much sense.
> > > >
> > > > > Therefore, I came up with the variant of (2): logical_decoding_mode
> can be
> > > > > "normal" or "immediate".
> > > > >
> > > > > "normal" is a default value, which is same as current HEAD. Changes
> are streamed
> > > > > or serialized when the buffered size exceeds
> logical_decoding_work_mem.
> > > > >
> > > > > When users set to "immediate", the walsenders starts to stream or
> serialize all
> > > > > changes. The choice is depends on the subscription option.
> > > > >
> > > >
> > > > The other possibility to achieve what you are saying is that we allow
> > > > a minimum value of logical_decoding_work_mem as 0 which would
> mean
> > > > stream or serialize each change depending on whether the streaming
> > > > option is enabled. I think we normally don't allow a minimum value
> > > > below a certain threshold for other *_work_mem parameters (like
> > > > maintenance_work_mem, work_mem), so we have followed the same
> here.
> > > > And, I think it makes sense from the user's perspective because below
> > > > a certain threshold it will just add overhead by either writing small
> > > > changes to the disk or by sending those over the network. However, it
> > > > can be quite useful for testing/debugging. So, not sure, if we should
> > > > restrict setting logical_decoding_work_mem below a certain threshold.
> > > > What do you think?
> > >
> > > I agree with (2), having separate GUCs for publisher side and
> > > subscriber side. Also, on the publisher side, Amit's idea, controlling
> > > the logical decoding behavior by changing logical_decoding_work_mem,
> > > seems like a good idea.
> > >
> > > But I'm not sure it's a good idea if we lower the minimum value of
> > > logical_decoding_work_mem to 0. I agree it's helpful for testing and
> > > debugging but setting logical_decoding_work_mem = 0 doesn't benefit
> > > users at all, rather brings risks.
> > >
> > > I prefer the idea Kuroda-san previously proposed; setting
> > > logical_decoding_mode = 'immediate' means setting
> > > logical_decoding_work_mem = 0. We might not need to have it as an
> enum
> > > parameter since it has only two values, though.
> >
> > Did you mean one GUC (logical_decoding_mode) will cause a side-effect
> > implicit value change on another GUC value
> > (logical_decoding_work_mem)?
> >
> 
> I don't think that is required. The only value that can be allowed for
> logical_decoding_mode will be "immediate", something like we do for
> recovery_target. The default will be "". The "immediate" value will
> mean that stream each change if the "streaming" option is enabled
> ("on" of "parallel") or if "streaming" is not enabled then that would
> mean serializing each change.
> 

I agreed and updated the patch as Amit suggested.
Please see the attached patch.

Regards,
Shi yu

Attachment

RE: Force streaming every change in logical decoding

From
"shiy.fnst@fujitsu.com"
Date:
On Wed, Dec 21, 2022 4:05 PM Peter Smith <smithpb2250@gmail.com> wrote:
> 
> Here are some review comments for patch v2.
> 
> Since the GUC is still under design maybe these comments can be
> ignored for now, but I guess similar comments will apply in future
> anyhow (just with some name changes).
> 

Thanks for your comments.

> ======
> 
> 3. More docs.
> 
> It might be helpful if this developer option is referenced also on the
> page "31.10.1 Logical Replication > Configuration Settings >
> Publishers" [1]
> 

The newly added GUC is a developer option, and it seems that most developer options
are not referenced on other pages. So, I am not sure if we need to add this to
other docs.

Other comments are fixed [1]. (Some of them are ignored because of the new
design.)

[1]
https://www.postgresql.org/message-id/OSZPR01MB6310AAE12BC281158880380DFDEB9%40OSZPR01MB6310.jpnprd01.prod.outlook.com

Regards,
Shi yu

Re: Force streaming every change in logical decoding

From
Masahiko Sawada
Date:
On Wed, Dec 21, 2022 at 10:14 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:
>
> On Wed, Dec 21, 2022 4:54 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Wed, Dec 21, 2022 at 1:55 PM Peter Smith <smithpb2250@gmail.com>
> > wrote:
> > >
> > > On Wed, Dec 21, 2022 at 6:22 PM Masahiko Sawada
> > <sawada.mshk@gmail.com> wrote:
> > > >
> > > > On Tue, Dec 20, 2022 at 7:49 PM Amit Kapila <amit.kapila16@gmail.com>
> > wrote:
> > > > >
> > > > > On Tue, Dec 20, 2022 at 2:46 PM Hayato Kuroda (Fujitsu)
> > > > > <kuroda.hayato@fujitsu.com> wrote:
> > > > > >
> > > > > > Dear hackers,
> > > > > >
> > > > > > > We have discussed three different ways to provide GUC for these
> > > > > > > features. (1) Have separate GUCs like force_server_stream_mode,
> > > > > > > force_server_serialize_mode, force_client_serialize_mode (we can
> > use
> > > > > > > different names for these) for each of these; (2) Have two sets of
> > > > > > > GUCs for server and client. We can have logical_decoding_mode with
> > > > > > > values as 'stream' and 'serialize' for the server and then
> > > > > > > logical_apply_serialize = true/false for the client. (3) Have one GUC
> > > > > > > like logical_replication_mode with values as 'server_stream',
> > > > > > > 'server_serialize', 'client_serialize'.
> > > > > >
> > > > > > I also agreed for adding new GUC parameters (and I have already done
> > partially
> > > > > > in parallel apply[1]), and basically options 2 made sense for me. But is
> > it OK
> > > > > > that we can choose "serialize" mode even if subscribers require
> > streaming?
> > > > > >
> > > > > > Currently the reorder buffer transactions are serialized on publisher
> > only when
> > > > > > the there are no streamable transaction. So what happen if the
> > > > > > logical_decoding_mode = "serialize" but streaming option streaming is
> > on? If we
> > > > > > break the first one and serialize changes on publisher anyway, it may
> > be not
> > > > > > suitable for testing the normal operation.
> > > > > >
> > > > >
> > > > > I think the change will be streamed as soon as the next change is
> > > > > processed even if we serialize based on this option. See
> > > > > ReorderBufferProcessPartialChange. However, I see your point that
> > when
> > > > > the streaming option is given, the value 'serialize' for this GUC may
> > > > > not make much sense.
> > > > >
> > > > > > Therefore, I came up with the variant of (2): logical_decoding_mode
> > can be
> > > > > > "normal" or "immediate".
> > > > > >
> > > > > > "normal" is a default value, which is same as current HEAD. Changes
> > are streamed
> > > > > > or serialized when the buffered size exceeds
> > logical_decoding_work_mem.
> > > > > >
> > > > > > When users set to "immediate", the walsenders starts to stream or
> > serialize all
> > > > > > changes. The choice is depends on the subscription option.
> > > > > >
> > > > >
> > > > > The other possibility to achieve what you are saying is that we allow
> > > > > a minimum value of logical_decoding_work_mem as 0 which would
> > mean
> > > > > stream or serialize each change depending on whether the streaming
> > > > > option is enabled. I think we normally don't allow a minimum value
> > > > > below a certain threshold for other *_work_mem parameters (like
> > > > > maintenance_work_mem, work_mem), so we have followed the same
> > here.
> > > > > And, I think it makes sense from the user's perspective because below
> > > > > a certain threshold it will just add overhead by either writing small
> > > > > changes to the disk or by sending those over the network. However, it
> > > > > can be quite useful for testing/debugging. So, not sure, if we should
> > > > > restrict setting logical_decoding_work_mem below a certain threshold.
> > > > > What do you think?
> > > >
> > > > I agree with (2), having separate GUCs for publisher side and
> > > > subscriber side. Also, on the publisher side, Amit's idea, controlling
> > > > the logical decoding behavior by changing logical_decoding_work_mem,
> > > > seems like a good idea.
> > > >
> > > > But I'm not sure it's a good idea if we lower the minimum value of
> > > > logical_decoding_work_mem to 0. I agree it's helpful for testing and
> > > > debugging but setting logical_decoding_work_mem = 0 doesn't benefit
> > > > users at all, rather brings risks.
> > > >
> > > > I prefer the idea Kuroda-san previously proposed; setting
> > > > logical_decoding_mode = 'immediate' means setting
> > > > logical_decoding_work_mem = 0. We might not need to have it as an
> > enum
> > > > parameter since it has only two values, though.
> > >
> > > Did you mean one GUC (logical_decoding_mode) will cause a side-effect
> > > implicit value change on another GUC value
> > > (logical_decoding_work_mem)?
> > >
> >
> > I don't think that is required. The only value that can be allowed for
> > logical_decoding_mode will be "immediate", something like we do for
> > recovery_target. The default will be "". The "immediate" value will
> > mean that stream each change if the "streaming" option is enabled
> > ("on" of "parallel") or if "streaming" is not enabled then that would
> > mean serializing each change.
> >
>
> I agreed and updated the patch as Amit suggested.
> Please see the attached patch.
>

The patch looks good to me. Some minor comments are:

- * limit, but we might also adapt a more elaborate eviction strategy
- for example
- * evicting enough transactions to free certain fraction (e.g. 50%)
of the memory
- * limit.
+ * limit, but we might also adapt a more elaborate eviction strategy - for
+ * example evicting enough transactions to free certain fraction (e.g. 50%) of
+ * the memory limit.

This change is not relevant to this feature.

---
+        if (logical_decoding_mode == LOGICAL_DECODING_MODE_DEFAULT
+                && rb->size < logical_decoding_work_mem * 1024L)

Since we put '&&' at the end of the line (before the line break) in all
other places in reorderbuffer.c, I think it's better to make this
consistent (see the sketch below). The same is true for the change to
the while loop in the patch.
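
For clarity, with that style the quoted condition would look roughly like this (a sketch of the formatting only, not the final patch text):

```
	/* Bail out early; note the '&&' stays at the end of the line. */
	if (logical_decoding_mode == LOGICAL_DECODING_MODE_DEFAULT &&
		rb->size < logical_decoding_work_mem * 1024L)
		return;
```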

Regards,

-- 
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Wed, Dec 21, 2022 at 7:25 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> On Wed, Dec 21, 2022 at 10:14 PM shiy.fnst@fujitsu.com
> <shiy.fnst@fujitsu.com> wrote:
>
> The patch looks good to me. Some minor comments are:
>
> - * limit, but we might also adapt a more elaborate eviction strategy
> - for example
> - * evicting enough transactions to free certain fraction (e.g. 50%)
> of the memory
> - * limit.
> + * limit, but we might also adapt a more elaborate eviction strategy - for
> + * example evicting enough transactions to free certain fraction (e.g. 50%) of
> + * the memory limit.
>
> This change is not relevant with this feature.
>
> ---
> +        if (logical_decoding_mode == LOGICAL_DECODING_MODE_DEFAULT
> +                && rb->size < logical_decoding_work_mem * 1024L)
>
> Since we put '&&' before the new line in all other places in
> reorderbuffer.c, I think it's better to make it consistent. The same
> is true for the change for while loop in the patch.
>

I have addressed these comments in the attached. Additionally, I have
modified the docs and commit messages to make those clear. I think
instead of adding new tests with this patch, it may be better to
change some of the existing tests related to streaming to use this
parameter as that will clearly show one of the purposes of this patch.

-- 
With Regards,
Amit Kapila.

Attachment

Re: Force streaming every change in logical decoding

From
Masahiko Sawada
Date:
On Thu, Dec 22, 2022 at 4:05 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Wed, Dec 21, 2022 at 7:25 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> >
> > On Wed, Dec 21, 2022 at 10:14 PM shiy.fnst@fujitsu.com
> > <shiy.fnst@fujitsu.com> wrote:
> >
> > The patch looks good to me. Some minor comments are:
> >
> > - * limit, but we might also adapt a more elaborate eviction strategy
> > - for example
> > - * evicting enough transactions to free certain fraction (e.g. 50%)
> > of the memory
> > - * limit.
> > + * limit, but we might also adapt a more elaborate eviction strategy - for
> > + * example evicting enough transactions to free certain fraction (e.g. 50%) of
> > + * the memory limit.
> >
> > This change is not relevant with this feature.
> >
> > ---
> > +        if (logical_decoding_mode == LOGICAL_DECODING_MODE_DEFAULT
> > +                && rb->size < logical_decoding_work_mem * 1024L)
> >
> > Since we put '&&' before the new line in all other places in
> > reorderbuffer.c, I think it's better to make it consistent. The same
> > is true for the change for while loop in the patch.
> >
>
> I have addressed these comments in the attached. Additionally, I have
> modified the docs and commit messages to make those clear.

Thanks!

>  I think
> instead of adding new tests with this patch, it may be better to
> change some of the existing tests related to streaming to use this
> parameter as that will clearly show one of the purposes of this patch.

+1. I think test_decoding/sql/stream.sql and spill.sql are good
candidates, and we can change the logical replication TAP tests in a
separate patch.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com



RE: Force streaming every change in logical decoding

From
"Hayato Kuroda (Fujitsu)"
Date:
Dear Amit,

Thank you for updating the patch. I have also checked the patch
and basically it has worked well. Almost all things I found were modified
by v4.

One comment: when setting logical_decoding_mode to a wrong value,
I got an unfriendly ERROR message.

```
postgres=# SET logical_decoding_mode = 1;
ERROR:  invalid value for parameter "logical_decoding_mode": "1"
HINT:  Available values: , immediate
```

Here all acceptable enum values should be output in the HINT, but we cannot see the empty string.
Should we modify config_enum_get_options() to handle the empty string, maybe
printing it as (empty)? Or we can ignore this for now.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED


Re: Force streaming every change in logical decoding

From
Kyotaro Horiguchi
Date:
At Thu, 22 Dec 2022 12:35:46 +0530, Amit Kapila <amit.kapila16@gmail.com> wrote in 
> I have addressed these comments in the attached. Additionally, I have
> modified the docs and commit messages to make those clear. I think
> instead of adding new tests with this patch, it may be better to
> change some of the existing tests related to streaming to use this
> parameter as that will clearly show one of the purposes of this patch.

I'm late to this, but I'm worried that we might someday regret the lack
of an explicit default. Don't we really need one?

+        Allows streaming or serializing changes immediately in logical decoding.
+        The allowed values of <varname>logical_decoding_mode</varname> are the
+        empty string and <literal>immediate</literal>. When set to
+        <literal>immediate</literal>, stream each change if
+        <literal>streaming</literal> option is enabled, otherwise, serialize
+        each change.  When set to an empty string, which is the default,
+        decoding will stream or serialize changes when
+        <varname>logical_decoding_work_mem</varname> is reached.

With (really) fresh eyes, it took me quite a while to understand what
the "streaming" option is. Couldn't we augment the description a
bit?

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center



Re: Force streaming every change in logical decoding

From
Masahiko Sawada
Date:
On Thu, Dec 22, 2022 at 4:18 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
>
> Dear Amit,
>
> Thank you for updating the patch. I have also checked the patch
> and basically it has worked well. Almost all things I found were modified
> by v4.
>
> One comment: while setting logical_decoding_mode to wrong value,
> I got unfriendly ERROR message.
>
> ```
> postgres=# SET logical_decoding_mode = 1;
> ERROR:  invalid value for parameter "logical_decoding_mode": "1"
> HINT:  Available values: , immediate
> ```
>
> Here all acceptable enum should be output as HINT, but we could not see the empty string.
> Should we modify config_enum_get_options() for treating empty string, maybe
> like (empty)?

Good point. I think the hint message can say "The only allowed value
is \"immediate\"", as recovery_target does. Or, considering the name of
logical_decoding_mode, we might want to have a non-empty string, say
'normal' as Kuroda-san proposed, as the default value.

Regards,

-- 
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com



Re: Force streaming every change in logical decoding

From
Kyotaro Horiguchi
Date:
At Thu, 22 Dec 2022 16:59:30 +0900, Masahiko Sawada <sawada.mshk@gmail.com> wrote in 
> On Thu, Dec 22, 2022 at 4:18 PM Hayato Kuroda (Fujitsu)
> <kuroda.hayato@fujitsu.com> wrote:
> >
> > Dear Amit,
> >
> > Thank you for updating the patch. I have also checked the patch
> > and basically it has worked well. Almost all things I found were modified
> > by v4.
> >
> > One comment: while setting logical_decoding_mode to wrong value,
> > I got unfriendly ERROR message.
> >
> > ```
> > postgres=# SET logical_decoding_mode = 1;
> > ERROR:  invalid value for parameter "logical_decoding_mode": "1"
> > HINT:  Available values: , immediate
> > ```
> >
> > Here all acceptable enum should be output as HINT, but we could not see the empty string.
> > Should we modify config_enum_get_options() for treating empty string, maybe
> > like (empty)?
> 
> Good point. I think the hint message can say "The only allowed value
> is \"immediate\" as recovery_target does. Or considering the name of
> logical_decoding_mode, we might want to have a non-empty string, say
> 'normal' as Kuroda-san proposed, as the default value.

Oh, I missed this, and I agree with having an explicit default option.
I slightly prefer "buffered", but "normal" also works fine for me.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Thu, Dec 22, 2022 at 1:55 PM Kyotaro Horiguchi
<horikyota.ntt@gmail.com> wrote:
>
> At Thu, 22 Dec 2022 16:59:30 +0900, Masahiko Sawada <sawada.mshk@gmail.com> wrote in
> > On Thu, Dec 22, 2022 at 4:18 PM Hayato Kuroda (Fujitsu)
> > <kuroda.hayato@fujitsu.com> wrote:
> > >
> > > Dear Amit,
> > >
> > > Thank you for updating the patch. I have also checked the patch
> > > and basically it has worked well. Almost all things I found were modified
> > > by v4.
> > >
> > > One comment: while setting logical_decoding_mode to wrong value,
> > > I got unfriendly ERROR message.
> > >
> > > ```
> > > postgres=# SET logical_decoding_mode = 1;
> > > ERROR:  invalid value for parameter "logical_decoding_mode": "1"
> > > HINT:  Available values: , immediate
> > > ```
> > >
> > > Here all acceptable enum should be output as HINT, but we could not see the empty string.
> > > Should we modify config_enum_get_options() for treating empty string, maybe
> > > like (empty)?
> >
> > Good point. I think the hint message can say "The only allowed value
> > is \"immediate\" as recovery_target does. Or considering the name of
> > logical_decoding_mode, we might want to have a non-empty string, say
> > 'normal' as Kuroda-san proposed, as the default value.
>
> Oh. I missed this, and agree to have the explicit default option.
> I slightly prefer "buffered" but "normal" also works fine for me.
>

+1 for "buffered" as that seems to convey the meaning better.

-- 
With Regards,
Amit Kapila.



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Thu, Dec 22, 2022 at 12:47 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> On Thu, Dec 22, 2022 at 4:05 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
>
> >  I think
> > instead of adding new tests with this patch, it may be better to
> > change some of the existing tests related to streaming to use this
> > parameter as that will clearly show one of the purposes of this patch.
>
> +1. I think test_decoding/sql/stream.sql and spill.sql are good
> candidates and we change logical replication TAP tests in a separate
> patch.
>

I prefer the other way: let's first do the TAP tests because that will
also help immediately with the parallel apply feature. We need to
execute most of those tests in parallel mode.

-- 
With Regards,
Amit Kapila.



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Thu, Dec 22, 2022 at 1:15 PM Kyotaro Horiguchi
<horikyota.ntt@gmail.com> wrote:
>
> At Thu, 22 Dec 2022 12:35:46 +0530, Amit Kapila <amit.kapila16@gmail.com> wrote in
> > I have addressed these comments in the attached. Additionally, I have
> > modified the docs and commit messages to make those clear. I think
> > instead of adding new tests with this patch, it may be better to
> > change some of the existing tests related to streaming to use this
> > parameter as that will clearly show one of the purposes of this patch.
>
> Being late but I'm warried that we might sometime regret that the lack
> of the explicit default. Don't we really need it?
>

For this, I like your proposal for "buffered" as an explicit default value.

> +        Allows streaming or serializing changes immediately in logical decoding.
> +        The allowed values of <varname>logical_decoding_mode</varname> are the
> +        empty string and <literal>immediate</literal>. When set to
> +        <literal>immediate</literal>, stream each change if
> +        <literal>streaming</literal> option is enabled, otherwise, serialize
> +        each change.  When set to an empty string, which is the default,
> +        decoding will stream or serialize changes when
> +        <varname>logical_decoding_work_mem</varname> is reached.
>
> With (really) fresh eyes, I took a bit long time to understand what
> the "streaming" option is. Couldn't we augment the description by a
> bit?
>

Okay, how about modifying it to: "When set to
<literal>immediate</literal>, stream each change if
<literal>streaming</literal> option (see optional parameters set by
CREATE SUBSCRIPTION) is enabled, otherwise, serialize each change."

-- 
With Regards,
Amit Kapila.



RE: Force streaming every change in logical decoding

From
"shiy.fnst@fujitsu.com"
Date:
On Thu, Dec 22, 2022 5:24 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> 
> On Thu, Dec 22, 2022 at 1:15 PM Kyotaro Horiguchi
> <horikyota.ntt@gmail.com> wrote:
> >
> > At Thu, 22 Dec 2022 12:35:46 +0530, Amit Kapila
> <amit.kapila16@gmail.com> wrote in
> > > I have addressed these comments in the attached. Additionally, I have
> > > modified the docs and commit messages to make those clear. I think
> > > instead of adding new tests with this patch, it may be better to
> > > change some of the existing tests related to streaming to use this
> > > parameter as that will clearly show one of the purposes of this patch.
> >
> > Being late but I'm warried that we might sometime regret that the lack
> > of the explicit default. Don't we really need it?
> >
> 
> For this, I like your proposal for "buffered" as an explicit default value.
> 
> > +        Allows streaming or serializing changes immediately in logical
> decoding.
> > +        The allowed values of <varname>logical_decoding_mode</varname>
> are the
> > +        empty string and <literal>immediate</literal>. When set to
> > +        <literal>immediate</literal>, stream each change if
> > +        <literal>streaming</literal> option is enabled, otherwise, serialize
> > +        each change.  When set to an empty string, which is the default,
> > +        decoding will stream or serialize changes when
> > +        <varname>logical_decoding_work_mem</varname> is reached.
> >
> > With (really) fresh eyes, I took a bit long time to understand what
> > the "streaming" option is. Couldn't we augment the description by a
> > bit?
> >
> 
> Okay, how about modifying it to: "When set to
> <literal>immediate</literal>, stream each change if
> <literal>streaming</literal> option (see optional parameters set by
> CREATE SUBSCRIPTION) is enabled, otherwise, serialize each change.
> 

I updated the patch to use "buffered" as the explicit default value and included
Amit's changes to the documentation.

Besides, I tried to reduce the data size in the streaming subscription TAP tests using this
new GUC (see the 0002 patch). But I didn't convert all of the streaming TAP tests because I
think we also need to cover the case where there are lots of changes. So, 015* is
not modified. And 017* is not modified because streaming transactions and
non-streaming transactions are tested alternately in that test.

I collected the time to run these tests before and after applying the patch set
on my machine. The debug build saves about 5.3 s, and the release build
saves about 1.8 s. The time of each test is attached.

Regards,
Shi yu

Attachment

Re: Force streaming every change in logical decoding

From
Kyotaro Horiguchi
Date:
At Thu, 22 Dec 2022 14:53:34 +0530, Amit Kapila <amit.kapila16@gmail.com> wrote in 
> For this, I like your proposal for "buffered" as an explicit default value.

Good to hear.

> Okay, how about modifying it to: "When set to
> <literal>immediate</literal>, stream each change if
> <literal>streaming</literal> option (see optional parameters set by
> CREATE SUBSCRIPTION) is enabled, otherwise, serialize each change.

Looks fine. Thanks!

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center



Re: Force streaming every change in logical decoding

From
Masahiko Sawada
Date:
On Thu, Dec 22, 2022 at 6:19 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Thu, Dec 22, 2022 at 12:47 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> >
> > On Thu, Dec 22, 2022 at 4:05 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > >
> >
> > >  I think
> > > instead of adding new tests with this patch, it may be better to
> > > change some of the existing tests related to streaming to use this
> > > parameter as that will clearly show one of the purposes of this patch.
> >
> > +1. I think test_decoding/sql/stream.sql and spill.sql are good
> > candidates and we change logical replication TAP tests in a separate
> > patch.
> >
>
> I prefer the other way, let's first do TAP tests because that will
> also help immediately with the parallel apply feature. We need to
> execute most of those tests in parallel mode.

Good point. Or we can do both if changes for test_decoding tests are not huge?

Regards,

-- 
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com



Re: Force streaming every change in logical decoding

From
Masahiko Sawada
Date:
On Thu, Dec 22, 2022 at 9:48 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:
>
> On Thu, Dec 22, 2022 5:24 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Thu, Dec 22, 2022 at 1:15 PM Kyotaro Horiguchi
> > <horikyota.ntt@gmail.com> wrote:
> > >
> > > At Thu, 22 Dec 2022 12:35:46 +0530, Amit Kapila
> > <amit.kapila16@gmail.com> wrote in
> > > > I have addressed these comments in the attached. Additionally, I have
> > > > modified the docs and commit messages to make those clear. I think
> > > > instead of adding new tests with this patch, it may be better to
> > > > change some of the existing tests related to streaming to use this
> > > > parameter as that will clearly show one of the purposes of this patch.
> > >
> > > Being late but I'm warried that we might sometime regret that the lack
> > > of the explicit default. Don't we really need it?
> > >
> >
> > For this, I like your proposal for "buffered" as an explicit default value.
> >
> > > +        Allows streaming or serializing changes immediately in logical
> > decoding.
> > > +        The allowed values of <varname>logical_decoding_mode</varname>
> > are the
> > > +        empty string and <literal>immediate</literal>. When set to
> > > +        <literal>immediate</literal>, stream each change if
> > > +        <literal>streaming</literal> option is enabled, otherwise, serialize
> > > +        each change.  When set to an empty string, which is the default,
> > > +        decoding will stream or serialize changes when
> > > +        <varname>logical_decoding_work_mem</varname> is reached.
> > >
> > > With (really) fresh eyes, I took a bit long time to understand what
> > > the "streaming" option is. Couldn't we augment the description by a
> > > bit?
> > >
> >
> > Okay, how about modifying it to: "When set to
> > <literal>immediate</literal>, stream each change if
> > <literal>streaming</literal> option (see optional parameters set by
> > CREATE SUBSCRIPTION) is enabled, otherwise, serialize each change.
> >
>
> I updated the patch to use "buffered" as the explicit default value, and include
> Amit's changes about document.
>
> Besides, I tried to reduce data size in streaming subscription tap tests by this
> new GUC (see 0002 patch). But I didn't covert all streaming tap tests because I
> think we also need to cover the case that there are lots of changes. So, 015* is
> not modified.

If we want to eventually convert 015 at some point, isn't it better to
include it even if it requires many changes? Is there any reason we
want to change 017 in a separate patch?

> And 017* is not modified because streaming transactions and
> non-streaming transactions are tested alternately in this test.

How about 029_on_error.pl? It also sets logical_decoding_work_mem to
64kb to test the STREAM COMMIT case.

>
> I collected the time to run these tests before and after applying the patch set
> on my machine. In debug version, it saves about 5.3 s; and in release version,
> it saves about 1.8 s. The time of each test is attached.

Nice improvements.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com



RE: Force streaming every change in logical decoding

From
"Hayato Kuroda (Fujitsu)"
Date:
Dear Shi,

Thanks for updating the patch! The following are my comments.

ReorderBufferCheckMemoryLimit()

```
+    /*
+     * Bail out if logical_decoding_mode is disabled and we haven't exceeded
+     * the memory limit.
+     */
```

I think 'disabled' should be '"buffered"'.

```
+     * If logical_decoding_mode is immediate, loop until there's no change.
+     * Otherwise, loop until we reach under the memory limit. One might think
+     * that just by evicting the largest (sub)transaction we will come under
+     * the memory limit based on assumption that the selected transaction is
+     * at least as large as the most recent change (which caused us to go over
+     * the memory limit). However, that is not true because a user can reduce
+     * the logical_decoding_work_mem to a smaller value before the most recent
      * change.
      */
```

Do we need to pick the largest (sub)transaction even if we are in immediate mode?
It seems that a linear search is done in ReorderBufferLargestStreamableTopTXN()
to find the largest transaction, but in this case we could choose an arbitrary one.

reorderbuffer.h

+/* possible values for logical_decoding_mode */
+typedef enum
+{
+    LOGICAL_DECODING_MODE_BUFFERED,
+    LOGICAL_DECODING_MODE_IMMEDIATE
+}            LogicalDecodingMode;

I'm not sure, but do we have to modify typedefs.list?



Moreover, I have also checked the improvements in elapsed time.
All builds were made with the meson build system, and the unit of each cell is seconds.
The results seem to show the same trend as Shi-san's.

Debug build:

Test    HEAD     PATCH    Delta
16      6.44     5.96     0.48
18      6.92     6.47     0.45
19      5.93     5.91     0.02
22      14.98    13.7     1.28
23      12.01    8.22     3.79

Sum of deltas: 6.02 s

Release build:

Test    HEAD     PATCH    Delta
16      3.51     3.22     0.29
17      4.04     3.48     0.56
19      3.43     3.3      0.13
22      10.06    8.46     1.6
23      6.74     5.39     1.35

Sum of deltas: 3.93 s


I will check and report the test coverage if I can.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED


Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Fri, Dec 23, 2022 at 10:32 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> >
> > Besides, I tried to reduce data size in streaming subscription tap tests by this
> > new GUC (see 0002 patch). But I didn't covert all streaming tap tests because I
> > think we also need to cover the case that there are lots of changes. So, 015* is
> > not modified.
>
> If we want to eventually convert 015 some time, isn't it better to
> include it even if it requires many changes?
>

I think there is some confusion here because the point is that we
don't want to convert all the tests. It would be good to have some
tests which follow the regular path of logical_decoding_work_mem.

> Is there any reason we
> want to change 017 in a separate patch?
>

Here also, the idea is to leave it as it is. It has a mix of streaming
and non-streaming cases, and it would be tricky to convert because we
would need to change the GUC before the streamed cases, reload the
config, and ensure the reloaded config is in effect.

> > And 017* is not modified because streaming transactions and
> > non-streaming transactions are tested alternately in this test.
>
> How about 029_on_error.pl? It also sets logical_decoding_work_mem to
> 64kb to test the STREAM COMMIT case.
>

How would you like to change this? Do we want to enable the GUC or
streaming option just before that case and wait for it? If so, that
might take more time than we save.

-- 
With Regards,
Amit Kapila.



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Fri, Dec 23, 2022 at 10:48 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
>
> ```
> +        * If logical_decoding_mode is immediate, loop until there's no change.
> +        * Otherwise, loop until we reach under the memory limit. One might think
> +        * that just by evicting the largest (sub)transaction we will come under
> +        * the memory limit based on assumption that the selected transaction is
> +        * at least as large as the most recent change (which caused us to go over
> +        * the memory limit). However, that is not true because a user can reduce
> +        * the logical_decoding_work_mem to a smaller value before the most recent
>          * change.
>          */
> ```
>
> Do we need to pick the largest (sub)transaciton even if we are in the immediate mode?
> It seems that the liner search is done in ReorderBufferLargestStreamableTopTXN()
> to find the largest transaction, but in this case we can choose the arbitrary one.
>

In immediate mode, we will stream/spill each change, so ideally we
don't need to perform any search. In any case, I think changing those
functions would complicate the code without serving any purpose.
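
In other words, the eviction loop keeps its existing structure and only the loop condition changes. A rough sketch of the shape under discussion (based on the comments quoted above, not the exact patch text):

```
	while (rb->size >= logical_decoding_work_mem * 1024L ||
		   (logical_decoding_mode == LOGICAL_DECODING_MODE_IMMEDIATE &&
			rb->size > 0))
	{
		/*
		 * Existing eviction code goes here: pick the largest (sub)transaction
		 * and stream or serialize it.  In immediate mode the buffer only
		 * contains the change that was just queued, so the linear search
		 * (e.g. in ReorderBufferLargestStreamableTopTXN()) trivially returns
		 * that transaction; no special case is needed.
		 */
	}
```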

-- 
With Regards,
Amit Kapila.



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Thu, Dec 22, 2022 at 6:18 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:
>
>
> Besides, I tried to reduce data size in streaming subscription tap tests by this
> new GUC (see 0002 patch). But I didn't covert all streaming tap tests because I
> think we also need to cover the case that there are lots of changes. So, 015* is
> not modified. And 017* is not modified because streaming transactions and
> non-streaming transactions are tested alternately in this test.
>

I think we can remove the newly added test from the patch and instead
combine the 0001 and 0002 patches. I think we should leave the
022_twophase_cascade as it is because it can impact code coverage,
especially the below part of the test:
# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
$node_A->safe_psql(
    'postgres', "
    BEGIN;
    INSERT INTO test_tab VALUES (9999, 'foobar');
    SAVEPOINT sp_inner;
    INSERT INTO test_tab SELECT i, md5(i::text) FROM
generate_series(3, 5000) s(i);

Here, we will stream for the first time after the subtransaction, so it
can impact the below part of the code in ReorderBufferStreamTXN:
if (txn->snapshot_now == NULL)
{
...
dlist_foreach(subxact_i, &txn->subtxns)
{
ReorderBufferTXN *subtxn;

subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
ReorderBufferTransferSnapToParent(txn, subtxn);
}
...

-- 
With Regards,
Amit Kapila.



RE: Force streaming every change in logical decoding

From
"Hayato Kuroda (Fujitsu)"
Date:
Dear hackers,

> I will check and report the test coverage if I can.

I ran make coverage. PSA a screenshot showing the results.
According to the results, the coverage seems to be unchanged
even though the elapsed time was reduced.

Only the following lines in process_syncing_tables_for_apply() seemed to not be hit after patching,
but I think that is a timing issue because we do not modify anything around there.

```
                    /*
                     * Enter busy loop and wait for synchronization worker to
                     * reach expected state (or die trying).
                     */
                    if (!started_tx)
                    {
                        StartTransactionCommand();
                        started_tx = true;
                    }
```

Best Regards,
Hayato Kuroda
FUJITSU LIMITED


Attachment

Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Fri, Dec 23, 2022 at 1:12 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
>
> Dear hackers,
>
> > I will check and report the test coverage if I can.
>
> I ran make coverage. PSA the screen shot that shows results.
> According to the result the coverage seemed to be not changed
> even if the elapsed time was reduced.
>
> Only following lines at process_syncing_tables_for_apply() seemed to be not hit after patching,
> but I thought it was the timing issue because we do not modify around there.
>
> ```
>                                         /*
>                                          * Enter busy loop and wait for synchronization worker to
>                                          * reach expected state (or die trying).
>                                          */
>                                         if (!started_tx)
>                                         {
>                                                 StartTransactionCommand();
>                                                 started_tx = true;
>                                         }
> ```
>

This part of the code is related to synchronization between apply and
sync workers which depends upon timing. So, we can ignore this
difference.

-- 
With Regards,
Amit Kapila.



RE: Force streaming every change in logical decoding

From
"shiy.fnst@fujitsu.com"
Date:
On Fri, Dec 23, 2022 1:50 PM Amit Kapila <amit.kapila16@gmail.com>
> 
> On Thu, Dec 22, 2022 at 6:18 PM shiy.fnst@fujitsu.com
> <shiy.fnst@fujitsu.com> wrote:
> >
> >
> > Besides, I tried to reduce data size in streaming subscription tap tests by this
> > new GUC (see 0002 patch). But I didn't convert all streaming tap tests because I
> > think we also need to cover the case that there are lots of changes. So, 015* is
> > not modified. And 017* is not modified because streaming transactions and
> > non-streaming transactions are tested alternately in this test.
> >
> 
> I think we can remove the newly added test from the patch and instead
> combine the 0001 and 0002 patches. I think we should leave the
> 022_twophase_cascade as it is because it can impact code coverage,
> especially the below part of the test:
> # 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
> $node_A->safe_psql(
>     'postgres', "
>     BEGIN;
>     INSERT INTO test_tab VALUES (9999, 'foobar');
>     SAVEPOINT sp_inner;
>     INSERT INTO test_tab SELECT i, md5(i::text) FROM
> generate_series(3, 5000) s(i);
> 
> Here, we will stream first time after the subtransaction, so can
> impact the below part of the code in ReorderBufferStreamTXN:
> if (txn->snapshot_now == NULL)
> {
> ...
> dlist_foreach(subxact_i, &txn->subtxns)
> {
> ReorderBufferTXN *subtxn;
> 
> subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
> ReorderBufferTransferSnapToParent(txn, subtxn);
> }
> ...
> 

OK, I removed the modification in 022_twophase_cascade.pl and combined the two patches.

Please see the attached patch.
I also fixed Kuroda-san's comments[1].

[1]
https://www.postgresql.org/message-id/TYAPR01MB5866CD99CF86EAC84119BC91F5E99%40TYAPR01MB5866.jpnprd01.prod.outlook.com

Regards,
Shi yu

Attachment

RE: Force streaming every change in logical decoding

From
"Hayato Kuroda (Fujitsu)"
Date:
Dear Shi,

> I also fixed Kuroda-san's comments[1].

Thanks for updating the patch! I confirmed that my comments were fixed and
that the patch passes the subscription and test_decoding tests. I think we can
change the status to "Ready for Committer".

Best Regards,
Hayato Kuroda
FUJITSU LIMITED


Re: Force streaming every change in logical decoding

From
shveta malik
Date:


On Fri, Dec 23, 2022 at 2:03 PM shiy.fnst@fujitsu.com <shiy.fnst@fujitsu.com> wrote:
>
> On Fri, Dec 23, 2022 1:50 PM Amit Kapila <amit.kapila16@gmail.com>
> >
> > On Thu, Dec 22, 2022 at 6:18 PM shiy.fnst@fujitsu.com
> > <shiy.fnst@fujitsu.com> wrote:
> > >
> > >
> > > Besides, I tried to reduce data size in streaming subscription tap tests by this
> > > new GUC (see 0002 patch). But I didn't convert all streaming tap tests because I
> > > think we also need to cover the case that there are lots of changes. So, 015* is
> > > not modified. And 017* is not modified because streaming transactions and
> > > non-streaming transactions are tested alternately in this test.
> > >
> >
> > I think we can remove the newly added test from the patch and instead
> > combine the 0001 and 0002 patches. I think we should leave the
> > 022_twophase_cascade as it is because it can impact code coverage,
> > especially the below part of the test:
> > # 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
> > $node_A->safe_psql(
> >     'postgres', "
> >     BEGIN;
> >     INSERT INTO test_tab VALUES (9999, 'foobar');
> >     SAVEPOINT sp_inner;
> >     INSERT INTO test_tab SELECT i, md5(i::text) FROM
> > generate_series(3, 5000) s(i);
> >
> > Here, we will stream first time after the subtransaction, so can
> > impact the below part of the code in ReorderBufferStreamTXN:
> > if (txn->snapshot_now == NULL)
> > {
> > ...
> > dlist_foreach(subxact_i, &txn->subtxns)
> > {
> > ReorderBufferTXN *subtxn;
> >
> > subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
> > ReorderBufferTransferSnapToParent(txn, subtxn);
> > }
> > ...
> >
>
> OK, I removed the modification in 022_twophase_cascade.pl and combined the two patches.
>
> Please see the attached patch.
> I also fixed Kuroda-san's comments[1].
>
> [1] https://www.postgresql.org/message-id/TYAPR01MB5866CD99CF86EAC84119BC91F5E99%40TYAPR01MB5866.jpnprd01.prod.outlook.com
>
> Regards,
> Shi yu

Hello,
I ran tests (4 runs each) on both versions (v5 and v6) in release mode. The data looks promising; the elapsed time is reduced now:

HEAD          V5            Delta (sec)
2:20.535307   2:15.865241   4.670066
2:19.220917   2:14.445312   4.775605
2:22.492128   2:17.35755    5.134578
2:20.737309   2:15.564306   5.173003

HEAD          V6            Delta (sec)
2:20.535307   2:15.363567   5.17174
2:19.220917   2:15.079082   4.141835
2:22.492128   2:16.244139   6.247989
2:20.737309   2:16.108033   4.629276
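
(Each delta is the HEAD elapsed time minus the patched one; for example, for
the second V6 run, 2:19.220917 - 2:15.079082 = 4.141835 sec.)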

thanks
Shveta

Re: Force streaming every change in logical decoding

From
Masahiko Sawada
Date:
On Fri, Dec 23, 2022 at 5:32 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:
>
> On Fri, Dec 23, 2022 1:50 PM Amit Kapila <amit.kapila16@gmail.com>
> >
> > On Thu, Dec 22, 2022 at 6:18 PM shiy.fnst@fujitsu.com
> > <shiy.fnst@fujitsu.com> wrote:
> > >
> > >
> > > Besides, I tried to reduce data size in streaming subscription tap tests by this
> > > new GUC (see 0002 patch). But I didn't convert all streaming tap tests because I
> > > think we also need to cover the case that there are lots of changes. So, 015* is
> > > not modified. And 017* is not modified because streaming transactions and
> > > non-streaming transactions are tested alternately in this test.
> > >
> >
> > I think we can remove the newly added test from the patch and instead
> > combine the 0001 and 0002 patches. I think we should leave the
> > 022_twophase_cascade as it is because it can impact code coverage,
> > especially the below part of the test:
> > # 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
> > $node_A->safe_psql(
> >     'postgres', "
> >     BEGIN;
> >     INSERT INTO test_tab VALUES (9999, 'foobar');
> >     SAVEPOINT sp_inner;
> >     INSERT INTO test_tab SELECT i, md5(i::text) FROM
> > generate_series(3, 5000) s(i);
> >
> > Here, we will stream first time after the subtransaction, so can
> > impact the below part of the code in ReorderBufferStreamTXN:
> > if (txn->snapshot_now == NULL)
> > {
> > ...
> > dlist_foreach(subxact_i, &txn->subtxns)
> > {
> > ReorderBufferTXN *subtxn;
> >
> > subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
> > ReorderBufferTransferSnapToParent(txn, subtxn);
> > }
> > ...
> >
>
> OK, I removed the modification in 022_twophase_cascade.pl and combined the two patches.

Thank you for updating the patch. The v6 patch looks good to me.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Fri, Dec 23, 2022 at 10:31 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> On Fri, Dec 23, 2022 at 5:32 PM shiy.fnst@fujitsu.com
> <shiy.fnst@fujitsu.com> wrote:
> >
> > On Fri, Dec 23, 2022 1:50 PM Amit Kapila <amit.kapila16@gmail.com>
> > >
> > > On Thu, Dec 22, 2022 at 6:18 PM shiy.fnst@fujitsu.com
> > > <shiy.fnst@fujitsu.com> wrote:
> > > >
> > > >
> > > > Besides, I tried to reduce data size in streaming subscription tap tests by this
> > > > new GUC (see 0002 patch). But I didn't convert all streaming tap tests because I
> > > > think we also need to cover the case that there are lots of changes. So, 015* is
> > > > not modified. And 017* is not modified because streaming transactions and
> > > > non-streaming transactions are tested alternately in this test.
> > > >
> > >
> > > I think we can remove the newly added test from the patch and instead
> > > combine the 0001 and 0002 patches. I think we should leave the
> > > 022_twophase_cascade as it is because it can impact code coverage,
> > > especially the below part of the test:
> > > # 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
> > > $node_A->safe_psql(
> > >     'postgres', "
> > >     BEGIN;
> > >     INSERT INTO test_tab VALUES (9999, 'foobar');
> > >     SAVEPOINT sp_inner;
> > >     INSERT INTO test_tab SELECT i, md5(i::text) FROM
> > > generate_series(3, 5000) s(i);
> > >
> > > Here, we will stream first time after the subtransaction, so can
> > > impact the below part of the code in ReorderBufferStreamTXN:
> > > if (txn->snapshot_now == NULL)
> > > {
> > > ...
> > > dlist_foreach(subxact_i, &txn->subtxns)
> > > {
> > > ReorderBufferTXN *subtxn;
> > >
> > > subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
> > > ReorderBufferTransferSnapToParent(txn, subtxn);
> > > }
> > > ...
> > >
> >
> > OK, I removed the modification in 022_twophase_cascade.pl and combined the two patches.
>
> Thank you for updating the patch. The v6 patch looks good to me.
>

LGTM as well. I'll push this on Monday.

-- 
With Regards,
Amit Kapila.



Re: Force streaming every change in logical decoding

From
Dilip Kumar
Date:
On Sat, Dec 24, 2022 at 8:56 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> > >
> > > OK, I removed the modification in 022_twophase_cascade.pl and combined the two patches.
> >
> > Thank you for updating the patch. The v6 patch looks good to me.
> >
>
> LGTM as well. I'll push this on Monday.
>

The patch looks good to me.



--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Sat, Dec 24, 2022 at 3:28 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Sat, Dec 24, 2022 at 8:56 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > > >
> > > > OK, I removed the modification in 022_twophase_cascade.pl and combined the two patches.
> > >
> > > Thank you for updating the patch. The v6 patch looks good to me.
> > >
> >
> > LGTM as well. I'll push this on Monday.
> >
>
> The patch looks good to me.
>

Pushed.

-- 
With Regards,
Amit Kapila.



Re: Force streaming every change in logical decoding

From
Andres Freund
Date:
Hi,

On 2022-12-26 14:04:28 +0530, Amit Kapila wrote:
> Pushed.

I did not follow this thread but saw the commit. Could you explain why a GUC
is the right mechanism here? The commit message didn't explain why a GUC was
chosen.

To me an option like this should be passed in when decoding rather than a GUC.

Greetings,

Andres Freund



Re: Force streaming every change in logical decoding

From
Amit Kapila
Date:
On Wed, Dec 28, 2022 at 1:42 AM Andres Freund <andres@anarazel.de> wrote:
>
> On 2022-12-26 14:04:28 +0530, Amit Kapila wrote:
> > Pushed.
>
> I did not follow this thread but saw the commit. Could you explain why a GUC
> is the right mechanism here? The commit message didn't explain why a GUC was
> chosen.
>
> To me an option like this should be passed in when decoding rather than a GUC.
>

For that, we would also need to have a subscription option, because we want to
reduce the test time of the streaming TAP tests as well (by streaming, I mean
replication of large in-progress transactions). Currently, there is no
subscription option that is used merely for testing/debugging purposes, and
there doesn't seem to be a use for this for users. So, we didn't want to expose
it as a user option. There is also a risk that users who use it will face a
slowdown.
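
As a rough sketch of the intended publisher-side usage (the GUC name
logical_decoding_mode and the value 'immediate' are the ones from this thread;
the table and the exact statements below are only illustrative):

```
-- Force every change to be sent to the output plugin without waiting for
-- logical_decoding_work_mem to be exceeded (illustrative sketch only).
ALTER SYSTEM SET logical_decoding_mode = 'immediate';
SELECT pg_reload_conf();

-- A small transaction is now enough to exercise the streaming code path of a
-- subscription created with (streaming = on), so the streaming TAP tests no
-- longer need to insert a large amount of data.
BEGIN;
INSERT INTO test_tab VALUES (9999, 'foobar');
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 50) s(i);
COMMIT;
```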

-- 
With Regards,
Amit Kapila.



Re: Force streaming every change in logical decoding

From
vignesh C
Date:
On Mon, 26 Dec 2022 at 14:04, Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Sat, Dec 24, 2022 at 3:28 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> >
> > On Sat, Dec 24, 2022 at 8:56 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > >
> > > > >
> > > > > OK, I removed the modification in 022_twophase_cascade.pl and combined the two patches.
> > > >
> > > > Thank you for updating the patch. The v6 patch looks good to me.
> > > >
> > >
> > > LGTM as well. I'll push this on Monday.
> > >
> >
> > The patch looks good to me.
> >
>
> Pushed.

If there is nothing pending for this patch, can we close the commitfest entry?

Regards,
Vignesh



RE: Force streaming every change in logical decoding

From
"shiy.fnst@fujitsu.com"
Date:
On Thu, Dec 22, 2022 3:17 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> >  I think
> > instead of adding new tests with this patch, it may be better to
> > change some of the existing tests related to streaming to use this
> > parameter as that will clearly show one of the purposes of this patch.
> 
> +1. I think test_decoding/sql/stream.sql and spill.sql are good
> candidates and we change logical replication TAP tests in a separate
> patch.
> 

Hi,

I tried to reduce the data size in the test_decoding tests by using the newly
added GUC "logical_decoding_mode", and found that the new GUC is not suitable
for some cases.

For example, in the following case in stream.sql (and there are some similar
cases in twophase_stream.sql), the changes in the subtransaction exceed
logical_decoding_work_mem, but they won't be streamed because the
subtransaction is rolled back. (But the transaction is marked as streamed.)
After the subtransaction, there is a small number of inserts; as
logical_decoding_work_mem is not exceeded, they will be streamed when decoding
COMMIT. If we use logical_decoding_mode=immediate, the small number of inserts
in the top-level transaction will be streamed immediately. This is different
from before, so I'm not sure we can use logical_decoding_mode in this case.

```
-- streaming test with sub-transaction
BEGIN;
savepoint s1;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
rollback to s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
COMMIT;
```

Besides, some cases in spill.sql can't use the new GUC because they want to
serialize only when processing a specific top-level transaction or
subtransaction. For example, in the case below, the changes in the subxact
exceed logical_decoding_work_mem and are serialized, while the insert after the
subxact is not serialized. If logical_decoding_mode is used, both of them will
be serialized, which is not what we want to test.

```
-- spilling subxact, non-spilling main xact
BEGIN;
SAVEPOINT s;
INSERT INTO spill_test SELECT 'serialize-subbig-topsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
RELEASE SAVEPOINT s;
INSERT INTO spill_test SELECT 'serialize-subbig-topsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
COMMIT;
```

Although the remaining cases in spill.sql could use the new GUC, they would
need to set and reset logical_decoding_mode many times. Besides, the tests in
this file were added before logical_decoding_work_mem was introduced, so I am
not sure whether we should modify these cases to use the new GUC.
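
Roughly, each such case would have to toggle the GUC around the decoding call,
along the lines of the hypothetical sketch below (the slot name and options
follow the usual test_decoding pattern; the table and data are only for
illustration):

```
-- Hypothetical shape of a converted spill.sql case: decode this transaction
-- with every change forced to be serialized, then restore the default.
BEGIN;
INSERT INTO spill_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 10) g(i);
COMMIT;

SET logical_decoding_mode = 'immediate';
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
RESET logical_decoding_mode;
```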

Also, it looks like the tests for toast in stream.sql can't be changed either.

For the above reasons, it seems that currently few tests can be modified to
use logical_decoding_mode. If we later find some tests that can be changed to
use it, we can do it in a separate thread. I will close the CF entry.

Regards,
Shi yu