Re: Race condition in SyncRepGetSyncStandbysPriority - Mailing list pgsql-hackers

From Tom Lane
Subject Re: Race condition in SyncRepGetSyncStandbysPriority
Date
Msg-id 13059.1586896360@sss.pgh.pa.us
Whole thread Raw
In response to Re: Race condition in SyncRepGetSyncStandbysPriority  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses Re: Race condition in SyncRepGetSyncStandbysPriority  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Re: Race condition in SyncRepGetSyncStandbysPriority  (Noah Misch <noah@leadboat.com>)
List pgsql-hackers
I wrote:
> It doesn't seem to me to be that hard to implement the desired
> semantics for synchronous_standby_names with inconsistent info.
> In FIRST mode you basically just need to take the N smallest
> priorities you see in the array, but without assuming there are no
> duplicates or holes.  It might be a good idea to include ties at the
> end, that is if you see 1,2,2,4 or 1,3,3,4 and you want 2 sync
> standbys, include the first three of them in the calculation until
> the inconsistency is resolved.  In ANY mode I don't see that
> inconsistent priorities matter at all.

Concretely, I think we ought to do the attached, or something pretty
close to it.

I'm not really happy about breaking ties based on walsnd_index,
but I see that there are several TAP test cases that fail if we
do something else.  I'm inclined to think those tests are bogus ...
but I won't argue to change them right now.

            regards, tom lane

diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ffd5b31..c66c371 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -108,14 +108,17 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
 static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
                                        XLogRecPtr *flushPtr,
                                        XLogRecPtr *applyPtr,
-                                       List *sync_standbys);
+                                       SyncRepStandbyData *sync_standbys,
+                                       int num_standbys);
 static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
                                           XLogRecPtr *flushPtr,
                                           XLogRecPtr *applyPtr,
-                                          List *sync_standbys, uint8 nth);
+                                          SyncRepStandbyData *sync_standbys,
+                                          int num_standbys,
+                                          uint8 nth);
 static int    SyncRepGetStandbyPriority(void);
-static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
-static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static void SyncRepGetSyncStandbysPriority(SyncRepStandbyData *standbys, int n);
+static int    standby_priority_comparator(const void *a, const void *b);
 static int    cmp_lsn(const void *a, const void *b);

 #ifdef USE_ASSERT_CHECKING
@@ -406,9 +409,10 @@ SyncRepInitConfig(void)
     priority = SyncRepGetStandbyPriority();
     if (MyWalSnd->sync_standby_priority != priority)
     {
-        LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+        SpinLockAcquire(&MyWalSnd->mutex);
         MyWalSnd->sync_standby_priority = priority;
-        LWLockRelease(SyncRepLock);
+        SpinLockRelease(&MyWalSnd->mutex);
+
         ereport(DEBUG1,
                 (errmsg("standby \"%s\" now has synchronous standby priority %u",
                         application_name, priority)));
@@ -523,8 +527,6 @@ SyncRepReleaseWaiters(void)
 /*
  * Calculate the synced Write, Flush and Apply positions among sync standbys.
  *
- * The caller must hold SyncRepLock.
- *
  * Return false if the number of sync standbys is less than
  * synchronous_standby_names specifies. Otherwise return true and
  * store the positions into *writePtr, *flushPtr and *applyPtr.
@@ -536,27 +538,43 @@ static bool
 SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
                      XLogRecPtr *applyPtr, bool *am_sync)
 {
-    List       *sync_standbys;
-
-    Assert(LWLockHeldByMe(SyncRepLock));
+    SyncRepStandbyData *sync_standbys;
+    int            num_standbys;
+    int            i;

+    /* Initialize default results */
     *writePtr = InvalidXLogRecPtr;
     *flushPtr = InvalidXLogRecPtr;
     *applyPtr = InvalidXLogRecPtr;
     *am_sync = false;

+    /* Quick out if not even configured to be synchronous */
+    if (SyncRepConfig == NULL)
+        return false;
+
     /* Get standbys that are considered as synchronous at this moment */
-    sync_standbys = SyncRepGetSyncStandbys(am_sync);
+    num_standbys = SyncRepGetSyncStandbys(&sync_standbys);
+
+    /* Am I among the candidate sync standbys? */
+    for (i = 0; i < num_standbys; i++)
+    {
+        if (sync_standbys[i].is_me)
+        {
+            *am_sync = sync_standbys[i].is_sync_standby;
+            break;
+        }
+    }

     /*
-     * Quick exit if we are not managing a sync standby or there are not
-     * enough synchronous standbys.
+     * Nothing more to do if we are not managing a sync standby or there are
+     * not enough synchronous standbys.  (Note: if there are at least num_sync
+     * candidates, then at least num_sync of them will be marked as
+     * is_sync_standby; we don't need to count them here.)
      */
     if (!(*am_sync) ||
-        SyncRepConfig == NULL ||
-        list_length(sync_standbys) < SyncRepConfig->num_sync)
+        num_standbys < SyncRepConfig->num_sync)
     {
-        list_free(sync_standbys);
+        pfree(sync_standbys);
         return false;
     }

