RE: Open a streamed block for transactional messages during decoding - Mailing list pgsql-hackers

From Zhijie Hou (Fujitsu)
Subject RE: Open a streamed block for transactional messages during decoding
Date
Msg-id OS3PR01MB57185190D47C717F206BA6A294A1A@OS3PR01MB5718.jpnprd01.prod.outlook.com
In response to Re: Open a streamed block for transactional messages during decoding  (Amit Kapila <amit.kapila16@gmail.com>)
Responses Re: Open a streamed block for transactional messages during decoding
List pgsql-hackers
On Monday, October 30, 2023 12:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> 
> On Thu, Oct 26, 2023 at 2:01 PM Zhijie Hou (Fujitsu) <houzj.fnst@fujitsu.com> wrote:
> >
> > On Thursday, October 26, 2023 12:42 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > >
> > > On Tue, Oct 24, 2023 at 5:27 PM Zhijie Hou (Fujitsu) <houzj.fnst@fujitsu.com> wrote:
> > > >
> > > > > While reviewing the test_decoding code, I noticed that when
> > > > > skip_empty_xacts option is specified, it doesn't open the
> > > > > streaming block (e.g. pg_output_stream_start) before streaming
> > > > > the transactional MESSAGE even if it's the first change in a
> > > > > streaming block.
> > > >
> > > > It looks inconsistent with what we do when streaming DML changes(e.g.
> > > > pg_decode_stream_change()).
> > > >
> > > > Here is a small patch to open the stream block in this case.
> > > >
> > >
> > > The change looks good to me though I haven't tested it yet. BTW, can
> > > we change the comment: "Output stream start if we haven't yet, but
> > > only for the transactional case." to "Output stream start if we
> > > haven't yet for transactional messages"?
> >
> > Thanks for the review and I changed this as suggested.
> >
> 
> --- a/contrib/test_decoding/expected/stream.out
> +++ b/contrib/test_decoding/expected/stream.out
> @@ -29,7 +29,10 @@ COMMIT;
>  SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
>                             data
>  ----------------------------------------------------------
> + opening a streamed block for transaction
>   streaming message: transactional: 1 prefix: test, sz: 50
> + closing a streamed block for transaction
> + aborting streamed (sub)transaction
> 
> I was analyzing the reason for the additional message: "aborting streamed
> (sub)transaction" in the above test and it seems to be due to the below check in
> the function pg_decode_stream_abort():
> 
> if (data->skip_empty_xacts && !xact_wrote_changes) return;
> 
> Before the patch, we won't be setting the 'xact_wrote_changes' flag in txndata
> which is fixed now. So, this looks okay to me. However, I have another
> observation in this code which is that for aborts or subtransactions, we are not
> checking the flag 'stream_wrote_changes', so we may end up emitting the
> abort message even when no actual change has been streamed. I haven't tried
> to generate a test to verify this observation, so I could be wrong as well but it is
> worth analyzing such cases.

I have confirmed that the mentioned case is possible (steps in [1]): the
sub-transaction doesn't output any data, but the stream abort for this
sub-transaction is still sent.

But I think this may not be problematic behavior, as even pgoutput can
behave similarly; e.g., if all the changes are filtered out by a row filter or
table filter, the stream abort will still be sent. The subscriber will skip
handling the STREAM ABORT if the aborted txn was not applied.

And if we want to fix this in the output plugin, we need to record whether we
have sent any changes for each sub-transaction so that we can decide whether to
send the following stream abort or not. We cannot use 'stream_wrote_changes'
because it's a per-streamed-block flag and there could be several streamed
blocks for one sub-txn. It looks a bit complicated to me.
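
To make the bookkeeping concrete, here is a rough standalone C sketch of what
such per-sub-transaction tracking might look like. This is not a patch and not
the actual test_decoding code: the struct layout, the fixed-size array, and
every function name are made up for illustration. Only the flag names
'xact_wrote_changes' and 'stream_wrote_changes' are taken from the discussion
above.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-ins; not the real test_decoding structs. */
typedef uint32_t Xid;

#define MAX_SUBXACTS 64

typedef struct SubxactState
{
    Xid  xid;
    bool wrote_changes;         /* survives across streamed blocks */
} SubxactState;

typedef struct TxnState
{
    bool         xact_wrote_changes;    /* whole top-level transaction */
    bool         stream_wrote_changes;  /* current streamed block only */
    size_t       nsub;
    SubxactState sub[MAX_SUBXACTS];
} TxnState;

/* Linear lookup; returns NULL if absent and not created (or table is full). */
static SubxactState *
lookup_subxact(TxnState *txn, Xid xid, bool create)
{
    for (size_t i = 0; i < txn->nsub; i++)
        if (txn->sub[i].xid == xid)
            return &txn->sub[i];
    if (!create || txn->nsub == MAX_SUBXACTS)
        return NULL;
    txn->sub[txn->nsub].xid = xid;
    txn->sub[txn->nsub].wrote_changes = false;
    return &txn->sub[txn->nsub++];
}

/* Opening a new streamed block resets only the per-block flag. */
void
stream_start(TxnState *txn)
{
    txn->stream_wrote_changes = false;
}

/* A change was emitted on behalf of (sub)transaction 'xid'. */
void
stream_change(TxnState *txn, Xid xid)
{
    SubxactState *s = lookup_subxact(txn, xid, true);

    txn->stream_wrote_changes = true;
    txn->xact_wrote_changes = true;
    if (s != NULL)
        s->wrote_changes = true;
}

/* Mirrors the existing check: only the top-level flag is consulted. */
bool
emit_abort_current(const TxnState *txn, bool skip_empty_xacts)
{
    return !(skip_empty_xacts && !txn->xact_wrote_changes);
}

/* Assumed alternative: consult the flag of the aborted (sub)transaction. */
bool
emit_abort_per_subxact(TxnState *txn, Xid xid, bool skip_empty_xacts)
{
    SubxactState *s;

    if (!skip_empty_xacts)
        return true;
    s = lookup_subxact(txn, xid, false);
    return s != NULL && s->wrote_changes;
}
```

With one change recorded for xid 734 and none for xid 736 (xids as in the
output of [1], used here only as example numbers), emit_abort_current() still
reports that the abort for 736 should be sent, while emit_abort_per_subxact()
would suppress it. The sketch also shows why 'stream_wrote_changes' cannot be
reused: it is cleared by every stream_start(), so it forgets changes sent in
earlier blocks of the same sub-txn. A real implementation would need a proper
per-subxact data structure with cleanup at commit/abort, which is the extra
complexity mentioned above.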


[1]
SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
BEGIN;
savepoint p1;
CREATE TABLE test(a int);
INSERT INTO test VALUES(1);
savepoint p2;
CREATE TABLE test2(a int);
ROLLBACK TO SAVEPOINT p2;
COMMIT;

SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '1', 'stream-changes', '1');
 

                       data
--------------------------------------------------
 opening a streamed block for transaction TXN 734
 streaming change for TXN 734
 closing a streamed block for transaction TXN 734
 aborting streamed (sub)transaction TXN 736
 committing streamed transaction TXN 734

Best Regards,
Hou zj



