Re: Logical replication keepalive flood - Mailing list pgsql-hackers

From: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Subject: Re: Logical replication keepalive flood
Msg-id: 20210609.171751.1579873424296912837.horikyota.ntt@gmail.com
In response to: Re: Logical replication keepalive flood (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Responses: Re: Logical replication keepalive flood
List: pgsql-hackers
At Wed, 9 Jun 2021 11:21:55 +0900, Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in 
> The issue - if it actually is one - is that we send a keepalive
> packet just before a quite short sleep.
> 
> We really want to send it if the sleep gets long but we cannot predict
> that before entering a sleep.
> 
> Let me think a little more on this..

After some investigation, I found out that the keepalives are sent
almost always right after XLogSendLogical requests the *next* record.
In most of those cases the record is not yet inserted at the time of
the request, but is inserted very soon afterwards (within single-digit
milliseconds). That is not expected to happen at such a high frequency
while XLogSendLogical is keeping up with the bleeding edge of the WAL.

Since it is completely unpredictable when the next record will come,
we cannot decide at that point whether to send a keepalive or not.

What we want is to send a keepalive when we have had nothing to send
for a while; keeping up a stream of keepalives at fixed intervals while
the loop is busy is a different behavior.

As a possible solution, the attached patch splits the sleep into two
pieces. If the first piece reaches its timeout, we send a keepalive and
then sleep for the remaining time. The first timeout is fairly
arbitrary, but a keepalive rate of at most 4Hz (250ms) doesn't look so
bad to me.
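
In outline, the logic added to WalSndWaitForWal() looks like this (a
condensed sketch of the patch below; the existing flush/write guards
and the error check on pq_flush_if_writable() are omitted here):

    /* Split the wait so a keepalive is sent only after real idleness. */
    if (sleeptime > KEEPALIVE_TIMEOUT)
    {
        /* First piece: wait only up to KEEPALIVE_TIMEOUT milliseconds. */
        if (WalSndWait(wakeEvents, KEEPALIVE_TIMEOUT,
                       WAIT_EVENT_WAL_SENDER_WAIT_WAL) != 0)
            continue;    /* an event arrived in time; no keepalive needed */
        sleeptime -= KEEPALIVE_TIMEOUT;
    }

    /* Second piece: we have been idle long enough; ping the receiver. */
    WalSndKeepalive(false);
    WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);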

Is it acceptable?

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 109c723f4e..49b3c0d4e2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -105,6 +105,9 @@
  */
 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
 
+/* Minimum idle time, in milliseconds, before sending an idle-time keepalive */
+#define KEEPALIVE_TIMEOUT 250
+
 /* Array of WalSnds in shared memory */
 WalSndCtlData *WalSndCtl = NULL;
 
@@ -244,7 +247,7 @@ static void WalSndKeepalive(bool requestReply);
 static void WalSndKeepaliveIfNecessary(void);
 static void WalSndCheckTimeOut(void);
 static long WalSndComputeSleeptime(TimestampTz now);
-static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
+static int WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
@@ -1428,19 +1431,6 @@ WalSndWaitForWal(XLogRecPtr loc)
         if (got_STOPPING)
             break;
 
-        /*
-         * We only send regular messages to the client for full decoded
-         * transactions, but a synchronous replication and walsender shutdown
-         * possibly are waiting for a later location. So, before sleeping, we
-         * send a ping containing the flush location. If the receiver is
-         * otherwise idle, this keepalive will trigger a reply. Processing the
-         * reply will update these MyWalSnd locations.
-         */
-        if (MyWalSnd->flush < sentPtr &&
-            MyWalSnd->write < sentPtr &&
-            !waiting_for_ping_response)
-            WalSndKeepalive(false);
-
         /* check whether we're done */
         if (loc <= RecentFlushPtr)
             break;
@@ -1483,6 +1473,41 @@
         if (pq_is_send_pending())
             wakeEvents |= WL_SOCKET_WRITEABLE;
 
+        /*
+         * We only send regular messages to the client for full decoded
+         * transactions, but a synchronous replication and walsender shutdown
+         * possibly are waiting for a later location. So, before sleeping, we
+         * send a ping containing the flush location. If the receiver is
+         * otherwise idle, this keepalive will trigger a reply. Processing the
+         * reply will update these MyWalSnd locations. If the sleep is shorter
+         * than KEEPALIVE_TIMEOUT milliseconds, we skip sending a keepalive to
+         * prevent it from becoming too frequent.
+         */
+        if (MyWalSnd->flush < sentPtr &&
+            MyWalSnd->write < sentPtr &&
+            !waiting_for_ping_response)
+        {
+            if (sleeptime > KEEPALIVE_TIMEOUT)
+            {
+                int r;
+
+                r = WalSndWait(wakeEvents, KEEPALIVE_TIMEOUT,
+                               WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+
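+                /* An event arrived before the keepalive timeout; process it. */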
+                if (r != 0)
+                    continue;
+
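+                /* Timed out while idle; sleep the rest after a keepalive. */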
+                sleeptime -= KEEPALIVE_TIMEOUT;
+            }
+
+            WalSndKeepalive(false);
+
+            if (pq_flush_if_writable() != 0)
+                WalSndShutdown();
+        }
+
         WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
     }
 
@@ -3136,15 +3159,19 @@ WalSndWakeup(void)
  * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags.  Exit
  * on postmaster death.
  */
-static void
+static int
 WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
 {
     WaitEvent    event;
+    int            ret;
 
     ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
-    if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
-        (event.events & WL_POSTMASTER_DEATH))
+    ret = WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event);
+    if (ret == 1 && (event.events & WL_POSTMASTER_DEATH))
         proc_exit(1);
+
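+    /* Return the WaitEventSetWait() result so callers can detect a timeout. */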
+    return ret;
 }
 
 /*
