Re: Race condition in SyncRepGetSyncStandbysPriority - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: Race condition in SyncRepGetSyncStandbysPriority
Date
Msg-id 20200415.162650.893917978307659374.horikyota.ntt@gmail.com
Whole thread Raw
In response to Re: Race condition in SyncRepGetSyncStandbysPriority  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
List pgsql-hackers
At Wed, 15 Apr 2020 11:35:58 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in 
> I'm looking this more closer.

It looks like the right direction to me.

As mentioned in my previous mail, I removed is_sync_standby from
SyncRepStandbyData, but doing only that breaks pg_stat_get_wal_senders.
In addition, the logic for sync_state (values[10]) looks odd; that is a
pre-existing issue, but I fixed it in the attached patch.

SyncRepInitConfig now uses the walsender's mutex instead of
SyncRepLock. Since the integrity of sync_standby_priority is not
guaranteed anyway, that seems OK to me. For the same reason, it seems
fine to remove the assertion and the requirement about SyncRepLock from
SyncRepGetSyncRecPtr. (In practice the lock is still held there, though.)

SyncRepGetSyncStandbysPriority doesn't seem worth keeping as a separate
function, so I removed it in the attached patch.

+    num_standbys = SyncRepGetSyncStandbys(&sync_standbys);

The list no longer consists only of synchronous standbys, so I
changed the function name and the variable name, and tried to adjust
the related comments.

This is not something your patch changed, but I don't understand why
SyncRepGetNthLatestSyncRecPtr takes SyncRepConfig->num_sync as a
parameter while SyncRepGetOldestSyncRecPtr accesses it directly, so I
changed SyncRepGetOldestSyncRecPtr in the attached patch to take it as a
parameter too.  I didn't do it here, but ultimately the two functions
could be consolidated just by moving the selection logic currently in
SyncRepGetSyncRecPtr into a single new function.


The resulting patch is attached.

- removed is_sync_standby from SyncRepStandbyData
- Fixed the logic for values[10] in pg_stat_get_wal_senders
- Changed the signature of SyncRepGetOldestSyncRecPtr
- Adjusted some comments to the behavioral change of
  SyncRepGet(Sync)Standbys.

regards.


-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ffd5b31eb2..e8a47f98b3 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -108,14 +108,17 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
 static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
                                        XLogRecPtr *flushPtr,
                                        XLogRecPtr *applyPtr,
-                                       List *sync_standbys);
+                                       SyncRepStandbyData *sync_standbys,
+                                       int num_standbys,
+                                       uint8 nsyncs);
 static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
                                           XLogRecPtr *flushPtr,
                                           XLogRecPtr *applyPtr,
-                                          List *sync_standbys, uint8 nth);
+                                          SyncRepStandbyData *sync_standbys,
+                                          int num_standbys,
+                                          uint8 nth);
 static int    SyncRepGetStandbyPriority(void);
-static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
-static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static int    standby_priority_comparator(const void *a, const void *b);
 static int    cmp_lsn(const void *a, const void *b);
 
 #ifdef USE_ASSERT_CHECKING
