Re: Skipping logical replication transactions on subscriber side - Mailing list pgsql-hackers

From Amit Kapila
Subject Re: Skipping logical replication transactions on subscriber side
Msg-id CAA4eK1KT0C2TDL++os03EpY_5v2STjUBmF8WJYX8zT36MJ5SfQ@mail.gmail.com
In response to Re: Skipping logical replication transactions on subscriber side  (Masahiko Sawada <sawada.mshk@gmail.com>)
Responses Re: Skipping logical replication transactions on subscriber side
List pgsql-hackers
On Fri, Jan 14, 2022 at 7:49 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> I agree with all the comments above. I've attached an updated patch.
>

Review comments
================
1.
+
+  <para>
+   In this case, you need to consider changing the data on the subscriber so that it

The start of this sentence doesn't make sense to me. How about
changing it to: "To resolve conflicts, you need to ..."?

2.
+      <structname>pg_subscription</structname>.<structfield>subskipxid</structfield>)
+      is cleared.  See <xref linkend="logical-replication-conflicts"/> for
+      the details of logical replication conflicts.
+     </para>
+
+     <para>
+      <replaceable>skip_option</replaceable> specifies options for this operation.
+      The supported option is:
+
+      <variablelist>
+       <varlistentry>
+        <term><literal>xid</literal> (<type>xid</type>)</term>
+        <listitem>
+         <para>
+          Specifies the ID of the transaction whose changes are to be skipped
+          by the logical replication worker. Setting <literal>NONE</literal> resets
+          the transaction ID.
+         </para>

The number of spaces after the end of a sentence is inconsistent. I
personally use a single space before a new sentence, but I see that
others use two spaces, and the nearby documentation also uses two
spaces, so I am fine either way. Let's just be consistent.

3.
+ case ALTER_SUBSCRIPTION_SKIP:
+ {
+ if (!superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to skip transaction")));
+
+ parse_subscription_options(pstate, stmt->options, SUBOPT_XID, &opts);
+
+ if (IsSet(opts.specified_opts, SUBOPT_XID))
..
..

Is there a case where the above 'if (IsSet(..' won't be true? If not,
there should probably be an Assert instead of the 'if'.
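
A minimal sketch of the Assert variant, assuming the parser really does reject everything except xid. The SkipOpts struct, parse_skip_option() and skip_xid_from_opts() are hypothetical stand-ins written for this illustration, not the backend's real definitions, and assert() stands in for Assert():

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Illustrative stand-ins; not the backend's real definitions. */
#define SUBOPT_XID 0x0001u
#define IsSet(val, bits) (((val) & (bits)) == (bits))

typedef struct SkipOpts
{
    uint32_t specified_opts;    /* bitmask of options the user supplied */
    uint32_t xid;               /* parsed transaction ID */
} SkipOpts;

/* Hypothetical parser: rejects anything but "xid", sets SUBOPT_XID on success. */
static bool
parse_skip_option(const char *name, uint32_t value, SkipOpts *opts)
{
    if (strcmp(name, "xid") != 0)
        return false;           /* the real code would ereport(ERROR) */

    opts->specified_opts |= SUBOPT_XID;
    opts->xid = value;
    return true;
}

static uint32_t
skip_xid_from_opts(const SkipOpts *opts)
{
    /* Only xid can get past the parser, so assert instead of branching. */
    assert(IsSet(opts->specified_opts, SUBOPT_XID));
    return opts->xid;
}
```

If some path can reach this point without SUBOPT_XID set, the 'if' is the right choice and the Assert would be wrong.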

4.
+static TransactionId skipping_xid = InvalidTransactionId;

I find this variable name a bit odd. Can we name it skip_xid?

5.
+ * skipping_xid is valid if we are skipping all data modification changes
+ * (INSERT, UPDATE, etc.) of the specified transaction at MySubscription->skipxid.
+ * Once we start skipping changes, we don't stop it until we skip all changes

I think it would be better to write the first line of comment as: "We
enable skipping all data modification changes (INSERT, UPDATE, etc.)
for the subscription if the user has specified skip_xid. Once we ..."

6.
+static void
+maybe_start_skipping_changes(TransactionId xid)
+{
+ Assert(!is_skipping_changes());
+ Assert(!in_remote_transaction);
+ Assert(!in_streamed_transaction);
+
+ /* Make sure subscription cache is up-to-date */
+ maybe_reread_subscription();

Why do we need to update the cache here by calling
maybe_reread_subscription() and at other places in the patch? It is
sufficient to get the skip_xid value at the start of the worker via
GetSubscription().

7. In maybe_reread_subscription(), don't we need to check whether
skip_xid has changed, at the place where we compare the other
subscription parameters and exit so that the worker is relaunched?

8.
+static void
+clear_subscription_skip_xid(TransactionId xid, XLogRecPtr origin_lsn,
+ TimestampTz origin_timestamp)
+{
+ Relation rel;
+ Form_pg_subscription subform;
+ HeapTuple tup;
+ bool nulls[Natts_pg_subscription];
+ bool replaces[Natts_pg_subscription];
+ Datum values[Natts_pg_subscription];
+
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ if (!IsTransactionState())
+ StartTransactionCommand();
+
+ LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+ AccessShareLock);

It is important to add a comment as to why we need a lock here.

9.
+ * needs to be set subskipxid again.  We can reduce the possibility by
+ * logging a replication origin WAL record to advance the origin LSN
+ * instead but it doesn't seem to be worth since it's a very minor case.

You could also add here that there is no way to advance
origin_timestamp, so it would become inconsistent.

10.
+clear_subscription_skip_xid(TransactionId xid, XLogRecPtr origin_lsn,
+ TimestampTz origin_timestamp)
{
..
..
+ if (!IsTransactionState())
+ StartTransactionCommand();
..
..
+ CommitTransactionCommand();
..
}

The transaction should be committed in this function only if it was
started here; otherwise, it should be the responsibility of the caller
to commit it.
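
Roughly what I have in mind, as a self-contained sketch. The three transaction functions below are toy stand-ins with the same names as the backend ones, and in_transaction, commits, started_tx and clear_skip_xid_sketch() are names made up for this illustration:

```c
#include <assert.h>
#include <stdbool.h>

/* Toy stand-ins for the backend's transaction API; not the real code. */
static bool in_transaction = false;
static int  commits = 0;

static bool IsTransactionState(void) { return in_transaction; }
static void StartTransactionCommand(void) { in_transaction = true; }
static void CommitTransactionCommand(void) { in_transaction = false; commits++; }

static void
clear_skip_xid_sketch(void)
{
    bool        started_tx = false;

    /* Remember whether this function is the one that opened the transaction. */
    if (!IsTransactionState())
    {
        StartTransactionCommand();
        started_tx = true;
    }

    /* The catalog update would go here; it always runs inside a transaction. */
    assert(IsTransactionState());

    /* Commit only what we started; otherwise leave it to the caller. */
    if (started_tx)
        CommitTransactionCommand();
}
```

With this shape, a caller that is already inside a transaction keeps control of its commit, and a caller that is not gets a transaction that is opened and closed within the function.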

-- 
With Regards,
Amit Kapila.


