Re: [HACKERS] logical decoding of two-phase transactions - Mailing list pgsql-hackers
From | vignesh C |
---|---|
Subject | Re: [HACKERS] logical decoding of two-phase transactions |
Date | |
Msg-id | CALDaNm1BxeN7NP9MjK4VRatjcXqWEO6K_35aVN0um+5AFdv-+A@mail.gmail.com Whole thread Raw |
In response to | Re: [HACKERS] logical decoding of two-phase transactions (Peter Smith <smithpb2250@gmail.com>) |
Responses |
Re: [HACKERS] logical decoding of two-phase transactions
|
List | pgsql-hackers |
On Wed, Apr 21, 2021 at 12:13 PM Peter Smith <smithpb2250@gmail.com> wrote: > > On Tue, Apr 20, 2021 at 3:45 PM Peter Smith <smithpb2250@gmail.com> wrote: > > > > Please find attached the latest patch set v73`* > > > > Differences from v72* are: > > > > * Rebased to HEAD @ today (required because v72-0001 no longer applied cleanly) > > > > * Minor documentation correction for protocol messages for Commit Prepared ('K') > > > > * Non-functional code tidy (mostly proto.c) to reduce overloading > > different meanings to same member names for prepare/commit times. > > > Please find attached a re-posting of patch set v73* > > This is the same as yesterday's v73 but with a contrib module compile > error fixed. Thanks for the updated patch, few comments: 1) Should "final_lsn not set in begin message" be "prepare_lsn not set in begin message" +logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data) +{ + /* read fields */ + begin_data->prepare_lsn = pq_getmsgint64(in); + if (begin_data->prepare_lsn == InvalidXLogRecPtr) + elog(ERROR, "final_lsn not set in begin message"); 2) Should "These commands" be "ALTER SUBSCRIPTION ... REFRESH PUBLICATION and ALTER SUBSCRIPTION ... SET/ADD PUBLICATION ..." as copy_data cannot be specified with alter subscription .. drop publication. + These commands also cannot be executed with <literal>copy_data = true</literal> + when the subscription has <literal>two_phase</literal> commit enabled. See + column <literal>subtwophasestate</literal> of + <xref linkend="catalog-pg-subscription"/> to know the actual two-phase state. 3) <term>Byte1('A')</term> should be <term>Byte1('r')</term> as we have defined LOGICAL_REP_MSG_ROLLBACK_PREPARED as r. +<term>Rollback Prepared</term> +<listitem> +<para> + +<variablelist> + +<varlistentry> +<term>Byte1('A')</term> +<listitem><para> + Identifies this message as the rollback of a two-phase transaction message. +</para></listitem> +</varlistentry> 4) Should "Check if the prepared transaction with the given GID and lsn is around." be "Check if the prepared transaction with the given GID, lsn & timestamp is around." +/* + * LookupGXact + * Check if the prepared transaction with the given GID and lsn is around. + * + * Note that we always compare with the LSN where prepare ends because that is + * what is stored as origin_lsn in the 2PC file. + * + * This function is primarily used to check if the prepared transaction + * received from the upstream (remote node) already exists. Checking only GID + * is not sufficient because a different prepared xact with the same GID can + * exist on the same node. So, we are ensuring to match origin_lsn and + * origin_timestamp of prepared xact to avoid the possibility of a match of + * prepared xact from two different nodes. + */ 5) Should we change "The LSN of the prepare." to "The LSN of the begin prepare." +<term>Begin Prepare</term> +<listitem> +<para> + +<variablelist> + +<varlistentry> +<term>Byte1('b')</term> +<listitem><para> + Identifies this message as the beginning of a two-phase transaction message. +</para></listitem> +</varlistentry> + +<varlistentry> +<term>Int64</term> +<listitem><para> + The LSN of the prepare. +</para></listitem> +</varlistentry> 6) Similarly in cases of "Commit Prepared" and "Rollback Prepared" 7) No need to initialize has_subrels as we will always assign the value returned by HeapTupleIsValid +HasSubscriptionRelations(Oid subid) +{ + Relation rel; + int nkeys = 0; + ScanKeyData skey[2]; + SysScanDesc scan; + bool has_subrels = false; + + rel = table_open(SubscriptionRelRelationId, AccessShareLock); 8) We could include errhint, like errhint("Option \"two_phase\" specified more than once") to specify a more informative error message. + else if (strcmp(defel->defname, "two_phase") == 0) + { + if (two_phase_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + two_phase_option_given = true; + + data->two_phase = defGetBoolean(defel); + } 9) We have a lot of function parameters for parse_subscription_options, should we change it to struct? @@ -69,7 +69,8 @@ parse_subscription_options(List *options, char **synchronous_commit, bool *refresh, bool *binary_given, bool *binary, - bool *streaming_given, bool *streaming) + bool *streaming_given, bool *streaming, + bool *twophase_given, bool *twophase) 10) Should we change " errhint("Use ALTER SUBSCRIPTION ...SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")" to "errhint("Use ALTER SUBSCRIPTION ...SET/ADD PUBLICATION with refresh = false, or with copy_data = false.")" as we don't support copy_data in ALTER subscription ... DROP publication. + /* + * See ALTER_SUBSCRIPTION_REFRESH for details why this is + * not allowed. + */ + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && copy_data) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"), + errhint("Use ALTER SUBSCRIPTION ...SET PUBLICATION with refresh = false, or with copy_data = false" + ", or use DROP/CREATE SUBSCRIPTION."))); 11) Should 14000 be 15000 as this feature will be committed in PG15 + if (options->proto.logical.twophase && + PQserverVersion(conn->streamConn) >= 140000) + appendStringInfoString(&cmd, ", two_phase 'on'"); 12) should we change "begin message" to "begin prepare message" + if (begin_data->prepare_lsn == InvalidXLogRecPtr) + elog(ERROR, "final_lsn not set in begin message"); + begin_data->end_lsn = pq_getmsgint64(in); + if (begin_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn not set in begin message"); 13) should we change "commit prepare message" to "commit prepared message" + if (flags != 0) + elog(ERROR, "unrecognized flags %u in commit prepare message", flags); + + /* read fields */ + prepare_data->commit_lsn = pq_getmsgint64(in); + if (prepare_data->commit_lsn == InvalidXLogRecPtr) + elog(ERROR, "commit_lsn is not set in commit prepared message"); + prepare_data->end_lsn = pq_getmsgint64(in); + if (prepare_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn is not set in commit prepared message"); + prepare_data->commit_time = pq_getmsgint64(in); 14) should we change "commit prepared message" to "rollback prepared message" +void +logicalrep_read_rollback_prepared(StringInfo in, + LogicalRepRollbackPreparedTxnData *rollback_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in rollback prepare message", flags); + + /* read fields */ + rollback_data->prepare_end_lsn = pq_getmsgint64(in); + if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_end_lsn is not set in commit prepared message"); + rollback_data->rollback_end_lsn = pq_getmsgint64(in); + if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr) + elog(ERROR, "rollback_end_lsn is not set in commit prepared message"); + rollback_data->prepare_time = pq_getmsgint64(in); + rollback_data->rollback_time = pq_getmsgint64(in); + rollback_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(rollback_data->gid, pq_getmsgstring(in)); +} 15) We can include check pg_stat_replication_slots to verify if statistics is getting updated. +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_full VALUES (11); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup($appname); + +# check that transaction is in prepared state on subscriber +my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# check that 2PC gets committed on subscriber +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup($appname); Regards, Vignesh
pgsql-hackers by date: