Re: Skipping logical replication transactions on subscriber side - Mailing list pgsql-hackers
From | Amit Kapila |
---|---|
Subject | Re: Skipping logical replication transactions on subscriber side |
Date | |
Msg-id | CAA4eK1KT0C2TDL++os03EpY_5v2STjUBmF8WJYX8zT36MJ5SfQ@mail.gmail.com |
In response to | Re: Skipping logical replication transactions on subscriber side (Masahiko Sawada <sawada.mshk@gmail.com>) |
Responses | Re: Skipping logical replication transactions on subscriber side |
List | pgsql-hackers |
On Fri, Jan 14, 2022 at 7:49 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> I agree with all the comments above. I've attached an updated patch.
>

Review comments
================
1.
+
+ <para>
+ In this case, you need to consider changing the data on the subscriber so that it

The start of this sentence doesn't make sense to me. How about
changing it like: "To resolve conflicts, you need to ..."

2.
+ <structname>pg_subscription</structname>.<structfield>subskipxid</structfield>)
+ is cleared. See <xref linkend="logical-replication-conflicts"/> for
+ the details of logical replication conflicts.
+ </para>
+
+ <para>
+ <replaceable>skip_option</replaceable> specifies options for this operation.
+ The supported option is:
+
+ <variablelist>
+ <varlistentry>
+ <term><literal>xid</literal> (<type>xid</type>)</term>
+ <listitem>
+ <para>
+ Specifies the ID of the transaction whose changes are to be skipped
+ by the logical replication worker. Setting <literal>NONE</literal> resets
+ the transaction ID.
+ </para>

Empty spaces after line finish are inconsistent. I personally use a
single space before a new line, but I see that others use two spaces,
and the nearby documentation also uses two spaces in this regard, so I
am fine either way, but let's be consistent.

3.
+ case ALTER_SUBSCRIPTION_SKIP:
+ {
+     if (!superuser())
+         ereport(ERROR,
+                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                  errmsg("must be superuser to skip transaction")));
+
+     parse_subscription_options(pstate, stmt->options, SUBOPT_XID, &opts);
+
+     if (IsSet(opts.specified_opts, SUBOPT_XID))
..
..

Is there a case when the above 'if (IsSet(..' won't be true? If not,
then probably there should be an Assert instead of the 'if'.

4.
+static TransactionId skipping_xid = InvalidTransactionId;

I find this variable name a bit odd. Can we name it skip_xid?

5.
+ * skipping_xid is valid if we are skipping all data modification changes
+ * (INSERT, UPDATE, etc.) of the specified transaction at MySubscription->skipxid.
+ * Once we start skipping changes, we don't stop it until we skip all changes

I think it would be better to write the first line of the comment as:
"We enable skipping all data modification changes (INSERT, UPDATE,
etc.) for the subscription if the user has specified skip_xid. Once we
..."

6.
+static void
+maybe_start_skipping_changes(TransactionId xid)
+{
+    Assert(!is_skipping_changes());
+    Assert(!in_remote_transaction);
+    Assert(!in_streamed_transaction);
+
+    /* Make sure subscription cache is up-to-date */
+    maybe_reread_subscription();

Why do we need to update the cache here by calling
maybe_reread_subscription(), and at other places in the patch? It is
sufficient to get the skip_xid value at the start of the worker via
GetSubscription().

7. In maybe_reread_subscription(), isn't there a need to check whether
skip_xid is changed, where we exit and launch the worker and compare
other subscription parameters?

8.
+static void
+clear_subscription_skip_xid(TransactionId xid, XLogRecPtr origin_lsn,
+                            TimestampTz origin_timestamp)
+{
+    Relation rel;
+    Form_pg_subscription subform;
+    HeapTuple tup;
+    bool nulls[Natts_pg_subscription];
+    bool replaces[Natts_pg_subscription];
+    Datum values[Natts_pg_subscription];
+
+    memset(values, 0, sizeof(values));
+    memset(nulls, false, sizeof(nulls));
+    memset(replaces, false, sizeof(replaces));
+
+    if (!IsTransactionState())
+        StartTransactionCommand();
+
+    LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+                     AccessShareLock);

It is important to add a comment as to why we need a lock here.

9.
+ * needs to be set subskipxid again. We can reduce the possibility by
+ * logging a replication origin WAL record to advance the origin LSN
+ * instead but it doesn't seem to be worth since it's a very minor case.

You can also add here that there is no way to advance
origin_timestamp, so that would be inconsistent.

10.
+clear_subscription_skip_xid(TransactionId xid, XLogRecPtr origin_lsn,
+                            TimestampTz origin_timestamp)
{
..
..
+    if (!IsTransactionState())
+        StartTransactionCommand();
..
..
+    CommitTransactionCommand();
..
}

The transaction should be committed in this function if it is started
here; otherwise, it should be the responsibility of the caller to
commit it.

-- 
With Regards,
Amit Kapila.
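
For illustration only, review comment 10 can be sketched as a standalone C program. This is a minimal sketch under stated assumptions, not the patch's code: the mock_* functions and variables are hypothetical stand-ins for the backend's transaction calls, and clear_skip_xid_sketch is an invented name. It shows one way a function could commit only the transaction it started itself and otherwise leave the commit to the caller.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical stand-ins for the backend's transaction state.
 * These are NOT the real PostgreSQL functions; they only track
 * whether a transaction is open and how many commits happened.
 */
static bool mock_in_transaction = false;
static int  mock_commits = 0;

static bool
mock_IsTransactionState(void)
{
    return mock_in_transaction;
}

static void
mock_StartTransactionCommand(void)
{
    assert(!mock_in_transaction);
    mock_in_transaction = true;
}

static void
mock_CommitTransactionCommand(void)
{
    assert(mock_in_transaction);
    mock_in_transaction = false;
    mock_commits++;
}

/*
 * Sketch of the ownership rule from comment 10: remember whether this
 * function opened the transaction, and commit only in that case.
 */
static void
clear_skip_xid_sketch(void)
{
    bool started_tx = false;

    if (!mock_IsTransactionState())
    {
        mock_StartTransactionCommand();
        started_tx = true;
    }

    /* ... the catalog update would happen here ... */

    if (started_tx)
        mock_CommitTransactionCommand();
}
```

Called with no transaction open, the sketch starts and commits its own transaction. Called inside an open transaction, it leaves that transaction open so the caller remains responsible for the commit.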