Re: [HACKERS] logical decoding of two-phase transactions - Mailing list pgsql-hackers

From Peter Smith
Subject Re: [HACKERS] logical decoding of two-phase transactions
Msg-id CAHut+Pvmq7Ufk08qAmhyW0BjpzW9LbY+E140fVEM4LC77iSK8Q@mail.gmail.com
In response to Re: [HACKERS] logical decoding of two-phase transactions  (vignesh C <vignesh21@gmail.com>)
List pgsql-hackers
On Tue, Apr 27, 2021 at 6:17 PM vignesh C <vignesh21@gmail.com> wrote:
>
> On Wed, Apr 21, 2021 at 12:13 PM Peter Smith <smithpb2250@gmail.com> wrote:
> >
> > On Tue, Apr 20, 2021 at 3:45 PM Peter Smith <smithpb2250@gmail.com> wrote:
> > >
> > > Please find attached the latest patch set v73*
> > >
> > > Differences from v72* are:
> > >
> > > * Rebased to HEAD @ today (required because v72-0001 no longer applied cleanly)
> > >
> > > * Minor documentation correction for protocol messages for Commit Prepared ('K')
> > >
> > > * Non-functional code tidy (mostly proto.c) to reduce overloading
> > > different meanings to same member names for prepare/commit times.
> >
> >
> > Please find attached a re-posting of patch set v73*
> >
> > This is the same as yesterday's v73 but with a contrib module compile
> > error fixed.
>
> Few comments on
> v73-0002-Add-prepare-API-support-for-streaming-transactio.patch patch:

Thanks for your feedback. My replies are inline below.

> 1) There are slight differences in error message in case of Alter
> subscription ... drop publication, we can keep the error message
> similar:
> postgres=# ALTER SUBSCRIPTION mysub drop PUBLICATION mypub WITH
> (refresh = false, copy_data=true, two_phase=true);
> ERROR:  unrecognized subscription parameter: "copy_data"
> postgres=# ALTER SUBSCRIPTION mysub drop PUBLICATION mypub WITH
> (refresh = false, two_phase=true, streaming=true);
> ERROR:  cannot alter two_phase option

OK. Updated in v74.

>
> 2) We are sending txn->xid twice, I felt we should send only once in
> logicalrep_write_stream_prepare:
> +       /* transaction ID */
> +       Assert(TransactionIdIsValid(txn->xid));
> +       pq_sendint32(out, txn->xid);
> +
> +       /* send the flags field */
> +       pq_sendbyte(out, flags);
> +
> +       /* send fields */
> +       pq_sendint64(out, prepare_lsn);
> +       pq_sendint64(out, txn->end_lsn);
> +       pq_sendint64(out, txn->u_op_time.prepare_time);
> +       pq_sendint32(out, txn->xid);
> +
>

OK. Updated in v74.
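
For illustration only, here is a minimal standalone sketch of the field
order once the xid is sent a single time. It is not the patch code: MsgBuf
and the buf_send_* helpers are hypothetical stand-ins for StringInfo and
the pq_send* routines, and dropping the trailing xid (rather than the
leading one) is an assumption made just for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for StringInfo: a fixed buffer plus a length. */
typedef struct MsgBuf
{
	uint8_t		data[64];
	size_t		len;
} MsgBuf;

/* Append one byte, refusing to overflow the fixed buffer. */
static void
buf_send_byte(MsgBuf *out, uint8_t v)
{
	assert(out->len + 1 <= sizeof(out->data));
	out->data[out->len++] = v;
}

/* Append a 32-bit value in network (big-endian) byte order. */
static void
buf_send_int32(MsgBuf *out, uint32_t v)
{
	for (int shift = 24; shift >= 0; shift -= 8)
		buf_send_byte(out, (uint8_t) (v >> shift));
}

/* Append a 64-bit value in network (big-endian) byte order. */
static void
buf_send_int64(MsgBuf *out, uint64_t v)
{
	for (int shift = 56; shift >= 0; shift -= 8)
		buf_send_byte(out, (uint8_t) (v >> shift));
}

/*
 * Sketch of the message body with the xid written exactly once.
 * Field order follows the quoted hunk, minus the duplicate xid.
 */
static void
write_stream_prepare_sketch(MsgBuf *out, uint32_t xid, uint8_t flags,
							uint64_t prepare_lsn, uint64_t end_lsn,
							int64_t prepare_time)
{
	/* transaction ID; zero is the invalid xid */
	assert(xid != 0);
	buf_send_int32(out, xid);

	/* flags field */
	buf_send_byte(out, flags);

	/* remaining fields; no second xid */
	buf_send_int64(out, prepare_lsn);
	buf_send_int64(out, end_lsn);
	buf_send_int64(out, (uint64_t) prepare_time);
}
```

With this layout the body is 4 + 1 + 8 + 8 + 8 = 29 bytes, so a reader
that still expects a second xid would run off the end of the message.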

> 3) We could remove xid and return prepare_data->xid
> +TransactionId
> +logicalrep_read_stream_prepare(StringInfo in,
> LogicalRepPreparedTxnData *prepare_data)
> +{
> +       TransactionId xid;
> +       uint8           flags;
> +
> +       xid = pq_getmsgint(in, 4);
>

OK. Updated in v74.
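
For illustration only, a minimal standalone sketch of the reader without
the local xid variable. It is not the patch code: MsgReader,
PreparedTxnSketch and reader_get_uint are hypothetical stand-ins for
StringInfo, LogicalRepPreparedTxnData and pq_getmsgint/pq_getmsgint64, and
the field order after the xid is assumed from the write-side hunk quoted in
comment 2 (with the duplicate xid gone).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a StringInfo read cursor. */
typedef struct MsgReader
{
	const uint8_t *data;
	size_t		len;
	size_t		pos;
} MsgReader;

/* Hypothetical stand-in holding only the fields used in this sketch. */
typedef struct PreparedTxnSketch
{
	uint32_t	xid;
	uint64_t	prepare_lsn;
	uint64_t	end_lsn;
	int64_t		prepare_time;
} PreparedTxnSketch;

/* Read nbytes (1 to 8) as a big-endian unsigned integer. */
static uint64_t
reader_get_uint(MsgReader *in, int nbytes)
{
	uint64_t	v = 0;

	assert(nbytes >= 1 && nbytes <= 8);
	assert(in->pos + (size_t) nbytes <= in->len);
	for (int i = 0; i < nbytes; i++)
		v = (v << 8) | in->data[in->pos++];
	return v;
}

/*
 * The xid is stored straight into prepare_data and returned from there,
 * so no separate local copy is needed.
 */
static uint32_t
read_stream_prepare_sketch(MsgReader *in, PreparedTxnSketch *prepare_data)
{
	prepare_data->xid = (uint32_t) reader_get_uint(in, 4);

	/* flags byte is consumed and ignored in this sketch */
	(void) reader_get_uint(in, 1);

	prepare_data->prepare_lsn = reader_get_uint(in, 8);
	prepare_data->end_lsn = reader_get_uint(in, 8);
	prepare_data->prepare_time = (int64_t) reader_get_uint(in, 8);

	return prepare_data->xid;
}
```

The caller gets the same value from the return and from
prepare_data->xid, so there is only one place where the xid can be
assigned.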

> 4) Here comments can be above apply_spooled_messages for better readability
> +       /*
> +        * 1. Replay all the spooled operations - Similar code as for
> +        * apply_handle_stream_commit (i.e. non two-phase stream commit)
> +        */
> +
> +       ensure_transaction();
> +
> +       nchanges = apply_spooled_messages(xid, prepare_data.prepare_lsn);
> +
>

Not done. The comment is deliberately placed there because the code
below it mirrors what is in apply_handle_stream_commit.

> 5) Similarly this below comment can be above PrepareTransactionBlock
> +       /*
> +        * 2. Mark the transaction as prepared. - Similar code as for
> +        * apply_handle_prepare (i.e. two-phase non-streamed prepare)
> +        */
> +
> +       /*
> +        * BeginTransactionBlock is necessary to balance the EndTransactionBlock
> +        * called within the PrepareTransactionBlock below.
> +        */
> +       BeginTransactionBlock();
> +       CommitTransactionCommand();
> +
> +       /*
> +        * Update origin state so we can restart streaming from correct position
> +        * in case of crash.
> +        */
> +       replorigin_session_origin_lsn = prepare_data.end_lsn;
> +       replorigin_session_origin_timestamp = prepare_data.prepare_time;
> +
> +       PrepareTransactionBlock(gid);
> +       CommitTransactionCommand();
> +
> +       pgstat_report_stat(false);
>

Not done. The comment is deliberately placed there because the code
below it mirrors what is in apply_handle_prepare.

> 6) There is a lot of common code between apply_handle_stream_prepare
> and apply_handle_prepare, if possible try to have a common function to
> avoid fixing at both places.
> +       /*
> +        * 2. Mark the transaction as prepared. - Similar code as for
> +        * apply_handle_prepare (i.e. two-phase non-streamed prepare)
> +        */
> +
> +       /*
> +        * BeginTransactionBlock is necessary to balance the EndTransactionBlock
> +        * called within the PrepareTransactionBlock below.
> +        */
> +       BeginTransactionBlock();
> +       CommitTransactionCommand();
> +
> +       /*
> +        * Update origin state so we can restart streaming from correct position
> +        * in case of crash.
> +        */
> +       replorigin_session_origin_lsn = prepare_data.end_lsn;
> +       replorigin_session_origin_timestamp = prepare_data.prepare_time;
> +
> +       PrepareTransactionBlock(gid);
> +       CommitTransactionCommand();
> +
> +       pgstat_report_stat(false);
> +
> +       store_flush_position(prepare_data.end_lsn);
>

Not done. If you diff those functions there are really only about 10
statements in common, so I feel it is more readable to keep them
separate than to try to make a “common” function out of an arbitrary
code fragment.

> 7) two-phase commit is slightly misleading, we can just mention
> streaming prepare.
> + * PREPARE callback (for streaming two-phase commit).
> + *
> + * Notify the downstream to prepare the transaction.
> + */
> +static void
> +pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
> +                                                       ReorderBufferTXN *txn,
> +                                                       XLogRecPtr prepare_lsn)
>

OK. Updated in v74.

> 8) should we include Assert of in_streaming similar to other
> pgoutput_stream*** functions.
> +static void
> +pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
> +                                                       ReorderBufferTXN *txn,
> +                                                       XLogRecPtr prepare_lsn)
> +{
> +       Assert(rbtxn_is_streamed(txn));
> +
> +       OutputPluginUpdateProgress(ctx);
> +       OutputPluginPrepareWrite(ctx, true);
> +       logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
> +       OutputPluginWrite(ctx, true);
> +}
>

Not done. AFAIK it is correct as-is.

> 9) Here also, we can verify that the transaction is streamed by
> checking the pg_stat_replication_slots.
> +# check that transaction is committed on subscriber
> +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*),
> count(c), count(d = 999) FROM test_tab");
> +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed
> on subscriber, and extra columns contain local defaults');
> +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*)
> FROM pg_prepared_xacts;");
> +is($result, qq(0), 'transaction is committed on subscriber');
>

Not done. If the purpose of this comment is just to confirm that the
SQL INSERT of 5000 rows of md5 data exceeds 64K then I think we can
simply take that as self-evident. We don’t need some SQL to confirm
it.

If the purpose of this is just to ensure that stats work properly with
2PC then I agree that there should be some test cases added for stats,
but this has already been recorded elsewhere as a future TODO task.

------
Kind Regards,
Peter Smith.
Fujitsu Australia


