Re: [HACKERS] Restricting maximum keep segments by repslots - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: [HACKERS] Restricting maximum keep segments by repslots
Date
Msg-id 20200123.212854.658794168913258596.horikyota.ntt@gmail.com
In response to Re: [HACKERS] Restricting maximum keep segments by repslots  (Alvaro Herrera <alvherre@2ndquadrant.com>)
Responses Re: [HACKERS] Restricting maximum keep segments by repslots  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Re: [HACKERS] Restricting maximum keep segments by repslots  (Alvaro Herrera <alvherre@2ndquadrant.com>)
List pgsql-hackers
Hello, Jehan.

At Wed, 22 Jan 2020 17:47:23 +0100, Jehan-Guillaume de Rorthais <jgdr@dalibo.com> wrote in 
> Hi,
> 
> First, it seems you did not reply to Alvaro's concerns in your new set of
> patch. See:
> 
> https://www.postgresql.org/message-id/20190917195800.GA16694%40alvherre.pgsql

Mmmm. Thank you very much for noticing that, Jehan, and sorry for
overlooking it, Alvaro.


At Tue, 17 Sep 2019 16:58:00 -0300, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in 
> suggest a substitute name, because the API itself doesn't convince me; I
> think it would be sufficient to have it return a single slot name,
> perhaps the one that is behind the most ... or maybe the one that is
> behind the least?  This simplifies a lot of code (in particular you do
> away with the bunch of statics, right?), and I don't think the warning
> messages loses anything, because for details the user should really look
> into the monitoring view anyway.

OK, I removed the funnily-named function. The message becomes more or
less the following.  The DETAIL might not be needed.

| WARNING:  2 replication slots have lost required WAL segments by 5 segments
| DETAIL:  Most affected slot is s1.

> I didn't like GetLsnAvailability() returning a string either.  It seems
> more reasonable to me to define a enum with possible return states, and
> have the enum value be expanded to some string in
> pg_get_replication_slots().

Agreed. Done.
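For reference, the enum and its string expansion can be sketched roughly as below. The WALAVAIL_* names follow the attached v18 patch; the helper function and the exact strings are illustrative only, not the actual pg_get_replication_slots() code:

```c
#include <stddef.h>

/* Possible WAL availability states, as in the attached v18 patch */
typedef enum WalAvailability
{
    WALAVAIL_INVALID_LSN,    /* the slot has never been activated */
    WALAVAIL_NORMAL,         /* required WAL is within max_wal_size */
    WALAVAIL_PRESERVED,      /* kept only by the extra slot reservation */
    WALAVAIL_BEING_REMOVED,  /* being removed, but the stream may survive */
    WALAVAIL_REMOVED         /* definitely lost */
} WalAvailability;

/* Expand the enum to a user-visible string; the view-side code in
 * pg_get_replication_slots() would do something along these lines. */
static const char *
wal_state_name(WalAvailability avail)
{
    switch (avail)
    {
        case WALAVAIL_NORMAL:        return "normal";
        case WALAVAIL_PRESERVED:     return "keeping";
        case WALAVAIL_BEING_REMOVED: return "losing";
        case WALAVAIL_REMOVED:       return "lost";
        default:                     return NULL; /* invalid restart_lsn */
    }
}
```

Keeping the string expansion in one place also means the warning code and the view can never disagree about the wording.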

> In the same function, I think that setting restBytes to -1 when
> "useless" is bad style.  I would just leave that variable alone when the
> returned status is not one that receives the number of bytes.  So the
> caller is only entitled to read the value if the returned enum value is
> such-and-such ("keeping" and "streaming" I think).

That is the only condition. If max_slot_wal_keep_size = -1, the value
is useless even for those two states.  I added that explanation to the
comment of Get(Lsn)WalAvailability().

> I'm somewhat uncomfortable with the API change to GetOldestKeepSegment
> in 0002.  Can't its caller do the math itself instead?

Mmm.  I finally realized that I had merged two calculations that have
little relation to each other. You're right here. Thanks.
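With the two calculations separated, GetOldestKeepSegment() reduces to plain segment arithmetic. A standalone sketch of that logic, using bare segment numbers instead of LSNs (a simplification for illustration, not the actual xlog.c code; max_slot_keep_segs is assumed to be max_slot_wal_keep_size already converted to segments, with -1 meaning no limit):

```c
#include <stdint.h>

typedef uint64_t XLogSegNo;

/* Return the oldest segment number the next checkpoint must keep,
 * considering slots, the max_slot_wal_keep_size cap, and
 * wal_keep_segments.  Segment numbers start at 1. */
static XLogSegNo
oldest_keep_segment(XLogSegNo currSeg, XLogSegNo minSlotSeg,
                    int64_t max_slot_keep_segs,  /* -1 disables the cap */
                    uint64_t wal_keep_segments)
{
    uint64_t keepSegs = 0;      /* # of segments actually kept */

    /* how many segments the slots want kept; second term is a sanity check */
    if (minSlotSeg > 0 && minSlotSeg <= currSeg)
        keepSegs = currSeg - minSlotSeg;

    /* cap the slot reservation by max_slot_wal_keep_size */
    if (max_slot_keep_segs >= 0 && (uint64_t) max_slot_keep_segs < keepSegs)
        keepSegs = (uint64_t) max_slot_keep_segs;

    /* but keep at least wal_keep_segments segments if any */
    if (keepSegs < wal_keep_segments)
        keepSegs = wal_keep_segments;

    /* avoid underflow; don't go below segment 1 */
    if (currSeg <= keepSegs)
        return 1;

    return currSeg - keepSegs;
}
```

The "remaining bytes" math the function used to do can then live entirely in the caller.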

The attached v18 addressed all of your (Alvaro's) comments.



> On Tue, 24 Dec 2019 21:26:14 +0900 (JST)
> Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote:
> > If we assume "losing" segments as "lost", a segment once "lost" can
> > return to "keeping" or "streaming" state. That is intuitively
> > impossible. On the other hand if we assume it as "keeping", it should
> > not be removed by the next checkpoint but actually it can be
> > removed. The state "losing" means such an unstable state, different from
> > both "lost" and "keeping".
> 
> OK, indeed.
> 
> But I'm still uncomfortable with this "unstable" state. It would be better if
> we could grab a stable state: either "keeping" or "lost".

I feel the same, but the being-removed WAL segments remain until the
next checkpoint runs, and even after removal replication can continue
if the walsender is still reading the removed-but-already-opened file.
I'll put more thought into that.

> > In short, the "streaming/normal" state is useless if
> > max_slot_wal_keep_size < max_wal_size.
> 
> Good catch!

Thanks!:)

> > Finally I used the following wordings.
> > 
> > (there's no "inactive" wal_state)
> > 
> > * normal: required WAL within max_wal_size when max_slot_wal_keep_size
> >           is larger than max_wal_size.
> > * keeping: required segments are held by replication slots or
> >   wal_keep_segments.
> > 
> > * losing: required segments are about to be removed or may be already
> >   removed but streaming is not dead yet.
> 
> As I wrote, I'm still uncomfortable with this state. Maybe we should ask
> other reviewers opinions on this.
> 
> [...]
> > >   WARNING:  some replication slots have lost required WAL segments
> > >   DETAIL:  Slot slot_limit_st lost 177 segment(s)
> > > 
> > > I wonder if this is useful to show these messages for slots that were
> > > already dead before this checkpoint?  
> > 
> > Makes sense. I changed KeepLogSeg so that it emits the message only on
> > slot_names changes.
> 
> Thanks.
> 
> Below is some code review.

Thank you for the review. I don't have time right now, but I will
address the comments below soon.


> In regard with FindOldestXLogFileSegNo(...):
> 
> > /*
> >  * Return the oldest WAL segment file.
> >  *
> >  * The returned value is XLogGetLastRemovedSegno() + 1 when the function
> >  * returns a valid value.  Otherwise this function scans over WAL files and
> >  * finds the oldest segment at the first time, which could be very slow.
> >  */
> > XLogSegNo
> > FindOldestXLogFileSegNo(void)
> 
> The comment is not clear to me. I suppose "at the first time" might better be
> expressed as "if none has been removed since last startup"?
> 
> Moreover, what about patching XLogGetLastRemovedSegno() itself instead of
> adding a function?
> 
> In regard with GetLsnAvailability(...):
> 
> > /*
> >  * Detect availability of the record at given targetLSN.
> >  *
> >  * targetLSN is restart_lsn of a slot.
> 
> Wrong argument name. It is called "restart_lsn" in the function
> declaration.
> 
> >  * restBytes is the pointer to uint64 variable, to store the remaining bytes
> >  * until the slot goes into "losing" state.
> 
> I'm not convinced by this argument name. What about "remainingBytes"? Note
> that you use remaining_bytes elsewhere in your patch.
> 
> >  * -1 is stored to restBytes if the values is useless.
> 
> What about returning a true negative value when the slot is really lost?
> 
> All in all, I feel like this function is on the fence between being generic
> because of its name and being slot-only oriented because of the first parameter
> name, use of "max_slot_wal_keep_size_mb", returned status and "slotPtr".
> 
> I wonder if it should be more generic and stay here or move to xlogfuncs.c with
> a more specific name?
> 
> > * slot limitation is not activated, WAL files are kept unlimitedlllly
> 
> "unlimitedly"? "infinitely"? "unconditionally"?
> 
> >   /* it is useless for the states below */
> >   *restBytes = -1;
> 
> This might be set to the real bytes kept, even if status is "losing".
> 
> > * The segment is alrady lost or being lost. If the oldest segment is just
> 
> "already"
> 
> >  if (oldestSeg == restartSeg + 1 && walsender_pid != 0)
> >      return  "losing";
> 
> I wonder if this should be "oldestSeg > restartSeg"?
> Many segments can be removed by the next or running checkpoint. And a running
> walsender can send more than one segment in the meantime I suppose?
> 
> In regard with GetOldestKeepSegment(...):
> 
> > static XLogSegNo
> > GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN,
> >                                       XLogRecPtr targetLSN, int64 *restBytes)
> 
> I wonder if minSlotLSN is really useful as a parameter or if it should be
> fetched from GetOldestKeepSegment() itself? Currently,
> XLogGetReplicationSlotMinimumLSN() is always called right before
> GetOldestKeepSegment() just to fill this argument.
> 
> >      walstate =
> >              GetLsnAvailability(restart_lsn, active_pid, &remaining_bytes);
> 
> I agree with Alvaro: we might want to return an enum and define the related
> state string here. Or, if we accept negative remaining_bytes, GetLsnAvailability
> might even only return remaining_bytes and we deduce the state directly from
> here.
> 
> Regards,

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From cf652ad945242ec7591c62d76de7cf2f81065f9e Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:20:20 +0900
Subject: [PATCH v18 1/6] Add WAL relief vent for replication slots

A replication slot is useful to maintain a replication connection in
configurations where replication is so delayed that the connection
would otherwise be broken. On the other hand, a long delay can
accumulate so many WAL files that they fill up the disk and bring the
master down. This feature, which is activated by the GUC
"max_slot_wal_keep_size", protects master servers from disk-full
failures by limiting the number of WAL files reserved by replication
slots.
---
 src/backend/access/transam/xlog.c             | 141 ++++++++++++++----
 src/backend/replication/slot.c                |  65 ++++++++
 src/backend/utils/misc/guc.c                  |  12 ++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 src/include/replication/slot.h                |   1 +
 6 files changed, 196 insertions(+), 25 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 7f4f784c0e..7015300c77 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -104,6 +104,7 @@ int            wal_level = WAL_LEVEL_MINIMAL;
 int            CommitDelay = 0;    /* precommit delay in microseconds */
 int            CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int            wal_retrieve_retry_interval = 5000;
+int            max_slot_wal_keep_size_mb = -1;
 
 #ifdef WAL_DEBUG
 bool        XLOG_DEBUG = false;
@@ -871,6 +872,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -9320,6 +9322,54 @@ CreateRestartPoint(int flags)
     return true;
 }
 
+/*
+ * Returns minimum segment number that the next checkpoint must leave
+ * considering wal_keep_segments, replication slots and
+ * max_slot_wal_keep_size.
+ *
+ * currLSN is the current insert location.
+ * minSlotLSN is the minimum restart_lsn of all active slots.
+ */
+static XLogSegNo
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+{
+    XLogSegNo    currSeg;
+    XLogSegNo    minSlotSeg;
+    uint64        keepSegs = 0;    /* # of segments actually kept */
+
+    XLByteToSeg(currLSN, currSeg, wal_segment_size);
+    XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size);
+
+    /*
+     * Calculate how many segments are kept by slots first. The second
+     * term of the condition is just a sanity check.
+     */
+    if (minSlotLSN != InvalidXLogRecPtr && minSlotSeg <= currSeg)
+        keepSegs = currSeg - minSlotSeg;
+
+    /* Cap keepSegs by max_slot_wal_keep_size */
+    if (max_slot_wal_keep_size_mb >= 0)
+    {
+        uint64 limitSegs;
+
+        limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
+        /* Reduce it if slots already reserve too many. */
+        if (limitSegs < keepSegs)
+            keepSegs = limitSegs;
+    }
+
+    /* but, keep at least wal_keep_segments segments if any */
+    if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
+        keepSegs = wal_keep_segments;
+
+    /* avoid underflow, don't go below 1 */
+    if (currSeg <= keepSegs)
+        return 1;
+
+    return currSeg - keepSegs;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
@@ -9331,38 +9381,79 @@ CreateRestartPoint(int flags)
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
-    XLogSegNo    segno;
-    XLogRecPtr    keep;
+    XLogRecPtr    slotminptr = InvalidXLogRecPtr;
+    XLogSegNo    minSegNo;
+    XLogSegNo    slotSegNo;
 
-    XLByteToSeg(recptr, segno, wal_segment_size);
-    keep = XLogGetReplicationSlotMinimumLSN();
+    if (max_replication_slots > 0)
+        slotminptr = XLogGetReplicationSlotMinimumLSN();
 
-    /* compute limit for wal_keep_segments first */
-    if (wal_keep_segments > 0)
+    /*
+     * We should keep a certain number of WAL segments after this checkpoint.
+     */
+    minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+
+    /*
+     * Warn if the checkpoint is going to remove the segments required by
+     * replication slots.
+     */
+    if (!XLogRecPtrIsInvalid(slotminptr))
     {
-        /* avoid underflow, don't go below 1 */
-        if (segno <= wal_keep_segments)
-            segno = 1;
+        static XLogSegNo prev_lost_segs = 0;    /* avoid duplicate messages */
+
+        XLByteToSeg(slotminptr, slotSegNo, wal_segment_size);
+
+        if (slotSegNo < minSegNo)
+        {
+            XLogSegNo lost_segs = minSegNo - slotSegNo;
+            if (prev_lost_segs != lost_segs)
+            {
+                /* We have lost a new segment; warn about it. */
+                XLogRecPtr minlsn;
+                static char *prev_slot_names = NULL;
+                char *slot_names;
+                int nslots;
+
+                XLogSegNoOffsetToRecPtr(minSegNo, 0, wal_segment_size, minlsn);
+                slot_names =
+                    ReplicationSlotsEnumerateBehinds(minlsn, ", ", &nslots);
+
+                /*
+                 * Some of the affected slots could have just been removed. We
+                 * don't need to show anything here if no affected slots are
+                 * remaining.
+                 */
+                if (slot_names)
+                {
+                    if (prev_slot_names == NULL ||
+                        strcmp(slot_names, prev_slot_names) != 0)
+                    {
+                        MemoryContext oldcxt;
+
+                        ereport(WARNING,
+                                (errmsg ("some replication slots have lost required WAL segments"),
+                                 errdetail_plural(
+                                     "Slot %s lost %ld segment(s).",
+                                     "Slots %s lost at most %ld segment(s).",
+                                     nslots, slot_names, lost_segs)));
+
+                        if (prev_slot_names)
+                            pfree(prev_slot_names);
+                        oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+                        prev_slot_names = pstrdup(slot_names);
+                        MemoryContextSwitchTo(oldcxt);
+                    }
+                }
+            }
+            prev_lost_segs = lost_segs;
+        }
         else
-            segno = segno - wal_keep_segments;
-    }
-
-    /* then check whether slots limit removal further */
-    if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
-    {
-        XLogSegNo    slotSegNo;
-
-        XLByteToSeg(keep, slotSegNo, wal_segment_size);
-
-        if (slotSegNo <= 0)
-            segno = 1;
-        else if (slotSegNo < segno)
-            segno = slotSegNo;
+            prev_lost_segs = 0;
     }
 
     /* don't delete WAL segments newer than the calculated segment */
-    if (segno < *logSegNo)
-        *logSegNo = segno;
+    if (minSegNo < *logSegNo)
+        *logSegNo = minSegNo;
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 976f6479a9..fcaede60d0 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -49,6 +49,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/memutils.h"
 
 /*
  * Replication slot on-disk data structure.
@@ -1064,6 +1065,70 @@ ReplicationSlotReserveWal(void)
     }
 }
 
+/*
+ * Returns the names of inactive replication slots whose restart_lsn is
+ * behind the specified LSN, as a string of slot names delimited by the
+ * given separator, for use in error messages. Returns NULL if no slot
+ * matches. If pnslots is given, the number of returned slots is stored
+ * there. The returned string is allocated in TopMemoryContext.
+ */
+char *
+ReplicationSlotsEnumerateBehinds(XLogRecPtr target, char *separator, int *pnslots)
+{
+    static StringInfoData retstr;
+    static bool retstr_initialized = false;
+    bool insert_separator = false;
+    int i;
+    int nslots = 0;
+
+    Assert (separator);
+    if (max_replication_slots <= 0)
+        return NULL;
+
+    if (!retstr_initialized)
+    {
+        MemoryContext oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+        initStringInfo(&retstr);
+        MemoryContextSwitchTo(oldcxt);
+        retstr_initialized = true;
+    }
+    else
+        resetStringInfo(&retstr);
+
+    /* construct name list */
+    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+    for (i = 0 ; i < max_replication_slots ; i++)
+    {
+        ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+        /*
+         * We are collecting slots that are definitely behind the given target
+         * LSN. Active slots are excluded since they can catch up later.
+         */
+        if (s->in_use && s->active_pid == 0 && s->data.restart_lsn < target)
+        {
+            if (insert_separator)
+                appendStringInfoString(&retstr, separator);
+
+            /*
+             * Slot names consist only of lower-case letters. We don't
+             * bother quoting.
+             */
+            appendStringInfoString(&retstr, NameStr(s->data.name));
+            insert_separator = true;
+            nslots++;
+        }
+    }
+    LWLockRelease(ReplicationSlotControlLock);
+
+    /* return the number of slots in the list if requested */
+    if (pnslots)
+        *pnslots = nslots;
+
+    /* return NULL instead of an empty string */
+    return retstr.data[0] ? retstr.data : NULL;
+}
+
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e44f71e991..0d01e1f042 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2695,6 +2695,18 @@ static struct config_int ConfigureNamesInt[] =
         NULL, NULL, NULL
     },
 
+    {
+        {"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+            gettext_noop("Sets the maximum size of extra WALs kept by replication slots."),
+         NULL,
+         GUC_UNIT_MB
+        },
+        &max_slot_wal_keep_size_mb,
+        -1, -1,
+        MAX_KILOBYTES, /* XXX: This is in megabytes, like max/min_wal_size */
+        NULL, NULL, NULL
+    },
+
     {
         {"wal_sender_timeout", PGC_USERSET, REPLICATION_SENDING,
             gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e1048c0047..8a39bf7582 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -287,6 +287,7 @@
 #max_wal_senders = 10        # max number of walsender processes
                 # (change requires restart)
 #wal_keep_segments = 0        # in logfile segments; 0 disables
+#max_slot_wal_keep_size = -1    # measured in bytes; -1 disables
 #wal_sender_timeout = 60s    # in milliseconds; 0 disables
 
 #max_replication_slots = 10    # max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 98b033fc20..5d117d5cfc 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -108,6 +108,7 @@ extern int    wal_segment_size;
 extern int    min_wal_size_mb;
 extern int    max_wal_size_mb;
 extern int    wal_keep_segments;
+extern int    max_slot_wal_keep_size_mb;
 extern int    XLOGbuffers;
 extern int    XLogArchiveTimeout;
 extern int    wal_retrieve_retry_interval;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 3e95b019b3..09b0ab7953 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -198,6 +198,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
+extern char *ReplicationSlotsEnumerateBehinds(XLogRecPtr target, char *separator, int *pnslots);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
-- 
2.18.2

From b0fb4d797697fc9d96f88a61b7464613f150cbed Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:23:25 +0900
Subject: [PATCH v18 2/6] Add monitoring aid for max_slot_wal_keep_size

Adds two columns, "status" and "remain", to pg_replication_slots. With
max_slot_wal_keep_size set, replication connections may lose sync
after a long delay. The "status" column shows whether the slot is
reconnectable or about to lose its reserved WAL segments. The "remain"
column shows the remaining bytes of WAL that can be written before the
slot loses its required WAL records.
---
 contrib/test_decoding/expected/ddl.out |   4 +-
 contrib/test_decoding/sql/ddl.sql      |   2 +
 src/backend/access/transam/xlog.c      | 298 +++++++++++++++++++++----
 src/backend/catalog/system_views.sql   |   4 +-
 src/backend/replication/slot.c         |  64 ------
 src/backend/replication/slotfuncs.c    |  39 +++-
 src/include/access/xlog.h              |  18 ++
 src/include/catalog/pg_proc.dat        |   6 +-
 src/include/replication/slot.h         |   1 -
 src/test/regress/expected/rules.out    |   6 +-
 10 files changed, 328 insertions(+), 114 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 2c999fd3eb..cf0318f697 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -723,8 +723,8 @@ SELECT pg_drop_replication_slot('regression_slot');
 (1 row)
 
 /* check that the slot is gone */
+\x
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
 (0 rows)
 
+\x
diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql
index 856495c952..0f2b9992f7 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -387,4 +387,6 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
+\x
 SELECT * FROM pg_replication_slots;
+\x
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 7015300c77..8a83f87c8a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -3900,6 +3900,55 @@ XLogGetLastRemovedSegno(void)
     return lastRemovedSegNo;
 }
 
+/*
+ * Return the oldest WAL segment file.
+ *
+ * The returned value is XLogGetLastRemovedSegno() + 1 when that value is
+ * valid.  Otherwise this function scans the WAL directory to find the
+ * oldest segment, which could be very slow; the result is then cached.
+ */
+XLogSegNo
+FindOldestXLogFileSegNo(void)
+{
+    static XLogSegNo lastFoundOldestSeg = 0;
+    DIR        *xldir;
+    struct dirent *xlde;
+    XLogSegNo segno = XLogGetLastRemovedSegno();
+
+    if (segno > 0)
+        return segno + 1;
+
+    if (lastFoundOldestSeg > 0)
+        return lastFoundOldestSeg;
+
+    xldir = AllocateDir(XLOGDIR);
+    while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
+    {
+        TimeLineID tli;
+        XLogSegNo fsegno;
+
+        /* Ignore files that are not XLOG segments */
+        if (!IsXLogFileName(xlde->d_name) &&
+            !IsPartialXLogFileName(xlde->d_name))
+            continue;
+
+        XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size);
+
+        /*
+         * Get the minimum segment ignoring timeline ID, the same way as
+         * RemoveOldXlogFiles() does.
+         */
+        if (segno == 0 || fsegno < segno)
+            segno = fsegno;
+    }
+
+    FreeDir(xldir);
+
+    lastFoundOldestSeg = segno;
+
+    return segno;
+}
+
 /*
  * Update the last removed segno pointer in shared memory, to reflect
  * that the given XLOG file has been removed.
@@ -9322,6 +9371,105 @@ CreateRestartPoint(int flags)
     return true;
 }
 
+/*
+ * Detect availability of the record at the given restart_lsn.
+ *
+ * restart_lsn is a slot's restart_lsn.
+ * walsender_pid is the slot's walsender PID.
+ *
+ * Returns one of the following enum values.
+ *
+ * WALAVAIL_NORMAL means restart_lsn is available because it is within
+ * max_wal_size.  If max_slot_wal_keep_size is smaller than max_wal_size,
+ * this state is never returned.
+ *
+ * WALAVAIL_PRESERVED means it is still available by preserving extra segments
+ * beyond max_wal_size.
+ *
+ * WALAVAIL_BEING_REMOVED means it is being removed or already removed but the
+ * replication stream on the given slot is live yet. The state may transit to
+ * WALAVAIL_PRESERVED or WALAVAIL_NORMAL state if the walsender advances
+ * restart_lsn.
+ *
+ * WALAVAIL_REMOVED means it is definitely lost. The replication stream on the
+ * slot cannot continue.
+ *
+ * Returns WALAVAIL_INVALID_LSN if restart_lsn is invalid.
+ */
+WalAvailability
+GetWalAvailability(XLogRecPtr restart_lsn, pid_t walsender_pid)
+{
+    XLogRecPtr currpos;
+    XLogRecPtr slotPtr;
+    XLogSegNo currSeg;        /* segid of currpos */
+    XLogSegNo restartSeg;    /* segid of restart_lsn */
+    XLogSegNo oldestSeg;    /* actual oldest segid */
+    XLogSegNo oldestSegMaxWalSize;    /* oldest segid kept by max_wal_size */
+    XLogSegNo oldestSlotSeg;/* oldest segid kept by slot */
+    uint64      keepSegs;
+
+    /* the case where the slot has never been activated */
+    if (XLogRecPtrIsInvalid(restart_lsn))
+        return WALAVAIL_INVALID_LSN;
+
+    currpos = GetXLogWriteRecPtr();
+
+    /* calculate oldest segment currently needed by slots */
+    XLByteToSeg(restart_lsn, restartSeg, wal_segment_size);
+    slotPtr = XLogGetReplicationSlotMinimumLSN();
+    oldestSlotSeg = GetOldestKeepSegment(currpos, slotPtr);
+
+    /* find the oldest segment file actually exists */
+    oldestSeg = FindOldestXLogFileSegNo();
+
+    /* calculate oldest segment by max_wal_size */
+    XLByteToSeg(currpos, currSeg, wal_segment_size);
+    keepSegs = ConvertToXSegs(max_wal_size_mb, wal_segment_size) + 1;
+
+    if (currSeg > keepSegs)
+        oldestSegMaxWalSize = currSeg - keepSegs;
+    else
+        oldestSegMaxWalSize = 1;
+
+    /*
+     * If max_slot_wal_keep_size has changed after the last call, the segment
+     * that would have been kept by the current setting might have been lost by the
+     * previous setting. No point in showing normal or keeping status values if
+     * the restartSeg is known to be lost.
+     */
+    if (restartSeg >= oldestSeg)
+    {
+        /*
+         * show "normal" when restartSeg is within max_wal_size. If
+         * max_slot_wal_keep_size is smaller than max_wal_size, there's no
+         * point in showing the status.
+         */
+        if ((max_slot_wal_keep_size_mb <= 0 ||
+             max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
+            oldestSegMaxWalSize <= restartSeg)
+            return WALAVAIL_NORMAL;
+
+        /* being retained by slots */
+        if (oldestSlotSeg <= restartSeg)
+            return WALAVAIL_PRESERVED;
+    }
+    
+    /*
+     * The segment is already lost or being lost. If the oldest segment is just
+     * after the restartSeg, running walsender may be reading the just removed
+     * segment. The walsender may safely move to the oldest existing segment in
+     * that case.
+     */
+    if (oldestSeg == restartSeg + 1 && walsender_pid != 0)
+        return    WALAVAIL_BEING_REMOVED;
+
+    /* definitely lost. the walsender can no longer restart */
+    return WALAVAIL_REMOVED;
+}
+
 /*
  * Returns minimum segment number that the next checkpoint must leave
  * considering wal_keep_segments, replication slots and
@@ -9370,6 +9518,53 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
     return currSeg - keepSegs;
 }
 
+/*
+ * Calculate remaining bytes until WAL segment for targetLSN will be removed.
+ */
+int64
+DistanceToWalRemoval(XLogRecPtr currLSN, XLogRecPtr targetLSN)
+{
+    XLogSegNo    currSeg;
+    uint64        limitSegs = 0;
+    int64         restbytes;
+    uint64        fragbytes;
+    XLogSegNo    targetSeg;
+
+    XLByteToSeg(currLSN, currSeg, wal_segment_size);
+
+    /* Calculate how far back WAL segments are preserved */
+    if (max_slot_wal_keep_size_mb >= 0)
+        limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
+    if (wal_keep_segments > 0 && limitSegs < wal_keep_segments)
+        limitSegs = wal_keep_segments;
+
+    XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
+
+    /* avoid underflow */
+    if (targetSeg + limitSegs < currSeg)
+        return 0;
+
+    /*
+     * This slot still has all required segments. Calculate how
+     * many LSN bytes the slot has until it loses targetLSN.
+     */
+    fragbytes = wal_segment_size - (currLSN % wal_segment_size);
+    XLogSegNoOffsetToRecPtr(targetSeg + limitSegs - currSeg,
+                            fragbytes, wal_segment_size,
+                            restbytes);
+
+    /*
+     * Not realistic, but make sure the value does not exceed the range
+     * of int64.  Clamping is harmless since such large values have no
+     * practical difference.
+     */
+    if (restbytes > PG_INT64_MAX)
+        restbytes = PG_INT64_MAX;
+
+    return restbytes;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
@@ -9381,9 +9576,13 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
+    static XLogSegNo last_lost_segs = 0;
+    static int last_nslots = 0;
+    static char *last_slot_name = NULL;
     XLogRecPtr    slotminptr = InvalidXLogRecPtr;
     XLogSegNo    minSegNo;
-    XLogSegNo    slotSegNo;
+    XLogSegNo    minSlotSegNo;
+    int            nslots_affected = 0;
 
     if (max_replication_slots > 0)
         slotminptr = XLogGetReplicationSlotMinimumLSN();
@@ -9399,56 +9598,75 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
      */
     if (!XLogRecPtrIsInvalid(slotminptr))
     {
-        static XLogSegNo prev_lost_segs = 0;    /* avoid duplicate messages */
+        Assert (max_replication_slots > 0);
 
-        XLByteToSeg(slotminptr, slotSegNo, wal_segment_size);
+        XLByteToSeg(slotminptr, minSlotSegNo, wal_segment_size);
 
-        if (slotSegNo < minSegNo)
+        if (minSlotSegNo < minSegNo)
         {
-            XLogSegNo lost_segs = minSegNo - slotSegNo;
-            if (prev_lost_segs != lost_segs)
+            /* Some slots have lost required segments */
+            XLogSegNo    lost_segs = minSegNo - minSlotSegNo;
+            ReplicationSlot *earliest = NULL;
+            char       *earliest_name = NULL;
+            int            i;
+
+            /* Find the most affected slot */
+            LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+            for (i = 0; i < max_replication_slots; i++)
             {
-                /* We have lost a new segment, warn it.*/
-                XLogRecPtr minlsn;
-                static char *prev_slot_names = NULL;
-                char *slot_names;
-                int nslots;
+                ReplicationSlot *s =
+                    &ReplicationSlotCtl->replication_slots[i];
+                XLogSegNo slotSegNo;
 
-                XLogSegNoOffsetToRecPtr(minSegNo, 0, wal_segment_size, minlsn);
-                slot_names =
-                    ReplicationSlotsEnumerateBehinds(minlsn, ", ", &nslots);
+                XLByteToSeg(s->data.restart_lsn, slotSegNo, wal_segment_size);
 
-                /*
-                 * Some of the affected slots could have just been removed. We
-                 * don't need show anything here if no affected slots are
-                 * remaining.
-                 */
-                if (slot_names)
+                if (s->in_use && s->active_pid == 0 && slotSegNo < minSegNo)
                 {
-                    if (prev_slot_names == NULL ||
-                        strcmp(slot_names, prev_slot_names) != 0)
-                    {
-                        MemoryContext oldcxt;
+                    nslots_affected++;
 
-                        ereport(WARNING,
-                                (errmsg ("some replication slots have lost required WAL segments"),
-                                 errdetail_plural(
-                                     "Slot %s lost %ld segment(s).",
-                                     "Slots %s lost at most %ld segment(s).",
-                                     nslots, slot_names, lost_segs)));
-
-                        if (prev_slot_names)
-                            pfree(prev_slot_names);
-                        oldcxt = MemoryContextSwitchTo(TopMemoryContext);
-                        prev_slot_names = pstrdup(slot_names);
-                        MemoryContextSwitchTo(oldcxt);
-                    }
+                    if (earliest == NULL ||
+                        s->data.restart_lsn < earliest->data.restart_lsn)
+                        earliest = s;
                 }
             }
-            prev_lost_segs = lost_segs;
+
+            if (earliest)
+            {
+                MemoryContext oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+                earliest_name = pstrdup(NameStr(earliest->data.name));
+                MemoryContextSwitchTo(oldcxt);
+            }
+
+            LWLockRelease(ReplicationSlotControlLock);
+
+            /* Emit WARNING if something has changed */
+            if (earliest_name &&
+                (last_lost_segs != lost_segs || last_nslots != nslots_affected))
+            {
+                ereport(WARNING,
+                        (errmsg_plural ("%d replication slot has lost required WAL segments by %lu segments",
+                                        "%d replication slots have lost required WAL segments by %lu segments",
+                                        nslots_affected, nslots_affected,
+                                        lost_segs),
+                         errdetail("Most affected slot is %s.",
+                                   earliest_name)));
+
+                if (last_slot_name)
+                    pfree(last_slot_name);
+                last_slot_name = earliest_name;
+                last_lost_segs = lost_segs;
+                last_nslots = nslots_affected;
+            }
         }
-        else
-            prev_lost_segs = 0;
+    }
+
+    /* Reset the state if no affected slots remain. */
+    if (nslots_affected == 0 && last_slot_name)
+    {
+        pfree(last_slot_name);
+        last_slot_name = NULL;
+        last_lost_segs = 0;
+        last_nslots = 0;
     }
 
     /* don't delete WAL segments newer than the calculated segment */
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c9e75f4370..a3c7373d4f 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -860,7 +860,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.wal_status,
+            L.remain
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index fcaede60d0..bba61fd324 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1065,70 +1065,6 @@ ReplicationSlotReserveWal(void)
     }
 }
 
-/*
- * Returns names of inactive replication slots that their restart_lsn are
- * behind specified LSN for the purpose of error message character array
- * stuffed with slot names delimited by the given separator. Returns NULL if no
- * slot matches. If pnslots is given, the number of the returned slots is
- * returned there. The returned array is palloc'ed in TopMemoryContext.
- */
-char *
-ReplicationSlotsEnumerateBehinds(XLogRecPtr target, char *separator, int *pnslots)
-{
-    static StringInfoData retstr;
-    static bool retstr_initialized = false;
-    bool insert_separator = false;
-    int i;
-    int nslots = 0;
-
-    Assert (separator);
-    if (max_replication_slots <= 0)
-        return NULL;
-
-    if (!retstr_initialized)
-    {
-        MemoryContext oldcxt = MemoryContextSwitchTo(TopMemoryContext);
-        initStringInfo(&retstr);
-        MemoryContextSwitchTo(oldcxt);
-        retstr_initialized = true;
-    }
-    else
-        resetStringInfo(&retstr);
-
-    /* construct name list */
-    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-    for (i = 0 ; i < max_replication_slots ; i++)
-    {
-        ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-
-        /*
-         * We are collecting slots that are definitely behind the given target
-         * LSN. Active slots are exluded since they can catch up later.
-         */
-        if (s->in_use && s->active_pid == 0 && s->data.restart_lsn < target)
-        {
-            if (insert_separator)
-                appendStringInfoString(&retstr, separator);
-
-            /*
-             * Slot names consist only with lower-case letters. We don't
-             * bother quoting.
-             */
-            appendStringInfoString(&retstr, NameStr(s->data.name));
-            insert_separator = true;
-            nslots++;
-        }
-    }
-    LWLockRelease(ReplicationSlotControlLock);
-
-    /* return the number of slots in the list if requested */
-    if (pnslots)
-        *pnslots = nslots;
-
-    /* return NULL instead of an empty string */
-    return retstr.data[0] ? retstr.data : NULL;
-}
-
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index bb69683e2a..83533ea6c2 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -221,7 +221,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     TupleDesc    tupdesc;
     Tuplestorestate *tupstore;
@@ -275,6 +275,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
         Oid            database;
         NameData    slot_name;
         NameData    plugin;
+        WalAvailability walstate;
         int            i;
 
         if (!slot->in_use)
@@ -342,6 +343,42 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
         else
             nulls[i++] = true;
 
+        walstate = GetWalAvailability(restart_lsn, active_pid);
+
+        switch (walstate)
+        {
+            case WALAVAIL_INVALID_LSN:
+                nulls[i++] = true;
+                break;
+
+            case WALAVAIL_NORMAL:
+                values[i++] = CStringGetTextDatum("normal");
+                break;
+
+            case WALAVAIL_PRESERVED:
+                values[i++] = CStringGetTextDatum("keeping");
+                break;
+
+            case WALAVAIL_BEING_REMOVED:
+                values[i++] = CStringGetTextDatum("losing");
+                break;
+
+            case WALAVAIL_REMOVED:
+                values[i++] = CStringGetTextDatum("lost");
+                break;
+        }
+
+        if (max_slot_wal_keep_size_mb >= 0 &&
+            (walstate == WALAVAIL_NORMAL ||
+             walstate == WALAVAIL_PRESERVED))
+        {
+            XLogRecPtr currptr = GetXLogWriteRecPtr();
+            values[i++] =
+                Int64GetDatum(DistanceToWalRemoval(currptr, restart_lsn));
+        }
+        else
+            nulls[i++] = true;
+
         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
     }
     LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 5d117d5cfc..52ff676638 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -255,6 +255,20 @@ typedef struct CheckpointStatsData
 
 extern CheckpointStatsData CheckpointStats;
 
+/*
+ * WAL segment availability status
+ *
+ * This is used as the return value of GetWalAvailability.
+ */
+typedef enum WalAvailability
+{
+    WALAVAIL_INVALID_LSN,            /* parameter error */
+    WALAVAIL_NORMAL,                /* WAL segment is within max_wal_size */
+    WALAVAIL_PRESERVED,                /* WAL segment is preserved by repslots */
+    WALAVAIL_BEING_REMOVED,            /* WAL segment is no longer preserved */
+    WALAVAIL_REMOVED                /* WAL segment has been removed */
+} WalAvailability;
+
 struct XLogRecData;
 
 extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
@@ -268,6 +282,7 @@ extern int    XLogFileOpen(XLogSegNo segno);
 
 extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
 extern XLogSegNo XLogGetLastRemovedSegno(void);
+extern XLogSegNo FindOldestXLogFileSegNo(void);
 extern void XLogSetAsyncXactLSN(XLogRecPtr record);
 extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn);
 
@@ -304,6 +319,9 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern WalAvailability GetWalAvailability(XLogRecPtr restart_lsn,
+                                          pid_t walsender_pid);
+extern int64 DistanceToWalRemoval(XLogRecPtr currLSN, XLogRecPtr targetLSN);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fcf2a1214c..e70e62a657 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9892,9 +9892,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,remain}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 09b0ab7953..3e95b019b3 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -198,7 +198,6 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
-extern char *ReplicationSlotsEnumerateBehinds(XLogRecPtr target, char *separator, int *pnslots);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 70e1e2f78d..4dec2b1c3d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1461,8 +1461,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.wal_status,
+    l.remain
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, remain)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.18.2

From 232e0b6ae9da6ae9fc0cd0fe7b50984eba6bb4d6 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Wed, 19 Dec 2018 12:43:57 +0900
Subject: [PATCH v18 3/6] Add primary_slot_name to init_from_backup in TAP
 test.

It is convenient if primary_slot_name can be specified when taking a
base backup. This adds a parameter of that name to the perl
function.
---
 src/test/perl/PostgresNode.pm | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 2e0cf4a2f3..5f2659c3fc 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -698,6 +698,10 @@ port = $port
         $self->append_conf('postgresql.conf',
             "unix_socket_directories = '$host'");
     }
+    $self->append_conf('postgresql.conf',
+                       qq(primary_slot_name = $params{primary_slot_name}))
+      if (defined $params{primary_slot_name});
+
     $self->enable_streaming($root_node) if $params{has_streaming};
     $self->enable_restoring($root_node) if $params{has_restoring};
     return;
-- 
2.18.2

From 6a77f2c86ca26abd1d2b2da95f8df80256a53ff7 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 17:33:53 +0900
Subject: [PATCH v18 4/6] TAP test for the slot limit feature

---
 src/test/recovery/t/018_replslot_limit.pl | 202 ++++++++++++++++++++++
 1 file changed, 202 insertions(+)
 create mode 100644 src/test/recovery/t/018_replslot_limit.pl

diff --git a/src/test/recovery/t/018_replslot_limit.pl b/src/test/recovery/t/018_replslot_limit.pl
new file mode 100644
index 0000000000..6688167546
--- /dev/null
+++ b/src/test/recovery/t/018_replslot_limit.pl
@@ -0,0 +1,202 @@
+# Test for replication slot limit
+# Ensure that max_slot_wal_keep_size limits the number of WAL files to
+# be kept by replication slots.
+
+use strict;
+use warnings;
+use File::Path qw(rmtree);
+use PostgresNode;
+use TestLib;
+use Test::More tests => 13;
+use Time::HiRes qw(usleep);
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize master node, setting wal-segsize to 1MB
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, extra => ['--wal-segsize=1']);
+$node_master->append_conf('postgresql.conf', qq(
+min_wal_size = 2MB
+max_wal_size = 4MB
+log_checkpoints = yes
+));
+$node_master->start;
+$node_master->safe_psql('postgres', "SELECT pg_create_physical_replication_slot('rep1')");
+
+# The slot state and remain should be null before the first connection
+my $result = $node_master->safe_psql('postgres', "SELECT restart_lsn is NULL, wal_status is NULL, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "t|t|t", 'check the state of non-reserved slot is "unknown"');
+
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a standby linking to it using the replication slot
+my $node_standby = get_new_node('standby_1');
+$node_standby->init_from_backup($node_master, $backup_name, has_streaming => 1, primary_slot_name => 'rep1');
+
+$node_standby->start;
+
+# Wait until standby has replayed enough data
+my $start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+# Stop standby
+$node_standby->stop;
+
+
+# Preparation done, the slot is in the "normal" state now
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check the catching-up state');
+
+# Advance WAL by one segment (= 1MB) on master
+advance_wal($node_master, 1);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "normal" when restart_lsn fits in max_wal_size
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check that restart_lsn is in max_wal_size');
+
+advance_wal($node_master, 4);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "normal" when max_slot_wal_keep_size is not set
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check that slot is working');
+
+# The standby can reconnect to master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+# Set max_slot_wal_keep_size on master
+my $max_slot_wal_keep_size_mb = 6;
+$node_master->append_conf('postgresql.conf', qq(
+max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
+));
+$node_master->reload;
+
+# The slot is in the safe state. The remaining bytes should be almost
+# (max_slot_wal_keep_size + 1) times the segment size
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|7168 kB", 'check that max_slot_wal_keep_size is working');
+
+# Advance WAL again then checkpoint, reducing remain by 2 MB.
+advance_wal($node_master, 2);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is still working
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|5120 kB", 'check that remaining bytes are calculated correctly');
+
+# wal_keep_segments overrides max_slot_wal_keep_size
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 8; SELECT pg_reload_conf();");
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|7168 kB", 'check that wal_keep_segments overrides max_slot_wal_keep_size');
+
+# restore wal_keep_segments
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 0; SELECT pg_reload_conf();");
+
+# Advance WAL again without checkpoint, reducing remain by 4 MB.
+advance_wal($node_master, 4);
+
+# Slot gets into 'keeping' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|keeping|1024 kB", 'check that the slot state changes to "keeping"');
+
+# do checkpoint so that the next checkpoint runs too early
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# Advance WAL again without checkpoint; remain goes to 0.
+advance_wal($node_master, 1);
+
+# Slot gets into 'lost' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost|t", 'check that the slot state changes to "lost"');
+
+# The standby can still connect to the master before a checkpoint
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+ok(!find_in_log($node_standby,
+                "requested WAL segment [0-9A-F]+ has already been removed"),
+   'check that required WAL segments are still available');
+
+# Advance WAL again, the slot loses the oldest segment.
+my $logstart = get_log_size($node_master);
+advance_wal($node_master, 7);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# WARNING should be issued
+ok(find_in_log($node_master,
+               "1 replication slot has lost required WAL segments by 1 segments\n".
+               ".*Most affected slot is rep1.",
+               $logstart),
+   'check that the warning is logged');
+
+# This slot should be broken
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost|t", 'check that the slot state changes to "lost"');
+
+# The standby can no longer connect to the master
+$logstart = get_log_size($node_standby);
+$node_standby->start;
+
+my $failed = 0;
+for (my $i = 0 ; $i < 10000 ; $i++)
+{
+    if (find_in_log($node_standby,
+                    "requested WAL segment [0-9A-F]+ has already been removed",
+                    $logstart))
+    {
+        $failed = 1;
+        last;
+    }
+    usleep(100_000);
+}
+ok($failed, 'check that replication has been broken');
+
+$node_standby->stop;
+
+#####################################
+# Advance WAL of $node by $n segments
+sub advance_wal
+{
+    my ($node, $n) = @_;
+
+    # Advance by $n segments (= $n MB, since the segment size is 1MB)
+    for (my $i = 0 ; $i < $n ; $i++)
+    {
+        $node->safe_psql('postgres', "CREATE TABLE t (); DROP TABLE t; SELECT pg_switch_wal();");
+    }
+}
+
+# return the size of logfile of $node in bytes
+sub get_log_size
+{
+    my ($node) = @_;
+
+    return (stat $node->logfile)[7];
+}
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+    my ($node, $pat, $off) = @_;
+
+    $off = 0 unless defined $off;
+    my $log = TestLib::slurp_file($node->logfile);
+    return 0 if (length($log) <= $off);
+
+    $log = substr($log, $off);
+
+    return $log =~ m/$pat/;
+}
-- 
2.18.2

From 84403b9717d4090513a12a3499b5a9b181efe68a Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 11 Jan 2018 15:00:32 +0900
Subject: [PATCH v18 5/6] Documentation for slot-limit feature

---
 doc/src/sgml/catalogs.sgml          | 37 +++++++++++++++++++++++++++++
 doc/src/sgml/config.sgml            | 23 ++++++++++++++++++
 doc/src/sgml/high-availability.sgml |  8 ++++---
 3 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 85ac79f07e..58dd7b6445 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9974,6 +9974,43 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </entry>
      </row>
 
+     <row>
+      <entry><structfield>wal_status</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+
+      <entry>Availability of the WAL files claimed by this slot.
+      Possible values are <literal>normal</literal>,
+      <literal>keeping</literal>, <literal>losing</literal> and
+      <literal>lost</literal>.
+      <literal>normal</literal> means that the claimed files are within
+      <varname>max_wal_size</varname>. <literal>keeping</literal> means
+      that <varname>max_wal_size</varname> is exceeded but the files are
+      still retained by replication slots or
+      <varname>wal_keep_segments</varname>.
+      <literal>losing</literal> means that some of the files are on the
+      verge of removal, but the session using this slot may still continue.
+      <literal>lost</literal> means that some of them are definitely lost
+      and the session that used this slot cannot continue replication.
+      This state also implies that the session using this slot has been
+      stopped.
+
+      The last two states are seen only when
+      <xref linkend="guc-max-slot-wal-keep-size"/> is
+      non-negative. If <structfield>restart_lsn</structfield> is NULL,
+      this field is null.
+      </entry>
+     </row>
+
+     <row>
+      <entry><structfield>remain</structfield></entry>
+      <entry><type>bigint</type></entry>
+      <entry></entry>
+      <entry>The amount in bytes that the WAL location (LSN) can advance
+        before this slot may lose required WAL files.
+        If <structfield>restart_lsn</structfield> is null
+        or <structfield>wal_status</structfield> is <literal>losing</literal>
+        or <literal>lost</literal>, this field is null.
+      </entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3ccacd528b..3e8884458c 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3730,6 +3730,29 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+      <varlistentry id="guc-max-slot-wal-keep-size" xreflabel="max_slot_wal_keep_size">
+       <term><varname>max_slot_wal_keep_size</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_slot_wal_keep_size</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+       <para>
+        Specify the maximum size of WAL files
+        that <link linkend="streaming-replication-slots">replication
+        slots</link> are allowed to retain in the <filename>pg_wal</filename>
+        directory at checkpoint time.
+        If <varname>max_slot_wal_keep_size</varname> is -1 (the default),
+        replication slots may retain an unlimited amount of WAL files.  If
+        the <structfield>restart_lsn</structfield> of a replication slot
+        falls behind the current LSN by more than this size, the standby
+        using the slot may no longer be able to continue replication
+        because the required WAL files have been removed. You can see the
+        WAL availability of replication slots
+        in <link linkend="view-pg-replication-slots">pg_replication_slots</link>.
+       </para>
+       </listitem>
+      </varlistentry>
+
      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index bc4d98fe03..328464c240 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -925,9 +925,11 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     <xref linkend="guc-archive-command"/>.
     However, these methods often result in retaining more WAL segments than
     required, whereas replication slots retain only the number of segments
-    known to be needed.  An advantage of these methods is that they bound
-    the space requirement for <literal>pg_wal</literal>; there is currently no way
-    to do this using replication slots.
+    known to be needed.  On the other hand, replication slots can retain so
+    many WAL segments that they fill up the space allocated
+    for <literal>pg_wal</literal>;
+    <xref linkend="guc-max-slot-wal-keep-size"/> limits the size of WAL files
+    retained by replication slots.
    </para>
    <para>
     Similarly, <xref linkend="guc-hot-standby-feedback"/>
-- 
2.18.2

From b12f9890f129f9cd6e5a811adc656069b429c108 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Fri, 26 Oct 2018 10:07:05 +0900
Subject: [PATCH v18 6/6] Check removal of in-reading segment file.

Checkpoints can recycle a segment file while it is being read by
ReadRecord, which leads to an apparently odd error message during
logical decoding. This patch explicitly checks for that case and
errors out immediately.  Reading a recycled file is safe in itself:
inconsistencies caused by overwriting it as a new segment are caught
by page/record validation, so this check only keeps the behavior
consistent with the wal_status shown in pg_replication_slots.
---
 src/backend/access/transam/xlogreader.c | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 3aa68127a3..f6566d17ae 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -246,7 +246,9 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
     uint32        pageHeaderSize;
     bool        gotheader;
     int            readOff;
-
+#ifndef FRONTEND
+    XLogSegNo    targetSegNo;
+#endif
     /*
      * randAccess indicates whether to verify the previous-record pointer of
      * the record we're reading.  We only do this if we're reading
@@ -292,6 +294,22 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
     targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
     targetRecOff = RecPtr % XLOG_BLCKSZ;
 
+#ifndef FRONTEND
+    /*
+     * Although it is safe for the current segment to be recycled as a new
+     * segment, since we validate the page/record header while reading, it
+     * leads to an apparently strange error message during logical
+     * replication.  Prevent that by explicitly checking whether the
+     * current segment has been removed.
+     */
+    XLByteToSeg(targetPagePtr, targetSegNo, state->segcxt.ws_segsize);
+    if (targetSegNo <= XLogGetLastRemovedSegno())
+    {
+        report_invalid_record(state,
+                              "WAL segment for LSN %X/%X has been removed",
+                              (uint32)(RecPtr >> 32), (uint32) RecPtr);
+        goto err;
+    }
+#endif
     /*
      * Read the page containing the record into state->readBuf. Request enough
      * byte to cover the whole record header, or at least the part of it that
-- 
2.18.2

