Re: Skipping logical replication transactions on subscriber side - Mailing list pgsql-hackers
From | Masahiko Sawada |
---|---|
Subject | Re: Skipping logical replication transactions on subscriber side |
Date | |
Msg-id | CAD21AoCd3Y2-b67+pVrzrdteUmup1XG6JeHYOa5dGjh8qZ3VuQ@mail.gmail.com |
In response to | Re: Skipping logical replication transactions on subscriber side (Amit Kapila <amit.kapila16@gmail.com>) |
Responses |
Re: Skipping logical replication transactions on subscriber side
|
List | pgsql-hackers |
On Sat, Jan 15, 2022 at 7:24 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Fri, Jan 14, 2022 at 7:49 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> >
> > I agree with all the comments above. I've attached an updated patch.
> >
>
> Review comments
> ================

Thank you for the comments!

> 1.
> +
> + <para>
> + In this case, you need to consider changing the data on the
> subscriber so that it
>
> The starting of this sentence doesn't make sense to me. How about
> changing it like: "To resolve conflicts, you need to ...
>

Fixed.

> 2.
> + <structname>pg_subscription</structname>.<structfield>subskipxid</structfield>)
> + is cleared. See <xref linkend="logical-replication-conflicts"/> for
> + the details of logical replication conflicts.
> + </para>
> +
> + <para>
> + <replaceable>skip_option</replaceable> specifies options for
> this operation.
> + The supported option is:
> +
> + <variablelist>
> + <varlistentry>
> + <term><literal>xid</literal> (<type>xid</type>)</term>
> + <listitem>
> + <para>
> + Specifies the ID of the transaction whose changes are to be skipped
> + by the logical replication worker. Setting
> <literal>NONE</literal> resets
> + the transaction ID.
> + </para>
>
> Empty spaces after line finish are inconsistent. I personally use a
> single space before a new line but I see that others use two spaces
> and the nearby documentation also uses two spaces in this regard so I
> am fine either way but let's be consistent.

Fixed.

>
> 3.
> +        case ALTER_SUBSCRIPTION_SKIP:
> +            {
> +                if (!superuser())
> +                    ereport(ERROR,
> +                            (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
> +                             errmsg("must be superuser to skip transaction")));
> +
> +                parse_subscription_options(pstate, stmt->options, SUBOPT_XID, &opts);
> +
> +                if (IsSet(opts.specified_opts, SUBOPT_XID))
> ..
> ..
>
> Is there a case when the above 'if (IsSet(..' won't be true? If not,
> then probably there should be Assert instead of 'if'.
>

Fixed.

> 4.
> +static TransactionId skipping_xid = InvalidTransactionId;
>
> I find this variable name bit odd. Can we name it skip_xid?
>

Okay, renamed.

> 5.
> + * skipping_xid is valid if we are skipping all data modification changes
> + * (INSERT, UPDATE, etc.) of the specified transaction at
> MySubscription->skipxid.
> + * Once we start skipping changes, we don't stop it until we skip all changes
>
> I think it would be better to write the first line of comment as: "We
> enable skipping all data modification changes (INSERT, UPDATE, etc.)
> for the subscription if the user has specified skip_xid. Once we ..."
>

Changed.

> 6.
> +static void
> +maybe_start_skipping_changes(TransactionId xid)
> +{
> +    Assert(!is_skipping_changes());
> +    Assert(!in_remote_transaction);
> +    Assert(!in_streamed_transaction);
> +
> +    /* Make sure subscription cache is up-to-date */
> +    maybe_reread_subscription();
>
> Why do we need to update the cache here by calling
> maybe_reread_subscription() and at other places in the patch? It is
> sufficient to get the skip_xid value at the start of the worker via
> GetSubscription().

MySubscription could be out-of-date after a user changes the catalog.
In the non-skipping case, we check it when starting the transaction in
begin_replication_step(), which is called, e.g., when applying an
insert change. But I think we need to make sure it’s up-to-date at the
beginning of applying changes, that is, before starting a transaction.
Otherwise, we may end up skipping the transaction based on an
out-of-date subscription cache.

The reason for calling maybe_reread_subscription() in both
apply_handle_commit_prepared() and apply_handle_rollback_prepared() is
the same: MySubscription could be out-of-date when applying
commit-prepared or rollback-prepared, since we have not called
begin_replication_step() to open a new transaction.

>
> 7.
> In maybe_reread_subscription(), isn't there a need to check whether
> skip_xid is changed where we exit and launch the worker and compare
> other subscription parameters?

IIUC we relaunch the worker here when subscription parameters such as
slot_name are changed. In the current implementation, I think that
relaunching the worker is not necessary when skip_xid is changed. For
instance, when skipping a prepared transaction, we deliberately don’t
clear subskipxid of pg_subscription and instead do that in the
commit-prepared or rollback-prepared case. There is a chance that the
user changes skip_xid before commit-prepared or rollback-prepared, but
we tolerate this case. Also, in non-streaming and non-2PC cases, while
skipping changes we don’t call maybe_reread_subscription() until all
changes are skipped. So changing skip_xid cannot cancel skipping that
has already started.

>
> 8.
> +static void
> +clear_subscription_skip_xid(TransactionId xid, XLogRecPtr origin_lsn,
> +                            TimestampTz origin_timestamp)
> +{
> +    Relation rel;
> +    Form_pg_subscription subform;
> +    HeapTuple tup;
> +    bool nulls[Natts_pg_subscription];
> +    bool replaces[Natts_pg_subscription];
> +    Datum values[Natts_pg_subscription];
> +
> +    memset(values, 0, sizeof(values));
> +    memset(nulls, false, sizeof(nulls));
> +    memset(replaces, false, sizeof(replaces));
> +
> +    if (!IsTransactionState())
> +        StartTransactionCommand();
> +
> +    LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
> +                     AccessShareLock);
>
> It is important to add a comment as to why we need a lock here.

Added.

>
> 9.
> + * needs to be set subskipxid again. We can reduce the possibility by
> + * logging a replication origin WAL record to advance the origin LSN
> + * instead but it doesn't seem to be worth since it's a very minor case.
>
> You can also add here that there is no way to advance origin_timestamp
> so that would be inconsistent.

Added.

>
> 10.
> +clear_subscription_skip_xid(TransactionId xid, XLogRecPtr origin_lsn,
> +                            TimestampTz origin_timestamp)
> {
> ..
> ..
> +    if (!IsTransactionState())
> +        StartTransactionCommand();
> ..
> ..
> +    CommitTransactionCommand();
> ..
> }
>
> The transaction should be committed in this function if it is started
> here otherwise it should be the responsibility of the caller to commit
> it.

Fixed.

I've attached an updated patch that incorporates these comments, except
for 6 and 7, which probably need more discussion. The comments from
Vignesh are also incorporated.

Regards,

--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
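
As a rough illustration of the rule in comment 10 (commit the transaction only if this function started it, otherwise leave it to the caller), here is a toy sketch in plain C. The toy_* functions, the global flag, and the commit counter are hypothetical stand-ins for IsTransactionState(), StartTransactionCommand(), and CommitTransactionCommand(); this is not the code from the attached patch, only the shape of the pattern.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for backend transaction state; not PostgreSQL APIs. */
bool toy_in_transaction = false;
int  toy_commit_count = 0;

bool toy_is_transaction_state(void)
{
    return toy_in_transaction;
}

void toy_start_transaction(void)
{
    assert(!toy_in_transaction);
    toy_in_transaction = true;
}

void toy_commit_transaction(void)
{
    assert(toy_in_transaction);
    toy_in_transaction = false;
    toy_commit_count++;
}

/*
 * Toy version of the pattern: open a transaction only if none is open,
 * and commit only the transaction that this function itself opened.
 * A transaction opened by the caller is left for the caller to commit.
 */
void toy_clear_skip_xid(unsigned int *skip_xid)
{
    bool started_tx = false;

    if (!toy_is_transaction_state())
    {
        toy_start_transaction();
        started_tx = true;
    }

    /* 0 plays the role of InvalidTransactionId in this sketch */
    *skip_xid = 0;

    if (started_tx)
        toy_commit_transaction();
}
```

Called outside a transaction, toy_clear_skip_xid() starts and commits its own; called inside one, it leaves the transaction open, so the caller keeps control of when the commit happens.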