Re: [HACKERS] Restricting maximum keep segments by repslots - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: [HACKERS] Restricting maximum keep segments by repslots
Date
Msg-id 20200406.185027.648866525989475817.horikyota.ntt@gmail.com
In response to Re: [HACKERS] Restricting maximum keep segments by repslots  (Alvaro Herrera <alvherre@2ndquadrant.com>)
Responses Re: [HACKERS] Restricting maximum keep segments by repslots  (Alvaro Herrera <alvherre@2ndquadrant.com>)
Re: [HACKERS] Restricting maximum keep segments by repslots  (Alvaro Herrera <alvherre@2ndquadrant.com>)
List pgsql-hackers
At Fri, 3 Apr 2020 20:14:03 -0300, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in 
> So, the more I look at this patch, the less I like the way the slots are
> handled.
> 
> * I think it's a mistake to try to do anything in KeepLogSeg itself;
>   that function is merely in charge of some arithmetic.  I propose to
>   make that function aware of the new size limitation (so that it
>   doesn't trust the slot's LSNs completely), but to make the function
>   have no side effects.  The attached patch does that, I hope.
>   To replace that responsibility, let's add another function.  I named it
>   InvalidateObsoleteReplicationSlots().  In CreateCheckPoint and
>   CreateRestartPoint, we call the new function just before removing
>   segments.  Note: the one in this patch doesn't actually work or even
>   compile.

Agreed, and thanks for the code. The patch is enough to express the
intention. I fixed some compilation errors and cleaned up
KeepLogSeg.  InvalidateObsoleteReplicationSlots requires the "oldest
preserved segment", so it should be called before _logSegNo--, not
after.
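
To illustrate the ordering point, here is a minimal standalone sketch. It is not the patch code: seg_start_lsn() and slot_is_obsolete() are hypothetical helpers, and the typedefs are plain stand-ins for the PostgreSQL types. It only assumes that a segment's start LSN is segno * segment size, as XLogSegNoOffsetToRecPtr computes with offset 0.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL typedefs of the same names. */
typedef uint64_t XLogRecPtr;
typedef uint64_t XLogSegNo;

/* LSN of the first byte of segment "segno" (hypothetical helper). */
static XLogRecPtr
seg_start_lsn(XLogSegNo segno, uint64_t seg_size)
{
    assert(seg_size > 0);
    return segno * seg_size;
}

/*
 * A slot is obsolete when its restart_lsn lies before the start of the
 * oldest segment that will be preserved (hypothetical helper).
 */
static bool
slot_is_obsolete(XLogRecPtr restart_lsn, XLogSegNo oldest_kept_seg,
                 uint64_t seg_size)
{
    return restart_lsn < seg_start_lsn(oldest_kept_seg, seg_size);
}
```

With 16MB segments and segment 10 as the oldest preserved one, a slot whose restart_lsn is inside segment 9 is reported as obsolete. If the decremented value 9 (the newest segment to be removed) were passed instead, the same slot would wrongly look valid.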

>   The new function must:
> 
>   1. mark the slot as "invalid" somehow.  Maybe it would make sense to
>   add a new flag in the on-disk struct for this; but for now I'm just
>   thinking that changing the slot's restart_lsn is sufficient.
>   (Of course, I haven't tested this, so there might be side-effects that
>   mean that this idea doesn't work).
> 
>   2. send SIGTERM to a walsender that's using such a slot.
> 
>   3. Send the warning message.  Instead of trying to construct a message
>   with a list of slots, send one message per slot.  (I think this is
>   better from a translatability point of view, and also from a
>   monitoring PoV).
> 
> * GetWalAvailability seems too much in competition with
>   DistanceToWalRemoval.  Which is weird, because both functions do
>   pretty much the same thing.  I think a better design is to make the
>   former function return the distance as an out parameter.

I agree with the above. When a slot is invalidated, the following
message is logged.

LOG: slot rep1 is invalidated at 0/1C00000 due to exceeding max_slot_wal_keep_size
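
For reference, the position in that message is the slot's restart_lsn printed as two 32-bit hexadecimal halves, the same "%X/%X" convention the patch uses. A small sketch of that convention follows; format_lsn() and parse_lsn() are hypothetical names used only for illustration, not PostgreSQL functions.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Stand-in for the PostgreSQL typedef of the same name. */
typedef uint64_t XLogRecPtr;

/* Write lsn into buf as "HIGH/LOW" in hex; returns the snprintf result. */
static int
format_lsn(XLogRecPtr lsn, char *buf, size_t buflen)
{
    assert(buf != NULL && buflen > 0);
    return snprintf(buf, buflen, "%" PRIX32 "/%" PRIX32,
                    (uint32_t) (lsn >> 32), (uint32_t) lsn);
}

/* Parse "HIGH/LOW" back into a 64-bit LSN; returns false on bad input. */
static bool
parse_lsn(const char *text, XLogRecPtr *lsn)
{
    uint32_t    hi;
    uint32_t    lo;

    assert(text != NULL && lsn != NULL);
    if (sscanf(text, "%" SCNx32 "/%" SCNx32, &hi, &lo) != 2)
        return false;
    *lsn = ((XLogRecPtr) hi << 32) | lo;
    return true;
}
```

Formatting the value 0x1C00000 this way gives the text 0/1C00000 seen in the message above.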

> * Andres complained that the "distance" column was not a great value to
>   expose (20171106132050.6apzynxrqrzghb4r@alap3.anarazel.de).  That's
>   right: it changes both by the insertion LSN as well as the slot's
>   consumption.  Maybe we can expose the earliest live LSN (start of the
>   earliest segment?) as a new column.  It'll be the same for all slots,
>   I suppose, but we don't care, do we?

I don't care as long as users can calculate the "remain" of individual
slots (that is, how far the current LSN can advance before the slot
loses data). But the "earliest live LSN" (EL-LSN) is not really
relevant to the safety of each slot. The distance from EL-LSN to
restart_lsn or the current LSN doesn't generally indicate the safety
of individual slots.  The only relevance is that if the distance from
EL-LSN to the current LSN is close to max_slot_wal_keep_size, the most
lagged slot could die soon.

FWIW, the relationship between the values is shown below.

                                    (now)>>>
<--- past ----------------------------+--------------------future --->
 lastRemovedSegment + 1
 "earliest_live_lsn"                                    | segment X |
 |   min(restart_lsn) restart_lsn[i]  current_lsn       |   "The LSN X"
.+...+................+...............+>>>..............|...+       |
                      <--------max_slot_wal_keep_size------>        |
                                       <---"remain" --------------->|

So the "remain" is calculated using restart_lsn (pg_lsn),
max_slot_wal_keep_size (int, in MB), wal_keep_segments (in segments),
wal_segment_size (int, in MB) and pg_current_wal_lsn() (pg_lsn).  The
formula could be simplified by ignoring the segment size, but in any
case the SQL interface has no arithmetic between pg_lsn and int.
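
As a rough illustration of that calculation, here is a simplified sketch in C. It is an assumption-laden approximation, not code from the patch: slot_remain_bytes() is a hypothetical name, segment-boundary rounding is ignored, and the retained range is taken to be the larger of max_slot_wal_keep_size and wal_keep_segments, measured back from the current LSN.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-in for the PostgreSQL typedef of the same name. */
typedef uint64_t XLogRecPtr;

/*
 * Approximate number of bytes the current LSN can still advance before
 * the slot's restart_lsn falls out of the retained range.  Returns
 * UINT64_MAX when max_slot_wal_keep_size_mb is negative (no limit) and
 * 0 when the slot is already beyond the limit.  Hypothetical helper.
 */
static uint64_t
slot_remain_bytes(XLogRecPtr restart_lsn, XLogRecPtr current_lsn,
                  int max_slot_wal_keep_size_mb, int wal_keep_segments,
                  uint64_t wal_segment_size)
{
    uint64_t    slot_limit;
    uint64_t    keep_limit;
    uint64_t    limit;
    uint64_t    behind;

    assert(wal_segment_size > 0);
    assert(wal_keep_segments >= 0);
    assert(current_lsn >= restart_lsn);

    if (max_slot_wal_keep_size_mb < 0)
        return UINT64_MAX;

    slot_limit = (uint64_t) max_slot_wal_keep_size_mb * 1024 * 1024;
    keep_limit = (uint64_t) wal_keep_segments * wal_segment_size;
    limit = (slot_limit > keep_limit) ? slot_limit : keep_limit;

    behind = current_lsn - restart_lsn;
    return (behind < limit) ? limit - behind : 0;
}
```

For example, with restart_lsn at 16MB, the current LSN at 48MB, max_slot_wal_keep_size = 64MB and wal_keep_segments = 0, the slot is 32MB behind and this sketch reports 32MB of "remain".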

Anyway, in this version I added "min_safe_lsn" and adjusted the TAP
tests for it. The tests can use (pg_current_wal_lsn() - min_safe_lsn) as
an alternative index, since there is only one slot during the test.

> I attach a rough sketch, which as I said before doesn't work and doesn't
> compile.  Sadly I have reached the end of my day here so I won't be able
> to work on this for today anymore.  I'll be glad to try again tomorrow,
> but in the meantime I thought it was better to send it over and see
> whether you had any thoughts about this proposed design (maybe you know
> it doesn't work for some reason), or better yet, you have the chance to
> actually complete the code or at least move it a little further.

WALAVAIL_BEING_REMOVED is removed since walsender is now actively
killed.
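
For clarity, the remaining states map onto the wal_status column as in this standalone sketch. The enum mirrors the one added to xlog.h by the attached patch; wal_status_text() is a hypothetical helper, since the patch does the mapping inline in pg_get_replication_slots.

```c
#include <assert.h>
#include <stddef.h>

/* Mirrors the enum added to xlog.h in the attached patch. */
typedef enum WalAvailability
{
    WALAVAIL_INVALID_LSN,       /* slot does not reserve WAL */
    WALAVAIL_NORMAL,            /* WAL segment is within max_wal_size */
    WALAVAIL_PRESERVED,         /* WAL segment is preserved by repslots */
    WALAVAIL_REMOVED            /* WAL segment has been removed */
} WalAvailability;

/*
 * Text shown in wal_status for each state; NULL stands for SQL NULL.
 * Hypothetical helper.
 */
static const char *
wal_status_text(WalAvailability state)
{
    assert((int) state >= 0 && (int) state <= (int) WALAVAIL_REMOVED);

    switch (state)
    {
        case WALAVAIL_NORMAL:
            return "normal";
        case WALAVAIL_PRESERVED:
            return "keeping";
        case WALAVAIL_REMOVED:
            return "lost";
        case WALAVAIL_INVALID_LSN:
            break;
    }
    return NULL;
}
```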

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From b2815ce65fd72b5fcb85d785588b4a5adc5f99ae Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Mon, 6 Apr 2020 17:56:46 +0900
Subject: [PATCH v24] Add WAL relief vent for replication slots

Replication slots are useful for maintaining a replication connection
in configurations where replication is so delayed that the connection
breaks. On the other hand, a long delay can let so many WAL files
accumulate that they fill up the disk and bring the master down. This
feature, which is activated by the GUC "max_slot_wal_keep_size",
protects master servers from running out of disk space by limiting the
number of WAL files reserved by replication slots.
---
 doc/src/sgml/catalogs.sgml                    |  39 +++++
 doc/src/sgml/config.sgml                      |  23 +++
 doc/src/sgml/high-availability.sgml           |   8 +-
 src/backend/access/transam/xlog.c             | 145 +++++++++++++++---
 src/backend/catalog/system_views.sql          |   4 +-
 src/backend/replication/slot.c                |  52 +++++++
 src/backend/replication/slotfuncs.c           |  38 ++++-
 src/backend/utils/misc/guc.c                  |  13 ++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |  17 ++
 src/include/catalog/pg_proc.dat               |   6 +-
 src/include/replication/slot.h                |   1 +
 src/test/regress/expected/rules.out           |   6 +-
 13 files changed, 320 insertions(+), 33 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 64614b569c..de8ca5ccca 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9907,6 +9907,45 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </entry>
      </row>
 
+     <row>
+      <entry><structfield>wal_status</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+
+      <entry>Availability of WAL files claimed by this slot.
+      Valid values are:
+       <simplelist>
+        <member>
+         <literal>normal</literal> means that the claimed files
+         are within <varname>max_wal_size</varname>
+        </member>
+        <member>
+         <literal>keeping</literal> means that <varname>max_wal_size</varname>
+         is exceeded but still held by replication slots or
+         <varname>wal_keep_segments</varname>
+        </member>
+        <member>
+         <literal>lost</literal> means that some of them are definitely lost
+         and the session using this slot cannot continue replication. This
+         state is seldom seen because a walsender that enters this state is
+         terminated immediately.
+        </member>
+       </simplelist>
+      The last two states are seen only when
+      <xref linkend="guc-max-slot-wal-keep-size"/> is
+      non-negative. If <structfield>restart_lsn</structfield> is NULL, this
+      field is null.
+      </entry>
+     </row>
+
+     <row>
+      <entry><structfield>min_safe_lsn</structfield></entry>
+      <entry><type>pg_lsn</type></entry>
+      <entry></entry>
+      <entry>The minimum LSN currently available for walsenders.
+      </entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index c4d6ed4bbc..17c18386e2 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3753,6 +3753,29 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+      <varlistentry id="guc-max-slot-wal-keep-size" xreflabel="max_slot_wal_keep_size">
+       <term><varname>max_slot_wal_keep_size</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_slot_wal_keep_size</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+       <para>
+        Specify the maximum size of WAL files
+        that <link linkend="streaming-replication-slots">replication
+        slots</link> are allowed to retain in the <filename>pg_wal</filename>
+        directory at checkpoint time.
+        If <varname>max_slot_wal_keep_size</varname> is -1 (the default),
+        replication slots retain an unlimited amount of WAL files.  If the
+        restart_lsn of a replication slot falls behind the current LSN by more
+        than this many megabytes, the standby using the slot may no longer be able
+        to continue replication due to removal of required WAL files. You
+        can see the WAL availability of replication slots
+        in <link linkend="view-pg-replication-slots">pg_replication_slots</link>.
+       </para>
+       </listitem>
+      </varlistentry>
+
      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index b5d32bb720..624e5f94ad 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -925,9 +925,11 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     <xref linkend="guc-archive-command"/>.
     However, these methods often result in retaining more WAL segments than
     required, whereas replication slots retain only the number of segments
-    known to be needed.  An advantage of these methods is that they bound
-    the space requirement for <literal>pg_wal</literal>; there is currently no way
-    to do this using replication slots.
+    known to be needed.  On the other hand, replication slots can retain so
+    many WAL segments that they fill up the space allocated
+    for <literal>pg_wal</literal>;
+    <xref linkend="guc-max-slot-wal-keep-size"/> limits the size of WAL files
+    retained by replication slots.
    </para>
    <para>
     Similarly, <xref linkend="guc-hot-standby-feedback"/>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 977d448f50..8f28ffaab9 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -106,6 +106,7 @@ int            wal_level = WAL_LEVEL_MINIMAL;
 int            CommitDelay = 0;    /* precommit delay in microseconds */
 int            CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int            wal_retrieve_retry_interval = 5000;
+int            max_slot_wal_keep_size_mb = -1;
 
 #ifdef WAL_DEBUG
 bool        XLOG_DEBUG = false;
@@ -759,7 +760,7 @@ static ControlFileData *ControlFile = NULL;
  */
 #define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
 
-/* Convert min_wal_size_mb and max_wal_size_mb to equivalent segment count */
+/* Convert values of GUCs measured in megabytes to equiv. segment count */
 #define ConvertToXSegs(x, segsize)    \
     (x / ((segsize) / (1024 * 1024)))
 
@@ -3929,9 +3930,10 @@ XLogGetLastRemovedSegno(void)
     return lastRemovedSegNo;
 }
 
+
 /*
- * Update the last removed segno pointer in shared memory, to reflect
- * that the given XLOG file has been removed.
+ * Update the last removed segno pointer in shared memory, to reflect that the
+ * given XLOG file has been removed.
  */
 static void
 UpdateLastRemovedPtr(char *filename)
@@ -9049,6 +9051,7 @@ CreateCheckPoint(int flags)
      */
     XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
     KeepLogSeg(recptr, &_logSegNo);
+    InvalidateObsoleteReplicationSlots(_logSegNo);
     _logSegNo--;
     RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
 
@@ -9383,6 +9386,7 @@ CreateRestartPoint(int flags)
     replayPtr = GetXLogReplayRecPtr(&replayTLI);
     endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
     KeepLogSeg(endptr, &_logSegNo);
+    InvalidateObsoleteReplicationSlots(_logSegNo);
     _logSegNo--;
 
     /*
@@ -9451,48 +9455,143 @@ CreateRestartPoint(int flags)
     return true;
 }
 
+/*
+ * Report availability of WAL for a replication slot
+ *        restart_lsn and active_pid are straight from the slot info
+ *
+ * Returns one of the following enum values.
+ *
+ * WALAVAIL_NORMAL means targetLSN is available because it is in the range of
+ * max_wal_size.  If max_slot_wal_keep_size is smaller than max_wal_size, this
+ * state is not returned.
+ *
+ * WALAVAIL_PRESERVED means it is still available by preserving extra segments
+ * beyond max_wal_size.
+ *
+ * WALAVAIL_REMOVED means it is definitely lost. The replication stream on the
+ * slot cannot continue.
+ *
+ * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL.
+ */
+WalAvailability
+GetWalAvailability(XLogRecPtr restart_lsn, pid_t walsender_pid)
+{
+    XLogRecPtr currpos;
+    XLogSegNo currSeg;        /* segid of currpos */
+    XLogSegNo restartSeg;    /* segid of restart_lsn */
+    XLogSegNo oldestSeg;    /* actual oldest segid */
+    XLogSegNo oldestSegMaxWalSize;    /* oldest segid kept by max_wal_size */
+    XLogSegNo oldestSlotSeg = InvalidXLogRecPtr;/* oldest segid kept by slot */
+    uint64      keepSegs;
+
+    /* slot does not reserve WAL. Either deactivated, or has never been active */
+    if (XLogRecPtrIsInvalid(restart_lsn))
+        return WALAVAIL_INVALID_LSN;
+
+    currpos = GetXLogWriteRecPtr();
+
+    /* calculate oldest segment currently needed by slots */
+    XLByteToSeg(restart_lsn, restartSeg, wal_segment_size);
+    KeepLogSeg(currpos, &oldestSlotSeg);
+
+    /*
+     * Find the oldest extant segment file. We get 1 until checkpoint removes
+     * the first WAL segment file since startup, which makes the status wrong
+     * under certain abnormal conditions, but that does no actual harm.
+     */
+    oldestSeg = XLogGetLastRemovedSegno() + 1;
+
+    /* calculate oldest segment by max_wal_size and wal_keep_segments */
+    XLByteToSeg(currpos, currSeg, wal_segment_size);
+    keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
+                              wal_segment_size) + 1;
+
+    if (currSeg > keepSegs)
+        oldestSegMaxWalSize = currSeg - keepSegs;
+    else
+        oldestSegMaxWalSize = 1;
+
+    /*
+     * If max_slot_wal_keep_size has changed after the last call, the segment
+     * that would have been kept by the current setting might have been lost by
+     * the previous setting. No point in showing normal or keeping status values
+     * if the restartSeg is known to be lost.
+     */
+    if (restartSeg >= oldestSeg)
+    {
+        /*
+         * show "normal" when restartSeg is within max_wal_size. If
+         * max_slot_wal_keep_size is smaller than max_wal_size, there's no
+         * point in showing the status.
+         */
+        if ((max_slot_wal_keep_size_mb <= 0 ||
+             max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
+            oldestSegMaxWalSize <= restartSeg)
+            return WALAVAIL_NORMAL;
+
+        /* being retained by slots */
+        if (oldestSlotSeg <= restartSeg)
+            return WALAVAIL_PRESERVED;
+    }
+
+    /* definitely lost. the walsender can no longer restart */
+    return WALAVAIL_REMOVED;
+}
+
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
  *
  * This is calculated by subtracting wal_keep_segments from the given xlog
  * location, recptr and by making sure that that result is below the
- * requirement of replication slots.
+ * requirement of replication slots.  For the latter criterion we do consider
+ * the effects of max_slot_wal_keep_size: reserve at most that much space back
+ * from recptr.
  */
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
+    XLogSegNo    currSegNo;
     XLogSegNo    segno;
     XLogRecPtr    keep;
 
-    XLByteToSeg(recptr, segno, wal_segment_size);
+    XLByteToSeg(recptr, currSegNo, wal_segment_size);
+    segno = currSegNo;
+
     keep = XLogGetReplicationSlotMinimumLSN();
 
-    /* compute limit for wal_keep_segments first */
-    if (wal_keep_segments > 0)
+    /*
+     * Calculate how many segments are kept by slots first.
+     */
+    /* Cap keepSegs by max_slot_wal_keep_size */
+    if (keep != InvalidXLogRecPtr)
+    {
+        XLByteToSeg(keep, segno, wal_segment_size);
+
+        /* Reduce it if slots already reserve too many. */
+        if (max_slot_wal_keep_size_mb >= 0)
+        {
+            XLogRecPtr slot_keep_segs =
+                ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
+            if (currSegNo - segno > slot_keep_segs)
+                segno = currSegNo - slot_keep_segs;
+        }
+    }
+
+    /* but, keep at least wal_keep_segments segments if any */
+    if (wal_keep_segments > 0 && currSegNo - segno < wal_keep_segments)
     {
         /* avoid underflow, don't go below 1 */
-        if (segno <= wal_keep_segments)
+        if (currSegNo <= wal_keep_segments)
             segno = 1;
         else
-            segno = segno - wal_keep_segments;
-    }
-
-    /* then check whether slots limit removal further */
-    if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
-    {
-        XLogSegNo    slotSegNo;
-
-        XLByteToSeg(keep, slotSegNo, wal_segment_size);
-
-        if (slotSegNo <= 0)
-            segno = 1;
-        else if (slotSegNo < segno)
-            segno = slotSegNo;
+            segno = currSegNo - wal_keep_segments;
     }
 
     /* don't delete WAL segments newer than the calculated segment */
-    if (segno < *logSegNo)
+    if (XLogRecPtrIsInvalid(*logSegNo) || segno < *logSegNo)
         *logSegNo = segno;
 }
 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 813ea8bfc3..d406ea8118 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -876,7 +876,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.wal_status,
+            L.min_safe_lsn
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index d90c7235e9..86ddff8b9d 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1064,6 +1064,58 @@ ReplicationSlotReserveWal(void)
     }
 }
 
+/*
+ * Mark any slot that points to an LSN older than the given segment
+ * as invalid; it requires WAL that's about to be removed.
+ *
+ * NB - this runs as part of checkpoint, so avoid raising errors if possible.
+ */
+void
+InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+{
+    XLogRecPtr    oldestLSN;
+    List       *pids = NIL;
+    ListCell   *cell;
+
+    XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
+
+    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+    for (int i = 0; i < max_replication_slots; i++)
+    {
+        ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+        if (!s->in_use || s->data.restart_lsn == InvalidXLogRecPtr)
+            continue;
+
+        if (s->data.restart_lsn < oldestLSN)
+        {
+            elog(LOG, "slot %s is invalidated at %X/%X due to exceeding max_slot_wal_keep_size",
+                 s->data.name.data,
+                 (uint32) (s->data.restart_lsn >> 32),
+                 (uint32) s->data.restart_lsn);
+            /* mark this slot as invalid */
+            SpinLockAcquire(&s->mutex);
+            s->data.restart_lsn = InvalidXLogRecPtr;
+
+            /* remember PID for killing, if active */
+            if (s->active_pid != 0)
+                pids = lappend_int(pids, s->active_pid);
+            SpinLockRelease(&s->mutex);
+        }
+    }
+    LWLockRelease(ReplicationSlotControlLock);
+
+    /*
+     * Signal any active walsenders to terminate.  We do not wait to observe
+     * them gone.
+     */
+    foreach(cell, pids)
+    {
+        /* signal the walsender to terminate */
+        (void) kill(lfirst_int(cell), SIGTERM);
+    }
+}
+
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ce0c9127bc..dc38b475c5 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -234,7 +234,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     TupleDesc    tupdesc;
     Tuplestorestate *tupstore;
@@ -288,6 +288,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
         Oid            database;
         NameData    slot_name;
         NameData    plugin;
+        WalAvailability walstate;
+        XLogSegNo    last_removed_seg;
         int            i;
 
         if (!slot->in_use)
@@ -355,6 +357,40 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
         else
             nulls[i++] = true;
 
+        walstate = GetWalAvailability(restart_lsn, active_pid);
+
+        switch (walstate)
+        {
+            case WALAVAIL_INVALID_LSN:
+                nulls[i++] = true;
+                break;
+
+            case WALAVAIL_NORMAL:
+                values[i++] = CStringGetTextDatum("normal");
+                break;
+
+            case WALAVAIL_PRESERVED:
+                values[i++] = CStringGetTextDatum("keeping");
+                break;
+
+            case WALAVAIL_REMOVED:
+                values[i++] = CStringGetTextDatum("lost");
+                break;
+        }
+
+        if (max_slot_wal_keep_size_mb >= 0 &&
+            (walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_PRESERVED) &&
+            ((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
+        {
+            XLogRecPtr min_safe_lsn;
+
+            XLogSegNoOffsetToRecPtr(last_removed_seg + 1, 0,
+                                    wal_segment_size, min_safe_lsn);
+            values[i++] = Int64GetDatum(min_safe_lsn);
+        }
+        else
+            nulls[i++] = true;
+
         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
     }
     LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 64dc9fbd13..1cfb999748 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2763,6 +2763,19 @@ static struct config_int ConfigureNamesInt[] =
         NULL, NULL, NULL
     },
 
+    {
+        {"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+            gettext_noop("Sets the maximum WAL size that can be reserved by replication slots."),
+            gettext_noop("Replication slots will be marked as failed, and segments released "
+                         "for deletion or recycling, if this much space is occupied by WAL "
+                         "on disk."),
+            GUC_UNIT_MB
+        },
+        &max_slot_wal_keep_size_mb,
+        -1, -1, MAX_KILOBYTES,
+        NULL, NULL, NULL
+    },
+
     {
         {"wal_sender_timeout", PGC_USERSET, REPLICATION_SENDING,
             gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e904fa7300..507a72b712 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -288,6 +288,7 @@
 #max_wal_senders = 10        # max number of walsender processes
                 # (change requires restart)
 #wal_keep_segments = 0        # in logfile segments; 0 disables
+#max_slot_wal_keep_size = -1    # in megabytes; -1 disables
 #wal_sender_timeout = 60s    # in milliseconds; 0 disables
 
 #max_replication_slots = 10    # max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 9ec7b31cce..33812bb3f9 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -108,6 +108,7 @@ extern int    wal_segment_size;
 extern int    min_wal_size_mb;
 extern int    max_wal_size_mb;
 extern int    wal_keep_segments;
+extern int    max_slot_wal_keep_size_mb;
 extern int    XLOGbuffers;
 extern int    XLogArchiveTimeout;
 extern int    wal_retrieve_retry_interval;
@@ -255,6 +256,19 @@ typedef struct CheckpointStatsData
 
 extern CheckpointStatsData CheckpointStats;
 
+/*
+ * WAL segment availability status
+ *
+ * This is used as the return value of GetWalAvailability.
+ */
+typedef enum WalAvailability
+{
+    WALAVAIL_INVALID_LSN,            /* parameter error */
+    WALAVAIL_NORMAL,                /* WAL segment is within max_wal_size */
+    WALAVAIL_PRESERVED,                /* WAL segment is preserved by repslots */
+    WALAVAIL_REMOVED                /* WAL segment has been removed */
+} WalAvailability;
+
 struct XLogRecData;
 
 extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
@@ -305,6 +319,9 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern WalAvailability GetWalAvailability(XLogRecPtr restart_lsn,
+                                          pid_t walsender_pid);
+extern XLogRecPtr CalculateMaxmumSafeLSN(void);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a649e44d08..ef808c5c43 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9986,9 +9986,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,pg_lsn}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,min_safe_lsn}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 3e95b019b3..6e469ea749 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -198,6 +198,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
+extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6eec8ec568..ac31840739 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1462,8 +1462,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.wal_status,
+    l.min_safe_lsn
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, min_safe_lsn)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.18.2

