Re: [HACKERS] logical decoding of two-phase transactions - Mailing list pgsql-hackers
From | Peter Smith |
---|---|
Subject | Re: [HACKERS] logical decoding of two-phase transactions |
Date | |
Msg-id | CAHut+Pvmq7Ufk08qAmhyW0BjpzW9LbY+E140fVEM4LC77iSK8Q@mail.gmail.com |
In response to | Re: [HACKERS] logical decoding of two-phase transactions (vignesh C <vignesh21@gmail.com>) |
List | pgsql-hackers |
On Tue, Apr 27, 2021 at 6:17 PM vignesh C <vignesh21@gmail.com> wrote:
>
> On Wed, Apr 21, 2021 at 12:13 PM Peter Smith <smithpb2250@gmail.com> wrote:
> >
> > On Tue, Apr 20, 2021 at 3:45 PM Peter Smith <smithpb2250@gmail.com> wrote:
> > >
> > > Please find attached the latest patch set v73*
> > >
> > > Differences from v72* are:
> > >
> > > * Rebased to HEAD @ today (required because v72-0001 no longer applied cleanly)
> > >
> > > * Minor documentation correction for protocol messages for Commit Prepared ('K')
> > >
> > > * Non-functional code tidy (mostly proto.c) to reduce overloading
> > > different meanings to same member names for prepare/commit times.
> >
> > Please find attached a re-posting of patch set v73*
> >
> > This is the same as yesterday's v73 but with a contrib module compile
> > error fixed.
>
> Few comments on
> v73-0002-Add-prepare-API-support-for-streaming-transactio.patch patch:

Thanks for your feedback comments. My replies are inline below.

> 1) There are slight differences in error message in case of Alter
> subscription ... drop publication, we can keep the error message
> similar:
> postgres=# ALTER SUBSCRIPTION mysub drop PUBLICATION mypub WITH
> (refresh = false, copy_data=true, two_phase=true);
> ERROR: unrecognized subscription parameter: "copy_data"
> postgres=# ALTER SUBSCRIPTION mysub drop PUBLICATION mypub WITH
> (refresh = false, two_phase=true, streaming=true);
> ERROR: cannot alter two_phase option

OK. Updated in v74.

>
> 2) We are sending txn->xid twice, I felt we should send only once in
> logicalrep_write_stream_prepare:
> +    /* transaction ID */
> +    Assert(TransactionIdIsValid(txn->xid));
> +    pq_sendint32(out, txn->xid);
> +
> +    /* send the flags field */
> +    pq_sendbyte(out, flags);
> +
> +    /* send fields */
> +    pq_sendint64(out, prepare_lsn);
> +    pq_sendint64(out, txn->end_lsn);
> +    pq_sendint64(out, txn->u_op_time.prepare_time);
> +    pq_sendint32(out, txn->xid);
> +

OK. Updated in v74.

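For illustration only (this is not the v74 patch code): a minimal standalone sketch of the field layout from the fragment quoted in comment 2, with the xid written exactly once. The struct StreamPrepareFields and the helpers put_be, get_be, write_stream_prepare_fields and read_stream_prepare_fields are hypothetical names that stand in for the pq_send*/pq_getmsg* calls. The sketch assumes network byte order and covers only the fields visible in the quoted fragment, so the message type byte, the gid and anything else in the real message are left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the fields written by the quoted fragment. */
typedef struct StreamPrepareFields
{
    uint32_t xid;           /* transaction ID, must be valid (non-zero) */
    uint8_t  flags;
    uint64_t prepare_lsn;
    uint64_t end_lsn;
    int64_t  prepare_time;
} StreamPrepareFields;

/* Encoded size: xid (4) + flags (1) + three 8-byte fields. */
#define STREAM_PREPARE_FIELDS_LEN 29

/* Store the low-order 'nbytes' bytes of 'v' at buf[off], big-endian. */
static size_t
put_be(uint8_t *buf, size_t off, uint64_t v, int nbytes)
{
    for (int i = nbytes - 1; i >= 0; i--)
        buf[off++] = (uint8_t) (v >> (8 * i));
    return off;
}

/* Read 'nbytes' big-endian bytes from buf[*off] and advance *off. */
static uint64_t
get_be(const uint8_t *buf, size_t *off, int nbytes)
{
    uint64_t v = 0;

    for (int i = 0; i < nbytes; i++)
        v = (v << 8) | buf[(*off)++];
    return v;
}

/* Write the fields in the quoted order, sending the xid only once. */
size_t
write_stream_prepare_fields(uint8_t *buf, const StreamPrepareFields *f)
{
    size_t off = 0;

    assert(f->xid != 0);    /* mirrors the Assert in the quoted code */
    off = put_be(buf, off, f->xid, 4);
    off = put_be(buf, off, f->flags, 1);
    off = put_be(buf, off, f->prepare_lsn, 8);
    off = put_be(buf, off, f->end_lsn, 8);
    off = put_be(buf, off, (uint64_t) f->prepare_time, 8);
    return off;             /* number of bytes written */
}

/* Read the fields back in the same order and return the xid. */
uint32_t
read_stream_prepare_fields(const uint8_t *buf, StreamPrepareFields *f)
{
    size_t off = 0;

    f->xid = (uint32_t) get_be(buf, &off, 4);
    f->flags = (uint8_t) get_be(buf, &off, 1);
    f->prepare_lsn = get_be(buf, &off, 8);
    f->end_lsn = get_be(buf, &off, 8);
    f->prepare_time = (int64_t) get_be(buf, &off, 8);
    return f->xid;
}
```

With the duplicate xid dropped, the reader and writer agree on 29 bytes for these fields, and a round trip through the two functions returns the same values.
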
> 3) We could remove xid and return prepare_data->xid
> +TransactionId
> +logicalrep_read_stream_prepare(StringInfo in,
> LogicalRepPreparedTxnData *prepare_data)
> +{
> +    TransactionId xid;
> +    uint8 flags;
> +
> +    xid = pq_getmsgint(in, 4);
>

OK. Updated in v74.

> 4) Here comments can be above apply_spooled_messages for better readability
> +    /*
> +     * 1. Replay all the spooled operations - Similar code as for
> +     * apply_handle_stream_commit (i.e. non two-phase stream commit)
> +     */
> +
> +    ensure_transaction();
> +
> +    nchanges = apply_spooled_messages(xid, prepare_data.prepare_lsn);
> +

Not done. It was deliberately commented this way because the part below the comment is what is in apply_handle_stream_commit.

> 5) Similarly this below comment can be above PrepareTransactionBlock
> +    /*
> +     * 2. Mark the transaction as prepared. - Similar code as for
> +     * apply_handle_prepare (i.e. two-phase non-streamed prepare)
> +     */
> +
> +    /*
> +     * BeginTransactionBlock is necessary to balance the EndTransactionBlock
> +     * called within the PrepareTransactionBlock below.
> +     */
> +    BeginTransactionBlock();
> +    CommitTransactionCommand();
> +
> +    /*
> +     * Update origin state so we can restart streaming from correct position
> +     * in case of crash.
> +     */
> +    replorigin_session_origin_lsn = prepare_data.end_lsn;
> +    replorigin_session_origin_timestamp = prepare_data.prepare_time;
> +
> +    PrepareTransactionBlock(gid);
> +    CommitTransactionCommand();
> +
> +    pgstat_report_stat(false);
>

Not done. It is deliberately commented this way because the part below the comment is what is in apply_handle_prepare.

> 6) There is a lot of common code between apply_handle_stream_prepare
> and apply_handle_prepare, if possible try to have a common function to
> avoid fixing at both places.
> +    /*
> +     * 2. Mark the transaction as prepared. - Similar code as for
> +     * apply_handle_prepare (i.e. two-phase non-streamed prepare)
> +     */
> +
> +    /*
> +     * BeginTransactionBlock is necessary to balance the EndTransactionBlock
> +     * called within the PrepareTransactionBlock below.
> +     */
> +    BeginTransactionBlock();
> +    CommitTransactionCommand();
> +
> +    /*
> +     * Update origin state so we can restart streaming from correct position
> +     * in case of crash.
> +     */
> +    replorigin_session_origin_lsn = prepare_data.end_lsn;
> +    replorigin_session_origin_timestamp = prepare_data.prepare_time;
> +
> +    PrepareTransactionBlock(gid);
> +    CommitTransactionCommand();
> +
> +    pgstat_report_stat(false);
> +
> +    store_flush_position(prepare_data.end_lsn);
>

Not done. If you diff those functions there are really only ~ 10 statements in common so I felt it is more readable to keep it this way than to try to make a “common” function out of an arbitrary code fragment.

> 7) two-phase commit is slightly misleading, we can just mention
> streaming prepare.
> + * PREPARE callback (for streaming two-phase commit).
> + *
> + * Notify the downstream to prepare the transaction.
> + */
> +static void
> +pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
> +                            ReorderBufferTXN *txn,
> +                            XLogRecPtr prepare_lsn)
>

OK. Updated in v74.

> 8) should we include Assert of in_streaming similar to other
> pgoutput_stream*** functions.
> +static void
> +pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
> +                            ReorderBufferTXN *txn,
> +                            XLogRecPtr prepare_lsn)
> +{
> +    Assert(rbtxn_is_streamed(txn));
> +
> +    OutputPluginUpdateProgress(ctx);
> +    OutputPluginPrepareWrite(ctx, true);
> +    logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
> +    OutputPluginWrite(ctx, true);
> +}
>

Not done. AFAIK it is correct as-is.

> 9) Here also, we can verify that the transaction is streamed by
> checking the pg_stat_replication_slots.
> +# check that transaction is committed on subscriber
> +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*),
> count(c), count(d = 999) FROM test_tab");
> +is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed
> on subscriber, and extra columns contain local defaults');
> +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*)
> FROM pg_prepared_xacts;");
> +is($result, qq(0), 'transaction is committed on subscriber');
>

Not done. If the purpose of this comment is just to confirm that the SQL INSERT of 5000 rows of md5 data exceeds 64K then I think we can simply take that as self-evident. We don’t need some SQL to confirm it.

If the purpose of this is just to ensure that stats work properly with 2PC then I agree that there should be some test cases added for stats, but this has already been recorded elsewhere as a future TODO task.

------
Kind Regards,
Peter Smith.
Fujitsu Australia