Re: Race condition in SyncRepGetSyncStandbysPriority - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: Race condition in SyncRepGetSyncStandbysPriority
Date
Msg-id 20200416.182626.526651137261602571.horikyota.ntt@gmail.com
Whole thread Raw
In response to Re: Race condition in SyncRepGetSyncStandbysPriority  (Masahiko Sawada <masahiko.sawada@2ndquadrant.com>)
Responses Re: Race condition in SyncRepGetSyncStandbysPriority  (Tom Lane <tgl@sss.pgh.pa.us>)
List pgsql-hackers
At Thu, 16 Apr 2020 16:48:28 +0900, Masahiko Sawada <masahiko.sawada@2ndquadrant.com> wrote in 
> This is just a notice; I'm reading your latest patch but it seems to
> include unrelated changes:
> 
> $ git diff --stat
>  src/backend/replication/syncrep.c           | 475
>
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------------------------------------------------------------------------------------------------------------------------
>  src/backend/replication/walsender.c         |  40 ++++++++++++++-----
>  src/bin/pg_dump/compress_io.c               |  12 ++++++
>  src/bin/pg_dump/pg_backup_directory.c       |  48 ++++++++++++++++++-----
>  src/include/replication/syncrep.h           |  20 +++++++++-
>  src/include/replication/walsender_private.h |  16 ++++----
>  6 files changed, 274 insertions(+), 337 deletions(-)

Ugh.  I failed to clean up my working directory.  I didn't notice because I
made the file with git diff.  Thanks for pointing that out.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ffd5b31eb2..99a7bbbc86 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -103,19 +103,21 @@ static int    SyncRepWakeQueue(bool all, int mode);
 
 static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
                                  XLogRecPtr *flushPtr,
-                                 XLogRecPtr *applyPtr,
-                                 bool *am_sync);
+                                 XLogRecPtr *applyPtr);
 static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
                                        XLogRecPtr *flushPtr,
                                        XLogRecPtr *applyPtr,
-                                       List *sync_standbys);
+                                       SyncRepStandbyData *sync_standbys,
+                                       int num_standbys,
+                                       uint8 nsyncs);
 static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
                                           XLogRecPtr *flushPtr,
                                           XLogRecPtr *applyPtr,
-                                          List *sync_standbys, uint8 nth);
+                                          SyncRepStandbyData *sync_standbys,
+                                          int num_standbys,
+                                          uint8 nth);
 static int    SyncRepGetStandbyPriority(void);
-static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
-static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static int    standby_priority_comparator(const void *a, const void *b);
 static int    cmp_lsn(const void *a, const void *b);
 
 #ifdef USE_ASSERT_CHECKING
@@ -406,9 +408,10 @@ SyncRepInitConfig(void)
     priority = SyncRepGetStandbyPriority();
     if (MyWalSnd->sync_standby_priority != priority)
     {
-        LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+        SpinLockAcquire(&MyWalSnd->mutex);
         MyWalSnd->sync_standby_priority = priority;
-        LWLockRelease(SyncRepLock);
+        SpinLockRelease(&MyWalSnd->mutex);
+
         ereport(DEBUG1,
                 (errmsg("standby \"%s\" now has synchronous standby priority %u",
                         application_name, priority)));
@@ -429,8 +432,7 @@ SyncRepReleaseWaiters(void)
     XLogRecPtr    writePtr;
     XLogRecPtr    flushPtr;
     XLogRecPtr    applyPtr;
-    bool        got_recptr;
-    bool        am_sync;
+    bool        release_waiters;
     int            numwrite = 0;
     int            numflush = 0;
     int            numapply = 0;
@@ -458,16 +460,24 @@ SyncRepReleaseWaiters(void)
     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 
     /*
-     * Check whether we are a sync standby or not, and calculate the synced
-     * positions among all sync standbys.
+     * Check whether we may release any of waiter processes, and calculate the
+     * synced positions.
      */
-    got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
+    release_waiters = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr);
+
+    /* Return if nothing to do. */
+    if (!release_waiters)
+    {
+        LWLockRelease(SyncRepLock);
+        announce_next_takeover = true;
+        return;
+    }
 
     /*
-     * If we are managing a sync standby, though we weren't prior to this,
-     * then announce we are now a sync standby.
+     * If this walsender becomes to be able to release waiter processes,
+     * announce about that.
      */
