Re: Race condition in SyncRepGetSyncStandbysPriority - Mailing list pgsql-hackers

From Tom Lane
Subject Re: Race condition in SyncRepGetSyncStandbysPriority
Date
Msg-id 3020.1587137510@sss.pgh.pa.us
Whole thread Raw
In response to Re: Race condition in SyncRepGetSyncStandbysPriority  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Responses Re: Race condition in SyncRepGetSyncStandbysPriority
Re: Race condition in SyncRepGetSyncStandbysPriority
List pgsql-hackers
Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
> At Fri, 17 Apr 2020 16:03:34 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in
>> I agree that it might be worth considering the removal of am_sync for
>> the master branch or v14. But I think that it should not be
>> back-patched.

> Ah! Agreed.

Yeah, that's not necessary to fix the bug.  I'd be inclined to leave
it for v14 at this point.

I don't much like the patch Fujii-san posted, though.  An important part
of the problem, IMO, is that SyncRepGetSyncStandbysPriority is too
complicated and it's unclear what dependencies it has on the set of
priorities in shared memory being consistent.  His patch does not improve
that situation; if anything it makes it worse.

If we're concerned about not breaking ABI in the back branches, what
I propose we do about that is just leave SyncRepGetSyncStandbys in
place but not used by the core code, and remove it only in HEAD.
We can do an absolutely minimal fix for the assertion failure, in
case anybody is calling that code, by just dropping the Assert and
letting SyncRepGetSyncStandbys return NIL if it falls out.  (Or we
could let it return the incomplete list, which'd be the behavior
you get today in a non-assert build.)

Also, I realized while re-reading my patch that Kyotaro-san is onto
something about the is_sync_standby flag not being necessary: instead
we can just have the new function SyncRepGetCandidateStandbys return
a reduced count.  I'd initially believed that it was necessary for
that function to return the rejected candidate walsenders along with
the accepted ones, but that was a misunderstanding.  I still don't
want its API spec to say anything about ordering of the result array,
but we don't need it to.

So that leads me to the attached.  I propose applying this to the
back branches except for the rearrangement of WALSnd field order.
In HEAD, I'd remove SyncRepGetSyncStandbys and subroutines altogether.

            regards, tom lane

diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ffd5b31..b47c7fa 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -108,14 +108,18 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
 static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
                                        XLogRecPtr *flushPtr,
                                        XLogRecPtr *applyPtr,
-                                       List *sync_standbys);
+                                       SyncRepStandbyData *sync_standbys,
+                                       int num_standbys);
 static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
                                           XLogRecPtr *flushPtr,
                                           XLogRecPtr *applyPtr,
-                                          List *sync_standbys, uint8 nth);
+                                          SyncRepStandbyData *sync_standbys,
+                                          int num_standbys,
+                                          uint8 nth);
 static int    SyncRepGetStandbyPriority(void);
 static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
 static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static int    standby_priority_comparator(const void *a, const void *b);
 static int    cmp_lsn(const void *a, const void *b);

 #ifdef USE_ASSERT_CHECKING
@@ -406,9 +410,10 @@ SyncRepInitConfig(void)
     priority = SyncRepGetStandbyPriority();
     if (MyWalSnd->sync_standby_priority != priority)
     {
-        LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+        SpinLockAcquire(&MyWalSnd->mutex);
         MyWalSnd->sync_standby_priority = priority;
-        LWLockRelease(SyncRepLock);
+        SpinLockRelease(&MyWalSnd->mutex);
+
         ereport(DEBUG1,
                 (errmsg("standby \"%s\" now has synchronous standby priority %u",
                         application_name, priority)));
@@ -523,8 +528,6 @@ SyncRepReleaseWaiters(void)
 /*
  * Calculate the synced Write, Flush and Apply positions among sync standbys.
  *
- * The caller must hold SyncRepLock.
- *
  * Return false if the number of sync standbys is less than
  * synchronous_standby_names specifies. Otherwise return true and
  * store the positions into *writePtr, *flushPtr and *applyPtr.
@@ -536,27 +539,41 @@ static bool
 SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
                      XLogRecPtr *applyPtr, bool *am_sync)
 {
-    List       *sync_standbys;
-
-    Assert(LWLockHeldByMe(SyncRepLock));
+    SyncRepStandbyData *sync_standbys;
+    int            num_standbys;
+    int            i;

+    /* Initialize default results */
     *writePtr = InvalidXLogRecPtr;
     *flushPtr = InvalidXLogRecPtr;
     *applyPtr = InvalidXLogRecPtr;
     *am_sync = false;

+    /* Quick out if not even configured to be synchronous */
+    if (SyncRepConfig == NULL)
+        return false;
+
     /* Get standbys that are considered as synchronous at this moment */
-    sync_standbys = SyncRepGetSyncStandbys(am_sync);
+    num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
+
+    /* Am I among the candidate sync standbys? */
+    for (i = 0; i < num_standbys; i++)
+    {
+        if (sync_standbys[i].is_me)
+        {
+            *am_sync = true;
+            break;
+        }
+    }

     /*
-     * Quick exit if we are not managing a sync standby or there are not
-     * enough synchronous standbys.
+     * Nothing more to do if we are not managing a sync standby or there are
+     * not enough synchronous standbys.
      */
     if (!(*am_sync) ||
-        SyncRepConfig == NULL ||
-        list_length(sync_standbys) < SyncRepConfig->num_sync)
+        num_standbys < SyncRepConfig->num_sync)
     {
-        list_free(sync_standbys);
+        pfree(sync_standbys);
         return false;
     }

