Re: [HACKERS] logical decoding of two-phase transactions - Mailing list pgsql-hackers

From vignesh C
Subject Re: [HACKERS] logical decoding of two-phase transactions
Date
Msg-id CALDaNm1BxeN7NP9MjK4VRatjcXqWEO6K_35aVN0um+5AFdv-+A@mail.gmail.com
In response to Re: [HACKERS] logical decoding of two-phase transactions  (Peter Smith <smithpb2250@gmail.com>)
Responses Re: [HACKERS] logical decoding of two-phase transactions
List pgsql-hackers
On Wed, Apr 21, 2021 at 12:13 PM Peter Smith <smithpb2250@gmail.com> wrote:
>
> On Tue, Apr 20, 2021 at 3:45 PM Peter Smith <smithpb2250@gmail.com> wrote:
> >
> > Please find attached the latest patch set v73*
> >
> > Differences from v72* are:
> >
> > * Rebased to HEAD @ today (required because v72-0001 no longer applied cleanly)
> >
> > * Minor documentation correction for protocol messages for Commit Prepared ('K')
> >
> > * Non-functional code tidy (mostly proto.c) to reduce overloading
> > different meanings to same member names for prepare/commit times.
>
>
> Please find attached a re-posting of patch set v73*
>
> This is the same as yesterday's v73 but with a contrib module compile
> error fixed.

Thanks for the updated patch. A few comments:
1) Should "final_lsn not set in begin message" be "prepare_lsn not set
in begin message"?
+logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
+{
+       /* read fields */
+       begin_data->prepare_lsn = pq_getmsgint64(in);
+       if (begin_data->prepare_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "final_lsn not set in begin message");

2) Should "These commands" be "ALTER SUBSCRIPTION ... REFRESH
PUBLICATION and ALTER SUBSCRIPTION ... SET/ADD PUBLICATION ...", since
copy_data cannot be specified with ALTER SUBSCRIPTION ... DROP
PUBLICATION?
+   These commands also cannot be executed with <literal>copy_data = true</literal>
+   when the subscription has <literal>two_phase</literal> commit enabled. See
+   column <literal>subtwophasestate</literal> of
+   <xref linkend="catalog-pg-subscription"/> to know the actual two-phase state.

3) <term>Byte1('A')</term> should be <term>Byte1('r')</term>, as we
have defined LOGICAL_REP_MSG_ROLLBACK_PREPARED as 'r'.
+<term>Rollback Prepared</term>
+<listitem>
+<para>
+
+<variablelist>
+
+<varlistentry>
+<term>Byte1('A')</term>
+<listitem><para>
+                Identifies this message as the rollback of a two-phase transaction message.
+</para></listitem>
+</varlistentry>

4) Should "Check if the prepared transaction with the given GID and
lsn is around." be
"Check if the prepared transaction with the given GID, lsn & timestamp
is around."?
+/*
+ * LookupGXact
+ *             Check if the prepared transaction with the given GID and lsn is around.
+ *
+ * Note that we always compare with the LSN where prepare ends because that is
+ * what is stored as origin_lsn in the 2PC file.
+ *
+ * This function is primarily used to check if the prepared transaction
+ * received from the upstream (remote node) already exists. Checking only GID
+ * is not sufficient because a different prepared xact with the same GID can
+ * exist on the same node. So, we are ensuring to match origin_lsn and
+ * origin_timestamp of prepared xact to avoid the possibility of a match of
+ * prepared xact from two different nodes.
+ */
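
To illustrate why the header comment should mention the timestamp as well, here is a minimal standalone sketch of the three-way match described above. It is not the patch's LookupGXact; PreparedXactSketch and lookup_gxact_sketch are hypothetical names, and the array stands in for the real prepared-transaction state.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for one prepared transaction known to this node. */
typedef struct PreparedXactSketch
{
    const char *gid;              /* global transaction identifier */
    uint64_t    origin_lsn;       /* LSN where the remote prepare ends */
    int64_t     origin_timestamp; /* remote prepare time */
} PreparedXactSketch;

/*
 * Return true only if some entry matches all three of GID, origin LSN and
 * origin timestamp. A GID match alone is not treated as a hit.
 */
static bool
lookup_gxact_sketch(const PreparedXactSketch *xacts, size_t nxacts,
                    const char *gid, uint64_t prepare_end_lsn,
                    int64_t origin_prepare_timestamp)
{
    for (size_t i = 0; i < nxacts; i++)
    {
        if (strcmp(xacts[i].gid, gid) == 0 &&
            xacts[i].origin_lsn == prepare_end_lsn &&
            xacts[i].origin_timestamp == origin_prepare_timestamp)
            return true;
    }
    return false;
}
```

With this shape, a prepared xact that shares only the GID (or only the GID and LSN) with the incoming one is reported as not found.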

5) Should we change "The LSN of the prepare." to "The LSN of the begin prepare."?
+<term>Begin Prepare</term>
+<listitem>
+<para>
+
+<variablelist>
+
+<varlistentry>
+<term>Byte1('b')</term>
+<listitem><para>
+                Identifies this message as the beginning of a two-phase transaction message.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                The LSN of the prepare.
+</para></listitem>
+</varlistentry>


6) The same applies to "Commit Prepared" and "Rollback Prepared".

7) No need to initialize has_subrels, as we always assign it the
value returned by HeapTupleIsValid.
+HasSubscriptionRelations(Oid subid)
+{
+       Relation        rel;
+       int                     nkeys = 0;
+       ScanKeyData skey[2];
+       SysScanDesc scan;
+       bool            has_subrels = false;
+
+       rel = table_open(SubscriptionRelRelationId, AccessShareLock);

8) We could include an errhint, such as errhint("Option \"two_phase\"
specified more than once"), to give a more informative error
message.
+               else if (strcmp(defel->defname, "two_phase") == 0)
+               {
+                       if (two_phase_option_given)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       two_phase_option_given = true;
+
+                       data->two_phase = defGetBoolean(defel);
+               }

9) parse_subscription_options has a lot of function parameters; should
we change them to a struct?
@@ -69,7 +69,8 @@ parse_subscription_options(List *options,
                                                   char **synchronous_commit,
                                                   bool *refresh,
                                                   bool *binary_given, bool *binary,
-                                                  bool *streaming_given, bool *streaming)
+                                                  bool *streaming_given, bool *streaming,
+                                                  bool *twophase_given, bool *twophase)
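
As a rough idea of what the struct could look like, here is a standalone sketch that replaces the paired "given"/value pointers with a bitmask plus value fields. All names (SubOptsSketch, SUBOPT_SKETCH_*, set_bool_option_sketch) are hypothetical and only cover the three boolean options visible in this hunk; this is one possible shape, not a proposal for the exact definition.

```c
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical bit flags recording which options were specified. */
#define SUBOPT_SKETCH_BINARY    (1u << 0)
#define SUBOPT_SKETCH_STREAMING (1u << 1)
#define SUBOPT_SKETCH_TWO_PHASE (1u << 2)

/* Hypothetical replacement for the long parameter list. */
typedef struct SubOptsSketch
{
    uint32_t    specified;  /* bitmask of SUBOPT_SKETCH_* seen so far */
    bool        binary;
    bool        streaming;
    bool        twophase;
} SubOptsSketch;

/*
 * Record one boolean option. Returns false if the name is unknown or the
 * option was already given (the "conflicting or redundant options" case).
 */
static bool
set_bool_option_sketch(SubOptsSketch *opts, const char *name, bool value)
{
    uint32_t    flag;
    bool       *target;

    if (strcmp(name, "binary") == 0)
    {
        flag = SUBOPT_SKETCH_BINARY;
        target = &opts->binary;
    }
    else if (strcmp(name, "streaming") == 0)
    {
        flag = SUBOPT_SKETCH_STREAMING;
        target = &opts->streaming;
    }
    else if (strcmp(name, "two_phase") == 0)
    {
        flag = SUBOPT_SKETCH_TWO_PHASE;
        target = &opts->twophase;
    }
    else
        return false;

    if (opts->specified & flag)
        return false;           /* option given more than once */

    opts->specified |= flag;
    *target = value;
    return true;
}
```

Adding a new option would then mean one new flag and one new field, rather than two more pointer parameters at every call site.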

10) Should we change errhint("Use ALTER SUBSCRIPTION ...SET
PUBLICATION with refresh = false, or with copy_data = false, or use
DROP/CREATE SUBSCRIPTION.") to errhint("Use ALTER SUBSCRIPTION
...SET/ADD PUBLICATION with refresh = false, or with copy_data =
false."), as we don't support copy_data in ALTER SUBSCRIPTION ... DROP
PUBLICATION?
+                                       /*
+                                        * See ALTER_SUBSCRIPTION_REFRESH for details why this is
+                                        * not allowed.
+                                        */
+                                       if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && copy_data)
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                                errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
+                                                                errhint("Use ALTER SUBSCRIPTION ...SET PUBLICATION with refresh = false, or with copy_data = false"
+                                                                                ", or use DROP/CREATE SUBSCRIPTION.")));

11) Should 140000 be 150000, as this feature will be committed in PG15?
+               if (options->proto.logical.twophase &&
+                       PQserverVersion(conn->streamConn) >= 140000)
+                       appendStringInfoString(&cmd, ", two_phase 'on'");

12) Should we change "begin message" to "begin prepare message"?
+       if (begin_data->prepare_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "final_lsn not set in begin message");
+       begin_data->end_lsn = pq_getmsgint64(in);
+       if (begin_data->end_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "end_lsn not set in begin message");

13) Should we change "commit prepare message" to "commit prepared message"?
+       if (flags != 0)
+               elog(ERROR, "unrecognized flags %u in commit prepare message", flags);
+
+       /* read fields */
+       prepare_data->commit_lsn = pq_getmsgint64(in);
+       if (prepare_data->commit_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "commit_lsn is not set in commit prepared message");
+       prepare_data->end_lsn = pq_getmsgint64(in);
+       if (prepare_data->end_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "end_lsn is not set in commit prepared message");
+       prepare_data->commit_time = pq_getmsgint64(in);

14) Should we change "commit prepared message" to "rollback prepared message"?
+void
+logicalrep_read_rollback_prepared(StringInfo in,
+                                  LogicalRepRollbackPreparedTxnData *rollback_data)
+{
+       /* read flags */
+       uint8           flags = pq_getmsgbyte(in);
+
+       if (flags != 0)
+               elog(ERROR, "unrecognized flags %u in rollback prepare message", flags);
+
+       /* read fields */
+       rollback_data->prepare_end_lsn = pq_getmsgint64(in);
+       if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "prepare_end_lsn is not set in commit prepared message");
+       rollback_data->rollback_end_lsn = pq_getmsgint64(in);
+       if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "rollback_end_lsn is not set in commit prepared message");
+       rollback_data->prepare_time = pq_getmsgint64(in);
+       rollback_data->rollback_time = pq_getmsgint64(in);
+       rollback_data->xid = pq_getmsgint(in, 4);
+
+       /* read gid (copy it into a pre-allocated buffer) */
+       strcpy(rollback_data->gid, pq_getmsgstring(in));
+}

15) We could also check pg_stat_replication_slots to verify that the
statistics are getting updated.
+$node_publisher->safe_psql('postgres', "
+       BEGIN;
+       INSERT INTO tab_full VALUES (11);
+       PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# check that 2PC gets committed on subscriber
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup($appname);

Regards,
Vignesh


