Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions - Mailing list pgsql-hackers

From Amit Kapila
Subject Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Date
Msg-id CAA4eK1+Myjd35YRc8Swf4b+HzgU2p5N9OPAB4a7xCsmEiVjHGw@mail.gmail.com
In response to Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions  (Dilip Kumar <dilipbalaut@gmail.com>)
Responses Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
List pgsql-hackers
On Mon, Aug 31, 2020 at 7:28 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Mon, Aug 31, 2020 at 1:24 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Mon, Aug 31, 2020 at 10:49 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > >
> > > On Sun, Aug 30, 2020 at 2:43 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> > > >
> > >
> > > Another comment:
> > >
> > > +cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
> > > +{
> > > + HASH_SEQ_STATUS hash_seq;
> > > + RelationSyncEntry *entry;
> > > +
> > > + Assert(RelationSyncCache != NULL);
> > > +
> > > + hash_seq_init(&hash_seq, RelationSyncCache);
> > > + while ((entry = hash_seq_search(&hash_seq)) != NULL)
> > > + {
> > > + if (is_commit)
> > > + entry->schema_sent = true;
> > >
> > > How is it correct to set 'entry->schema_sent' for all the entries in
> > > RelationSyncCache? Consider a case where due to invalidation in an
> > > unrelated transaction we have set the flag schema_sent for a
> > > particular relation 'r1' as 'false' and that transaction is executed
> > > before the current streamed transaction for which we are performing
> > > commit and called this function. It will set the flag for unrelated
> > > entry in this case 'r1' which doesn't seem correct to me. Or, if this
> > > is correct, it would be a good idea to write some comments about it.
>
> Yeah, this is wrong,  I have fixed this issue in the attached patch
> and also added a new test for the same.
>

In functions cleanup_rel_sync_cache and
get_schema_sent_in_streamed_txn, let's cast the result of lfirst_int to
uint32 as suggested by Tom [1]. Also, let's keep the way we compare
xids consistent in both functions, i.e., if (xid == lfirst_int(lc)).

The behavior tested by the newly added test case is not clear,
primarily because the comments don't explain it.

+++ b/src/test/subscription/t/021_stream_schema.pl
@@ -0,0 +1,80 @@
+# Test behavior with streaming transaction exceeding logical_decoding_work_mem
...
+# large (streamed) transaction with DDL, DML and ROLLBACKs
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+ALTER TABLE test_tab ADD COLUMN c INT;
+INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(3,3000) s(i);
+ALTER TABLE test_tab ADD COLUMN d INT;
+COMMIT;
+});
+
+# large (streamed) transaction with DDL, DML and ROLLBACKs
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text), i, i FROM generate_series(3001,3005) s(i);
+COMMIT;
+});
+wait_for_caught_up($node_publisher, $appname);

I understand how this test exercises the functionality related to the
schema_sent stuff, but neither the comments atop the file nor the ones
atop the test case explain it clearly.

> > Few more comments:

>
> > 2.
> > 009_stream_simple.pl
> > +# Insert, update and delete enough rows to exceed the 64kB limit.
> > +$node_publisher->safe_psql('postgres', q{
> > +BEGIN;
> > +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
> > +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
> > +DELETE FROM test_tab WHERE mod(a,3) = 0;
> > +COMMIT;
> > +});
> >
> > How much above this data is 64kB limit? I just wanted to see that it
> > should not be on borderline and then due to some alignment issues the
> > streaming doesn't happen on some machines? Also, how such a test
> > ensures that the streaming has happened because the way we are
> > checking results, won't it be the same for the non-streaming case as
> > well?
>
> Only for this case, or you mean for all the tests?
>

It is better to do it for all tests. I have clarified this in my
email sent yesterday [2], where I have raised a few more comments
as well. I hope you have not missed that email.

> > 3.
> > +# Change the local values of the extra columns on the subscriber,
> > +# update publisher, and check that subscriber retains the expected
> > +# values
> > +$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'");
> > +$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)");
> > +
> > +wait_for_caught_up($node_publisher, $appname);
> > +
> > +$result =
> > +  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab");
> > +is($result, qq(3334|3334|3334), 'check extra columns contain locally changed data');
> >
> > Again, how this test is relevant to streaming mode?
>
> I agree, it is not specific to the streaming.
>

> > Apart from the above, I have made a few changes in the attached patch
> > which are mainly to simplify the code at one place, added/edited few
> > comments, some other cosmetic changes, and renamed the test case files
> > as the initials of their name were matching other tests in the similar
> > directory.
>
> Changes look fine to me except this
>
> +
>
> + /* the value must be on/off */
> + if (strcmp(strVal(defel->arg), "on") && strcmp(strVal(defel->arg), "off"))
> + ereport(ERROR,
> + (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> + errmsg("invalid streaming value")));
> +
> + /* enable streaming if it's 'on' */
> + *enable_streaming = (strcmp(strVal(defel->arg), "on") == 0);
>
> I mean for streaming why we need to handle differently than the other
> surrounding code for example "binary" option.
>

Hmm, the change I made is precisely to make it look like the
binary option. The code you have quoted above is from the patch
version prior to the one I sent. See the code snippet after my
changes:
@@ -182,6 +222,16 @@ parse_output_parameters(List *options, uint32 *protocol_version,

  *binary = defGetBoolean(defel);
  }
+ else if (strcmp(defel->defname, "streaming") == 0)
+ {
+ if (streaming_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ streaming_given = true;
+
+ *enable_streaming = defGetBoolean(defel);
+ }

This is exactly like the handling of the binary option. Can you
please check it once again and confirm?

[1] - https://www.postgresql.org/message-id/3955127.1598880523%40sss.pgh.pa.us
[2] - https://www.postgresql.org/message-id/CAA4eK1JjrcK6bk%2Bur3J%2BkLsfz4%2BipJFN7VcRd3cXr4gG5ZWWig%40mail.gmail.com

-- 
With Regards,
Amit Kapila.