@@ -576,15 +593,16 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     {
         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
-                                   sync_standbys);
+                                   sync_standbys, num_standbys);
     }
     else
     {
         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
-                                      sync_standbys, SyncRepConfig->num_sync);
+                                      sync_standbys, num_standbys,
+                                      SyncRepConfig->num_sync);
     }

-    list_free(sync_standbys);
+    pfree(sync_standbys);
     return true;
 }

@@ -592,27 +610,24 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
  */
 static void
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-                           XLogRecPtr *applyPtr, List *sync_standbys)
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+                           XLogRecPtr *flushPtr,
+                           XLogRecPtr *applyPtr,
+                           SyncRepStandbyData *sync_standbys,
+                           int num_standbys)
 {
-    ListCell   *cell;
+    int            i;

     /*
      * Scan through all sync standbys and calculate the oldest Write, Flush
-     * and Apply positions.
+     * and Apply positions.  We assume *writePtr et al were initialized to
+     * InvalidXLogRecPtr.
      */
-    foreach(cell, sync_standbys)
+    for (i = 0; i < num_standbys; i++)
     {
-        WalSnd       *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-        XLogRecPtr    write;
-        XLogRecPtr    flush;
-        XLogRecPtr    apply;
-
-        SpinLockAcquire(&walsnd->mutex);
-        write = walsnd->write;
-        flush = walsnd->flush;
-        apply = walsnd->apply;
-        SpinLockRelease(&walsnd->mutex);
+        XLogRecPtr    write = sync_standbys[i].write;
+        XLogRecPtr    flush = sync_standbys[i].flush;
+        XLogRecPtr    apply = sync_standbys[i].apply;

         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
             *writePtr = write;
@@ -628,38 +643,36 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  * standbys.
  */
 static void
-SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-                              XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
+                              XLogRecPtr *flushPtr,
+                              XLogRecPtr *applyPtr,
+                              SyncRepStandbyData *sync_standbys,
+                              int num_standbys,
+                              uint8 nth)
 {
-    ListCell   *cell;
     XLogRecPtr *write_array;
     XLogRecPtr *flush_array;
     XLogRecPtr *apply_array;
-    int            len;
-    int            i = 0;
+    int            i;

-    len = list_length(sync_standbys);
-    write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-    flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-    apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+    /* Should have enough candidates, or somebody messed up */
+    Assert(nth > 0 && nth <= num_standbys);

-    foreach(cell, sync_standbys)
-    {
-        WalSnd       *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+    write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+    flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+    apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);

-        SpinLockAcquire(&walsnd->mutex);
-        write_array[i] = walsnd->write;
-        flush_array[i] = walsnd->flush;
-        apply_array[i] = walsnd->apply;
-        SpinLockRelease(&walsnd->mutex);
-
-        i++;
+    for (i = 0; i < num_standbys; i++)
+    {
+        write_array[i] = sync_standbys[i].write;
+        flush_array[i] = sync_standbys[i].flush;
+        apply_array[i] = sync_standbys[i].apply;
     }

     /* Sort each array in descending order */
-    qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
-    qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
-    qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);

     /* Get Nth latest Write, Flush, Apply positions */
     *writePtr = write_array[nth - 1];
@@ -689,12 +702,121 @@ cmp_lsn(const void *a, const void *b)
 }

 /*
+ * Return data about walsenders that are candidates to be sync standbys.
+ *
+ * *standbys is set to a palloc'd array of structs of per-walsender data,
+ * and the number of valid entries (candidate sync senders) is returned.
+ * (This might be more or fewer than num_sync; caller must check.)
+ */
+int
+SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
+{
+    int            i;
+    int            n;
+
+    /* Create result array */
+    *standbys = (SyncRepStandbyData *)
+        palloc(max_wal_senders * sizeof(SyncRepStandbyData));
+
+    /* Quick exit if sync replication is not requested */
+    if (SyncRepConfig == NULL)
+        return 0;
+
+    /* Collect raw data from shared memory */
+    n = 0;
+    for (i = 0; i < max_wal_senders; i++)
+    {
+        volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
+                                     * rearrangement */
+        SyncRepStandbyData *stby;
+        WalSndState state;        /* not included in SyncRepStandbyData */
+
+        walsnd = &WalSndCtl->walsnds[i];
+        stby = *standbys + n;
+
+        SpinLockAcquire(&walsnd->mutex);
+        stby->pid = walsnd->pid;
+        state = walsnd->state;
+        stby->write = walsnd->write;
+        stby->flush = walsnd->flush;
+        stby->apply = walsnd->apply;
+        stby->sync_standby_priority = walsnd->sync_standby_priority;
+        SpinLockRelease(&walsnd->mutex);
+
+        /* Must be active */
+        if (stby->pid == 0)
+            continue;
+
+        /* Must be streaming or stopping */
+        if (state != WALSNDSTATE_STREAMING &&
+            state != WALSNDSTATE_STOPPING)
+            continue;
+
+        /* Must be synchronous */
+        if (stby->sync_standby_priority == 0)
+            continue;
+
+        /* Must have a valid flush position */
+        if (XLogRecPtrIsInvalid(stby->flush))
+            continue;
+
+        /* OK, it's a candidate */
+        stby->walsnd_index = i;
+        stby->is_me = (walsnd == MyWalSnd);
+        n++;
+    }
+
+    /*
+     * In quorum mode, we return all the candidates.  In priority mode, if we
+     * have too many candidates then return only the num_sync ones of highest
+     * priority.
+     */
+    if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
+        n > SyncRepConfig->num_sync)
+    {
+        /* Sort by priority ... */
+        qsort(*standbys, n, sizeof(SyncRepStandbyData),
+              standby_priority_comparator);
+        /* ... then report just the first num_sync ones */
+        n = SyncRepConfig->num_sync;
+    }
+
+    return n;
+}
+
+/*
+ * qsort comparator to sort SyncRepStandbyData entries by priority
+ */
+static int
+standby_priority_comparator(const void *a, const void *b)
+{
+    const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
+    const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
+
+    /* First, sort by increasing priority value */
+    if (sa->sync_standby_priority != sb->sync_standby_priority)
+        return sa->sync_standby_priority - sb->sync_standby_priority;
+
+    /*
+     * We might have equal priority values; arbitrarily break ties by position
+     * in the WALSnd array.  (This is utterly bogus, since that is arrival
+     * order dependent, but there are regression tests that rely on it.)
+     */
+    return sa->walsnd_index - sb->walsnd_index;
+}
+
+
+/*
  * Return the list of sync standbys, or NIL if no sync standby is connected.
  *
  * The caller must hold SyncRepLock.
  *
  * On return, *am_sync is set to true if this walsender is connecting to
  * sync standby. Otherwise it's set to false.
+ *
+ * XXX This function is BROKEN and should not be used in new code.  It has
+ * an inherent race condition, since the returned list of integer indexes
+ * might no longer correspond to reality.
  */
 List *
 SyncRepGetSyncStandbys(bool *am_sync)