@@ -406,9 +409,10 @@ SyncRepInitConfig(void)
     priority = SyncRepGetStandbyPriority();
     if (MyWalSnd->sync_standby_priority != priority)
     {
-        LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+        SpinLockAcquire(&MyWalSnd->mutex);
         MyWalSnd->sync_standby_priority = priority;
-        LWLockRelease(SyncRepLock);
+        SpinLockRelease(&MyWalSnd->mutex);
+
         ereport(DEBUG1,
                 (errmsg("standby \"%s\" now has synchronous standby priority %u",
                         application_name, priority)));
@@ -523,8 +527,6 @@ SyncRepReleaseWaiters(void)
 /*
  * Calculate the synced Write, Flush and Apply positions among sync standbys.
  *
- * The caller must hold SyncRepLock.
- *
  * Return false if the number of sync standbys is less than
  * synchronous_standby_names specifies. Otherwise return true and
  * store the positions into *writePtr, *flushPtr and *applyPtr.
@@ -536,27 +538,41 @@ static bool
 SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
                      XLogRecPtr *applyPtr, bool *am_sync)
 {
-    List       *sync_standbys;
-
-    Assert(LWLockHeldByMe(SyncRepLock));
+    SyncRepStandbyData *standbys;
+    int            num_standbys;
+    int            i;
 
+    /* Initialize default results */
     *writePtr = InvalidXLogRecPtr;
     *flushPtr = InvalidXLogRecPtr;
     *applyPtr = InvalidXLogRecPtr;
     *am_sync = false;
 
-    /* Get standbys that are considered as synchronous at this moment */
-    sync_standbys = SyncRepGetSyncStandbys(am_sync);
+    /* Quick out if not even configured to be synchronous */
+    if (SyncRepConfig == NULL)
+        return false;
+
+    /* Get standbys.  They are in priority order if we are in priority mode */
+    num_standbys = SyncRepGetStandbys(&standbys);
+
+    /* Am I a sync standby?  In quorum mode every candidate is one. */
+    for (i = 0; i < num_standbys; i++)
+    {
+        if (!standbys[i].is_me)
+            continue;
+        *am_sync = (SyncRepConfig->syncrep_method == SYNC_REP_QUORUM ||
+                    i < SyncRepConfig->num_sync);
+        break;
+    }
 
     /*
-     * Quick exit if we are not managing a sync standby or there are not
-     * enough synchronous standbys.
+     * Nothing more to do if we are not managing a sync standby or there are
+     * not enough synchronous standbys.
      */
     if (!(*am_sync) ||
-        SyncRepConfig == NULL ||
-        list_length(sync_standbys) < SyncRepConfig->num_sync)
+        num_standbys < SyncRepConfig->num_sync)
     {
-        list_free(sync_standbys);
+        pfree(standbys);
         return false;
     }
@@ -576,43 +592,48 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     {
         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
-                                   sync_standbys);
+                                   standbys, num_standbys,
+                                   SyncRepConfig->num_sync);
     }
     else
     {
         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
-                                      sync_standbys, SyncRepConfig->num_sync);
+                                      standbys, num_standbys,
+                                      SyncRepConfig->num_sync);
     }
 