-    if (announce_next_takeover && am_sync)
+    if (announce_next_takeover)
     {
         announce_next_takeover = false;
 
@@ -481,17 +491,6 @@ SyncRepReleaseWaiters(void)
                             application_name)));
     }
 
-    /*
-     * If the number of sync standbys is less than requested or we aren't
-     * managing a sync standby then just leave.
-     */
-    if (!got_recptr || !am_sync)
-    {
-        LWLockRelease(SyncRepLock);
-        announce_next_takeover = !am_sync;
-        return;
-    }
-
     /*
      * Set the lsn first so that when we wake backends they will release up to
      * this location.
@@ -523,43 +522,62 @@ SyncRepReleaseWaiters(void)
 /*
  * Calculate the synced Write, Flush and Apply positions among sync standbys.
  *
- * The caller must hold SyncRepLock.
- *
- * Return false if the number of sync standbys is less than
- * synchronous_standby_names specifies. Otherwise return true and
- * store the positions into *writePtr, *flushPtr and *applyPtr.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * Return false if this walsender cannot release any waiters.  Otherwise
+ * return true and store the positions into *writePtr, *flushPtr and *applyPtr.
  */
 static bool
 SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-                     XLogRecPtr *applyPtr, bool *am_sync)
+                     XLogRecPtr *applyPtr)
 {
-    List       *sync_standbys;
-
-    Assert(LWLockHeldByMe(SyncRepLock));
+    SyncRepStandbyData *standbys;
+    int            num_standbys;
+    int            i;
+    bool        sync_priority = (SyncRepConfig != NULL &&
+        SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
 
+    /* Initialize default results */
     *writePtr = InvalidXLogRecPtr;
     *flushPtr = InvalidXLogRecPtr;
     *applyPtr = InvalidXLogRecPtr;
-    *am_sync = false;
 
-    /* Get standbys that are considered as synchronous at this moment */
-    sync_standbys = SyncRepGetSyncStandbys(am_sync);
+    /* Quick out if not even configured to be synchronous */
+    if (SyncRepConfig == NULL)
+        return false;
+
+    /* Get standbys.  They are in priority order. */
+    num_standbys = SyncRepGetStandbys(&standbys);
 
     /*
-     * Quick exit if we are not managing a sync standby or there are not
-     * enough synchronous standbys.
+     * Nothing more to do if there are not enough synchronous standbys or
+     * candidates.
      */
-    if (!(*am_sync) ||
-        SyncRepConfig == NULL ||
-        list_length(sync_standbys) < SyncRepConfig->num_sync)
+    if (num_standbys < SyncRepConfig->num_sync)
     {
-        list_free(sync_standbys);
+        pfree(standbys);
         return false;
     }
 
+    /* In priority mode, nothing to do unless I am among the first num_sync */
+    if (sync_priority)
+    {
+        bool        am_sync = false;
+
+        for (i = 0; i < SyncRepConfig->num_sync; i++)
+        {
+            if (standbys[i].is_me)
+            {
+                am_sync = true;
+                break;
+            }
+        }
+
+        if (!am_sync)
+        {
+            pfree(standbys);
+            return false;
+        }
+    }
+
     /*
      * In a priority-based sync replication, the synced positions are the
      * oldest ones among sync standbys. In a quorum-based, they are the Nth
@@ -573,46 +591,51 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
      * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
      * positions even in a quorum-based sync replication.
      */
-    if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+    if (sync_priority)
     {
         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
-                                   sync_standbys);
+                                   standbys, num_standbys,
+                                   SyncRepConfig->num_sync);
     }
     else
     {
         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
-                                      sync_standbys, SyncRepConfig->num_sync);
+                                      standbys, num_standbys,
+                                      SyncRepConfig->num_sync);
     }
 