@@ -947,9 +1069,15 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
         priority = next_highest_priority;
     }

-    /* never reached, but keep compiler quiet */
-    Assert(false);
-    return result;
+    /*
+     * We might get here if the set of sync_standby_priority values in shared
+     * memory is inconsistent, as can happen transiently after a change in the
+     * synchronous_standby_names setting.  In that case, give up and report
+     * that there are no synchronous candidates.
+     */
+    list_free(result);
+    list_free(pending);
+    return NIL;
 }

 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fc475d1..0e93322 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2375,14 +2375,16 @@ InitWalSenderSlot(void)
              * Found a free slot. Reserve it for us.
              */
             walsnd->pid = MyProcPid;
+            walsnd->state = WALSNDSTATE_STARTUP;
             walsnd->sentPtr = InvalidXLogRecPtr;
+            walsnd->needreload = false;
             walsnd->write = InvalidXLogRecPtr;
             walsnd->flush = InvalidXLogRecPtr;
             walsnd->apply = InvalidXLogRecPtr;
             walsnd->writeLag = -1;
             walsnd->flushLag = -1;
             walsnd->applyLag = -1;
-            walsnd->state = WALSNDSTATE_STARTUP;
+            walsnd->sync_standby_priority = 0;
             walsnd->latch = &MyProc->procLatch;
             walsnd->replyTime = 0;
             walsnd->spillTxns = 0;
@@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
     Tuplestorestate *tupstore;
     MemoryContext per_query_ctx;
     MemoryContext oldcontext;
