Re: Skipping logical replication transactions on subscriber side - Mailing list pgsql-hackers

From vignesh C
Subject Re: Skipping logical replication transactions on subscriber side
Date
Msg-id CALDaNm0YD=AqGpPBk6ZtvUiY6XpiTXrzGrrfzPShOhAcVnJ8Fg@mail.gmail.com
Whole thread Raw
In response to Re: Skipping logical replication transactions on subscriber side  (Masahiko Sawada <sawada.mshk@gmail.com>)
Responses Re: Skipping logical replication transactions on subscriber side
List pgsql-hackers
On Fri, Dec 10, 2021 at 11:14 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> On Thu, Dec 9, 2021 at 6:16 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Thu, Dec 9, 2021 at 2:24 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> > >
> > > On Thu, Dec 9, 2021 at 11:47 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > > >
> > > > I am thinking that we can start a transaction, update the catalog,
> > > > commit that transaction. Then start a new one to update
> > > > origin_lsn/timestamp, finishprepared, and commit it. Now, if it
> > > > crashes after the first transaction, only commit prepared will be
> > > > resent again and this time we don't need to update the catalog as that
> > > > entry would be already cleared.
> > >
> > > Sounds good. In the crash case, it should be fine since we will just
> > > commit an empty transaction. The same is true for the case where
> > > skip_xid has been changed after skipping and preparing the transaction
> > > and before handling commit_prepared.
> > >
> > > Regarding the case where the user specifies XID of the transaction
> > > after it is prepared on the subscriber (i.g., the transaction is not
> > > empty), we won’t skip committing the prepared transaction. But I think
> > > that we don't need to support skipping already-prepared transaction
> > > since such transaction doesn't conflict with anything regardless of
> > > having changed or not.
> > >
> >
> > Yeah, this makes sense to me.
> >
>
> I've attached an updated patch. The new syntax is like "ALTER
> SUBSCRIPTION testsub SKIP (xid = '123')".
>
> I’ve been thinking we can do something safeguard for the case where
> the user specified the wrong xid. For example, can we somewhat use the
> stats in pg_stat_subscription_workers? An idea is that logical
> replication worker fetches the xid from the stats when reading the
> subscription and skips the transaction if the xid matches to
> subskipxid. That is, the worker checks the error reported by the
> worker previously working on the same subscription. The error could
> not be a conflict error (e.g., connection error etc.) or might have
> been cleared by the reset function, But given the worker is in an
> error loop, the worker can eventually get xid in question. We can
> prevent an unrelated transaction from being skipped unexpectedly. It
> seems not a stable solution though. Or it might be enough to warn
> users when they specified an XID that doesn’t match to last_error_xid.
> Anyway, I think it’s better to have more discussion on this. Any
> ideas?

While the worker is skipping one of the skip transactions specified by
the user and immediately if the user specifies another skip
transaction while the skipping of the transaction is in progress this
new value will be reset by the worker while clearing the skip xid. I
felt once the worker has identified the skip xid and is about to skip
the xid, the worker can acquire a lock to prevent concurrency issues:
+static void
+clear_subscription_skip_xid(void)
+{
+       Relation        rel;
+       HeapTuple       tup;
+       bool            nulls[Natts_pg_subscription];
+       bool            replaces[Natts_pg_subscription];
+       Datum           values[Natts_pg_subscription];
+
+       memset(values, 0, sizeof(values));
+       memset(nulls, false, sizeof(nulls));
+       memset(replaces, false, sizeof(replaces));
+
+       if (!IsTransactionState())
+               StartTransactionCommand();
+
+       rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+       /* Fetch the existing tuple. */
+       tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
+
ObjectIdGetDatum(MySubscription->oid));
+
+       if (!HeapTupleIsValid(tup))
+               elog(ERROR, "subscription \"%s\" does not exist",
MySubscription->name);
+
+       /* Set subskipxid to null */
+       nulls[Anum_pg_subscription_subskipxid - 1] = true;
+       replaces[Anum_pg_subscription_subskipxid - 1] = true;
+
+       /* Update the system catalog to reset the skip XID */
+       tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+                                                       replaces);
+       CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+       heap_freetuple(tup);
+       table_close(rel, RowExclusiveLock);
+}

Regards,
Vignesh



pgsql-hackers by date:

Previous
From: Andres Freund
Date:
Subject: Re: Adding CI to our tree
Next
From: Amit Kapila
Date:
Subject: Re: row filtering for logical replication