Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

Here are my review comments for patch v21-0001:

Note - There are some "general" comments which will result in lots of
smaller changes. The subsequent "detailed" review comments have some
overlap with these general comments but I expect some will be missed
so please search/replace to fix all code related to those general
comments.

======

1. GENERAL - main_worker_pid and replorigin_session_setup

Quite a few of my subsequent review comments below are related to the
somewhat tricky (IMO) change to the code for this area. Here is a
summary of some things that can be done to clean/simplify this logic.

1a.
Make the existing replorigin_session_setup function just be a wrapper
that delegates to the other function passing the acquired_by as 0.
This is because in every case but one (in the apply bg worker main) we
are always passing 0, and IMO there is no need to spread the messy
extra param to places that do not use it.

1b.
'main_worker_pid' is a confusing member name given the way it gets
used - e.g. not even set when you actually *are* the main apply
worker? You can still keep all the same logic, but just change the
name to something more like 'apply_leader_pid' - then the code can
make sense because the main apply workers have no "apply leader" but
the apply background workers do.

1c.
IMO it will be much better to use pid_t and InvalidPid for the type
and the unset values of this member.

1d.
The checks/Asserts for main_worker_pid are confusing to read (e.g.
Assert(worker->main_worker_pid != 0) means the worker is an apply
background worker). IMO there should be convenient macros for these -
then the code can be readable again.
e.g.
#define isApplyMainWorker(worker) ((worker)->apply_leader_pid == InvalidPid)
#define isApplyBgWorker(worker) ((worker)->apply_leader_pid != InvalidPid)
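For illustration only, here is a standalone sketch of #1b/#1c/#1d that compiles outside the backend. It assumes a cut-down LogicalRepWorker containing only the renamed member, defines InvalidPid as (-1) as in miscadmin.h, and init_worker_sketch() / check_is_main_worker() are hypothetical helpers mirroring the assignment in #28 and the Assert in #29. The macro argument is parenthesized, so the macros also work when passed an expression such as 'workers + 1'.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/types.h>			/* pid_t */

/* Same value as PostgreSQL's InvalidPid (miscadmin.h). */
#define InvalidPid (-1)

/* Hypothetical cut-down LogicalRepWorker: only the member discussed here. */
typedef struct LogicalRepWorker
{
	/* PID of the main apply worker that launched us, or InvalidPid. */
	pid_t		apply_leader_pid;
} LogicalRepWorker;

/* Parenthesized argument, so expressions like (workers + i) expand safely. */
#define isApplyMainWorker(worker) ((worker)->apply_leader_pid == InvalidPid)
#define isApplyBgWorker(worker)   ((worker)->apply_leader_pid != InvalidPid)

/* Sketch of the suggested assignment in logicalrep_worker_launch (#28). */
static void
init_worker_sketch(LogicalRepWorker *worker, bool is_subworker, pid_t my_pid)
{
	worker->apply_leader_pid = is_subworker ? my_pid : InvalidPid;
}

/* Readable form of the Assert in logicalrep_worker_stop (#29). */
static void
check_is_main_worker(const LogicalRepWorker *worker)
{
	assert(isApplyMainWorker(worker));
}
```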

======

2. GENERAL - ApplyBgworkerInfo

I like that the struct ApplyBgworkerState was renamed to the more
appropriate name ApplyBgworkerInfo. But now all the old variable names
(e.g. 'wstate') and parameters must be updated as well. Please
search/replace them all in code and comments.

e.g.
ApplyBgworkerInfo *wstate;

should now be something like:
ApplyBgworkerInfo *winfo;

======

3. GENERAL - ApplyBgworkerStatus --> ApplyBgworkerState

IMO the enum should be renamed to ApplyBgworkerState because the
values all represent the discrete state that the bgworker is in. See
the top Software Engineering Stack Exchange answer here [1], which makes
the same point I am trying to make with this comment.

This is a simple mechanical rename to improve readability, but it
will impact lots of variables, parameters, function names, and
comments. Please search/replace to get them all.

======

4. Commit message

In addition, the patch extends the logical replication STREAM_ABORT message so
that abort_time and abort_lsn can also be sent which can be used to update the
replication origin in apply background worker when the streaming transaction is
aborted.

4a.
Should this para also mention something about the introduction of
protocol version 4?

4b.
Should this para also mention that these extensions are not strictly
mandatory for the parallel streaming to still work?

======

5. doc/src/sgml/catalogs.sgml

       <para>
-       If true, the subscription will allow streaming of in-progress
-       transactions
+       Controls how to handle the streaming of in-progress transactions:
+       <literal>f</literal> = disallow streaming of in-progress transactions,
+       <literal>t</literal> = spill the changes of in-progress transactions to
+       disk and apply at once after the transaction is committed on the
+       publisher,
+       <literal>p</literal> = apply changes directly using a background
+       worker if available(same as 't' if no worker is available)
       </para></entry>

Missing whitespace before '('

======

6. doc/src/sgml/logical-replication.sgml

@@ -1334,7 +1344,8 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
    subscription.  A disabled subscription or a crashed subscription will have
    zero rows in this view.  If the initial data synchronization of any
    table is in progress, there will be additional workers for the tables
-   being synchronized.
+   being synchronized. Moreover, if the streaming transaction is applied
+   parallelly, there will be additional workers.
   </para>

"applied parallelly" sounds a bit strange.

SUGGESTION-1
Moreover, if the streaming transaction is applied in parallel, there
will be additional workers.

SUGGESTION-2
Moreover, if the streaming transaction is applied using 'parallel'
mode, there will be additional workers.

======

7. doc/src/sgml/protocol.sgml

@@ -3106,6 +3106,11 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
        Version <literal>3</literal> is supported only for server version 15
        and above, and it allows streaming of two-phase commits.
       </para>
+      <para>
+       Version <literal>4</literal> is supported only for server version 16
+       and above, and it allows applying stream of large in-progress
+       transactions in parallel.
+      </para>

7a.
"applying stream of" -> "applying streams of"

7b.
Actually, I'm not sure that this description is strictly correct even
to say "it allows ..." because IIUC streaming=parallel can still
work without protocol 4 – it is just that some of the extended
STREAM_ABORT message members will be missing, right?

======

8. doc/src/sgml/ref/create_subscription.sgml

+         <para>
+          If set to <literal>parallel</literal>, incoming changes are directly
+          applied via one of the apply background workers, if available. If no
+          background worker is free to handle streaming transaction then the
+          changes are written to temporary files and applied after the
+          transaction is committed. Note that if an error happens when
+          applying changes in a background worker, the finish LSN of the
+          remote transaction might not be reported in the server log.
          </para>

"is free to handle streaming transaction"
-> "is free to handle streaming transactions"
or -> "is free to handle the streaming transaction"

======

9. src/backend/replication/logical/applybgworker.c - general

Some of the messages refer to the "worker #%u" and some refer to the
"worker %u" (without the '#'). All the messages should have a
consistent format.

~~~

10. src/backend/replication/logical/applybgworker.c - general

Search/replace all 'wstate' and change to 'winfo' or similar. See comment #2

~~~

11. src/backend/replication/logical/applybgworker.c - define

+/* Queue size of DSM, 16 MB for now. */
+#define DSM_QUEUE_SIZE (16*1024*1024)

Missing whitespace between operators

~~~

12. src/backend/replication/logical/applybgworker.c - define

+/*
+ * There are three fields in message: start_lsn, end_lsn and send_time. Because
+ * we have updated these statistics in apply worker, we could ignore these
+ * fields in apply background worker. (see function LogicalRepApplyLoop).
+ */
+#define SIZE_STATS_MESSAGE (2*sizeof(XLogRecPtr)+sizeof(TimestampTz))

12a.
"worker." -> "worker" (since the sentence already has a period at the end)

12b.
Missing whitespace between operators
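For reference, here are the two defines from #11 and #12 with whitespace around the operators. This is a standalone sketch: XLogRecPtr and TimestampTz are stand-in typedefs assumed to match PostgreSQL's 8-byte types, so the skipped stats header works out to 2 * 8 + 8 = 24 bytes.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-ins for PostgreSQL's XLogRecPtr (uint64) and TimestampTz (int64). */
typedef uint64_t XLogRecPtr;
typedef int64_t TimestampTz;

/* Queue size of DSM, 16 MB for now. */
#define DSM_QUEUE_SIZE (16 * 1024 * 1024)

/* start_lsn, end_lsn and send_time, which the background worker skips. */
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
```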

~~~

13. src/backend/replication/logical/applybgworker.c - ApplyBgworkerEntry

+/*
+ * Entry for a hash table we use to map from xid to our apply background worker
+ * state.
+ */
+typedef struct ApplyBgworkerEntry

"our" -> "the"

~~~

14. src/backend/replication/logical/applybgworker.c - apply_bgworker_can_start

+ /*
+ * For streaming transactions that are being applied in apply background
+ * worker, we cannot decide whether to apply the change for a relation
+ * that is not in the READY state (see should_apply_changes_for_rel) as we
+ * won't know remote_final_lsn by that time. So, we don't start new apply
+ * background worker in this case.
+ */

14a.
"applied in apply background worker" -> "applied using an apply
background worker"

14b.
"we don't start new apply" -> "we don't start the new apply"

~~~

15. src/backend/replication/logical/applybgworker.c - apply_bgworker_start

+/*
+ * Return the apply background worker that will be used for the specified xid.
+ *
+ * If an apply background worker is found in the free list then re-use it,
+ * otherwise start a fresh one. Cache the worker ApplyBgworkersHash keyed by
+ * the specified xid.
+ */
+ApplyBgworkerInfo *
+apply_bgworker_start(TransactionId xid)

"Cache the worker ApplyBgworkersHash" -> "Cache the worker in
ApplyBgworkersHash"

~~~

16.

+ /* Try to get a free apply background worker. */
+ if (list_length(ApplyBgworkersFreeList) > 0)

Please refer to the recent push [2] of my other patch. This code should say:

if (ApplyBgworkersFreeList != NIL)
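The two tests are equivalent only because PostgreSQL represents every empty List as NIL (a NULL pointer), so 'list != NIL' already means 'list_length(list) > 0' without a function call. A toy model, assuming a minimal List struct (not the real pg_list.h one); free_list_has_worker() is a hypothetical name.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy List: only the length matters for this comparison. */
typedef struct List
{
	int			length;
} List;

/* As in pg_list.h, the empty list is always the NULL pointer. */
#define NIL ((List *) NULL)

static int
list_length(const List *l)
{
	return l ? l->length : 0;
}

/* Preferred style: test the pointer directly. */
static bool
free_list_has_worker(const List *free_list)
{
	return free_list != NIL;
}
```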

~~~

17. src/backend/replication/logical/applybgworker.c - LogicalApplyBgworkerMain

+ MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
+ MyLogicalRepWorker->reply_time = 0;
+
+ InitializeApplyWorker();

Lots of things happen within InitializeApplyWorker(). I think this
call deserves at least some comment to say it does lots of common
initialization. And the same for the other caller of this in the apply
main worker.

~~~

18. src/backend/replication/logical/applybgworker.c - apply_bgworker_setup_dsm

+/*
+ * Set up a dynamic shared memory segment.
+ *
+ * We set up a control region that contains a ApplyBgworkerShared,
+ * plus one region per message queue. There are as many message queues as
+ * the number of workers.
+ */
+static bool
+apply_bgworker_setup_dsm(ApplyBgworkerInfo *wstate)

This function is now returning a bool, so it would be better for the
function comment to describe the meaning of the return value.

~~~

19.

+ /* Create the shared memory segment and establish a table of contents. */
+ seg = dsm_create(shm_toc_estimate(&e), 0);
+
+ if (seg == NULL)
+ return false;
+
+ toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
+ segsize);

This code is similar to, but inconsistent with, other code in the function
LogicalApplyBgworkerMain.

19a.
I think the whitespace should be the same as in the other function.

19b.
Shouldn't the 'toc' result be checked like it was in the other function?

~~~

20. src/backend/replication/logical/applybgworker.c - apply_bgworker_setup

I think this function could be refactored to be cleaner and share more
common logic.

SUGGESTION

/* Setup shared memory, and attempt launch. */
if (apply_bgworker_setup_dsm(wstate))
{
	bool		launched;

	launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
										MySubscription->oid,
										MySubscription->name,
										MyLogicalRepWorker->userid,
										InvalidOid,
										dsm_segment_handle(wstate->dsm_seg));
	if (launched)
	{
		ApplyBgworkersList = lappend(ApplyBgworkersList, wstate);
		MemoryContextSwitchTo(oldcontext);
		return wstate;
	}
	else
	{
		dsm_detach(wstate->dsm_seg);
		wstate->dsm_seg = NULL;
	}
}

pfree(wstate);
MemoryContextSwitchTo(oldcontext);
return NULL;

~~~

21. src/backend/replication/logical/applybgworker.c -
apply_bgworker_check_status

+apply_bgworker_check_status(void)
+{
+ ListCell   *lc;
+
+ if (am_apply_bgworker() || MySubscription->stream != SUBSTREAM_PARALLEL)
+ return;

IMO it makes more sense logically for the condition to be reordered:

if (MySubscription->stream != SUBSTREAM_PARALLEL || am_apply_bgworker())

~~~

22.

This function should be renamed to 'apply_bgworker_check_state'. See
review comment #3

~~~

23. src/backend/replication/logical/applybgworker.c - apply_bgworker_set_status

This function should be renamed to 'apply_bgworker_set_state'. See
review comment #3

~~~

24. src/backend/replication/logical/applybgworker.c -
apply_bgworker_subxact_info_add

+ /*
+ * CommitTransactionCommand is needed to start a subtransaction after
+ * issuing a SAVEPOINT inside a transaction block(see
+ * StartSubTransaction()).
+ */

Missing whitespace before '('

~~~

25. src/backend/replication/logical/applybgworker.c -
apply_bgworker_savepoint_name

+/*
+ * Form the savepoint name for streaming transaction.
+ *
+ * Return the name in the supplied buffer.
+ */
+void
+apply_bgworker_savepoint_name(Oid suboid, TransactionId xid,

"name for streaming" -> "name for the streaming"

======

26. src/backend/replication/logical/launcher.c - logicalrep_worker_find

@@ -223,6 +227,13 @@ logicalrep_worker_find(Oid subid, Oid relid, bool
only_running)
  {
  LogicalRepWorker *w = &LogicalRepCtx->workers[i];

+ /*
+ * We are only interested in the main apply worker or table sync worker
+ * here.
+ */
+ if (w->main_worker_pid != 0)
+ continue;
+

IMO the comment is not very well aligned with what the code is doing.

26a.
That comment saying "We are only interested in the main apply worker
or table sync worker here." is a general statement that I think
belongs outside this loop.

26b.
And the comment just for this condition should be like the below:

SUGGESTION
Skip apply background workers.

26c.
Also, code readability would be better if it used the earlier
suggested macros. See comment #1d.

SUGGESTION
/* Skip apply background workers. */
if (isApplyBgWorker(w))
	continue;

~~~

27. src/backend/replication/logical/launcher.c - logicalrep_worker_launch

@@ -259,11 +270,11 @@ logicalrep_workers_find(Oid subid, bool only_running)
 }

 /*
- * Start new apply background worker, if possible.
+ * Start new background worker, if possible.
  */
-void
+bool
 logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
- Oid relid)
+ Oid relid, dsm_handle subworker_dsm)

This function now returns bool so the function comment probably should
describe the meaning of that return value.

~~~

28.

+ worker->main_worker_pid = is_subworker ? MyProcPid : 0;

Here is an example where I think code would benefit from the
suggestions of comments #1b, #1c.

SUGGESTION
worker->apply_leader_pid = is_subworker ? MyProcPid : InvalidPid;

~~~

29. src/backend/replication/logical/launcher.c - logicalrep_worker_stop

+ Assert(worker->main_worker_pid == 0);

Here is an example where I think code readability would benefit from
comment #1d.

Assert(isApplyMainWorker(worker));

~~~

30. src/backend/replication/logical/launcher.c - logicalrep_worker_detach

+ /*
+ * This is the main apply worker, stop all the apply background workers
+ * previously started from here.
+ */

"worker, stop" -> "worker; stop"

~~~

31.

+ if (w->main_worker_pid != 0)
+ logicalrep_worker_stop_internal(w);

See comment #1d.

SUGGESTION:
if (isApplyBgWorker(w)) ...

~~~

32. src/backend/replication/logical/launcher.c - logicalrep_worker_cleanup

@@ -621,6 +678,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
  worker->userid = InvalidOid;
  worker->subid = InvalidOid;
  worker->relid = InvalidOid;
+ worker->main_worker_pid = 0;
 }

See Comment #1c.

SUGGESTION:
worker->apply_leader_pid = InvalidPid;

~~~

33. src/backend/replication/logical/launcher.c - logicalrep_apply_bgworker_count

+ if (w->subid == subid && w->main_worker_pid != 0)
+ res++;

See comment #1d.

SUGGESTION
if (w->subid == subid && isApplyBgWorker(w))

======

34. src/backend/replication/logical/origin.c - replorigin_session_setup

@@ -1075,12 +1075,21 @@ ReplicationOriginExitCleanup(int code, Datum arg)
  * array doesn't have to be searched when calling
  * replorigin_session_advance().
  *
- * Obviously only one such cached origin can exist per process and the current
+ * Normally only one such cached origin can exist per process and the current
  * cached value can only be set again after the previous value is torn down
  * with replorigin_session_reset().
+ *
+ * However, if the function parameter 'acquired_by' is not 0, we allow the
+ * process to use the same slot already acquired by another process. It's safe
+ * because 1) The only caller (apply background workers) will maintain the
+ * commit order by allowing only one process to commit at a time, so no two
+ * workers will be operating on the same origin at the same time (see comments
+ * in logical/worker.c). 2) Even though we try to advance the session's origin
+ * concurrently, it's safe to do so as we change/advance the session_origin
+ * LSNs under replicate_state LWLock.
  */
 void
-replorigin_session_setup(RepOriginId node)
+replorigin_session_setup(RepOriginId node, int acquired_by)

34a.
The comment does not actually say that acquired_by is the PID of the
owning process. It should say that.

34b.
IMO better to change the int acquired_by to type pid_t.

~~~

35.

See comment #1a.

I suggest the existing replorigin_session_setup should now just be a
wrapper function that delegates to this new function, passing
'acquired_by' as 0.

e.g.

void
replorigin_session_setup(RepOriginId node)
{
	replorigin_session_setup_acquired(node, 0);
}

~~~

- session_replication_state->acquired_by = MyProcPid;
+ if (acquired_by == 0)
+ session_replication_state->acquired_by = MyProcPid;
+ else if (session_replication_state->acquired_by == 0)
+ elog(ERROR, "could not find replication state slot for replication"
+   "origin with OID %u which was acquired by %d", node, acquired_by);

Is it right to compare with 0?

Shouldn't this really be checking that the slot's owner is the passed
'acquired_by' PID?

e.g.

else if (session_replication_state->acquired_by != acquired_by)
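To make #1a and #35 concrete, here is a toy model of the suggested split. It assumes a simplified slot struct with no slot search or locking, and returns false where the real code would elog(ERROR). replorigin_session_setup_acquired() is only the name suggested above; setup_acquired_sketch(), setup_sketch() and ReplicationStateSketch are hypothetical. The interesting case is the last branch: with an owner-equality check, a slot held by a different leader is rejected, whereas the '== 0' check in the patch would accept it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <sys/types.h>			/* pid_t */

typedef uint16_t RepOriginId;	/* uint16 in PostgreSQL */

/* Hypothetical simplified replication state slot. */
typedef struct ReplicationStateSketch
{
	RepOriginId roident;
	pid_t		acquired_by;	/* 0 means the slot is not acquired */
} ReplicationStateSketch;

/*
 * acquired_by == 0: acquire the slot for ourselves (the normal case).
 * acquired_by != 0: share a slot that must already be held by that PID.
 */
static bool
setup_acquired_sketch(ReplicationStateSketch *state, pid_t my_pid,
					  pid_t acquired_by)
{
	if (acquired_by == 0)
	{
		if (state->acquired_by != 0)
			return false;		/* already active for another process */
		state->acquired_by = my_pid;
	}
	else if (state->acquired_by != acquired_by)
		return false;			/* not held by the expected leader */

	return true;
}

/* The existing entry point just delegates, passing 0 (#1a). */
static bool
setup_sketch(ReplicationStateSketch *state, pid_t my_pid)
{
	return setup_acquired_sketch(state, my_pid, 0);
}
```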

======

36. src/backend/replication/logical/tablesync.c - process_syncing_tables

@@ -589,6 +590,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 void
 process_syncing_tables(XLogRecPtr current_lsn)
 {
+ if (am_apply_bgworker())
+ return;
+

Perhaps there should be a comment to describe why process_syncing_tables
should be skipped for the apply background worker?

======

37. src/backend/replication/logical/worker.c - file comment

+ * 2) Write to temporary files and apply when the final commit arrives
+ *
+ * If no worker is available to handle streamed transaction, the data is
+ * written to temporary files and then applied at once when the final commit
+ * arrives.

"streamed transaction" -> "the streamed transaction"

~~~

38. src/backend/replication/logical/worker.c - should_apply_changes_for_rel

+ *
+ * Note that for streaming transactions that is being applied in apply
+ * background worker, we disallow applying changes on a table that is not in
+ * the READY state, because we cannot decide whether to apply the change as we
+ * won't know remote_final_lsn by that time.
+ *
+ * We already checked this in apply_bgworker_can_start() before assigning the
+ * streaming transaction to the background worker, but it also needs to be
+ * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
+ * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
+ * in parallel to this transaction.
  */
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)

38a.
"transactions that is being applied" -> "transactions that are being applied"

38b.
It is a bit confusing to keep using the word "parallel" here in the
comments (which is nothing to do with streaming=parallel mode – you
just mean *simultaneously* or *concurrently*). Perhaps the code
comment can be slightly reworded? Also, "in parallel to" doesn't sound
right.

~~~

39. src/backend/replication/logical/worker.c - handle_streamed_transaction

+ /* Not in streaming mode and not in apply background worker. */
+ if (!(in_streamed_transaction || am_apply_bgworker()))
  return false;

IMO if you wanted to write the comment in that way then the code
should have matched it more closely like:
if (!in_streamed_transaction && !am_apply_bgworker())

OTOH, if you want to keep the code as-is then the comment should be
worded slightly differently.
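By De Morgan's law the two spellings are equivalent, so the choice is purely about matching the comment. A tiny standalone check; the helper names are hypothetical, and the booleans stand in for in_streamed_transaction and am_apply_bgworker().

```c
#include <assert.h>
#include <stdbool.h>

/* Form used in the patch: "not (streaming or bgworker)". */
static bool
skip_as_written(bool in_streamed_transaction, bool am_bgworker)
{
	return !(in_streamed_transaction || am_bgworker);
}

/* Form matching the comment: "not streaming and not a bgworker". */
static bool
skip_as_suggested(bool in_streamed_transaction, bool am_bgworker)
{
	return !in_streamed_transaction && !am_bgworker;
}
```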

~~~

40.

The coding styles do not seem particularly consistent. For example,
this function (handle_streamed_transaction) uses if/else and assigns
var 'res' to be a common return. But the previous function
(should_apply_changes_for_rel) uses if/else but returns directly from
every block. If possible, I think it's better to stick to the same
pattern instead of flip/flopping coding styles for no apparent reason.

~~~

41. src/backend/replication/logical/worker.c - apply_handle_prepare_internal

  /*
- * BeginTransactionBlock is necessary to balance the EndTransactionBlock
+ * We must be in transaction block to balance the EndTransactionBlock
  * called within the PrepareTransactionBlock below.
  */

I'm not sure how this changed comment says anything different from
the original HEAD comment.

And even if it must be kept, the grammar is wrong ("in transaction
block" -> "in a transaction block").

~~~

42. src/backend/replication/logical/worker.c - apply_handle_stream_commit

@@ -1468,8 +1793,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
 static void
 apply_handle_stream_commit(StringInfo s)
 {
- TransactionId xid;
  LogicalRepCommitData commit_data;
+ TransactionId xid;

This change is just switching the order of declarations? If not
needed, remove it.

~~~

43.

+ else
+ {
+ /* This is the main apply worker. */
+ ApplyBgworkerInfo *wstate = apply_bgworker_find(xid);

- /* unlink the files with serialized changes and subxact info */
- stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+ elog(DEBUG1, "received commit for streamed transaction %u", xid);
+
+ /*
+ * Check if we are processing this transaction in an apply background
+ * worker and if so, send the changes to that worker.
+ */
+ if (wstate)
+ {
+ /* Send STREAM COMMIT message to the apply background worker. */
+ apply_bgworker_send_data(wstate, s->len, s->data);
+
+ /*
+ * After sending the data to the apply background worker, wait for
+ * that worker to finish. This is necessary to maintain commit
+ * order which avoids failures due to transaction dependencies and
+ * deadlocks.
+ */
+ apply_bgworker_wait_for(wstate, APPLY_BGWORKER_FINISHED);
+
+ pgstat_report_stat(false);
+ store_flush_position(commit_data.end_lsn);
+ stop_skipping_changes();
+
+ apply_bgworker_free(wstate);
+
+ /*
+ * The transaction is either non-empty or skipped, so we clear the
+ * subskiplsn.
+ */
+ clear_subscription_skip_lsn(commit_data.commit_lsn);
+ }
+ else
+ {
+ /*
+ * The transaction has been serialized to file, so replay all the
+ * spooled operations.
+ */
+ apply_spooled_messages(xid, commit_data.commit_lsn);
+
+ apply_handle_commit_internal(&commit_data);
+
+ /* Unlink the files with serialized changes and subxact info. */
+ stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+ }
+ }
+
+ /* Check the status of apply background worker if any. */
+ apply_bgworker_check_status();

  /* Process any tables that are being synchronized in parallel. */
  process_syncing_tables(commit_data.end_lsn);

43a.
AFAIK apply_bgworker_check_status() does nothing if am_apply_bgworker() –
so can this call be moved into the code block where you already know
it is the main apply worker?

43b.
Similarly, AFAIK process_syncing_tables() does nothing if
am_apply_bgworker() – so can this call be moved into the code block
where you already know it is the main apply worker?

~~~

44. src/backend/replication/logical/worker.c - InitializeApplyWorker


+/*
+ * Initialize the databse connection, in-memory subscription and necessary
+ * config options.
+ */
 void
-ApplyWorkerMain(Datum main_arg)
+InitializeApplyWorker(void)
 {

44a.
typo "databse"

44b.
Should there be some more explanation in this comment to say that this
is common code for both the apply main workers and apply background
workers?

44c.
Following on from #44b, consider renaming this to something like
CommonApplyWorkerInit() to emphasize it is called from multiple
places?

~~~

45. src/backend/replication/logical/worker.c - ApplyWorkerMain

- replorigin_session_setup(originid);
+ replorigin_session_setup(originid, 0);


See #1a. Then this change won't be necessary.

~~~

46. src/backend/replication/logical/worker.c - apply_error_callback

+ if (errarg->remote_attnum < 0)
+ {
+ if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+    errarg->origin_name,
+    logicalrep_message_type(errarg->command),
+    errarg->rel->remoterel.nspname,
+    errarg->rel->remoterel.relname,
+    errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
+    errarg->origin_name,
+    logicalrep_message_type(errarg->command),
+    errarg->rel->remoterel.nspname,
+    errarg->rel->remoterel.relname,
+    errarg->remote_xid,
+    LSN_FORMAT_ARGS(errarg->finish_lsn));
+ }
+ else
+ {
+ if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+    errarg->origin_name,
+    logicalrep_message_type(errarg->command),
+    errarg->rel->remoterel.nspname,
+    errarg->rel->remoterel.relname,
+    errarg->rel->remoterel.attnames[errarg->remote_attnum],
+    errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
+    errarg->origin_name,
+    logicalrep_message_type(errarg->command),
+    errarg->rel->remoterel.nspname,
+    errarg->rel->remoterel.relname,
+    errarg->rel->remoterel.attnames[errarg->remote_attnum],
+    errarg->remote_xid,
+    LSN_FORMAT_ARGS(errarg->finish_lsn));
+ }
+ }

Hou-san had asked me [3] (comment #14) how the above code can be
shortened. Below is one idea, but maybe you won't like it ;-)

#define MSG_O_T_S_R "processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" "
#define O_T_S_R \
	errarg->origin_name, \
	logicalrep_message_type(errarg->command), \
	errarg->rel->remoterel.nspname, \
	errarg->rel->remoterel.relname

if (errarg->remote_attnum < 0)
{
	if (XLogRecPtrIsInvalid(errarg->finish_lsn))
		errcontext(MSG_O_T_S_R "in transaction %u",
				   O_T_S_R,
				   errarg->remote_xid);
	else
		errcontext(MSG_O_T_S_R "in transaction %u finished at %X/%X",
				   O_T_S_R,
				   errarg->remote_xid,
				   LSN_FORMAT_ARGS(errarg->finish_lsn));
}
else
{
	if (XLogRecPtrIsInvalid(errarg->finish_lsn))
		errcontext(MSG_O_T_S_R "column \"%s\" in transaction %u",
				   O_T_S_R,
				   errarg->rel->remoterel.attnames[errarg->remote_attnum],
				   errarg->remote_xid);
	else
		errcontext(MSG_O_T_S_R "column \"%s\" in transaction %u finished at %X/%X",
				   O_T_S_R,
				   errarg->rel->remoterel.attnames[errarg->remote_attnum],
				   errarg->remote_xid,
				   LSN_FORMAT_ARGS(errarg->finish_lsn));
}
#undef O_T_S_R
#undef MSG_O_T_S_R
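For what it's worth, the trick relies only on adjacent string-literal concatenation and a macro that expands to several arguments, so it can be tried outside the backend. The sketch below is standalone: snprintf() stands in for errcontext(), ErrArgSketch is a hypothetical cut-down ApplyErrorCallbackArg, LSN_ARGS() is a local stand-in for LSN_FORMAT_ARGS(), and (unlike the version above) the argument macro takes a parameter so it does not silently depend on a variable named errarg. Only the column-less branches are shown.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for the ApplyErrorCallbackArg fields used here. */
typedef struct ErrArgSketch
{
	const char *origin_name;
	const char *command;		/* stands in for logicalrep_message_type() */
	const char *nspname;
	const char *relname;
	unsigned int remote_xid;
	uint64_t	finish_lsn;		/* 0 plays the role of InvalidXLogRecPtr */
} ErrArgSketch;

/* Shared prefix; adjacent string literals are joined at compile time. */
#define MSG_O_T_S_R "processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" "
/* Matching argument list: expands to four comma-separated arguments. */
#define O_T_S_R(a) (a)->origin_name, (a)->command, (a)->nspname, (a)->relname
/* Split a 64-bit LSN into the two halves printed by %X/%X. */
#define LSN_ARGS(lsn) (unsigned int) ((lsn) >> 32), (unsigned int) (lsn)

static void
format_context(char *buf, size_t len, const ErrArgSketch *a)
{
	if (a->finish_lsn == 0)
		snprintf(buf, len, MSG_O_T_S_R "in transaction %u",
				 O_T_S_R(a), a->remote_xid);
	else
		snprintf(buf, len, MSG_O_T_S_R "in transaction %u finished at %X/%X",
				 O_T_S_R(a), a->remote_xid, LSN_ARGS(a->finish_lsn));
}
```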

======

47. src/include/replication/logicalproto.h

@@ -32,12 +32,17 @@
  *
  * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with
  * support for two-phase commit decoding (at prepare time). Introduced in PG15.
+ *
+ * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
+ * with support for streaming large transactions using apply background
+ * workers. Introduced in PG16.
  */
 #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
 #define LOGICALREP_PROTO_VERSION_NUM 1
 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
 #define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
-#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
+#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
+#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM

47a.
I don't think that comment is strictly true. IIUC the new protocol
version 4 is currently only affecting the *extra* STREAM_ABORT members
– but in fact streaming=parallel is still functional without using
those extra members, isn't it? So maybe this description needs to be
modified a bit to be more accurate?

47b.
And perhaps the entire constant should be renamed to something like
LOGICALREP_PROTO_PARALLEL_STREAM_ABORT_VERSION_NUM?

======

48. src/include/replication/origin.h

-extern void replorigin_session_setup(RepOriginId node);
+extern void replorigin_session_setup(RepOriginId node, int acquired_by);

See comment #1a, #35.

IMO the original declaration should be left as-is and a new function
(which the original wraps) added with a pid_t param.

======

49. src/include/replication/worker_internal.h

@@ -60,6 +63,12 @@ typedef struct LogicalRepWorker
  */
  FileSet    *stream_fileset;

+ /*
+ * PID of main apply worker if this slot is used for an apply background
+ * worker.
+ */
+ int main_worker_pid;
+
  /* Stats. */
  XLogRecPtr last_lsn;
  TimestampTz last_send_time;
@@ -68,8 +77,70 @@ typedef struct LogicalRepWorker
  TimestampTz reply_time;
 } LogicalRepWorker;

49a.
See my general comments #1b, #1c about this.

49b.
Also, the comment should describe both cases.

SUGGESTION
/*
 * For apply background worker - 'apply_leader_pid' is the PID of the main
 * apply worker that launched the apply background worker.
 *
 * For main apply worker - 'apply_leader_pid' is InvalidPid.
 */
pid_t apply_leader_pid;

49c.
Here is where some helpful worker macros (mentioned in comment #1d)
can be defined.

SUGGESTION
#define isApplyMainWorker(worker) ((worker)->apply_leader_pid == InvalidPid)
#define isApplyBgWorker(worker) ((worker)->apply_leader_pid != InvalidPid)

~~~

50.

+/*
+ * Status of apply background worker.
+ */
+typedef enum ApplyBgworkerStatus
+{
+ APPLY_BGWORKER_BUSY = 0, /* assigned to a transaction */
+ APPLY_BGWORKER_FINISHED, /* transaction is completed */
+ APPLY_BGWORKER_EXIT /* exit */
+} ApplyBgworkerStatus;


50a.
See general comment #3 why this enum should be renamed to ApplyBgworkerState

50b.
The comment "/* exit */" is pretty meaningless. Maybe "worker has
shut down/exited" or similar?

50c.
In fact, I think the enum value should be APPLY_BGWORKER_EXITED

50d.
There seems no reason to explicitly assign APPLY_BGWORKER_BUSY enum value to 0.

SUGGESTION
/*
 * Apply background worker states.
 */
typedef enum ApplyBgworkerState
{
	APPLY_BGWORKER_BUSY,		/* assigned to a transaction */
	APPLY_BGWORKER_FINISHED,	/* transaction is completed */
	APPLY_BGWORKER_EXITED		/* worker has shut down/exited */
} ApplyBgworkerState;

~~~

51.

+typedef struct ApplyBgworkerShared
+{
+ slock_t mutex;
+
+ /* Status of apply background worker. */
+ ApplyBgworkerStatus status;
+
+ /* Logical protocol version. */
+ uint32 proto_version;
+
+ TransactionId stream_xid;
+
+ /* Id of apply background worker */
+ uint32 worker_id;
+} ApplyBgworkerShared;

51a.
+ /* Status of apply background worker. */
+ ApplyBgworkerStatus status;

See review comment #3.

SUGGESTION:
/* Current state of the apply background worker. */
ApplyBgworkerState worker_state;

51b.
+ /* Id of apply background worker */

"Id" -> "ID" might be more usual.

~~~

52.

+/* Apply background worker setup and interactions */
+extern ApplyBgworkerInfo *apply_bgworker_start(TransactionId xid);
+extern ApplyBgworkerInfo *apply_bgworker_find(TransactionId xid);
+extern void apply_bgworker_wait_for(ApplyBgworkerInfo *wstate,
+ ApplyBgworkerStatus wait_for_status);
+extern void apply_bgworker_send_data(ApplyBgworkerInfo *wstate, Size nbytes,
+ const void *data);
+extern void apply_bgworker_free(ApplyBgworkerInfo *wstate);
+extern void apply_bgworker_check_status(void);
+extern void apply_bgworker_set_status(ApplyBgworkerStatus status);
+extern void apply_bgworker_subxact_info_add(TransactionId current_xid);
+extern void apply_bgworker_savepoint_name(Oid suboid, Oid relid,
+   char *spname, int szsp);

This big block of similarly named externs might as well be in
alphabetical order instead of the current, apparently random, order.

~~~

53.

+static inline bool
+am_apply_bgworker(void)
+{
+ return MyLogicalRepWorker->main_worker_pid != 0;
+}

This can be simplified/improved using the new macros as previously
suggested in #1d.

SUGGESTION
static inline bool
am_apply_bgworker(void)
{
	return isApplyBgWorker(MyLogicalRepWorker);
}

======

54. src/tools/pgindent/typedefs.list

 AppendState
+ApplyBgworkerEntry
+ApplyBgworkerShared
+ApplyBgworkerInfo
+ApplyBgworkerStatus
 ApplyErrorCallbackArg

Please rearrange these into alphabetical order (e.g. ApplyBgworkerInfo
should come before ApplyBgworkerShared).

------
[1] https://softwareengineering.stackexchange.com/questions/219351/state-or-status-when-should-a-variable-name-contain-the-word-state-and-w#:~:text=status%20is%20used%20to%20describe,(e.g.%20pending%2Fdispatched)
[2] https://github.com/postgres/postgres/commit/efd0c16becbf45e3b0215e124fde75fee8fcbce4
[3] https://www.postgresql.org/message-id/OS0PR01MB57169AEA399C6DC370EAF23B94649%40OS0PR01MB5716.jpnprd01.prod.outlook.com

Kind Regards,
Peter Smith.
Fujitsu Australia