@@ -576,15 +594,16 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     {
         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
-                                   sync_standbys);
+                                   sync_standbys, num_standbys);
     }
     else
     {
         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
-                                      sync_standbys, SyncRepConfig->num_sync);
+                                      sync_standbys, num_standbys,
+                                      SyncRepConfig->num_sync);
     }

-    list_free(sync_standbys);
+    pfree(sync_standbys);
     return true;
 }

@@ -592,27 +611,28 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
  */
 static void
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-                           XLogRecPtr *applyPtr, List *sync_standbys)
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+                           XLogRecPtr *flushPtr,
+                           XLogRecPtr *applyPtr,
+                           SyncRepStandbyData *sync_standbys,
+                           int num_standbys)
 {
-    ListCell   *cell;
+    int            i;

     /*
      * Scan through all sync standbys and calculate the oldest Write, Flush
-     * and Apply positions.
+     * and Apply positions.  We assume *writePtr et al were initialized to
+     * InvalidXLogRecPtr.
      */
-    foreach(cell, sync_standbys)
+    for (i = 0; i < num_standbys; i++)
     {
-        WalSnd       *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-        XLogRecPtr    write;
-        XLogRecPtr    flush;
-        XLogRecPtr    apply;
+        XLogRecPtr    write = sync_standbys[i].write;
+        XLogRecPtr    flush = sync_standbys[i].flush;
+        XLogRecPtr    apply = sync_standbys[i].apply;

-        SpinLockAcquire(&walsnd->mutex);
-        write = walsnd->write;
-        flush = walsnd->flush;
-        apply = walsnd->apply;
-        SpinLockRelease(&walsnd->mutex);
+        /* Ignore candidates that aren't considered synchronous */
+        if (!sync_standbys[i].is_sync_standby)
+            continue;

         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
             *writePtr = write;
@@ -628,38 +648,43 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  * standbys.
  */
 static void
-SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-                              XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
+                              XLogRecPtr *flushPtr,
+                              XLogRecPtr *applyPtr,
+                              SyncRepStandbyData *sync_standbys,
+                              int num_standbys,
+                              uint8 nth)
 {
-    ListCell   *cell;
     XLogRecPtr *write_array;
     XLogRecPtr *flush_array;
     XLogRecPtr *apply_array;
-    int            len;
-    int            i = 0;
+    int            i;
+    int            n;

-    len = list_length(sync_standbys);
-    write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-    flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-    apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+    write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+    flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+    apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);

-    foreach(cell, sync_standbys)
+    n = 0;
+    for (i = 0; i < num_standbys; i++)
     {
-        WalSnd       *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-
-        SpinLockAcquire(&walsnd->mutex);
-        write_array[i] = walsnd->write;
-        flush_array[i] = walsnd->flush;
-        apply_array[i] = walsnd->apply;
-        SpinLockRelease(&walsnd->mutex);
+        /* Ignore candidates that aren't considered synchronous */
+        if (!sync_standbys[i].is_sync_standby)
+            continue;

-        i++;
+        write_array[n] = sync_standbys[i].write;
+        flush_array[n] = sync_standbys[i].flush;
+        apply_array[n] = sync_standbys[i].apply;
+        n++;
     }

+    /* Should have enough, or somebody messed up */
+    Assert(n >= nth);
+
     /* Sort each array in descending order */
-    qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
-    qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
-    qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(write_array, n, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(flush_array, n, sizeof(XLogRecPtr), cmp_lsn);
+    qsort(apply_array, n, sizeof(XLogRecPtr), cmp_lsn);

     /* Get Nth latest Write, Flush, Apply positions */
     *writePtr = write_array[nth - 1];
@@ -689,67 +714,48 @@ cmp_lsn(const void *a, const void *b)
 }

 /*
- * Return the list of sync standbys, or NIL if no sync standby is connected.
- *
- * The caller must hold SyncRepLock.
+ * Return data about walsenders that are candidates to be sync standbys.
  *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * *standbys is set to a palloc'd array of structs of per-walsender data,
+ * and the number of valid entries (candidate sync senders) is returned.
  */
-List *
-SyncRepGetSyncStandbys(bool *am_sync)
+int
+SyncRepGetSyncStandbys(SyncRepStandbyData **standbys)
 {
-    Assert(LWLockHeldByMe(SyncRepLock));
+    int            i;
+    int            n;

-    /* Set default result */
-    if (am_sync != NULL)
-        *am_sync = false;
+    /* Create result array */
+    *standbys = (SyncRepStandbyData *)
+        palloc(max_wal_senders * sizeof(SyncRepStandbyData));

     /* Quick exit if sync replication is not requested */
     if (SyncRepConfig == NULL)
-        return NIL;
-
-    return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
-        SyncRepGetSyncStandbysPriority(am_sync) :
-        SyncRepGetSyncStandbysQuorum(am_sync);
-}
-
-/*
- * Return the list of all the candidates for quorum sync standbys,
- * or NIL if no such standby is connected.
- *
- * The caller must hold SyncRepLock. This function must be called only in
- * a quorum-based sync replication.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
- */
-static List *
-SyncRepGetSyncStandbysQuorum(bool *am_sync)
-{
-    List       *result = NIL;
-    int            i;
-    volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
-                                 * rearrangement */
-
-    Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
+        return false;

+    /* Collect raw data from shared memory */
+    n = 0;
     for (i = 0; i < max_wal_senders; i++)
     {
-        XLogRecPtr    flush;
-        WalSndState state;
-        int            pid;
+        volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
+                                     * rearrangement */
+        SyncRepStandbyData *stby;
+        WalSndState state;        /* not included in SyncRepStandbyData */

         walsnd = &WalSndCtl->walsnds[i];
+        stby = *standbys + n;

         SpinLockAcquire(&walsnd->mutex);
-        pid = walsnd->pid;
-        flush = walsnd->flush;
+        stby->pid = walsnd->pid;
         state = walsnd->state;
+        stby->write = walsnd->write;
+        stby->flush = walsnd->flush;
+        stby->apply = walsnd->apply;
+        stby->sync_standby_priority = walsnd->sync_standby_priority;
         SpinLockRelease(&walsnd->mutex);

         /* Must be active */
-        if (pid == 0)
+        if (stby->pid == 0)
             continue;

         /* Must be streaming or stopping */
@@ -758,200 +764,79 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
             continue;

         /* Must be synchronous */
-        if (walsnd->sync_standby_priority == 0)
+        if (stby->sync_standby_priority == 0)
             continue;

         /* Must have a valid flush position */
-        if (XLogRecPtrIsInvalid(flush))
+        if (XLogRecPtrIsInvalid(stby->flush))
             continue;

-        /*
-         * Consider this standby as a candidate for quorum sync standbys and
-         * append it to the result.
-         */
-        result = lappend_int(result, i);
-        if (am_sync != NULL && walsnd == MyWalSnd)
-            *am_sync = true;
+        /* OK, it's a candidate */
+        stby->walsnd_index = i;
+        stby->is_me = (walsnd == MyWalSnd);
+        stby->is_sync_standby = true;    /* might change below */
+        n++;
     }

-    return result;
+    /*
+     * In quorum mode, that's all we have to do.  In priority mode, decide
+     * which ones are high enough priority to consider sync standbys.
+     */
+    if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+        SyncRepGetSyncStandbysPriority(*standbys, n);
+
+    return n;
 }

 /*
- * Return the list of sync standbys chosen based on their priorities,
- * or NIL if no sync standby is connected.
- *
- * If there are multiple standbys with the same priority,
- * the first one found is selected preferentially.
- *
- * The caller must hold SyncRepLock. This function must be called only in
- * a priority-based sync replication.
+ * Decide which standbys to consider synchronous.
  *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * This function must be called only in priority-based sync replication.
  */
-static List *
-SyncRepGetSyncStandbysPriority(bool *am_sync)
+static void
+SyncRepGetSyncStandbysPriority(SyncRepStandbyData *standbys, int n)
 {
-    List       *result = NIL;
-    List       *pending = NIL;
-    int            lowest_priority;
-    int            next_highest_priority;
-    int            this_priority;
-    int            priority;
     int            i;
-    bool        am_in_pending = false;
-    volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
-                                 * rearrangement */

     Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);

-    lowest_priority = SyncRepConfig->nmembers;
-    next_highest_priority = lowest_priority + 1;
+    /* Nothing to do here unless there are too many candidates. */
+    if (n <= SyncRepConfig->num_sync)
+        return;

     /*
-     * Find the sync standbys which have the highest priority (i.e, 1). Also
-     * store all the other potential sync standbys into the pending list, in
-     * order to scan it later and find other sync standbys from it quickly.
+     * Sort the candidates by priority; then the first num_sync ones are
+     * synchronous, and the rest aren't.
      */
-    for (i = 0; i < max_wal_senders; i++)
-    {
-        XLogRecPtr    flush;
-        WalSndState state;
-        int            pid;
-
-        walsnd = &WalSndCtl->walsnds[i];
-
-        SpinLockAcquire(&walsnd->mutex);
-        pid = walsnd->pid;
-        flush = walsnd->flush;
-        state = walsnd->state;
-        SpinLockRelease(&walsnd->mutex);
-
-        /* Must be active */
-        if (pid == 0)
-            continue;
+    qsort(standbys, n, sizeof(SyncRepStandbyData),
+          standby_priority_comparator);

-        /* Must be streaming or stopping */
-        if (state != WALSNDSTATE_STREAMING &&
-            state != WALSNDSTATE_STOPPING)
-            continue;
-
-        /* Must be synchronous */
-        this_priority = walsnd->sync_standby_priority;
-        if (this_priority == 0)
-            continue;
-
-        /* Must have a valid flush position */
-        if (XLogRecPtrIsInvalid(flush))
-            continue;
-
-        /*
-         * If the priority is equal to 1, consider this standby as sync and
-         * append it to the result. Otherwise append this standby to the
-         * pending list to check if it's actually sync or not later.
-         */
-        if (this_priority == 1)
-        {
-            result = lappend_int(result, i);
-            if (am_sync != NULL && walsnd == MyWalSnd)
-                *am_sync = true;
-            if (list_length(result) == SyncRepConfig->num_sync)
-            {
-                list_free(pending);
-                return result;    /* Exit if got enough sync standbys */
-            }
-        }
-        else
-        {
-            pending = lappend_int(pending, i);
-            if (am_sync != NULL && walsnd == MyWalSnd)
-                am_in_pending = true;
-
-            /*
-             * Track the highest priority among the standbys in the pending
-             * list, in order to use it as the starting priority for later
-             * scan of the list. This is useful to find quickly the sync
-             * standbys from the pending list later because we can skip
-             * unnecessary scans for the unused priorities.
-             */
-            if (this_priority < next_highest_priority)
-                next_highest_priority = this_priority;
-        }
-    }
+    for (i = SyncRepConfig->num_sync; i < n; i++)
+        standbys[i].is_sync_standby = false;
+}

