RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From shiy.fnst@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id OSZPR01MB6310269420CA5588D6B81518FD6D9@OSZPR01MB6310.jpnprd01.prod.outlook.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("wangw.fnst@fujitsu.com" <wangw.fnst@fujitsu.com>)
List pgsql-hackers
On Wed, Aug 17, 2022 2:28 PM Wang, Wei/王 威 <wangw.fnst@fujitsu.com> wrote:
> 
> On Tues, August 16, 2022 15:33 PM I wrote:
> > Attach the new patches.
> 
> I found that cfbot has a failure.
> After investigation, I think it is because the worker's exit state is not set
> correctly. So I made some slight modifications.
> 
> Attach the new patches.
> 

Thanks for updating the patch. Here are some comments.

0003 patch
==============
1. src/backend/replication/logical/applybgworker.c
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("cannot replicate target relation \"%s.%s\" using "
+                        "subscription parameter streaming=parallel",
+                        rel->remoterel.nspname, rel->remoterel.relname),
+                 errdetail("The unique column on subscriber is not the unique "
+                           "column on publisher or there is at least one "
+                           "non-immutable function."),
+                 errhint("Please change to use subscription parameter "
+                         "streaming=on.")));

Should we use "%s" instead of "streaming=parallel" and "streaming=on"?
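Just to illustrate, a rough sketch of what I have in mind (only the message
arguments change; the rest is as in the patch):

        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("cannot replicate target relation \"%s.%s\" using "
                        "subscription parameter %s",
                        rel->remoterel.nspname, rel->remoterel.relname,
                        "streaming=parallel"),
                 errdetail("The unique column on subscriber is not the unique "
                           "column on publisher or there is at least one "
                           "non-immutable function."),
                 errhint("Please change to use subscription parameter %s.",
                         "streaming=on")));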

2. src/backend/replication/logical/applybgworker.c
+     * If any worker is handling the streaming transaction, this check needs to
+     * be performed not only using the apply background worker, but also in the
+     * main apply worker. This is because without these restrictions, main

this check needs to be performed not only using the apply background worker, but
also in the main apply worker.
->
this check not only needs to be performed by the apply background worker, but
also by the main apply worker

3. src/backend/replication/logical/relation.c
+    if (ukey)
+    {
+        i = -1;
+        while ((i = bms_next_member(ukey, i)) >= 0)
+        {
+            attnum = AttrNumberGetAttrOffset(i + FirstLowInvalidHeapAttributeNumber);
+
+            if (entry->attrmap->attnums[attnum] < 0 ||
+                !bms_is_member(entry->attrmap->attnums[attnum], entry->remoterel.attunique))
+            {
+                entry->parallel_apply = PARALLEL_APPLY_UNSAFE;
+                return;
+            }
+        }
+
+        bms_free(ukey);

It looks like we need to call bms_free() before returning here, right?
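
For example, something like the following (just a sketch of the fix):

            if (entry->attrmap->attnums[attnum] < 0 ||
                !bms_is_member(entry->attrmap->attnums[attnum],
                               entry->remoterel.attunique))
            {
                entry->parallel_apply = PARALLEL_APPLY_UNSAFE;
                /* Free the bitmapset before the early return as well. */
                bms_free(ukey);
                return;
            }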

4. src/backend/replication/logical/relation.c
+        /* We don't need info for dropped or generated attributes */
+        if (att->attisdropped || att->attgenerated)
+            continue;

Would it be better to change the comment to:
We don't check dropped or generated attributes

5. src/test/subscription/t/032_streaming_apply.pl
+$node_publisher->wait_for_catchup($appname);
+
+# Then we check the foreign key on partition table.
+$node_publisher->wait_for_catchup($appname);

Here, wait_for_catchup() is called twice; we can remove the second one.

6. src/backend/replication/logical/applybgworker.c
+        /* If any workers (or the postmaster) have died, we have failed. */
+        if (status == APPLY_BGWORKER_EXIT)
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                     errmsg("background worker %u failed to apply transaction %u",
+                            entry->wstate->shared->worker_id,
+                            entry->wstate->shared->stream_xid)));

Should we change the error message to "apply background worker %u failed to
apply transaction %u", to be consistent with the error message in
apply_bgworker_wait_for()?

0004 patch
==============
1.
I saw that the commit message says:
If the subscriber exits with an error, this flag will be set true, and
whenever the transaction is applied successfully, this flag is reset false.

"subretry" is set to false if a transaction is applied successfully, it looks
similar to what clear_subscription_skip_lsn() does, so maybe we should remove
the following change in apply_handle_stream_abort()? Or only call
set_subscription_retry() when rollbacking the toplevel transaction.

@@ -1671,6 +1688,9 @@ apply_handle_stream_abort(StringInfo s)
              */
             serialize_stream_abort(xid, subxid);
         }
+
+        /* Reset the retry flag. */
+        set_subscription_retry(false);
     }
 
     reset_apply_error_context_info();
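
That is, for the second option, something along these lines in
apply_handle_stream_abort() (a rough sketch; I'm assuming the existing
xid == subxid check is what identifies a toplevel rollback):

        /* Reset the retry flag only when the toplevel transaction is rolled back. */
        if (xid == subxid)
            set_subscription_retry(false);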

2. src/backend/replication/logical/worker.c
+    /* Reset subretry */
+    values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(retry);
+    replaces[Anum_pg_subscription_subretry - 1] = true;

/* Reset subretry */
->
/* Set subretry */

3.
+# Insert dependent data on the publisher, now it works.
+$node_subscriber->safe_psql('postgres', "INSERT INTO test_tab2 VALUES(1)");

If the DELETE change from the publisher has not yet been applied when this
INSERT is executed, the INSERT will fail.

0005 patch
==============
1.
+      <para>
+       Process ID of the main apply worker, if this process is a apply
+       background worker. NULL if this process is a main apply worker or a
+       synchronization worker.
+      </para></entry>

a apply background worker
->
an apply background worker

Regards,
Shi yu
