Re: Race condition in SyncRepGetSyncStandbysPriority - Mailing list pgsql-hackers
From | Kyotaro Horiguchi |
---|---|
Subject | Re: Race condition in SyncRepGetSyncStandbysPriority |
Date | |
Msg-id | 20200417.145834.1847060913969327935.horikyota.ntt@gmail.com |
In response to | Re: Race condition in SyncRepGetSyncStandbysPriority (Tom Lane <tgl@sss.pgh.pa.us>) |
Responses |
Re: Race condition in SyncRepGetSyncStandbysPriority
Re: Race condition in SyncRepGetSyncStandbysPriority |
List | pgsql-hackers |
At Thu, 16 Apr 2020 11:39:06 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in
> Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
> > [ syncrep-fixes-4.patch ]
>
> I agree that we could probably improve the clarity of this code with
> further rewriting, but I'm still very opposed to the idea of having
> callers know that the first num_sync array elements are the active
> ones. It's wrong (or at least different from current behavior) for
> quorum mode, where there might be more than num_sync walsenders to
> consider. And it might not generalize very well to other syncrep
> selection rules we might add in future, which might also not have
> exactly num_sync interesting walsenders. So I much prefer an API
> definition that uses bool flags in an array that has no particular
> ordering (so far as the callers know, anyway). If you don't like
> is_sync_standby, how about some more-neutral name like is_active
> or is_interesting or include_position?

I'm convinced that each element has is_sync_standby. I agree to the
name is_sync_standby since I can't come up with a better name.

> I dislike the proposed comment revisions in SyncRepReleaseWaiters,
> too, particularly the change to say that what we're "announcing"
> is the ability to release waiters. You did not change the actual
> log messages, and you would have gotten a lot of pushback if
> you tried, because the current messages make sense to users and
> something like that would not. But by the same token this new
> comment isn't too helpful to somebody reading the code.

The current log messages look perfect to me. I don't insist on the
comment change since I may be taking the definition of "sync standby"
too strictly.

> (Actually, I wonder why we even have the restriction that only
> sync standbys can release waiters. It's not like they are
> going to get different results from SyncRepGetSyncRecPtr than
> any other walsender would. Maybe we should just drop all the
> am_sync logic?)

I thought the same thing, though I didn't do that in the last patch.
am_sync seems intended to reduce spurious wakeups, but spurious
wakeups won't actually increase without it. Thus the only remaining
job of am_sync is to trigger the log messages, and that is a sign that
the log messages should be emitted within SyncRepGetSyncRecPtr. That
eliminates the references to SyncRepConfig in SyncRepReleaseWaiters,
which puts me at ease.

The attached is based on syncrep-fixes-1.patch + am_sync elimination.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ffd5b31eb2..eb0616121a 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -103,19 +103,21 @@ static int SyncRepWakeQueue(bool all, int mode);
 
 static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
 								 XLogRecPtr *flushPtr,
-								 XLogRecPtr *applyPtr,
-								 bool *am_sync);
+								 XLogRecPtr *applyPtr);
 static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
 									   XLogRecPtr *flushPtr,
 									   XLogRecPtr *applyPtr,
-									   List *sync_standbys);
+									   SyncRepStandbyData *sync_standbys,
+									   int num_standbys);
 static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
 										  XLogRecPtr *flushPtr,
 										  XLogRecPtr *applyPtr,
-										  List *sync_standbys, uint8 nth);
+										  SyncRepStandbyData *sync_standbys,
+										  int num_standbys,
+										  uint8 nth);
 static int SyncRepGetStandbyPriority(void);
-static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
-static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static void SyncRepGetSyncStandbysPriority(SyncRepStandbyData *standbys, int n);
+static int standby_priority_comparator(const void *a, const void *b);
 static int cmp_lsn(const void *a, const void *b);
 
 #ifdef USE_ASSERT_CHECKING
@@ -406,9 +408,10 @@ SyncRepInitConfig(void)
 	priority = SyncRepGetStandbyPriority();
 	if (MyWalSnd->sync_standby_priority != priority)
 	{
-		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+		SpinLockAcquire(&MyWalSnd->mutex);
 		MyWalSnd->sync_standby_priority = priority;
-		LWLockRelease(SyncRepLock);
+		SpinLockRelease(&MyWalSnd->mutex);
+
 		ereport(DEBUG1,
 				(errmsg("standby \"%s\" now has synchronous standby priority %u",
 						application_name, priority)));
@@ -430,7 +433,6 @@ SyncRepReleaseWaiters(void)
 	XLogRecPtr flushPtr;
 	XLogRecPtr applyPtr;
 	bool got_recptr;
-	bool am_sync;
 	int numwrite = 0;
 	int numflush = 0;
 	int numapply = 0;
@@ -461,34 +463,14 @@ SyncRepReleaseWaiters(void)
 	 * Check whether we are a sync standby or not, and calculate the synced
 	 * positions among all sync standbys.
 	 */
-	got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
+	got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr);
 
 	/*
-	 * If we are managing a sync standby, though we weren't prior to this,
-	 * then announce we are now a sync standby.
+	 * If we don't have enough sync standbys, just leave.
 	 */
-	if (announce_next_takeover && am_sync)
-	{
-		announce_next_takeover = false;
-
-		if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
-			ereport(LOG,
-					(errmsg("standby \"%s\" is now a synchronous standby with priority %u",
-							application_name, MyWalSnd->sync_standby_priority)));
-		else
-			ereport(LOG,
-					(errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
-							application_name)));
-	}
-
-	/*
-	 * If the number of sync standbys is less than requested or we aren't
-	 * managing a sync standby then just leave.
-	 */
-	if (!got_recptr || !am_sync)
+	if (!got_recptr)
 	{
 		LWLockRelease(SyncRepLock);
-		announce_next_takeover = !am_sync;
 		return;
 	}
 
@@ -523,40 +505,68 @@ SyncRepReleaseWaiters(void)
 /*
  * Calculate the synced Write, Flush and Apply positions among sync standbys.
  *
- * The caller must hold SyncRepLock.
- *
  * Return false if the number of sync standbys is less than
  * synchronous_standby_names specifies. Otherwise return true and
  * store the positions into *writePtr, *flushPtr and *applyPtr.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
  */
 static bool
 SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-					 XLogRecPtr *applyPtr, bool *am_sync)
+					 XLogRecPtr *applyPtr)
 {
-	List *sync_standbys;
-
-	Assert(LWLockHeldByMe(SyncRepLock));
+	SyncRepStandbyData *sync_standbys;
+	int num_standbys;
+	int i;
 
+	/* Initialize default results */
 	*writePtr = InvalidXLogRecPtr;
 	*flushPtr = InvalidXLogRecPtr;
 	*applyPtr = InvalidXLogRecPtr;
-	*am_sync = false;
+
+	/*
+	 * We're a potential sync standby. Release waiters if there are enough
+	 * sync standbys and we are considered as sync.
+	 */
 
 	/* Get standbys that are considered as synchronous at this moment */
-	sync_standbys = SyncRepGetSyncStandbys(am_sync);
+	num_standbys = SyncRepGetSyncStandbys(&sync_standbys);
+
+	/* Announce if I am a new sync standby or candidate if not yet */
+	if (announce_next_takeover)
+	{
+		for (i = 0; i < num_standbys; i++)
+		{
+			if (!sync_standbys[i].is_me)
+				continue;
+
+			/*
+			 * If we are managing a sync standby, though we weren't prior to
+			 * this, then announce we are now a sync standby.
+			 */
+			announce_next_takeover = false;
+
+			if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+				ereport(LOG,
+						(errmsg("standby \"%s\" is now a synchronous standby with priority %u",
+								application_name,
+								MyWalSnd->sync_standby_priority)));
+			else
+				ereport(LOG,
+						(errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
+								application_name)));
+
+			break;
+		}
+	}
 
 	/*
-	 * Quick exit if we are not managing a sync standby or there are not
-	 * enough synchronous standbys.
+	 * Nothing more to do if we are not managing a sync standby or there are
+	 * not enough synchronous standbys. (Note: if there are least num_sync
+	 * candidates, then at least num_sync of them will be marked as
+	 * is_sync_standby; we don't need to count them here.)
 	 */
-	if (!(*am_sync) ||
-		SyncRepConfig == NULL ||
-		list_length(sync_standbys) < SyncRepConfig->num_sync)
+	if (num_standbys < SyncRepConfig->num_sync)
 	{
-		list_free(sync_standbys);
+		pfree(sync_standbys);
 		return false;
 	}
 
@@ -576,15 +586,16 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
 	{
 		SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
-								   sync_standbys);
+								   sync_standbys, num_standbys);
 	}
 	else
 	{
 		SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
-									  sync_standbys, SyncRepConfig->num_sync);
+									  sync_standbys, num_standbys,
+									  SyncRepConfig->num_sync);
 	}
 
-	list_free(sync_standbys);
+	pfree(sync_standbys);
 	return true;
 }
 
@@ -592,27 +603,28 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
  */
 static void
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-						   XLogRecPtr *applyPtr, List *sync_standbys)
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+						   XLogRecPtr *flushPtr,
+						   XLogRecPtr *applyPtr,
+						   SyncRepStandbyData *sync_standbys,
+						   int num_standbys)
 {
-	ListCell *cell;
+	int i;
 
 	/*
 	 * Scan through all sync standbys and calculate the oldest Write, Flush
-	 * and Apply positions.
+	 * and Apply positions. We assume *writePtr et al were initialized to
+	 * InvalidXLogRecPtr.
 	 */
-	foreach(cell, sync_standbys)
+	for (i = 0; i < num_standbys; i++)
 	{
-		WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-		XLogRecPtr write;
-		XLogRecPtr flush;
-		XLogRecPtr apply;
+		XLogRecPtr write = sync_standbys[i].write;
+		XLogRecPtr flush = sync_standbys[i].flush;
+		XLogRecPtr apply = sync_standbys[i].apply;
 
-		SpinLockAcquire(&walsnd->mutex);
-		write = walsnd->write;
-		flush = walsnd->flush;
-		apply = walsnd->apply;
-		SpinLockRelease(&walsnd->mutex);
+		/* Ignore candidates that aren't considered synchronous */
+		if (!sync_standbys[i].is_sync_standby)
+			continue;
 
 		if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
 			*writePtr = write;
@@ -628,38 +640,43 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  * standbys.
  */
 static void
-SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-							  XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
+							  XLogRecPtr *flushPtr,
+							  XLogRecPtr *applyPtr,
+							  SyncRepStandbyData *sync_standbys,
+							  int num_standbys,
+							  uint8 nth)
 {
-	ListCell *cell;
 	XLogRecPtr *write_array;
 	XLogRecPtr *flush_array;
 	XLogRecPtr *apply_array;
-	int len;
-	int i = 0;
+	int i;
+	int n;
 
-	len = list_length(sync_standbys);
-	write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-	flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-	apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+	write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+	flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+	apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
 
-	foreach(cell, sync_standbys)
+	n = 0;
+	for (i = 0; i < num_standbys; i++)
 	{
-		WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+		/* Ignore candidates that aren't considered synchronous */
+		if (!sync_standbys[i].is_sync_standby)
+			continue;
 
-		SpinLockAcquire(&walsnd->mutex);
-		write_array[i] = walsnd->write;
-		flush_array[i] = walsnd->flush;
-		apply_array[i] = walsnd->apply;
-		SpinLockRelease(&walsnd->mutex);
-
-		i++;
+		write_array[n] = sync_standbys[i].write;
+		flush_array[n] = sync_standbys[i].flush;
+		apply_array[n] = sync_standbys[i].apply;
+		n++;
 	}
 
+	/* Should have enough, or somebody messed up */
+	Assert(n >= nth);
+
 	/* Sort each array in descending order */
-	qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
-	qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
-	qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(write_array, n, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(flush_array, n, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(apply_array, n, sizeof(XLogRecPtr), cmp_lsn);
 
 	/* Get Nth latest Write, Flush, Apply positions */
 	*writePtr = write_array[nth - 1];
@@ -689,67 +706,48 @@ cmp_lsn(const void *a, const void *b)
 }
 
 /*
- * Return the list of sync standbys, or NIL if no sync standby is connected.
+ * Return data about walsenders that are candidates to be sync standbys.
  *
- * The caller must hold SyncRepLock.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * *standbys is set to a palloc'd array of structs of per-walsender data,
+ * and the number of valid entries (candidate sync senders) is returned.
  */
-List *
-SyncRepGetSyncStandbys(bool *am_sync)
+int
+SyncRepGetSyncStandbys(SyncRepStandbyData **standbys)
 {
-	Assert(LWLockHeldByMe(SyncRepLock));
+	int i;
+	int n;
 
-	/* Set default result */
-	if (am_sync != NULL)
-		*am_sync = false;
+	/* Create result array */
+	*standbys = (SyncRepStandbyData *)
+		palloc(max_wal_senders * sizeof(SyncRepStandbyData));
 
 	/* Quick exit if sync replication is not requested */
 	if (SyncRepConfig == NULL)
-		return NIL;
-
-	return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
-		SyncRepGetSyncStandbysPriority(am_sync) :
-		SyncRepGetSyncStandbysQuorum(am_sync);
-}
-
-/*
- * Return the list of all the candidates for quorum sync standbys,
- * or NIL if no such standby is connected.
- *
- * The caller must hold SyncRepLock. This function must be called only in
- * a quorum-based sync replication.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
- */
-static List *
-SyncRepGetSyncStandbysQuorum(bool *am_sync)
-{
-	List *result = NIL;
-	int i;
-	volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
-							  * rearrangement */
-
-	Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
+		return 0;
 
+	/* Collect raw data from shared memory */
+	n = 0;
 	for (i = 0; i < max_wal_senders; i++)
 	{
-		XLogRecPtr flush;
-		WalSndState state;
-		int pid;
+		volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
+								  * rearrangement */
+		SyncRepStandbyData *stby;
+		WalSndState state; /* not included in SyncRepStandbyData */
 
 		walsnd = &WalSndCtl->walsnds[i];
+		stby = *standbys + n;
 
 		SpinLockAcquire(&walsnd->mutex);
-		pid = walsnd->pid;
-		flush = walsnd->flush;
+		stby->pid = walsnd->pid;
 		state = walsnd->state;
+		stby->write = walsnd->write;
+		stby->flush = walsnd->flush;
+		stby->apply = walsnd->apply;
+		stby->sync_standby_priority = walsnd->sync_standby_priority;
 		SpinLockRelease(&walsnd->mutex);
 
 		/* Must be active */
-		if (pid == 0)
+		if (stby->pid == 0)
 			continue;
 
 		/* Must be streaming or stopping */
@@ -758,200 +756,79 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
 			continue;
 
 		/* Must be synchronous */
-		if (walsnd->sync_standby_priority == 0)
+		if (stby->sync_standby_priority == 0)
 			continue;
 
 		/* Must have a valid flush position */
-		if (XLogRecPtrIsInvalid(flush))
+		if (XLogRecPtrIsInvalid(stby->flush))
 			continue;
 
-		/*
-		 * Consider this standby as a candidate for quorum sync standbys and
-		 * append it to the result.
-		 */
-		result = lappend_int(result, i);
-		if (am_sync != NULL && walsnd == MyWalSnd)
-			*am_sync = true;
+		/* OK, it's a candidate */
+		stby->walsnd_index = i;
+		stby->is_me = (walsnd == MyWalSnd);
+		stby->is_sync_standby = true; /* might change below */
+		n++;
 	}
 
-	return result;
+	/*
+	 * In quorum mode, that's all we have to do. In priority mode, decide
+	 * which ones are high enough priority to consider sync standbys.
+	 */
+	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+		SyncRepGetSyncStandbysPriority(*standbys, n);
+
+	return n;
 }
 
 /*
- * Return the list of sync standbys chosen based on their priorities,
- * or NIL if no sync standby is connected.
+ * Decide which standbys to consider synchronous.
  *
- * If there are multiple standbys with the same priority,
- * the first one found is selected preferentially.
- *
- * The caller must hold SyncRepLock. This function must be called only in
- * a priority-based sync replication.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * This function must be called only in priority-based sync replication.
  */
-static List *
-SyncRepGetSyncStandbysPriority(bool *am_sync)
+static void
+SyncRepGetSyncStandbysPriority(SyncRepStandbyData *standbys, int n)
 {
-	List *result = NIL;
-	List *pending = NIL;
-	int lowest_priority;
-	int next_highest_priority;
-	int this_priority;
-	int priority;
 	int i;
-	bool am_in_pending = false;
-	volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
-							  * rearrangement */
 
 	Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
 
-	lowest_priority = SyncRepConfig->nmembers;
-	next_highest_priority = lowest_priority + 1;
+	/* Nothing to do here unless there are too many candidates. */
+	if (n <= SyncRepConfig->num_sync)
+		return;
 
 	/*
-	 * Find the sync standbys which have the highest priority (i.e, 1). Also
-	 * store all the other potential sync standbys into the pending list, in
-	 * order to scan it later and find other sync standbys from it quickly.
+	 * Sort the candidates by priority; then the first num_sync ones are
+	 * synchronous, and the rest aren't.
 	 */
-	for (i = 0; i < max_wal_senders; i++)
-	{
-		XLogRecPtr flush;
-		WalSndState state;
-		int pid;
+	qsort(standbys, n, sizeof(SyncRepStandbyData),
+		  standby_priority_comparator);
 
-		walsnd = &WalSndCtl->walsnds[i];
-
-		SpinLockAcquire(&walsnd->mutex);
-		pid = walsnd->pid;
-		flush = walsnd->flush;
-		state = walsnd->state;
-		SpinLockRelease(&walsnd->mutex);
-
-		/* Must be active */
-		if (pid == 0)
-			continue;
-
-		/* Must be streaming or stopping */
-		if (state != WALSNDSTATE_STREAMING &&
-			state != WALSNDSTATE_STOPPING)
-			continue;
-
-		/* Must be synchronous */
-		this_priority = walsnd->sync_standby_priority;
-		if (this_priority == 0)
-			continue;
-
-		/* Must have a valid flush position */
-		if (XLogRecPtrIsInvalid(flush))
-			continue;
-
-		/*
-		 * If the priority is equal to 1, consider this standby as sync and
-		 * append it to the result. Otherwise append this standby to the
-		 * pending list to check if it's actually sync or not later.
-		 */
-		if (this_priority == 1)
-		{
-			result = lappend_int(result, i);
-			if (am_sync != NULL && walsnd == MyWalSnd)
-				*am_sync = true;
-			if (list_length(result) == SyncRepConfig->num_sync)
-			{
-				list_free(pending);
-				return result; /* Exit if got enough sync standbys */
-			}
-		}
-		else
-		{
-			pending = lappend_int(pending, i);
-			if (am_sync != NULL && walsnd == MyWalSnd)
-				am_in_pending = true;
-
-			/*
-			 * Track the highest priority among the standbys in the pending
-			 * list, in order to use it as the starting priority for later
-			 * scan of the list. This is useful to find quickly the sync
-			 * standbys from the pending list later because we can skip
-			 * unnecessary scans for the unused priorities.
-			 */
-			if (this_priority < next_highest_priority)
-				next_highest_priority = this_priority;
-		}
-	}
+	for (i = SyncRepConfig->num_sync; i < n; i++)
+		standbys[i].is_sync_standby = false;
+}
 
-	/*
-	 * Consider all pending standbys as sync if the number of them plus
-	 * already-found sync ones is lower than the configuration requests.
-	 */
-	if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
-	{
-		/*
-		 * Set *am_sync to true if this walsender is in the pending list
-		 * because all pending standbys are considered as sync.
-		 */
-		if (am_sync != NULL && !(*am_sync))
-			*am_sync = am_in_pending;
+/*
+ * qsort comparator to sort SyncRepStandbyData entries by priority
+ */
+static int
+standby_priority_comparator(const void *a, const void *b)
+{
+	const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
+	const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
 
-		result = list_concat(result, pending);
-		list_free(pending);
-		return result;
-	}
+	/* First, sort by increasing priority value */
+	if (sa->sync_standby_priority != sb->sync_standby_priority)
+		return sa->sync_standby_priority - sb->sync_standby_priority;
 
 	/*
-	 * Find the sync standbys from the pending list.
+	 * We might have equal priority values; arbitrarily break ties by position
+	 * in the WALSnd array. (This is utterly bogus, since that is arrival
+	 * order dependent, but there are regression tests that rely on it.)
 	 */
-	priority = next_highest_priority;
-	while (priority <= lowest_priority)
-	{
-		ListCell *cell;
-
-		next_highest_priority = lowest_priority + 1;
-
-		foreach(cell, pending)
-		{
-			i = lfirst_int(cell);
-			walsnd = &WalSndCtl->walsnds[i];
-
-			this_priority = walsnd->sync_standby_priority;
-			if (this_priority == priority)
-			{
-				result = lappend_int(result, i);
-				if (am_sync != NULL && walsnd == MyWalSnd)
-					*am_sync = true;
-
-				/*
-				 * We should always exit here after the scan of pending list
-				 * starts because we know that the list has enough elements to
-				 * reach SyncRepConfig->num_sync.
-				 */
-				if (list_length(result) == SyncRepConfig->num_sync)
-				{
-					list_free(pending);
-					return result; /* Exit if got enough sync standbys */
-				}
-
-				/*
-				 * Remove the entry for this sync standby from the list to
-				 * prevent us from looking at the same entry again.
-				 */
-				pending = foreach_delete_current(pending, cell);
-
-				continue; /* don't adjust next_highest_priority */
-			}
-
-			if (this_priority < next_highest_priority)
-				next_highest_priority = this_priority;
-		}
-
-		priority = next_highest_priority;
-	}
-
-	/* never reached, but keep compiler quiet */
-	Assert(false);
-	return result;
+	return sa->walsnd_index - sb->walsnd_index;
 }
 
+
 /*
  * Check if we are in the list of sync standbys, and if so, determine
  * priority sequence. Return priority if set, or zero to indicate that
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fc475d144d..af8f67e769 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2375,14 +2375,16 @@ InitWalSenderSlot(void)
 			 * Found a free slot. Reserve it for us.
 			 */
 			walsnd->pid = MyProcPid;
+			walsnd->state = WALSNDSTATE_STARTUP;
 			walsnd->sentPtr = InvalidXLogRecPtr;
+			walsnd->needreload = false;
 			walsnd->write = InvalidXLogRecPtr;
 			walsnd->flush = InvalidXLogRecPtr;
 			walsnd->apply = InvalidXLogRecPtr;
 			walsnd->writeLag = -1;
 			walsnd->flushLag = -1;
 			walsnd->applyLag = -1;
-			walsnd->state = WALSNDSTATE_STARTUP;
+			walsnd->sync_standby_priority = 0;
 			walsnd->latch = &MyProc->procLatch;
 			walsnd->replyTime = 0;
 			walsnd->spillTxns = 0;
@@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	Tuplestorestate *tupstore;
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
-	List *sync_standbys;
+	SyncRepStandbyData *sync_standbys;
+	int num_standbys;
 	int i;
 
 	/* check to see if caller supports us returning a tuplestore */
@@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	MemoryContextSwitchTo(oldcontext);
 
 	/*
-	 * Get the currently active synchronous standbys.
+	 * Get the currently active synchronous standbys. This could be out of
+	 * date before we're done, but we'll use the data anyway.
 	 */
-	LWLockAcquire(SyncRepLock, LW_SHARED);
-	sync_standbys = SyncRepGetSyncStandbys(NULL);
-	LWLockRelease(SyncRepLock);
+	num_standbys = SyncRepGetSyncStandbys(&sync_standbys);
 
 	for (i = 0; i < max_wal_senders; i++)
 	{
@@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		int64 spillTxns;
 		int64 spillCount;
 		int64 spillBytes;
+		bool is_sync_standby;
 		Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
 		bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+		int j;
 
+		/* Collect data from shared memory */
 		SpinLockAcquire(&walsnd->mutex);
 		if (walsnd->pid == 0)
 		{
@@ -3311,6 +3316,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		spillBytes = walsnd->spillBytes;
 		SpinLockRelease(&walsnd->mutex);
 
+		/* Detect whether walsender is/was considered synchronous */
+		is_sync_standby = false;
+		for (j = 0; j < num_standbys; j++)
+		{
+			if (sync_standbys[j].walsnd_index == i)
+			{
+				is_sync_standby = sync_standbys[j].is_sync_standby;
+				break;
+			}
+		}
+
 		memset(nulls, 0, sizeof(nulls));
 		values[0] = Int32GetDatum(pid);
 
@@ -3380,7 +3396,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 */
 			if (priority == 0)
 				values[10] = CStringGetTextDatum("async");
-			else if (list_member_int(sync_standbys, i))
+			else if (is_sync_standby)
 				values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
 					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
 			else
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index c5f0e91aad..1308ada102 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -36,6 +36,26 @@
 #define SYNC_REP_PRIORITY 0
 #define SYNC_REP_QUORUM 1
 
+/*
+ * SyncRepGetSyncStandbys returns an array of these structs,
+ * one per candidate synchronous walsender.
+ */
+typedef struct SyncRepStandbyData
+{
+	/* Copies of relevant fields from WalSnd shared-memory struct */
+	pid_t pid;
+	XLogRecPtr write;
+	XLogRecPtr flush;
+	XLogRecPtr apply;
+	int sync_standby_priority;
+	/* Index of this walsender in the WalSnd shared-memory array */
+	int walsnd_index;
+	/* This flag indicates whether this struct is about our own process */
+	bool is_me;
+	/* After-the-fact conclusion about whether this is a sync standby */
+	bool is_sync_standby;
+} SyncRepStandbyData;
+
 /*
  * Struct for the configuration of synchronous replication.
  *
@@ -74,7 +94,7 @@ extern void SyncRepInitConfig(void);
 extern void SyncRepReleaseWaiters(void);
 
 /* called by wal sender and user backend */
-extern List *SyncRepGetSyncStandbys(bool *am_sync);
+extern int SyncRepGetSyncStandbys(SyncRepStandbyData **standbys);
 
 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 366828f0a4..734acec2a4 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -31,8 +31,7 @@ typedef enum WalSndState
 /*
  * Each walsender has a WalSnd struct in shared memory.
  *
- * This struct is protected by 'mutex', with two exceptions: one is
- * sync_standby_priority as noted below. The other exception is that some
+ * This struct is protected by its 'mutex' spinlock field, except that some
  * members are only written by the walsender process itself, and thus that
  * process is free to read those members without holding spinlock. pid and
  * needreload always require the spinlock to be held for all accesses.
@@ -60,6 +59,12 @@ typedef struct WalSnd
 	TimeOffset flushLag;
 	TimeOffset applyLag;
 
+	/*
+	 * The priority order of the standby managed by this WALSender, as listed
+	 * in synchronous_standby_names, or 0 if not-listed.
+	 */
+	int sync_standby_priority;
+
 	/* Protects shared variables shown above. */
 	slock_t mutex;
 
@@ -69,13 +74,6 @@ typedef struct WalSnd
 	 */
 	Latch *latch;
 
-	/*
-	 * The priority order of the standby managed by this WALSender, as listed
-	 * in synchronous_standby_names, or 0 if not-listed. Protected by
-	 * SyncRepLock.
-	 */
-	int sync_standby_priority;
-
 	/*
 	 * Timestamp of the last message received from standby.
 	 */
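
As a reading aid for the flag-based API discussed in this message, here is a minimal standalone sketch of the priority-mode selection: every candidate is flagged as synchronous, and only when there are more than num_sync candidates is the array sorted and the tail demoted. StandbySketch and the sketch_* functions are hypothetical names invented for illustration; they are not part of the patch or of PostgreSQL, and the comparator uses explicit comparisons instead of the patch's subtraction.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical, simplified stand-in for the patch's SyncRepStandbyData. */
typedef struct StandbySketch
{
	int			priority;	/* smaller value = higher priority */
	int			slot_index;	/* position in an imaginary walsender array */
	bool		is_sync;	/* conclusion: counted as synchronous or not */
} StandbySketch;

/* Order by increasing priority value, breaking ties by slot index. */
static int
sketch_priority_cmp(const void *a, const void *b)
{
	const StandbySketch *sa = (const StandbySketch *) a;
	const StandbySketch *sb = (const StandbySketch *) b;

	if (sa->priority != sb->priority)
		return (sa->priority > sb->priority) - (sa->priority < sb->priority);
	return (sa->slot_index > sb->slot_index) - (sa->slot_index < sb->slot_index);
}

/*
 * Priority mode: flag every candidate, then demote all but the num_sync
 * best ones.  A quorum-mode caller would stop after the first loop.
 */
static void
sketch_mark_priority(StandbySketch *c, int n, int num_sync)
{
	int			i;

	assert(n >= 0 && num_sync >= 0);

	for (i = 0; i < n; i++)
		c[i].is_sync = true;

	/* Nothing to demote unless there are too many candidates. */
	if (n <= num_sync)
		return;

	qsort(c, (size_t) n, sizeof(StandbySketch), sketch_priority_cmp);
	for (i = num_sync; i < n; i++)
		c[i].is_sync = false;
}

/* Callers read the flags and assume nothing about array order. */
static int
sketch_count_sync(const StandbySketch *c, int n)
{
	int			i;
	int			count = 0;

	for (i = 0; i < n; i++)
		if (c[i].is_sync)
			count++;
	return count;
}
```

A caller that only inspects is_sync, as sketch_count_sync does, keeps working whether or not the array was sorted, which is the property the bool-flag API is meant to give.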