-    /*
-     * Consider all pending standbys as sync if the number of them plus
-     * already-found sync ones is lower than the configuration requests.
-     */
-    if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
-    {
-        /*
-         * Set *am_sync to true if this walsender is in the pending list
-         * because all pending standbys are considered as sync.
-         */
-        if (am_sync != NULL && !(*am_sync))
-            *am_sync = am_in_pending;
+/*
+ * qsort comparator to sort SyncRepStandbyData entries by priority
+ */
+static int
+standby_priority_comparator(const void *a, const void *b)
+{
+    const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
+    const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;

-        result = list_concat(result, pending);
-        list_free(pending);
-        return result;
-    }
+    /* First, sort by increasing priority value */
+    if (sa->sync_standby_priority != sb->sync_standby_priority)
+        return sa->sync_standby_priority - sb->sync_standby_priority;

     /*
-     * Find the sync standbys from the pending list.
+     * We might have equal priority values; arbitrarily break ties by position
+     * in the WALSnd array.  (This is utterly bogus, since that is arrival
+     * order dependent, but there are regression tests that rely on it.)
      */
-    priority = next_highest_priority;
-    while (priority <= lowest_priority)
-    {
-        ListCell   *cell;
-
-        next_highest_priority = lowest_priority + 1;
-
-        foreach(cell, pending)
-        {
-            i = lfirst_int(cell);
-            walsnd = &WalSndCtl->walsnds[i];
-
-            this_priority = walsnd->sync_standby_priority;
-            if (this_priority == priority)
-            {
-                result = lappend_int(result, i);
-                if (am_sync != NULL && walsnd == MyWalSnd)
-                    *am_sync = true;
-
-                /*
-                 * We should always exit here after the scan of pending list
-                 * starts because we know that the list has enough elements to
-                 * reach SyncRepConfig->num_sync.
-                 */
-                if (list_length(result) == SyncRepConfig->num_sync)
-                {
-                    list_free(pending);
-                    return result;    /* Exit if got enough sync standbys */
-                }
-
-                /*
-                 * Remove the entry for this sync standby from the list to
-                 * prevent us from looking at the same entry again.
-                 */
-                pending = foreach_delete_current(pending, cell);
-
-                continue;        /* don't adjust next_highest_priority */
-            }
-
-            if (this_priority < next_highest_priority)
-                next_highest_priority = this_priority;
-        }
-
-        priority = next_highest_priority;
-    }
-
-    /* never reached, but keep compiler quiet */
-    Assert(false);
-    return result;
+    return sa->walsnd_index - sb->walsnd_index;
 }

