Re: Logical replication keepalive flood - Mailing list pgsql-hackers
From: Kyotaro Horiguchi
Subject: Re: Logical replication keepalive flood
Msg-id: 20210610.150016.1709823354377067679.horikyota.ntt@gmail.com
In response to: Re: Logical replication keepalive flood (Abbas Butt <abbas.butt@enterprisedb.com>)
Responses: Re: Logical replication keepalive flood
List: pgsql-hackers
At Wed, 9 Jun 2021 17:32:25 +0500, Abbas Butt <abbas.butt@enterprisedb.com> wrote in
>
> On Wed, Jun 9, 2021 at 2:30 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > Does these keepalive messages are sent at the same frequency even for
> > subscribers?
>
> Yes, I have tested it with one publisher and one subscriber.
> The moment I start pgbench session I can see keepalive messages sent and
> replied by the subscriber with same frequency.
>
> > Basically, I wanted to check if we have logical
> > replication set up between 2 nodes then do we send these keep-alive
> > messages flood?
>
> Yes we do.
>
> > If not, then why is it different in the case of
> > pg_recvlogical?
>
> Nothing, the WAL sender behaviour is same in both cases.
>
> > Is it possible that the write/flush location is not
> > updated at the pace at which we expect?

Yes. MyWalSnd->flush/write are updated fairly frequently, but MyWalSnd->write
still trails sentPtr by anywhere from a few thousand bytes up to a little
less than one block (1 block = 8192 bytes). (Flush lags are larger than
write lags, of course.)

I counted how many keepalives are sent for each request length passed to
logical_read_xlog_page() during a 10-second pgbench run replicating
pgbench_history, using the attached change.

 size:  sent /notsent/ calls: write lag / flush lag
    8:     3 /     6 /     3:      5960 /    348962
   16:     1 /     2 /     1:       520 /    201096
   24:  2425 /  4852 /  2461:      5259 /    293569
   98:     2 /     0 /    54:         5 /      1050
  187:     2 /     0 /    94:         0 /      1060
 4432:     1 /     0 /     1: 410473592 / 410473592
 7617:     2 /     0 /    27:       317 /     17133
 8280:     1 /     2 /     4:       390 /       390

Where:
  size      is the data length requested from logical_read_xlog_page()
  sent      is the number of keepalives sent in the loop in WalSndWaitForWal
  notsent   is the number of runs of the loop in WalSndWaitForWal without
            sending a keepalive
  calls     is the number of calls to WalSndWaitForWal
  write lag is the number of bytes MyWalSnd->write is behind sentPtr at the
            first run of the loop, averaged per call to logical_read_xlog_page()
  flush lag is the same as the above, for MyWalSnd->flush

The size=4432 line is probably the very first fetch of WAL. So these numbers
show that WalSndWaitForWal is called almost exclusively when starting to
fetch a record, and in that case the function runs the loop three times and
sends one keepalive on average.
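To spell out what "sent" and "notsent" classify: a pass of the wait loop
counts as "sent" when the keepalive condition below holds. This is a
simplified excerpt of the check in WalSndWaitForWal (the same lines appear
as context in the walsender.c hunk of the attached change); since an
asynchronous receiver's confirmed positions almost always trail sentPtr,
the condition holds on nearly every pass.

	/* excerpt, simplified: a keepalive goes out whenever the receiver's
	 * confirmed write/flush positions are behind what we have already
	 * sent and no ping reply is still outstanding */
	if (MyWalSnd->flush < sentPtr &&
		MyWalSnd->write < sentPtr &&
		!waiting_for_ping_response)
		WalSndKeepalive(false);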
> Well, it is async replication. The receiver can choose to update LSNs at
> its own will, say after 10 mins interval.
> It should only impact the size of WAL retained by the server.
>
> > Please see commit 41d5f8ad73
> > which seems to be talking about a similar problem.
>
> That commit does not address this problem.

Yeah. At least in my run, WalSndWaitForWal sends a keepalive on just about
every call.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 42738eb940..ee78116e79 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -571,6 +571,7 @@ err:
  * We fetch the page from a reader-local cache if we know we have the required
  * data and if there hasn't been any error since caching the data.
  */
+int hogestate = -1;
 static int
 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 {
@@ -605,6 +606,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	{
 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
 
+		hogestate = pageptr + XLOG_BLCKSZ - state->currRecPtr;
 		readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
 										   state->currRecPtr,
 										   state->readBuf);
@@ -623,6 +625,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * First, read the requested data length, but at least a short page header
 	 * so that we can validate it.
 	 */
+	hogestate = pageptr + Max(reqLen, SizeOfXLogShortPHD) - state->currRecPtr;
 	readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
 									   state->currRecPtr,
 									   state->readBuf);
@@ -642,6 +645,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	/* still not enough */
 	if (readLen < XLogPageHeaderSize(hdr))
 	{
+		hogestate = pageptr + XLogPageHeaderSize(hdr) - state->currRecPtr;
 		readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
 										   state->currRecPtr,
 										   state->readBuf);
@@ -649,6 +653,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 			goto err;
 	}
 
+	hogestate = -1;
 	/*
 	 * Now that we know we have the full header, validate it.
 	 */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 109c723f4e..0de10c4a31 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1363,17 +1363,45 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
  * if we detect a shutdown request (either from postmaster or client)
  * we will return early, so caller must always check.
  */
+unsigned long counts[32768][3] = {0};
+unsigned long lagw[32768] = {0};
+unsigned long lagf[32768] = {0};
+
+void
+PrintCounts(void)
+{
+	int i = 0;
+	for (i = 0 ; i < 32768 ; i++)
+	{
+		if (counts[i][0] + counts[i][1] + counts[i][2] > 0)
+		{
+			unsigned long wl = 0, fl = 0;
+			if (counts[i][1] > 0)
+			{
+				wl = lagw[i] / counts[i][0];
+				fl = lagf[i] / counts[i][0];
+
+				ereport(LOG, (errmsg ("[%5d]: %5lu / %5lu / %5lu: %5lu %5lu",
+									  i, counts[i][1], counts[i][2], counts[i][0], wl, fl), errhidestmt(true)));
+			}
+		}
+	}
+}
+
 static XLogRecPtr
 WalSndWaitForWal(XLogRecPtr loc)
 {
 	int			wakeEvents;
 	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+	extern int	hogestate;
+	bool		lagtaken = false;
 
 	/*
 	 * Fast path to avoid acquiring the spinlock in case we already know we
 	 * have enough WAL available. This is particularly interesting if we're
 	 * far behind.
 	 */
+	counts[hogestate][0]++;
 	if (RecentFlushPtr != InvalidXLogRecPtr &&
 		loc <= RecentFlushPtr)
 		return RecentFlushPtr;
@@ -1439,7 +1467,39 @@ WalSndWaitForWal(XLogRecPtr loc)
 		if (MyWalSnd->flush < sentPtr &&
 			MyWalSnd->write < sentPtr &&
 			!waiting_for_ping_response)
+		{
+			if (hogestate >= 0)
+			{
+				counts[hogestate][1]++;
+				if (!lagtaken)
+				{
+					lagf[hogestate] += sentPtr - MyWalSnd->flush;
+					lagw[hogestate] += sentPtr - MyWalSnd->write;
+					lagtaken = true;
+				}
+			}
+//			ereport(LOG, (errmsg ("KA[%lu/%lu/%lu]: %X/%X %X/%X %X/%X %d: %ld",
+//								  ka, na, ka + na,
+//								  LSN_FORMAT_ARGS(MyWalSnd->flush),
+//								  LSN_FORMAT_ARGS(MyWalSnd->write),
+//								  LSN_FORMAT_ARGS(sentPtr),
+//								  waiting_for_ping_response,
+//								  sentPtr - MyWalSnd->write)));
 			WalSndKeepalive(false);
+		}
+		else
+		{
+			if (hogestate >= 0)
+				counts[hogestate][2]++;
+
+//			ereport(LOG, (errmsg ("kap[%lu/%lu/%lu]: %X/%X %X/%X %X/%X %d: %ld",
+//								  ka, na, ka + na,
+//								  LSN_FORMAT_ARGS(MyWalSnd->flush),
+//								  LSN_FORMAT_ARGS(MyWalSnd->write),
+//								  LSN_FORMAT_ARGS(sentPtr),
+//								  waiting_for_ping_response,
+//								  sentPtr - MyWalSnd->write)));
+		}
 
 		/* check whether we're done */
 		if (loc <= RecentFlushPtr)
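Note that the diff above only defines and fills the counters; neither hunk
contains a call to PrintCounts(), so applying it as-is will not emit the
table anywhere. To reproduce the numbers you need to add a call yourself.
One possible way (a hypothetical hook, not part of the attached change) is
to register the report at backend exit inside walsender.c, roughly:

/* hypothetical, not in the diff above: dump the per-size counters into
 * the server log when the walsender backend exits; place this after the
 * PrintCounts() definition */
static void
print_counts_on_exit(int code, Datum arg)
{
	PrintCounts();
}

	/* ... registered once during walsender startup, e.g. near the top of
	 * WalSndLoop(); before_shmem_exit() comes from storage/ipc.h, which
	 * walsender.c already includes */
	before_shmem_exit(print_counts_on_exit, (Datum) 0);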