Re: [HACKERS] Restricting maximum keep segments by repslots - Mailing list pgsql-hackers

From: Kyotaro HORIGUCHI
Subject: Re: [HACKERS] Restricting maximum keep segments by repslots
Date:
Msg-id: 20180319.170948.139803971.horiguchi.kyotaro@lab.ntt.co.jp
In response to: Re: [HACKERS] Restricting maximum keep segments by repslots (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
Responses: Re: [HACKERS] Restricting maximum keep segments by repslots (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
List: pgsql-hackers
At Mon, 29 Jan 2018 19:40:23 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in <20180129.194023.228030941.horiguchi.kyotaro@lab.ntt.co.jp>
> Hello,
> 
> At Mon, 29 Jan 2018 19:26:34 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in <20180129.192634.217484965.horiguchi.kyotaro@lab.ntt.co.jp>
 
> > While rechecking the patch, I fixed the message issued on losing
> > segments in 0001, and revised the TAP test since I found that it was
> > unstable.
> > 
> > The attached files are the correct version of the latest patch.
> 
> The name of the new function GetMinKeepSegment seems to give the wrong
> meaning. I renamed it to GetOldestKeepSegment.

I found that fd1a421fe6 and df411e7c66 conflict with this patch. Rebased to
the current HEAD.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From dbb5ca5bb79e7910f00bff20e8295e2fa3005d2d Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:20:20 +0900
Subject: [PATCH 1/4] Add WAL relief valve for replication slots

Adds a capability to limit the number of WAL segments kept by replication
slots via a GUC variable.
---
 src/backend/access/transam/xlog.c             | 116 +++++++++++++++++++++-----
 src/backend/utils/misc/guc.c                  |  11 +++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 4 files changed, 108 insertions(+), 21 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 47a6c4d895..542e1d78fe 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ int            wal_level = WAL_LEVEL_MINIMAL;
 int            CommitDelay = 0;    /* precommit delay in microseconds */
 int            CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int            wal_retrieve_retry_interval = 5000;
+int            max_slot_wal_keep_size_mb = 0;
 
 #ifdef WAL_DEBUG
 bool        XLOG_DEBUG = false;
@@ -861,6 +862,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -9344,6 +9346,74 @@ CreateRestartPoint(int flags)
     return true;
 }
 
+/*
+ * Returns the minimum segment number that the next checkpoint must leave,
+ * considering wal_keep_segments, replication slots and max_slot_wal_keep_size.
+ */
+static XLogSegNo
+GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr)
+{
+    uint64 keepSegs;
+    XLogSegNo currSeg;
+    XLogSegNo tailSeg;
+    uint64 slotlimitbytes;
+    uint64 slotlimitfragment;
+    uint64 currposoff;
+    XLogRecPtr slotpos = minSlotPtr;
+    XLogSegNo    slotSeg;
+
+    Assert(wal_keep_segments >= 0);
+    Assert(max_slot_wal_keep_size_mb >= 0);
+
+    XLByteToSeg(currpos, currSeg, wal_segment_size);
+    XLByteToSeg(slotpos, slotSeg, wal_segment_size);
+
+    /*
+     * If wal_keep_segments keeps more segments than the slot, slotpos is no
+     * longer useful. Don't perform subtraction to keep values positive.
+     */
+    if (slotpos != InvalidXLogRecPtr && currSeg <= slotSeg + wal_keep_segments)
+        slotpos = InvalidXLogRecPtr;
+
+    /* slots aren't useful, consider only wal_keep_segments */
+    if (slotpos == InvalidXLogRecPtr)
+    {
+        /* avoid underflow, don't go below 1 */
+        if (currSeg <= wal_keep_segments)
+            return 1;
+
+        return currSeg - wal_keep_segments;
+    }
+
+    /* just return slotSeg if no limit is set */
+    if (max_slot_wal_keep_size_mb == 0)
+        return slotSeg;
+
+    /*
+     * The slot limit is defined and the slot gives the oldest segment to
+     * keep; calculate the oldest segment that must not be removed.
+     */
+    slotlimitbytes = (uint64) max_slot_wal_keep_size_mb * 1024 * 1024;
+    slotlimitfragment = XLogSegmentOffset(slotlimitbytes,
+                                                 wal_segment_size);
+    currposoff = XLogSegmentOffset(currpos, wal_segment_size);
+    keepSegs = wal_keep_segments +
+        ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+    if (currposoff < slotlimitfragment)
+        keepSegs++;
+
+    /*
+     * calculate the oldest segment that is kept by wal_keep_segments and
+     * max_slot_wal_keep_size.
+     */
+    if (currSeg <= keepSegs)
+        tailSeg = 1;
+    else
+        tailSeg = currSeg - keepSegs;
+
+    return tailSeg;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
@@ -9356,33 +9426,37 @@ static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
     XLogSegNo    segno;
-    XLogRecPtr    keep;
+    XLogRecPtr    slotminptr = InvalidXLogRecPtr;
+    XLogSegNo    minSegNo;
+    XLogSegNo    slotSegNo;
 
     XLByteToSeg(recptr, segno, wal_segment_size);
-    keep = XLogGetReplicationSlotMinimumLSN();
 
-    /* compute limit for wal_keep_segments first */
-    if (wal_keep_segments > 0)
+    if (max_replication_slots > 0)
+        slotminptr = XLogGetReplicationSlotMinimumLSN();
+
+    /*
+     * We should keep a certain number of WAL segments after this checkpoint.
+     */
+    minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+
+    /*
+     * Warn if this checkpoint removes segments required by replication
+     * slots.
+     */
+    if (!XLogRecPtrIsInvalid(slotminptr))
     {
-        /* avoid underflow, don't go below 1 */
-        if (segno <= wal_keep_segments)
-            segno = 1;
-        else
-            segno = segno - wal_keep_segments;
+        XLByteToSeg(slotminptr, slotSegNo, wal_segment_size);
+
+        if (slotSegNo < minSegNo)
+            ereport(WARNING,
+                    (errmsg("some replication slots have lost required WAL segments"),
+                     errdetail("The most affected slot has lost %ld segments.",
+                               minSegNo - slotSegNo)));
     }
 
-    /* then check whether slots limit removal further */
-    if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
-    {
-        XLogSegNo    slotSegNo;
-
-        XLByteToSeg(keep, slotSegNo, wal_segment_size);
-
-        if (slotSegNo <= 0)
-            segno = 1;
-        else if (slotSegNo < segno)
-            segno = slotSegNo;
-    }
+    if (minSegNo < segno)
+        segno = minSegNo;
 
     /* don't delete WAL segments newer than the calculated segment */
     if (segno < *logSegNo)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 7a7ac479c1..de43c7139b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2406,6 +2406,17 @@ static struct config_int ConfigureNamesInt[] =
         NULL, NULL, NULL
     },
 
+    {
+        {"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+            gettext_noop("Sets the maximum size of extra WALs kept by replication slots."),
+         NULL,
+         GUC_UNIT_MB
+        },
+        &max_slot_wal_keep_size_mb,
+        0, 0, INT_MAX,
+        NULL, NULL, NULL
+    },
+
     {
         {"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
             gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 048bf4cccd..7d5171c32c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -239,6 +239,7 @@
 #max_wal_senders = 10        # max number of walsender processes
                 # (change requires restart)
 #wal_keep_segments = 0        # in logfile segments; 0 disables
+#max_slot_wal_keep_size = 0    # in megabytes; 0 disables
 #wal_sender_timeout = 60s    # in milliseconds; 0 disables
 
 #max_replication_slots = 10    # max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 421ba6d775..12cd0d1d10 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -98,6 +98,7 @@ extern int    wal_segment_size;
 extern int    min_wal_size_mb;
 extern int    max_wal_size_mb;
 extern int    wal_keep_segments;
+extern int    max_slot_wal_keep_size_mb;
 extern int    XLOGbuffers;
 extern int    XLogArchiveTimeout;
 extern int    wal_retrieve_retry_interval;
-- 
2.16.2
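
To make the arithmetic in GetOldestKeepSegment above easier to eyeball, here is a minimal standalone sketch of the same calculation. This is not server code: the function and parameter names are made up, and the values in main() (16MB segments, a 32MB limit, the write position at the start of segment 100, and the oldest slot restart_lsn in segment 50) are illustrative assumptions.

#include <stdio.h>
#include <stdint.h>

/*
 * Sketch of the keep-segment calculation, mirroring GetOldestKeepSegment in
 * the patch above.  wal_keep_segments, max_slot_keep_mb and seg_size_mb
 * stand in for the server's GUCs and segment geometry.
 */
static uint64_t
oldest_keep_segment(uint64_t curr_seg, uint64_t curr_off_bytes,
                    uint64_t slot_seg, int has_slot,
                    uint64_t wal_keep_segments,
                    uint64_t max_slot_keep_mb, uint64_t seg_size_mb)
{
    uint64_t keep_segs;

    /* wal_keep_segments already covers the slot: the slot adds nothing */
    if (has_slot && curr_seg <= slot_seg + wal_keep_segments)
        has_slot = 0;

    /* no useful slot: only wal_keep_segments matters (never go below 1) */
    if (!has_slot)
        return curr_seg <= wal_keep_segments ? 1 : curr_seg - wal_keep_segments;

    /* zero means no limit: the slot alone decides */
    if (max_slot_keep_mb == 0)
        return slot_seg;

    keep_segs = wal_keep_segments + max_slot_keep_mb / seg_size_mb;

    /* a partially filled current segment effectively grants one more */
    if (curr_off_bytes < (max_slot_keep_mb % seg_size_mb) * 1024 * 1024)
        keep_segs++;

    return curr_seg <= keep_segs ? 1 : curr_seg - keep_segs;
}

int
main(void)
{
    /* prints 98: completed segments 98 and 99 (32MB) survive for the slot */
    printf("oldest segment to keep: %llu\n",
           (unsigned long long) oldest_keep_segment(100, 0, 50, 1, 0, 32, 16));
    return 0;
}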

From a4b6ae2ec3acfb8de4f702450dcc2960842d8fd5 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:23:25 +0900
Subject: [PATCH 2/4] Add monitoring aid for max_slot_wal_keep_size.

Adds two columns "wal_status" and "min_keep_lsn" to pg_replication_slots.
With max_slot_wal_keep_size set, long-disconnected slots may lose required
WAL segments. The two columns show whether a slot can still be reconnected,
or is about to lose required WAL segments, and the LSN back to which the
next checkpoint will retain WAL.
---
 contrib/test_decoding/expected/ddl.out |  4 +-
 src/backend/access/transam/xlog.c      | 93 ++++++++++++++++++++++++++++++++++
 src/backend/catalog/system_views.sql   |  4 +-
 src/backend/replication/slotfuncs.c    | 25 ++++++++-
 src/include/access/xlog.h              |  1 +
 src/include/catalog/pg_proc.h          |  2 +-
 src/test/regress/expected/rules.out    |  6 ++-
 7 files changed, 128 insertions(+), 7 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 1e22c1eefc..92cd56a5f0 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -702,7 +702,7 @@ SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
+ slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | min_keep_lsn 
+-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------------
 (0 rows)
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 542e1d78fe..529aee9014 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9346,6 +9346,99 @@ CreateRestartPoint(int flags)
     return true;
 }
 
+
+/*
+ * Returns the segment number of the oldest file in the XLOG directory.
+ */
+static XLogSegNo
+GetOldestXLogFileSegNo(void)
+{
+    DIR        *xldir;
+    struct dirent *xlde;
+    XLogSegNo segno = 0;
+
+    xldir = AllocateDir(XLOGDIR);
+    if (xldir == NULL)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not open write-ahead log directory \"%s\": %m",
+                        XLOGDIR)));
+
+    while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
+    {
+        TimeLineID tli;
+        XLogSegNo fsegno;
+
+        /* Ignore files that are not XLOG segments */
+        if (!IsXLogFileName(xlde->d_name) &&
+            !IsPartialXLogFileName(xlde->d_name))
+            continue;
+
+        XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size);
+
+        /* get the minimum segment, ignoring timeline ID */
+        if (segno == 0 || fsegno < segno)
+            segno = fsegno;
+    }
+
+    return segno;
+}
+
+/*
+ * Check if the record at the given restartLSN is still present in XLOG files.
+ *
+ * Returns true if it is present. If minKeepLSN is given, it receives the
+ * LSN at the beginning of the oldest segment the next checkpoint will keep.
+ */
+bool
+IsLsnStillAvailable(XLogRecPtr restartLSN, XLogRecPtr *minKeepLSN)
+{
+    XLogRecPtr currpos;
+    XLogSegNo restartSeg;
+    XLogSegNo tailSeg;
+    XLogSegNo oldestSeg;
+
+    Assert(!XLogRecPtrIsInvalid(restartLSN));
+
+    currpos = GetXLogWriteRecPtr();
+
+    SpinLockAcquire(&XLogCtl->info_lck);
+    oldestSeg = XLogCtl->lastRemovedSegNo;
+    SpinLockRelease(&XLogCtl->info_lck);
+
+    /*
+     * oldestSeg is zero until at least one segment has been removed since
+     * startup. In that case, use the oldest segno taken from the file names.
+     */
+    if (oldestSeg == 0)
+    {
+        static XLogSegNo oldestFileSeg = 0;
+
+        if (oldestFileSeg == 0)
+            oldestFileSeg = GetOldestXLogFileSegNo();
+        /* give it the same meaning as lastRemovedSegNo here */
+        oldestSeg = oldestFileSeg - 1;
+    }
+
+    /* oldest segment is just after the last removed segment */
+    oldestSeg++;
+
+    XLByteToSeg(restartLSN, restartSeg, wal_segment_size);
+
+
+    if (minKeepLSN)
+    {
+        XLogRecPtr slotPtr = XLogGetReplicationSlotMinimumLSN();
+        Assert(!XLogRecPtrIsInvalid(slotPtr));
+
+        tailSeg = GetOldestKeepSegment(currpos, slotPtr);
+
+        XLogSegNoOffsetToRecPtr(tailSeg, 0, *minKeepLSN, wal_segment_size);
+    }
+
+    return oldestSeg <= restartSeg;
+}
+
 /*
 * Returns the minimum segment number that the next checkpoint must leave,
 * considering wal_keep_segments, replication slots and max_slot_wal_keep_size.
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 5e6e8a64f6..9284175f7d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -795,7 +795,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.wal_status,
+            L.min_keep_lsn
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index e873dd1f81..16575fa411 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -185,7 +185,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     TupleDesc    tupdesc;
     Tuplestorestate *tupstore;
@@ -307,6 +307,29 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
         else
             nulls[i++] = true;
 
+        if (restart_lsn == InvalidXLogRecPtr)
+        {
+            values[i++] = CStringGetTextDatum("unknown");
+            values[i++] = LSNGetDatum(InvalidXLogRecPtr);
+        }
+        else
+        {
+            XLogRecPtr    min_keep_lsn;
+            char *status = "lost";
+
+            if (IsLsnStillAvailable(restart_lsn,
+                                    &min_keep_lsn))
+            {
+                if (min_keep_lsn <= restart_lsn)
+                    status = "streaming";
+                else
+                    status = "keeping";
+            }
+
+            values[i++] = CStringGetTextDatum(status);
+            values[i++] = LSNGetDatum(min_keep_lsn);
+        }
+
         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
     }
     LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 12cd0d1d10..52e64f392d 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -269,6 +269,7 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern bool IsLsnStillAvailable(XLogRecPtr restartLSN, XLogRecPtr *minKeepLSN);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 0fdb42f639..e8e32c1a97 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5385,7 +5385,7 @@ DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
-DATA(insert OID = 3781 (  pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3781 (  pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220,25,3220}" "{o,o,o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,min_keep_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
 DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
 DESCR("set up a logical replication slot");
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5e0597e091..3944e36681 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,8 +1451,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.wal_status,
+    l.min_keep_lsn
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, min_keep_lsn)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.16.2
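
The wal_status value surfaced by the patch above boils down to comparing three segment numbers: the slot's restart segment, the oldest segment still on disk, and the oldest segment the next checkpoint will keep. Below is a minimal sketch of that classification detached from the backend; the type and function names are invented for illustration and the segment numbers in main() are arbitrary.

#include <stdio.h>
#include <stdint.h>

typedef enum
{
    SLOT_UNKNOWN,   /* slot has no valid restart_lsn */
    SLOT_STREAMING, /* required WAL is safely retained */
    SLOT_KEEPING,   /* still present, but the next checkpoint may remove it */
    SLOT_LOST       /* required WAL has already been removed */
} slot_wal_status;

/*
 * Sketch of the decision behind pg_replication_slots.wal_status.
 * restart_seg is 0 when the slot has no valid restart_lsn.
 */
static slot_wal_status
classify_slot(uint64_t restart_seg, uint64_t oldest_seg, uint64_t min_keep_seg)
{
    if (restart_seg == 0)
        return SLOT_UNKNOWN;
    if (oldest_seg > restart_seg)
        return SLOT_LOST;
    if (min_keep_seg > restart_seg)
        return SLOT_KEEPING;
    return SLOT_STREAMING;
}

int
main(void)
{
    static const char *names[] = {"unknown", "streaming", "keeping", "lost"};

    printf("%s\n", names[classify_slot(120, 100, 110)]);   /* streaming */
    printf("%s\n", names[classify_slot(105, 100, 110)]);   /* keeping */
    printf("%s\n", names[classify_slot(90, 100, 110)]);    /* lost */
    return 0;
}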

From 079ba54bda2570709c51ff6da195c693cc39d6b7 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 17:33:53 +0900
Subject: [PATCH 3/4] TAP test for the slot limit feature

---
 src/test/recovery/t/015_replslot_limit.pl | 161 ++++++++++++++++++++++++++++++
 1 file changed, 161 insertions(+)
 create mode 100644 src/test/recovery/t/015_replslot_limit.pl

diff --git a/src/test/recovery/t/015_replslot_limit.pl b/src/test/recovery/t/015_replslot_limit.pl
new file mode 100644
index 0000000000..9e96714d39
--- /dev/null
+++ b/src/test/recovery/t/015_replslot_limit.pl
@@ -0,0 +1,161 @@
+# Test for replication slot limit
+# Ensure that max_slot_wal_keep_size limits the number of WAL files to
+# be kept by replication slots.
+
+use strict;
+use warnings;
+use File::Path qw(rmtree);
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+use Time::HiRes qw(usleep);
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->append_conf('postgresql.conf', qq(
+min_wal_size = 32MB
+max_wal_size = 48MB
+));
+$node_master->start;
+$node_master->safe_psql('postgres', "SELECT pg_create_physical_replication_slot('rep1')");
+
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a standby linking to it using a replication slot
+my $node_standby = get_new_node('standby_1');
+$node_standby->init_from_backup($node_master, $backup_name, has_streaming => 1, primary_slot_name => 'rep1');
+$node_standby->append_conf('recovery.conf', qq(
+primary_slot_name = 'rep1'
+));
+$node_standby->start;
+
+# Wait until the standby has replayed enough data
+my $start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+# Stop standby
+$node_standby->stop;
+
+
+# Preparation done; the slot must currently be secured.
+my $result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, min_keep_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming|$start_lsn", 'check initial state of standby');
+
+# Advance WAL by ten segments (= 160MB) on master
+advance_wal($node_master, 10);
+
+# All segments must still be secured after a checkpoint.
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, min_keep_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming|$start_lsn", 'check that slot is keeping all segments');
+
+# The standby can connect to the master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+
+# Advance WAL again
+advance_wal($node_master, 10);
+
+# Set max_slot_wal_keep_size on master
+my $max_slot_wal_keep_size_mb = 32;
+$node_master->append_conf('postgresql.conf', qq(
+max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
+));
+$node_master->reload;
+
+# Some segments are no longer secured
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|keeping", 'check that some segments are about to be removed');
+
+# The standby still can connect to the master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+ok(!find_in_log($node_standby,
+                "requested WAL segment [0-9A-F]+ has already been removed"),
+   'check that no replication failure is caused by insecure state');
+
+# Advance WAL again
+my $logstart = get_log_size($node_master);
+advance_wal($node_master, 10);
+
+# WARNING should be issued
+ok(find_in_log($node_master,
+               "some replication slots have lost required WAL segments",
+               $logstart),
+   'check that the warning is correctly logged');
+
+# This slot should be broken
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost", 'check that segments over the limit have been removed');
+
+# The standby can no longer connect to the master
+$logstart = get_log_size($node_standby);
+$node_standby->start;
+
+my $failed = 0;
+for (my $i = 0 ; $i < 10000 ; $i++)
+{
+    if (find_in_log($node_standby,
+                    "requested WAL segment [0-9A-F]+ has already been removed",
+                    $logstart))
+    {
+        $failed = 1;
+        last;
+    }
+    usleep(100_000);
+}
+ok($failed, 'check replication has been broken');
+
+$node_standby->stop;
+
+#####################################
+# Advance WAL of $node by $n segments
+sub advance_wal
+{
+    my ($node, $n) = @_;
+
+    # Advance by $n segments (16MB each) on the master
+    for (my $i = 0 ; $i < $n ; $i++)
+    {
+        $node->safe_psql('postgres', "CREATE TABLE t (a int); DROP TABLE t; SELECT pg_switch_wal();");
+    }
+
+    $node->safe_psql('postgres', "CHECKPOINT;");
+}
+
+# return the size of logfile of $node in bytes
+sub get_log_size
+{
+    my ($node) = @_;
+
+    return (stat $node->logfile)[7];
+}
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+    my ($node, $pat, $off) = @_;
+
+    $off = 0 unless defined $off;
+    my $log = TestLib::slurp_file($node->logfile);
+    return 0 if (length($log) <= $off);
+
+    $log = substr($log, $off);
+
+    return $log =~ m/$pat/;
+}
-- 
2.16.2

From 1908e5c2bbe0e6b2945acd3ff32b0f72e72477e7 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 11 Jan 2018 15:00:32 +0900
Subject: [PATCH 4/4] Documentation for slot-limit feature

---
 doc/src/sgml/catalogs.sgml          | 28 ++++++++++++++++++++++++++++
 doc/src/sgml/config.sgml            | 24 ++++++++++++++++++++++++
 doc/src/sgml/high-availability.sgml | 14 ++++++++------
 3 files changed, 60 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 30e6741305..7a6f4540f1 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9802,6 +9802,34 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </entry>
      </row>
 
+     <row>
+      <entry><structfield>wal_status</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+
+      <entry>Availability of the WAL records claimed by the
+      slot: <literal>streaming</literal>, <literal>keeping</literal>,
+      <literal>lost</literal>
+      or <literal>unknown</literal>. <literal>streaming</literal> means that
+      the claimed records are available. <literal>keeping</literal> means that
+      some of them will be removed by the next
+      checkpoint. <literal>lost</literal> means that some of them have already
+      been removed. The last two states are seen only when
+      <xref linkend="guc-max-slot-wal-keep-size"/> is not zero. If the slot
+      doesn't have a valid restart_lsn, this field is <literal>unknown</literal>.
+      </entry>
+     </row>
+
+     <row>
+      <entry><structfield>min_keep_lsn</structfield></entry>
+      <entry><type>pg_lsn</type></entry>
+      <entry></entry>
+      <entry>The address (<literal>LSN</literal>) back to which WAL is still
+      available to the replication slot. The user of the slot can no longer
+      continue streaming if this value exceeds restart_lsn.
+      </entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index f18d2b3353..8715dee1ed 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3116,6 +3116,30 @@ include_dir 'conf.d'
        </listitem>
       </varlistentry>
 
+      <varlistentry id="guc-max-slot-wal-keep-size" xreflabel="max_slot_wal_keep_size">
+       <term><varname>max_slot_wal_keep_size</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_slot_wal_keep_size</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+       <para>
+        Specify the maximum size of WAL files kept in
+        the <filename>pg_wal</filename> directory at checkpoint time, even if
+        some of them are still claimed by
+        <link linkend="streaming-replication-slots">replication
+        slots</link>. If <varname>max_slot_wal_keep_size</varname> is zero
+        (the default), replication slots may retain an unlimited amount of
+        WAL files.
+       </para>
+
+       <para>
+        This size is counted separately from
+        <xref linkend="guc-wal-keep-segments"/>.
+       </para>
+       </listitem>
+      </varlistentry>
+
      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 46bf198a2a..7bf5cc7f79 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -927,9 +927,11 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     <xref linkend="guc-archive-command"/>.
     However, these methods often result in retaining more WAL segments than
     required, whereas replication slots retain only the number of segments
-    known to be needed.  An advantage of these methods is that they bound
-    the space requirement for <literal>pg_wal</literal>; there is currently no way
-    to do this using replication slots.
+    known to be needed.  On the other hand, replication slots can retain so
+    many WAL segments that they fill up the space allotted
+    for <literal>pg_wal</literal>;
+    <xref linkend="guc-max-slot-wal-keep-size"/> limits the size of WAL files
+    retained by replication slots.
    </para>
    <para>
     Similarly, <xref linkend="guc-hot-standby-feedback"/>
@@ -967,9 +969,9 @@ postgres=# SELECT * FROM pg_create_physical_replication_slot('node_a_slot');
  node_a_slot |
 
 postgres=# SELECT * FROM pg_replication_slots;
-  slot_name  | slot_type | datoid | database | active | xmin | restart_lsn | confirmed_flush_lsn
--------------+-----------+--------+----------+--------+------+-------------+---------------------
- node_a_slot | physical  |        |          | f      |      |             |
+  slot_name  | slot_type | datoid | database | active | xmin | restart_lsn | confirmed_flush_lsn | wal_status | min_keep_lsn
+-------------+-----------+--------+----------+--------+------+-------------+---------------------+------------+--------------
+ node_a_slot | physical  |        |          | f      |      |             |                     | unknown    | 0/1000000
 (1 row)
 </programlisting>
      To configure the standby to use this slot, <varname>primary_slot_name</varname>
-- 
2.16.2
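
As a usage illustration for the documentation above, a monitoring client could poll the two new columns and raise an alert before a slot loses required WAL. The following is a hedged libpq sketch assuming the wal_status and min_keep_lsn columns added by these patches; the connection string and the alerting action are placeholders.

#include <stdio.h>
#include <string.h>
#include <libpq-fe.h>

int
main(void)
{
    /* placeholder connection string */
    PGconn     *conn = PQconnectdb("dbname=postgres");
    PGresult   *res;
    int         i;

    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
        PQfinish(conn);
        return 1;
    }

    res = PQexec(conn,
                 "SELECT slot_name, wal_status, restart_lsn, min_keep_lsn"
                 "  FROM pg_replication_slots");
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        fprintf(stderr, "query failed: %s", PQerrorMessage(conn));
        PQclear(res);
        PQfinish(conn);
        return 1;
    }

    for (i = 0; i < PQntuples(res); i++)
    {
        const char *status = PQgetvalue(res, i, 1);

        /* "keeping" and "lost" appear only when max_slot_wal_keep_size > 0 */
        if (strcmp(status, "streaming") != 0)
            fprintf(stderr, "slot %s: wal_status=%s restart_lsn=%s min_keep_lsn=%s\n",
                    PQgetvalue(res, i, 0), status,
                    PQgetvalue(res, i, 2), PQgetvalue(res, i, 3));
    }

    PQclear(res);
    PQfinish(conn);
    return 0;
}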