-    List       *sync_standbys;
+    SyncRepStandbyData *sync_standbys;
+    int            num_standbys;
     int            i;

     /* check to see if caller supports us returning a tuplestore */
@@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
     MemoryContextSwitchTo(oldcontext);

     /*
-     * Get the currently active synchronous standbys.
+     * Get the currently active synchronous standbys.  This could be out of
+     * date before we're done, but we'll use the data anyway.
      */
-    LWLockAcquire(SyncRepLock, LW_SHARED);
-    sync_standbys = SyncRepGetSyncStandbys(NULL);
-    LWLockRelease(SyncRepLock);
+    num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);

     for (i = 0; i < max_wal_senders; i++)
     {
@@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
         int64        spillTxns;
         int64        spillCount;
         int64        spillBytes;
+        bool        is_sync_standby;
         Datum        values[PG_STAT_GET_WAL_SENDERS_COLS];
         bool        nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+        int            j;

+        /* Collect data from shared memory */
         SpinLockAcquire(&walsnd->mutex);
         if (walsnd->pid == 0)
         {
@@ -3311,6 +3316,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
         spillBytes = walsnd->spillBytes;
         SpinLockRelease(&walsnd->mutex);

+        /*
+         * Detect whether walsender is/was considered synchronous.  We can
+         * provide some protection against stale data by checking the PID
+         * along with walsnd_index.
+         */
+        is_sync_standby = false;
+        for (j = 0; j < num_standbys; j++)
+        {
+            if (sync_standbys[j].walsnd_index == i &&
+                sync_standbys[j].pid == pid)
+            {
+                is_sync_standby = true;
+                break;
+            }
+        }
+
         memset(nulls, 0, sizeof(nulls));
         values[0] = Int32GetDatum(pid);

@@ -3380,7 +3401,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
              */
             if (priority == 0)
                 values[10] = CStringGetTextDatum("async");
-            else if (list_member_int(sync_standbys, i))
+            else if (is_sync_standby)
                 values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
                     CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
             else
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index c5f0e91..e38f6ba 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -37,6 +37,24 @@
 #define SYNC_REP_QUORUM        1

 /*
+ * SyncRepGetCandidateStandbys returns an array of these structs,
+ * one per candidate synchronous walsender.
+ */
+typedef struct SyncRepStandbyData
+{
+    /* Copies of relevant fields from WalSnd shared-memory struct */
+    pid_t        pid;
+    XLogRecPtr    write;
+    XLogRecPtr    flush;
+    XLogRecPtr    apply;
+    int            sync_standby_priority;
+    /* Index of this walsender in the WalSnd shared-memory array */
+    int            walsnd_index;
+    /* This flag indicates whether this struct is about our own process */
+    bool        is_me;
+} SyncRepStandbyData;
+
+/*
  * Struct for the configuration of synchronous replication.
  *
  * Note: this must be a flat representation that can be held in a single
@@ -74,6 +92,9 @@ extern void SyncRepInitConfig(void);
 extern void SyncRepReleaseWaiters(void);

 /* called by wal sender and user backend */
+extern int    SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
+
+/* obsolete, do not use in new code */
 extern List *SyncRepGetSyncStandbys(bool *am_sync);

 /* called by checkpointer */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 366828f..734acec 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -31,8 +31,7 @@ typedef enum WalSndState
 /*
  * Each walsender has a WalSnd struct in shared memory.
  *
- * This struct is protected by 'mutex', with two exceptions: one is
- * sync_standby_priority as noted below.  The other exception is that some
+ * This struct is protected by its 'mutex' spinlock field, except that some
  * members are only written by the walsender process itself, and thus that
  * process is free to read those members without holding spinlock.  pid and
  * needreload always require the spinlock to be held for all accesses.
@@ -60,6 +59,12 @@ typedef struct WalSnd
     TimeOffset    flushLag;
     TimeOffset    applyLag;

+    /*
+     * The priority order of the standby managed by this WALSender, as listed
+     * in synchronous_standby_names, or 0 if not-listed.
+     */
+    int            sync_standby_priority;
+
     /* Protects shared variables shown above. */
     slock_t        mutex;

@@ -70,13 +75,6 @@ typedef struct WalSnd
     Latch       *latch;

     /*
-     * The priority order of the standby managed by this WALSender, as listed
-     * in synchronous_standby_names, or 0 if not-listed. Protected by
-     * SyncRepLock.
-     */
-    int            sync_standby_priority;
-
-    /*
      * Timestamp of the last message received from standby.
      */
     TimestampTz replyTime;

pgsql-hackers by date:

Previous
From: Fujii Masao
Date:
Subject: Re: 001_rep_changes.pl stalls
Next
From: Nikita Glukhov
Date:
Subject: matchingsel() and NULL-returning operators