Re: Logical replication keepalive flood - Mailing list pgsql-hackers
| From | Kyotaro Horiguchi |
|---|---|
| Subject | Re: Logical replication keepalive flood |
| Date | |
| Msg-id | 20210610.150016.1709823354377067679.horikyota.ntt@gmail.com |
| In response to | Re: Logical replication keepalive flood (Abbas Butt <abbas.butt@enterprisedb.com>) |
| Responses | Re: Logical replication keepalive flood |
| List | pgsql-hackers |
At Wed, 9 Jun 2021 17:32:25 +0500, Abbas Butt <abbas.butt@enterprisedb.com> wrote in
>
> On Wed, Jun 9, 2021 at 2:30 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > Are these keepalive messages sent at the same frequency even for
> > subscribers?
>
> Yes, I have tested it with one publisher and one subscriber.
> The moment I start a pgbench session I can see keepalive messages sent and
> replied to by the subscriber at the same frequency.
>
> > Basically, I wanted to check: if we have logical replication set up
> > between 2 nodes, do we send this flood of keep-alive messages?
>
> Yes we do.
>
> > If not, then why is it different in the case of
> > pg_recvlogical?
>
> Nothing, the WAL sender behaviour is the same in both cases.
>
>
> > Is it possible that the write/flush location is not
> > updated at the pace at which we expect?
Yes. MyWalSnd->flush/write are updated fairly frequently, but MyWalSnd->write
still trails sentPtr by anywhere from thousands of bytes up to just under one
block (1 block = 8192 bytes). (Flush lags are larger than write lags, of
course.)
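To be concrete about what "behind" means here: the lag is the distance
between the walsender's send pointer and the positions the receiver has
confirmed, i.e. as measured in the attached change (the write_lag/flush_lag
names below are just for illustration):

    XLogRecPtr write_lag = sentPtr - MyWalSnd->write; /* sent, not yet written by receiver */
    XLogRecPtr flush_lag = sentPtr - MyWalSnd->flush; /* sent, not yet flushed by receiver */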
I counted how many times keepalives are sent for each request length passed
to logical_read_xlog_page(), for a 10-second pgbench run replicating
pgbench_history, using the attached change.
 size:  sent / notsent / calls: write lag / flush lag
    8:     3 /       6 /     3:      5960 /    348962
   16:     1 /       2 /     1:       520 /    201096
   24:  2425 /    4852 /  2461:      5259 /    293569
   98:     2 /       0 /    54:         5 /      1050
  187:     2 /       0 /    94:         0 /      1060
 4432:     1 /       0 /     1: 410473592 / 410473592
 7617:     2 /       0 /    27:       317 /     17133
 8280:     1 /       2 /     4:       390 /       390
Where,
  size      is the requested data length passed to logical_read_xlog_page(),
  sent      is the number of keepalives sent in the loop in WalSndWaitForWal,
  notsent   is the number of iterations of the loop in WalSndWaitForWal
            that did not send a keepalive,
  calls     is the number of calls to WalSndWaitForWal,
  write lag is the average number of bytes MyWalSnd->write is behind
            sentPtr at the first iteration of the loop, per call to
            logical_read_xlog_page(), and
  flush lag is the same as above, but for MyWalSnd->flush.
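Reading the size=24 row, for example: 2461 calls to WalSndWaitForWal ran its
loop 2425 + 4852 = 7277 times in total (roughly 3 iterations per call) and
sent 2425 keepalives (roughly one per call), and at the first iteration the
receiver's write position trailed sentPtr by 5259 bytes on average.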
The line for size=4432 is probably the initial fetch of WAL.
So these numbers show that WalSndWaitForWal is called almost exclusively when
starting to fetch a record, and in that case the function runs its loop about
three times and sends one keepalive on average.
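For reference, the decision that emits those keepalives is this check in
WalSndWaitForWal's wait loop (walsender.c; the same lines appear as context
in the attached diff), roughly:

    for (;;)
    {
        ...
        /*
         * The receiver has not yet confirmed everything already sent and we
         * are not waiting for a reply to an earlier ping, so send a keepalive.
         */
        if (MyWalSnd->flush < sentPtr &&
            MyWalSnd->write < sentPtr &&
            !waiting_for_ping_response)
            WalSndKeepalive(false);

        /* check whether we're done */
        if (loc <= RecentFlushPtr)
            break;
        ...
    }

So a keepalive goes out whenever the loop runs while the receiver's confirmed
positions trail sentPtr and no reply is outstanding, which with the small but
nonzero lags above comes to roughly one keepalive per record fetch.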
> Well, it is async replication. The receiver can choose to update LSNs at
> its own will, say at a 10-minute interval.
> It should only impact the size of WAL retained by the server.
>
> > Please see commit 41d5f8ad73
> > which seems to be talking about a similar problem.
> >
>
> That commit does not address this problem.
Yeah, at least for me, WalSndWaitForWal sends about one keepalive per call.
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 42738eb940..ee78116e79 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -571,6 +571,7 @@ err:
* We fetch the page from a reader-local cache if we know we have the required
* data and if there hasn't been any error since caching the data.
*/
+int hogestate = -1;
static int
ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
{
@@ -605,6 +606,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
{
XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
+ hogestate = pageptr + XLOG_BLCKSZ - state->currRecPtr;
readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
state->currRecPtr,
state->readBuf);
@@ -623,6 +625,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
* First, read the requested data length, but at least a short page header
* so that we can validate it.
*/
+ hogestate = pageptr + Max(reqLen, SizeOfXLogShortPHD) - state->currRecPtr;
readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
state->currRecPtr,
state->readBuf);
@@ -642,6 +645,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
/* still not enough */
if (readLen < XLogPageHeaderSize(hdr))
{
+ hogestate = pageptr + XLogPageHeaderSize(hdr) - state->currRecPtr;
readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
state->currRecPtr,
state->readBuf);
@@ -649,6 +653,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
goto err;
}
+ hogestate = -1;
/*
* Now that we know we have the full header, validate it.
*/
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 109c723f4e..0de10c4a31 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1363,17 +1363,45 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
* if we detect a shutdown request (either from postmaster or client)
* we will return early, so caller must always check.
*/
+unsigned long counts[32768][3] = {0};
+unsigned long lagw[32768] = {0};
+unsigned long lagf[32768] = {0};
+
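+/*
+ * Debug counters for this experiment, indexed by hogestate (set in
+ * xlogreader.c before each page_read callback):
+ *   counts[n][0]: calls to WalSndWaitForWal
+ *   counts[n][1]: keepalives sent in its wait loop
+ *   counts[n][2]: loop iterations that sent no keepalive
+ * lagw/lagf accumulate sentPtr - MyWalSnd->write/flush at the first
+ * keepalive-sending iteration of each call.  PrintCounts() writes one
+ * summary line per index to the server log.
+ */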
+void
+PrintCounts(void)
+{
+ int i = 0;
+ for (i = 0 ; i < 32768 ; i++)
+ {
+ if (counts[i][0] + counts[i][1] + counts[i][2] > 0)
+ {
+ unsigned long wl = 0, fl = 0;
+ if (counts[i][1] > 0)
+ {
+ wl = lagw[i] / counts[i][0];
+ fl = lagf[i] / counts[i][0];
+
+ ereport(LOG, (errmsg ("[%5d]: %5lu / %5lu / %5lu: %5lu %5lu",
+ i, counts[i][1], counts[i][2], counts[i][0], wl, fl), errhidestmt(true)));
+ }
+ }
+ }
+}
+
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
int wakeEvents;
static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+ extern int hogestate;
+ bool lagtaken = false;
/*
* Fast path to avoid acquiring the spinlock in case we already know we
* have enough WAL available. This is particularly interesting if we're
* far behind.
*/
+ if (hogestate >= 0)
+     counts[hogestate][0]++;
if (RecentFlushPtr != InvalidXLogRecPtr &&
loc <= RecentFlushPtr)
return RecentFlushPtr;
@@ -1439,7 +1467,39 @@ WalSndWaitForWal(XLogRecPtr loc)
if (MyWalSnd->flush < sentPtr &&
MyWalSnd->write < sentPtr &&
!waiting_for_ping_response)
+ {
+ if (hogestate >= 0)
+ {
+ counts[hogestate][1]++;
+ if (!lagtaken)
+ {
+ lagf[hogestate] += sentPtr - MyWalSnd->flush;
+ lagw[hogestate] += sentPtr - MyWalSnd->write;
+ lagtaken = true;
+ }
+ }
+// ereport(LOG, (errmsg ("KA[%lu/%lu/%lu]: %X/%X %X/%X %X/%X %d: %ld",
+// ka, na, ka + na,
+// LSN_FORMAT_ARGS(MyWalSnd->flush),
+// LSN_FORMAT_ARGS(MyWalSnd->write),
+// LSN_FORMAT_ARGS(sentPtr),
+// waiting_for_ping_response,
+// sentPtr - MyWalSnd->write)));
WalSndKeepalive(false);
+ }
+ else
+ {
+ if (hogestate >= 0)
+ counts[hogestate][2]++;
+
+// ereport(LOG, (errmsg ("kap[%lu/%lu/%lu]: %X/%X %X/%X %X/%X %d: %ld",
+// ka, na, ka + na,
+// LSN_FORMAT_ARGS(MyWalSnd->flush),
+// LSN_FORMAT_ARGS(MyWalSnd->write),
+// LSN_FORMAT_ARGS(sentPtr),
+// waiting_for_ping_response,
+// sentPtr - MyWalSnd->write)));
+ }
/* check whether we're done */
if (loc <= RecentFlushPtr)