Re: Race condition in SyncRepGetSyncStandbysPriority - Mailing list pgsql-hackers
From | Kyotaro Horiguchi |
---|---|
Subject | Re: Race condition in SyncRepGetSyncStandbysPriority |
Date | |
Msg-id | 20200416.162241.2092362802599118592.horikyota.ntt@gmail.com |
In response to | Re: Race condition in SyncRepGetSyncStandbysPriority (Tom Lane <tgl@sss.pgh.pa.us>) |
Responses | Re: Race condition in SyncRepGetSyncStandbysPriority |
List | pgsql-hackers |
At Wed, 15 Apr 2020 11:31:49 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in
> Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
> > At Tue, 14 Apr 2020 16:32:40 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in
> > + stby->is_sync_standby = true; /* might change below */
>
> > I'm uneasy with that. In quorum mode all running standbys are marked
> > as "sync" and that's bogus.
>
> I don't follow that? The existing coding of SyncRepGetSyncStandbysQuorum
> returns all the candidates in its list, so this is isomorphic to that.

The existing code actually does that. On the other hand,
SyncRepGetSyncStandbysPriority returns standbys that *are known to be*
synchronous, whereas *Quorum returns standbys that *can be* synchronous.
The two functions return different things, so the flag should be
is_sync_standby for -Priority and is_sync_candidate for -Quorum.

> Possibly a different name for the flag would be more suited?
>
> > On the other hand sync_standbys is already sorted in priority order, so
> > I think we can get rid of the member by setting *am_sync as follows.
> >
> > SyncRepGetSyncRecPtr:
> >   if (sync_standbys[i].is_me)
> >   {
> >       *am_sync = (i < SyncRepConfig->num_sync);
> >       break;
> >   }
>
> I disagree with this, it will change the behavior in the quorum case.

Oops, you're right. I find the whole thing there a bit confusing, and I
confused myself. syncrep_method affects how some values (specifically
am_sync and sync_standbys) are interpreted at several calling levels, and
*am_sync carries no information in quorum mode.

> In any case, a change like this will cause callers to know way more than
> they ought to about the ordering of the array. In my mind, the fact that
> SyncRepGetSyncStandbysPriority is sorting the array is an internal
> implementation detail; I do not want it to be part of the API.

Anyway, am_sync and is_sync_standby are utterly useless in quorum mode.
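To make the priority/quorum distinction concrete, here is a minimal standalone sketch (not PostgreSQL code) of how the same candidate array yields different synced positions depending on the method. It assumes a hypothetical `Candidate` struct carrying only a priority and a flush LSN, an array already sorted by priority, and 0 standing in for InvalidXLogRecPtr. Priority mode takes the oldest flush among the first `num_sync` entries, in the spirit of SyncRepGetOldestSyncRecPtr; quorum mode takes the `num_sync`-th latest flush among all candidates, in the spirit of SyncRepGetNthLatestSyncRecPtr.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for XLogRecPtr; 0 plays the role of
 * InvalidXLogRecPtr, and candidates are assumed to have nonzero positions. */
typedef uint64_t XLogRecPtr;

/* Hypothetical, trimmed-down candidate record (illustration only). */
typedef struct Candidate
{
	int			priority;	/* 1 = highest; the array is sorted by this */
	XLogRecPtr	flush;		/* last flush position reported by the standby */
} Candidate;

/* Priority mode: the oldest flush among the first num_sync entries. */
static XLogRecPtr
priority_synced_flush(const Candidate *c, int n, int num_sync)
{
	XLogRecPtr	oldest = 0;

	assert(num_sync >= 1 && num_sync <= n);
	for (int i = 0; i < num_sync; i++)
	{
		if (oldest == 0 || c[i].flush < oldest)
			oldest = c[i].flush;
	}
	return oldest;
}

/* qsort comparator putting larger (later) LSNs first. */
static int
cmp_lsn_desc(const void *a, const void *b)
{
	XLogRecPtr	la = *(const XLogRecPtr *) a;
	XLogRecPtr	lb = *(const XLogRecPtr *) b;

	return (la < lb) - (la > lb);
}

/* Quorum mode: the num_sync-th latest flush among all candidates
 * (priority is not consulted). */
static XLogRecPtr
quorum_synced_flush(const Candidate *c, int n, int num_sync)
{
	XLogRecPtr *flushes;
	XLogRecPtr	result;

	assert(num_sync >= 1 && num_sync <= n);
	flushes = malloc(sizeof(XLogRecPtr) * (size_t) n);
	if (flushes == NULL)
		abort();
	for (int i = 0; i < n; i++)
		flushes[i] = c[i].flush;
	qsort(flushes, (size_t) n, sizeof(XLogRecPtr), cmp_lsn_desc);
	result = flushes[num_sync - 1];
	free(flushes);
	return result;
}
```

For three candidates whose flush positions are 500, 300 and 900 (in priority order) and num_sync = 2, priority mode reports 300 while quorum mode reports 500, which is why a per-walsender am_sync flag has no single meaning across the two methods.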
Your point that the ordering of the array is an internal implementation
detail would be pretty persuasive if it weren't for the fact that the
upper layers (SyncRepReleaseWaiters and SyncRepGetSyncRecPtr) already
refer to syncrep_method to decide how to interpret the am_sync flag and
the sync_standbys list. So the difference is effectively part of the API
anyway.

After thinking some more, I concluded that some of the variables are
wrongly named or wrongly conceived, and redundant. The role of am_sync is
already covered by got_recptr in SyncRepReleaseWaiters, so it is enough
for SyncRepGetSyncRecPtr to just report to the caller whether the caller
may release some of the waiting processes. This simplifies the related
functions and makes them (to me) clearer.

Please find the attached patch.

> (Apropos to that, I realized from working on this patch that there's
> another, completely undocumented assumption in the existing code, that
> the integer list will be sorted by walsender index for equal priorities.
> I don't like that either, and not just because it's undocumented.)

That seems accidental. Sorting by priority is the designed and documented
behavior; in contrast, entries of the same priority end up in index order
by accident, and that is not documented, which means it could change at
any time. I don't think we need to define everything in that much detail.

regards.
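To make the tie-break question concrete, here is a minimal sketch (not the patch's code) using a hypothetical trimmed-down `StandbyEntry` struct. It sorts candidates by priority and then by walsender slot index, and reports whether the calling walsender falls within the first `num_sync` entries, which is the priority-mode meaning of am_sync. Because C's `qsort()` is not guaranteed to be stable, the secondary key is what makes the order of equal-priority entries deterministic; whether that order should be promised to callers is a separate question.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical, trimmed-down per-walsender candidate entry. */
typedef struct StandbyEntry
{
	int			priority;		/* 1-based position in the configured list */
	int			walsnd_index;	/* slot number in the walsender array */
	bool		is_me;			/* entry describes the calling walsender */
} StandbyEntry;

/*
 * Order by priority, then by slot index.  qsort() is not guaranteed to be
 * stable, so without the second key, equal-priority entries could come out
 * in either order.
 */
static int
entry_cmp(const void *a, const void *b)
{
	const StandbyEntry *ea = a;
	const StandbyEntry *eb = b;

	if (ea->priority != eb->priority)
		return (ea->priority > eb->priority) - (ea->priority < eb->priority);
	return (ea->walsnd_index > eb->walsnd_index) -
		(ea->walsnd_index < eb->walsnd_index);
}

/*
 * Priority mode only: sort the candidates in place and report whether the
 * calling walsender is among the first num_sync of them.
 */
static bool
am_sync_priority_mode(StandbyEntry *entries, int n, int num_sync)
{
	assert(n >= 0 && num_sync >= 0);
	qsort(entries, (size_t) n, sizeof(StandbyEntry), entry_cmp);
	for (int i = 0; i < n && i < num_sync; i++)
	{
		if (entries[i].is_me)
			return true;
	}
	return false;
}
```

With entries (priority 2, slot 0), (priority 1, slot 3, the caller) and (priority 1, slot 1), the sorted order is slot 1, slot 3, slot 0, so the caller counts as synchronous when num_sync is 2 but not when it is 1.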
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ffd5b31eb2..99a7bbbc86 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -103,19 +103,21 @@ static int	SyncRepWakeQueue(bool all, int mode);
 
 static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
 								 XLogRecPtr *flushPtr,
-								 XLogRecPtr *applyPtr,
-								 bool *am_sync);
+								 XLogRecPtr *applyPtr);
 static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
 									   XLogRecPtr *flushPtr,
 									   XLogRecPtr *applyPtr,
-									   List *sync_standbys);
+									   SyncRepStandbyData *sync_standbys,
+									   int num_standbys,
+									   uint8 nsyncs);
 static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
 										  XLogRecPtr *flushPtr,
 										  XLogRecPtr *applyPtr,
-										  List *sync_standbys, uint8 nth);
+										  SyncRepStandbyData *sync_standbys,
+										  int num_standbys,
+										  uint8 nth);
 static int	SyncRepGetStandbyPriority(void);
-static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
-static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static int	standby_priority_comparator(const void *a, const void *b);
 static int	cmp_lsn(const void *a, const void *b);
 
 #ifdef USE_ASSERT_CHECKING
@@ -406,9 +408,10 @@ SyncRepInitConfig(void)
 	priority = SyncRepGetStandbyPriority();
 	if (MyWalSnd->sync_standby_priority != priority)
 	{
-		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+		SpinLockAcquire(&MyWalSnd->mutex);
 		MyWalSnd->sync_standby_priority = priority;
-		LWLockRelease(SyncRepLock);
+		SpinLockRelease(&MyWalSnd->mutex);
+
 		ereport(DEBUG1,
 				(errmsg("standby \"%s\" now has synchronous standby priority %u",
 						application_name, priority)));
@@ -429,8 +432,7 @@ SyncRepReleaseWaiters(void)
 	XLogRecPtr	writePtr;
 	XLogRecPtr	flushPtr;
 	XLogRecPtr	applyPtr;
-	bool		got_recptr;
-	bool		am_sync;
+	bool		release_waiters;
 	int			numwrite = 0;
 	int			numflush = 0;
 	int			numapply = 0;
@@ -458,16 +460,24 @@ SyncRepReleaseWaiters(void)
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 
 	/*
-	 * Check whether we are a sync standby or not, and calculate the synced
-	 * positions among all sync standbys.
+	 * Check whether we may release any of the waiting processes, and
+	 * calculate the synced positions.
 	 */
-	got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
+	release_waiters = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr);
+
+	/* Return if nothing to do. */
+	if (!release_waiters)
+	{
+		LWLockRelease(SyncRepLock);
+		announce_next_takeover = true;
+		return;
+	}
 
 	/*
-	 * If we are managing a sync standby, though we weren't prior to this,
-	 * then announce we are now a sync standby.
+	 * If this walsender has become able to release waiting processes,
+	 * announce that.
 	 */
-	if (announce_next_takeover && am_sync)
+	if (announce_next_takeover)
 	{
 		announce_next_takeover = false;
 
@@ -481,17 +491,6 @@ SyncRepReleaseWaiters(void)
 						application_name)));
 	}
 
-	/*
-	 * If the number of sync standbys is less than requested or we aren't
-	 * managing a sync standby then just leave.
-	 */
-	if (!got_recptr || !am_sync)
-	{
-		LWLockRelease(SyncRepLock);
-		announce_next_takeover = !am_sync;
-		return;
-	}
-
 	/*
 	 * Set the lsn first so that when we wake backends they will release up to
 	 * this location.
@@ -523,43 +522,62 @@ SyncRepReleaseWaiters(void)
 /*
  * Calculate the synced Write, Flush and Apply positions among sync standbys.
  *
- * The caller must hold SyncRepLock.
- *
- * Return false if the number of sync standbys is less than
- * synchronous_standby_names specifies. Otherwise return true and
- * store the positions into *writePtr, *flushPtr and *applyPtr.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * Return false if this walsender cannot release any of the waiters.  Otherwise
+ * return true and store the positions into *writePtr, *flushPtr and *applyPtr.
 */
 static bool
 SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-					 XLogRecPtr *applyPtr, bool *am_sync)
+					 XLogRecPtr *applyPtr)
 {
-	List	   *sync_standbys;
-
-	Assert(LWLockHeldByMe(SyncRepLock));
+	SyncRepStandbyData *standbys;
+	int			num_standbys;
+	int			i;
+	bool		sync_priority =
+		(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
 
+	/* Initialize default results */
 	*writePtr = InvalidXLogRecPtr;
 	*flushPtr = InvalidXLogRecPtr;
 	*applyPtr = InvalidXLogRecPtr;
-	*am_sync = false;
 
-	/* Get standbys that are considered as synchronous at this moment */
-	sync_standbys = SyncRepGetSyncStandbys(am_sync);
+	/* Quick out if not even configured to be synchronous */
+	if (SyncRepConfig == NULL)
+		return false;
+
+	/* Get standbys.  They are in priority order. */
+	num_standbys = SyncRepGetStandbys(&standbys);
 
 	/*
-	 * Quick exit if we are not managing a sync standby or there are not
-	 * enough synchronous standbys.
+	 * Nothing more to do if there are not enough synchronous standbys or
+	 * candidates.
 	 */
-	if (!(*am_sync) ||
-		SyncRepConfig == NULL ||
-		list_length(sync_standbys) < SyncRepConfig->num_sync)
+	if (num_standbys < SyncRepConfig->num_sync)
 	{
-		list_free(sync_standbys);
+		pfree(standbys);
 		return false;
 	}
 
+	/* In priority mode, nothing to do if I am not a sync standby */
+	if (sync_priority)
+	{
+		bool		am_sync = false;
+
+		for (i = 0; i < num_standbys; i++)
+		{
+			if (standbys[i].is_me)
+			{
+				am_sync = true;
+				break;
+			}
+		}
+
+		if (!am_sync)
+		{
+			pfree(standbys);
+			return false;
+		}
+	}
+
 	/*
 	 * In a priority-based sync replication, the synced positions are the
 	 * oldest ones among sync standbys. In a quorum-based, they are the Nth
@@ -573,46 +591,51 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 	 * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
 	 * positions even in a quorum-based sync replication.
 	 */
-	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+	if (sync_priority)
 	{
 		SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
-								   sync_standbys);
+								   standbys, num_standbys,
+								   SyncRepConfig->num_sync);
 	}
 	else
 	{
 		SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
-									  sync_standbys, SyncRepConfig->num_sync);
+									  standbys, num_standbys,
+									  SyncRepConfig->num_sync);
 	}
 
-	list_free(sync_standbys);
+	pfree(standbys);
 	return true;
 }
 
 /*
- * Calculate the oldest Write, Flush and Apply positions among sync standbys.
+ * Calculate the oldest Write, Flush and Apply positions among the first
+ * nsyncs sync standbys.
  */
 static void
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-						   XLogRecPtr *applyPtr, List *sync_standbys)
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+						   XLogRecPtr *flushPtr,
+						   XLogRecPtr *applyPtr,
+						   SyncRepStandbyData *sync_standbys,
+						   int num_standbys,
+						   uint8 nsyncs)
 {
-	ListCell   *cell;
+	int			i;
 
 	/*
 	 * Scan through all sync standbys and calculate the oldest Write, Flush
-	 * and Apply positions.
+	 * and Apply positions.  We assume *writePtr et al were initialized to
+	 * InvalidXLogRecPtr.
 	 */
-	foreach(cell, sync_standbys)
+	for (i = 0; i < num_standbys; i++)
 	{
-		WalSnd	   *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-		XLogRecPtr	write;
-		XLogRecPtr	flush;
-		XLogRecPtr	apply;
+		XLogRecPtr	write = sync_standbys[i].write;
+		XLogRecPtr	flush = sync_standbys[i].flush;
+		XLogRecPtr	apply = sync_standbys[i].apply;
 
-		SpinLockAcquire(&walsnd->mutex);
-		write = walsnd->write;
-		flush = walsnd->flush;
-		apply = walsnd->apply;
-		SpinLockRelease(&walsnd->mutex);
+		/* Ignore candidates that aren't considered synchronous */
+		if (i >= nsyncs)
+			break;
 
 		if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
 			*writePtr = write;
@@ -628,38 +651,39 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  * standbys.
 */
 static void
-SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-							  XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
+							  XLogRecPtr *flushPtr,
+							  XLogRecPtr *applyPtr,
+							  SyncRepStandbyData *sync_standbys,
+							  int num_standbys,
+							  uint8 nth)
 {
-	ListCell   *cell;
 	XLogRecPtr *write_array;
 	XLogRecPtr *flush_array;
 	XLogRecPtr *apply_array;
-	int			len;
-	int			i = 0;
+	int			i;
+	int			n;
 
-	len = list_length(sync_standbys);
-	write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-	flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-	apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+	write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+	flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+	apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
 
-	foreach(cell, sync_standbys)
+	n = 0;
+	for (i = 0; i < num_standbys; i++)
 	{
-		WalSnd	   *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-
-		SpinLockAcquire(&walsnd->mutex);
-		write_array[i] = walsnd->write;
-		flush_array[i] = walsnd->flush;
-		apply_array[i] = walsnd->apply;
-		SpinLockRelease(&walsnd->mutex);
-
-		i++;
+		write_array[n] = sync_standbys[i].write;
+		flush_array[n] = sync_standbys[i].flush;
+		apply_array[n] = sync_standbys[i].apply;
+		n++;
 	}
 
+	/* Should have enough, or somebody messed up */
+	Assert(n >= nth);
+
 	/* Sort each array in descending order */
-	qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
-	qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
-	qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(write_array, n, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(flush_array, n, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(apply_array, n, sizeof(XLogRecPtr), cmp_lsn);
 
 	/* Get Nth latest Write, Flush, Apply positions */
 	*writePtr = write_array[nth - 1];
@@ -689,67 +713,49 @@ cmp_lsn(const void *a, const void *b)
 }
 
 /*
- * Return the list of sync standbys, or NIL if no sync standby is connected.
+ * Return data about active walsenders.
  *
- * The caller must hold SyncRepLock.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * *standbys is set to a palloc'd array of structs of per-walsender data, and
+ * the number of valid entries is returned.  The entries are in
+ * sync_standby_priority order.
  */
-List *
-SyncRepGetSyncStandbys(bool *am_sync)
+int
+SyncRepGetStandbys(SyncRepStandbyData **standbys)
 {
-	Assert(LWLockHeldByMe(SyncRepLock));
+	int			i;
+	int			n;
 
-	/* Set default result */
-	if (am_sync != NULL)
-		*am_sync = false;
+	/* Create result array */
+	*standbys = (SyncRepStandbyData *)
+		palloc(max_wal_senders * sizeof(SyncRepStandbyData));
 
 	/* Quick exit if sync replication is not requested */
 	if (SyncRepConfig == NULL)
-		return NIL;
-
-	return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
-		SyncRepGetSyncStandbysPriority(am_sync) :
-		SyncRepGetSyncStandbysQuorum(am_sync);
-}
-
-/*
- * Return the list of all the candidates for quorum sync standbys,
- * or NIL if no such standby is connected.
- *
- * The caller must hold SyncRepLock.  This function must be called only in
- * a quorum-based sync replication.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
- */
-static List *
-SyncRepGetSyncStandbysQuorum(bool *am_sync)
-{
-	List	   *result = NIL;
-	int			i;
-	volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
-								 * rearrangement */
-
-	Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
+		return false;
 
+	/* Collect raw data from shared memory */
+	n = 0;
 	for (i = 0; i < max_wal_senders; i++)
 	{
-		XLogRecPtr	flush;
-		WalSndState state;
-		int			pid;
+		volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
+									 * rearrangement */
+		SyncRepStandbyData *stby;
+		WalSndState state;		/* not included in SyncRepStandbyData */
 
 		walsnd = &WalSndCtl->walsnds[i];
+		stby = *standbys + n;
 
 		SpinLockAcquire(&walsnd->mutex);
-		pid = walsnd->pid;
-		flush = walsnd->flush;
+		stby->pid = walsnd->pid;
 		state = walsnd->state;
+		stby->write = walsnd->write;
+		stby->flush = walsnd->flush;
+		stby->apply = walsnd->apply;
+		stby->sync_standby_priority = walsnd->sync_standby_priority;
 		SpinLockRelease(&walsnd->mutex);
 
 		/* Must be active */
-		if (pid == 0)
+		if (stby->pid == 0)
 			continue;
 
 		/* Must be streaming or stopping */
@@ -758,200 +764,53 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
 			continue;
 
 		/* Must be synchronous */
-		if (walsnd->sync_standby_priority == 0)
+		if (stby->sync_standby_priority == 0)
 			continue;
 
 		/* Must have a valid flush position */
-		if (XLogRecPtrIsInvalid(flush))
+		if (XLogRecPtrIsInvalid(stby->flush))
 			continue;
 
-		/*
-		 * Consider this standby as a candidate for quorum sync standbys and
-		 * append it to the result.
-		 */
-		result = lappend_int(result, i);
-		if (am_sync != NULL && walsnd == MyWalSnd)
-			*am_sync = true;
+		/* OK, it's a candidate */
+		stby->walsnd_index = i;
+		stby->is_me = (walsnd == MyWalSnd);
+		n++;
 	}
 
-	return result;
-}
-
-/*
- * Return the list of sync standbys chosen based on their priorities,
- * or NIL if no sync standby is connected.
- *
- * If there are multiple standbys with the same priority,
- * the first one found is selected preferentially.
- *
- * The caller must hold SyncRepLock.  This function must be called only in
- * a priority-based sync replication.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
- */
-static List *
-SyncRepGetSyncStandbysPriority(bool *am_sync)
-{
-	List	   *result = NIL;
-	List	   *pending = NIL;
-	int			lowest_priority;
-	int			next_highest_priority;
-	int			this_priority;
-	int			priority;
-	int			i;
-	bool		am_in_pending = false;
-	volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
-								 * rearrangement */
-
-	Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
-
-	lowest_priority = SyncRepConfig->nmembers;
-	next_highest_priority = lowest_priority + 1;
-
 	/*
-	 * Find the sync standbys which have the highest priority (i.e, 1).  Also
-	 * store all the other potential sync standbys into the pending list, in
-	 * order to scan it later and find other sync standbys from it quickly.
+	 * In quorum mode, that's all we have to do since all entries are in the
+	 * same priority 1.  In priority mode, sort the candidates by priority.
 	 */
-	for (i = 0; i < max_wal_senders; i++)
-	{
-		XLogRecPtr	flush;
-		WalSndState state;
-		int			pid;
-
-		walsnd = &WalSndCtl->walsnds[i];
-
-		SpinLockAcquire(&walsnd->mutex);
-		pid = walsnd->pid;
-		flush = walsnd->flush;
-		state = walsnd->state;
-		SpinLockRelease(&walsnd->mutex);
-
-		/* Must be active */
-		if (pid == 0)
-			continue;
-
-		/* Must be streaming or stopping */
-		if (state != WALSNDSTATE_STREAMING &&
-			state != WALSNDSTATE_STOPPING)
-			continue;
-
-		/* Must be synchronous */
-		this_priority = walsnd->sync_standby_priority;
-		if (this_priority == 0)
-			continue;
-
-		/* Must have a valid flush position */
-		if (XLogRecPtrIsInvalid(flush))
-			continue;
+	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+		qsort(*standbys, n, sizeof(SyncRepStandbyData),
+			  standby_priority_comparator);
 
-		/*
-		 * If the priority is equal to 1, consider this standby as sync and
-		 * append it to the result.  Otherwise append this standby to the
-		 * pending list to check if it's actually sync or not later.
-		 */
-		if (this_priority == 1)
-		{
-			result = lappend_int(result, i);
-			if (am_sync != NULL && walsnd == MyWalSnd)
-				*am_sync = true;
-			if (list_length(result) == SyncRepConfig->num_sync)
-			{
-				list_free(pending);
-				return result;	/* Exit if got enough sync standbys */
-			}
-		}
-		else
-		{
-			pending = lappend_int(pending, i);
-			if (am_sync != NULL && walsnd == MyWalSnd)
-				am_in_pending = true;
+	return n;
+}
 
-			/*
-			 * Track the highest priority among the standbys in the pending
-			 * list, in order to use it as the starting priority for later
-			 * scan of the list. This is useful to find quickly the sync
-			 * standbys from the pending list later because we can skip
-			 * unnecessary scans for the unused priorities.
-			 */
-			if (this_priority < next_highest_priority)
-				next_highest_priority = this_priority;
-		}
-	}
 
-	/*
-	 * Consider all pending standbys as sync if the number of them plus
-	 * already-found sync ones is lower than the configuration requests.
-	 */
-	if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
-	{
-		/*
-		 * Set *am_sync to true if this walsender is in the pending list
-		 * because all pending standbys are considered as sync.
-		 */
-		if (am_sync != NULL && !(*am_sync))
-			*am_sync = am_in_pending;
+/*
+ * qsort comparator to sort SyncRepStandbyData entries by priority
+ */
+static int
+standby_priority_comparator(const void *a, const void *b)
+{
+	const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
+	const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
 
-		result = list_concat(result, pending);
-		list_free(pending);
-		return result;
-	}
+	/* First, sort by increasing priority value */
+	if (sa->sync_standby_priority != sb->sync_standby_priority)
+		return sa->sync_standby_priority - sb->sync_standby_priority;
 
 	/*
-	 * Find the sync standbys from the pending list.
+	 * We might have equal priority values; arbitrarily break ties by position
+	 * in the WALSnd array.  (This is utterly bogus, since that is arrival
+	 * order dependent, but there are regression tests that rely on it.)
 	 */
-	priority = next_highest_priority;
-	while (priority <= lowest_priority)
-	{
-		ListCell   *cell;
-
-		next_highest_priority = lowest_priority + 1;
-
-		foreach(cell, pending)
-		{
-			i = lfirst_int(cell);
-			walsnd = &WalSndCtl->walsnds[i];
-
-			this_priority = walsnd->sync_standby_priority;
-			if (this_priority == priority)
-			{
-				result = lappend_int(result, i);
-				if (am_sync != NULL && walsnd == MyWalSnd)
-					*am_sync = true;
-
-				/*
-				 * We should always exit here after the scan of pending list
-				 * starts because we know that the list has enough elements to
-				 * reach SyncRepConfig->num_sync.
-				 */
-				if (list_length(result) == SyncRepConfig->num_sync)
-				{
-					list_free(pending);
-					return result;	/* Exit if got enough sync standbys */
-				}
-
-				/*
-				 * Remove the entry for this sync standby from the list to
-				 * prevent us from looking at the same entry again.
-				 */
-				pending = foreach_delete_current(pending, cell);
-
-				continue;	/* don't adjust next_highest_priority */
-			}
-
-			if (this_priority < next_highest_priority)
-				next_highest_priority = this_priority;
-		}
-
-		priority = next_highest_priority;
-	}
-
-	/* never reached, but keep compiler quiet */
-	Assert(false);
-	return result;
+	return sa->walsnd_index - sb->walsnd_index;
 }
 
+
 /*
  * Check if we are in the list of sync standbys, and if so, determine
  * priority sequence.  Return priority if set, or zero to indicate that
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fc475d144d..7889ea5f6f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2375,14 +2375,16 @@ InitWalSenderSlot(void)
 			 * Found a free slot. Reserve it for us.
 			 */
 			walsnd->pid = MyProcPid;
+			walsnd->state = WALSNDSTATE_STARTUP;
 			walsnd->sentPtr = InvalidXLogRecPtr;
+			walsnd->needreload = false;
 			walsnd->write = InvalidXLogRecPtr;
 			walsnd->flush = InvalidXLogRecPtr;
 			walsnd->apply = InvalidXLogRecPtr;
 			walsnd->writeLag = -1;
 			walsnd->flushLag = -1;
 			walsnd->applyLag = -1;
-			walsnd->state = WALSNDSTATE_STARTUP;
+			walsnd->sync_standby_priority = 0;
 			walsnd->latch = &MyProc->procLatch;
 			walsnd->replyTime = 0;
 			walsnd->spillTxns = 0;
@@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	Tuplestorestate *tupstore;
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
-	List	   *sync_standbys;
+	SyncRepStandbyData *standbys;
+	int			num_standbys;
 	int			i;
 
 	/* check to see if caller supports us returning a tuplestore */
@@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	MemoryContextSwitchTo(oldcontext);
 
 	/*
-	 * Get the currently active synchronous standbys.
+	 * Get the currently active standbys in priority order.  This could be out
+	 * of date before we're done, but we'll use the data anyway.
 	 */
-	LWLockAcquire(SyncRepLock, LW_SHARED);
-	sync_standbys = SyncRepGetSyncStandbys(NULL);
-	LWLockRelease(SyncRepLock);
+	num_standbys = SyncRepGetStandbys(&standbys);
 
 	for (i = 0; i < max_wal_senders; i++)
 	{
@@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		int64		spillTxns;
 		int64		spillCount;
 		int64		spillBytes;
+		bool		is_sync_standby;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+		int			j;
 
+		/* Collect data from shared memory */
 		SpinLockAcquire(&walsnd->mutex);
 		if (walsnd->pid == 0)
 		{
@@ -3311,6 +3316,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		spillBytes = walsnd->spillBytes;
 		SpinLockRelease(&walsnd->mutex);
 
+		/* Detect whether walsender is/was considered synchronous */
+		is_sync_standby = false;
+		for (j = 0; j < num_standbys; j++)
+		{
+			if (standbys[j].walsnd_index == i)
+			{
+				is_sync_standby = (j < SyncRepConfig->num_sync);
+				break;
+			}
+		}
+
 		memset(nulls, 0, sizeof(nulls));
 		values[0] = Int32GetDatum(pid);
 
@@ -3380,11 +3396,15 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 */
 			if (priority == 0)
 				values[10] = CStringGetTextDatum("async");
-			else if (list_member_int(sync_standbys, i))
-				values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
-					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
+			else if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+			{
+				if (is_sync_standby)
+					values[10] = CStringGetTextDatum("sync");
+				else
+					values[10] = CStringGetTextDatum("potential");
+			}
 			else
-				values[10] = CStringGetTextDatum("potential");
+				values[10] = CStringGetTextDatum("quorum");
 
 			if (replyTime == 0)
 				nulls[11] = true;
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 1417401086..3a8394d7f2 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -645,6 +645,13 @@ cfgets(cfp *fp, char *buf, int len)
 		return fgets(buf, len, fp->uncompressedfp);
 }
 
+/*
+ * cfclose: close the stream
+ *
+ * Returns 0 on success.  Most errors are reported as -1 with errno set.
+ * Otherwise, the return value is the one returned by gzclose, and errno
+ * doesn't hold a meaningful value.
+ */
 int
 cfclose(cfp *fp)
 {
@@ -665,6 +672,11 @@ cfclose(cfp *fp)
 #endif
 	{
 		result = fclose(fp->uncompressedfp);
+
+		/* normalize error return, just in case EOF is not -1 */
+		if (result != 0)
+			result = -1;
+
 		fp->uncompressedfp = NULL;
 	}
 	free_keep_errno(fp);
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index c9cce5ed8a..ecc6aa5fbb 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -108,6 +108,7 @@ void
 InitArchiveFmt_Directory(ArchiveHandle *AH)
 {
 	lclContext *ctx;
+	int			ret;
 
 	/* Assuming static functions, this can be copied for each format. */
 	AH->ArchiveEntryPtr = _ArchiveEntry;
@@ -218,8 +219,14 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
 		ReadToc(AH);
 
 		/* Nothing else in the file, so close it again... */
-		if (cfclose(tocFH) != 0)
-			fatal("could not close TOC file: %m");
+		ret = cfclose(tocFH);
+		if (ret < 0)
+		{
+			if (ret == -1)
+				fatal("could not close TOC file: %m");
+			else
+				fatal("could not close TOC file: zlib error (%d)", ret);
+		}
 		ctx->dataFH = NULL;
 	}
 }
@@ -378,6 +385,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
 	char	   *buf;
 	size_t		buflen;
 	cfp		   *cfp;
+	int			ret;
 
 	if (!filename)
 		return;
@@ -396,8 +404,15 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
 	}
 
 	free(buf);
-	if (cfclose(cfp) !=0)
-		fatal("could not close data file: %m");
+
+	ret = cfclose(cfp);
+	if (ret < 0)
+	{
+		if (ret == -1)
+			fatal("could not close data file: %m");
+		else
+			fatal("could not close data file: zlib error (%d)", ret);
+	}
 }
 
 /*
@@ -429,6 +444,7 @@ _LoadBlobs(ArchiveHandle *AH)
 	lclContext *ctx = (lclContext *) AH->formatData;
 	char		fname[MAXPGPATH];
 	char		line[MAXPGPATH];
+	int			ret;
 
 	StartRestoreBlobs(AH);
 
@@ -460,9 +476,16 @@ _LoadBlobs(ArchiveHandle *AH)
 		fatal("error reading large object TOC file \"%s\"",
 			  fname);
 
-	if (cfclose(ctx->blobsTocFH) != 0)
-		fatal("could not close large object TOC file \"%s\": %m",
-			  fname);
+	ret = cfclose(ctx->blobsTocFH);
+	if (ret < 0)
+	{
+		if (ret == -1)
+			fatal("could not close large object TOC file \"%s\": %m",
+				  fname);
+		else
+			fatal("could not close large object TOC file \"%s\": zlib error (%d)",
+				  fname, ret);
+	}
 
 	ctx->blobsTocFH = NULL;
 
@@ -555,6 +578,7 @@ _CloseArchive(ArchiveHandle *AH)
 	{
 		cfp		   *tocFH;
 		char		fname[MAXPGPATH];
+		int			ret;
 
 		setFilePath(AH, fname, "toc.dat");
 
@@ -576,8 +600,14 @@ _CloseArchive(ArchiveHandle *AH)
 		WriteHead(AH);
 		AH->format = archDirectory;
 		WriteToc(AH);
-		if (cfclose(tocFH) != 0)
-			fatal("could not close TOC file: %m");
+		ret = cfclose(tocFH);
+		if (ret < 0)
+		{
+			if (ret == -1)
+				fatal("could not close TOC file: %m");
+			else
+				fatal("could not close TOC file: zlib error (%d)", ret);
+		}
 		WriteDataChunks(AH, ctx->pstate);
 
 		ParallelBackupEnd(AH, ctx->pstate);
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index c5f0e91aad..533aac25c7 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -36,6 +36,24 @@
 #define SYNC_REP_PRIORITY		0
 #define SYNC_REP_QUORUM		1
 
+/*
+ * SyncRepGetStandbys returns an array of these structs,
+ * one per candidate synchronous walsender.
+ */
+typedef struct SyncRepStandbyData
+{
+	/* Copies of relevant fields from WalSnd shared-memory struct */
+	pid_t		pid;
+	XLogRecPtr	write;
+	XLogRecPtr	flush;
+	XLogRecPtr	apply;
+	int			sync_standby_priority;
+	/* Index of this walsender in the WalSnd shared-memory array */
+	int			walsnd_index;
+	/* This flag indicates whether this struct is about our own process */
+	bool		is_me;
+} SyncRepStandbyData;
+
 /*
  * Struct for the configuration of synchronous replication.
 *
@@ -74,7 +92,7 @@ extern void SyncRepInitConfig(void);
 extern void SyncRepReleaseWaiters(void);
 
 /* called by wal sender and user backend */
-extern List *SyncRepGetSyncStandbys(bool *am_sync);
+extern int	SyncRepGetStandbys(SyncRepStandbyData **standbys);
 
 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 366828f0a4..734acec2a4 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -31,8 +31,7 @@ typedef enum WalSndState
 /*
  * Each walsender has a WalSnd struct in shared memory.
  *
- * This struct is protected by 'mutex', with two exceptions: one is
- * sync_standby_priority as noted below.  The other exception is that some
+ * This struct is protected by its 'mutex' spinlock field, except that some
  * members are only written by the walsender process itself, and thus that
  * process is free to read those members without holding spinlock.  pid and
  * needreload always require the spinlock to be held for all accesses.
@@ -60,6 +59,12 @@ typedef struct WalSnd
 	TimeOffset	flushLag;
 	TimeOffset	applyLag;
 
+	/*
+	 * The priority order of the standby managed by this WALSender, as listed
+	 * in synchronous_standby_names, or 0 if not-listed.
+	 */
+	int			sync_standby_priority;
+
 	/* Protects shared variables shown above. */
 	slock_t		mutex;
 
@@ -69,13 +74,6 @@ typedef struct WalSnd
 	 */
 	Latch	   *latch;
 
-	/*
-	 * The priority order of the standby managed by this WALSender, as listed
-	 * in synchronous_standby_names, or 0 if not-listed. Protected by
-	 * SyncRepLock.
-	 */
-	int			sync_standby_priority;
-
 	/*
 	 * Timestamp of the last message received from standby.
 	 */