-    list_free(sync_standbys);
+    pfree(standbys);
     return true;
 }
 
 /*
- * Calculate the oldest Write, Flush and Apply positions among sync standbys.
+ * Calculate the oldest Write, Flush and Apply positions among the first
+ * nsyncs entries of sync_standbys, which are in priority order.
  */
 static void
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-                           XLogRecPtr *applyPtr, List *sync_standbys)
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+                           XLogRecPtr *flushPtr,
+                           XLogRecPtr *applyPtr,
+                           SyncRepStandbyData *sync_standbys,
+                           int num_standbys,
+                           uint8 nsyncs)
 {
-    ListCell   *cell;
+    int            i;
 
     /*
      * Scan through all sync standbys and calculate the oldest Write, Flush
-     * and Apply positions.
+     * and Apply positions.  We assume *writePtr et al were initialized to
+     * InvalidXLogRecPtr.
      */
-    foreach(cell, sync_standbys)
+    for (i = 0; i < num_standbys; i++)
     {
-        WalSnd       *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-        XLogRecPtr    write;
-        XLogRecPtr    flush;
-        XLogRecPtr    apply;
+        XLogRecPtr    write = sync_standbys[i].write;
+        XLogRecPtr    flush = sync_standbys[i].flush;
+        XLogRecPtr    apply = sync_standbys[i].apply;
 
-        SpinLockAcquire(&walsnd->mutex);
-        write = walsnd->write;
-        flush = walsnd->flush;
-        apply = walsnd->apply;
-        SpinLockRelease(&walsnd->mutex);
+        /* Ignore candidates that aren't considered synchronous */
+        if (i >= nsyncs)
+            break;
 
         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
             *writePtr = write;
@@ -628,38 +651,39 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  * standbys.
  */
 static void
-SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-                              XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
+                              XLogRecPtr *flushPtr,
+                              XLogRecPtr *applyPtr,
+                              SyncRepStandbyData *sync_standbys,
+                              int num_standbys,
+                              uint8 nth)
 {
-    ListCell   *cell;
     XLogRecPtr *write_array;
     XLogRecPtr *flush_array;
     XLogRecPtr *apply_array;
-    int            len;
-    int            i = 0;
+    int            i;
+    int            n;
 
-    len = list_length(sync_standbys);
-    write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-    flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-    apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+    write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+    flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+    apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
 
-    foreach(cell, sync_standbys)
+    n = 0;
+    for (i = 0; i < num_standbys; i++)
     {
-        WalSnd       *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-
-        SpinLockAcquire(&walsnd->mutex);
-        write_array[i] = walsnd->write;
-        flush_array[i] = walsnd->flush;
-        apply_array[i] = walsnd->apply;
-        SpinLockRelease(&walsnd->mutex);
-
-        i++;
+        write_array[n] = sync_standbys[i].write;
+        flush_array[n] = sync_standbys[i].flush;
+        apply_array[n] = sync_standbys[i].apply;
+        n++;
     }
 
+    /* Should have enough, or somebody messed up */
+    Assert(n >= nth);
+
     /* Sort each array in descending order */
-    qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
-    qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
-    qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(write_array, n, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(flush_array, n, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(apply_array, n, sizeof(XLogRecPtr), cmp_lsn);
 
     /* Get Nth latest Write, Flush, Apply positions */
     *writePtr = write_array[nth - 1];
@@ -689,67 +713,49 @@ cmp_lsn(const void *a, const void *b)
 }
 
 /*
- * Return the list of sync standbys, or NIL if no sync standby is connected.
+ * Return data about active walsenders that are sync standby candidates.
  *
- * The caller must hold SyncRepLock.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * *standbys is set to a palloc'd array of structs of per-walsender data, and
+ * the number of valid entries is returned.  In priority mode the entries are
+ * sorted by sync_standby_priority; otherwise they are in WalSnd array order.
  */
-List *
-SyncRepGetSyncStandbys(bool *am_sync)
+int
+SyncRepGetStandbys(SyncRepStandbyData **standbys)
 {
-    Assert(LWLockHeldByMe(SyncRepLock));
+    int            i;
+    int            n;
 
-    /* Set default result */
-    if (am_sync != NULL)
-        *am_sync = false;
+    /* Create result array */
+    *standbys = (SyncRepStandbyData *)
+        palloc(max_wal_senders * sizeof(SyncRepStandbyData));
 
     /* Quick exit if sync replication is not requested */
     if (SyncRepConfig == NULL)
-        return NIL;
-
-    return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
-        SyncRepGetSyncStandbysPriority(am_sync) :
-        SyncRepGetSyncStandbysQuorum(am_sync);
-}
-
-/*
- * Return the list of all the candidates for quorum sync standbys,
- * or NIL if no such standby is connected.
- *
- * The caller must hold SyncRepLock. This function must be called only in
- * a quorum-based sync replication.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
- */
-static List *
-SyncRepGetSyncStandbysQuorum(bool *am_sync)
-{
-    List       *result = NIL;
-    int            i;
-    volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
-                                 * rearrangement */
-
-    Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
+        return false;
 
+    /* Collect raw data from shared memory */
+    n = 0;
     for (i = 0; i < max_wal_senders; i++)
     {
-        XLogRecPtr    flush;
-        WalSndState state;
-        int            pid;
+        volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
+                                     * rearrangement */
+        SyncRepStandbyData *stby;
+        WalSndState state;        /* not included in SyncRepStandbyData */
 
         walsnd = &WalSndCtl->walsnds[i];
+        stby = *standbys + n;
 
         SpinLockAcquire(&walsnd->mutex);
-        pid = walsnd->pid;
-        flush = walsnd->flush;
+        stby->pid = walsnd->pid;
         state = walsnd->state;
+        stby->write = walsnd->write;
+        stby->flush = walsnd->flush;
+        stby->apply = walsnd->apply;
+        stby->sync_standby_priority = walsnd->sync_standby_priority;
         SpinLockRelease(&walsnd->mutex);
 
         /* Must be active */
-        if (pid == 0)
+        if (stby->pid == 0)
             continue;
 
         /* Must be streaming or stopping */
@@ -758,200 +764,53 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
             continue;
 
         /* Must be synchronous */
-        if (walsnd->sync_standby_priority == 0)
+        if (stby->sync_standby_priority == 0)
             continue;
 
         /* Must have a valid flush position */
-        if (XLogRecPtrIsInvalid(flush))
+        if (XLogRecPtrIsInvalid(stby->flush))
             continue;
 
-        /*
-         * Consider this standby as a candidate for quorum sync standbys and
-         * append it to the result.
-         */
-        result = lappend_int(result, i);
-        if (am_sync != NULL && walsnd == MyWalSnd)
-            *am_sync = true;
+        /* OK, it's a candidate */
+        stby->walsnd_index = i;
+        stby->is_me = (walsnd == MyWalSnd);
+        n++;
     }
 
-    return result;
-}
-
-/*
- * Return the list of sync standbys chosen based on their priorities,
- * or NIL if no sync standby is connected.
- *
- * If there are multiple standbys with the same priority,
- * the first one found is selected preferentially.
- *
- * The caller must hold SyncRepLock. This function must be called only in
- * a priority-based sync replication.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
- */
-static List *
-SyncRepGetSyncStandbysPriority(bool *am_sync)
-{
-    List       *result = NIL;
-    List       *pending = NIL;
-    int            lowest_priority;
-    int            next_highest_priority;
-    int            this_priority;
-    int            priority;
-    int            i;
-    bool        am_in_pending = false;
-    volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
-                                 * rearrangement */
-
-    Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
-
-    lowest_priority = SyncRepConfig->nmembers;
-    next_highest_priority = lowest_priority + 1;
-
     /*
-     * Find the sync standbys which have the highest priority (i.e, 1). Also
-     * store all the other potential sync standbys into the pending list, in
-     * order to scan it later and find other sync standbys from it quickly.
+     * In quorum mode, that's all we have to do since all entries are in the
+     * same priority 1.  In priority mode, sort the candidates by priority.
      */
-    for (i = 0; i < max_wal_senders; i++)
-    {
-        XLogRecPtr    flush;
-        WalSndState state;
-        int            pid;
-
-        walsnd = &WalSndCtl->walsnds[i];
-
-        SpinLockAcquire(&walsnd->mutex);
-        pid = walsnd->pid;
-        flush = walsnd->flush;
-        state = walsnd->state;
-        SpinLockRelease(&walsnd->mutex);
-
-        /* Must be active */
-        if (pid == 0)
-            continue;
-
-        /* Must be streaming or stopping */
-        if (state != WALSNDSTATE_STREAMING &&
-            state != WALSNDSTATE_STOPPING)
-            continue;
-
-        /* Must be synchronous */
-        this_priority = walsnd->sync_standby_priority;
-        if (this_priority == 0)
-            continue;
-
-        /* Must have a valid flush position */
-        if (XLogRecPtrIsInvalid(flush))
-            continue;
+    if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+        qsort(*standbys, n, sizeof(SyncRepStandbyData),
+              standby_priority_comparator);
 
-        /*
-         * If the priority is equal to 1, consider this standby as sync and
-         * append it to the result. Otherwise append this standby to the
-         * pending list to check if it's actually sync or not later.
-         */
-        if (this_priority == 1)
-        {
-            result = lappend_int(result, i);
-            if (am_sync != NULL && walsnd == MyWalSnd)
-                *am_sync = true;
-            if (list_length(result) == SyncRepConfig->num_sync)
-            {
-                list_free(pending);
-                return result;    /* Exit if got enough sync standbys */
-            }
-        }
-        else
-        {
-            pending = lappend_int(pending, i);
-            if (am_sync != NULL && walsnd == MyWalSnd)
-                am_in_pending = true;
+    return n;
+}
 
-            /*
-             * Track the highest priority among the standbys in the pending
-             * list, in order to use it as the starting priority for later
-             * scan of the list. This is useful to find quickly the sync
-             * standbys from the pending list later because we can skip
-             * unnecessary scans for the unused priorities.
-             */
-            if (this_priority < next_highest_priority)
-                next_highest_priority = this_priority;
-        }
-    }
 
-    /*
-     * Consider all pending standbys as sync if the number of them plus
-     * already-found sync ones is lower than the configuration requests.
-     */
-    if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
-    {
-        /*
-         * Set *am_sync to true if this walsender is in the pending list
-         * because all pending standbys are considered as sync.
-         */
-        if (am_sync != NULL && !(*am_sync))
-            *am_sync = am_in_pending;
+/*
+ * qsort comparator ordering SyncRepStandbyData by ascending priority value
+ */
+static int
+standby_priority_comparator(const void *a, const void *b)
+{
+    const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
+    const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
 
-        result = list_concat(result, pending);
-        list_free(pending);
-        return result;
-    }
+    /* First, sort by increasing priority value */
+    if (sa->sync_standby_priority != sb->sync_standby_priority)
+        return sa->sync_standby_priority - sb->sync_standby_priority;
 
     /*
-     * Find the sync standbys from the pending list.
+     * We might have equal priority values; break ties by position in the
+     * WalSnd array, so the first standby found wins as it historically did.
+     * (That depends on arrival order, but regression tests rely on it.)
      */
-    priority = next_highest_priority;
-    while (priority <= lowest_priority)
-    {
-        ListCell   *cell;
-
-        next_highest_priority = lowest_priority + 1;
-
-        foreach(cell, pending)
-        {
-            i = lfirst_int(cell);
-            walsnd = &WalSndCtl->walsnds[i];
-
-            this_priority = walsnd->sync_standby_priority;
-            if (this_priority == priority)
-            {
-                result = lappend_int(result, i);
-                if (am_sync != NULL && walsnd == MyWalSnd)
-                    *am_sync = true;
-
-                /*
-                 * We should always exit here after the scan of pending list
-                 * starts because we know that the list has enough elements to
-                 * reach SyncRepConfig->num_sync.
-                 */
-                if (list_length(result) == SyncRepConfig->num_sync)
-                {
-                    list_free(pending);
-                    return result;    /* Exit if got enough sync standbys */
-                }
-
-                /*
-                 * Remove the entry for this sync standby from the list to
-                 * prevent us from looking at the same entry again.
-                 */
-                pending = foreach_delete_current(pending, cell);
-
-                continue;        /* don't adjust next_highest_priority */
-            }
-
-            if (this_priority < next_highest_priority)
-                next_highest_priority = this_priority;
-        }
-
-        priority = next_highest_priority;
-    }
-
-    /* never reached, but keep compiler quiet */
-    Assert(false);
-    return result;
+    return sa->walsnd_index - sb->walsnd_index;
 }
 
+
 /*
  * Check if we are in the list of sync standbys, and if so, determine
  * priority sequence. Return priority if set, or zero to indicate that
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fc475d144d..7889ea5f6f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2375,14 +2375,16 @@ InitWalSenderSlot(void)
              * Found a free slot. Reserve it for us.
              */
             walsnd->pid = MyProcPid;
+            walsnd->state = WALSNDSTATE_STARTUP;
             walsnd->sentPtr = InvalidXLogRecPtr;
+            walsnd->needreload = false;
             walsnd->write = InvalidXLogRecPtr;
             walsnd->flush = InvalidXLogRecPtr;
             walsnd->apply = InvalidXLogRecPtr;
             walsnd->writeLag = -1;
             walsnd->flushLag = -1;
             walsnd->applyLag = -1;
-            walsnd->state = WALSNDSTATE_STARTUP;
+            walsnd->sync_standby_priority = 0;
             walsnd->latch = &MyProc->procLatch;
             walsnd->replyTime = 0;
             walsnd->spillTxns = 0;
@@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
     Tuplestorestate *tupstore;
     MemoryContext per_query_ctx;
     MemoryContext oldcontext;
-    List       *sync_standbys;
+    SyncRepStandbyData *standbys;
+    int            num_standbys;
     int            i;
 
     /* check to see if caller supports us returning a tuplestore */
@@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
     MemoryContextSwitchTo(oldcontext);
 
     /*
-     * Get the currently active synchronous standbys.
+     * Get the currently active standbys in priority order.  This could be out
+     * of date before we're done, but we'll use the data anyway.
      */
-    LWLockAcquire(SyncRepLock, LW_SHARED);
-    sync_standbys = SyncRepGetSyncStandbys(NULL);
-    LWLockRelease(SyncRepLock);
+    num_standbys = SyncRepGetStandbys(&standbys);
 
     for (i = 0; i < max_wal_senders; i++)
     {
@@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
         int64        spillTxns;
         int64        spillCount;
         int64        spillBytes;
+        bool        is_sync_standby;
         Datum        values[PG_STAT_GET_WAL_SENDERS_COLS];
         bool        nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+        int            j;
 
+        /* Collect data from shared memory */
         SpinLockAcquire(&walsnd->mutex);
         if (walsnd->pid == 0)
         {
@@ -3311,6 +3316,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
         spillBytes = walsnd->spillBytes;
         SpinLockRelease(&walsnd->mutex);
 
+        /* Detect whether walsender is/was considered synchronous */
+        is_sync_standby = false;
+        for (j = 0; j < num_standbys; j++)
+        {
+            if (standbys[j].walsnd_index == i)
+            {
+                is_sync_standby = (j < SyncRepConfig->num_sync);
+                break;
+            }
+        }
+
         memset(nulls, 0, sizeof(nulls));
         values[0] = Int32GetDatum(pid);
 
@@ -3380,11 +3396,15 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
              */
             if (priority == 0)
                 values[10] = CStringGetTextDatum("async");
-            else if (list_member_int(sync_standbys, i))
-                values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
-                    CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
+            else if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+            {
+                if (is_sync_standby)
+                    values[10] = CStringGetTextDatum("sync");
+                else
+                    values[10] = CStringGetTextDatum("potential");
+            }
             else
-                values[10] = CStringGetTextDatum("potential");
+                values[10] = CStringGetTextDatum("quorum");
 
             if (replyTime == 0)
                 nulls[11] = true;
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index c5f0e91aad..533aac25c7 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -36,6 +36,24 @@
 #define SYNC_REP_PRIORITY        0
 #define SYNC_REP_QUORUM        1
 
+/*
+ * SyncRepGetStandbys returns an array of these structs, one per candidate
+ * synchronous walsender, filled from each WalSnd under its spinlock.
+ */
+typedef struct SyncRepStandbyData
+{
+    /* Copies of the matching WalSnd shared-memory fields */
+    pid_t        pid;
+    XLogRecPtr    write;
+    XLogRecPtr    flush;
+    XLogRecPtr    apply;
+    int            sync_standby_priority;
+    /* Index of this walsender in the WalSnd shared-memory array */
+    int            walsnd_index;
+    /* True if this entry describes the calling walsender (MyWalSnd) */
+    bool        is_me;
+} SyncRepStandbyData;
+
 /*
  * Struct for the configuration of synchronous replication.
  *
@@ -74,7 +92,7 @@ extern void SyncRepInitConfig(void);
 extern void SyncRepReleaseWaiters(void);
 
 /* called by wal sender and user backend */
-extern List *SyncRepGetSyncStandbys(bool *am_sync);
+extern int    SyncRepGetStandbys(SyncRepStandbyData **standbys);
 
 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 366828f0a4..734acec2a4 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -31,8 +31,7 @@ typedef enum WalSndState
 /*
  * Each walsender has a WalSnd struct in shared memory.
  *
- * This struct is protected by 'mutex', with two exceptions: one is
- * sync_standby_priority as noted below.  The other exception is that some
+ * This struct is protected by its 'mutex' spinlock field, except that some
  * members are only written by the walsender process itself, and thus that
  * process is free to read those members without holding spinlock.  pid and
  * needreload always require the spinlock to be held for all accesses.
@@ -60,6 +59,12 @@ typedef struct WalSnd
     TimeOffset    flushLag;
     TimeOffset    applyLag;
 
+    /*
+     * The priority order of the standby managed by this WALSender, as listed
+     * in synchronous_standby_names, or 0 if not-listed.
+     */
+    int            sync_standby_priority;
+
     /* Protects shared variables shown above. */
     slock_t        mutex;
 
@@ -69,13 +74,6 @@ typedef struct WalSnd
      */
     Latch       *latch;
 
-    /*
-     * The priority order of the standby managed by this WALSender, as listed
-     * in synchronous_standby_names, or 0 if not-listed. Protected by
-     * SyncRepLock.
-     */
-    int            sync_standby_priority;
-
     /*
      * Timestamp of the last message received from standby.
      */

pgsql-hackers by date:

Previous
From: Etsuro Fujita
Date:
Subject: Re: [HACKERS] advanced partition matching algorithm forpartition-wise join
Next
From: Dilip Kumar
Date:
Subject: Re: PATCH: logical_work_mem and logical streaming of largein-progress transactions