+
 /*
  * Check if we are in the list of sync standbys, and if so, determine
  * priority sequence. Return priority if set, or zero to indicate that
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fc475d1..859ca60 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2375,14 +2375,16 @@ InitWalSenderSlot(void)
              * Found a free slot. Reserve it for us.
              */
             walsnd->pid = MyProcPid;
+            walsnd->state = WALSNDSTATE_STARTUP;
             walsnd->sentPtr = InvalidXLogRecPtr;
+            walsnd->needreload = false;
             walsnd->write = InvalidXLogRecPtr;
             walsnd->flush = InvalidXLogRecPtr;
             walsnd->apply = InvalidXLogRecPtr;
             walsnd->writeLag = -1;
             walsnd->flushLag = -1;
             walsnd->applyLag = -1;
-            walsnd->state = WALSNDSTATE_STARTUP;
+            walsnd->sync_standby_priority = 0;
             walsnd->latch = &MyProc->procLatch;
             walsnd->replyTime = 0;
             walsnd->spillTxns = 0;
@@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
     Tuplestorestate *tupstore;
     MemoryContext per_query_ctx;
     MemoryContext oldcontext;
-    List       *sync_standbys;
+    SyncRepStandbyData *sync_standbys;
+    int            num_standbys;
     int            i;

     /* check to see if caller supports us returning a tuplestore */
