Re: Race condition in SyncRepGetSyncStandbysPriority - Mailing list pgsql-hackers

From Kyotaro Horiguchi
Subject Re: Race condition in SyncRepGetSyncStandbysPriority
Date
Msg-id 20200416.162241.2092362802599118592.horikyota.ntt@gmail.com
In response to Re: Race condition in SyncRepGetSyncStandbysPriority  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses Re: Race condition in SyncRepGetSyncStandbysPriority  (Masahiko Sawada <masahiko.sawada@2ndquadrant.com>)
List pgsql-hackers
At Wed, 15 Apr 2020 11:31:49 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in 
> Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
> > At Tue, 14 Apr 2020 16:32:40 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in 
> > +        stby->is_sync_standby = true;    /* might change below */
> 
> > I'm uneasy with that.  In quorum mode all running standbys are marked
> > as "sync" and that's bogus.
> 
> I don't follow that?  The existing coding of SyncRepGetSyncStandbysQuorum
> returns all the candidates in its list, so this is isomorphic to that.

The existing code actually does that. On the other hand,
SyncRepGetSyncStandbysPriority returns standbys that *are known to be*
synchronous, while -Quorum returns standbys that *can be* synchronous.
The two functions return different things, so the flag should be
is_sync_standby for -Priority and is_sync_candidate for -Quorum.
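To make the distinction concrete, here is a minimal standalone sketch. It is not code from syncrep.c: the Candidate type and both functions are hypothetical, and the array is assumed to be already sorted by priority. In priority mode only the first num_sync entries are known to be synchronous; in quorum mode every entry is merely a candidate.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical, trimmed-down candidate entry.  The array passed to the
 * functions below is assumed to be sorted by priority already.
 */
typedef struct
{
    int         priority;   /* 1 = highest priority */
    bool        is_me;      /* does this entry belong to our walsender? */
} Candidate;

/* Priority mode: only the first num_sync entries *are known to be* sync. */
static bool
am_sync_priority(const Candidate *c, int n, int num_sync)
{
    int         limit = (n < num_sync) ? n : num_sync;

    assert(num_sync > 0);
    for (int i = 0; i < limit; i++)
    {
        if (c[i].is_me)
            return true;
    }
    return false;
}

/*
 * Quorum mode: every entry *can be* synchronous; which ones end up
 * counting depends on their LSNs at release time, not on list position.
 */
static bool
am_candidate_quorum(const Candidate *c, int n)
{
    for (int i = 0; i < n; i++)
    {
        if (c[i].is_me)
            return true;
    }
    return false;
}
```

With candidates of priority 1, 2 and 3 where we are the second entry, am_sync_priority() is false for num_sync = 1 and true for num_sync = 2, while am_candidate_quorum() is true either way.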

> Possibly a different name for the flag would be more suited?
> 
> > On the other hand, sync_standbys is already sorted in priority order, so I
> > think we can get rid of the member by setting *am_sync as follows.
> 
> > SyncRepGetSyncRecPtr:
> >   if (sync_standbys[i].is_me)
> >   {
> >       *am_sync = (i < SyncRepConfig->num_sync);
> >       break;
> >   }
> 
> I disagree with this, it will change the behavior in the quorum case.

Oops, you're right.  I find the whole thing there a bit confusing (and
I was confused myself).  syncrep_method affects how some values
(specifically am_sync and sync_standbys) are interpreted at several
call depths, and *am_sync conveys nothing in quorum mode.

> In any case, a change like this will cause callers to know way more than
> they ought to about the ordering of the array.  In my mind, the fact that
> SyncRepGetSyncStandbysPriority is sorting the array is an internal
> implementation detail; I do not want it to be part of the API.

Anyway, am_sync and is_sync_standby are utterly useless in quorum
mode.  Your argument would be quite persuasive if that were not so, but
the upper layers (SyncRepReleaseWaiters and SyncRepGetSyncRecPtr)
already refer to syncrep_method to decide how to interpret the am_sync
flag and the sync_standbys list.  So the difference is effectively
already part of the API.
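A simplified sketch of those two interpretations, reduced to flush positions only. The Lsn typedef stands in for XLogRecPtr, and the function names are hypothetical; they loosely mirror what SyncRepGetOldestSyncRecPtr and SyncRepGetNthLatestSyncRecPtr compute in the patch below (oldest among the sync standbys vs. Nth latest among all candidates).

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr */

/* Priority mode: oldest LSN among the first num_sync (priority-sorted) entries. */
static Lsn
synced_lsn_priority(const Lsn *lsn, int n, int num_sync)
{
    Lsn         oldest;

    assert(num_sync > 0 && n >= num_sync);
    oldest = lsn[0];
    for (int i = 1; i < num_sync; i++)
    {
        if (lsn[i] < oldest)
            oldest = lsn[i];
    }
    return oldest;
}

/* qsort comparator: sort LSNs in descending order */
static int
cmp_lsn_desc(const void *a, const void *b)
{
    Lsn         la = *(const Lsn *) a;
    Lsn         lb = *(const Lsn *) b;

    return (la < lb) - (la > lb);
}

/* Quorum mode: num_sync-th latest LSN among all candidates. */
static Lsn
synced_lsn_quorum(const Lsn *lsn, int n, int num_sync)
{
    Lsn        *sorted;
    Lsn         result;

    assert(num_sync > 0 && n >= num_sync);
    sorted = malloc(sizeof(Lsn) * (size_t) n);
    if (sorted == NULL)
        abort();
    /* sort a scratch copy so the caller's array stays in priority order */
    memcpy(sorted, lsn, sizeof(Lsn) * (size_t) n);
    qsort(sorted, (size_t) n, sizeof(Lsn), cmp_lsn_desc);
    result = sorted[num_sync - 1];
    free(sorted);
    return result;
}
```

For flush positions {100, 300, 200} (in priority order) and num_sync = 2, priority mode yields 100 (the older of the first two), while quorum mode yields 200 (the second latest overall).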

After thinking some more, I concluded that some of the variables are
misnamed or misconceived, and redundant.  The function of am_sync is
covered by got_recptr in SyncRepReleaseWaiters, so it's enough for
SyncRepGetSyncRecPtr to just report to the caller whether it may
release some of the waiter processes.  This simplifies the related
functions and makes them (to me) clearer.

Please find the attached patch.


> (Apropos to that, I realized from working on this patch that there's
> another, completely undocumented assumption in the existing code, that
> the integer list will be sorted by walsender index for equal priorities.
> I don't like that either, and not just because it's undocumented.)

That seems accidental.  Sorting by priority is the designed and
documented behavior; in contrast, entries of the same priority being
ordered by walsender index is an undocumented accident, which means it
can be changed at any time.  I don't think we need to define everything
in such detail.
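If we do want a deterministic order for equal priorities, it has to be stated explicitly, since qsort() is not guaranteed to be stable. A minimal sketch, assuming a trimmed-down entry type (StandbySketch is hypothetical), that breaks ties by walsender array index the same way standby_priority_comparator in the patch does. It uses (a > b) - (a < b) rather than subtraction; that avoids overflow in general, though the small values involved here would not overflow either.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical, trimmed-down entry holding only the sort keys. */
typedef struct
{
    int         priority;       /* lower value = higher priority */
    int         walsnd_index;   /* slot in the shared walsender array */
} StandbySketch;

/*
 * Order by priority, then by walsender index.  The explicit tie-break is
 * what makes the order of equal priorities deterministic: qsort() itself
 * makes no stability guarantee.
 */
static int
standby_cmp(const void *a, const void *b)
{
    const StandbySketch *sa = (const StandbySketch *) a;
    const StandbySketch *sb = (const StandbySketch *) b;

    if (sa->priority != sb->priority)
        return (sa->priority > sb->priority) - (sa->priority < sb->priority);
    return (sa->walsnd_index > sb->walsnd_index) -
        (sa->walsnd_index < sb->walsnd_index);
}
```

Dropping the second comparison would leave the relative order of, say, slots 2 and 3 (both priority 1) up to the qsort() implementation.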

regards.

diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ffd5b31eb2..99a7bbbc86 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -103,19 +103,21 @@ static int    SyncRepWakeQueue(bool all, int mode);
 
 static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
                                  XLogRecPtr *flushPtr,
-                                 XLogRecPtr *applyPtr,
-                                 bool *am_sync);
+                                 XLogRecPtr *applyPtr);
 static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
                                        XLogRecPtr *flushPtr,
                                        XLogRecPtr *applyPtr,
-                                       List *sync_standbys);
+                                       SyncRepStandbyData *sync_standbys,
+                                       int num_standbys,
+                                       uint8 nsyncs);
 static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
                                           XLogRecPtr *flushPtr,
                                           XLogRecPtr *applyPtr,
-                                          List *sync_standbys, uint8 nth);
+                                          SyncRepStandbyData *sync_standbys,
+                                          int num_standbys,
+                                          uint8 nth);
 static int    SyncRepGetStandbyPriority(void);
-static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
-static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static int    standby_priority_comparator(const void *a, const void *b);
 static int    cmp_lsn(const void *a, const void *b);
 
 #ifdef USE_ASSERT_CHECKING
@@ -406,9 +408,10 @@ SyncRepInitConfig(void)
     priority = SyncRepGetStandbyPriority();
     if (MyWalSnd->sync_standby_priority != priority)
     {
-        LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+        SpinLockAcquire(&MyWalSnd->mutex);
         MyWalSnd->sync_standby_priority = priority;
-        LWLockRelease(SyncRepLock);
+        SpinLockRelease(&MyWalSnd->mutex);
+
         ereport(DEBUG1,
                 (errmsg("standby \"%s\" now has synchronous standby priority %u",
                         application_name, priority)));
@@ -429,8 +432,7 @@ SyncRepReleaseWaiters(void)
     XLogRecPtr    writePtr;
     XLogRecPtr    flushPtr;
     XLogRecPtr    applyPtr;
-    bool        got_recptr;
-    bool        am_sync;
+    bool        release_waiters;
     int            numwrite = 0;
     int            numflush = 0;
     int            numapply = 0;
@@ -458,16 +460,24 @@ SyncRepReleaseWaiters(void)
     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 
     /*
-     * Check whether we are a sync standby or not, and calculate the synced
-     * positions among all sync standbys.
+     * Check whether we may release any of the waiter processes, and calculate
+     * the synced positions.
      */
-    got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
+    release_waiters = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr);
+
+    /* Return if nothing to do. */
+    if (!release_waiters)
+    {
+        LWLockRelease(SyncRepLock);
+        announce_next_takeover = true;
+        return;
+    }
 
     /*
-     * If we are managing a sync standby, though we weren't prior to this,
-     * then announce we are now a sync standby.
+     * If this walsender has just become able to release waiter processes,
+     * announce that.
      */
-    if (announce_next_takeover && am_sync)
+    if (announce_next_takeover)
     {
         announce_next_takeover = false;
 
@@ -481,17 +491,6 @@ SyncRepReleaseWaiters(void)
                             application_name)));
     }
 
-    /*
-     * If the number of sync standbys is less than requested or we aren't
-     * managing a sync standby then just leave.
-     */
-    if (!got_recptr || !am_sync)
-    {
-        LWLockRelease(SyncRepLock);
-        announce_next_takeover = !am_sync;
-        return;
-    }
-
     /*
      * Set the lsn first so that when we wake backends they will release up to
      * this location.
@@ -523,43 +522,62 @@ SyncRepReleaseWaiters(void)
 /*
  * Calculate the synced Write, Flush and Apply positions among sync standbys.
  *
- * The caller must hold SyncRepLock.
- *
- * Return false if the number of sync standbys is less than
- * synchronous_standby_names specifies. Otherwise return true and
- * store the positions into *writePtr, *flushPtr and *applyPtr.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * Return false if this walsender cannot release any of the waiters. Otherwise
+ * return true and store the positions into *writePtr, *flushPtr and *applyPtr.
  */
 static bool
 SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-                     XLogRecPtr *applyPtr, bool *am_sync)
+                     XLogRecPtr *applyPtr)
 {
-    List       *sync_standbys;
-
-    Assert(LWLockHeldByMe(SyncRepLock));
+    SyncRepStandbyData *standbys;
+    int            num_standbys;
+    int            i;
+    bool        sync_priority = (SyncRepConfig != NULL &&
+        SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
 
+    /* Initialize default results */
     *writePtr = InvalidXLogRecPtr;
     *flushPtr = InvalidXLogRecPtr;
     *applyPtr = InvalidXLogRecPtr;
-    *am_sync = false;
 
-    /* Get standbys that are considered as synchronous at this moment */
-    sync_standbys = SyncRepGetSyncStandbys(am_sync);
+    /* Quick out if not even configured to be synchronous */
+    if (SyncRepConfig == NULL)
+        return false;
+
+    /* Get standbys.  They are in priority order. */
+    num_standbys = SyncRepGetStandbys(&standbys);
 
     /*
-     * Quick exit if we are not managing a sync standby or there are not
-     * enough synchronous standbys.
+     * Nothing more to do if there are not enough synchronous standbys or
+     * candidates.
      */
-    if (!(*am_sync) ||
-        SyncRepConfig == NULL ||
-        list_length(sync_standbys) < SyncRepConfig->num_sync)
+    if (num_standbys < SyncRepConfig->num_sync)
     {
-        list_free(sync_standbys);
+        pfree(standbys);
         return false;
     }
 
+    /* In priority mode, nothing to do if I am not a sync standby */
+    if (sync_priority)
+    {
+        bool        am_sync = false;
+
+        for (i = 0; i < SyncRepConfig->num_sync; i++)
+        {
+            if (standbys[i].is_me)
+            {
+                am_sync = true;
+                break;
+            }
+        }
+
+        if (!am_sync)
+        {
+            pfree(standbys);
+            return false;
+        }
+    }
+
     /*
      * In a priority-based sync replication, the synced positions are the
      * oldest ones among sync standbys. In a quorum-based, they are the Nth
@@ -573,46 +591,51 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
      * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
      * positions even in a quorum-based sync replication.
      */
-    if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+    if (sync_priority)
     {
         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
-                                   sync_standbys);
+                                   standbys, num_standbys,
+                                   SyncRepConfig->num_sync);
     }
     else
     {
         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
-                                      sync_standbys, SyncRepConfig->num_sync);
+                                      standbys, num_standbys,
+                                      SyncRepConfig->num_sync);
     }
 
-    list_free(sync_standbys);
+    pfree(standbys);
     return true;
 }
 
 /*
- * Calculate the oldest Write, Flush and Apply positions among sync standbys.
+ * Calculate the oldest Write, Flush and Apply positions among the first
+ * nsyncs entries of sync_standbys, i.e. the synchronous standbys.
  */
 static void
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-                           XLogRecPtr *applyPtr, List *sync_standbys)
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+                           XLogRecPtr *flushPtr,
+                           XLogRecPtr *applyPtr,
+                           SyncRepStandbyData *sync_standbys,
+                           int num_standbys,
+                           uint8 nsyncs)
 {
-    ListCell   *cell;
+    int            i;
 
     /*
      * Scan through all sync standbys and calculate the oldest Write, Flush
-     * and Apply positions.
+     * and Apply positions.  We assume *writePtr et al were initialized to
+     * InvalidXLogRecPtr.
      */
-    foreach(cell, sync_standbys)
+    for (i = 0; i < num_standbys; i++)
     {
-        WalSnd       *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-        XLogRecPtr    write;
-        XLogRecPtr    flush;
-        XLogRecPtr    apply;
+        XLogRecPtr    write = sync_standbys[i].write;
+        XLogRecPtr    flush = sync_standbys[i].flush;
+        XLogRecPtr    apply = sync_standbys[i].apply;
 
-        SpinLockAcquire(&walsnd->mutex);
-        write = walsnd->write;
-        flush = walsnd->flush;
-        apply = walsnd->apply;
-        SpinLockRelease(&walsnd->mutex);
+        /* Ignore candidates that aren't considered synchronous */
+        if (i >= nsyncs)
+            break;
 
         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
             *writePtr = write;
@@ -628,38 +651,39 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  * standbys.
  */
 static void
-SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-                              XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
+                              XLogRecPtr *flushPtr,
+                              XLogRecPtr *applyPtr,
+                              SyncRepStandbyData *sync_standbys,
+                              int num_standbys,
+                              uint8 nth)
 {
-    ListCell   *cell;
     XLogRecPtr *write_array;
     XLogRecPtr *flush_array;
     XLogRecPtr *apply_array;
-    int            len;
-    int            i = 0;
+    int            i;
+    int            n;
 
-    len = list_length(sync_standbys);
-    write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-    flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-    apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+    write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+    flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+    apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
 
-    foreach(cell, sync_standbys)
+    n = 0;
+    for (i = 0; i < num_standbys; i++)
     {
-        WalSnd       *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-
-        SpinLockAcquire(&walsnd->mutex);
-        write_array[i] = walsnd->write;
-        flush_array[i] = walsnd->flush;
-        apply_array[i] = walsnd->apply;
-        SpinLockRelease(&walsnd->mutex);
-
-        i++;
+        write_array[n] = sync_standbys[i].write;
+        flush_array[n] = sync_standbys[i].flush;
+        apply_array[n] = sync_standbys[i].apply;
+        n++;
     }
 
+    /* Should have enough, or somebody messed up */
+    Assert(n >= nth);
+
     /* Sort each array in descending order */
-    qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
-    qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
-    qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(write_array, n, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(flush_array, n, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(apply_array, n, sizeof(XLogRecPtr), cmp_lsn);
 
     /* Get Nth latest Write, Flush, Apply positions */
     *writePtr = write_array[nth - 1];
@@ -689,67 +713,49 @@ cmp_lsn(const void *a, const void *b)
 }
 
 /*
- * Return the list of sync standbys, or NIL if no sync standby is connected.
+ * Return data about active walsenders.
  *
- * The caller must hold SyncRepLock.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * *standbys is set to a palloc'd array of structs of per-walsender data, and
+ * the number of valid entries is returned.  The entries are in
+ * sync_standby_priority order.
  */
-List *
-SyncRepGetSyncStandbys(bool *am_sync)
+int
+SyncRepGetStandbys(SyncRepStandbyData **standbys)
 {
-    Assert(LWLockHeldByMe(SyncRepLock));
+    int            i;
+    int            n;
 
-    /* Set default result */
-    if (am_sync != NULL)
-        *am_sync = false;
+    /* Create result array */
+    *standbys = (SyncRepStandbyData *)
+        palloc(max_wal_senders * sizeof(SyncRepStandbyData));
 
     /* Quick exit if sync replication is not requested */
     if (SyncRepConfig == NULL)
-        return NIL;
-
-    return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
-        SyncRepGetSyncStandbysPriority(am_sync) :
-        SyncRepGetSyncStandbysQuorum(am_sync);
-}
-
-/*
- * Return the list of all the candidates for quorum sync standbys,
- * or NIL if no such standby is connected.
- *
- * The caller must hold SyncRepLock. This function must be called only in
- * a quorum-based sync replication.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
- */
-static List *
-SyncRepGetSyncStandbysQuorum(bool *am_sync)
-{
-    List       *result = NIL;
-    int            i;
-    volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
-                                 * rearrangement */
-
-    Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
+        return 0;
 
+    /* Collect raw data from shared memory */
+    n = 0;
     for (i = 0; i < max_wal_senders; i++)
     {
-        XLogRecPtr    flush;
-        WalSndState state;
-        int            pid;
+        volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
+                                     * rearrangement */
+        SyncRepStandbyData *stby;
+        WalSndState state;        /* not included in SyncRepStandbyData */
 
         walsnd = &WalSndCtl->walsnds[i];
+        stby = *standbys + n;
 
         SpinLockAcquire(&walsnd->mutex);
-        pid = walsnd->pid;
-        flush = walsnd->flush;
+        stby->pid = walsnd->pid;
         state = walsnd->state;
+        stby->write = walsnd->write;
+        stby->flush = walsnd->flush;
+        stby->apply = walsnd->apply;
+        stby->sync_standby_priority = walsnd->sync_standby_priority;
         SpinLockRelease(&walsnd->mutex);
 
         /* Must be active */
-        if (pid == 0)
+        if (stby->pid == 0)
             continue;
 
         /* Must be streaming or stopping */
@@ -758,200 +764,53 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
             continue;
 
         /* Must be synchronous */
-        if (walsnd->sync_standby_priority == 0)
+        if (stby->sync_standby_priority == 0)
             continue;
 
         /* Must have a valid flush position */
-        if (XLogRecPtrIsInvalid(flush))
+        if (XLogRecPtrIsInvalid(stby->flush))
             continue;
 
-        /*
-         * Consider this standby as a candidate for quorum sync standbys and
-         * append it to the result.
-         */
-        result = lappend_int(result, i);
-        if (am_sync != NULL && walsnd == MyWalSnd)
-            *am_sync = true;
+        /* OK, it's a candidate */
+        stby->walsnd_index = i;
+        stby->is_me = (walsnd == MyWalSnd);
+        n++;
     }
 
-    return result;
-}
-
-/*
- * Return the list of sync standbys chosen based on their priorities,
- * or NIL if no sync standby is connected.
- *
- * If there are multiple standbys with the same priority,
- * the first one found is selected preferentially.
- *
- * The caller must hold SyncRepLock. This function must be called only in
- * a priority-based sync replication.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
- */
-static List *
-SyncRepGetSyncStandbysPriority(bool *am_sync)
-{
-    List       *result = NIL;
-    List       *pending = NIL;
-    int            lowest_priority;
-    int            next_highest_priority;
-    int            this_priority;
-    int            priority;
-    int            i;
-    bool        am_in_pending = false;
-    volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
-                                 * rearrangement */
-
-    Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
-
-    lowest_priority = SyncRepConfig->nmembers;
-    next_highest_priority = lowest_priority + 1;
-
     /*
-     * Find the sync standbys which have the highest priority (i.e, 1). Also
-     * store all the other potential sync standbys into the pending list, in
-     * order to scan it later and find other sync standbys from it quickly.
+     * In quorum mode, that's all we have to do since all entries have the
+     * same priority, 1.  In priority mode, sort the candidates by priority.
      */
-    for (i = 0; i < max_wal_senders; i++)
-    {
-        XLogRecPtr    flush;
-        WalSndState state;
-        int            pid;
-
-        walsnd = &WalSndCtl->walsnds[i];
-
-        SpinLockAcquire(&walsnd->mutex);
-        pid = walsnd->pid;
-        flush = walsnd->flush;
-        state = walsnd->state;
-        SpinLockRelease(&walsnd->mutex);
-
-        /* Must be active */
-        if (pid == 0)
-            continue;
-
-        /* Must be streaming or stopping */
-        if (state != WALSNDSTATE_STREAMING &&
-            state != WALSNDSTATE_STOPPING)
-            continue;
-
-        /* Must be synchronous */
-        this_priority = walsnd->sync_standby_priority;
-        if (this_priority == 0)
-            continue;
-
-        /* Must have a valid flush position */
-        if (XLogRecPtrIsInvalid(flush))
-            continue;
+    if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+        qsort(*standbys, n, sizeof(SyncRepStandbyData),
+              standby_priority_comparator);
 
-        /*
-         * If the priority is equal to 1, consider this standby as sync and
-         * append it to the result. Otherwise append this standby to the
-         * pending list to check if it's actually sync or not later.
-         */
-        if (this_priority == 1)
-        {
-            result = lappend_int(result, i);
-            if (am_sync != NULL && walsnd == MyWalSnd)
-                *am_sync = true;
-            if (list_length(result) == SyncRepConfig->num_sync)
-            {
-                list_free(pending);
-                return result;    /* Exit if got enough sync standbys */
-            }
-        }
-        else
-        {
-            pending = lappend_int(pending, i);
-            if (am_sync != NULL && walsnd == MyWalSnd)
-                am_in_pending = true;
+    return n;
+}
 
-            /*
-             * Track the highest priority among the standbys in the pending
-             * list, in order to use it as the starting priority for later
-             * scan of the list. This is useful to find quickly the sync
-             * standbys from the pending list later because we can skip
-             * unnecessary scans for the unused priorities.
-             */
-            if (this_priority < next_highest_priority)
-                next_highest_priority = this_priority;
-        }
-    }
 
-    /*
-     * Consider all pending standbys as sync if the number of them plus
-     * already-found sync ones is lower than the configuration requests.
-     */
-    if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
-    {
-        /*
-         * Set *am_sync to true if this walsender is in the pending list
-         * because all pending standbys are considered as sync.
-         */
-        if (am_sync != NULL && !(*am_sync))
-            *am_sync = am_in_pending;
+/*
+ * qsort comparator to sort SyncRepStandbyData entries by priority
+ */
+static int
+standby_priority_comparator(const void *a, const void *b)
+{
+    const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
+    const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
 
-        result = list_concat(result, pending);
-        list_free(pending);
-        return result;
-    }
+    /* First, sort by increasing priority value */
+    if (sa->sync_standby_priority != sb->sync_standby_priority)
+        return sa->sync_standby_priority - sb->sync_standby_priority;
 
     /*
-     * Find the sync standbys from the pending list.
+     * We might have equal priority values; arbitrarily break ties by position
+     * in the WalSnd array.  (This is utterly bogus, since that is arrival
+     * order dependent, but there are regression tests that rely on it.)
      */
-    priority = next_highest_priority;
-    while (priority <= lowest_priority)
-    {
-        ListCell   *cell;
-
-        next_highest_priority = lowest_priority + 1;
-
-        foreach(cell, pending)
-        {
-            i = lfirst_int(cell);
-            walsnd = &WalSndCtl->walsnds[i];
-
-            this_priority = walsnd->sync_standby_priority;
-            if (this_priority == priority)
-            {
-                result = lappend_int(result, i);
-                if (am_sync != NULL && walsnd == MyWalSnd)
-                    *am_sync = true;
-
-                /*
-                 * We should always exit here after the scan of pending list
-                 * starts because we know that the list has enough elements to
-                 * reach SyncRepConfig->num_sync.
-                 */
-                if (list_length(result) == SyncRepConfig->num_sync)
-                {
-                    list_free(pending);
-                    return result;    /* Exit if got enough sync standbys */
-                }
-
-                /*
-                 * Remove the entry for this sync standby from the list to
-                 * prevent us from looking at the same entry again.
-                 */
-                pending = foreach_delete_current(pending, cell);
-
-                continue;        /* don't adjust next_highest_priority */
-            }
-
-            if (this_priority < next_highest_priority)
-                next_highest_priority = this_priority;
-        }
-
-        priority = next_highest_priority;
-    }
-
-    /* never reached, but keep compiler quiet */
-    Assert(false);
-    return result;
+    return sa->walsnd_index - sb->walsnd_index;
 }
 
+
 /*
  * Check if we are in the list of sync standbys, and if so, determine
  * priority sequence. Return priority if set, or zero to indicate that
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fc475d144d..7889ea5f6f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2375,14 +2375,16 @@ InitWalSenderSlot(void)
              * Found a free slot. Reserve it for us.
              */
             walsnd->pid = MyProcPid;
+            walsnd->state = WALSNDSTATE_STARTUP;
             walsnd->sentPtr = InvalidXLogRecPtr;
+            walsnd->needreload = false;
             walsnd->write = InvalidXLogRecPtr;
             walsnd->flush = InvalidXLogRecPtr;
             walsnd->apply = InvalidXLogRecPtr;
             walsnd->writeLag = -1;
             walsnd->flushLag = -1;
             walsnd->applyLag = -1;
-            walsnd->state = WALSNDSTATE_STARTUP;
+            walsnd->sync_standby_priority = 0;
             walsnd->latch = &MyProc->procLatch;
             walsnd->replyTime = 0;
             walsnd->spillTxns = 0;
@@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
     Tuplestorestate *tupstore;
     MemoryContext per_query_ctx;
     MemoryContext oldcontext;
-    List       *sync_standbys;
+    SyncRepStandbyData *standbys;
+    int            num_standbys;
     int            i;
 
     /* check to see if caller supports us returning a tuplestore */
@@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
     MemoryContextSwitchTo(oldcontext);
 
     /*
-     * Get the currently active synchronous standbys.
+     * Get the currently active standbys in priority order.  This could be out
+     * of date before we're done, but we'll use the data anyway.
      */
-    LWLockAcquire(SyncRepLock, LW_SHARED);
-    sync_standbys = SyncRepGetSyncStandbys(NULL);
-    LWLockRelease(SyncRepLock);
+    num_standbys = SyncRepGetStandbys(&standbys);
 
     for (i = 0; i < max_wal_senders; i++)
     {
@@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
         int64        spillTxns;
         int64        spillCount;
         int64        spillBytes;
+        bool        is_sync_standby;
         Datum        values[PG_STAT_GET_WAL_SENDERS_COLS];
         bool        nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+        int            j;
 
+        /* Collect data from shared memory */
         SpinLockAcquire(&walsnd->mutex);
         if (walsnd->pid == 0)
         {
@@ -3311,6 +3316,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
         spillBytes = walsnd->spillBytes;
         SpinLockRelease(&walsnd->mutex);
 
+        /* Detect whether walsender is/was considered synchronous */
+        is_sync_standby = false;
+        for (j = 0; j < num_standbys; j++)
+        {
+            if (standbys[j].walsnd_index == i)
+            {
+                is_sync_standby = (j < SyncRepConfig->num_sync);
+                break;
+            }
+        }
+
         memset(nulls, 0, sizeof(nulls));
         values[0] = Int32GetDatum(pid);
 
@@ -3380,11 +3396,15 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
              */
             if (priority == 0)
                 values[10] = CStringGetTextDatum("async");
-            else if (list_member_int(sync_standbys, i))
-                values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
-                    CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
+            else if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+            {
+                if (is_sync_standby)
+                    values[10] = CStringGetTextDatum("sync");
+                else
+                    values[10] = CStringGetTextDatum("potential");
+            }
             else
-                values[10] = CStringGetTextDatum("potential");
+                values[10] = CStringGetTextDatum("quorum");
 
             if (replyTime == 0)
                 nulls[11] = true;
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 1417401086..3a8394d7f2 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -645,6 +645,13 @@ cfgets(cfp *fp, char *buf, int len)
         return fgets(buf, len, fp->uncompressedfp);
 }
 
+/*
+ * cfclose: close the stream
+ *
+ * Returns 0 if the cfp was closed successfully.  Most errors are reported as
+ * -1 with errno set.  Otherwise the return value is the one from gzclose(),
+ * and errno does not hold a meaningful value.
+ */
 int
 cfclose(cfp *fp)
 {
@@ -665,6 +672,11 @@ cfclose(cfp *fp)
 #endif
     {
         result = fclose(fp->uncompressedfp);
+
+        /* normalize error return, just in case EOF is not -1 */
+        if (result != 0)
+            result = -1;
+
         fp->uncompressedfp = NULL;
     }
     free_keep_errno(fp);
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index c9cce5ed8a..ecc6aa5fbb 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -108,6 +108,7 @@ void
 InitArchiveFmt_Directory(ArchiveHandle *AH)
 {
     lclContext *ctx;
+    int            ret;
 
     /* Assuming static functions, this can be copied for each format. */
     AH->ArchiveEntryPtr = _ArchiveEntry;
@@ -218,8 +219,14 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
         ReadToc(AH);
 
         /* Nothing else in the file, so close it again... */
-        if (cfclose(tocFH) != 0)
-            fatal("could not close TOC file: %m");
+        ret = cfclose(tocFH);
+        if (ret < 0)
+        {
+            if (ret == -1)
+                fatal("could not close TOC file: %m");
+            else
+                fatal("could not close TOC file: zlib error (%d)", ret);
+        }
         ctx->dataFH = NULL;
     }
 }
@@ -378,6 +385,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
     char       *buf;
     size_t        buflen;
     cfp           *cfp;
+    int            ret;
 
     if (!filename)
         return;
@@ -396,8 +404,15 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
     }
 
     free(buf);
-    if (cfclose(cfp) !=0)
-        fatal("could not close data file: %m");
+
+    ret = cfclose(cfp);
+    if (ret < 0)
+    {
+        if (ret == -1)
+            fatal("could not close data file: %m");
+        else
+            fatal("could not close data file: zlib error (%d)", ret);
+    }
 }
 
 /*
@@ -429,6 +444,7 @@ _LoadBlobs(ArchiveHandle *AH)
     lclContext *ctx = (lclContext *) AH->formatData;
     char        fname[MAXPGPATH];
     char        line[MAXPGPATH];
+    int            ret;
 
     StartRestoreBlobs(AH);
 
@@ -460,9 +476,16 @@ _LoadBlobs(ArchiveHandle *AH)
         fatal("error reading large object TOC file \"%s\"",
               fname);
 
-    if (cfclose(ctx->blobsTocFH) != 0)
-        fatal("could not close large object TOC file \"%s\": %m",
-              fname);
+    ret = cfclose(ctx->blobsTocFH);
+    if (ret < 0)
+    {
+        if (ret == -1)
+            fatal("could not close large object TOC file \"%s\": %m",
+                  fname);
+        else
+            fatal("could not close large object TOC file \"%s\": zlib error (%d)",
+                  fname, ret);
+    }
 
     ctx->blobsTocFH = NULL;
 
@@ -555,6 +578,7 @@ _CloseArchive(ArchiveHandle *AH)
     {
         cfp           *tocFH;
         char        fname[MAXPGPATH];
+        int            ret;
 
         setFilePath(AH, fname, "toc.dat");
 
@@ -576,8 +600,14 @@ _CloseArchive(ArchiveHandle *AH)
         WriteHead(AH);
         AH->format = archDirectory;
         WriteToc(AH);
-        if (cfclose(tocFH) != 0)
-            fatal("could not close TOC file: %m");
+        ret = cfclose(tocFH);
+        if (ret < 0)
+        {
+            if (ret == -1)
+                fatal("could not close TOC file: %m");
+            else
+                fatal("could not close TOC file: zlib error (%d)", ret);
+        }
         WriteDataChunks(AH, ctx->pstate);
 
         ParallelBackupEnd(AH, ctx->pstate);
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index c5f0e91aad..533aac25c7 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -36,6 +36,24 @@
 #define SYNC_REP_PRIORITY        0
 #define SYNC_REP_QUORUM        1
 
+/*
+ * SyncRepGetSyncStandbys returns an array of these structs,
+ * one per candidate synchronous walsender.
+ */
+typedef struct SyncRepStandbyData
+{
+    /* Copies of relevant fields from WalSnd shared-memory struct */
+    pid_t        pid;
+    XLogRecPtr    write;
+    XLogRecPtr    flush;
+    XLogRecPtr    apply;
+    int            sync_standby_priority;
+    /* Index of this walsender in the WalSnd shared-memory array */
+    int            walsnd_index;
+    /* This flag indicates whether this struct is about our own process */
+    bool        is_me;
+} SyncRepStandbyData;
+
 /*
  * Struct for the configuration of synchronous replication.
  *
@@ -74,7 +92,7 @@ extern void SyncRepInitConfig(void);
 extern void SyncRepReleaseWaiters(void);
 
 /* called by wal sender and user backend */
-extern List *SyncRepGetSyncStandbys(bool *am_sync);
+extern int    SyncRepGetStandbys(SyncRepStandbyData **standbys);
 
 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 366828f0a4..734acec2a4 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -31,8 +31,7 @@ typedef enum WalSndState
 /*
  * Each walsender has a WalSnd struct in shared memory.
  *
- * This struct is protected by 'mutex', with two exceptions: one is
- * sync_standby_priority as noted below.  The other exception is that some
+ * This struct is protected by its 'mutex' spinlock field, except that some
  * members are only written by the walsender process itself, and thus that
  * process is free to read those members without holding spinlock.  pid and
  * needreload always require the spinlock to be held for all accesses.
@@ -60,6 +59,12 @@ typedef struct WalSnd
     TimeOffset    flushLag;
     TimeOffset    applyLag;
 
+    /*
+     * The priority order of the standby managed by this WALSender, as listed
+     * in synchronous_standby_names, or 0 if not-listed.
+     */
+    int            sync_standby_priority;
+
     /* Protects shared variables shown above. */
     slock_t        mutex;
 
@@ -69,13 +74,6 @@ typedef struct WalSnd
      */
     Latch       *latch;
 
-    /*
-     * The priority order of the standby managed by this WALSender, as listed
-     * in synchronous_standby_names, or 0 if not-listed. Protected by
-     * SyncRepLock.
-     */
-    int            sync_standby_priority;
-
     /*
      * Timestamp of the last message received from standby.
      */

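To make the intended pg_stat_replication.sync_state semantics in the walsender.c hunk easier to check, here is a minimal, self-contained sketch of priority-mode selection over an array of candidates. It is not the patch's code. StandbySketch, priority_cmp, mark_sync_standbys and sync_state_label are hypothetical names, and the struct is a trimmed-down stand-in for SyncRepStandbyData. The sketch assumes three things: standbys with priority 0 have already been filtered out of the candidate list; ties between equal priorities are broken by position in the WalSnd array; and the branch that precedes the hunk labels priority-0 standbys "async", as pg_stat_replication documents.

```c
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>

/* Hypothetical, trimmed-down stand-in for SyncRepStandbyData. */
typedef struct StandbySketch
{
    int         sync_standby_priority;  /* 1 = highest; 0 = not listed */
    int         walsnd_index;           /* position in the WalSnd array */
    bool        is_sync_standby;        /* output: chosen as synchronous */
} StandbySketch;

/* Sort by increasing priority value; break ties by WalSnd array position. */
static int
priority_cmp(const void *a, const void *b)
{
    const StandbySketch *sa = a;
    const StandbySketch *sb = b;

    if (sa->sync_standby_priority != sb->sync_standby_priority)
        return sa->sync_standby_priority - sb->sync_standby_priority;
    return sa->walsnd_index - sb->walsnd_index;
}

/*
 * Priority mode: sort the candidates in place, then mark the first num_sync
 * of them as synchronous.  Assumes priority-0 standbys were filtered out.
 */
static void
mark_sync_standbys(StandbySketch *standbys, int n, int num_sync)
{
    qsort(standbys, n, sizeof(StandbySketch), priority_cmp);
    for (int i = 0; i < n; i++)
        standbys[i].is_sync_standby = (i < num_sync);
}

/* Mirrors the sync_state labels chosen in the pg_stat_get_wal_senders hunk. */
static const char *
sync_state_label(bool priority_mode, const StandbySketch *s)
{
    if (s->sync_standby_priority == 0)
        return "async";
    if (!priority_mode)
        return "quorum";        /* every listed candidate in quorum mode */
    return s->is_sync_standby ? "sync" : "potential";
}
```

With candidates {priority 2, index 0}, {1, 2} and {1, 1} and num_sync = 1, the sort yields index 1, then 2, then 0. Only index 1 is reported as "sync", and the other two are reported as "potential". In quorum mode, all three would be reported as "quorum". This is the distinction between "known to be" and "can be" synchronous discussed above.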