RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers

From kuroda.hayato@fujitsu.com
Subject RE: Perform streaming logical transactions by background workers and parallel apply
Date
Msg-id TYAPR01MB586674C1EE91C06DBACE7728F52B9@TYAPR01MB5866.jpnprd01.prod.outlook.com
In response to RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
Responses RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
RE: Perform streaming logical transactions by background workers and parallel apply  ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>)
List pgsql-hackers
Dear Hou,

Thanks for updating the patch! Here are my comments.

===
01. applyparallelworker.c - SIZE_STATS_MESSAGE

```
/*
 * There are three fields in each message received by the parallel apply
 * worker: start_lsn, end_lsn and send_time. Because we have updated these
 * statistics in the leader apply worker, we can ignore these fields in the
 * parallel apply worker (see function LogicalRepApplyLoop).
 */
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
```

Judging from the style of other comments, the first sentence of a comment should
describe the datatype and its usage, not the detailed reason.
For example, you wrote "A list ..." for ParallelApplyWorkersList. How about adding a first sentence like the following:
The message size that can be skipped by the parallel apply worker.
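For illustration, the comment with the suggested summary sentence moved to the front could look like the sketch below. The typedefs are stand-ins (assumptions) so the snippet compiles outside the PostgreSQL tree, and the exact wording of the first line is only a proposal.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL types (assumption: both are 8 bytes wide). */
typedef uint64_t XLogRecPtr;
typedef int64_t TimestampTz;

/*
 * The message size that can be skipped by the parallel apply worker.
 *
 * There are three fields in each message received by the parallel apply
 * worker: start_lsn, end_lsn and send_time. Because we have updated these
 * statistics in the leader apply worker, we can ignore these fields in the
 * parallel apply worker (see function LogicalRepApplyLoop).
 */
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

/* Compile-time check: two LSNs plus one timestamp are 24 bytes here. */
static_assert(SIZE_STATS_MESSAGE == 24, "two LSNs plus one timestamp");
```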


~~~
02. applyparallelworker.c - parallel_apply_start_subtrans

```
    if (current_xid != top_xid &&
        !list_member_xid(subxactlist, current_xid))
```

A macro TransactionIdEquals is defined in access/transam.h. Should we use it, or is it too trivial?
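A minimal sketch of the condition rewritten with the macro. TransactionId and TransactionIdEquals() are redefined locally so the snippet compiles on its own (assumption: they mirror access/transam.h, where the macro is a plain equality test), and the array-based lookup is a hypothetical stand-in for list_member_xid(). The branch body that actually starts the subtransaction is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Local stand-ins for the PostgreSQL definitions (assumptions). */
typedef uint32_t TransactionId;
#define TransactionIdEquals(id1, id2) ((id1) == (id2))

/* Hypothetical array-based replacement for list_member_xid(). */
static bool
xid_array_member(const TransactionId *xids, size_t n, TransactionId xid)
{
    for (size_t i = 0; i < n; i++)
        if (TransactionIdEquals(xids[i], xid))
            return true;
    return false;
}

/*
 * The check from parallel_apply_start_subtrans(), written with the macro:
 * true when current_xid is a subtransaction that has not been seen yet.
 */
static bool
is_new_subxact(TransactionId current_xid, TransactionId top_xid,
               const TransactionId *subxacts, size_t nsubxacts)
{
    return !TransactionIdEquals(current_xid, top_xid) &&
           !xid_array_member(subxacts, nsubxacts, current_xid);
}
```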


~~~
03. applyparallelworker.c - LogicalParallelApplyLoop

```
            case SHM_MQ_WOULD_BLOCK:
                {
                    int            rc;

                    if (!in_streamed_transaction)
                    {
                        /*
                         * If we didn't get any transactions for a while there might be
                         * unconsumed invalidation messages in the queue, consume them
                         * now.
                         */
                        AcceptInvalidationMessages();
                        maybe_reread_subscription();
                    }

                    MemoryContextReset(ApplyMessageContext);
```

Is MemoryContextReset() needed? IIUC no one uses ApplyMessageContext if we reach here.


~~~
04. applyparallelworker.c - HandleParallelApplyMessages

```
        else if (res == SHM_MQ_SUCCESS)
        {
            StringInfoData msg;

            initStringInfo(&msg);
            appendBinaryStringInfo(&msg, data, nbytes);
            HandleParallelApplyMessage(winfo, &msg);
            pfree(msg.data);
        }
```

In LogicalParallelApplyLoop(), appendBinaryStringInfo() is not used;
instead, the StringInfoData is initialized directly. Why is there a difference?
appendBinaryStringInfo() may call repalloc() and always calls memcpy(), so it may be inefficient.
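To make the difference concrete, here is a sketch contrasting a copying initialization (similar in effect to initStringInfo() plus appendBinaryStringInfo()) with pointing the StringInfoData at the received bytes directly. The struct is a local mirror of PostgreSQL's StringInfoData (assumption), malloc() stands in for palloc(), and treating maxlen = -1 as "buffer not owned" is a convention assumed for this sketch only.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Local mirror of PostgreSQL's StringInfoData (assumption). */
typedef struct StringInfoData
{
    char *data;
    int   len;
    int   maxlen;
    int   cursor;
} StringInfoData;

/* Copying approach: allocate a buffer and memcpy() the payload into it. */
static void
init_by_copy(StringInfoData *msg, const char *data, int nbytes)
{
    msg->data = malloc((size_t) nbytes + 1);
    if (msg->data == NULL)
        abort();
    memcpy(msg->data, data, (size_t) nbytes);
    msg->data[nbytes] = '\0';
    msg->len = nbytes;
    msg->maxlen = nbytes + 1;
    msg->cursor = 0;
}

/*
 * Zero-copy approach: reuse the bytes returned by the queue. The caller must
 * not free or enlarge msg->data (maxlen = -1 marks that in this sketch).
 */
static void
init_in_place(StringInfoData *msg, const char *data, int nbytes)
{
    msg->data = (char *) data;
    msg->len = nbytes;
    msg->maxlen = -1;
    msg->cursor = 0;
}
```

The zero-copy form avoids the allocation and copy, at the cost of the buffer only being valid until the next receive from the queue.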


~~~
05. applyparallelworker.c - parallel_apply_send_data

```
    if (result != SHM_MQ_SUCCESS)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("could not send data to shared-memory queue")));

```

I checked the values of shm_mq_result, and it seems that shm_mq_send() with nowait = false fails
only when the process on the other end has exited (detached from the queue).
How about adding a hint or detail message such as "lost connection to parallel apply worker"?
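A sketch of the reasoning: the enum below mirrors the three shm_mq_result values from storage/shm_mq.h (redefined locally so the snippet compiles alone), and send_failure_detail() is a hypothetical helper that picks the detail text for the "could not send data to shared-memory queue" error. Since a blocking send cannot return SHM_MQ_WOULD_BLOCK, SHM_MQ_DETACHED is the only failure that needs a message.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Local mirror of shm_mq_result (assumption: same three values). */
typedef enum
{
    SHM_MQ_SUCCESS,     /* sent or received a message */
    SHM_MQ_WOULD_BLOCK, /* not completed; retry later */
    SHM_MQ_DETACHED     /* other process has detached the queue */
} shm_mq_result;

/*
 * Hypothetical helper: detail text for a failed shm_mq_send() with
 * nowait = false. Returns NULL when no detail is needed.
 */
static const char *
send_failure_detail(shm_mq_result result)
{
    switch (result)
    {
        case SHM_MQ_DETACHED:
            return "lost connection to parallel apply worker";
        case SHM_MQ_WOULD_BLOCK:    /* only possible with nowait = true */
        case SHM_MQ_SUCCESS:
            break;
    }
    return NULL;
}
```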


===
06. worker.c - nchanges

```
/*
 * The number of changes sent to parallel apply workers during one streaming
 * block.
 */
static uint32 nchanges = 0;
```

I found that the name "nchanges" is already used for a local variable in apply_spooled_messages().
It works because the local variable shadows the global one when their names collide,
but I think it may be confusing.
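A small sketch of the collision. apply_spooled_messages_sketch() and send_change_sketch() are hypothetical stand-ins, not the real worker.c functions: inside the first, the local nchanges hides the file-level counter, so its increments never reach the global.

```c
#include <assert.h>
#include <stdint.h>

/* File-level counter, as declared in worker.c. */
static uint32_t nchanges = 0;

/* Stand-in for apply_spooled_messages(): the local shadows the global. */
static int
apply_spooled_messages_sketch(int nmessages)
{
    int nchanges = 0;

    for (int i = 0; i < nmessages; i++)
        nchanges++;             /* updates the local, not the global */

    return nchanges;
}

/* Stand-in for code that counts changes sent to parallel apply workers. */
static void
send_change_sketch(void)
{
    nchanges++;                 /* here the name refers to the global */
}
```

Compiling with GCC's or Clang's -Wshadow reports the local declaration; giving the global a more specific name avoids both the warning and the ambiguity.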


~~~
07. worker.c - apply_handle_commit_internal

I think we can add an assertion like Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr &&
replorigin_session_origin != InvalidRepOriginId)
to avoid missing replorigin_session_setup. Previously it was set at the entry point and never reset.
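A minimal sketch of the proposed check. The typedefs, the two "invalid" constants, and the session globals are local stand-ins (assumptions: XLogRecPtr is 64-bit, RepOriginId is 16-bit, and both invalid values are 0), and apply_handle_commit_internal_sketch() is a hypothetical skeleton; exactly where the assertion belongs in the real function is left to the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Local stand-ins for PostgreSQL definitions (assumptions). */
typedef uint64_t XLogRecPtr;
typedef uint16_t RepOriginId;
#define InvalidXLogRecPtr  ((XLogRecPtr) 0)
#define InvalidRepOriginId 0

/* Replication origin session state, normally set up by the apply worker. */
static XLogRecPtr  replorigin_session_origin_lsn = InvalidXLogRecPtr;
static RepOriginId replorigin_session_origin = InvalidRepOriginId;

/* True once both the origin and its LSN have been set for this session. */
static bool
replorigin_session_is_ready(void)
{
    return replorigin_session_origin_lsn != InvalidXLogRecPtr &&
           replorigin_session_origin != InvalidRepOriginId;
}

/* Hypothetical skeleton: catch a missing replorigin_session_setup early. */
static void
apply_handle_commit_internal_sketch(void)
{
    assert(replorigin_session_is_ready());
    /* ... commit the transaction and advance the origin ... */
}
```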


~~~
08. worker.c - apply_handle_prepare_internal

Same as above.


~~~
09. worker.c - maybe_reread_subscription

```
    /*
     * Exit if any parameter that affects the remote connection was changed.
     * The launcher will start a new worker.
     */
    if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
        strcmp(newsub->name, MySubscription->name) != 0 ||
        strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
        newsub->binary != MySubscription->binary ||
        newsub->stream != MySubscription->stream ||
        strcmp(newsub->origin, MySubscription->origin) != 0 ||
        newsub->owner != MySubscription->owner ||
        !equal(newsub->publications, MySubscription->publications))
    {
        ereport(LOG,
                (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
                        MySubscription->name)));

        proc_exit(0);
    }
```

When a parallel apply worker has been launched and a subscription option is then modified,
the same message will appear twice (once from the leader apply worker and once from the parallel apply worker).
But if the "streaming" option is changed from "parallel" to "on", the parallel apply worker will not be restarted.
Should we modify the message?
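One possible direction, sketched with a hypothetical helper: choose the wording based on whether the current process is a parallel apply worker, and say "stop" rather than "restart" for it, since it may not be started again. The function name, the flag parameter, and the parallel-worker wording are all assumptions for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: build the log message for a parameter change so the
 * leader and the parallel apply worker do not emit identical lines.
 */
static int
format_param_change_msg(char *buf, size_t bufsize, const char *subname,
                        bool am_parallel_apply_worker)
{
    if (am_parallel_apply_worker)
        return snprintf(buf, bufsize,
                        "logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
                        subname);

    return snprintf(buf, bufsize,
                    "logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
                    subname);
}
```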


===
10. general

IIUC parallel apply workers cannot detect the deadlock automatically, right?
I think we might be able to use a heartbeat protocol between the leader apply worker and the parallel apply workers.

You have already implemented a mechanism to send and receive messages between workers.
My idea is that each parallel apply worker records the timestamp of the last message received from the leader,
and if a certain time (30s?) has passed, it sends a heartbeat message such as 'H'.
The leader consumes 'H' and sends a reply such as LOGICAL_REP_MSG_KEEPALIVE in HandleParallelApplyMessage().
If the parallel apply worker does not receive any message for more than one minute,
it assumes that a deadlock has occurred, sets the retry flag, and exits.

This assumes that the leader cannot reply to the message while waiting for the lock.
Moreover, it may add notable overhead, and we would need a new logical replication message type.
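The worker-side timing of this idea could be sketched as a small state machine. Everything here is hypothetical: the thresholds are the 30s/60s values floated above, the 'H' message is only represented by the HB_SEND_HEARTBEAT action, and any message from the leader (including its keepalive reply) simply resets the timer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical thresholds, in milliseconds. */
#define HEARTBEAT_INTERVAL_MS 30000   /* send 'H' after 30s of silence */
#define DEADLOCK_TIMEOUT_MS   60000   /* give up after 60s of silence */

typedef enum
{
    HB_NONE,            /* nothing to do yet */
    HB_SEND_HEARTBEAT,  /* send an 'H' message to the leader */
    HB_ASSUME_DEADLOCK  /* set the retry flag and exit */
} HeartbeatAction;

typedef struct
{
    int64_t last_recv_ms;       /* time of the last message from the leader */
    bool    heartbeat_pending;  /* 'H' already sent, no reply yet */
} HeartbeatState;

/* Any message from the leader (data or keepalive reply) resets the timer. */
static void
hb_message_received(HeartbeatState *st, int64_t now_ms)
{
    st->last_recv_ms = now_ms;
    st->heartbeat_pending = false;
}

/* Called periodically by the parallel apply worker while waiting. */
static HeartbeatAction
hb_check(HeartbeatState *st, int64_t now_ms)
{
    int64_t idle = now_ms - st->last_recv_ms;

    if (idle > DEADLOCK_TIMEOUT_MS)
        return HB_ASSUME_DEADLOCK;

    if (idle >= HEARTBEAT_INTERVAL_MS && !st->heartbeat_pending)
    {
        st->heartbeat_pending = true;
        return HB_SEND_HEARTBEAT;
    }

    return HB_NONE;
}
```

If the leader is blocked on a lock held by this worker, it never replies, so the idle time keeps growing until HB_ASSUME_DEADLOCK fires; a leader that is merely busy would still answer the heartbeat and reset the timer.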

What do you think? Have you already considered this?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

