Re: Optionally automatically disable logical replication subscriptions on error - Mailing list pgsql-hackers

From Peter Smith
Subject Re: Optionally automatically disable logical replication subscriptions on error
Msg-id CAHut+PviMp-5MTgs-Wq=ryV87j6k=GdcPfkZ1R51wv7ecDqq4g@mail.gmail.com
In response to RE: Optionally automatically disable logical replication subscriptions on error  ("osumi.takamichi@fujitsu.com" <osumi.takamichi@fujitsu.com>)
Responses RE: Optionally automatically disable logical replication subscriptions on error  ("osumi.takamichi@fujitsu.com" <osumi.takamichi@fujitsu.com>)
List pgsql-hackers
Hi. Below are my code review comments for v18.

==========

1. Commit message - wording

BEFORE
To partially remedy the situation, adding a new subscription_parameter
named 'disable_on_error'.

AFTER
To partially remedy the situation, this patch adds a new
subscription_parameter named 'disable_on_error'.

~~~

2. Commit message - wording

BEFORE
Require to bump catalog version.

AFTER
A catalog version bump is required.

~~~

3. doc/src/sgml/ref/alter_subscription.sgml - whitespace

@@ -201,8 +201,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       information.  The parameters that can be altered
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
-      <literal>binary</literal>, and
-      <literal>streaming</literal>.
+      <literal>binary</literal>,<literal>streaming</literal>, and
+      <literal>disable_on_error</literal>.
      </para>

There is a missing space before <literal>streaming</literal>.

~~~

4. src/backend/replication/logical/worker.c - WorkerErrorRecovery

@@ -2802,6 +2803,89 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 }

 /*
+ * Worker error recovery processing, in preparation for disabling the
+ * subscription.
+ */
+static void
+WorkerErrorRecovery(void)

Does this need to be a separate function? It is only called
immediately before 'DisableSubscriptionOnError', so it might be better
to put this code inside DisableSubscriptionOnError, with appropriate
comments.

~~~

5. src/backend/replication/logical/worker.c - DisableSubscriptionOnError

+ /*
+ * We would not be here unless this subscription's disableonerr field was
+ * true when our worker began applying changes, but check whether that
+ * field has changed in the interim.
+ */

Apparently, this function might just do nothing if it detects that the
flag was changed in the interim, but I'm not 100% sure that the
callers properly cater for the case where nothing happens.

IMO it would be better if this function returned true/false to mean
"did the subscription get disabled or not?", because that gives the
calling code the chance to check the return value and do the right
thing - e.g. if the caller first thought the subscription should be
disabled but it then turned out that it was NOT disabled...
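
To illustrate what I mean, here is a minimal self-contained sketch. It
is not the patch code: MockSubscription, ErrorOutcome and
handle_worker_error are hypothetical stand-ins, and the real function
would re-read the catalog and update pg_subscription rather than flip
a struct field. It only shows the caller branching on the return
value.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the subscription's catalog state. */
typedef struct MockSubscription
{
	bool		enabled;
	bool		disableonerr;
} MockSubscription;

/* Hypothetical outcomes the caller can act on. */
typedef enum
{
	ERR_RETHROW,			/* nothing was disabled; propagate the error */
	ERR_SUB_DISABLED		/* subscription disabled; worker can exit */
} ErrorOutcome;

/*
 * Returns true only if the subscription was actually disabled. The flag
 * is re-checked because it may have changed since the worker started.
 */
static bool
DisableSubscriptionOnError(MockSubscription *sub)
{
	assert(sub != NULL);

	if (!sub->disableonerr)
		return false;		/* flag was turned off in the interim */

	sub->enabled = false;
	return true;
}

/* The caller checks the result instead of assuming the disable happened. */
static ErrorOutcome
handle_worker_error(MockSubscription *sub)
{
	if (DisableSubscriptionOnError(sub))
		return ERR_SUB_DISABLED;

	return ERR_RETHROW;
}
```

With this shape, the "flag changed underneath us" case is visible to
the caller, which can then fall back to the normal error path.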

~~~

6. src/backend/replication/logical/worker.c - LogicalRepHandleTableSync name

+/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if it's required.
+ */
+static void
+LogicalRepHandleTableSync(XLogRecPtr *origin_startpos,
+   char **myslotname, MemoryContext cctx)

I felt that the "LogicalRep" prefix is a bit of an overkill here
because this is a static function.

IMO this function should be renamed to 'SyncTableStartWrapper' because
that better describes what it is doing.

~~~

7. src/backend/replication/logical/worker.c - LogicalRepHandleTableSync Assert

Even though we can know this to be true because of where it is called
from, I think the readability of the function will be improved if you
add an assertion at the top:

Assert(am_tablesync_worker());

And then, because the function is clearly for the tablesync worker
only, there is no need to keep mentioning that in the subsequent
comments...

e.g.1
/* This is table synchronization worker, call initial sync. */
AFTER:
/* Call initial sync. */

e.g.2
/*
 * Report the table sync error. There is no corresponding message type
 * for table synchronization.
 */
AFTER
/*
 * Report the error. There is no corresponding message type for table
 * synchronization.
 */

~~~

8. src/backend/replication/logical/worker.c - LogicalRepHandleTableSync unnecessarily complex

+static void
+LogicalRepHandleTableSync(XLogRecPtr *origin_startpos,
+   char **myslotname, MemoryContext cctx)
+{
+ char    *syncslotname;
+ bool error_recovery_done = false;

IMO this logic is way more complex than it needs to be. IIUC,
'error_recovery_done' and the various conditions that depend on it can
be removed, and the whole thing can be simplified quite a lot.

I re-wrote this function as a POC. Please see the attached file [2].
All the tests are still passing OK.

(Perhaps the scenario for my comment #5 above still needs to be addressed?)

~~~

9. src/backend/replication/logical/worker.c - LogicalRepHandleApplyMessages name

+/*
+ * Run the apply loop with error handling. Disable the subscription,
+ * if necessary.
+ */
+static void
+LogicalRepHandleApplyMessages(XLogRecPtr origin_startpos,
+   MemoryContext cctx)

I felt that the "LogicalRep" prefix is a bit of an overkill here
because this is a static function.

IMO this function should be renamed to 'ApplyLoopWrapper' because that
better describes what it is doing.

~~~

10. src/backend/replication/logical/worker.c - LogicalRepHandleApplyMessages unnecessarily complex

+static void
+LogicalRepHandleApplyMessages(XLogRecPtr origin_startpos,
+   MemoryContext cctx)
+{
+ bool error_recovery_done = false;

IMO this logic is way more complex than it needs to be. IIUC,
'error_recovery_done' and the various conditions that depend on it can
be removed, and the whole thing can be simplified quite a lot.

I re-wrote this function as a POC. Please see the attached file [2].
All the tests are still passing OK.

(Perhaps the scenario for my comment #5 above still needs to be addressed?)

~~~

11. src/bin/pg_dump/pg_dump.c - dumpSubscription

@@ -4441,6 +4451,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
  if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
  appendPQExpBufferStr(query, ", two_phase = on");

+ if (strcmp(subinfo->subdisableonerr, "f") != 0)
+ appendPQExpBufferStr(query, ", disable_on_error = on");
+

I felt that saying disable_on_error is "true" would look more natural
than saying it is "on".
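
As a rough sketch of the suggested output (not the pg_dump code
itself): append_disable_on_error is a hypothetical helper, and a plain
char buffer with snprintf stands in for PQExpBuffer. AFAIK both "on"
and "true" are accepted as boolean option values, so this is only
about how the dump reads.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: append the disable_on_error option to an option
 * list when the catalog value (a "t"/"f" string, like
 * subinfo->subdisableonerr in the patch) is not false.
 */
static void
append_disable_on_error(char *query, size_t querysize,
						const char *subdisableonerr)
{
	size_t		len = strlen(query);

	assert(len < querysize);

	if (strcmp(subdisableonerr, "f") != 0)
		snprintf(query + len, querysize - len,
				 ", disable_on_error = true");
}
```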

~~~

12. src/bin/psql/describe.c - describeSubscriptions typo

@@ -6096,11 +6096,13 @@ describeSubscriptions(const char *pattern, bool verbose)
    gettext_noop("Binary"),
    gettext_noop("Streaming"));

- /* Two_phase is only supported in v15 and higher */
+ /* Two_phase and disable_on_error is only supported in v15 and higher */

Typo

"is only" --> "are only"

~~~

13. src/include/catalog/pg_subscription.h - comments

@@ -103,6 +106,9 @@ typedef struct Subscription
  * binary format */
  bool stream; /* Allow streaming in-progress transactions. */
  char twophasestate; /* Allow streaming two-phase transactions */
+ bool disableonerr; /* Indicates if the subscription should be
+ * automatically disabled when subscription
+ * workers detect any errors. */

It's not usual to have a full stop here.
There is probably no need to repeat the word "subscription".
IMO, generally, it can all be simplified a bit.

BEFORE
Indicates if the subscription should be automatically disabled when
subscription workers detect any errors.

AFTER
Indicates if the subscription should be automatically disabled if a
worker error occurs

~~~

14. src/test/regress/sql/subscription.sql - missing test case.

The "conflicting options" error from the below code is not currently
being tested.

@@ -249,6 +253,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
  opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
  opts->twophase = defGetBoolean(defel);
  }
+ else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
+ strcmp(defel->defname, "disable_on_error") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
+ errorConflictingDefElem(defel, pstate);
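
For clarity, the path I mean is the one taken when the same option is
given twice. Below is a self-contained sketch of that check, not the
real parser: MockSubOpts and parse_option are hypothetical, the bit
value of SUBOPT_DISABLE_ON_ERR is made up, and returning false stands
in for the errorConflictingDefElem() call.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define SUBOPT_DISABLE_ON_ERR 0x01	/* hypothetical bit value */
#define IsSet(val, bits) (((val) & (bits)) == (bits))

/* Hypothetical stand-in for the parsed subscription options. */
typedef struct MockSubOpts
{
	unsigned	specified_opts;
	bool		disableonerr;
} MockSubOpts;

/*
 * Returns false for a conflicting (repeated) option, where the real code
 * would raise an error via errorConflictingDefElem().
 */
static bool
parse_option(MockSubOpts *opts, const char *defname, bool value)
{
	assert(opts != NULL && defname != NULL);

	if (strcmp(defname, "disable_on_error") == 0)
	{
		if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
			return false;	/* option was already specified */

		opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
		opts->disableonerr = value;
	}

	return true;
}
```

A regression test for this would specify disable_on_error twice in
one statement and expect the conflicting-options error.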

~~~

15. src/test/subscription/t/028_disable_on_error.pl - 028 clash

Just a heads-up that this 028 is going to clash with the Row-Filter
patch 028 which has been announced to be pushed soon, so be prepared
to change this number again shortly :)

~~~

16. src/test/subscription/t/028_disable_on_error.pl - done_testing

AFAIK there is a new style now for the TAP tests, which uses
"done_testing();" instead of saying up-front how many tests there are.
See here [1].

~~~

17. src/test/subscription/t/028_disable_on_error.pl - more comments

+# Create an additional unique index in schema s1 on the subscriber only.  When
+# we create subscriptions, below, this should cause subscription "s1" on the
+# subscriber to fail during initial synchronization and to get automatically
+# disabled.

I felt it could be made a bit more obvious upfront in a comment that 2
pairs of pub/sub will be created, and that their names will be the
same as the schemas:
e.g.
Publisher "s1" --> Subscriber "s1"
Publisher "s2" --> Subscriber "s2"

~~~

18. src/test/subscription/t/028_disable_on_error.pl - ALTER tests?

The tests here are only using the hardwired 'disable_on_error' options
set at CREATE SUBSCRIPTION time. There are no TAP tests for changing
the disable_on_error using ALTER SUBSCRIPTION.

Should there be?

------
[1] https://github.com/postgres/postgres/commit/549ec201d6132b7c7ee11ee90a4e02119259ba5b
[2] worker.c.peter.txt is the same as your v18 worker.c, but I re-wrote
functions LogicalRepHandleTableSync and LogicalRepHandleApplyMessages
as a POC

Kind Regards,
Peter Smith.
Fujitsu Australia
