RE: Logical replication timeout problem - Mailing list pgsql-hackers

From kuroda.hayato@fujitsu.com
Subject RE: Logical replication timeout problem
Date
Msg-id TYAPR01MB5866BD2248EF82FF432FE599F52D9@TYAPR01MB5866.jpnprd01.prod.outlook.com
In response to RE: Logical replication timeout problem  ("wangw.fnst@fujitsu.com" <wangw.fnst@fujitsu.com>)
Responses RE: Logical replication timeout problem
List pgsql-hackers
Dear Wang,

Thank you for making the patch.
I applied it and confirmed that the code passes the regression tests.
Here is a short review:

```
+    static int skipped_changes_count = 0;
+    /*
+     * Conservatively, at least 150,000 changes can be skipped in 1s.
+     *
+     * Because we use half of wal_sender_timeout as the threshold, and the unit
+     * of wal_sender_timeout in process is ms, the final threshold is
+     * wal_sender_timeout * 75.
+     */
+    int skipped_changes_threshold = wal_sender_timeout * 75;
```

I'm not sure about this part. Could you explain the background of this calculation?
Is the assumption of 150,000 skipped changes per second reasonable?
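For reference, this is how I read the arithmetic in the comment. It is only a sketch: the helper name and the macro are my own illustration and are not part of the patch, and the rate is simply the figure stated in the comment.

```c
#include <stdint.h>

/* Rate assumed by the patch comment: 150,000 skipped changes per second. */
#define ASSUMED_SKIPPED_CHANGES_PER_SEC 150000

/*
 * Hypothetical helper (not in the patch): number of changes that can be
 * skipped within half of wal_sender_timeout, which is given in milliseconds.
 * int64_t is used only so that this illustration cannot overflow.
 */
int64_t
skipped_changes_threshold_for(int wal_sender_timeout_ms)
{
    int64_t changes_per_ms = ASSUMED_SKIPPED_CHANGES_PER_SEC / 1000;   /* 150 */

    /* (timeout / 2) ms * 150 changes/ms == timeout * 75 */
    return (int64_t) wal_sender_timeout_ms * changes_per_ms / 2;
}
```

With the default wal_sender_timeout of 60000 ms, this gives 4,500,000 skipped changes between keepalives.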

```
@@ -654,20 +663,62 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     {
         case REORDER_BUFFER_CHANGE_INSERT:
             if (!relentry->pubactions.pubinsert)
+            {
+                if (++skipped_changes_count >= skipped_changes_threshold)
+                {
+                    OutputPluginUpdateProgress(ctx, true);
+
+                    /*
+                     * After sending keepalive message, reset
+                     * skipped_changes_count.
+                     */
+                    skipped_changes_count = 0;
+                }
                 return;
+            }
             break;
```

Is the if-statement needed? In the walsender case, OutputPluginUpdateProgress() calls WalSndUpdateProgress(),
and that function already has its own threshold for sending a ping.
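To illustrate what I mean, here is a minimal model of the time check, assuming it behaves like the hunk quoted below. All `sketch_` names are stand-ins of mine, not PostgreSQL identifiers, and the clock is passed in as a parameter so that the behaviour is easy to follow.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-ins for TimestampTz (microseconds) and the GUC (milliseconds). */
typedef int64_t SketchTimestamp;
static int sketch_wal_sender_timeout = 60000;
static int sketch_keepalives_sent = 0;

/*
 * Hypothetical model of the time check in the patched WalSndUpdateProgress():
 * a keepalive goes out only when half of the timeout has passed since the
 * previous one. Returns true if a keepalive was "sent".
 */
bool
sketch_update_progress(SketchTimestamp now)
{
    static SketchTimestamp send_time = 0;
    SketchTimestamp ping_time =
        send_time + (int64_t) (sketch_wal_sender_timeout / 2) * 1000;

    if (now < ping_time)
        return false;           /* called too early: nothing is sent */

    sketch_keepalives_sent++;   /* stands in for WalSndKeepalive(false) */
    send_time = now;
    return true;
}
```

In this model, calling the function for every skipped change still produces at most one keepalive per half of wal_sender_timeout, regardless of how many changes are skipped.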

```
static void
-WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive)
 {
-    static TimestampTz sendTime = 0;
+    static TimestampTz trackTime = 0;
     TimestampTz now = GetCurrentTimestamp();
 
+    if (send_keep_alive)
+    {
+        /*
+         * If half of wal_sender_timeout has lapsed without send message standby,
+         * send a keep-alive message to the standby.
+         */
+        static TimestampTz sendTime = 0;
+        TimestampTz ping_time = TimestampTzPlusMilliseconds(sendTime,
+                                            wal_sender_timeout / 2);
+        if (now >= ping_time)
+        {
+            WalSndKeepalive(false);
+
+            /* Try to flush pending output to the client */
+            if (pq_flush_if_writable() != 0)
+                WalSndShutdown();
+            sendTime = now;
+        }
+    }
+
```

* +1 for renaming to trackTime.
* `/ 2` looks like a magic number. How about the following? A better name is very welcome:

```
+#define WALSND_LOGICAL_PING_FACTOR     0.5
+        static TimestampTz sendTime = 0;
+        TimestampTz ping_time = TimestampTzPlusMilliseconds(sendTime,
+                                            wal_sender_timeout * WALSND_LOGICAL_PING_FACTOR);
```
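One detail to keep in mind with a fractional factor: the product becomes a double, whereas `/ 2` stays in integer arithmetic. A minimal sketch of the difference, using stand-in names (`sketch_plus_ms` mimics what I understand TimestampTzPlusMilliseconds to do, that is, add milliseconds to a timestamp kept in microseconds; the two function names are hypothetical):

```c
#include <stdint.h>

#define WALSND_LOGICAL_PING_FACTOR 0.5

/* Stand-in macro: add ms milliseconds to a timestamp in microseconds. */
#define sketch_plus_ms(tz, ms) ((tz) + ((ms) * (int64_t) 1000))

/* Ping time computed with the integer division used in the patch. */
int64_t
ping_time_div(int64_t send_time, int timeout_ms)
{
    return sketch_plus_ms(send_time, timeout_ms / 2);
}

/* Ping time computed with the named factor; the double result is truncated. */
int64_t
ping_time_factor(int64_t send_time, int timeout_ms)
{
    return (int64_t) sketch_plus_ms(send_time,
                                    timeout_ms * WALSND_LOGICAL_PING_FACTOR);
}
```

The two agree for even timeouts and differ by half a millisecond for odd ones, which should not matter in practice; an integer divisor macro would avoid the floating-point step if that is preferred.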

Could you add a commitfest entry for cfbot?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