-    list_free(sync_standbys);
+    pfree(standbys);
     return true;
 }
 
 /*
- * Calculate the oldest Write, Flush and Apply positions among sync standbys.
+ * Calculate the oldest Write, Flush and Apply positions among the first
+ * nsyncs entries of sync_standbys (the synchronous ones, in priority order).
  */
 static void
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-                           XLogRecPtr *applyPtr, List *sync_standbys)
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+                           XLogRecPtr *flushPtr,
+                           XLogRecPtr *applyPtr,
+                           SyncRepStandbyData *sync_standbys,
+                           int num_standbys,
+                           uint8 nsyncs)
 {
-    ListCell   *cell;
+    int            i;
 
     /*
      * Scan through all sync standbys and calculate the oldest Write, Flush
-     * and Apply positions.
+     * and Apply positions.  We assume *writePtr et al were initialized to
+     * InvalidXLogRecPtr.
      */
-    foreach(cell, sync_standbys)
+    for (i = 0; i < num_standbys; i++)
     {
-        WalSnd       *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-        XLogRecPtr    write;
-        XLogRecPtr    flush;
-        XLogRecPtr    apply;
+        XLogRecPtr    write = sync_standbys[i].write;
+        XLogRecPtr    flush = sync_standbys[i].flush;
+        XLogRecPtr    apply = sync_standbys[i].apply;
 
-        SpinLockAcquire(&walsnd->mutex);
-        write = walsnd->write;
-        flush = walsnd->flush;
-        apply = walsnd->apply;
-        SpinLockRelease(&walsnd->mutex);
+        /* Ignore candidates that aren't considered synchronous */
+        if (i >= nsyncs)
+            break;
 
         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
             *writePtr = write;
@@ -628,38 +649,39 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  * standbys.
  */
 static void
-SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-                              XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
+                              XLogRecPtr *flushPtr,
+                              XLogRecPtr *applyPtr,
+                              SyncRepStandbyData *sync_standbys,
+                              int num_standbys,
+                              uint8 nth)
 {
-    ListCell   *cell;
     XLogRecPtr *write_array;
     XLogRecPtr *flush_array;
     XLogRecPtr *apply_array;
-    int            len;
-    int            i = 0;
+    int            i;
+    int            n;
 
-    len = list_length(sync_standbys);
-    write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-    flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-    apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+    write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+    flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+    apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
 
-    foreach(cell, sync_standbys)
+    n = 0;
+    for (i = 0; i < num_standbys; i++)
     {
-        WalSnd       *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-
-        SpinLockAcquire(&walsnd->mutex);
-        write_array[i] = walsnd->write;
-        flush_array[i] = walsnd->flush;
-        apply_array[i] = walsnd->apply;
-        SpinLockRelease(&walsnd->mutex);
-
-        i++;
+        write_array[n] = sync_standbys[i].write;
+        flush_array[n] = sync_standbys[i].flush;
+        apply_array[n] = sync_standbys[i].apply;
+        n++;
     }
 
+    /* Caller checked num_standbys >= nth; anything less is a bug */
+    Assert(n >= nth);
+
     /* Sort each array in descending order */
-    qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
-    qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
-    qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(write_array, n, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(flush_array, n, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(apply_array, n, sizeof(XLogRecPtr), cmp_lsn);
 
     /* Get Nth latest Write, Flush, Apply positions */
     *writePtr = write_array[nth - 1];
@@ -689,67 +711,51 @@ cmp_lsn(const void *a, const void *b)
 }
 
 /*
- * Return the list of sync standbys, or NIL if no sync standby is connected.
+ * Return data about walsenders that are candidates for sync standby.
  *
- * The caller must hold SyncRepLock.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * *standbys is set to a palloc'd array of structs of per-walsender data, and
+ * the number of valid entries is returned.  The entries are in
+ * sync_standby_priority order if the current SyncRepConfig->syncrep_method is
+ * SYNC_REP_PRIORITY; then the first SyncRepConfig->num_sync entries are the
+ * synchronous standbys.  In quorum mode, every entry is a sync candidate.
  */
-List *
-SyncRepGetSyncStandbys(bool *am_sync)
+int
+SyncRepGetStandbys(SyncRepStandbyData **standbys)
 {
-    Assert(LWLockHeldByMe(SyncRepLock));
+    int            i;
+    int            n;
 
-    /* Set default result */
-    if (am_sync != NULL)
-        *am_sync = false;
+    /* Create result array */
+    *standbys = (SyncRepStandbyData *)
+        palloc(max_wal_senders * sizeof(SyncRepStandbyData));
 
     /* Quick exit if sync replication is not requested */
     if (SyncRepConfig == NULL)
-        return NIL;
-
-    return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
-        SyncRepGetSyncStandbysPriority(am_sync) :
-        SyncRepGetSyncStandbysQuorum(am_sync);
-}
-
-/*
- * Return the list of all the candidates for quorum sync standbys,
- * or NIL if no such standby is connected.
- *
- * The caller must hold SyncRepLock. This function must be called only in
- * a quorum-based sync replication.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
- */
-static List *
-SyncRepGetSyncStandbysQuorum(bool *am_sync)
-{
-    List       *result = NIL;
-    int            i;
-    volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
-                                 * rearrangement */
-
-    Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
+        return 0;
 
+    /* Collect raw data from shared memory */
+    n = 0;
     for (i = 0; i < max_wal_senders; i++)
     {
-        XLogRecPtr    flush;
-        WalSndState state;
-        int            pid;
+        volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
+                                     * rearrangement */
+        SyncRepStandbyData *stby;
+        WalSndState state;        /* not included in SyncRepStandbyData */
 
         walsnd = &WalSndCtl->walsnds[i];
+        stby = *standbys + n;
 
         SpinLockAcquire(&walsnd->mutex);
-        pid = walsnd->pid;
-        flush = walsnd->flush;
+        stby->pid = walsnd->pid;
         state = walsnd->state;
+        stby->write = walsnd->write;
+        stby->flush = walsnd->flush;
+        stby->apply = walsnd->apply;
+        stby->sync_standby_priority = walsnd->sync_standby_priority;
         SpinLockRelease(&walsnd->mutex);
 
         /* Must be active */
-        if (pid == 0)
+        if (stby->pid == 0)
             continue;
 
         /* Must be streaming or stopping */
@@ -758,200 +764,53 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
             continue;
 
         /* Must be synchronous */
-        if (walsnd->sync_standby_priority == 0)
+        if (stby->sync_standby_priority == 0)
             continue;
 
         /* Must have a valid flush position */
-        if (XLogRecPtrIsInvalid(flush))
+        if (XLogRecPtrIsInvalid(stby->flush))
             continue;
 
-        /*
-         * Consider this standby as a candidate for quorum sync standbys and
-         * append it to the result.
-         */
-        result = lappend_int(result, i);
-        if (am_sync != NULL && walsnd == MyWalSnd)
-            *am_sync = true;
+        /* OK, it's a candidate */
+        stby->walsnd_index = i;
+        stby->is_me = (walsnd == MyWalSnd);
+        n++;
     }
 
-    return result;
-}
-
-/*
- * Return the list of sync standbys chosen based on their priorities,
- * or NIL if no sync standby is connected.
- *
- * If there are multiple standbys with the same priority,
- * the first one found is selected preferentially.
- *
- * The caller must hold SyncRepLock. This function must be called only in
- * a priority-based sync replication.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
- */
-static List *
-SyncRepGetSyncStandbysPriority(bool *am_sync)
-{
-    List       *result = NIL;
-    List       *pending = NIL;
-    int            lowest_priority;
-    int            next_highest_priority;
-    int            this_priority;
-    int            priority;
-    int            i;
-    bool        am_in_pending = false;
-    volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
-                                 * rearrangement */
-
-    Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
-
-    lowest_priority = SyncRepConfig->nmembers;
-    next_highest_priority = lowest_priority + 1;
-
     /*
-     * Find the sync standbys which have the highest priority (i.e, 1). Also
-     * store all the other potential sync standbys into the pending list, in
-     * order to scan it later and find other sync standbys from it quickly.
+     * In quorum mode, that's all we have to do.  In priority mode, sort the
+     * candidates by priority.
      */
-    for (i = 0; i < max_wal_senders; i++)
-    {
-        XLogRecPtr    flush;
-        WalSndState state;
-        int            pid;
-
-        walsnd = &WalSndCtl->walsnds[i];
-
-        SpinLockAcquire(&walsnd->mutex);
-        pid = walsnd->pid;
-        flush = walsnd->flush;
-        state = walsnd->state;
-        SpinLockRelease(&walsnd->mutex);
-
-        /* Must be active */
-        if (pid == 0)
-            continue;
-
-        /* Must be streaming or stopping */
-        if (state != WALSNDSTATE_STREAMING &&
-            state != WALSNDSTATE_STOPPING)
-            continue;
-
-        /* Must be synchronous */
-        this_priority = walsnd->sync_standby_priority;
-        if (this_priority == 0)
-            continue;
-
-        /* Must have a valid flush position */
-        if (XLogRecPtrIsInvalid(flush))
-            continue;
+    if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+        qsort(*standbys, n, sizeof(SyncRepStandbyData),
+              standby_priority_comparator);
 
-        /*
-         * If the priority is equal to 1, consider this standby as sync and
-         * append it to the result. Otherwise append this standby to the
-         * pending list to check if it's actually sync or not later.
-         */
-        if (this_priority == 1)
-        {
-            result = lappend_int(result, i);
-            if (am_sync != NULL && walsnd == MyWalSnd)
-                *am_sync = true;
-            if (list_length(result) == SyncRepConfig->num_sync)
-            {
-                list_free(pending);
-                return result;    /* Exit if got enough sync standbys */
-            }
-        }
-        else
-        {
-            pending = lappend_int(pending, i);
-            if (am_sync != NULL && walsnd == MyWalSnd)
-                am_in_pending = true;
+    return n;
+}
 
-            /*
-             * Track the highest priority among the standbys in the pending
-             * list, in order to use it as the starting priority for later
-             * scan of the list. This is useful to find quickly the sync
-             * standbys from the pending list later because we can skip
-             * unnecessary scans for the unused priorities.
-             */
-            if (this_priority < next_highest_priority)
-                next_highest_priority = this_priority;
-        }
-    }
 
-    /*
-     * Consider all pending standbys as sync if the number of them plus
-     * already-found sync ones is lower than the configuration requests.
-     */
-    if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
-    {
-        /*
-         * Set *am_sync to true if this walsender is in the pending list
-         * because all pending standbys are considered as sync.
-         */
-        if (am_sync != NULL && !(*am_sync))
-            *am_sync = am_in_pending;
+/*
+ * qsort comparator to sort SyncRepStandbyData entries by priority
+ */
+static int
+standby_priority_comparator(const void *a, const void *b)
+{
+    const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
+    const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
 
-        result = list_concat(result, pending);
-        list_free(pending);
-        return result;
-    }
+    /* First, sort by increasing priority value */
+    if (sa->sync_standby_priority != sb->sync_standby_priority)
+        return sa->sync_standby_priority - sb->sync_standby_priority;
 
     /*
-     * Find the sync standbys from the pending list.
+     * We might have equal priority values; arbitrarily break ties by position
+     * in the WalSnd array.  (This is utterly bogus, since that is arrival
+     * order dependent, but there are regression tests that rely on it.)
      */
-    priority = next_highest_priority;
-    while (priority <= lowest_priority)
-    {
-        ListCell   *cell;
-
-        next_highest_priority = lowest_priority + 1;
-
-        foreach(cell, pending)
-        {
-            i = lfirst_int(cell);
-            walsnd = &WalSndCtl->walsnds[i];
-
-            this_priority = walsnd->sync_standby_priority;
-            if (this_priority == priority)
-            {
-                result = lappend_int(result, i);
-                if (am_sync != NULL && walsnd == MyWalSnd)
-                    *am_sync = true;
-
-                /*
-                 * We should always exit here after the scan of pending list
-                 * starts because we know that the list has enough elements to
-                 * reach SyncRepConfig->num_sync.
-                 */
-                if (list_length(result) == SyncRepConfig->num_sync)
-                {
-                    list_free(pending);
-                    return result;    /* Exit if got enough sync standbys */
-                }
-
-                /*
-                 * Remove the entry for this sync standby from the list to
-                 * prevent us from looking at the same entry again.
-                 */
-                pending = foreach_delete_current(pending, cell);
-
-                continue;        /* don't adjust next_highest_priority */
-            }
-
-            if (this_priority < next_highest_priority)
-                next_highest_priority = this_priority;
-        }
-
-        priority = next_highest_priority;
-    }
-
-    /* never reached, but keep compiler quiet */
-    Assert(false);
-    return result;
+    return sa->walsnd_index - sb->walsnd_index;
 }
 
+
 /*
  * Check if we are in the list of sync standbys, and if so, determine
  * priority sequence. Return priority if set, or zero to indicate that
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fc475d144d..7889ea5f6f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2375,14 +2375,16 @@ InitWalSenderSlot(void)
              * Found a free slot. Reserve it for us.
              */
             walsnd->pid = MyProcPid;
+            walsnd->state = WALSNDSTATE_STARTUP;
             walsnd->sentPtr = InvalidXLogRecPtr;
+            walsnd->needreload = false;
             walsnd->write = InvalidXLogRecPtr;
             walsnd->flush = InvalidXLogRecPtr;
             walsnd->apply = InvalidXLogRecPtr;
             walsnd->writeLag = -1;
             walsnd->flushLag = -1;
             walsnd->applyLag = -1;
-            walsnd->state = WALSNDSTATE_STARTUP;
+            walsnd->sync_standby_priority = 0;
             walsnd->latch = &MyProc->procLatch;
             walsnd->replyTime = 0;
             walsnd->spillTxns = 0;
@@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
     Tuplestorestate *tupstore;
     MemoryContext per_query_ctx;
     MemoryContext oldcontext;
-    List       *sync_standbys;
+    SyncRepStandbyData *standbys;
+    int            num_standbys;
     int            i;
 
     /* check to see if caller supports us returning a tuplestore */
@@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
     MemoryContextSwitchTo(oldcontext);
 
     /*
-     * Get the currently active synchronous standbys.
+     * Get the currently active standbys in priority order.  This could be out
+     * of date before we're done, but we'll use the data anyway.
      */
-    LWLockAcquire(SyncRepLock, LW_SHARED);
-    sync_standbys = SyncRepGetSyncStandbys(NULL);
-    LWLockRelease(SyncRepLock);
+    num_standbys = SyncRepGetStandbys(&standbys);
 
     for (i = 0; i < max_wal_senders; i++)
     {
@@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
         int64        spillTxns;
         int64        spillCount;
         int64        spillBytes;
+        bool        is_sync_standby;
         Datum        values[PG_STAT_GET_WAL_SENDERS_COLS];
         bool        nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+        int            j;
 
+        /* Collect data from shared memory */
         SpinLockAcquire(&walsnd->mutex);
         if (walsnd->pid == 0)
         {
@@ -3311,6 +3316,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
         spillBytes = walsnd->spillBytes;
         SpinLockRelease(&walsnd->mutex);
 
+        /* Detect whether walsender is/was considered synchronous */
+        is_sync_standby = false;
+        for (j = 0; j < num_standbys; j++)
+        {
+            if (standbys[j].walsnd_index == i)
+            {
+                is_sync_standby = (j < SyncRepConfig->num_sync);
+                break;
+            }
+        }
+
         memset(nulls, 0, sizeof(nulls));
         values[0] = Int32GetDatum(pid);
 
@@ -3380,11 +3396,15 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
              */
             if (priority == 0)
                 values[10] = CStringGetTextDatum("async");
-            else if (list_member_int(sync_standbys, i))
-                values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
-                    CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
+            else if (SyncRepConfig == NULL)
+                values[10] = CStringGetTextDatum("potential");
+            else if (SyncRepConfig->syncrep_method == SYNC_REP_QUORUM)
+                values[10] = CStringGetTextDatum("quorum");
+            else if (is_sync_standby)
+                values[10] = CStringGetTextDatum("sync");
+            /* priority mode, but not among the first num_sync candidates */
             else
-                values[10] = CStringGetTextDatum("potential");
+                values[10] = CStringGetTextDatum("potential");
 
             if (replyTime == 0)
                 nulls[11] = true;
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index c5f0e91aad..533aac25c7 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -36,6 +36,24 @@
 #define SYNC_REP_PRIORITY        0
 #define SYNC_REP_QUORUM        1
 
+/*
+ * SyncRepGetStandbys returns an array of these structs,
+ * one per candidate synchronous walsender.
+ */
+typedef struct SyncRepStandbyData
+{
+    /* Copies of relevant fields from WalSnd shared-memory struct */
+    pid_t        pid;
+    XLogRecPtr    write;
+    XLogRecPtr    flush;
+    XLogRecPtr    apply;
+    int            sync_standby_priority;
+    /* Index of this walsender in the WalSnd shared-memory array */
+    int            walsnd_index;
+    /* This flag indicates whether this struct is about our own process */
+    bool        is_me;
+} SyncRepStandbyData;
+
 /*
  * Struct for the configuration of synchronous replication.
  *
@@ -74,7 +92,7 @@ extern void SyncRepInitConfig(void);
 extern void SyncRepReleaseWaiters(void);
 
 /* called by wal sender and user backend */
-extern List *SyncRepGetSyncStandbys(bool *am_sync);
+extern int    SyncRepGetStandbys(SyncRepStandbyData **standbys);
 
 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 366828f0a4..734acec2a4 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -31,8 +31,7 @@ typedef enum WalSndState
 /*
  * Each walsender has a WalSnd struct in shared memory.
  *
- * This struct is protected by 'mutex', with two exceptions: one is
- * sync_standby_priority as noted below.  The other exception is that some
+ * This struct is protected by its 'mutex' spinlock field, except that some
  * members are only written by the walsender process itself, and thus that
  * process is free to read those members without holding spinlock.  pid and
  * needreload always require the spinlock to be held for all accesses.
@@ -60,6 +59,12 @@ typedef struct WalSnd
     TimeOffset    flushLag;
     TimeOffset    applyLag;
 
+    /*
+     * The priority order of the standby managed by this WALSender, as listed
+     * in synchronous_standby_names, or 0 if not listed.
+     */
+    int            sync_standby_priority;
+
     /* Protects shared variables shown above. */
     slock_t        mutex;
 
@@ -69,13 +74,6 @@ typedef struct WalSnd
      */
     Latch       *latch;
 
-    /*
-     * The priority order of the standby managed by this WALSender, as listed
-     * in synchronous_standby_names, or 0 if not-listed. Protected by
-     * SyncRepLock.
-     */
-    int            sync_standby_priority;
-
     /*
      * Timestamp of the last message received from standby.
      */

pgsql-hackers by date:

Previous
From: David Rowley
Date:
Subject: Parallel Append can break run-time partition pruning
Next
From: Kyotaro Horiguchi
Date:
Subject: Re: Race condition in SyncRepGetSyncStandbysPriority