Re: Review for GetWALAvailability() - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: Review for GetWALAvailability()
Msg-id 20200617.135607.687059791532071892.horikyota.ntt@gmail.com
In response to Re: Review for GetWALAvailability()  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Responses Re: Review for GetWALAvailability()
Re: Review for GetWALAvailability()
List pgsql-hackers
At Wed, 17 Jun 2020 10:17:07 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in 
> At Tue, 16 Jun 2020 14:31:43 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in 
> > On 2020-Jun-16, Kyotaro Horiguchi wrote:
> > 
> > > I noticed another issue. If some required WAL segments are removed, the
> > > slot will be "invalidated", that is, its restart_lsn is set to an invalid
> > > value. As a result, we hardly ever see the "lost" state.
> > > 
> > > It can be "fixed" by remembering the validity of a slot separately
> > > from restart_lsn. Is that worth doing?
> > 
> > We discussed this before.  I agree it would be better to do this
> > in some way, but I fear that if we do it naively, some code might exist
> > that reads the LSN without realizing that it needs to check the validity
> > flag first.
> 
> Yes, that was my main concern about it. That's error-prone. How about
> remembering the LSN at which invalidation happened?  It's safe, since
> nothing other than slot-monitoring functions would look at
> last_invalidated_lsn. It can be reset if active_pid is a valid pid.
> 
> InvalidateObsoleteReplicationSlots:
>  ...
>          SpinLockAcquire(&s->mutex);
> +        s->data.last_invalidated_lsn = s->data.restart_lsn;
>          s->data.restart_lsn = InvalidXLogRecPtr;
>          SpinLockRelease(&s->mutex);

The attached patch does that (PoC). No documentation fix is included.
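For reviewers, here is a minimal standalone sketch of the mechanism, not the patch itself. It assumes simplified stand-ins for XLogRecPtr and a hypothetical SketchSlot struct; sketch_invalidate_slot() and sketch_monitoring_target_lsn() are illustrative names, and the spinlock the real code takes around the update is omitted.

```c
#include <assert.h>
#include <stdint.h>

/* Simplified stand-ins for PostgreSQL's definitions (assumed, not the real headers). */
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)
#define XLogRecPtrIsInvalid(r) ((r) == InvalidXLogRecPtr)

/* Hypothetical, trimmed-down slot holding only the two LSNs this sketch needs. */
typedef struct SketchSlot
{
    XLogRecPtr  restart_lsn;            /* stands in for s->data.restart_lsn */
    XLogRecPtr  last_invalidated_lsn;   /* restart_lsn saved at invalidation */
} SketchSlot;

/*
 * Invalidate a slot, remembering where it stood.  Only slots that still
 * reserve WAL should be invalidated; invalidating twice would overwrite
 * the saved LSN with InvalidXLogRecPtr.  (Locking omitted in this sketch.)
 */
static void
sketch_invalidate_slot(SketchSlot *s)
{
    assert(!XLogRecPtrIsInvalid(s->restart_lsn));
    s->last_invalidated_lsn = s->restart_lsn;
    s->restart_lsn = InvalidXLogRecPtr;
}

/*
 * Monitoring side: choose the LSN to classify.  Ordinary code keeps
 * reading restart_lsn and sees it as invalid; only this path falls back
 * to last_invalidated_lsn.
 */
static XLogRecPtr
sketch_monitoring_target_lsn(const SketchSlot *s)
{
    if (XLogRecPtrIsInvalid(s->restart_lsn))
        return s->last_invalidated_lsn;
    return s->restart_lsn;
}
```

The point of this design is that code reading restart_lsn keeps seeing InvalidXLogRecPtr after invalidation, so nothing can accidentally act on a stale LSN; only the monitoring path consults last_invalidated_lsn. A slot that never reserved WAL has both fields invalid, which still maps to WALAVAIL_INVALID_LSN.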

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index d6fe205eb4..d3240d1e38 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9485,20 +9485,25 @@ CreateRestartPoint(int flags)
  *        (typically a slot's restart_lsn)
  *
  * Returns one of the following enum values:
- * * WALAVAIL_NORMAL means targetLSN is available because it is in the range
- *   of max_wal_size.
  *
- * * WALAVAIL_PRESERVED means it is still available by preserving extra
+ * * WALAVAIL_RESERVED means targetLSN is available and it is in the range of
+ *   max_wal_size.
+ *
+ * * WALAVAIL_EXTENDED means it is still available by preserving extra
  *   segments beyond max_wal_size. If max_slot_wal_keep_size is smaller
  *   than max_wal_size, this state is not returned.
  *
+ * * WALAVAIL_BEING_REMOVED means it is being lost. The walsender using this
+ *   slot may still bring it back to one of the states above.
+ *
  * * WALAVAIL_REMOVED means it is definitely lost. A replication stream on
  *   a slot with this LSN cannot continue.
  *
  * * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL.
  */
 WALAvailability
-GetWALAvailability(XLogRecPtr targetLSN)
+GetWALAvailability(XLogRecPtr targetLSN, XLogSegNo last_removed_seg,
+                   bool slot_is_active)
 {
     XLogRecPtr    currpos;        /* current write LSN */
     XLogSegNo    currSeg;        /* segid of currpos */
@@ -9509,7 +9514,11 @@ GetWALAvailability(XLogRecPtr targetLSN)
                                                      * slot */
     uint64        keepSegs;
 
-    /* slot does not reserve WAL. Either deactivated, or has never been active */
+    /*
+     * Slot does not reserve WAL: either deactivated, or has never been active.
+     * The caller should have passed last_invalidated_lsn as targetLSN if the
+     * slot has been invalidated.
+     */
     if (XLogRecPtrIsInvalid(targetLSN))
         return WALAVAIL_INVALID_LSN;
 
@@ -9524,7 +9533,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
      * the first WAL segment file since startup, which causes the status being
      * wrong under certain abnormal conditions but that doesn't actually harm.
      */
-    oldestSeg = XLogGetLastRemovedSegno() + 1;
+    oldestSeg = last_removed_seg + 1;
 
     /* calculate oldest segment by max_wal_size and wal_keep_segments */
     XLByteToSeg(currpos, currSeg, wal_segment_size);
@@ -9544,20 +9553,21 @@ GetWALAvailability(XLogRecPtr targetLSN)
      */
     if (targetSeg >= oldestSeg)
     {
-        /*
-         * show "normal" when targetSeg is within max_wal_size, even if
-         * max_slot_wal_keep_size is smaller than max_wal_size.
-         */
-        if ((max_slot_wal_keep_size_mb <= 0 ||
-             max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
-            oldestSegMaxWalSize <= targetSeg)
-            return WALAVAIL_NORMAL;
-
-        /* being retained by slots */
-        if (oldestSlotSeg <= targetSeg)
+        /* show "reserved" when targetSeg is within max_wal_size */
+        if (oldestSegMaxWalSize <= targetSeg)
             return WALAVAIL_RESERVED;
+
+        /* being retained by slots exceeding max_wal_size */
+        return WALAVAIL_EXTENDED;
     }
 
+    /*
+     * Even though segments required by the slot have been lost, an active
+     * walsender may still be able to read on into the first reserved segment.
+     */
+    if (slot_is_active)
+        return WALAVAIL_BEING_REMOVED;
+
     /* Definitely lost */
     return WALAVAIL_REMOVED;
 }
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..f141b29d28 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -285,6 +285,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
     slot->candidate_xmin_lsn = InvalidXLogRecPtr;
     slot->candidate_restart_valid = InvalidXLogRecPtr;
     slot->candidate_restart_lsn = InvalidXLogRecPtr;
+    slot->last_invalidated_lsn = InvalidXLogRecPtr;
 
     /*
      * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -1144,6 +1145,7 @@ restart:
                         (uint32) restart_lsn)));
 
         SpinLockAcquire(&s->mutex);
+        s->last_invalidated_lsn = s->data.restart_lsn;
         s->data.restart_lsn = InvalidXLogRecPtr;
         SpinLockRelease(&s->mutex);
         ReplicationSlotRelease();
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 1b929a603e..ed0abe0c39 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -243,6 +243,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
     MemoryContext per_query_ctx;
     MemoryContext oldcontext;
     int            slotno;
+    XLogSegNo    last_removed_seg;
 
     /* check to see if caller supports us returning a tuplestore */
     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -272,6 +273,14 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
     rsinfo->setResult = tupstore;
     rsinfo->setDesc = tupdesc;
 
+    /*
+     * Remember the last removed segment at this point so that every row in
+     * this result is computed against the same value. Since there's no
+     * interlock between slot data and the checkpointer, a segment can be
+     * removed in between, but that makes no practical difference.
+     */
+    last_removed_seg = XLogGetLastRemovedSegno();
+
     MemoryContextSwitchTo(oldcontext);
 
     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -282,7 +291,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
         Datum        values[PG_GET_REPLICATION_SLOTS_COLS];
         bool        nulls[PG_GET_REPLICATION_SLOTS_COLS];
         WALAvailability walstate;
-        XLogSegNo    last_removed_seg;
+        XLogRecPtr    targetLSN;
         int            i;
 
         if (!slot->in_use)
@@ -342,7 +351,14 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
         else
             nulls[i++] = true;
 
-        walstate = GetWALAvailability(slot_contents.data.restart_lsn);
+        /* use last_invalidated_lsn when the slot is invalidated */
+        if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+            targetLSN = slot_contents.last_invalidated_lsn;
+        else
+            targetLSN = slot_contents.data.restart_lsn;
+
+        walstate = GetWALAvailability(targetLSN, last_removed_seg,
+                                      slot_contents.active_pid != 0);
 
         switch (walstate)
         {
@@ -350,14 +366,18 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                 nulls[i++] = true;
                 break;
 
-            case WALAVAIL_NORMAL:
-                values[i++] = CStringGetTextDatum("normal");
-                break;
-
             case WALAVAIL_RESERVED:
                 values[i++] = CStringGetTextDatum("reserved");
                 break;
 
+            case WALAVAIL_EXTENDED:
+                values[i++] = CStringGetTextDatum("extended");
+                break;
+
+            case WALAVAIL_BEING_REMOVED:
+                values[i++] = CStringGetTextDatum("being lost");
+                break;
+
             case WALAVAIL_REMOVED:
                 values[i++] = CStringGetTextDatum("lost");
                 break;
@@ -367,8 +387,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
         }
 
         if (max_slot_wal_keep_size_mb >= 0 &&
-            (walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_RESERVED) &&
-            ((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
+            (walstate == WALAVAIL_RESERVED || walstate == WALAVAIL_EXTENDED) &&
+            (last_removed_seg != 0))
         {
             XLogRecPtr    min_safe_lsn;
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index e917dfe92d..49d9578bc5 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -270,8 +270,9 @@ extern CheckpointStatsData CheckpointStats;
 typedef enum WALAvailability
 {
     WALAVAIL_INVALID_LSN,        /* parameter error */
-    WALAVAIL_NORMAL,            /* WAL segment is within max_wal_size */
-    WALAVAIL_RESERVED,            /* WAL segment is reserved by a slot */
+    WALAVAIL_RESERVED,            /* WAL segment is within max_wal_size */
+    WALAVAIL_EXTENDED,            /* WAL segment is reserved by a slot */
+    WALAVAIL_BEING_REMOVED,        /* WAL segment is being removed */
     WALAVAIL_REMOVED            /* WAL segment has been removed */
 } WALAvailability;
 
@@ -326,7 +327,9 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
-extern WALAvailability GetWALAvailability(XLogRecPtr restart_lsn);
+extern WALAvailability GetWALAvailability(XLogRecPtr targetLSN,
+                                          XLogSegNo last_removed_seg,
+                                          bool slot_is_active);
 extern XLogRecPtr CalculateMaxmumSafeLSN(void);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 917876010e..8090ca81fe 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -156,6 +156,9 @@ typedef struct ReplicationSlot
     XLogRecPtr    candidate_xmin_lsn;
     XLogRecPtr    candidate_restart_valid;
     XLogRecPtr    candidate_restart_lsn;
+
+    /* restart_lsn is copied here when the slot is invalidated */
+    XLogRecPtr    last_invalidated_lsn;
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
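The decision order of the patched GetWALAvailability() can be summarized by the standalone sketch below. It is an approximation under stated assumptions: a fixed 16MB segment size, simplified typedefs, and a hypothetical parameter oldest_seg_max_wal_size that the caller supplies, instead of the function deriving it from the current write position, max_wal_size and wal_keep_segments. sketch_wal_availability() is an illustrative name, not the real function.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins (assumed, not PostgreSQL's real headers). */
typedef uint64_t XLogRecPtr;
typedef uint64_t XLogSegNo;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Mirrors the enum values introduced by the patch. */
typedef enum WALAvailability
{
    WALAVAIL_INVALID_LSN,
    WALAVAIL_RESERVED,
    WALAVAIL_EXTENDED,
    WALAVAIL_BEING_REMOVED,
    WALAVAIL_REMOVED
} WALAvailability;

/* Assumed segment size: PostgreSQL's default of 16MB. */
#define SKETCH_SEGMENT_SIZE ((uint64_t) 16 * 1024 * 1024)

static WALAvailability
sketch_wal_availability(XLogRecPtr targetLSN, XLogSegNo last_removed_seg,
                        XLogSegNo oldest_seg_max_wal_size, bool slot_is_active)
{
    XLogSegNo   target_seg;
    XLogSegNo   oldest_seg;

    /* Never reserved WAL (for an invalidated slot the caller passes
     * last_invalidated_lsn, so it does not land here). */
    if (targetLSN == InvalidXLogRecPtr)
        return WALAVAIL_INVALID_LSN;

    target_seg = targetLSN / SKETCH_SEGMENT_SIZE;   /* what XLByteToSeg computes */
    oldest_seg = last_removed_seg + 1;              /* oldest segment still on disk */

    if (target_seg >= oldest_seg)
    {
        if (oldest_seg_max_wal_size <= target_seg)
            return WALAVAIL_RESERVED;   /* within max_wal_size */
        return WALAVAIL_EXTENDED;       /* retained beyond max_wal_size */
    }

    /* Required segments are gone; an active walsender may still recover. */
    return slot_is_active ? WALAVAIL_BEING_REMOVED : WALAVAIL_REMOVED;
}
```

As in the patch, pg_get_replication_slots() would read last_removed_seg once before scanning the slots and pass the same value for every row, pass active_pid != 0 as slot_is_active, and use last_invalidated_lsn as the target LSN for an invalidated slot.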
