Re: archive status ".ready" files may be created too early - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: archive status ".ready" files may be created too early
Msg-id 20201216.110120.887433782054853494.horikyota.ntt@gmail.com
In response to Re: archive status ".ready" files may be created too early  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Responses Re: archive status ".ready" files may be created too early
Re: archive status ".ready" files may be created too early
List pgsql-hackers
At Tue, 15 Dec 2020 19:32:57 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in 
> At Mon, 14 Dec 2020 18:25:23 +0000, "Bossart, Nathan" <bossartn@amazon.com> wrote in 
> > I wonder if these are safe assumptions to make.  For your example, if
> > we've written record B to the WAL buffers, but neither record A nor B
> > have been written to disk or flushed, aren't we still in trouble?
> 
> You're right in that regard. There's a window where a partial record is
> written when the write location passes F0 after the insertion location
> passes F1. However, remembering all spanning records seems overkill to me.
> 
> I modified the previous patch so that it remembers the start LSN of the
> *oldest* cross-segment continuation record in the last consecutive
> bonded segments, and the end LSN of the latest cross-segment
> continuation record. This doesn't forget past segment boundaries.
> The region is cleared when the WAL-write LSN goes beyond the remembered
> end LSN.  So the region may contain several WAL segments that are not
> connected to the next one, but that doesn't matter so much.

Mmm. Even though it's a PoC, it was too bogus. I fixed it to work in a
saner way.

- Record the beginning LSN of the first cross-seg record and the end
  LSN of the last cross-seg record in a run of consecutive segments
  bonded by cross-seg records. Specifically, X and Y below.

       X                 Z         Y    
       [recA]  [recB]         [recC]
  [seg A] [seg B] [seg C] [seg D] [seg E]
(1)    (2.2)    (2.2)  (2.1)   (2.1)   (1)
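To make the X/Y bookkeeping concrete, here is a standalone sketch (not code from the patch) of how a cross-segment record can be detected and the region remembered. Lsn, seg_of(), prev_seg_of(), crosses_segment(), ContRegion and remember_record() are hypothetical names; the two segment helpers are assumed to use the same arithmetic as XLByteToSeg() and XLByteToPrevSeg(), and 0 stands in for InvalidXLogRecPtr.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr; 0 plays InvalidXLogRecPtr. */
typedef uint64_t Lsn;

/* Segment holding byte 'ptr' (same arithmetic as XLByteToSeg). */
static uint64_t
seg_of(Lsn ptr, uint64_t segsize)
{
    return ptr / segsize;
}

/* Segment holding the byte just before 'ptr' (as XLByteToPrevSeg). */
static uint64_t
prev_seg_of(Lsn ptr, uint64_t segsize)
{
    assert(ptr > 0);
    return (ptr - 1) / segsize;
}

/*
 * A record occupying [start, end) is a cross-segment continuation record
 * if its first and last bytes are in different segments.  A record that
 * ends exactly on a boundary (Z in the figure) is not one.
 */
static bool
crosses_segment(Lsn start, Lsn end, uint64_t segsize)
{
    return seg_of(start, segsize) != prev_seg_of(end, segsize);
}

/*
 * X and Y from the figure: start of the first and end of the last
 * cross-segment record in a run of bonded segments.
 */
typedef struct
{
    Lsn first_start;    /* X; 0 means no region is remembered */
    Lsn last_end;       /* Y */
} ContRegion;

/* Called for every inserted record; only cross-segment records count. */
static void
remember_record(ContRegion *region, Lsn start, Lsn end, uint64_t segsize)
{
    if (!crosses_segment(start, end, segsize))
        return;
    if (region->first_start == 0)
        region->first_start = start;
    region->last_end = end;
}
```

With a 16-byte "segment" for illustration, a record [12, 20) is remembered, a record [20, 32) that ends exactly at a boundary (like Z) is not, and after also seeing [40, 52) the region becomes X = 12, Y = 52.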

1. If, at a segment boundary, we have written up to a point before X or
  at/beyond Y, notify the finished segment immediately.

  1.1. If we have written up to or beyond Y, clear the recorded region.

2. Otherwise, we don't notify the segment immediately:

  2.1. If the write request ends exactly at the current segment boundary
    and we know the end LSN of the record there (that is, it is recC
    above), extend the request to that end LSN. Then notify the segment
    after the record has been written to its end.

  2.2. Otherwise (that is, recA or recB), we don't know whether the
    last record of the last segment ends exactly at the segment boundary
    (Z) or spans into the next segment (recB). Even if such a spanning
    record exists, we don't know where it ends. As a result, all we can
    do there is refrain from notifying. That doesn't matter much, since
    we have already inserted recC, so we will soon reach recC and notify
    up to seg C.
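A rough sketch of the decision taken when the write position reaches the end of a segment, following rules 1 through 2.2 above and the corresponding branch of XLogWrite() in the attached patch. It is a simplified model, not the patch itself: Lsn, BoundaryAction and decide_at_boundary() are hypothetical names, 0 stands in for InvalidXLogRecPtr, and the handling of a request that does not end on a segment boundary (notify after the write loop) is taken from the patch rather than from the rules.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr; 0 plays InvalidXLogRecPtr. */
typedef uint64_t Lsn;

typedef enum
{
    NOTIFY_NOW,           /* rule 1: archive the finished segment now */
    NOTIFY_NOW_AND_CLEAR, /* rule 1.1: also forget the X..Y region */
    NOTIFY_AFTER_WRITE,   /* rule 2.1: keep writing, then notify */
    SKIP_NOTIFY           /* rule 2.2: record end unknown, don't notify */
} BoundaryAction;

/*
 * 'written' has just reached the end of a segment; x and y are the
 * remembered region bounds.  *request may be advanced to y (rule 2.1).
 */
static BoundaryAction
decide_at_boundary(Lsn x, Lsn y, Lsn written, Lsn *request,
                   uint64_t segsize)
{
    assert(x <= y);
    assert(written % segsize == 0);

    if (y <= written)
        return NOTIFY_NOW_AND_CLEAR;    /* at or past Y (or empty region) */
    if (written < x)
        return NOTIFY_NOW;              /* before X */

    if (*request % segsize == 0)
    {
        /* Segment holding the last byte of the record that ends at y. */
        uint64_t yseg = (y - 1) / segsize;

        if (yseg == *request / segsize && *request < y)
        {
            *request = y;               /* the record is recC: finish it */
            return NOTIFY_AFTER_WRITE;
        }
        return SKIP_NOTIFY;             /* recA/recB: end is unknown */
    }

    /* Request ends mid-segment: the patch notifies after the write loop. */
    return NOTIFY_AFTER_WRITE;
}
```

For example, with 16-byte segments and X = 10, Y = 40, reaching boundary 16 with a request of 16 gives SKIP_NOTIFY (rule 2.2), reaching boundary 32 with a request of 32 extends the request to 40 (rule 2.1), and reaching 48 notifies and clears the region (rule 1.1).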

There might be a case where we insert up to Y before writing up to Z.
In that case, the segment region X-Y contains a non-connected segment
boundary, which is processed as if it were a connected one. However, as
in 2.2 above, that doesn't matter since we will write up to Y soon.
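The following toy check (again a hypothetical model, not patch code) illustrates why treating a non-connected boundary inside X-Y as connected is only conservative: assuming a single region computed from all records, every boundary that some record actually straddles lies inside [X, Y) and is never notified early, while a boundary where a record happens to end exactly, like Z, is merely deferred. Rec, compute_region(), boundary_connected() and boundary_deferred() are illustrative names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t Lsn;                   /* hypothetical stand-in for XLogRecPtr */
typedef struct { Lsn start, end; } Rec; /* a record occupying [start, end) */

/* X = start of the first, Y = end of the last cross-segment record. */
static void
compute_region(const Rec *recs, size_t n, uint64_t segsize, Lsn *x, Lsn *y)
{
    *x = *y = 0;
    for (size_t i = 0; i < n; i++)
    {
        if (recs[i].start / segsize == (recs[i].end - 1) / segsize)
            continue;           /* record stays within one segment */
        if (*x == 0)
            *x = recs[i].start;
        *y = recs[i].end;
    }
}

/* True if some record has bytes on both sides of 'boundary'. */
static bool
boundary_connected(const Rec *recs, size_t n, Lsn boundary)
{
    for (size_t i = 0; i < n; i++)
        if (recs[i].start < boundary && boundary < recs[i].end)
            return true;
    return false;
}

/* True if the X..Y rule defers notification at 'boundary' (rule 2). */
static bool
boundary_deferred(Lsn x, Lsn y, Lsn boundary)
{
    return !(boundary < x || y <= boundary);
}
```

With 16-byte segments and records [12, 20), [20, 32), [32, 44), [44, 52), the region is X = 12, Y = 52; boundary 32 is not straddled by any record, yet it is deferred along with 16 and 48.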

At Tue, 15 Dec 2020 19:32:57 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in 
me> I added an assertion to XLogRecordAssemble() that a record must be
me> shorter than a WAL segment. Does this guarantee that the assumption
me> holds? (The condition is tentative and may need to be adjusted.)

I changed the assertion to check the condition more directly.
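For reference, a small sketch of why "a record is no longer than a segment" is enough for the direct check Assert(startseg + 1 == endseg) in XLogInsertRecord(): a span of at most wal_segment_size bytes of WAL (page headers included) can touch at most two segments. segments_spanned() and within_two_segments() are hypothetical helpers, not functions from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;   /* hypothetical stand-in for XLogRecPtr */

/* Number of segments touched by the bytes [start, end). */
static uint64_t
segments_spanned(Lsn start, Lsn end, uint64_t segsize)
{
    assert(end > start);
    return (end - 1) / segsize - start / segsize + 1;
}

/* Shape of the direct check: startseg == endseg or startseg + 1 == endseg. */
static bool
within_two_segments(Lsn start, Lsn end, uint64_t segsize)
{
    uint64_t startseg = start / segsize;
    uint64_t endseg = (end - 1) / segsize;

    return startseg == endseg || startseg + 1 == endseg;
}
```

Touching three segments requires covering one whole segment plus at least one byte on each side, so even segsize + 1 bytes stay within two segments, while segsize + 2 bytes (for example [15, 33) with 16-byte segments) can already touch three.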

me> Also, the attached is a PoC.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 50b3b05dd0eed79cd0b97991e82090b9d569cbac Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 16 Dec 2020 10:36:14 +0900
Subject: [PATCH v4 1/2] Avoid archiving a WAL segment that continues to the
 next  segment

If the last record of a finished segment continues to the next segment,
we need to defer archiving of the segment until the record is flushed
to the end. Otherwise, crash recovery can overwrite the last record of
the segment, and the history diverges between the archive and pg_wal.
---
 src/backend/access/transam/xlog.c   | 214 +++++++++++++++++++++++++++-
 src/backend/replication/walsender.c |  14 +-
 src/include/access/xlog.h           |   1 +
 3 files changed, 217 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8dd225c2e1..8705809160 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -723,6 +723,16 @@ typedef struct XLogCtlData
      */
     XLogRecPtr    lastFpwDisableRecPtr;
 
+    /* The last segment notified to be archived. Protected by WALWriteLock */
+    XLogSegNo    lastNotifiedSeg;
+
+    /*
+     * Remember the oldest and newest known segment that ends with a
+     * continuation record.
+     */
+    XLogRecPtr    firstSegContRecStart;
+    XLogRecPtr    lastSegContRecEnd;
+
     slock_t        info_lck;        /* locks shared variables shown above */
 } XLogCtlData;
 
@@ -1160,6 +1170,9 @@ XLogInsertRecord(XLogRecData *rdata,
      */
     if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
     {
+        XLogSegNo startseg;
+        XLogSegNo endseg;
+
         SpinLockAcquire(&XLogCtl->info_lck);
         /* advance global request to include new block(s) */
         if (XLogCtl->LogwrtRqst.Write < EndPos)
@@ -1167,6 +1180,24 @@ XLogInsertRecord(XLogRecData *rdata,
         /* update local result copy while I have the chance */
         LogwrtResult = XLogCtl->LogwrtResult;
         SpinLockRelease(&XLogCtl->info_lck);
+
+        /* Remember the range of segments that end with a continuation record */
+        XLByteToSeg(StartPos, startseg, wal_segment_size);
+        XLByteToPrevSeg(EndPos, endseg, wal_segment_size);
+
+        if (startseg != endseg)
+        {
+            /*
+             * We shouldn't have a record spanning over more than two segments
+             */
+            Assert (startseg + 1 == endseg);
+
+            SpinLockAcquire(&XLogCtl->info_lck);
+            if (XLogCtl->firstSegContRecStart == InvalidXLogRecPtr)
+                XLogCtl->firstSegContRecStart = StartPos;
+            XLogCtl->lastSegContRecEnd = EndPos;
+            SpinLockRelease(&XLogCtl->info_lck);
+        }
     }
 
     /*
@@ -2400,6 +2431,43 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
     return false;
 }
 
+/*
+ * Returns last notified segment.
+ */
+static XLogSegNo
+GetLastNotifiedSegment(void)
+{
+    XLogSegNo last_notified;
+
+    SpinLockAcquire(&XLogCtl->info_lck);
+    last_notified = XLogCtl->lastNotifiedSeg;
+    SpinLockRelease(&XLogCtl->info_lck);
+
+    return last_notified;
+}
+
+/*
+ * Notify segments that are not yet notified.
+ */
+static void
+NotifySegmentsUpTo(XLogSegNo notifySegNo)
+{
+    XLogSegNo last_notified = GetLastNotifiedSegment();
+    XLogSegNo i;
+
+    if (notifySegNo <= last_notified)
+        return;
+
+    for (i = last_notified + 1; i <= notifySegNo; i++)
+        XLogArchiveNotifySeg(i);
+
+    /* Don't go backwards in case someone else has already advanced it. */
+    SpinLockAcquire(&XLogCtl->info_lck);
+    if (XLogCtl->lastNotifiedSeg < notifySegNo)
+        XLogCtl->lastNotifiedSeg = notifySegNo;
+    SpinLockRelease(&XLogCtl->info_lck);
+}
+
 /*
  * Write and/or fsync the log at least as far as WriteRqst indicates.
  *
@@ -2423,6 +2491,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
     int            npages;
     int            startidx;
     uint32        startoffset;
+    bool        seg_notify = false;
 
     /* We should always be inside a critical section here */
     Assert(CritSectionCount > 0);
@@ -2579,15 +2648,95 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
              */
             if (finishing_seg)
             {
+                XLogRecPtr firstSegContRecStart;
+                XLogRecPtr lastSegContRecEnd;
+
                 issue_xlog_fsync(openLogFile, openLogSegNo);
 
                 /* signal that we need to wakeup walsenders later */
                 WalSndWakeupRequest();
 
-                LogwrtResult.Flush = LogwrtResult.Write;    /* end of page */
+                SpinLockAcquire(&XLogCtl->info_lck);
+                firstSegContRecStart = XLogCtl->firstSegContRecStart;
+                lastSegContRecEnd = XLogCtl->lastSegContRecEnd;
+                SpinLockRelease(&XLogCtl->info_lck);
 
-                if (XLogArchivingActive())
-                    XLogArchiveNotifySeg(openLogSegNo);
+                /*
+                 * If we may be in the middle of a continuation record that
+                 * spans segments, don't archive the segment until the record
+                 * is written to its end. Otherwise, after a server crash
+                 * around here, the archive could end up with different
+                 * records at the boundary than pg_wal.  For the same reason,
+                 * don't expose the flush location for replication until the
+                 * record is written to its end, so that an incomplete record
+                 * at a segment boundary won't be sent to standbys.
+                 */
+                if (LogwrtResult.Write < firstSegContRecStart ||
+                    lastSegContRecEnd <= LogwrtResult.Write)
+                {
+                    LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
+
+                    if (XLogArchivingActive())
+                        NotifySegmentsUpTo(openLogSegNo);
+
+                    if (lastSegContRecEnd <= LogwrtResult.Write)
+                    {
+                        /*
+                         * We got out of the continuation region, reset the
+                         * locations.
+                         */
+                        SpinLockAcquire(&XLogCtl->info_lck);
+                        XLogCtl->firstSegContRecStart = InvalidXLogRecPtr;
+                        XLogCtl->lastSegContRecEnd = InvalidXLogRecPtr;
+                        SpinLockRelease(&XLogCtl->info_lck);
+                    }
+
+                    /* already notified */
+                    seg_notify = false;
+                }
+                else
+                {
+                    seg_notify = true;
+
+                    /*
+                     * We may be asked to flush not to the end of a record
+                     * but only to a WAL buffer page boundary.  In case the
+                     * record at the requested LSN continues into the next
+                     * segment, we advance the request LSN to the end of the
+                     * record.
+                     */
+                    if (XLogSegmentOffset(WriteRqst.Write, wal_segment_size)
+                        == 0)
+                    {
+                        XLogSegNo oldseg;
+                        XLogSegNo currseg;
+
+                        XLByteToSeg(WriteRqst.Write, currseg,
+                                        wal_segment_size);
+                        XLByteToPrevSeg(lastSegContRecEnd, oldseg,
+                                        wal_segment_size);
+
+                        /*
+                         * If we know the exact placement of the record, extend
+                         * the request to the end of the record.
+                         */
+                        if (oldseg == currseg &&
+                            WriteRqst.Write < lastSegContRecEnd)
+                            WriteRqst.Write = lastSegContRecEnd;
+                        else
+                        {
+                            /*
+                             * We don't remember the exact placement of the
+                             * record around the requested write LSN. Refrain
+                             * from notifying the segment to avoid archiving a
+                             * segment with an incomplete record at the end. In
+                             * this case we are going to write a couple more
+                             * segments, so we will soon reach the next
+                             * segment boundary.  */
+                            seg_notify = false;
+                        }
+                    }
+                }
 
                 XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
                 XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
@@ -2616,11 +2765,23 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
         }
         curridx = NextBufIdx(curridx);
 
-        /* If flexible, break out of loop as soon as we wrote something */
-        if (flexible && npages == 0)
+        /*
+         * If flexible, break out of loop as soon as we wrote something.
+         * However, don't leave the loop while the last record of the just
+         * finished segment still has to be written to its end.
+         */
+        if (flexible && !seg_notify && npages == 0)
             break;
     }
 
+    /*
+     * We have extended the write request into the next segment if the record
+     * at the initial WriteRqst.Write continues to the next segment.  In that
+     * case, we need to notify the last segment here.
+     */
+    if (seg_notify)
+        NotifySegmentsUpTo(openLogSegNo - 1);
+
     Assert(npages == 0);
 
     /*
@@ -7705,6 +7866,18 @@ StartupXLOG(void)
     XLogCtl->LogwrtRqst.Write = EndOfLog;
     XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
+    /*
+     * We have archived up to the segment preceding the one containing
+     * EndOfLog so far.  Initialize lastNotifiedSeg if archiving is active.
+     */
+    if (XLogArchivingActive())
+    {
+        XLogSegNo    endLogSegNo;
+
+        XLByteToSeg(EndOfLog, endLogSegNo, wal_segment_size);
+        XLogCtl->lastNotifiedSeg = endLogSegNo - 1;
+    }
+
     /*
      * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
      * record before resource manager writes cleanup WAL records or checkpoint
@@ -8429,6 +8602,37 @@ GetFlushRecPtr(void)
     return LogwrtResult.Flush;
 }
 
+/*
+ * GetReplicationTargetRecPtr -- Returns the latest position that is safe to
+ * replicate.
+ */
+XLogRecPtr
+GetReplicationTargetRecPtr(void)
+{
+    static XLogRecPtr    lastTargetRecPtr = InvalidXLogRecPtr;
+    XLogRecPtr    firstSegContRecStart;
+    XLogRecPtr    lastSegContRecEnd;
+
+    SpinLockAcquire(&XLogCtl->info_lck);
+    LogwrtResult = XLogCtl->LogwrtResult;
+    firstSegContRecStart = XLogCtl->firstSegContRecStart;
+    lastSegContRecEnd = XLogCtl->lastSegContRecEnd;
+    SpinLockRelease(&XLogCtl->info_lck);
+
+    /*
+     * Don't move forward if the current flush position may be within a
+     * continuation record that spans over segments.
+     */
+    if (lastTargetRecPtr == InvalidXLogRecPtr ||
+        firstSegContRecStart == InvalidXLogRecPtr ||
+        XLogSegmentOffset(LogwrtResult.Flush, wal_segment_size) != 0 ||
+        LogwrtResult.Flush < firstSegContRecStart ||
+        lastSegContRecEnd <= LogwrtResult.Flush)
+        lastTargetRecPtr = LogwrtResult.Flush;
+
+    return lastTargetRecPtr;
+}
+
 /*
  * GetLastImportantRecPtr -- Returns the LSN of the last important record
  * inserted. All records not explicitly marked as unimportant are considered
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 2eb19ad293..00d8701a60 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2637,14 +2637,14 @@ XLogSendPhysical(void)
         /*
          * Streaming the current timeline on a primary.
          *
-         * Attempt to send all data that's already been written out and
-         * fsync'd to disk.  We cannot go further than what's been written out
-         * given the current implementation of WALRead().  And in any case
-         * it's unsafe to send WAL that is not securely down to disk on the
-         * primary: if the primary subsequently crashes and restarts, standbys
-         * must not have applied any WAL that got lost on the primary.
+         * Attempt to send all data that can be replicated.  We cannot go
+         * further than what's been written out given the current
+         * implementation of WALRead().  And in any case it's unsafe to send
+         * WAL that is not securely down to disk on the primary: if the primary
+         * subsequently crashes and restarts, standbys must not have applied
+         * any WAL that got lost on the primary.
          */
-        SendRqstPtr = GetFlushRecPtr();
+        SendRqstPtr = GetReplicationTargetRecPtr();
     }
 
     /*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 221af87e71..94876f628c 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -338,6 +338,7 @@ extern void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p)
 extern XLogRecPtr GetRedoRecPtr(void);
 extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
+extern XLogRecPtr GetReplicationTargetRecPtr(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
 extern void RemovePromoteSignalFiles(void);
 
-- 
2.27.0

From 6367ea9ffac7b6e6e692ddacfb8dc997c222c296 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 16 Dec 2020 10:36:42 +0900
Subject: [PATCH v4 2/2] debug print

---
 src/backend/access/transam/xlog.c | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8705809160..a1bd00ae1b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1192,6 +1192,11 @@ XLogInsertRecord(XLogRecData *rdata,
              */
             Assert (startseg + 1 == endseg);
 
+            if (XLogCtl->firstSegContRecStart == InvalidXLogRecPtr)
+                ereport(LOG, (errmsg("REG-REG: (%lX, %lX)", StartPos, EndPos), errhidestmt(true),errhidecontext(true)));
+            else
+                ereport(LOG, (errmsg("UPD-REG: (%lX, %lX)", XLogCtl->firstSegContRecStart, EndPos), errhidestmt(true),errhidecontext(true)));
+
             SpinLockAcquire(&XLogCtl->info_lck);
             if (XLogCtl->firstSegContRecStart == InvalidXLogRecPtr)
                 XLogCtl->firstSegContRecStart = StartPos;
@@ -2674,6 +2679,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
                 if (LogwrtResult.Write < firstSegContRecStart ||
                     lastSegContRecEnd <= LogwrtResult.Write)
                 {
+                    ereport(LOG, (errmsg("NOTIFY1 %lX-%lX: (%lX, %lX) %lX(/%lX) %lX", GetLastNotifiedSegment() + 1, openLogSegNo,firstSegContRecStart, lastSegContRecEnd, WriteRqst.Write, XLogSegmentOffset(WriteRqst.Write, wal_segment_size),LogwrtResult.Write), errhidestmt(true),errhidecontext(true)));
 
+
                     LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
 
                     if (XLogArchivingActive())
@@ -2681,6 +2688,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 
                     if (lastSegContRecEnd <= LogwrtResult.Write)
                     {
+                        ereport(LOG, (errmsg("CLEAR-REG: (%lX, %lX) %lX, %lX", firstSegContRecStart, lastSegContRecEnd,WriteRqst.Write, LogwrtResult.Write), errhidestmt(true),errhidecontext(true)));
 
                         /*
                          * We got out of the continuation region, reset the
                          * locations.
@@ -2705,6 +2713,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
                      * case the record at the requested LSN continues to the
                      * next segment.
                      */
+                    ereport(LOG, (errmsg("NOT-NOTIFY: (%lX, %lX) %lX(/%lX) %lX(/%lX)", firstSegContRecStart, lastSegContRecEnd,LogwrtResult.Write, XLogSegmentOffset(LogwrtResult.Write, wal_segment_size), WriteRqst.Write, XLogSegmentOffset(WriteRqst.Write,wal_segment_size)), errhidestmt(true),errhidecontext(true)));
 
                     if (XLogSegmentOffset(WriteRqst.Write, wal_segment_size)
                         == 0)
                     {
@@ -2722,9 +2731,13 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
                          */
                         if (oldseg == currseg &&
                             WriteRqst.Write < lastSegContRecEnd)
+                        {
+                            ereport(LOG, (errmsg("EXTEND-RQST: (%lX, %lX(%lX)) %lX(%lX) => %d", firstSegContRecStart, lastSegContRecEnd,oldseg, WriteRqst.Write, currseg, (oldseg == currseg && WriteRqst.Write <= lastSegContRecEnd)), errhidestmt(true),errhidecontext(true)));
                             WriteRqst.Write = lastSegContRecEnd;
+                        }
                         else
                         {
+                            ereport(LOG, (errmsg("SKIP-NOTIFY: (%lX, %lX(%lX)) %lX(%lX) => %d", firstSegContRecStart, lastSegContRecEnd,oldseg, WriteRqst.Write, currseg, (oldseg == currseg && WriteRqst.Write <= lastSegContRecEnd)), errhidestmt(true),errhidecontext(true)));
                             /*
                              * We forgot the exact placement of the record
                              * around requested write LSN. Refrain from
@@ -2780,7 +2793,11 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
      * need to notify the last segment here.
      */
     if (seg_notify)
+    {
+        ereport(LOG, (errmsg("NOTIFY2 %lX-%lX: (%lX, %lX) %lX, seg %lX", GetLastNotifiedSegment() + 1, openLogSegNo - 1,XLogCtl->firstSegContRecStart, XLogCtl->lastSegContRecEnd, LogwrtResult.Write, openLogSegNo - 1), errhidestmt(true),errhidecontext(true)));
+
         NotifySegmentsUpTo(openLogSegNo - 1);
+    }
 
     Assert(npages == 0);
 
-- 
2.27.0