@@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
     MemoryContextSwitchTo(oldcontext);

     /*
-     * Get the currently active synchronous standbys.
+     * Get the currently active synchronous standbys.  This could be out of
+     * date before we're done, but we'll use the data anyway.
      */
-    LWLockAcquire(SyncRepLock, LW_SHARED);
-    sync_standbys = SyncRepGetSyncStandbys(NULL);
-    LWLockRelease(SyncRepLock);
+    num_standbys = SyncRepGetSyncStandbys(&sync_standbys);

     for (i = 0; i < max_wal_senders; i++)
     {
@@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
         int64        spillTxns;
         int64        spillCount;
         int64        spillBytes;
+        bool        is_sync_standby;
         Datum        values[PG_STAT_GET_WAL_SENDERS_COLS];
         bool        nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+        int            j;

+        /* Collect data from shared memory */
         SpinLockAcquire(&walsnd->mutex);
         if (walsnd->pid == 0)
         {
@@ -3311,6 +3316,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
         spillBytes = walsnd->spillBytes;
         SpinLockRelease(&walsnd->mutex);

+        /* Detect whether walsender is/was considered synchronous */
+        is_sync_standby = false;
+        for (j = 0; j < num_standbys; j++)
+        {
+            if (sync_standbys[j].walsnd_index == i)
+            {
+                is_sync_standby = sync_standbys[j].is_sync_standby;
+                break;
+            }
+        }
+
         memset(nulls, 0, sizeof(nulls));
         values[0] = Int32GetDatum(pid);

@@ -3380,7 +3396,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
              */
             if (priority == 0)
                 values[10] = CStringGetTextDatum("async");
-            else if (list_member_int(sync_standbys, i))
+            else if (is_sync_standby)
                 values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
                     CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
             else
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index c5f0e91..1308ada 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -37,6 +37,26 @@
 #define SYNC_REP_QUORUM        1

 /*
+ * SyncRepGetSyncStandbys returns an array of these structs,
+ * one per candidate synchronous walsender.
+ */
+typedef struct SyncRepStandbyData
+{
+    /* Copies of relevant fields from WalSnd shared-memory struct */
+    pid_t        pid;
+    XLogRecPtr    write;
+    XLogRecPtr    flush;
+    XLogRecPtr    apply;
+    int            sync_standby_priority;
+    /* Index of this walsender in the WalSnd shared-memory array */
+    int            walsnd_index;
+    /* This flag indicates whether this struct is about our own process */
+    bool        is_me;
+    /* After-the-fact conclusion about whether this is a sync standby */
+    bool        is_sync_standby;
+} SyncRepStandbyData;
+
+/*
  * Struct for the configuration of synchronous replication.
  *
  * Note: this must be a flat representation that can be held in a single
@@ -74,7 +94,7 @@ extern void SyncRepInitConfig(void);
 extern void SyncRepReleaseWaiters(void);

 /* called by wal sender and user backend */
-extern List *SyncRepGetSyncStandbys(bool *am_sync);
+extern int    SyncRepGetSyncStandbys(SyncRepStandbyData **standbys);

 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 366828f..734acec 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -31,8 +31,7 @@ typedef enum WalSndState
 /*
  * Each walsender has a WalSnd struct in shared memory.
  *
- * This struct is protected by 'mutex', with two exceptions: one is
- * sync_standby_priority as noted below.  The other exception is that some
+ * This struct is protected by its 'mutex' spinlock field, except that some
  * members are only written by the walsender process itself, and thus that
  * process is free to read those members without holding spinlock.  pid and
  * needreload always require the spinlock to be held for all accesses.
@@ -60,6 +59,12 @@ typedef struct WalSnd
     TimeOffset    flushLag;
     TimeOffset    applyLag;

+    /*
+     * The priority order of the standby managed by this WALSender, as listed
+     * in synchronous_standby_names, or 0 if not-listed.
+     */
+    int            sync_standby_priority;
+
     /* Protects shared variables shown above. */
     slock_t        mutex;

@@ -70,13 +75,6 @@ typedef struct WalSnd
     Latch       *latch;

     /*
-     * The priority order of the standby managed by this WALSender, as listed
-     * in synchronous_standby_names, or 0 if not-listed. Protected by
-     * SyncRepLock.
-     */
-    int            sync_standby_priority;
-
-    /*
      * Timestamp of the last message received from standby.
      */
     TimestampTz replyTime;

pgsql-hackers by date:

Previous
From: Alvaro Herrera
Date:
Subject: Re: documenting the backup manifest file format
Next
From: Andrew Dunstan
Date:
Subject: Re: documenting the backup manifest file format