Thread: Race condition in SyncRepGetSyncStandbysPriority
Twice in the past month [1][2], buildfarm member hoverfly has managed to reach the "unreachable" Assert(false) at the end of SyncRepGetSyncStandbysPriority. What seems likely to me, after quickly eyeballing the code, is that hoverfly is hitting the blatantly-obvious race condition in that function. Namely, that the second loop supposes that the state of the walsender array hasn't changed since the first loop. The minimum fix for this, I suppose, would have the first loop capture the sync_standby_priority value for each walsender along with what it's already capturing. But I wonder if the whole function shouldn't be rewritten from scratch, because it seems like the algorithm is both expensively brute-force and unintelligible, which is a sad combination. It's likely that the number of walsenders would never be high enough that efficiency could matter, but then couldn't we use an algorithm that is less complicated and more obviously correct? (Because the alternative conclusion, if you reject the theory that a race is happening, is that the algorithm is just flat out buggy; something that's not too easy to disprove either.) Another fairly dubious thing here is that whether or not *am_sync gets set depends not only on whether MyWalSnd is claiming to be synchronous but on how many lower-numbered walsenders are too. Is that really the right thing? But worse than any of that is that the return value seems to be a list of walsender array indexes, meaning that the callers cannot use it without making even stronger assumptions about the array contents not having changed since the start of this function. It sort of looks like the design is based on the assumption that the array contents can't change while SyncRepLock is held ... but if that's the plan then why bother with the per-walsender spinlocks? 
In any case this assumption seems to be failing, suggesting either that there's a caller that's not holding SyncRepLock when it calls this function, or that somebody is failing to take that lock while modifying the array. regards, tom lane [1] https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=hoverfly&dt=2020-02-29%2001%3A34%3A55 [2] https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=hoverfly&dt=2020-03-26%2013%3A51%3A15
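For illustration only, the "minimum fix" described above (have the first loop capture sync_standby_priority along with what it already captures) might look roughly like the sketch below. It assumes a simplified stand-in for the shared-memory WalSnd entry; DemoWalSnd, DemoCandidate and demo_snapshot_candidates are hypothetical names, not PostgreSQL code, and the locking that the real function would need is only indicated in a comment.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for a shared-memory WalSnd slot. */
typedef struct DemoWalSnd
{
    int pid;                    /* 0 means the slot is unused */
    int sync_standby_priority;  /* 0 means "not a sync candidate" */
} DemoWalSnd;

/* Private copy of the fields of interest, taken in a single pass. */
typedef struct DemoCandidate
{
    int index;      /* slot number at the time of the snapshot */
    int pid;
    int priority;
} DemoCandidate;

/*
 * Copy every live, potentially synchronous entry into "out", which must
 * have room for nslots elements.  In real code each slot would be read
 * while holding that slot's own lock; here the struct copy stands in for
 * that critical section.  Returns the number of candidates written.
 */
size_t
demo_snapshot_candidates(const DemoWalSnd *slots, size_t nslots,
                         DemoCandidate *out)
{
    size_t n = 0;

    assert(slots != NULL && out != NULL);
    for (size_t i = 0; i < nslots; i++)
    {
        DemoWalSnd copy = slots[i];     /* each slot is read exactly once */

        if (copy.pid == 0 || copy.sync_standby_priority == 0)
            continue;
        out[n].index = (int) i;
        out[n].pid = copy.pid;
        out[n].priority = copy.sync_standby_priority;
        n++;
    }
    return n;
}
```

Any later pass would then look only at the private DemoCandidate array, so a walsender changing its slot in the meantime cannot invalidate decisions already made. This does nothing about the separate concern that the returned array indexes can go stale.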
On 2020/03/27 10:26, Tom Lane wrote: > Twice in the past month [1][2], buildfarm member hoverfly has managed > to reach the "unreachable" Assert(false) at the end of > SyncRepGetSyncStandbysPriority. When I search the past discussions, I found that Noah Misch reported the same issue. https://www.postgresql.org/message-id/20200206074552.GB3326097@rfd.leadboat.com > What seems likely to me, after quickly eyeballing the code, is that > hoverfly is hitting the blatantly-obvious race condition in that function. > Namely, that the second loop supposes that the state of the walsender > array hasn't changed since the first loop. > > The minimum fix for this, I suppose, would have the first loop capture > the sync_standby_priority value for each walsender along with what it's > already capturing. But I wonder if the whole function shouldn't be > rewritten from scratch, because it seems like the algorithm is both > expensively brute-force and unintelligible, which is a sad combination. > It's likely that the number of walsenders would never be high enough > that efficiency could matter, but then couldn't we use an algorithm > that is less complicated and more obviously correct? +1 to rewrite the function with better algorithm. > (Because the > alternative conclusion, if you reject the theory that a race is happening, > is that the algorithm is just flat out buggy; something that's not too > easy to disprove either.) > > Another fairly dubious thing here is that whether or not *am_sync > gets set depends not only on whether MyWalSnd is claiming to be > synchronous but on how many lower-numbered walsenders are too. > Is that really the right thing? > > But worse than any of that is that the return value seems to be > a list of walsender array indexes, meaning that the callers cannot > use it without making even stronger assumptions about the array > contents not having changed since the start of this function. 
> > It sort of looks like the design is based on the assumption that > the array contents can't change while SyncRepLock is held ... but > if that's the plan then why bother with the per-walsender spinlocks? > In any case this assumption seems to be failing, suggesting either > that there's a caller that's not holding SyncRepLock when it calls > this function, or that somebody is failing to take that lock while > modifying the array. As far as I read the code, that assumption still seems valid. But the problem is that each walsender updates MyWalSnd->sync_standby_priority at its own convenient timing after SIGHUP is signaled. That is, at a certain moment, some walsenders (also their WalSnd entries in shmem) work based on the latest configuration but the others (also their WalSnd entries) work based on the old one. lowest_priority = SyncRepConfig->nmembers; next_highest_priority = lowest_priority + 1; SyncRepGetSyncStandbysPriority() calculates the lowest priority among all running walsenders as above, by using the configuration info that this walsender is based on. But this calculated lowest priority would be invalid if another walsender is based on a different (e.g., old) configuration. This can cause the (other) walsender to have lower priority than the calculated lowest priority and the second loop in SyncRepGetSyncStandbysPriority() to unexpectedly end. Therefore, the band-aid fix seems to be to set the lowest priority to a very large number at the beginning of SyncRepGetSyncStandbysPriority(). Regards, -- Fujii Masao NTT DATA CORPORATION Advanced Platform Technology Group Research and Development Headquarters
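To make the failure mode described above concrete, here is a deliberately simplified model of a priority scan that is bounded by the caller's own idea of the lowest priority. It is not the real SyncRepGetSyncStandbysPriority() logic; demo_pick_by_priority and its parameters are hypothetical, and the only point is that a priority value larger than the assumed bound is never visited.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Try to pick "want" standbys by visiting priorities 1..assumed_lowest in
 * order (1 is the highest priority).  prio[i] is the priority advertised
 * by candidate i.  Returns how many candidates were picked.  A result
 * smaller than "want" even though n >= want corresponds to falling off
 * the end of the loop, i.e. the "unreachable" case in this thread.
 */
int
demo_pick_by_priority(const int *prio, int n, int want, int assumed_lowest)
{
    int picked = 0;

    assert(prio != NULL || n == 0);
    for (int p = 1; p <= assumed_lowest && picked < want; p++)
    {
        for (int i = 0; i < n && picked < want; i++)
        {
            if (prio[i] == p)
                picked++;
        }
    }
    return picked;
}
```

With three candidates advertising priorities {1, 3, 3}, two standbys wanted, and a caller whose reloaded configuration has only two members, the scan stops at priority 2 and picks just one candidate. Raising the bound to 3 lets it pick two, which is the effect the band-aid fix aims for.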
At Fri, 27 Mar 2020 13:54:25 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in > > > On 2020/03/27 10:26, Tom Lane wrote: > > Twice in the past month [1][2], buildfarm member hoverfly has managed > > to reach the "unreachable" Assert(false) at the end of > > SyncRepGetSyncStandbysPriority. > > When I search the past discussions, I found that Noah Misch reported > the same issue. > https://www.postgresql.org/message-id/20200206074552.GB3326097@rfd.leadboat.com > > > What seems likely to me, after quickly eyeballing the code, is that > > hoverfly is hitting the blatantly-obvious race condition in that > > function. > > Namely, that the second loop supposes that the state of the walsender > > array hasn't changed since the first loop. > > The minimum fix for this, I suppose, would have the first loop capture > > the sync_standby_priority value for each walsender along with what > > it's > > already capturing. But I wonder if the whole function shouldn't be > > rewritten from scratch, because it seems like the algorithm is both > > expensively brute-force and unintelligible, which is a sad > > combination. > > It's likely that the number of walsenders would never be high enough > > that efficiency could matter, but then couldn't we use an algorithm > > that is less complicated and more obviously correct? > > +1 to rewrite the function with better algorithm. > > > (Because the > > alternative conclusion, if you reject the theory that a race is > > happening, > > is that the algorithm is just flat out buggy; something that's not too > > easy to disprove either.) > > Another fairly dubious thing here is that whether or not *am_sync > > gets set depends not only on whether MyWalSnd is claiming to be > > synchronous but on how many lower-numbered walsenders are too. > > Is that really the right thing? 
> > But worse than any of that is that the return value seems to be > > a list of walsender array indexes, meaning that the callers cannot > > use it without making even stronger assumptions about the array > > contents not having changed since the start of this function. > > It sort of looks like the design is based on the assumption that > > the array contents can't change while SyncRepLock is held ... but > > if that's the plan then why bother with the per-walsender spinlocks? > > In any case this assumption seems to be failing, suggesting either > > that there's a caller that's not holding SyncRepLock when it calls > > this function, or that somebody is failing to take that lock while > > modifying the array. > > As far as I read the code, that assumption seems still valid. But the problem > is that each walsender updates MyWalSnd->sync_standby_priority at each > convenient timing, when SIGHUP is signaled. That is, at a certain moment, > some walsenders (also their WalSnd entries in shmem) work based on > the latest configuration but the others (also their WalSnd entries) work based > on the old one. > > lowest_priority = SyncRepConfig->nmembers; > next_highest_priority = lowest_priority + 1; > > SyncRepGetSyncStandbysPriority() calculates the lowest priority among > all running walsenders as the above, by using the configuration info that > this walsender is based on. But this calculated lowest priority would be > invalid if other walsender is based on different (e.g., old) configuration. > This can cause the (other) walsender to have lower priority than > the calculated lowest priority and the second loop in > SyncRepGetSyncStandbysPriority() to unexpectedly end. > > Therefore, the band-aid fix seems to be to set the lowest priority to > very large number at the beginning of > SyncRepGetSyncStandbysPriority(). Or just treat impossible priorities as non-sync standbys. 
Anyway the confused state is fixed after all walsenders have loaded the new configuration. I remember that I posted a bandaid for maybe the same issue. https://www.postgresql.org/message-id/20200207.125251.146972241588695685.horikyota.ntt@gmail.com regards. -- Kyotaro Horiguchi NTT Open Source Software Center
On Fri, 27 Mar 2020 at 13:54, Fujii Masao <masao.fujii@oss.nttdata.com> wrote: > > > > On 2020/03/27 10:26, Tom Lane wrote: > > Twice in the past month [1][2], buildfarm member hoverfly has managed > > to reach the "unreachable" Assert(false) at the end of > > SyncRepGetSyncStandbysPriority. > > When I search the past discussions, I found that Noah Misch reported > the same issue. > https://www.postgresql.org/message-id/20200206074552.GB3326097@rfd.leadboat.com > > > What seems likely to me, after quickly eyeballing the code, is that > > hoverfly is hitting the blatantly-obvious race condition in that function. > > Namely, that the second loop supposes that the state of the walsender > > array hasn't changed since the first loop. > > > > The minimum fix for this, I suppose, would have the first loop capture > > the sync_standby_priority value for each walsender along with what it's > > already capturing. But I wonder if the whole function shouldn't be > > rewritten from scratch, because it seems like the algorithm is both > > expensively brute-force and unintelligible, which is a sad combination. > > It's likely that the number of walsenders would never be high enough > > that efficiency could matter, but then couldn't we use an algorithm > > that is less complicated and more obviously correct? > > +1 to rewrite the function with better algorithm. > > > (Because the > > alternative conclusion, if you reject the theory that a race is happening, > > is that the algorithm is just flat out buggy; something that's not too > > easy to disprove either.) > > > > Another fairly dubious thing here is that whether or not *am_sync > > gets set depends not only on whether MyWalSnd is claiming to be > > synchronous but on how many lower-numbered walsenders are too. > > Is that really the right thing? 
> > > > But worse than any of that is that the return value seems to be > > a list of walsender array indexes, meaning that the callers cannot > > use it without making even stronger assumptions about the array > > contents not having changed since the start of this function. > > > > It sort of looks like the design is based on the assumption that > > the array contents can't change while SyncRepLock is held ... but > > if that's the plan then why bother with the per-walsender spinlocks? > > In any case this assumption seems to be failing, suggesting either > > that there's a caller that's not holding SyncRepLock when it calls > > this function, or that somebody is failing to take that lock while > > modifying the array. > > As far as I read the code, that assumption seems still valid. But the problem > is that each walsender updates MyWalSnd->sync_standby_priority at each > convenient timing, when SIGHUP is signaled. That is, at a certain moment, > some walsenders (also their WalSnd entries in shmem) work based on > the latest configuration but the others (also their WalSnd entries) work based > on the old one. > > lowest_priority = SyncRepConfig->nmembers; > next_highest_priority = lowest_priority + 1; > > SyncRepGetSyncStandbysPriority() calculates the lowest priority among > all running walsenders as the above, by using the configuration info that > this walsender is based on. But this calculated lowest priority would be > invalid if other walsender is based on different (e.g., old) configuration. > This can cause the (other) walsender to have lower priority than > the calculated lowest priority and the second loop in > SyncRepGetSyncStandbysPriority() to unexpectedly end. I have the same understanding. Since sync_standby_priority is protected by SyncRepLock, these values for each walsender do not change between the two loops in SyncRepGetSyncStandbysPriority(). 
However, as Fujii-san already mentioned the true lowest priority can be lower than lowest_priority, nmembers, when only part of walsenders reloaded the configuration, which in turn could be the cause of leaving entries in the pending list at the end of the function. > Therefore, the band-aid fix seems to be to set the lowest priority to > very large number at the beginning of SyncRepGetSyncStandbysPriority(). I think we can use max_wal_senders. And we can change the second loop so that we exit from the function if the pending list is empty. Regards, -- Masahiko Sawada http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Tue, 31 Mar 2020 at 23:16, Masahiko Sawada <masahiko.sawada@2ndquadrant.com> wrote: > > On Fri, 27 Mar 2020 at 13:54, Fujii Masao <masao.fujii@oss.nttdata.com> wrote: > > > > > > > > On 2020/03/27 10:26, Tom Lane wrote: > > > Twice in the past month [1][2], buildfarm member hoverfly has managed > > > to reach the "unreachable" Assert(false) at the end of > > > SyncRepGetSyncStandbysPriority. > > > > When I search the past discussions, I found that Noah Misch reported > > the same issue. > > https://www.postgresql.org/message-id/20200206074552.GB3326097@rfd.leadboat.com > > > > > What seems likely to me, after quickly eyeballing the code, is that > > > hoverfly is hitting the blatantly-obvious race condition in that function. > > > Namely, that the second loop supposes that the state of the walsender > > > array hasn't changed since the first loop. > > > > > > The minimum fix for this, I suppose, would have the first loop capture > > > the sync_standby_priority value for each walsender along with what it's > > > already capturing. But I wonder if the whole function shouldn't be > > > rewritten from scratch, because it seems like the algorithm is both > > > expensively brute-force and unintelligible, which is a sad combination. > > > It's likely that the number of walsenders would never be high enough > > > that efficiency could matter, but then couldn't we use an algorithm > > > that is less complicated and more obviously correct? > > > > +1 to rewrite the function with better algorithm. > > > > > (Because the > > > alternative conclusion, if you reject the theory that a race is happening, > > > is that the algorithm is just flat out buggy; something that's not too > > > easy to disprove either.) > > > > > > Another fairly dubious thing here is that whether or not *am_sync > > > gets set depends not only on whether MyWalSnd is claiming to be > > > synchronous but on how many lower-numbered walsenders are too. > > > Is that really the right thing? 
> > > > > > But worse than any of that is that the return value seems to be > > > a list of walsender array indexes, meaning that the callers cannot > > > use it without making even stronger assumptions about the array > > > contents not having changed since the start of this function. > > > > > > It sort of looks like the design is based on the assumption that > > > the array contents can't change while SyncRepLock is held ... but > > > if that's the plan then why bother with the per-walsender spinlocks? > > > In any case this assumption seems to be failing, suggesting either > > > that there's a caller that's not holding SyncRepLock when it calls > > > this function, or that somebody is failing to take that lock while > > > modifying the array. > > > > As far as I read the code, that assumption seems still valid. But the problem > > is that each walsender updates MyWalSnd->sync_standby_priority at each > > convenient timing, when SIGHUP is signaled. That is, at a certain moment, > > some walsenders (also their WalSnd entries in shmem) work based on > > the latest configuration but the others (also their WalSnd entries) work based > > on the old one. > > > > lowest_priority = SyncRepConfig->nmembers; > > next_highest_priority = lowest_priority + 1; > > > > SyncRepGetSyncStandbysPriority() calculates the lowest priority among > > all running walsenders as the above, by using the configuration info that > > this walsender is based on. But this calculated lowest priority would be > > invalid if other walsender is based on different (e.g., old) configuration. > > This can cause the (other) walsender to have lower priority than > > the calculated lowest priority and the second loop in > > SyncRepGetSyncStandbysPriority() to unexpectedly end. > > I have the same understanding. Since sync_standby_priority is > protected by SyncRepLock these values of each walsender are not > changed through two loops in SyncRepGetSyncStandbysPriority(). 
> However, as Fujii-san already mentioned the true lowest priority can > be lower than lowest_priority, nmembers, when only part of walsenders > reloaded the configuration, which in turn could be the cause of > leaving entries in the pending list at the end of the function. > > > Therefore, the band-aid fix seems to be to set the lowest priority to > > very large number at the beginning of SyncRepGetSyncStandbysPriority(). > > I think we can use max_wal_senders. Sorry, that's not true. We need another number large enough. Regards, -- Masahiko Sawada http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes: > On Tue, 31 Mar 2020 at 23:16, Masahiko Sawada > <masahiko.sawada@2ndquadrant.com> wrote: >>> Therefore, the band-aid fix seems to be to set the lowest priority to >>> very large number at the beginning of SyncRepGetSyncStandbysPriority(). >> I think we can use max_wal_senders. > Sorry, that's not true. We need another number large enough. The buildfarm had another three failures of this type today, so that motivated me to look at it some more. I don't think this code needs a band-aid fix; I think "nuke from orbit" is more nearly the right level of response. The point that I was trying to make originally is that it seems quite insane to imagine that a walsender's sync_standby_priority value is somehow more stable than the very existence of the process. Yet we only require a walsender to lock its own mutex while claiming or disowning its WalSnd entry (by setting or clearing the pid field). So I think it's nuts to define those fields as being protected by the global SyncRepLock. Even without considering the possibility that a walsender has just started or stopped, we have the problem Fujii-san described that after a change in the synchronous_standby_names GUC setting, different walsenders will update their values of sync_standby_priority at different instants. (BTW, I now notice that Noah had previously identified this problem at [1].) Thus, even while holding SyncRepLock, we do not have a guarantee that we'll see a consistent set of sync_standby_priority values. In fact we don't even know that the walsnd array entries still belong to the processes that last set those values. This is what is breaking SyncRepGetSyncStandbysPriority, and what it means is that there's really fundamentally no chance of that function producing trustworthy results. 
The "band aid" fixes discussed here might avoid crashing on the Assert, but they won't fix the problems that (a) the result is possibly wrong and (b) it can become stale immediately even if it's right when returned. Now, there are only two callers of SyncRepGetSyncStandbys: SyncRepGetSyncRecPtr and pg_stat_get_wal_senders. The latter is mostly cosmetic (which is a good thing, because to add insult to injury, it continues to use the list after releasing SyncRepLock; not that continuing to hold that lock would make things much safer). If I'm reading the code correctly, the former doesn't really care exactly which walsenders are sync standbys: all it cares about is to collect their WAL position pointers. What I think we should do about this is, essentially, to get rid of SyncRepGetSyncStandbys. Instead, let's have each walsender advertise whether *it* believes that it is a sync standby, based on its last evaluation of the relevant GUCs. This would be a bool that it'd compute and set alongside sync_standby_priority. (Hm, maybe we'd not even need to have that field anymore? Not sure.) We should also redefine that flag, and sync_standby_priority if it survives, as being protected by the per-walsender mutex not SyncRepLock. Then, what SyncRepGetSyncRecPtr would do is just sweep through the walsender array and collect WAL position pointers from the walsenders that claim to be sync standbys at the instant that it's inspecting them. pg_stat_get_wal_senders could also use those flags instead of the list from SyncRepGetSyncStandbys. It's likely that this definition would have slightly different behavior from the current implementation during the period where the system is absorbing a change in the set of synchronous walsenders. However, since the current implementation is visibly wrong during that period anyway, I'm not sure how this could be worse. 
And at least we can be certain that SyncRepGetSyncRecPtr will not include WAL positions from already-dead walsenders in its calculations, which *is* a hazard in the code as it stands. I also estimate that this would be noticeably more efficient than the current code, since the logic to decide who's a sync standby would only run when we're dealing with walsender start/stop or SIGHUP, rather than every time SyncRepGetSyncRecPtr runs. Don't especially want to code this myself, though. Anyone? regards, tom lane [1] https://www.postgresql.org/message-id/flat/20200206074552.GB3326097%40rfd.leadboat.com
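As a rough illustration of the proposal above (each walsender advertises its own "I am sync" flag under its own mutex, and the reader makes a single sweep), here is a minimal sketch. It assumes a toy slot layout and a C11 atomic_flag used as a stand-in spinlock; DemoSlot, demo_slot_init and demo_oldest_sync_flush are hypothetical names, and taking the minimum of the collected positions is just one plausible way to combine them.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical per-walsender slot; every field is guarded by "mutex". */
typedef struct DemoSlot
{
    atomic_flag mutex;  /* toy spinlock standing in for the slot mutex */
    int         pid;    /* 0 means the slot is unused */
    bool        is_sync;/* the walsender's own belief, set by itself */
    uint64_t    flush;  /* last reported flush position */
} DemoSlot;

static void
demo_lock(DemoSlot *s)
{
    while (atomic_flag_test_and_set_explicit(&s->mutex, memory_order_acquire))
    {
        /* spin until the holder releases the flag */
    }
}

static void
demo_unlock(DemoSlot *s)
{
    atomic_flag_clear_explicit(&s->mutex, memory_order_release);
}

/* Initialise a slot before it is shared with any other thread. */
void
demo_slot_init(DemoSlot *s, int pid, bool is_sync, uint64_t flush)
{
    atomic_flag_clear(&s->mutex);
    s->pid = pid;
    s->is_sync = is_sync;
    s->flush = flush;
}

/*
 * One sweep: read each slot once under its own lock and keep the oldest
 * flush position among live slots that claim to be sync.  Returns the
 * number of such slots; *oldest is written only if that number is > 0.
 */
size_t
demo_oldest_sync_flush(DemoSlot *slots, size_t nslots, uint64_t *oldest)
{
    size_t nsync = 0;

    assert(slots != NULL && oldest != NULL);
    for (size_t i = 0; i < nslots; i++)
    {
        demo_lock(&slots[i]);
        int      pid = slots[i].pid;
        bool     is_sync = slots[i].is_sync;
        uint64_t flush = slots[i].flush;
        demo_unlock(&slots[i]);

        if (pid == 0 || !is_sync)
            continue;           /* dead or non-sync entries are skipped */
        if (nsync == 0 || flush < *oldest)
            *oldest = flush;
        nsync++;
    }
    return nsync;
}
```

Because pid, the flag and the position are read inside the same critical section, a slot that has been released (pid == 0) can never contribute a position, which is the hazard called out above.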
At Sat, 11 Apr 2020 18:30:30 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in > Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes: > > On Tue, 31 Mar 2020 at 23:16, Masahiko Sawada > > <masahiko.sawada@2ndquadrant.com> wrote: > >>> Therefore, the band-aid fix seems to be to set the lowest priority to > >>> very large number at the beginning of SyncRepGetSyncStandbysPriority(). > > >> I think we can use max_wal_senders. > > > Sorry, that's not true. We need another number large enough. > > The buildfarm had another three failures of this type today, so that > motivated me to look at it some more. I don't think this code needs > a band-aid fix; I think "nuke from orbit" is more nearly the right > level of response. > > The point that I was trying to make originally is that it seems quite > insane to imagine that a walsender's sync_standby_priority value is > somehow more stable than the very existence of the process. Yet we > only require a walsender to lock its own mutex while claiming or > disowning its WalSnd entry (by setting or clearing the pid field). > So I think it's nuts to define those fields as being protected by > the global SyncRepLock. Right. FWIW, furthermore, even SyncRepConfig->syncrep_method can be inconsistent among walsenders. I haven't thought that it can be relied on as always consistent and it is enough that it makes a consistent result only while the setting and the set of walsenders is stable. > Even without considering the possibility that a walsender has just > started or stopped, we have the problem Fujii-san described that after > a change in the synchronous_standby_names GUC setting, different > walsenders will update their values of sync_standby_priority at > different instants. (BTW, I now notice that Noah had previously > identified this problem at [1].) > > Thus, even while holding SyncRepLock, we do not have a guarantee that > we'll see a consistent set of sync_standby_priority values. 
In fact > we don't even know that the walsnd array entries still belong to the > processes that last set those values. This is what is breaking > SyncRepGetSyncStandbysPriority, and what it means is that there's > really fundamentally no chance of that function producing trustworthy > results. The "band aid" fixes discussed here might avoid crashing on > the Assert, but they won't fix the problems that (a) the result is > possibly wrong and (b) it can become stale immediately even if it's > right when returned. Agreed. And I thought that it's not a problem if we had a wrong result temporarily. And the instability persists for the standby-reply interval at most (unless the next cause of instability comes). > Now, there are only two callers of SyncRepGetSyncStandbys: > SyncRepGetSyncRecPtr and pg_stat_get_wal_senders. The latter is > mostly cosmetic (which is a good thing, because to add insult to > injury, it continues to use the list after releasing SyncRepLock; > not that continuing to hold that lock would make things much safer). > If I'm reading the code correctly, the former doesn't really care > exactly which walsenders are sync standbys: all it cares about is > to collect their WAL position pointers. Agreed. To find the sync standby with the largest delay. > What I think we should do about this is, essentially, to get rid of > SyncRepGetSyncStandbys. Instead, let's have each walsender advertise > whether *it* believes that it is a sync standby, based on its last > evaluation of the relevant GUCs. This would be a bool that it'd > compute and set alongside sync_standby_priority. (Hm, maybe we'd not Mmm.. SyncRepGetStandbyPriority returns the "priority" that a walsender thinks it is at, among synchronous_standby_names. Then to decide "I am a sync standby" we need to know how many walsenders with higher priority are alive now. SyncRepGetSyncStandbyPriority does the judgment now and suffers from the inconsistency of priority values. 
In short, it seems to me like moving the problem into another place. But I think that there might be a smarter way to find "I am sync". > even need to have that field anymore? Not sure.) We should also > redefine that flag, and sync_standby_priority if it survives, as being > protected by the per-walsender mutex not SyncRepLock. Then, what > SyncRepGetSyncRecPtr would do is just sweep through the walsender > array and collect WAL position pointers from the walsenders that > claim to be sync standbys at the instant that it's inspecting them. > pg_stat_get_wal_senders could also use those flags instead of the > list from SyncRepGetSyncStandbys. > > It's likely that this definition would have slightly different > behavior from the current implementation during the period where > the system is absorbing a change in the set of synchronous > walsenders. However, since the current implementation is visibly > wrong during that period anyway, I'm not sure how this could be > worse. And at least we can be certain that SyncRepGetSyncRecPtr > will not include WAL positions from already-dead walsenders in > its calculations, which *is* a hazard in the code as it stands. > > I also estimate that this would be noticeably more efficient than > the current code, since the logic to decide who's a sync standby > would only run when we're dealing with walsender start/stop or > SIGHUP, rather than every time SyncRepGetSyncRecPtr runs. > > Don't especially want to code this myself, though. Anyone? > > regards, tom lane > > [1] https://www.postgresql.org/message-id/flat/20200206074552.GB3326097%40rfd.leadboat.com regards. -- Kyotaro Horiguchi NTT Open Source Software Center
At Mon, 13 Apr 2020 15:31:01 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in > At Sat, 11 Apr 2020 18:30:30 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in > > The point that I was trying to make originally is that it seems quite > > insane to imagine that a walsender's sync_standby_priority value is > > somehow more stable than the very existence of the process. Yet we > > only require a walsender to lock its own mutex while claiming or > > disowning its WalSnd entry (by setting or clearing the pid field). > > So I think it's nuts to define those fields as being protected by > > the global SyncRepLock. > > Right. FWIW, furthermore, even SyncRepConfig->syncrep_method can be > inconsistent among walsenders. I haven't thought that it can be > relied on as always consistent and it is enough that it makes a > consistent result only while the setting and the set of walsenders is > stable. Yes, the sentence "and (I haven't thought that) it is enough .." was a mistake; I meant "and I have thought that it is enough that..". regards. -- Kyotaro Horiguchi NTT Open Source Software Center
Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes: > At Sat, 11 Apr 2020 18:30:30 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in >> What I think we should do about this is, essentially, to get rid of >> SyncRepGetSyncStandbys. Instead, let's have each walsender advertise >> whether *it* believes that it is a sync standby, based on its last >> evaluation of the relevant GUCs. This would be a bool that it'd >> compute and set alongside sync_standby_priority. (Hm, maybe we'd not > Mmm.. SyncRepGetStandbyPriority returns the "priority" that a > walsender thinks it is at, among synchronous_standby_names. Then to > decide "I am a sync standby" we need to know how many walsenders with > higher priority are alive now. SyncRepGetSyncStandbyPriority does the > judgment now and suffers from the inconsistency of priority values. Yeah. After looking a bit closer, I think that the current definition of sync_standby_priority (that is, as the result of local evaluation of SyncRepGetStandbyPriority()) is OK. The problem is what we're doing with it. I suggest that what we should do in SyncRepGetSyncRecPtr() is make one sweep across the WalSnd array, collecting PID, sync_standby_priority, *and* the WAL pointers from each valid entry. Then examine that data and decide which WAL value we need, without assuming that the sync_standby_priority values are necessarily totally consistent. But in any case we must examine each entry just once while holding its mutex, not go back to it later expecting it to still be the same. Another thing that I'm finding interesting is that I now see this is not at all new code. It doesn't look like SyncRepGetSyncStandbysPriority has changed much since 2016. So how come we didn't detect this problem long ago? 
I searched the buildfarm logs for assertion failures in syncrep.c, looking back one year, and here's what I found:

 sysname    | branch        | snapshot            | stage         | l
------------+---------------+---------------------+---------------+------------------------------------------------------------
 nightjar   | REL_10_STABLE | 2019-08-13 23:04:41 | recoveryCheck | TRAP: FailedAssertion("!(((bool) 0))", File: "/pgbuild/root/REL_10_STABLE/pgsql.build/../pgsql/src/backend/replication/syncrep.c",Line: 940)
 hoverfly   | REL9_6_STABLE | 2019-11-07 17:19:12 | recoveryCheck | TRAP: FailedAssertion("!(((bool) 0))", File: "syncrep.c",Line: 723)
 hoverfly   | HEAD          | 2019-11-22 12:15:08 | recoveryCheck | TRAP: FailedAssertion("false", File: "syncrep.c", Line:951)
 francolin  | HEAD          | 2020-01-16 23:10:06 | recoveryCheck | TRAP: FailedAssertion("false", File: "/home/andres/build/buildfarm-francolin/HEAD/pgsql.build/../pgsql/src/backend/replication/syncrep.c",Line: 951)
 hoverfly   | REL_11_STABLE | 2020-02-29 01:34:55 | recoveryCheck | TRAP: FailedAssertion("!(0)", File: "syncrep.c", Line:946)
 hoverfly   | REL9_6_STABLE | 2020-03-26 13:51:15 | recoveryCheck | TRAP: FailedAssertion("!(((bool) 0))", File: "syncrep.c",Line: 723)
 hoverfly   | REL9_6_STABLE | 2020-04-07 21:52:07 | recoveryCheck | TRAP: FailedAssertion("!(((bool) 0))", File: "syncrep.c",Line: 723)
 curculio   | HEAD          | 2020-04-11 18:30:21 | recoveryCheck | TRAP: FailedAssertion("false", File: "syncrep.c", Line:951)
 sidewinder | HEAD          | 2020-04-11 18:45:39 | recoveryCheck | TRAP: FailedAssertion("false", File: "syncrep.c", Line:951)
 curculio   | HEAD          | 2020-04-11 20:30:26 | recoveryCheck | TRAP: FailedAssertion("false", File: "syncrep.c", Line:951)
 sidewinder | HEAD          | 2020-04-11 21:45:48 | recoveryCheck | TRAP: FailedAssertion("false", File: "syncrep.c", Line:951)
 sidewinder | HEAD          | 2020-04-13 10:45:35 | recoveryCheck | TRAP: FailedAssertion("false", File: "syncrep.c", Line:951)
 conchuela  | HEAD          | 2020-04-13 16:00:18 | recoveryCheck | TRAP: FailedAssertion("false", File: "/home/pgbf/buildroot/HEAD/pgsql.build/../pgsql/src/backend/replication/syncrep.c",Line: 951)
 sidewinder | HEAD          | 2020-04-13 18:45:34 | recoveryCheck | TRAP: FailedAssertion("false", File: "syncrep.c", Line:951)
(14 rows)

The line numbers vary in the back branches, but all of these crashes are at that same Assert. So (a) yes, this does happen in the back branches, but (b) some fairly recent change has made it a whole lot more probable. Neither syncrep.c nor 007_sync_rep.pl have changed much in some time, so whatever the change was was indirect. Curious. Is it just timing?

I'm giving the side-eye to Noah's recent changes 328c70997 and 421685812, but this isn't enough evidence to say definitely that that's what boosted the failure rate.

regards, tom lane
On Tue, 14 Apr 2020 at 10:34, Tom Lane <tgl@sss.pgh.pa.us> wrote: > > Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes: > > At Sat, 11 Apr 2020 18:30:30 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in > >> What I think we should do about this is, essentially, to get rid of > >> SyncRepGetSyncStandbys. Instead, let's have each walsender advertise > >> whether *it* believes that it is a sync standby, based on its last > >> evaluation of the relevant GUCs. This would be a bool that it'd > >> compute and set alongside sync_standby_priority. (Hm, maybe we'd not > > > Mmm.. SyncRepGetStandbyPriority returns the "priority" that a > > walsender thinks it is at, among synchronous_standby_names. Then to > > decide "I am a sync standby" we need to know how many walsenders with > > higher priority are alive now. SyncRepGetSyncStandbyPriority does the > > judgment now and suffers from the inconsistency of priority values. > > Yeah. After looking a bit closer, I think that the current definition > of sync_standby_priority (that is, as the result of local evaluation > of SyncRepGetStandbyPriority()) is OK. The problem is what we're doing > with it. I suggest that what we should do in SyncRepGetSyncRecPtr() > is make one sweep across the WalSnd array, collecting PID, > sync_standby_priority, *and* the WAL pointers from each valid entry. > Then examine that data and decide which WAL value we need, without assuming > that the sync_standby_priority values are necessarily totally consistent. > But in any case we must examine each entry just once while holding its > mutex, not go back to it later expecting it to still be the same. Can we have a similar approach of sync_standby_defined for sync_standby_priority? That is, checkpointer is responsible for changing sync_standby_priority of all walsenders on SIGHUP. That way, all walsenders can see a consistent view of sync_standby_priority. And when a walsender starts, it sets sync_standby_priority by itself. 
The logic to decide who's a sync standby doesn't change. SyncRepGetSyncRecPtr() gets all walsenders having higher priority along with their WAL positions. > > Another thing that I'm finding interesting is that I now see this is > not at all new code. It doesn't look like SyncRepGetSyncStandbysPriority > has changed much since 2016. So how come we didn't detect this problem > long ago? I searched the buildfarm logs for assertion failures in > syncrep.c, looking back one year, and here's what I found: > > sysname | branch | snapshot | stage | l > ------------+---------------+---------------------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------- > nightjar | REL_10_STABLE | 2019-08-13 23:04:41 | recoveryCheck | TRAP: FailedAssertion("!(((bool) 0))", File: "/pgbuild/root/REL_10_STABLE/pgsql.build/../pgsql/src/backend/replication/syncrep.c",Line: 940) > hoverfly | REL9_6_STABLE | 2019-11-07 17:19:12 | recoveryCheck | TRAP: FailedAssertion("!(((bool) 0))", File: "syncrep.c",Line: 723) > hoverfly | HEAD | 2019-11-22 12:15:08 | recoveryCheck | TRAP: FailedAssertion("false", File: "syncrep.c", Line:951) > francolin | HEAD | 2020-01-16 23:10:06 | recoveryCheck | TRAP: FailedAssertion("false", File: "/home/andres/build/buildfarm-francolin/HEAD/pgsql.build/../pgsql/src/backend/replication/syncrep.c",Line: 951) > hoverfly | REL_11_STABLE | 2020-02-29 01:34:55 | recoveryCheck | TRAP: FailedAssertion("!(0)", File: "syncrep.c", Line:946) > hoverfly | REL9_6_STABLE | 2020-03-26 13:51:15 | recoveryCheck | TRAP: FailedAssertion("!(((bool) 0))", File: "syncrep.c",Line: 723) > hoverfly | REL9_6_STABLE | 2020-04-07 21:52:07 | recoveryCheck | TRAP: FailedAssertion("!(((bool) 0))", File: "syncrep.c",Line: 723) > curculio | HEAD | 2020-04-11 18:30:21 | recoveryCheck | TRAP: FailedAssertion("false", File: "syncrep.c", Line:951) > sidewinder | HEAD | 2020-04-11 18:45:39 | 
recoveryCheck | TRAP: FailedAssertion("false", File: "syncrep.c", Line:951) > curculio | HEAD | 2020-04-11 20:30:26 | recoveryCheck | TRAP: FailedAssertion("false", File: "syncrep.c", Line:951) > sidewinder | HEAD | 2020-04-11 21:45:48 | recoveryCheck | TRAP: FailedAssertion("false", File: "syncrep.c", Line:951) > sidewinder | HEAD | 2020-04-13 10:45:35 | recoveryCheck | TRAP: FailedAssertion("false", File: "syncrep.c", Line:951) > conchuela | HEAD | 2020-04-13 16:00:18 | recoveryCheck | TRAP: FailedAssertion("false", File: "/home/pgbf/buildroot/HEAD/pgsql.build/../pgsql/src/backend/replication/syncrep.c",Line: 951) > sidewinder | HEAD | 2020-04-13 18:45:34 | recoveryCheck | TRAP: FailedAssertion("false", File: "syncrep.c", Line:951) > (14 rows) > > The line numbers vary in the back branches, but all of these crashes are > at that same Assert. So (a) yes, this does happen in the back branches, > but (b) some fairly recent change has made it a whole lot more probable. > Neither syncrep.c nor 007_sync_rep.pl have changed much in some time, > so whatever the change was was indirect. Curious. Is it just timing? Interesting. It's happening on certain animals, not all. Especially tests with HEAD on sidewinder and curculio, which are NetBSD 7 and OpenBSD 5.9 respectively, started to fail at a high rate since a couple of days ago. Regards, -- Masahiko Sawada http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
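The single-sweep idea quoted above (copy everything needed from each entry while holding that entry's lock, then decide from the private copy) can be sketched as follows. This is only an illustration under stated assumptions: Slot, SlotCopy, slot_lock, slot_unlock and snapshot_slots are hypothetical names, and a C11 atomic_flag stands in for the per-walsender spinlock; none of this is the actual PostgreSQL code.

```c
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a shared walsender slot; not the real WalSnd. */
typedef struct Slot
{
	atomic_flag lock;			/* stand-in for the per-slot spinlock */
	int			pid;			/* 0 means the slot is unused */
	int			priority;		/* 0 means not a sync candidate */
	uint64_t	flush;			/* stand-in for a WAL position */
} Slot;

/* Private copy of one slot, taken while its lock was held. */
typedef struct SlotCopy
{
	int			index;
	int			pid;
	int			priority;
	uint64_t	flush;
} SlotCopy;

static void
slot_lock(Slot *s)
{
	/* Spin until we are the one who set the flag. */
	while (atomic_flag_test_and_set_explicit(&s->lock, memory_order_acquire))
		;
}

static void
slot_unlock(Slot *s)
{
	atomic_flag_clear_explicit(&s->lock, memory_order_release);
}

/*
 * Visit each slot exactly once, copying every field of interest while the
 * slot's lock is held.  Filtering is done on the private copy only, so a
 * slot that changes afterwards cannot make the result self-contradictory.
 * Returns the number of entries written to out[] (capacity: nslots).
 */
static size_t
snapshot_slots(Slot *slots, size_t nslots, SlotCopy *out)
{
	size_t		n = 0;

	for (size_t i = 0; i < nslots; i++)
	{
		SlotCopy	c;

		slot_lock(&slots[i]);
		c.index = (int) i;
		c.pid = slots[i].pid;
		c.priority = slots[i].priority;
		c.flush = slots[i].flush;
		slot_unlock(&slots[i]);

		/* Decide from the copy; never re-read the shared slot. */
		if (c.pid == 0 || c.priority == 0)
			continue;
		out[n++] = c;
	}
	return n;
}
```

The snapshot may already be stale by the time it is used, but it is at least internally consistent: the priority and the WAL position of an entry always come from the same moment.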
At Tue, 14 Apr 2020 13:06:14 +0900, Masahiko Sawada <masahiko.sawada@2ndquadrant.com> wrote in > On Tue, 14 Apr 2020 at 10:34, Tom Lane <tgl@sss.pgh.pa.us> wrote: > > > > Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes: > > > At Sat, 11 Apr 2020 18:30:30 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in > > >> What I think we should do about this is, essentially, to get rid of > > >> SyncRepGetSyncStandbys. Instead, let's have each walsender advertise > > >> whether *it* believes that it is a sync standby, based on its last > > >> evaluation of the relevant GUCs. This would be a bool that it'd > > >> compute and set alongside sync_standby_priority. (Hm, maybe we'd not > > > > > Mmm.. SyncRepGetStandbyPriority returns the "priority" that a > > > walsender thinks it is at, among synchronous_standby_names. Then to > > > decide "I am a sync standby" we need to know how many walsenders with > > > higher priority are alive now. SyncRepGetSyncStandbyPriority does the > > > judgment now and suffers from the inconsistency of priority values. > > > > Yeah. After looking a bit closer, I think that the current definition > > of sync_standby_priority (that is, as the result of local evaluation > > of SyncRepGetStandbyPriority()) is OK. The problem is what we're doing > > with it. I suggest that what we should do in SyncRepGetSyncRecPtr() > > is make one sweep across the WalSnd array, collecting PID, > > sync_standby_priority, *and* the WAL pointers from each valid entry. > > Then examine that data and decide which WAL value we need, without assuming > > that the sync_standby_priority values are necessarily totally consistent. > > But in any case we must examine each entry just once while holding its > > mutex, not go back to it later expecting it to still be the same. SyncRepGetSyncStandbysPriority() runs while holding SyncRepLock, so sync_standby_priority of any walsender cannot be changed while the function is scanning walsenders. 
The issue is that we already have inconsistent walsender information before we enter the function. Thus how many times we scan the array doesn't make any difference. I think we need to do one of the following. A) prevent SyncRepGetSyncStandbysPriority from being entered while walsender priority is inconsistent. B) make SyncRepGetSyncStandbysPriority be tolerant of priority inconsistency. C) protect walsender priority array from being inconsistent. (B) is a band-aid. To achieve (A) we need a central controller for priority config handling. (C) is: > Can we have a similar approach of sync_standby_defined for > sync_standby_priority? That is, checkpointer is responsible for > changing sync_standby_priority of all walsenders on SIGHUP. That > way, all walsenders can see a consistent view of > sync_standby_priority. And when a walsender starts, it sets > sync_standby_priority by itself. The logic to decide who's a sync > standby doesn't change. SyncRepGetSyncRecPtr() gets all walsenders > having higher priority along with their WAL positions. Yeah, it works if we do that, but the problem with that approach is that to determine the priority of walsenders, we need to know which walsenders are running. That is, when a new walsender arrives, the process needs to be aware of the arrival (or departure) right away and reassign the priority of every walsender again. If we accept sharing variable-length information among processes, sharing sync_standby_names or parsed SyncRepConfigData among processes would work. > > > > Another thing that I'm finding interesting is that I now see this is > > not at all new code. It doesn't look like SyncRepGetSyncStandbysPriority > > has changed much since 2016. So how come we didn't detect this problem > > long ago? I searched the buildfarm logs for assertion failures in > > syncrep.c, looking back one year, and here's what I found: ... > > The line numbers vary in the back branches, but all of these crashes are > > at that same Assert. 
So (a) yes, this does happen in the back branches, > > but (b) some fairly recent change has made it a whole lot more probable. > > Neither syncrep.c nor 007_sync_rep.pl have changed much in some time, > > so whatever the change was was indirect. Curious. Is it just timing? > > Interesting. It's happening on certain animals, not all. Especially > tests with HEAD on sidewinder and curculio, which are NetBSD 7 and > OpenBSD 5.9 respectively, started to fail at a high rate since a > couple of days ago. Couldn't this have aligned the timing of config reloading? (I haven't checked anything yet.) | commit 421685812290406daea58b78dfab0346eb683bbb | Author: Noah Misch <noah@leadboat.com> | Date: Sat Apr 11 10:30:00 2020 -0700 | | When WalSndCaughtUp, sleep only in WalSndWaitForWal(). | Before sleeping, WalSndWaitForWal() sends a keepalive if MyWalSnd->write | < sentPtr. That is important in logical replication. When the latest | physical LSN yields no logical replication messages (a common case), | that keepalive elicits a reply, and processing the reply updates | pg_stat_replication.replay_lsn. WalSndLoop() lacks that; when | WalSndLoop() slept, replay_lsn advancement could stall until | wal_receiver_status_interval elapsed. This sometimes stalled | src/test/subscription/t/001_rep_changes.pl for up to 10s. | | Discussion: https://postgr.es/m/20200406063649.GA3738151@rfd.leadboat.com regards. -- Kyotaro Horiguchi NTT Open Source Software Center
Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes: > SyncRepGetSyncStandbysPriority() runs while holding SyncRepLock, so > sync_standby_priority of any walsender cannot be changed while the > function is scanning walsenders. The issue is that we already have > inconsistent walsender information before we enter the function. Thus > how many times we scan the array doesn't make any difference. *Yes it does*. The existing code can deliver entirely broken results if some walsender exits between where we examine the priorities and where we fetch the WAL pointers. While that doesn't seem to be the exact issue we're seeing in the buildfarm, it's still another obvious bug in this code. I will not accept a "fix" that doesn't fix that. > I think we need to do one of the following. > A) prevent SyncRepGetSyncStandbysPriority from being entered while > walsender priority is inconsistent. > B) make SyncRepGetSyncStandbysPriority be tolerant of priority > inconsistency. > C) protect walsender priority array from being inconsistent. (B) seems like the only practical solution from here. We could probably arrange for synchronous update of the priorities when they change in response to a GUC change, but it doesn't seem to me to be practical to do that in response to walsender exit. You'd end up finding that an unexpected walsender exit results in panic'ing the system, which is no better than where we are now. It doesn't seem to me to be that hard to implement the desired semantics for synchronous_standby_names with inconsistent info. In FIRST mode you basically just need to take the N smallest priorities you see in the array, but without assuming there are no duplicates or holes. It might be a good idea to include ties at the end, that is if you see 1,2,2,4 or 1,3,3,4 and you want 2 sync standbys, include the first three of them in the calculation until the inconsistency is resolved. In ANY mode I don't see that inconsistent priorities matter at all. 
> If we accept to share variable-length information among processes, > sharing sync_standby_names or parsed SyncRepConfigData among processes > would work. Not sure that we really need more than what's being shared now, ie each process's last-known index in the sync_standby_names list. regards, tom lane
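The FIRST-mode rule described above (take the N smallest priorities, tolerate duplicates and holes, and include ties at the end) can be sketched roughly as below. This is a minimal illustration, not the code from the patch that follows, which instead breaks ties by position in the walsender array; Candidate, cmp_priority and choose_sync_with_ties are hypothetical names introduced only for this example.

```c
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical candidate record; not a PostgreSQL struct. */
typedef struct Candidate
{
	int			priority;		/* advertised priority (> 0); may have
								 * duplicates or holes */
	bool		is_sync;		/* output: treated as synchronous? */
} Candidate;

/* qsort comparator: increasing priority value, without integer overflow. */
static int
cmp_priority(const void *a, const void *b)
{
	const Candidate *ca = (const Candidate *) a;
	const Candidate *cb = (const Candidate *) b;

	return (ca->priority > cb->priority) - (ca->priority < cb->priority);
}

/*
 * Sort candidates by priority and mark the num_sync smallest as synchronous.
 * Entries beyond that point that tie with the last chosen priority are
 * marked too, so an inconsistent array such as 1,2,2,4 yields three sync
 * standbys when two were requested.  Returns the number marked.
 */
static int
choose_sync_with_ties(Candidate *cands, int n, int num_sync)
{
	int			chosen = 0;

	qsort(cands, (size_t) n, sizeof(Candidate), cmp_priority);

	for (int i = 0; i < n; i++)
	{
		/* The tie check is only evaluated when i >= num_sync, so n > num_sync. */
		bool		take = (i < num_sync) ||
			(num_sync > 0 &&
			 cands[i].priority == cands[num_sync - 1].priority);

		cands[i].is_sync = take;
		if (take)
			chosen++;
	}
	return chosen;
}
```

With priorities 1,2,2,4 or 1,3,3,4 and num_sync = 2, three candidates end up marked, matching the example in the message; with consistent priorities such as 1,2,3 exactly two are marked.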
I wrote: > It doesn't seem to me to be that hard to implement the desired > semantics for synchronous_standby_names with inconsistent info. > In FIRST mode you basically just need to take the N smallest > priorities you see in the array, but without assuming there are no > duplicates or holes. It might be a good idea to include ties at the > end, that is if you see 1,2,2,4 or 1,3,3,4 and you want 2 sync > standbys, include the first three of them in the calculation until > the inconsistency is resolved. In ANY mode I don't see that > inconsistent priorities matter at all. Concretely, I think we ought to do the attached, or something pretty close to it. I'm not really happy about breaking ties based on walsnd_index, but I see that there are several TAP test cases that fail if we do something else. I'm inclined to think those tests are bogus ... but I won't argue to change them right now. regards, tom lane diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index ffd5b31..c66c371 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -108,14 +108,17 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys); + SyncRepStandbyData *sync_standbys, + int num_standbys); static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys, uint8 nth); + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth); static int SyncRepGetStandbyPriority(void); -static List *SyncRepGetSyncStandbysPriority(bool *am_sync); -static List *SyncRepGetSyncStandbysQuorum(bool *am_sync); +static void SyncRepGetSyncStandbysPriority(SyncRepStandbyData *standbys, int n); +static int standby_priority_comparator(const void *a, const void *b); static int cmp_lsn(const void *a, const void *b); #ifdef USE_ASSERT_CHECKING @@ -406,9 
+409,10 @@ SyncRepInitConfig(void) priority = SyncRepGetStandbyPriority(); if (MyWalSnd->sync_standby_priority != priority) { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + SpinLockAcquire(&MyWalSnd->mutex); MyWalSnd->sync_standby_priority = priority; - LWLockRelease(SyncRepLock); + SpinLockRelease(&MyWalSnd->mutex); + ereport(DEBUG1, (errmsg("standby \"%s\" now has synchronous standby priority %u", application_name, priority))); @@ -523,8 +527,6 @@ SyncRepReleaseWaiters(void) /* * Calculate the synced Write, Flush and Apply positions among sync standbys. * - * The caller must hold SyncRepLock. - * * Return false if the number of sync standbys is less than * synchronous_standby_names specifies. Otherwise return true and * store the positions into *writePtr, *flushPtr and *applyPtr. @@ -536,27 +538,43 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync) { - List *sync_standbys; - - Assert(LWLockHeldByMe(SyncRepLock)); + SyncRepStandbyData *sync_standbys; + int num_standbys; + int i; + /* Initialize default results */ *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; *applyPtr = InvalidXLogRecPtr; *am_sync = false; + /* Quick out if not even configured to be synchronous */ + if (SyncRepConfig == NULL) + return false; + /* Get standbys that are considered as synchronous at this moment */ - sync_standbys = SyncRepGetSyncStandbys(am_sync); + num_standbys = SyncRepGetSyncStandbys(&sync_standbys); + + /* Am I among the candidate sync standbys? */ + for (i = 0; i < num_standbys; i++) + { + if (sync_standbys[i].is_me) + { + *am_sync = sync_standbys[i].is_sync_standby; + break; + } + } /* - * Quick exit if we are not managing a sync standby or there are not - * enough synchronous standbys. + * Nothing more to do if we are not managing a sync standby or there are + * not enough synchronous standbys. 
(Note: if there are least num_sync + * candidates, then at least num_sync of them will be marked as + * is_sync_standby; we don't need to count them here.) */ if (!(*am_sync) || - SyncRepConfig == NULL || - list_length(sync_standbys) < SyncRepConfig->num_sync) + num_standbys < SyncRepConfig->num_sync) { - list_free(sync_standbys); + pfree(sync_standbys); return false; } @@ -576,15 +594,16 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) { SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys); + sync_standbys, num_standbys); } else { SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, SyncRepConfig->num_sync); + sync_standbys, num_standbys, + SyncRepConfig->num_sync); } - list_free(sync_standbys); + pfree(sync_standbys); return true; } @@ -592,27 +611,28 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * Calculate the oldest Write, Flush and Apply positions among sync standbys. */ static void -SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys) +SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys) { - ListCell *cell; + int i; /* * Scan through all sync standbys and calculate the oldest Write, Flush - * and Apply positions. + * and Apply positions. We assume *writePtr et al were initialized to + * InvalidXLogRecPtr. 
*/ - foreach(cell, sync_standbys) + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - XLogRecPtr write; - XLogRecPtr flush; - XLogRecPtr apply; + XLogRecPtr write = sync_standbys[i].write; + XLogRecPtr flush = sync_standbys[i].flush; + XLogRecPtr apply = sync_standbys[i].apply; - SpinLockAcquire(&walsnd->mutex); - write = walsnd->write; - flush = walsnd->flush; - apply = walsnd->apply; - SpinLockRelease(&walsnd->mutex); + /* Ignore candidates that aren't considered synchronous */ + if (!sync_standbys[i].is_sync_standby) + continue; if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write) *writePtr = write; @@ -628,38 +648,43 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * standbys. */ static void -SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) +SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth) { - ListCell *cell; XLogRecPtr *write_array; XLogRecPtr *flush_array; XLogRecPtr *apply_array; - int len; - int i = 0; + int i; + int n; - len = list_length(sync_standbys); - write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); - foreach(cell, sync_standbys) + n = 0; + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - - SpinLockAcquire(&walsnd->mutex); - write_array[i] = walsnd->write; - flush_array[i] = walsnd->flush; - apply_array[i] = walsnd->apply; - SpinLockRelease(&walsnd->mutex); + /* Ignore 
candidates that aren't considered synchronous */ + if (!sync_standbys[i].is_sync_standby) + continue; - i++; + write_array[n] = sync_standbys[i].write; + flush_array[n] = sync_standbys[i].flush; + apply_array[n] = sync_standbys[i].apply; + n++; } + /* Should have enough, or somebody messed up */ + Assert(n >= nth); + /* Sort each array in descending order */ - qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(write_array, n, sizeof(XLogRecPtr), cmp_lsn); + qsort(flush_array, n, sizeof(XLogRecPtr), cmp_lsn); + qsort(apply_array, n, sizeof(XLogRecPtr), cmp_lsn); /* Get Nth latest Write, Flush, Apply positions */ *writePtr = write_array[nth - 1]; @@ -689,67 +714,48 @@ cmp_lsn(const void *a, const void *b) } /* - * Return the list of sync standbys, or NIL if no sync standby is connected. - * - * The caller must hold SyncRepLock. + * Return data about walsenders that are candidates to be sync standbys. * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. + * *standbys is set to a palloc'd array of structs of per-walsender data, + * and the number of valid entries (candidate sync senders) is returned. */ -List * -SyncRepGetSyncStandbys(bool *am_sync) +int +SyncRepGetSyncStandbys(SyncRepStandbyData **standbys) { - Assert(LWLockHeldByMe(SyncRepLock)); + int i; + int n; - /* Set default result */ - if (am_sync != NULL) - *am_sync = false; + /* Create result array */ + *standbys = (SyncRepStandbyData *) + palloc(max_wal_senders * sizeof(SyncRepStandbyData)); /* Quick exit if sync replication is not requested */ if (SyncRepConfig == NULL) - return NIL; - - return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? 
- SyncRepGetSyncStandbysPriority(am_sync) : - SyncRepGetSyncStandbysQuorum(am_sync); -} - -/* - * Return the list of all the candidates for quorum sync standbys, - * or NIL if no such standby is connected. - * - * The caller must hold SyncRepLock. This function must be called only in - * a quorum-based sync replication. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. - */ -static List * -SyncRepGetSyncStandbysQuorum(bool *am_sync) -{ - List *result = NIL; - int i; - volatile WalSnd *walsnd; /* Use volatile pointer to prevent code - * rearrangement */ - - Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM); + return false; + /* Collect raw data from shared memory */ + n = 0; for (i = 0; i < max_wal_senders; i++) { - XLogRecPtr flush; - WalSndState state; - int pid; + volatile WalSnd *walsnd; /* Use volatile pointer to prevent code + * rearrangement */ + SyncRepStandbyData *stby; + WalSndState state; /* not included in SyncRepStandbyData */ walsnd = &WalSndCtl->walsnds[i]; + stby = *standbys + n; SpinLockAcquire(&walsnd->mutex); - pid = walsnd->pid; - flush = walsnd->flush; + stby->pid = walsnd->pid; state = walsnd->state; + stby->write = walsnd->write; + stby->flush = walsnd->flush; + stby->apply = walsnd->apply; + stby->sync_standby_priority = walsnd->sync_standby_priority; SpinLockRelease(&walsnd->mutex); /* Must be active */ - if (pid == 0) + if (stby->pid == 0) continue; /* Must be streaming or stopping */ @@ -758,200 +764,79 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) continue; /* Must be synchronous */ - if (walsnd->sync_standby_priority == 0) + if (stby->sync_standby_priority == 0) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(flush)) + if (XLogRecPtrIsInvalid(stby->flush)) continue; - /* - * Consider this standby as a candidate for quorum sync standbys and - * append it to the result. 
- */ - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; + /* OK, it's a candidate */ + stby->walsnd_index = i; + stby->is_me = (walsnd == MyWalSnd); + stby->is_sync_standby = true; /* might change below */ + n++; } - return result; + /* + * In quorum mode, that's all we have to do. In priority mode, decide + * which ones are high enough priority to consider sync standbys. + */ + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + SyncRepGetSyncStandbysPriority(*standbys, n); + + return n; } /* - * Return the list of sync standbys chosen based on their priorities, - * or NIL if no sync standby is connected. - * - * If there are multiple standbys with the same priority, - * the first one found is selected preferentially. - * - * The caller must hold SyncRepLock. This function must be called only in - * a priority-based sync replication. + * Decide which standbys to consider synchronous. * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. + * This function must be called only in priority-based sync replication. */ -static List * -SyncRepGetSyncStandbysPriority(bool *am_sync) +static void +SyncRepGetSyncStandbysPriority(SyncRepStandbyData *standbys, int n) { - List *result = NIL; - List *pending = NIL; - int lowest_priority; - int next_highest_priority; - int this_priority; - int priority; int i; - bool am_in_pending = false; - volatile WalSnd *walsnd; /* Use volatile pointer to prevent code - * rearrangement */ Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); - lowest_priority = SyncRepConfig->nmembers; - next_highest_priority = lowest_priority + 1; + /* Nothing to do here unless there are too many candidates. */ + if (n <= SyncRepConfig->num_sync) + return; /* - * Find the sync standbys which have the highest priority (i.e, 1). 
Also - * store all the other potential sync standbys into the pending list, in - * order to scan it later and find other sync standbys from it quickly. + * Sort the candidates by priority; then the first num_sync ones are + * synchronous, and the rest aren't. */ - for (i = 0; i < max_wal_senders; i++) - { - XLogRecPtr flush; - WalSndState state; - int pid; - - walsnd = &WalSndCtl->walsnds[i]; - - SpinLockAcquire(&walsnd->mutex); - pid = walsnd->pid; - flush = walsnd->flush; - state = walsnd->state; - SpinLockRelease(&walsnd->mutex); - - /* Must be active */ - if (pid == 0) - continue; + qsort(standbys, n, sizeof(SyncRepStandbyData), + standby_priority_comparator); - /* Must be streaming or stopping */ - if (state != WALSNDSTATE_STREAMING && - state != WALSNDSTATE_STOPPING) - continue; - - /* Must be synchronous */ - this_priority = walsnd->sync_standby_priority; - if (this_priority == 0) - continue; - - /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(flush)) - continue; - - /* - * If the priority is equal to 1, consider this standby as sync and - * append it to the result. Otherwise append this standby to the - * pending list to check if it's actually sync or not later. - */ - if (this_priority == 1) - { - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; - if (list_length(result) == SyncRepConfig->num_sync) - { - list_free(pending); - return result; /* Exit if got enough sync standbys */ - } - } - else - { - pending = lappend_int(pending, i); - if (am_sync != NULL && walsnd == MyWalSnd) - am_in_pending = true; - - /* - * Track the highest priority among the standbys in the pending - * list, in order to use it as the starting priority for later - * scan of the list. This is useful to find quickly the sync - * standbys from the pending list later because we can skip - * unnecessary scans for the unused priorities. 
- */ - if (this_priority < next_highest_priority) - next_highest_priority = this_priority; - } - } + for (i = SyncRepConfig->num_sync; i < n; i++) + standbys[i].is_sync_standby = false; +} - /* - * Consider all pending standbys as sync if the number of them plus - * already-found sync ones is lower than the configuration requests. - */ - if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync) - { - /* - * Set *am_sync to true if this walsender is in the pending list - * because all pending standbys are considered as sync. - */ - if (am_sync != NULL && !(*am_sync)) - *am_sync = am_in_pending; +/* + * qsort comparator to sort SyncRepStandbyData entries by priority + */ +static int +standby_priority_comparator(const void *a, const void *b) +{ + const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a; + const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b; - result = list_concat(result, pending); - list_free(pending); - return result; - } + /* First, sort by increasing priority value */ + if (sa->sync_standby_priority != sb->sync_standby_priority) + return sa->sync_standby_priority - sb->sync_standby_priority; /* - * Find the sync standbys from the pending list. + * We might have equal priority values; arbitrarily break ties by position + * in the WALSnd array. (This is utterly bogus, since that is arrival + * order dependent, but there are regression tests that rely on it.) 
*/ - priority = next_highest_priority; - while (priority <= lowest_priority) - { - ListCell *cell; - - next_highest_priority = lowest_priority + 1; - - foreach(cell, pending) - { - i = lfirst_int(cell); - walsnd = &WalSndCtl->walsnds[i]; - - this_priority = walsnd->sync_standby_priority; - if (this_priority == priority) - { - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; - - /* - * We should always exit here after the scan of pending list - * starts because we know that the list has enough elements to - * reach SyncRepConfig->num_sync. - */ - if (list_length(result) == SyncRepConfig->num_sync) - { - list_free(pending); - return result; /* Exit if got enough sync standbys */ - } - - /* - * Remove the entry for this sync standby from the list to - * prevent us from looking at the same entry again. - */ - pending = foreach_delete_current(pending, cell); - - continue; /* don't adjust next_highest_priority */ - } - - if (this_priority < next_highest_priority) - next_highest_priority = this_priority; - } - - priority = next_highest_priority; - } - - /* never reached, but keep compiler quiet */ - Assert(false); - return result; + return sa->walsnd_index - sb->walsnd_index; } + /* * Check if we are in the list of sync standbys, and if so, determine * priority sequence. Return priority if set, or zero to indicate that diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index fc475d1..859ca60 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2375,14 +2375,16 @@ InitWalSenderSlot(void) * Found a free slot. Reserve it for us. 
*/ walsnd->pid = MyProcPid; + walsnd->state = WALSNDSTATE_STARTUP; walsnd->sentPtr = InvalidXLogRecPtr; + walsnd->needreload = false; walsnd->write = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; walsnd->writeLag = -1; walsnd->flushLag = -1; walsnd->applyLag = -1; - walsnd->state = WALSNDSTATE_STARTUP; + walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; walsnd->spillTxns = 0; @@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - List *sync_standbys; + SyncRepStandbyData *sync_standbys; + int num_standbys; int i; /* check to see if caller supports us returning a tuplestore */ @@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); /* - * Get the currently active synchronous standbys. + * Get the currently active synchronous standbys. This could be out of + * date before we're done, but we'll use the data anyway. 
*/ - LWLockAcquire(SyncRepLock, LW_SHARED); - sync_standbys = SyncRepGetSyncStandbys(NULL); - LWLockRelease(SyncRepLock); + num_standbys = SyncRepGetSyncStandbys(&sync_standbys); for (i = 0; i < max_wal_senders; i++) { @@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int64 spillTxns; int64 spillCount; int64 spillBytes; + bool is_sync_standby; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; + int j; + /* Collect data from shared memory */ SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) { @@ -3311,6 +3316,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) spillBytes = walsnd->spillBytes; SpinLockRelease(&walsnd->mutex); + /* Detect whether walsender is/was considered synchronous */ + is_sync_standby = false; + for (j = 0; j < num_standbys; j++) + { + if (sync_standbys[j].walsnd_index == i) + { + is_sync_standby = sync_standbys[j].is_sync_standby; + break; + } + } + memset(nulls, 0, sizeof(nulls)); values[0] = Int32GetDatum(pid); @@ -3380,7 +3396,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (priority == 0) values[10] = CStringGetTextDatum("async"); - else if (list_member_int(sync_standbys, i)) + else if (is_sync_standby) values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index c5f0e91..1308ada 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -37,6 +37,26 @@ #define SYNC_REP_QUORUM 1 /* + * SyncRepGetSyncStandbys returns an array of these structs, + * one per candidate synchronous walsender. 
+ */ +typedef struct SyncRepStandbyData +{ + /* Copies of relevant fields from WalSnd shared-memory struct */ + pid_t pid; + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; + int sync_standby_priority; + /* Index of this walsender in the WalSnd shared-memory array */ + int walsnd_index; + /* This flag indicates whether this struct is about our own process */ + bool is_me; + /* After-the-fact conclusion about whether this is a sync standby */ + bool is_sync_standby; +} SyncRepStandbyData; + +/* * Struct for the configuration of synchronous replication. * * Note: this must be a flat representation that can be held in a single @@ -74,7 +94,7 @@ extern void SyncRepInitConfig(void); extern void SyncRepReleaseWaiters(void); /* called by wal sender and user backend */ -extern List *SyncRepGetSyncStandbys(bool *am_sync); +extern int SyncRepGetSyncStandbys(SyncRepStandbyData **standbys); /* called by checkpointer */ extern void SyncRepUpdateSyncStandbysDefined(void); diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 366828f..734acec 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -31,8 +31,7 @@ typedef enum WalSndState /* * Each walsender has a WalSnd struct in shared memory. * - * This struct is protected by 'mutex', with two exceptions: one is - * sync_standby_priority as noted below. The other exception is that some + * This struct is protected by its 'mutex' spinlock field, except that some * members are only written by the walsender process itself, and thus that * process is free to read those members without holding spinlock. pid and * needreload always require the spinlock to be held for all accesses. @@ -60,6 +59,12 @@ typedef struct WalSnd TimeOffset flushLag; TimeOffset applyLag; + /* + * The priority order of the standby managed by this WALSender, as listed + * in synchronous_standby_names, or 0 if not-listed. 
+ */ + int sync_standby_priority; + /* Protects shared variables shown above. */ slock_t mutex; @@ -70,13 +75,6 @@ typedef struct WalSnd Latch *latch; /* - * The priority order of the standby managed by this WALSender, as listed - * in synchronous_standby_names, or 0 if not-listed. Protected by - * SyncRepLock. - */ - int sync_standby_priority; - - /* * Timestamp of the last message received from standby. */ TimestampTz replyTime;
At Tue, 14 Apr 2020 09:52:42 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in
> Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
> > SyncRepGetSyncStandbysPriority() is running holding SyncRepLock so
> > sync_standby_priority of any walsender cannot be changed while the
> > function is scanning walsenders. The issue is we already have
> > inconsistent walsender information before we enter the function. Thus
> > how many times we scan on the array doesn't make any difference.
>
> *Yes it does*. The existing code can deliver entirely broken results
> if some walsender exits between where we examine the priorities and
> where we fetch the WAL pointers. While that doesn't seem to be the

Ah. I didn't take that as inconsistency. Actually a walsender exit
inactivates the corresponding slot by setting pid = 0. In a bad case (as
you mentioned upthread) the entry can be occupied by another walsender.
However, sync_standby_priority cannot be updated until the whole work is
finished.

> exact issue we're seeing in the buildfarm, it's still another obvious
> bug in this code. I will not accept a "fix" that doesn't fix that.

I think that the "inconsistency" that can be observed in a process is a
disagreement between SyncRepConfig->nmembers and
<each_walsnd_entry>->sync_standby_priority. If any one of the walsenders
regards its priority as lower (larger in value) than nmembers in the
"current" process, the assertion fires. If that is the issue, the issue
is not dynamic inconsistency.

# It's the assumption of my band-aid.

> > I think we need to do one of the following.
> >
> > A) prevent SyncRepGetSyncStandbysPriority from being entered while
> > walsender priority is inconsistent.
> > B) make SyncRepGetSyncStandbysPriority be tolerant of priority
> > inconsistency.
> > C) protect walsender priority array from being inconsistent.
>
> (B) seems like the only practical solution from here. We could
> probably arrange for synchronous update of the priorities when
> they change in response to a GUC change, but it doesn't seem to
> me to be practical to do that in response to walsender exit.
> You'd end up finding that an unexpected walsender exit results
> in panic'ing the system, which is no better than where we are now.

I agree with you on the whole. I thought of several ways to keep the
array consistent, but all of them looked like too much.

> It doesn't seem to me to be that hard to implement the desired
> semantics for synchronous_standby_names with inconsistent info.
> In FIRST mode you basically just need to take the N smallest
> priorities you see in the array, but without assuming there are no
> duplicates or holes. It might be a good idea to include ties at the
> end, that is if you see 1,2,2,4 or 1,3,3,4 and you want 2 sync
> standbys, include the first three of them in the calculation until
> the inconsistency is resolved. In ANY mode I don't see that
> inconsistent priorities matter at all.

Mmm, priority lists like 1,2,2,4 are not regarded as inconsistent at
all in the context of walsender priority. That happens consistently if
two or more walreceivers report the same application_name. I believe
the existing code already takes that case into consideration.

> > If we accept to share variable-length information among processes,
> > sharing sync_standby_names or parsed SyncRepConfigData among processes
> > would work.
>
> Not sure that we really need more than what's being shared now,
> ie each process's last-known index in the sync_standby_names list.

If we take (B), we don't need any more. (A) and (C) would need more.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center
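To make the FIRST-mode rule quoted above concrete, here is a minimal standalone sketch of "take the N smallest priorities, including ties at the end". It is only an illustration of that quoted suggestion, not code from any patch in this thread; the Candidate struct and the function names are hypothetical, and breaking ties by slot index is an assumption that mirrors the walsnd_index tie-break discussed elsewhere in the thread.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for one walsender's advertised priority. */
typedef struct Candidate
{
    int priority;   /* position in synchronous_standby_names, >= 1 */
    int slot;       /* index of the walsender in the shared array */
} Candidate;

/* qsort comparator: increasing priority value, then increasing slot. */
static int
candidate_cmp(const void *a, const void *b)
{
    const Candidate *ca = (const Candidate *) a;
    const Candidate *cb = (const Candidate *) b;

    if (ca->priority != cb->priority)
        return (ca->priority > cb->priority) - (ca->priority < cb->priority);
    return (ca->slot > cb->slot) - (ca->slot < cb->slot);
}

/*
 * Sort the candidates and return how many leading entries to treat as
 * synchronous: the num_sync smallest priorities, plus any further entries
 * tied with the last one chosen. No assumption is made that priorities
 * are unique or contiguous.
 */
static int
choose_sync_with_ties(Candidate *cands, int n, int num_sync)
{
    int chosen;

    assert(n >= 0 && num_sync >= 1);

    qsort(cands, n, sizeof(Candidate), candidate_cmp);

    if (n <= num_sync)
        return n;           /* not enough candidates to be selective */

    chosen = num_sync;
    while (chosen < n &&
           cands[chosen].priority == cands[chosen - 1].priority)
        chosen++;           /* extend the selection over the trailing tie */

    return chosen;
}
```

With priorities 1,2,2,4 or 1,3,3,4 and num_sync = 2, this selects the first three entries, as in the quoted example. Whether ties should be included at all is exactly what the reply above questions, since equal priorities also arise legitimately when two standbys report the same application_name.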
At Tue, 14 Apr 2020 16:32:40 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in
> I wrote:
> > It doesn't seem to me to be that hard to implement the desired
> > semantics for synchronous_standby_names with inconsistent info.
> > In FIRST mode you basically just need to take the N smallest
> > priorities you see in the array, but without assuming there are no
> > duplicates or holes. It might be a good idea to include ties at the
> > end, that is if you see 1,2,2,4 or 1,3,3,4 and you want 2 sync
> > standbys, include the first three of them in the calculation until
> > the inconsistency is resolved. In ANY mode I don't see that
> > inconsistent priorities matter at all.
>
> Concretely, I think we ought to do the attached, or something pretty
> close to it.

Looking at SyncRepGetSyncStandbys, I agree that it's good not to assume
lowest_priority, which I thought was the culprit of the assertion
failure.

The current code intends to use less memory. I don't think there is a
case where only 3 out of 1000 standbys are required to be sync standbys,
so collecting all walsenders and then sorting them seems a reasonable
strategy. The new code looks clearer.

+    stby->is_sync_standby = true; /* might change below */

I'm uneasy with that. In quorum mode all running standbys are marked as
"sync" and that's bogus.

The only users of the flag seem to be:

SyncRepGetSyncRecPtr:
+    *am_sync = sync_standbys[i].is_sync_standby;

and

SyncRepGetOldestSyncRecPtr:
+    /* Ignore candidates that aren't considered synchronous */
+    if (!sync_standbys[i].is_sync_standby)
+        continue;

On the other hand, sync_standbys is already sorted in priority order, so
I think we can get rid of the member by setting *am_sync as follows.

SyncRepGetSyncRecPtr:
    if (sync_standbys[i].is_me)
    {
        *am_sync = (i < SyncRepConfig->num_sync);
        break;
    }

And the second user can be as follows.

SyncRepGetOldestSyncRecPtr:
    /* Ignore candidates that aren't considered synchronous */
    if (i >= SyncRepConfig->num_sync)
        break;

> I'm not really happy about breaking ties based on walsnd_index,
> but I see that there are several TAP test cases that fail if we
> do something else. I'm inclined to think those tests are bogus ...
> but I won't argue to change them right now.

Agreed about the tie-breaker.

I'm looking at this more closely.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center
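The proposal above (derive everything from an entry's position in the sorted array instead of a per-entry flag) can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the patch itself: StandbySnapshot is a hypothetical cut-down stand-in for SyncRepStandbyData, uint64_t stands in for XLogRecPtr with 0 playing the role of InvalidXLogRecPtr, and the array is assumed to be already sorted by priority (that is, priority mode; in quorum mode the array is not sorted, so position says nothing about being synchronous).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, cut-down snapshot of one candidate standby. */
typedef struct StandbySnapshot
{
    uint64_t flush;     /* stand-in for an XLogRecPtr flush position */
    bool     is_me;     /* does this entry describe the calling process? */
} StandbySnapshot;

/*
 * The caller is synchronous iff its own entry sits among the first
 * num_sync entries of the priority-sorted array.
 */
static bool
am_i_sync(const StandbySnapshot *standbys, int num_standbys, int num_sync)
{
    for (int i = 0; i < num_standbys; i++)
    {
        if (standbys[i].is_me)
            return i < num_sync;
    }
    return false;       /* caller is not a candidate at all */
}

/*
 * Oldest flush position among the first num_sync entries; entries past
 * that point are ignored simply by stopping the loop. Returns 0 when
 * there is nothing to examine.
 */
static uint64_t
oldest_sync_flush(const StandbySnapshot *standbys, int num_standbys,
                  int num_sync)
{
    uint64_t oldest = 0;

    for (int i = 0; i < num_standbys && i < num_sync; i++)
    {
        if (oldest == 0 || standbys[i].flush < oldest)
            oldest = standbys[i].flush;
    }
    return oldest;
}
```

Because both functions work only on the private snapshot, neither has to revisit shared memory after the array was collected.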
On Tue, Apr 14, 2020 at 04:32:40PM -0400, Tom Lane wrote:
> I wrote:
> > It doesn't seem to me to be that hard to implement the desired
> > semantics for synchronous_standby_names with inconsistent info.
> > In FIRST mode you basically just need to take the N smallest
> > priorities you see in the array, but without assuming there are no
> > duplicates or holes. It might be a good idea to include ties at the
> > end, that is if you see 1,2,2,4 or 1,3,3,4 and you want 2 sync
> > standbys, include the first three of them in the calculation until
> > the inconsistency is resolved. In ANY mode I don't see that
> > inconsistent priorities matter at all.
>
> Concretely, I think we ought to do the attached, or something pretty
> close to it.
>
> I'm not really happy about breaking ties based on walsnd_index,
> but I see that there are several TAP test cases that fail if we
> do something else. I'm inclined to think those tests are bogus ...
> but I won't argue to change them right now.

This passes the test battery I wrote in preparation for the 2020-02 thread.
On Tue, 14 Apr 2020 at 18:35, Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote:
>
> At Tue, 14 Apr 2020 13:06:14 +0900, Masahiko Sawada <masahiko.sawada@2ndquadrant.com> wrote in
> > On Tue, 14 Apr 2020 at 10:34, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> > >
> > > Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
> > > > At Sat, 11 Apr 2020 18:30:30 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in
> > > >> What I think we should do about this is, essentially, to get rid of
> > > >> SyncRepGetSyncStandbys. Instead, let's have each walsender advertise
> > > >> whether *it* believes that it is a sync standby, based on its last
> > > >> evaluation of the relevant GUCs. This would be a bool that it'd
> > > >> compute and set alongside sync_standby_priority. (Hm, maybe we'd not
> > >
> > > > Mmm.. SyncRepGetStandbyPriority returns the "priority" that a
> > > > walsender thinks it is at, among synchronous_standby_names. Then to
> > > > decide "I am a sync standby" we need to know how many walsenders with
> > > > higher priority are alive now. SyncRepGetSyncStandbyPriority does the
> > > > judgment now and suffers from the inconsistency of priority values.
> > >
> > > Yeah. After looking a bit closer, I think that the current definition
> > > of sync_standby_priority (that is, as the result of local evaluation
> > > of SyncRepGetStandbyPriority()) is OK. The problem is what we're doing
> > > with it. I suggest that what we should do in SyncRepGetSyncRecPtr()
> > > is make one sweep across the WalSnd array, collecting PID,
> > > sync_standby_priority, *and* the WAL pointers from each valid entry.
> > > Then examine that data and decide which WAL value we need, without assuming
> > > that the sync_standby_priority values are necessarily totally consistent.
> > > But in any case we must examine each entry just once while holding its
> > > mutex, not go back to it later expecting it to still be the same.
>
> SyncRepGetSyncStandbysPriority() is running holding SyncRepLock so
> sync_standby_priority of any walsender cannot be changed while the
> function is scanning walsenders. The issue is we already have
> inconsistent walsender information before we enter the function. Thus
> how many times we scan on the array doesn't make any difference.
>
> I think we need to do one of the following.
>
> A) prevent SyncRepGetSyncStandbysPriority from being entered while
> walsender priority is inconsistent.
>
> B) make SyncRepGetSyncStandbysPriority be tolerant of priority
> inconsistency.
>
> C) protect walsender priority array from being inconsistent.
>
> (B) is the band-aid. To achieve A we need a central controller
> of priority config handling. C is:
>
> > Can we have a similar approach of sync_standby_defined for
> > sync_standby_priority? That is, checkpointer is responsible for
> > changing sync_standby_priority of all walsenders when SIGHUP. That
> > way, all walsenders can see a consistent view of
> > sync_standby_priority. And when a walsender starts, it sets
> > sync_standby_priority by itself. The logic to decide who's a sync
> > standby doesn't change. SyncRepGetSyncRecPtr() gets all walsenders
> > having higher priority along with their WAL positions.
>
> Yeah, it works if we do that, but the problem with that approach is that to
> determine the priority of walsenders, we need to know what walsenders are
> running. That is, when a new walsender comes the process needs to be aware
> of the arrival (or leaving) right away and reassign the priority of
> every walsender again.

I think we don't need to reassign the priority when a new walsender
comes or leaves. IIUC the priority is calculated based only on
synchronous_standby_names. A walsender coming or leaving doesn't affect
the others' priorities.

Regards,

--
Masahiko Sawada http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
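The point made just above, that a walsender's priority depends only on synchronous_standby_names and its own application_name, can be sketched as a pure function. This is a hypothetical simplification for illustration, not the actual SyncRepGetStandbyPriority() source: the names are assumed to be already parsed into an array, matching is assumed to be case-insensitive, and "*" is assumed to match any standby.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <string.h>

/* Case-insensitive string equality (hypothetical helper). */
static bool
names_equal(const char *a, const char *b)
{
    while (*a != '\0' && *b != '\0')
    {
        if (tolower((unsigned char) *a) != tolower((unsigned char) *b))
            return false;
        a++;
        b++;
    }
    return *a == *b;    /* equal only if both strings ended together */
}

/*
 * Return the 1-based position of application_name in the configured
 * list, or 0 if it is not listed. The result depends only on the two
 * inputs: no other walsender's state is consulted, so arrivals and
 * departures of other walsenders cannot change it.
 */
static int
standby_priority(const char *application_name,
                 const char *const *names, int nmembers)
{
    for (int i = 0; i < nmembers; i++)
    {
        if (strcmp(names[i], "*") == 0 ||
            names_equal(names[i], application_name))
            return i + 1;
    }
    return 0;
}
```

Under this model two walsenders can only disagree about priorities while they hold different versions of the configured list, which matches the kind of inconsistency discussed earlier in the thread.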
At Wed, 15 Apr 2020 11:35:58 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in
> I'm looking at this more closely.

It looks to be in the right direction to me.

As mentioned in the previous mail, I removed is_sync_standby from
SyncRepStandbyData. But just doing that breaks pg_stat_get_wal_senders.
It is an existing issue, but the logic for sync_state (values[10]) looks
odd. Fixed in the attached.

SyncRepInitConfig uses mutex instead of SyncRepLock. Since the integrity
of sync_standby_priority is not guaranteed anyway, it seems OK to me. It
seems fine to remove the assertion and the requirement about SyncRepLock
from SyncRepGetSyncRecPtr for the same reason. (Actually the lock is
held, though.)

SyncRepGetSyncStandbysPriority doesn't seem worth existing as a
function. Removed in the attached.

+ num_standbys = SyncRepGetSyncStandbys(&sync_standbys);

The list no longer consists only of synchronous standbys. I changed
the function name and the variable name, and tried to adjust related
comments.

It's not what the patch did, but I don't understand why
SyncRepGetNthLatestSyncRecPtr takes SyncRepConfig->num_sync as an
argument while SyncRepGetOldest.. accesses it directly. Changed the
function *Oldest* in the attached. I didn't do that, but eventually the
two functions can be consolidated, just by moving the selection logic
currently in SyncRepGetSyncRecPtr into the new function.

The resulting patch is attached.

- removed is_sync_standby from SyncRepStandbyData
- Fixed the logic for values[10] in pg_stat_get_wal_senders
- Changed the signature of SyncRepGetOldestSyncRecPtr
- Adjusted some comments to the behavioral change of
  SyncRepGet(Sync)Standbys.

regards.
-- Kyotaro Horiguchi NTT Open Source Software Center diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index ffd5b31eb2..e8a47f98b3 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -108,14 +108,17 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys); + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nsyncs); static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys, uint8 nth); + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth); static int SyncRepGetStandbyPriority(void); -static List *SyncRepGetSyncStandbysPriority(bool *am_sync); -static List *SyncRepGetSyncStandbysQuorum(bool *am_sync); +static int standby_priority_comparator(const void *a, const void *b); static int cmp_lsn(const void *a, const void *b); #ifdef USE_ASSERT_CHECKING @@ -406,9 +409,10 @@ SyncRepInitConfig(void) priority = SyncRepGetStandbyPriority(); if (MyWalSnd->sync_standby_priority != priority) { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + SpinLockAcquire(&MyWalSnd->mutex); MyWalSnd->sync_standby_priority = priority; - LWLockRelease(SyncRepLock); + SpinLockRelease(&MyWalSnd->mutex); + ereport(DEBUG1, (errmsg("standby \"%s\" now has synchronous standby priority %u", application_name, priority))); @@ -523,8 +527,6 @@ SyncRepReleaseWaiters(void) /* * Calculate the synced Write, Flush and Apply positions among sync standbys. * - * The caller must hold SyncRepLock. - * * Return false if the number of sync standbys is less than * synchronous_standby_names specifies. Otherwise return true and * store the positions into *writePtr, *flushPtr and *applyPtr. 
@@ -536,27 +538,41 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync) { - List *sync_standbys; - - Assert(LWLockHeldByMe(SyncRepLock)); + SyncRepStandbyData *standbys; + int num_standbys; + int i; + /* Initialize default results */ *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; *applyPtr = InvalidXLogRecPtr; *am_sync = false; - /* Get standbys that are considered as synchronous at this moment */ - sync_standbys = SyncRepGetSyncStandbys(am_sync); + /* Quick out if not even configured to be synchronous */ + if (SyncRepConfig == NULL) + return false; + + /* Get standbys. They are in priority order if we are in priority mode */ + num_standbys = SyncRepGetStandbys(&standbys); + + /* Am I among the candidate sync standbys? */ + for (i = 0; i < num_standbys; i++) + { + if (standbys[i].is_me) + { + *am_sync = (i < SyncRepConfig->num_sync); + break; + } + } /* - * Quick exit if we are not managing a sync standby or there are not - * enough synchronous standbys. + * Nothing more to do if we are not managing a sync standby or there are + * not enough synchronous standbys. */ if (!(*am_sync) || - SyncRepConfig == NULL || - list_length(sync_standbys) < SyncRepConfig->num_sync) + num_standbys < SyncRepConfig->num_sync) { - list_free(sync_standbys); + pfree(standbys); return false; } @@ -576,43 +592,48 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) { SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys); + standbys, num_standbys, + SyncRepConfig->num_sync); } else { SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, SyncRepConfig->num_sync); + standbys, num_standbys, + SyncRepConfig->num_sync); } - list_free(sync_standbys); + pfree(standbys); return true; } /* - * Calculate the oldest Write, Flush and Apply positions among sync standbys. 
+ * Calculate the oldest Write among the first nsync standbys, Flush and Apply + * positions among sync standbys. */ static void -SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys) +SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nsyncs) { - ListCell *cell; + int i; /* * Scan through all sync standbys and calculate the oldest Write, Flush - * and Apply positions. + * and Apply positions. We assume *writePtr et al were initialized to + * InvalidXLogRecPtr. */ - foreach(cell, sync_standbys) + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - XLogRecPtr write; - XLogRecPtr flush; - XLogRecPtr apply; + XLogRecPtr write = sync_standbys[i].write; + XLogRecPtr flush = sync_standbys[i].flush; + XLogRecPtr apply = sync_standbys[i].apply; - SpinLockAcquire(&walsnd->mutex); - write = walsnd->write; - flush = walsnd->flush; - apply = walsnd->apply; - SpinLockRelease(&walsnd->mutex); + /* Ignore candidates that aren't considered synchronous */ + if (i >= nsyncs) + break; if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write) *writePtr = write; @@ -628,38 +649,39 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * standbys. 
*/ static void -SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) +SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth) { - ListCell *cell; XLogRecPtr *write_array; XLogRecPtr *flush_array; XLogRecPtr *apply_array; - int len; - int i = 0; + int i; + int n; - len = list_length(sync_standbys); - write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); - foreach(cell, sync_standbys) + n = 0; + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - - SpinLockAcquire(&walsnd->mutex); - write_array[i] = walsnd->write; - flush_array[i] = walsnd->flush; - apply_array[i] = walsnd->apply; - SpinLockRelease(&walsnd->mutex); - - i++; + write_array[n] = sync_standbys[i].write; + flush_array[n] = sync_standbys[i].flush; + apply_array[n] = sync_standbys[i].apply; + n++; } + /* Should have enough, or somebody messed up */ + Assert(n >= nth); + /* Sort each array in descending order */ - qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(write_array, n, sizeof(XLogRecPtr), cmp_lsn); + qsort(flush_array, n, sizeof(XLogRecPtr), cmp_lsn); + qsort(apply_array, n, sizeof(XLogRecPtr), cmp_lsn); /* Get Nth latest Write, Flush, Apply positions */ *writePtr = write_array[nth - 1]; @@ -689,67 +711,51 @@ cmp_lsn(const void *a, const void *b) } /* - * Return the list of sync standbys, or 
NIL if no sync standby is connected. + * Return data about active walsenders. * - * The caller must hold SyncRepLock. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. + * *standbys is set to a palloc'd array of structs of per-walsender data, and + * the number of valid entries is returned. The entries are in + * sync_standby_priority order if the current SyncRepConfig->syncrep_method is + * SYNC_REP_PRIORITY. The first SyncRepConfig->num_sync entries are + * the candidates for synchronous standby. */ -List * -SyncRepGetSyncStandbys(bool *am_sync) +int +SyncRepGetStandbys(SyncRepStandbyData **standbys) { - Assert(LWLockHeldByMe(SyncRepLock)); + int i; + int n; - /* Set default result */ - if (am_sync != NULL) - *am_sync = false; + /* Create result array */ + *standbys = (SyncRepStandbyData *) + palloc(max_wal_senders * sizeof(SyncRepStandbyData)); /* Quick exit if sync replication is not requested */ if (SyncRepConfig == NULL) - return NIL; - - return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? - SyncRepGetSyncStandbysPriority(am_sync) : - SyncRepGetSyncStandbysQuorum(am_sync); -} - -/* - * Return the list of all the candidates for quorum sync standbys, - * or NIL if no such standby is connected. - * - * The caller must hold SyncRepLock. This function must be called only in - * a quorum-based sync replication. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false.
- */ -static List * -SyncRepGetSyncStandbysQuorum(bool *am_sync) -{ - List *result = NIL; - int i; - volatile WalSnd *walsnd; /* Use volatile pointer to prevent code - * rearrangement */ - - Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM); + return 0; + /* Collect raw data from shared memory */ + n = 0; for (i = 0; i < max_wal_senders; i++) { - XLogRecPtr flush; - WalSndState state; - int pid; + volatile WalSnd *walsnd; /* Use volatile pointer to prevent code + * rearrangement */ + SyncRepStandbyData *stby; + WalSndState state; /* not included in SyncRepStandbyData */ walsnd = &WalSndCtl->walsnds[i]; + stby = *standbys + n; SpinLockAcquire(&walsnd->mutex); - pid = walsnd->pid; - flush = walsnd->flush; + stby->pid = walsnd->pid; state = walsnd->state; + stby->write = walsnd->write; + stby->flush = walsnd->flush; + stby->apply = walsnd->apply; + stby->sync_standby_priority = walsnd->sync_standby_priority; SpinLockRelease(&walsnd->mutex); /* Must be active */ - if (pid == 0) + if (stby->pid == 0) continue; /* Must be streaming or stopping */ @@ -758,200 +764,53 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) continue; /* Must be synchronous */ - if (walsnd->sync_standby_priority == 0) + if (stby->sync_standby_priority == 0) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(flush)) + if (XLogRecPtrIsInvalid(stby->flush)) continue; - /* - * Consider this standby as a candidate for quorum sync standbys and - * append it to the result. - */ - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; + /* OK, it's a candidate */ + stby->walsnd_index = i; + stby->is_me = (walsnd == MyWalSnd); + n++; } - return result; -} - -/* - * Return the list of sync standbys chosen based on their priorities, - * or NIL if no sync standby is connected. - * - * If there are multiple standbys with the same priority, - * the first one found is selected preferentially. - * - * The caller must hold SyncRepLock.
This function must be called only in - * a priority-based sync replication. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. - */ -static List * -SyncRepGetSyncStandbysPriority(bool *am_sync) -{ - List *result = NIL; - List *pending = NIL; - int lowest_priority; - int next_highest_priority; - int this_priority; - int priority; - int i; - bool am_in_pending = false; - volatile WalSnd *walsnd; /* Use volatile pointer to prevent code - * rearrangement */ - - Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); - - lowest_priority = SyncRepConfig->nmembers; - next_highest_priority = lowest_priority + 1; - /* - * Find the sync standbys which have the highest priority (i.e, 1). Also - * store all the other potential sync standbys into the pending list, in - * order to scan it later and find other sync standbys from it quickly. + * In quorum mode, that's all we have to do. In priority mode, sort the + * candidates by priority. */ - for (i = 0; i < max_wal_senders; i++) - { - XLogRecPtr flush; - WalSndState state; - int pid; - - walsnd = &WalSndCtl->walsnds[i]; - - SpinLockAcquire(&walsnd->mutex); - pid = walsnd->pid; - flush = walsnd->flush; - state = walsnd->state; - SpinLockRelease(&walsnd->mutex); - - /* Must be active */ - if (pid == 0) - continue; - - /* Must be streaming or stopping */ - if (state != WALSNDSTATE_STREAMING && - state != WALSNDSTATE_STOPPING) - continue; - - /* Must be synchronous */ - this_priority = walsnd->sync_standby_priority; - if (this_priority == 0) - continue; - - /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(flush)) - continue; + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + qsort(*standbys, n, sizeof(SyncRepStandbyData), + standby_priority_comparator); - /* - * If the priority is equal to 1, consider this standby as sync and - * append it to the result. 
Otherwise append this standby to the - * pending list to check if it's actually sync or not later. - */ - if (this_priority == 1) - { - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; - if (list_length(result) == SyncRepConfig->num_sync) - { - list_free(pending); - return result; /* Exit if got enough sync standbys */ - } - } - else - { - pending = lappend_int(pending, i); - if (am_sync != NULL && walsnd == MyWalSnd) - am_in_pending = true; + return n; +} - /* - * Track the highest priority among the standbys in the pending - * list, in order to use it as the starting priority for later - * scan of the list. This is useful to find quickly the sync - * standbys from the pending list later because we can skip - * unnecessary scans for the unused priorities. - */ - if (this_priority < next_highest_priority) - next_highest_priority = this_priority; - } - } - /* - * Consider all pending standbys as sync if the number of them plus - * already-found sync ones is lower than the configuration requests. - */ - if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync) - { - /* - * Set *am_sync to true if this walsender is in the pending list - * because all pending standbys are considered as sync. - */ - if (am_sync != NULL && !(*am_sync)) - *am_sync = am_in_pending; +/* + * qsort comparator to sort SyncRepStandbyData entries by priority + */ +static int +standby_priority_comparator(const void *a, const void *b) +{ + const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a; + const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b; - result = list_concat(result, pending); - list_free(pending); - return result; - } + /* First, sort by increasing priority value */ + if (sa->sync_standby_priority != sb->sync_standby_priority) + return sa->sync_standby_priority - sb->sync_standby_priority; /* - * Find the sync standbys from the pending list. 
+ * We might have equal priority values; arbitrarily break ties by position + * in the WALSnd array. (This is utterly bogus, since that is arrival + * order dependent, but there are regression tests that rely on it.) */ - priority = next_highest_priority; - while (priority <= lowest_priority) - { - ListCell *cell; - - next_highest_priority = lowest_priority + 1; - - foreach(cell, pending) - { - i = lfirst_int(cell); - walsnd = &WalSndCtl->walsnds[i]; - - this_priority = walsnd->sync_standby_priority; - if (this_priority == priority) - { - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; - - /* - * We should always exit here after the scan of pending list - * starts because we know that the list has enough elements to - * reach SyncRepConfig->num_sync. - */ - if (list_length(result) == SyncRepConfig->num_sync) - { - list_free(pending); - return result; /* Exit if got enough sync standbys */ - } - - /* - * Remove the entry for this sync standby from the list to - * prevent us from looking at the same entry again. - */ - pending = foreach_delete_current(pending, cell); - - continue; /* don't adjust next_highest_priority */ - } - - if (this_priority < next_highest_priority) - next_highest_priority = this_priority; - } - - priority = next_highest_priority; - } - - /* never reached, but keep compiler quiet */ - Assert(false); - return result; + return sa->walsnd_index - sb->walsnd_index; } + /* * Check if we are in the list of sync standbys, and if so, determine * priority sequence. Return priority if set, or zero to indicate that diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index fc475d144d..7889ea5f6f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2375,14 +2375,16 @@ InitWalSenderSlot(void) * Found a free slot. Reserve it for us. 
*/ walsnd->pid = MyProcPid; + walsnd->state = WALSNDSTATE_STARTUP; walsnd->sentPtr = InvalidXLogRecPtr; + walsnd->needreload = false; walsnd->write = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; walsnd->writeLag = -1; walsnd->flushLag = -1; walsnd->applyLag = -1; - walsnd->state = WALSNDSTATE_STARTUP; + walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; walsnd->spillTxns = 0; @@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - List *sync_standbys; + SyncRepStandbyData *standbys; + int num_standbys; int i; /* check to see if caller supports us returning a tuplestore */ @@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); /* - * Get the currently active synchronous standbys. + * Get the currently active standbys in priority order. This could be out + * of date before we're done, but we'll use the data anyway. 
*/ - LWLockAcquire(SyncRepLock, LW_SHARED); - sync_standbys = SyncRepGetSyncStandbys(NULL); - LWLockRelease(SyncRepLock); + num_standbys = SyncRepGetStandbys(&standbys); for (i = 0; i < max_wal_senders; i++) { @@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int64 spillTxns; int64 spillCount; int64 spillBytes; + bool is_sync_standby; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; + int j; + /* Collect data from shared memory */ SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) { @@ -3311,6 +3316,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) spillBytes = walsnd->spillBytes; SpinLockRelease(&walsnd->mutex); + /* Detect whether walsender is/was considered synchronous */ + is_sync_standby = false; + for (j = 0; j < num_standbys; j++) + { + if (standbys[j].walsnd_index == i) + { + is_sync_standby = (j < SyncRepConfig->num_sync); + break; + } + } + memset(nulls, 0, sizeof(nulls)); values[0] = Int32GetDatum(pid); @@ -3380,11 +3396,15 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (priority == 0) values[10] = CStringGetTextDatum("async"); - else if (list_member_int(sync_standbys, i)) - values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? 
- CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); + else if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + { + if (is_sync_standby) + values[10] = CStringGetTextDatum("sync"); + else + values[10] = CStringGetTextDatum("potential"); + } else - values[10] = CStringGetTextDatum("potential"); + values[10] = CStringGetTextDatum("quorum"); if (replyTime == 0) nulls[11] = true; diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index c5f0e91aad..533aac25c7 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -36,6 +36,24 @@ #define SYNC_REP_PRIORITY 0 #define SYNC_REP_QUORUM 1 +/* + * SyncRepGetSyncStandbys returns an array of these structs, + * one per candidate synchronous walsender. + */ +typedef struct SyncRepStandbyData +{ + /* Copies of relevant fields from WalSnd shared-memory struct */ + pid_t pid; + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; + int sync_standby_priority; + /* Index of this walsender in the WalSnd shared-memory array */ + int walsnd_index; + /* This flag indicates whether this struct is about our own process */ + bool is_me; +} SyncRepStandbyData; + /* * Struct for the configuration of synchronous replication. * @@ -74,7 +92,7 @@ extern void SyncRepInitConfig(void); extern void SyncRepReleaseWaiters(void); /* called by wal sender and user backend */ -extern List *SyncRepGetSyncStandbys(bool *am_sync); +extern int SyncRepGetStandbys(SyncRepStandbyData **standbys); /* called by checkpointer */ extern void SyncRepUpdateSyncStandbysDefined(void); diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 366828f0a4..734acec2a4 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -31,8 +31,7 @@ typedef enum WalSndState /* * Each walsender has a WalSnd struct in shared memory. 
* - * This struct is protected by 'mutex', with two exceptions: one is - * sync_standby_priority as noted below. The other exception is that some + * This struct is protected by its 'mutex' spinlock field, except that some * members are only written by the walsender process itself, and thus that * process is free to read those members without holding spinlock. pid and * needreload always require the spinlock to be held for all accesses. @@ -60,6 +59,12 @@ typedef struct WalSnd TimeOffset flushLag; TimeOffset applyLag; + /* + * The priority order of the standby managed by this WALSender, as listed + * in synchronous_standby_names, or 0 if not-listed. + */ + int sync_standby_priority; + /* Protects shared variables shown above. */ slock_t mutex; @@ -69,13 +74,6 @@ typedef struct WalSnd */ Latch *latch; - /* - * The priority order of the standby managed by this WALSender, as listed - * in synchronous_standby_names, or 0 if not-listed. Protected by - * SyncRepLock. - */ - int sync_standby_priority; - /* * Timestamp of the last message received from standby. */
At Wed, 15 Apr 2020 13:01:02 +0900, Masahiko Sawada <masahiko.sawada@2ndquadrant.com> wrote in
> On Tue, 14 Apr 2020 at 18:35, Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote:
> >
> > At Tue, 14 Apr 2020 13:06:14 +0900, Masahiko Sawada <masahiko.sawada@2ndquadrant.com> wrote in
> > > On Tue, 14 Apr 2020 at 10:34, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> > > >
> > > > Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
> > > > > At Sat, 11 Apr 2020 18:30:30 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in
> > > > >> What I think we should do about this is, essentially, to get rid of
> > > > >> SyncRepGetSyncStandbys. Instead, let's have each walsender advertise
> > > > >> whether *it* believes that it is a sync standby, based on its last
> > > > >> evaluation of the relevant GUCs. This would be a bool that it'd
> > > > >> compute and set alongside sync_standby_priority. (Hm, maybe we'd not
> > > >
> > > > > Mmm.. SyncRepGetStandbyPriority returns the "priority" that a
> > > > > walsender thinks it is at, among synchronous_standby_names. Then to
> > > > > decide "I am a sync standby" we need to know how many walsenders with
> > > > > higher priority are alive now. SyncRepGetSyncStandbyPriority does the
> > > > > judgment now and suffers from the inconsistency of priority values.
> > > >
> > > > Yeah. After looking a bit closer, I think that the current definition
> > > > of sync_standby_priority (that is, as the result of local evaluation
> > > > of SyncRepGetStandbyPriority()) is OK. The problem is what we're doing
> > > > with it. I suggest that what we should do in SyncRepGetSyncRecPtr()
> > > > is make one sweep across the WalSnd array, collecting PID,
> > > > sync_standby_priority, *and* the WAL pointers from each valid entry.
> > > > Then examine that data and decide which WAL value we need, without assuming
> > > > that the sync_standby_priority values are necessarily totally consistent.
> > > > But in any case we must examine each entry just once while holding its
> > > > mutex, not go back to it later expecting it to still be the same.
> >
> > SyncRepGetSyncStandbysPriority() is running holding SyncRepLock so
> > sync_standby_priority of any walsender can be changed while the
> > function is scanning walsenders. The issue is we already have
> > inconsistent walsender information before we enter the function. Thus
> > how many times we scan on the array doesn't make any difference.
> >
> > I think we need to do one of the following.
> >
> > A) prevent SyncRepGetSyncStandbysPriority from being entered while
> > walsender priority is inconsistent.
> >
> > B) make SyncRepGetSyncStandbysPriority be tolerant of priority
> > inconsistency.
> >
> > C) protect walsender priority array from being inconsistent.
> >
> > The (B) is the band aid. To achieve A we need a central controller
> > of priority config handling. C is:
> >
> > > Can we have a similar approach of sync_standby_defined for
> > > sync_standby_priority? That is, checkpointer is responsible for
> > > changing sync_standby_priority of all walsenders when SIGHUP. That
> > > way, all walsenders can see a consistent view of
> > > sync_standby_priority. And when a walsender starts, it sets
> > > sync_standby_priority by itself. The logic to decide who's a sync
> > > standby doesn't change. SyncRepGetSyncRecPtr() gets all walsenders
> > > having higher priority along with their WAL positions.
> >
> > Yeah, it works if we do, but the problem of that way is that to
> > determine priority of walsenders, we need to know what walsenders are
> > running. That is, when new walsender comes the process needs to be aware
> > of the arrival (or leaving) right away and reassign the priority of
> > every wal sender again.
>
> I think we don't need to reassign the priority when new walsender
> comes or leaves. IIUC The priority is calculated based on only
> synchronous_standby_names.
> Coming or leaving a walsender doesn't
> affect other's priorities.

Sorry, the "priority" in this area is a bit confusing. The "priority" defined by synchronous_standby_names is determined in isolation from the presence of walsenders. The "priority" in walsnd->sync_standby_priority can only be determined with knowledge of which walsenders are present. I was thinking of the latter in the discussion.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
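The single-sweep idea quoted above (read each entry once while holding its lock, then decide using only the copies) can be sketched roughly as follows. This is a standalone illustration, not PostgreSQL code: DemoSlot, DemoSnapshot, demo_slot_init, demo_collect and the C11 atomic_flag spinlock are hypothetical stand-ins for WalSnd, its mutex, and the real collection loop.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical stand-in for one shared-memory walsender slot. */
typedef struct DemoSlot
{
    atomic_flag   lock;      /* toy spinlock protecting the fields below */
    int           pid;       /* 0 means the slot is unused */
    int           priority;  /* 0 means "not a sync candidate" */
    unsigned long flush;     /* 0 means "no valid flush position" */
} DemoSlot;

/* Private copy of the fields we care about, taken under the lock. */
typedef struct DemoSnapshot
{
    int           pid;
    int           priority;
    unsigned long flush;
    int           slot_index;
} DemoSnapshot;

static void
demo_slot_init(DemoSlot *slot, int pid, int priority, unsigned long flush)
{
    atomic_flag_clear(&slot->lock);
    slot->pid = pid;
    slot->priority = priority;
    slot->flush = flush;
}

/*
 * Visit every slot exactly once.  All shared fields are copied while the
 * slot's lock is held; filtering uses only the local copy, so a concurrent
 * change after the unlock cannot make the result self-inconsistent.
 * Returns the number of candidates written to out[] (capacity >= nslots).
 */
static size_t
demo_collect(DemoSlot *slots, size_t nslots, DemoSnapshot *out)
{
    size_t n = 0;

    assert(slots != NULL && out != NULL);

    for (size_t i = 0; i < nslots; i++)
    {
        DemoSnapshot snap;

        while (atomic_flag_test_and_set(&slots[i].lock))
            ;                   /* spin until we own the lock */
        snap.pid = slots[i].pid;
        snap.priority = slots[i].priority;
        snap.flush = slots[i].flush;
        atomic_flag_clear(&slots[i].lock);

        snap.slot_index = (int) i;

        /* Decide from the copy only; never re-read the shared slot. */
        if (snap.pid == 0 || snap.priority == 0 || snap.flush == 0)
            continue;

        out[n++] = snap;
    }
    return n;
}
```

A caller would allocate out[] with one element per slot, call demo_collect() once, and then work only on the returned copies. The copies may be stale by the time they are used, but they are at least consistent with each other per entry.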
Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
> At Tue, 14 Apr 2020 16:32:40 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in
> + stby->is_sync_standby = true; /* might change below */

> I'm uneasy with that. In quorum mode all running standbys are marked
> as "sync" and that's bogus.

I don't follow that? The existing coding of SyncRepGetSyncStandbysQuorum returns all the candidates in its list, so this is isomorphic to that.

Possibly a different name for the flag would be more suited?

> On the other hand sync_standbys is already sorted in priority order so I think we can get rid of the member by setting *am_sync as follows.

> SyncRepGetSyncRecPtr:
> if (sync_standbys[i].is_me)
> {
> *am_sync = (i < SyncRepConfig->num_sync);
> break;
> }

I disagree with this, it will change the behavior in the quorum case.

In any case, a change like this will cause callers to know way more than they ought to about the ordering of the array. In my mind, the fact that SyncRepGetSyncStandbysPriority is sorting the array is an internal implementation detail; I do not want it to be part of the API.

(Apropos to that, I realized from working on this patch that there's another, completely undocumented assumption in the existing code, that the integer list will be sorted by walsender index for equal priorities. I don't like that either, and not just because it's undocumented.)

regards, tom lane
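To make the quorum-case objection concrete, here is a small standalone sketch of the two interpretations. It is not taken from either patch: DemoStandby, DemoMethod and demo_mark_sync are hypothetical names, and the array is assumed to be already in priority order. In priority mode only the first num_sync entries count as synchronous, whereas in quorum mode every candidate does, so deriving am_sync from the array position alone gives a different answer for quorum.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum { DEMO_PRIORITY, DEMO_QUORUM } DemoMethod;

/* Hypothetical per-candidate record, assumed sorted by priority. */
typedef struct DemoStandby
{
    int  priority;
    bool is_me;     /* entry describes the calling process */
    bool is_sync;   /* filled in by demo_mark_sync() */
} DemoStandby;

/*
 * Set is_sync on each entry and report whether the caller's own entry is
 * considered synchronous.  The ordering rule stays inside this function;
 * callers only read the flag.
 */
static bool
demo_mark_sync(DemoStandby *stbys, size_t n, size_t num_sync,
               DemoMethod method)
{
    bool am_sync = false;

    assert(stbys != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        /* Quorum: every candidate counts.  Priority: only the first few. */
        stbys[i].is_sync = (method == DEMO_QUORUM) || (i < num_sync);

        if (stbys[i].is_me && stbys[i].is_sync)
            am_sync = true;
    }
    return am_sync;
}
```

With three candidates, num_sync = 2 and the caller in the last position, this sketch reports "not sync" in priority mode and "sync" in quorum mode, which is the behavioral difference a position-only test would erase.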
At Wed, 15 Apr 2020 11:31:49 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in
> Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
> > At Tue, 14 Apr 2020 16:32:40 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in
> > + stby->is_sync_standby = true; /* might change below */
>
> > I'm uneasy with that. In quorum mode all running standbys are marked
> > as "sync" and that's bogus.
>
> I don't follow that? The existing coding of SyncRepGetSyncStandbysQuorum
> returns all the candidates in its list, so this is isomorphic to that.

The existing code actually does that. On the other hand SyncRepGetSyncStandbysPriority returns standbys that *are known to be* synchronous, but *Quorum returns standbys that *can be* synchronous. What the two functions return is different. So it should be is_sync_standby for -Priority and is_sync_candidate for -Quorum.

> Possibly a different name for the flag would be more suited?
>
> > On the other hand sync_standbys is already sorted in priority order so I think we can get rid of the member by setting *am_sync as follows.
>
> > SyncRepGetSyncRecPtr:
> > if (sync_standbys[i].is_me)
> > {
> > *am_sync = (i < SyncRepConfig->num_sync);
> > break;
> > }
>
> I disagree with this, it will change the behavior in the quorum case.

Oops, you're right. I find the whole thing there (and me) is a bit confusing. syncrep_method affects how some values (specifically am_sync and sync_standbys) are translated at several calling depths. And the *am_sync informs nothing in quorum mode.

> In any case, a change like this will cause callers to know way more than
> they ought to about the ordering of the array. In my mind, the fact that
> SyncRepGetSyncStandbysPriority is sorting the array is an internal
> implementation detail; I do not want it to be part of the API.

Anyway the am_sync and is_sync_standby are utterly useless in quorum mode.
That argument would be pretty persuasive otherwise, but actually the upper layers (SyncRepReleaseWaiters and SyncRepGetSyncRecPtr) refer to syncrep_method to differentiate the interpretation of the am_sync flag and sync_standbys list. So anyway the difference is actually a part of the API.

After thinking some more, I concluded that some of the variables are wrongly named or considered, and redundant. The function of am_sync is covered by got_recptr in SyncRepReleaseWaiters, so it's enough that SyncRepGetSyncRecPtr just reports to the caller whether the caller may release some of the waiter processes. This simplifies the related functions and makes them (to me) clearer.

Please find the attached.

> (Apropos to that, I realized from working on this patch that there's
> another, completely undocumented assumption in the existing code, that
> the integer list will be sorted by walsender index for equal priorities.
> I don't like that either, and not just because it's undocumented.)

That seems accidental. Sorting by priority is the designed behavior and is documented; in contrast, entries of the same priority are ordered by index by accident and that is not documented, which means it can be changed anytime. I think we don't define everything in such detail.

regards.
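The ordering question can be illustrated with a small standalone sketch. DemoEntry, demo_cmp and demo_sort are hypothetical names, not code from the attached patch. Since qsort() is not guaranteed to be stable, entries with equal priority come out in an unspecified order unless the comparator breaks ties explicitly; the sketch breaks them by slot index. It uses explicit comparisons rather than subtraction, which is merely one common way to write such a comparator.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical candidate record: priority plus position in the slot array. */
typedef struct DemoEntry
{
    int priority;    /* smaller value sorts first */
    int slot_index;  /* tie-breaker for equal priorities */
} DemoEntry;

/* qsort comparator: by increasing priority, then by increasing slot index. */
static int
demo_cmp(const void *a, const void *b)
{
    const DemoEntry *ea = (const DemoEntry *) a;
    const DemoEntry *eb = (const DemoEntry *) b;

    if (ea->priority != eb->priority)
        return (ea->priority < eb->priority) ? -1 : 1;
    if (ea->slot_index != eb->slot_index)
        return (ea->slot_index < eb->slot_index) ? -1 : 1;
    return 0;
}

/* Sort the candidates in place into a deterministic order. */
static void
demo_sort(DemoEntry *entries, size_t n)
{
    assert(entries != NULL || n == 0);
    if (n > 1)
        qsort(entries, n, sizeof(DemoEntry), demo_cmp);
}
```

Dropping the second comparison would leave the relative order of equal-priority entries up to the qsort() implementation, which is the "can be changed anytime" situation described above.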
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index ffd5b31eb2..99a7bbbc86 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -103,19 +103,21 @@ static int SyncRepWakeQueue(bool all, int mode); static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, - bool *am_sync); + XLogRecPtr *applyPtr); static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys); + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nsyncs); static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys, uint8 nth); + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth); static int SyncRepGetStandbyPriority(void); -static List *SyncRepGetSyncStandbysPriority(bool *am_sync); -static List *SyncRepGetSyncStandbysQuorum(bool *am_sync); +static int standby_priority_comparator(const void *a, const void *b); static int cmp_lsn(const void *a, const void *b); #ifdef USE_ASSERT_CHECKING @@ -406,9 +408,10 @@ SyncRepInitConfig(void) priority = SyncRepGetStandbyPriority(); if (MyWalSnd->sync_standby_priority != priority) { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + SpinLockAcquire(&MyWalSnd->mutex); MyWalSnd->sync_standby_priority = priority; - LWLockRelease(SyncRepLock); + SpinLockRelease(&MyWalSnd->mutex); + ereport(DEBUG1, (errmsg("standby \"%s\" now has synchronous standby priority %u", application_name, priority))); @@ -429,8 +432,7 @@ SyncRepReleaseWaiters(void) XLogRecPtr writePtr; XLogRecPtr flushPtr; XLogRecPtr applyPtr; - bool got_recptr; - bool am_sync; + bool release_waiters; int numwrite = 0; int numflush = 0; int numapply = 0; @@ -458,16 +460,24 @@ SyncRepReleaseWaiters(void) LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); /* - * Check whether we are a sync standby or not, and calculate the synced - * positions among 
all sync standbys. + * Check whether we may release any of waiter processes, and calculate the + * synced positions. */ - got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync); + release_waiters = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr); + + /* Return if nothing to do. */ + if (!release_waiters) + { + LWLockRelease(SyncRepLock); + announce_next_takeover = true; + return; + } /* - * If we are managing a sync standby, though we weren't prior to this, - * then announce we are now a sync standby. + * If this walsender becomes to be able to release waiter processes, + * announce about that. */ - if (announce_next_takeover && am_sync) + if (announce_next_takeover) { announce_next_takeover = false; @@ -481,17 +491,6 @@ SyncRepReleaseWaiters(void) application_name))); } - /* - * If the number of sync standbys is less than requested or we aren't - * managing a sync standby then just leave. - */ - if (!got_recptr || !am_sync) - { - LWLockRelease(SyncRepLock); - announce_next_takeover = !am_sync; - return; - } - /* * Set the lsn first so that when we wake backends they will release up to * this location. @@ -523,43 +522,62 @@ SyncRepReleaseWaiters(void) /* * Calculate the synced Write, Flush and Apply positions among sync standbys. * - * The caller must hold SyncRepLock. - * - * Return false if the number of sync standbys is less than - * synchronous_standby_names specifies. Otherwise return true and - * store the positions into *writePtr, *flushPtr and *applyPtr. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. + * Return false if this walsender cannot release any of waiteres. Otherwise + * return true and store the positions into *writePtr, *flushPtr and *applyPtr. 
*/ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, bool *am_sync) + XLogRecPtr *applyPtr) { - List *sync_standbys; - - Assert(LWLockHeldByMe(SyncRepLock)); + SyncRepStandbyData *standbys; + int num_standbys; + int i; + bool sync_priority = + (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); + /* Initialize default results */ *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; *applyPtr = InvalidXLogRecPtr; - *am_sync = false; - /* Get standbys that are considered as synchronous at this moment */ - sync_standbys = SyncRepGetSyncStandbys(am_sync); + /* Quick out if not even configured to be synchronous */ + if (SyncRepConfig == NULL) + return false; + + /* Get standbys. They are in priority order. */ + num_standbys = SyncRepGetStandbys(&standbys); /* - * Quick exit if we are not managing a sync standby or there are not - * enough synchronous standbys. + * Nothing more to do if there are not enough synchronous standbys or + * candidates. */ - if (!(*am_sync) || - SyncRepConfig == NULL || - list_length(sync_standbys) < SyncRepConfig->num_sync) + if (num_standbys < SyncRepConfig->num_sync) { - list_free(sync_standbys); + pfree(standbys); return false; } + /* When priority mode, nothing to do if I an not a sync standby */ + if (sync_priority) + { + bool am_sync = false; + + for (i = 0; i < num_standbys; i++) + { + if (standbys[i].is_me) + { + am_sync = true; + break; + } + } + + if (!am_sync) + { + pfree(standbys); + return false; + } + } + /* * In a priority-based sync replication, the synced positions are the * oldest ones among sync standbys. In a quorum-based, they are the Nth @@ -573,46 +591,51 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced * positions even in a quorum-based sync replication. 
*/ - if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + if (sync_priority) { SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys); + standbys, num_standbys, + SyncRepConfig->num_sync); } else { SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, SyncRepConfig->num_sync); + standbys, num_standbys, + SyncRepConfig->num_sync); } - list_free(sync_standbys); + pfree(standbys); return true; } /* - * Calculate the oldest Write, Flush and Apply positions among sync standbys. + * Calculate the oldest Write among the first nsync standbys, Flush and Apply + * positions among sync standbys. */ static void -SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys) +SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nsyncs) { - ListCell *cell; + int i; /* * Scan through all sync standbys and calculate the oldest Write, Flush - * and Apply positions. + * and Apply positions. We assume *writePtr et al were initialized to + * InvalidXLogRecPtr. */ - foreach(cell, sync_standbys) + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - XLogRecPtr write; - XLogRecPtr flush; - XLogRecPtr apply; + XLogRecPtr write = sync_standbys[i].write; + XLogRecPtr flush = sync_standbys[i].flush; + XLogRecPtr apply = sync_standbys[i].apply; - SpinLockAcquire(&walsnd->mutex); - write = walsnd->write; - flush = walsnd->flush; - apply = walsnd->apply; - SpinLockRelease(&walsnd->mutex); + /* Ignore candidates that aren't considered synchronous */ + if (i >= nsyncs) + break; if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write) *writePtr = write; @@ -628,38 +651,39 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * standbys. 
*/ static void -SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) +SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth) { - ListCell *cell; XLogRecPtr *write_array; XLogRecPtr *flush_array; XLogRecPtr *apply_array; - int len; - int i = 0; + int i; + int n; - len = list_length(sync_standbys); - write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); - foreach(cell, sync_standbys) + n = 0; + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - - SpinLockAcquire(&walsnd->mutex); - write_array[i] = walsnd->write; - flush_array[i] = walsnd->flush; - apply_array[i] = walsnd->apply; - SpinLockRelease(&walsnd->mutex); - - i++; + write_array[n] = sync_standbys[i].write; + flush_array[n] = sync_standbys[i].flush; + apply_array[n] = sync_standbys[i].apply; + n++; } + /* Should have enough, or somebody messed up */ + Assert(n >= nth); + /* Sort each array in descending order */ - qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(write_array, n, sizeof(XLogRecPtr), cmp_lsn); + qsort(flush_array, n, sizeof(XLogRecPtr), cmp_lsn); + qsort(apply_array, n, sizeof(XLogRecPtr), cmp_lsn); /* Get Nth latest Write, Flush, Apply positions */ *writePtr = write_array[nth - 1]; @@ -689,67 +713,49 @@ cmp_lsn(const void *a, const void *b) } /* - * Return the list of sync standbys, or 
NIL if no sync standby is connected. + * Return data about active walsenders. * - * The caller must hold SyncRepLock. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. + * *standbys is set to a palloc'd array of structs of per-walsender data, and + * the number of valid entries is returned. The entries are in + * sync_standby_priority order. */ -List * -SyncRepGetSyncStandbys(bool *am_sync) +int +SyncRepGetStandbys(SyncRepStandbyData **standbys) { - Assert(LWLockHeldByMe(SyncRepLock)); + int i; + int n; - /* Set default result */ - if (am_sync != NULL) - *am_sync = false; + /* Create result array */ + *standbys = (SyncRepStandbyData *) + palloc(max_wal_senders * sizeof(SyncRepStandbyData)); /* Quick exit if sync replication is not requested */ if (SyncRepConfig == NULL) - return NIL; - - return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? - SyncRepGetSyncStandbysPriority(am_sync) : - SyncRepGetSyncStandbysQuorum(am_sync); -} - -/* - * Return the list of all the candidates for quorum sync standbys, - * or NIL if no such standby is connected. - * - * The caller must hold SyncRepLock. This function must be called only in - * a quorum-based sync replication. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. 
- */ -static List * -SyncRepGetSyncStandbysQuorum(bool *am_sync) -{ - List *result = NIL; - int i; - volatile WalSnd *walsnd; /* Use volatile pointer to prevent code - * rearrangement */ - - Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM); + return false; + /* Collect raw data from shared memory */ + n = 0; for (i = 0; i < max_wal_senders; i++) { - XLogRecPtr flush; - WalSndState state; - int pid; + volatile WalSnd *walsnd; /* Use volatile pointer to prevent code + * rearrangement */ + SyncRepStandbyData *stby; + WalSndState state; /* not included in SyncRepStandbyData */ walsnd = &WalSndCtl->walsnds[i]; + stby = *standbys + n; SpinLockAcquire(&walsnd->mutex); - pid = walsnd->pid; - flush = walsnd->flush; + stby->pid = walsnd->pid; state = walsnd->state; + stby->write = walsnd->write; + stby->flush = walsnd->flush; + stby->apply = walsnd->apply; + stby->sync_standby_priority = walsnd->sync_standby_priority; SpinLockRelease(&walsnd->mutex); /* Must be active */ - if (pid == 0) + if (stby->pid == 0) continue; /* Must be streaming or stopping */ @@ -758,200 +764,53 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) continue; /* Must be synchronous */ - if (walsnd->sync_standby_priority == 0) + if (stby->sync_standby_priority == 0) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(flush)) + if (XLogRecPtrIsInvalid(stby->flush)) continue; - /* - * Consider this standby as a candidate for quorum sync standbys and - * append it to the result. - */ - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; + /* OK, it's a candidate */ + stby->walsnd_index = i; + stby->is_me = (walsnd == MyWalSnd); + n++; } - return result; -} - -/* - * Return the list of sync standbys chosen based on their priorities, - * or NIL if no sync standby is connected. - * - * If there are multiple standbys with the same priority, - * the first one found is selected preferentially. - * - * The caller must hold SyncRepLock. 
This function must be called only in - * a priority-based sync replication. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. - */ -static List * -SyncRepGetSyncStandbysPriority(bool *am_sync) -{ - List *result = NIL; - List *pending = NIL; - int lowest_priority; - int next_highest_priority; - int this_priority; - int priority; - int i; - bool am_in_pending = false; - volatile WalSnd *walsnd; /* Use volatile pointer to prevent code - * rearrangement */ - - Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); - - lowest_priority = SyncRepConfig->nmembers; - next_highest_priority = lowest_priority + 1; - /* - * Find the sync standbys which have the highest priority (i.e, 1). Also - * store all the other potential sync standbys into the pending list, in - * order to scan it later and find other sync standbys from it quickly. + * In quorum mode, that's all we have to do since all entries are in the + * same priority 1. In priority mode, sort the candidates by priority. */ - for (i = 0; i < max_wal_senders; i++) - { - XLogRecPtr flush; - WalSndState state; - int pid; - - walsnd = &WalSndCtl->walsnds[i]; - - SpinLockAcquire(&walsnd->mutex); - pid = walsnd->pid; - flush = walsnd->flush; - state = walsnd->state; - SpinLockRelease(&walsnd->mutex); - - /* Must be active */ - if (pid == 0) - continue; - - /* Must be streaming or stopping */ - if (state != WALSNDSTATE_STREAMING && - state != WALSNDSTATE_STOPPING) - continue; - - /* Must be synchronous */ - this_priority = walsnd->sync_standby_priority; - if (this_priority == 0) - continue; - - /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(flush)) - continue; + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + qsort(*standbys, n, sizeof(SyncRepStandbyData), + standby_priority_comparator); - /* - * If the priority is equal to 1, consider this standby as sync and - * append it to the result. 
Otherwise append this standby to the - * pending list to check if it's actually sync or not later. - */ - if (this_priority == 1) - { - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; - if (list_length(result) == SyncRepConfig->num_sync) - { - list_free(pending); - return result; /* Exit if got enough sync standbys */ - } - } - else - { - pending = lappend_int(pending, i); - if (am_sync != NULL && walsnd == MyWalSnd) - am_in_pending = true; + return n; +} - /* - * Track the highest priority among the standbys in the pending - * list, in order to use it as the starting priority for later - * scan of the list. This is useful to find quickly the sync - * standbys from the pending list later because we can skip - * unnecessary scans for the unused priorities. - */ - if (this_priority < next_highest_priority) - next_highest_priority = this_priority; - } - } - /* - * Consider all pending standbys as sync if the number of them plus - * already-found sync ones is lower than the configuration requests. - */ - if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync) - { - /* - * Set *am_sync to true if this walsender is in the pending list - * because all pending standbys are considered as sync. - */ - if (am_sync != NULL && !(*am_sync)) - *am_sync = am_in_pending; +/* + * qsort comparator to sort SyncRepStandbyData entries by priority + */ +static int +standby_priority_comparator(const void *a, const void *b) +{ + const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a; + const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b; - result = list_concat(result, pending); - list_free(pending); - return result; - } + /* First, sort by increasing priority value */ + if (sa->sync_standby_priority != sb->sync_standby_priority) + return sa->sync_standby_priority - sb->sync_standby_priority; /* - * Find the sync standbys from the pending list. 
+ * We might have equal priority values; arbitrarily break ties by position + * in the WALSnd array. (This is utterly bogus, since that is arrival + * order dependent, but there are regression tests that rely on it.) */ - priority = next_highest_priority; - while (priority <= lowest_priority) - { - ListCell *cell; - - next_highest_priority = lowest_priority + 1; - - foreach(cell, pending) - { - i = lfirst_int(cell); - walsnd = &WalSndCtl->walsnds[i]; - - this_priority = walsnd->sync_standby_priority; - if (this_priority == priority) - { - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; - - /* - * We should always exit here after the scan of pending list - * starts because we know that the list has enough elements to - * reach SyncRepConfig->num_sync. - */ - if (list_length(result) == SyncRepConfig->num_sync) - { - list_free(pending); - return result; /* Exit if got enough sync standbys */ - } - - /* - * Remove the entry for this sync standby from the list to - * prevent us from looking at the same entry again. - */ - pending = foreach_delete_current(pending, cell); - - continue; /* don't adjust next_highest_priority */ - } - - if (this_priority < next_highest_priority) - next_highest_priority = this_priority; - } - - priority = next_highest_priority; - } - - /* never reached, but keep compiler quiet */ - Assert(false); - return result; + return sa->walsnd_index - sb->walsnd_index; } + /* * Check if we are in the list of sync standbys, and if so, determine * priority sequence. Return priority if set, or zero to indicate that diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index fc475d144d..7889ea5f6f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2375,14 +2375,16 @@ InitWalSenderSlot(void) * Found a free slot. Reserve it for us. 
*/ walsnd->pid = MyProcPid; + walsnd->state = WALSNDSTATE_STARTUP; walsnd->sentPtr = InvalidXLogRecPtr; + walsnd->needreload = false; walsnd->write = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; walsnd->writeLag = -1; walsnd->flushLag = -1; walsnd->applyLag = -1; - walsnd->state = WALSNDSTATE_STARTUP; + walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; walsnd->spillTxns = 0; @@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - List *sync_standbys; + SyncRepStandbyData *standbys; + int num_standbys; int i; /* check to see if caller supports us returning a tuplestore */ @@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); /* - * Get the currently active synchronous standbys. + * Get the currently active standbys in priority order. This could be out + * of date before we're done, but we'll use the data anyway. 
*/ - LWLockAcquire(SyncRepLock, LW_SHARED); - sync_standbys = SyncRepGetSyncStandbys(NULL); - LWLockRelease(SyncRepLock); + num_standbys = SyncRepGetStandbys(&standbys); for (i = 0; i < max_wal_senders; i++) { @@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int64 spillTxns; int64 spillCount; int64 spillBytes; + bool is_sync_standby; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; + int j; + /* Collect data from shared memory */ SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) { @@ -3311,6 +3316,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) spillBytes = walsnd->spillBytes; SpinLockRelease(&walsnd->mutex); + /* Detect whether walsender is/was considered synchronous */ + is_sync_standby = false; + for (j = 0; j < num_standbys; j++) + { + if (standbys[j].walsnd_index == i) + { + is_sync_standby = (j < SyncRepConfig->num_sync); + break; + } + } + memset(nulls, 0, sizeof(nulls)); values[0] = Int32GetDatum(pid); @@ -3380,11 +3396,15 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (priority == 0) values[10] = CStringGetTextDatum("async"); - else if (list_member_int(sync_standbys, i)) - values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? - CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); + else if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + { + if (is_sync_standby) + values[10] = CStringGetTextDatum("sync"); + else + values[10] = CStringGetTextDatum("potential"); + } else - values[10] = CStringGetTextDatum("potential"); + values[10] = CStringGetTextDatum("quorum"); if (replyTime == 0) nulls[11] = true; diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 1417401086..3a8394d7f2 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -645,6 +645,13 @@ cfgets(cfp *fp, char *buf, int len) return fgets(buf, len, fp->uncompressedfp); } +/* + * cfclose close the stream + * + * Returns 0 if successfully closed the cfp. 
Most of errors are reported as -1 + * and errno is set. Otherwise the return value is the return value from + * gzclose and errno doesn't hold a meangful value. + */ int cfclose(cfp *fp) { @@ -665,6 +672,11 @@ cfclose(cfp *fp) #endif { result = fclose(fp->uncompressedfp); + + /* normalize error return, just in case EOF is not -1 */ + if (result != 0) + result = -1; + fp->uncompressedfp = NULL; } free_keep_errno(fp); diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index c9cce5ed8a..ecc6aa5fbb 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -108,6 +108,7 @@ void InitArchiveFmt_Directory(ArchiveHandle *AH) { lclContext *ctx; + int ret; /* Assuming static functions, this can be copied for each format. */ AH->ArchiveEntryPtr = _ArchiveEntry; @@ -218,8 +219,14 @@ InitArchiveFmt_Directory(ArchiveHandle *AH) ReadToc(AH); /* Nothing else in the file, so close it again... */ - if (cfclose(tocFH) != 0) - fatal("could not close TOC file: %m"); + ret = cfclose(tocFH); + if (ret < 0) + { + if (ret == -1) + fatal("could not close TOC file: %m"); + else + fatal("could not close TOC file: zlib error (%d)", ret); + } ctx->dataFH = NULL; } } @@ -378,6 +385,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename) char *buf; size_t buflen; cfp *cfp; + int ret; if (!filename) return; @@ -396,8 +404,15 @@ _PrintFileData(ArchiveHandle *AH, char *filename) } free(buf); - if (cfclose(cfp) !=0) - fatal("could not close data file: %m"); + + ret = cfclose(cfp); + if (ret < 0) + { + if (ret == -1) + fatal("could not close data file: %m"); + else + fatal("could not close data file: zlib error (%d)", ret); + } } /* @@ -429,6 +444,7 @@ _LoadBlobs(ArchiveHandle *AH) lclContext *ctx = (lclContext *) AH->formatData; char fname[MAXPGPATH]; char line[MAXPGPATH]; + int ret; StartRestoreBlobs(AH); @@ -460,9 +476,16 @@ _LoadBlobs(ArchiveHandle *AH) fatal("error reading large object TOC file \"%s\"", fname); - if 
(cfclose(ctx->blobsTocFH) != 0) - fatal("could not close large object TOC file \"%s\": %m", - fname); + ret = cfclose(ctx->blobsTocFH); + if (ret < 0) + { + if (ret == -1) + fatal("could not close large object TOC file \"%s\": %m", + fname); + else + fatal("could not close large object TOC file \"%s\": zlib error (%d)", + fname, ret); + } ctx->blobsTocFH = NULL; @@ -555,6 +578,7 @@ _CloseArchive(ArchiveHandle *AH) { cfp *tocFH; char fname[MAXPGPATH]; + int ret; setFilePath(AH, fname, "toc.dat"); @@ -576,8 +600,14 @@ _CloseArchive(ArchiveHandle *AH) WriteHead(AH); AH->format = archDirectory; WriteToc(AH); - if (cfclose(tocFH) != 0) - fatal("could not close TOC file: %m"); + ret = cfclose(tocFH); + if (ret < 0) + { + if (ret == -1) + fatal("could not close TOC file: %m"); + else + fatal("could not close TOC file: zlib error (%d)", ret); + } WriteDataChunks(AH, ctx->pstate); ParallelBackupEnd(AH, ctx->pstate); diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index c5f0e91aad..533aac25c7 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -36,6 +36,24 @@ #define SYNC_REP_PRIORITY 0 #define SYNC_REP_QUORUM 1 +/* + * SyncRepGetSyncStandbys returns an array of these structs, + * one per candidate synchronous walsender. + */ +typedef struct SyncRepStandbyData +{ + /* Copies of relevant fields from WalSnd shared-memory struct */ + pid_t pid; + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; + int sync_standby_priority; + /* Index of this walsender in the WalSnd shared-memory array */ + int walsnd_index; + /* This flag indicates whether this struct is about our own process */ + bool is_me; +} SyncRepStandbyData; + /* * Struct for the configuration of synchronous replication. 
* @@ -74,7 +92,7 @@ extern void SyncRepInitConfig(void); extern void SyncRepReleaseWaiters(void); /* called by wal sender and user backend */ -extern List *SyncRepGetSyncStandbys(bool *am_sync); +extern int SyncRepGetStandbys(SyncRepStandbyData **standbys); /* called by checkpointer */ extern void SyncRepUpdateSyncStandbysDefined(void); diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 366828f0a4..734acec2a4 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -31,8 +31,7 @@ typedef enum WalSndState /* * Each walsender has a WalSnd struct in shared memory. * - * This struct is protected by 'mutex', with two exceptions: one is - * sync_standby_priority as noted below. The other exception is that some + * This struct is protected by its 'mutex' spinlock field, except that some * members are only written by the walsender process itself, and thus that * process is free to read those members without holding spinlock. pid and * needreload always require the spinlock to be held for all accesses. @@ -60,6 +59,12 @@ typedef struct WalSnd TimeOffset flushLag; TimeOffset applyLag; + /* + * The priority order of the standby managed by this WALSender, as listed + * in synchronous_standby_names, or 0 if not-listed. + */ + int sync_standby_priority; + /* Protects shared variables shown above. */ slock_t mutex; @@ -69,13 +74,6 @@ typedef struct WalSnd */ Latch *latch; - /* - * The priority order of the standby managed by this WALSender, as listed - * in synchronous_standby_names, or 0 if not-listed. Protected by - * SyncRepLock. - */ - int sync_standby_priority; - /* * Timestamp of the last message received from standby. */
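
The core idea of the patch above (copy each walsender's fields once while holding its spinlock, then sort the private copy) can be illustrated in isolation. What follows is only a minimal sketch under stated assumptions, not the patch itself: StandbySnapshot, snapshot_priority_cmp and choose_sync_standbys are hypothetical names, the struct carries just the two fields the sort needs, and the comparator uses explicit comparisons rather than the subtraction used in the patch.

```c
#include <stdlib.h>

/*
 * Hypothetical stand-in for SyncRepStandbyData. In the real code these
 * values would be copied out of shared memory while holding the
 * per-walsender spinlock; here they are plain local data.
 */
typedef struct StandbySnapshot
{
	int			sync_standby_priority;	/* 1 is the highest priority */
	int			walsnd_index;			/* slot in the walsender array */
} StandbySnapshot;

/* qsort comparator: increasing priority value, ties broken by slot index */
static int
snapshot_priority_cmp(const void *a, const void *b)
{
	const StandbySnapshot *sa = (const StandbySnapshot *) a;
	const StandbySnapshot *sb = (const StandbySnapshot *) b;

	if (sa->sync_standby_priority != sb->sync_standby_priority)
		return (sa->sync_standby_priority > sb->sync_standby_priority) ? 1 : -1;

	return (sa->walsnd_index > sb->walsnd_index) -
		(sa->walsnd_index < sb->walsnd_index);
}

/*
 * Sort the snapshot in place and return how many leading entries count as
 * synchronous in priority mode: the first num_sync, or all of them if
 * fewer candidates exist.
 */
static int
choose_sync_standbys(StandbySnapshot *snap, int n, int num_sync)
{
	qsort(snap, (size_t) n, sizeof(StandbySnapshot), snapshot_priority_cmp);
	return (n < num_sync) ? n : num_sync;
}
```

Because the sort operates on a private copy, later changes to the shared array cannot make the result inconsistent with itself; the copy can only be stale.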
On Thu, 16 Apr 2020 at 16:22, Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote:
>
> At Wed, 15 Apr 2020 11:31:49 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in
> > Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
> > > At Tue, 14 Apr 2020 16:32:40 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in
> > > + stby->is_sync_standby = true; /* might change below */
> >
> > > I'm uneasy with that. In quorum mode all running standbys are marked
> > > as "sync" and that's bogus.
> >
> > I don't follow that? The existing coding of SyncRepGetSyncStandbysQuorum
> > returns all the candidates in its list, so this is isomorphic to that.
>
> The existing code actually does that. On the other hand,
> SyncRepGetSyncStandbysPriority returns standbys that *are known to be*
> synchronous, but *Quorum returns standbys that *can be* synchronous.
> What the two functions return is different. So it
> should be is_sync_standby for -Priority and is_sync_candidate for
> -Quorum.
>
> > Possibly a different name for the flag would be more suited?
> >
> > > On the other hand sync_standbys is already sorted in priority order so I think we can get rid of the member by setting *am_sync as follows.
> >
> > > SyncRepGetSyncRecPtr:
> > >   if (sync_standbys[i].is_me)
> > >   {
> > >       *am_sync = (i < SyncRepConfig->num_sync);
> > >       break;
> > >   }
> >
> > I disagree with this, it will change the behavior in the quorum case.
>
> Oops, you're right. I find the whole thing there (and me) is a bit
> confusing. syncrep_method affects how some values (specifically
> am_sync and sync_standbys) are translated at several calling depths.
> And *am_sync conveys nothing in quorum mode.
>
> > In any case, a change like this will cause callers to know way more than
> > they ought to about the ordering of the array. In my mind, the fact that
> > SyncRepGetSyncStandbysPriority is sorting the array is an internal
> > implementation detail; I do not want it to be part of the API.
>
> Anyway, am_sync and is_sync_standby are utterly useless in quorum
> mode. That argument would be pretty persuasive otherwise, but the
> upper layers (SyncRepReleaseWaiters and SyncRepGetSyncRecPtr) actually
> refer to syncrep_method to differentiate the interpretation of the
> am_sync flag and the sync_standbys list. So the difference is in fact
> already part of the API.
>
> After thinking some more, I concluded that some of the variables are
> wrongly named or considered, and redundant. The function of am_sync is
> covered by got_recptr in SyncRepReleaseWaiters, so it's enough that
> SyncRepGetSyncRecPtr just reports to the caller whether the caller may
> release some of the waiter processes. This simplifies the related
> functions and makes them (to me) clearer.
>
> Please find the attached.
>
>
> > (Apropos to that, I realized from working on this patch that there's
> > another, completely undocumented assumption in the existing code, that
> > the integer list will be sorted by walsender index for equal priorities.
> > I don't like that either, and not just because it's undocumented.)
>
> That seems accidental. Sorting by priority is the designed and
> documented behavior; in contrast, entries of the same priority are
> ordered by index only by accident and that is not documented, which
> means it can change at any time. I think we don't define everything
> in such detail.
>

This is just a notice; I'm reading your latest patch but it seems to
include unrelated changes:

$ git diff --stat
 src/backend/replication/syncrep.c           | 475 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------------------------------------------------------------------------------------------------------------------------
 src/backend/replication/walsender.c         |  40 ++++++++++++++-----
 src/bin/pg_dump/compress_io.c               |  12 ++++++
 src/bin/pg_dump/pg_backup_directory.c       |  48 ++++++++++++++++++-----
 src/include/replication/syncrep.h           |  20 +++++++++-
 src/include/replication/walsender_private.h |  16 ++++----
 6 files changed, 274 insertions(+), 337 deletions(-)

Regards,

--
Masahiko Sawada            http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
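
The objection quoted above, that deriving *am_sync from the array position would change quorum-mode behavior, can be made concrete with a small sketch. It assumes, as described in the thread, that in priority mode only the first num_sync entries of the sorted candidate array are synchronous, while in quorum mode every candidate counts. SyncMethod, METHOD_PRIORITY, METHOD_QUORUM and walsender_is_sync are hypothetical names used only for this illustration; they are not part of the PostgreSQL source.

```c
#include <stdbool.h>

/* Hypothetical mirror of SYNC_REP_PRIORITY / SYNC_REP_QUORUM */
typedef enum SyncMethod
{
	METHOD_PRIORITY,
	METHOD_QUORUM
} SyncMethod;

/*
 * my_pos is this walsender's position in the sorted candidate array, or -1
 * if it is not a candidate at all.
 */
static bool
walsender_is_sync(SyncMethod method, int my_pos, int num_sync)
{
	if (my_pos < 0)
		return false;			/* not a candidate: never sync */

	if (method == METHOD_QUORUM)
		return true;			/* every candidate may satisfy the quorum */

	/* priority mode: only the first num_sync candidates are sync */
	return my_pos < num_sync;
}
```

With num_sync = 2, a candidate at position 2 is not synchronous in priority mode but still counts in quorum mode, which is why a bare (i < num_sync) test is not equivalent for both methods.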
At Thu, 16 Apr 2020 16:48:28 +0900, Masahiko Sawada <masahiko.sawada@2ndquadrant.com> wrote in
> This is just a notice; I'm reading your latest patch but it seems to
> include unrelated changes:
>
> $ git diff --stat
>  src/backend/replication/syncrep.c           | 475 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------------------------------------------------------------------------------------------------------------------------
>  src/backend/replication/walsender.c         |  40 ++++++++++++++-----
>  src/bin/pg_dump/compress_io.c               |  12 ++++++
>  src/bin/pg_dump/pg_backup_directory.c       |  48 ++++++++++++++++++-----
>  src/include/replication/syncrep.h           |  20 +++++++++-
>  src/include/replication/walsender_private.h |  16 ++++----
>  6 files changed, 274 insertions(+), 337 deletions(-)

Ugh. I failed to clean up my working directory. I didn't notice
because I generated the file with git diff. Thanks for letting me know.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index ffd5b31eb2..99a7bbbc86 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -103,19 +103,21 @@ static int SyncRepWakeQueue(bool all, int mode); static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, - bool *am_sync); + XLogRecPtr *applyPtr); static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys); + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nsyncs); static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys, uint8 nth); + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth); static int SyncRepGetStandbyPriority(void); -static List *SyncRepGetSyncStandbysPriority(bool *am_sync); -static List *SyncRepGetSyncStandbysQuorum(bool
*am_sync); +static int standby_priority_comparator(const void *a, const void *b); static int cmp_lsn(const void *a, const void *b); #ifdef USE_ASSERT_CHECKING @@ -406,9 +408,10 @@ SyncRepInitConfig(void) priority = SyncRepGetStandbyPriority(); if (MyWalSnd->sync_standby_priority != priority) { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + SpinLockAcquire(&MyWalSnd->mutex); MyWalSnd->sync_standby_priority = priority; - LWLockRelease(SyncRepLock); + SpinLockRelease(&MyWalSnd->mutex); + ereport(DEBUG1, (errmsg("standby \"%s\" now has synchronous standby priority %u", application_name, priority))); @@ -429,8 +432,7 @@ SyncRepReleaseWaiters(void) XLogRecPtr writePtr; XLogRecPtr flushPtr; XLogRecPtr applyPtr; - bool got_recptr; - bool am_sync; + bool release_waiters; int numwrite = 0; int numflush = 0; int numapply = 0; @@ -458,16 +460,24 @@ SyncRepReleaseWaiters(void) LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); /* - * Check whether we are a sync standby or not, and calculate the synced - * positions among all sync standbys. + * Check whether we may release any of waiter processes, and calculate the + * synced positions. */ - got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync); + release_waiters = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr); + + /* Return if nothing to do. */ + if (!release_waiters) + { + LWLockRelease(SyncRepLock); + announce_next_takeover = true; + return; + } /* - * If we are managing a sync standby, though we weren't prior to this, - * then announce we are now a sync standby. + * If this walsender becomes to be able to release waiter processes, + * announce about that. */ - if (announce_next_takeover && am_sync) + if (announce_next_takeover) { announce_next_takeover = false; @@ -481,17 +491,6 @@ SyncRepReleaseWaiters(void) application_name))); } - /* - * If the number of sync standbys is less than requested or we aren't - * managing a sync standby then just leave. 
- */ - if (!got_recptr || !am_sync) - { - LWLockRelease(SyncRepLock); - announce_next_takeover = !am_sync; - return; - } - /* * Set the lsn first so that when we wake backends they will release up to * this location. @@ -523,43 +522,62 @@ SyncRepReleaseWaiters(void) /* * Calculate the synced Write, Flush and Apply positions among sync standbys. * - * The caller must hold SyncRepLock. - * - * Return false if the number of sync standbys is less than - * synchronous_standby_names specifies. Otherwise return true and - * store the positions into *writePtr, *flushPtr and *applyPtr. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. + * Return false if this walsender cannot release any of waiteres. Otherwise + * return true and store the positions into *writePtr, *flushPtr and *applyPtr. */ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, bool *am_sync) + XLogRecPtr *applyPtr) { - List *sync_standbys; - - Assert(LWLockHeldByMe(SyncRepLock)); + SyncRepStandbyData *standbys; + int num_standbys; + int i; + bool sync_priority = + (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); + /* Initialize default results */ *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; *applyPtr = InvalidXLogRecPtr; - *am_sync = false; - /* Get standbys that are considered as synchronous at this moment */ - sync_standbys = SyncRepGetSyncStandbys(am_sync); + /* Quick out if not even configured to be synchronous */ + if (SyncRepConfig == NULL) + return false; + + /* Get standbys. They are in priority order. */ + num_standbys = SyncRepGetStandbys(&standbys); /* - * Quick exit if we are not managing a sync standby or there are not - * enough synchronous standbys. + * Nothing more to do if there are not enough synchronous standbys or + * candidates. 
*/ - if (!(*am_sync) || - SyncRepConfig == NULL || - list_length(sync_standbys) < SyncRepConfig->num_sync) + if (num_standbys < SyncRepConfig->num_sync) { - list_free(sync_standbys); + pfree(standbys); return false; } + /* When priority mode, nothing to do if I an not a sync standby */ + if (sync_priority) + { + bool am_sync = false; + + for (i = 0; i < num_standbys; i++) + { + if (standbys[i].is_me) + { + am_sync = true; + break; + } + } + + if (!am_sync) + { + pfree(standbys); + return false; + } + } + /* * In a priority-based sync replication, the synced positions are the * oldest ones among sync standbys. In a quorum-based, they are the Nth @@ -573,46 +591,51 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced * positions even in a quorum-based sync replication. */ - if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + if (sync_priority) { SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys); + standbys, num_standbys, + SyncRepConfig->num_sync); } else { SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, SyncRepConfig->num_sync); + standbys, num_standbys, + SyncRepConfig->num_sync); } - list_free(sync_standbys); + pfree(standbys); return true; } /* - * Calculate the oldest Write, Flush and Apply positions among sync standbys. + * Calculate the oldest Write among the first nsync standbys, Flush and Apply + * positions among sync standbys. */ static void -SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys) +SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nsyncs) { - ListCell *cell; + int i; /* * Scan through all sync standbys and calculate the oldest Write, Flush - * and Apply positions. + * and Apply positions. 
We assume *writePtr et al were initialized to + * InvalidXLogRecPtr. */ - foreach(cell, sync_standbys) + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - XLogRecPtr write; - XLogRecPtr flush; - XLogRecPtr apply; + XLogRecPtr write = sync_standbys[i].write; + XLogRecPtr flush = sync_standbys[i].flush; + XLogRecPtr apply = sync_standbys[i].apply; - SpinLockAcquire(&walsnd->mutex); - write = walsnd->write; - flush = walsnd->flush; - apply = walsnd->apply; - SpinLockRelease(&walsnd->mutex); + /* Ignore candidates that aren't considered synchronous */ + if (i >= nsyncs) + break; if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write) *writePtr = write; @@ -628,38 +651,39 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * standbys. */ static void -SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) +SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth) { - ListCell *cell; XLogRecPtr *write_array; XLogRecPtr *flush_array; XLogRecPtr *apply_array; - int len; - int i = 0; + int i; + int n; - len = list_length(sync_standbys); - write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); - foreach(cell, sync_standbys) + n = 0; + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - - SpinLockAcquire(&walsnd->mutex); - write_array[i] = walsnd->write; - flush_array[i] = walsnd->flush; - apply_array[i] = walsnd->apply; - 
SpinLockRelease(&walsnd->mutex); - - i++; + write_array[n] = sync_standbys[i].write; + flush_array[n] = sync_standbys[i].flush; + apply_array[n] = sync_standbys[i].apply; + n++; } + /* Should have enough, or somebody messed up */ + Assert(n >= nth); + /* Sort each array in descending order */ - qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(write_array, n, sizeof(XLogRecPtr), cmp_lsn); + qsort(flush_array, n, sizeof(XLogRecPtr), cmp_lsn); + qsort(apply_array, n, sizeof(XLogRecPtr), cmp_lsn); /* Get Nth latest Write, Flush, Apply positions */ *writePtr = write_array[nth - 1]; @@ -689,67 +713,49 @@ cmp_lsn(const void *a, const void *b) } /* - * Return the list of sync standbys, or NIL if no sync standby is connected. + * Return data about active walsenders. * - * The caller must hold SyncRepLock. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. + * *standbys is set to a palloc'd array of structs of per-walsender data, and + * the number of valid entries is returned. The entries are in + * sync_standby_priority order. */ -List * -SyncRepGetSyncStandbys(bool *am_sync) +int +SyncRepGetStandbys(SyncRepStandbyData **standbys) { - Assert(LWLockHeldByMe(SyncRepLock)); + int i; + int n; - /* Set default result */ - if (am_sync != NULL) - *am_sync = false; + /* Create result array */ + *standbys = (SyncRepStandbyData *) + palloc(max_wal_senders * sizeof(SyncRepStandbyData)); /* Quick exit if sync replication is not requested */ if (SyncRepConfig == NULL) - return NIL; - - return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? - SyncRepGetSyncStandbysPriority(am_sync) : - SyncRepGetSyncStandbysQuorum(am_sync); -} - -/* - * Return the list of all the candidates for quorum sync standbys, - * or NIL if no such standby is connected. - * - * The caller must hold SyncRepLock. 
This function must be called only in - * a quorum-based sync replication. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. - */ -static List * -SyncRepGetSyncStandbysQuorum(bool *am_sync) -{ - List *result = NIL; - int i; - volatile WalSnd *walsnd; /* Use volatile pointer to prevent code - * rearrangement */ - - Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM); + return false; + /* Collect raw data from shared memory */ + n = 0; for (i = 0; i < max_wal_senders; i++) { - XLogRecPtr flush; - WalSndState state; - int pid; + volatile WalSnd *walsnd; /* Use volatile pointer to prevent code + * rearrangement */ + SyncRepStandbyData *stby; + WalSndState state; /* not included in SyncRepStandbyData */ walsnd = &WalSndCtl->walsnds[i]; + stby = *standbys + n; SpinLockAcquire(&walsnd->mutex); - pid = walsnd->pid; - flush = walsnd->flush; + stby->pid = walsnd->pid; state = walsnd->state; + stby->write = walsnd->write; + stby->flush = walsnd->flush; + stby->apply = walsnd->apply; + stby->sync_standby_priority = walsnd->sync_standby_priority; SpinLockRelease(&walsnd->mutex); /* Must be active */ - if (pid == 0) + if (stby->pid == 0) continue; /* Must be streaming or stopping */ @@ -758,200 +764,53 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) continue; /* Must be synchronous */ - if (walsnd->sync_standby_priority == 0) + if (stby->sync_standby_priority == 0) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(flush)) + if (XLogRecPtrIsInvalid(stby->flush)) continue; - /* - * Consider this standby as a candidate for quorum sync standbys and - * append it to the result. 
- */ - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; + /* OK, it's a candidate */ + stby->walsnd_index = i; + stby->is_me = (walsnd == MyWalSnd); + n++; } - return result; -} - -/* - * Return the list of sync standbys chosen based on their priorities, - * or NIL if no sync standby is connected. - * - * If there are multiple standbys with the same priority, - * the first one found is selected preferentially. - * - * The caller must hold SyncRepLock. This function must be called only in - * a priority-based sync replication. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. - */ -static List * -SyncRepGetSyncStandbysPriority(bool *am_sync) -{ - List *result = NIL; - List *pending = NIL; - int lowest_priority; - int next_highest_priority; - int this_priority; - int priority; - int i; - bool am_in_pending = false; - volatile WalSnd *walsnd; /* Use volatile pointer to prevent code - * rearrangement */ - - Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); - - lowest_priority = SyncRepConfig->nmembers; - next_highest_priority = lowest_priority + 1; - /* - * Find the sync standbys which have the highest priority (i.e, 1). Also - * store all the other potential sync standbys into the pending list, in - * order to scan it later and find other sync standbys from it quickly. + * In quorum mode, that's all we have to do since all entries are in the + * same priority 1. In priority mode, sort the candidates by priority. 
*/ - for (i = 0; i < max_wal_senders; i++) - { - XLogRecPtr flush; - WalSndState state; - int pid; - - walsnd = &WalSndCtl->walsnds[i]; - - SpinLockAcquire(&walsnd->mutex); - pid = walsnd->pid; - flush = walsnd->flush; - state = walsnd->state; - SpinLockRelease(&walsnd->mutex); - - /* Must be active */ - if (pid == 0) - continue; - - /* Must be streaming or stopping */ - if (state != WALSNDSTATE_STREAMING && - state != WALSNDSTATE_STOPPING) - continue; - - /* Must be synchronous */ - this_priority = walsnd->sync_standby_priority; - if (this_priority == 0) - continue; - - /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(flush)) - continue; + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + qsort(*standbys, n, sizeof(SyncRepStandbyData), + standby_priority_comparator); - /* - * If the priority is equal to 1, consider this standby as sync and - * append it to the result. Otherwise append this standby to the - * pending list to check if it's actually sync or not later. - */ - if (this_priority == 1) - { - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; - if (list_length(result) == SyncRepConfig->num_sync) - { - list_free(pending); - return result; /* Exit if got enough sync standbys */ - } - } - else - { - pending = lappend_int(pending, i); - if (am_sync != NULL && walsnd == MyWalSnd) - am_in_pending = true; + return n; +} - /* - * Track the highest priority among the standbys in the pending - * list, in order to use it as the starting priority for later - * scan of the list. This is useful to find quickly the sync - * standbys from the pending list later because we can skip - * unnecessary scans for the unused priorities. - */ - if (this_priority < next_highest_priority) - next_highest_priority = this_priority; - } - } - /* - * Consider all pending standbys as sync if the number of them plus - * already-found sync ones is lower than the configuration requests. 
- */ - if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync) - { - /* - * Set *am_sync to true if this walsender is in the pending list - * because all pending standbys are considered as sync. - */ - if (am_sync != NULL && !(*am_sync)) - *am_sync = am_in_pending; +/* + * qsort comparator to sort SyncRepStandbyData entries by priority + */ +static int +standby_priority_comparator(const void *a, const void *b) +{ + const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a; + const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b; - result = list_concat(result, pending); - list_free(pending); - return result; - } + /* First, sort by increasing priority value */ + if (sa->sync_standby_priority != sb->sync_standby_priority) + return sa->sync_standby_priority - sb->sync_standby_priority; /* - * Find the sync standbys from the pending list. + * We might have equal priority values; arbitrarily break ties by position + * in the WALSnd array. (This is utterly bogus, since that is arrival + * order dependent, but there are regression tests that rely on it.) */ - priority = next_highest_priority; - while (priority <= lowest_priority) - { - ListCell *cell; - - next_highest_priority = lowest_priority + 1; - - foreach(cell, pending) - { - i = lfirst_int(cell); - walsnd = &WalSndCtl->walsnds[i]; - - this_priority = walsnd->sync_standby_priority; - if (this_priority == priority) - { - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; - - /* - * We should always exit here after the scan of pending list - * starts because we know that the list has enough elements to - * reach SyncRepConfig->num_sync. - */ - if (list_length(result) == SyncRepConfig->num_sync) - { - list_free(pending); - return result; /* Exit if got enough sync standbys */ - } - - /* - * Remove the entry for this sync standby from the list to - * prevent us from looking at the same entry again. 
- */ - pending = foreach_delete_current(pending, cell); - - continue; /* don't adjust next_highest_priority */ - } - - if (this_priority < next_highest_priority) - next_highest_priority = this_priority; - } - - priority = next_highest_priority; - } - - /* never reached, but keep compiler quiet */ - Assert(false); - return result; + return sa->walsnd_index - sb->walsnd_index; } + /* * Check if we are in the list of sync standbys, and if so, determine * priority sequence. Return priority if set, or zero to indicate that diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index fc475d144d..7889ea5f6f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2375,14 +2375,16 @@ InitWalSenderSlot(void) * Found a free slot. Reserve it for us. */ walsnd->pid = MyProcPid; + walsnd->state = WALSNDSTATE_STARTUP; walsnd->sentPtr = InvalidXLogRecPtr; + walsnd->needreload = false; walsnd->write = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; walsnd->writeLag = -1; walsnd->flushLag = -1; walsnd->applyLag = -1; - walsnd->state = WALSNDSTATE_STARTUP; + walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; walsnd->spillTxns = 0; @@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - List *sync_standbys; + SyncRepStandbyData *standbys; + int num_standbys; int i; /* check to see if caller supports us returning a tuplestore */ @@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); /* - * Get the currently active synchronous standbys. + * Get the currently active standbys in priority order. This could be out + * of date before we're done, but we'll use the data anyway. 
*/ - LWLockAcquire(SyncRepLock, LW_SHARED); - sync_standbys = SyncRepGetSyncStandbys(NULL); - LWLockRelease(SyncRepLock); + num_standbys = SyncRepGetStandbys(&standbys); for (i = 0; i < max_wal_senders; i++) { @@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int64 spillTxns; int64 spillCount; int64 spillBytes; + bool is_sync_standby; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; + int j; + /* Collect data from shared memory */ SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) { @@ -3311,6 +3316,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) spillBytes = walsnd->spillBytes; SpinLockRelease(&walsnd->mutex); + /* Detect whether walsender is/was considered synchronous */ + is_sync_standby = false; + for (j = 0; j < num_standbys; j++) + { + if (standbys[j].walsnd_index == i) + { + is_sync_standby = (j < SyncRepConfig->num_sync); + break; + } + } + memset(nulls, 0, sizeof(nulls)); values[0] = Int32GetDatum(pid); @@ -3380,11 +3396,15 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (priority == 0) values[10] = CStringGetTextDatum("async"); - else if (list_member_int(sync_standbys, i)) - values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? 
- CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); + else if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + { + if (is_sync_standby) + values[10] = CStringGetTextDatum("sync"); + else + values[10] = CStringGetTextDatum("potential"); + } else - values[10] = CStringGetTextDatum("potential"); + values[10] = CStringGetTextDatum("quorum"); if (replyTime == 0) nulls[11] = true; diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index c5f0e91aad..533aac25c7 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -36,6 +36,24 @@ #define SYNC_REP_PRIORITY 0 #define SYNC_REP_QUORUM 1 +/* + * SyncRepGetSyncStandbys returns an array of these structs, + * one per candidate synchronous walsender. + */ +typedef struct SyncRepStandbyData +{ + /* Copies of relevant fields from WalSnd shared-memory struct */ + pid_t pid; + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; + int sync_standby_priority; + /* Index of this walsender in the WalSnd shared-memory array */ + int walsnd_index; + /* This flag indicates whether this struct is about our own process */ + bool is_me; +} SyncRepStandbyData; + /* * Struct for the configuration of synchronous replication. * @@ -74,7 +92,7 @@ extern void SyncRepInitConfig(void); extern void SyncRepReleaseWaiters(void); /* called by wal sender and user backend */ -extern List *SyncRepGetSyncStandbys(bool *am_sync); +extern int SyncRepGetStandbys(SyncRepStandbyData **standbys); /* called by checkpointer */ extern void SyncRepUpdateSyncStandbysDefined(void); diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 366828f0a4..734acec2a4 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -31,8 +31,7 @@ typedef enum WalSndState /* * Each walsender has a WalSnd struct in shared memory. 
* - * This struct is protected by 'mutex', with two exceptions: one is - * sync_standby_priority as noted below. The other exception is that some + * This struct is protected by its 'mutex' spinlock field, except that some * members are only written by the walsender process itself, and thus that * process is free to read those members without holding spinlock. pid and * needreload always require the spinlock to be held for all accesses. @@ -60,6 +59,12 @@ typedef struct WalSnd TimeOffset flushLag; TimeOffset applyLag; + /* + * The priority order of the standby managed by this WALSender, as listed + * in synchronous_standby_names, or 0 if not-listed. + */ + int sync_standby_priority; + /* Protects shared variables shown above. */ slock_t mutex; @@ -69,13 +74,6 @@ typedef struct WalSnd */ Latch *latch; - /* - * The priority order of the standby managed by this WALSender, as listed - * in synchronous_standby_names, or 0 if not-listed. Protected by - * SyncRepLock. - */ - int sync_standby_priority; - /* * Timestamp of the last message received from standby. */
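For readers following along, the selection rule that patch implements can be sketched in isolation: copy the per-walsender data out of shared memory, sort the local copies by priority, and treat the leading num_sync entries as synchronous. This is only a minimal sketch, not the patch itself. StandbySnapshot, snapshot_priority_cmp and choose_sync_by_priority are hypothetical names invented for illustration, and the comparator uses explicit comparisons rather than the subtraction the patch uses.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical, simplified stand-in for the patch's SyncRepStandbyData. */
typedef struct StandbySnapshot
{
    int priority;       /* local copy of sync_standby_priority (nonzero) */
    int walsnd_index;   /* slot position, used only to break ties */
} StandbySnapshot;

/* qsort comparator: increasing priority value, ties broken by slot index. */
static int
snapshot_priority_cmp(const void *a, const void *b)
{
    const StandbySnapshot *sa = (const StandbySnapshot *) a;
    const StandbySnapshot *sb = (const StandbySnapshot *) b;

    if (sa->priority != sb->priority)
        return (sa->priority > sb->priority) - (sa->priority < sb->priority);
    return (sa->walsnd_index > sb->walsnd_index) -
           (sa->walsnd_index < sb->walsnd_index);
}

/*
 * Sort the local copies and return how many leading entries would be
 * treated as synchronous.  Nothing in shared memory is consulted again,
 * so a concurrent change cannot invalidate the result.
 */
static int
choose_sync_by_priority(StandbySnapshot *snaps, int n, int num_sync)
{
    assert(n >= 0 && num_sync >= 0);
    qsort(snaps, (size_t) n, sizeof(StandbySnapshot), snapshot_priority_cmp);
    return (n < num_sync) ? n : num_sync;
}
```

Because the sort runs on private copies, there is no second pass over the walsender array that could disagree with the first one.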
Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes: > [ syncrep-fixes-4.patch ] I agree that we could probably improve the clarity of this code with further rewriting, but I'm still very opposed to the idea of having callers know that the first num_sync array elements are the active ones. It's wrong (or at least different from current behavior) for quorum mode, where there might be more than num_sync walsenders to consider. And it might not generalize very well to other syncrep selection rules we might add in future, which might also not have exactly num_sync interesting walsenders. So I much prefer an API definition that uses bool flags in an array that has no particular ordering (so far as the callers know, anyway). If you don't like is_sync_standby, how about some more-neutral name like is_active or is_interesting or include_position? I dislike the proposed comment revisions in SyncRepReleaseWaiters, too, particularly the change to say that what we're "announcing" is the ability to release waiters. You did not change the actual log messages, and you would have gotten a lot of pushback if you tried, because the current messages make sense to users and something like that would not. But by the same token this new comment isn't too helpful to somebody reading the code. (Actually, I wonder why we even have the restriction that only sync standbys can release waiters. It's not like they are going to get different results from SyncRepGetSyncRecPtr than any other walsender would. Maybe we should just drop all the am_sync logic?) regards, tom lane
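To make the API preference above concrete, here is a rough sketch of a caller that relies only on a per-entry flag and assumes nothing about array order. It picks the Nth-latest flush position among the flagged entries, as quorum mode would. The names Lsn, StandbyEntry, lsn_cmp_desc and nth_latest_flush are hypothetical stand-ins for illustration, and malloc is used in place of palloc so the fragment compiles outside the backend.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr */

/* Hypothetical cut-down entry: one flush position plus the flag. */
typedef struct StandbyEntry
{
    Lsn  flush;
    bool is_sync_standby;       /* set by whatever selection rule applies */
} StandbyEntry;

/* qsort comparator: descending LSN order. */
static int
lsn_cmp_desc(const void *a, const void *b)
{
    Lsn la = *(const Lsn *) a;
    Lsn lb = *(const Lsn *) b;

    return (la < lb) - (la > lb);
}

/*
 * Store the nth-latest flush position among flagged entries in *result.
 * Returns false if fewer than nth entries are flagged (or on allocation
 * failure).  The order of entries[] is never relied upon.
 */
static bool
nth_latest_flush(const StandbyEntry *entries, int n, int nth, Lsn *result)
{
    Lsn  *picked;
    int   npicked = 0;
    bool  ok;

    assert(nth >= 1);
    picked = malloc(sizeof(Lsn) * (size_t) (n > 0 ? n : 1));
    if (picked == NULL)
        return false;

    for (int i = 0; i < n; i++)
    {
        if (entries[i].is_sync_standby)
            picked[npicked++] = entries[i].flush;
    }

    ok = (npicked >= nth);
    if (ok)
    {
        qsort(picked, (size_t) npicked, sizeof(Lsn), lsn_cmp_desc);
        *result = picked[nth - 1];
    }
    free(picked);
    return ok;
}
```

With flags, the number of interesting entries need not equal num_sync, which is the quorum-mode situation described above.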
On 2020/04/14 22:52, Tom Lane wrote: > Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes: >> SyncRepGetSyncStandbysPriority() is running holding SyncRepLock so >> sync_standby_priority of any walsender can be changed while the >> function is scanning walsenders. The issue is we already have >> inconsistent walsender information before we enter the function. Thus >> how many times we scan on the array doesn't make any difference. > > *Yes it does*. The existing code can deliver entirely broken results > if some walsender exits between where we examine the priorities and > where we fetch the WAL pointers. So, in this case, the oldest LSN that SyncRepGetOldestSyncRecPtr() calculates may also be based on the LSN of an already-exited walsender. Is this what you mean by "broken results"? If yes, ISTM that this issue still remains even after applying your patch. No? The walsender marked as sync may still exit just before SyncRepGetOldestSyncRecPtr() calculates the oldest LSN. IMO the broken results can be delivered when a walsender marked as sync exits *and* a new walsender arrives at that moment. If this new walsender uses the WalSnd slot that the exited walsender used, SyncRepGetOldestSyncRecPtr() wrongly calculates the oldest LSN based on this new walsender (i.e., a different walsender from the one marked as sync). If this is actually what you meant by "broken results", your patch seems fine and fixes the issue. BTW, since the patch changes the API of SyncRepGetSyncStandbys(), it should not be back-patched, to avoid an ABI break. Right? Regards, -- Fujii Masao Advanced Computing Technology Center Research and Development Headquarters NTT DATA CORPORATION
Fujii Masao <masao.fujii@oss.nttdata.com> writes: > On 2020/04/14 22:52, Tom Lane wrote: >> *Yes it does*. The existing code can deliver entirely broken results >> if some walsender exits between where we examine the priorities and >> where we fetch the WAL pointers. > IMO the broken results can be delivered when a walsender marked > as sync exits *and* a new walsender arrives at that moment. If this new > walsender uses the WalSnd slot that the exited walsender used, > SyncRepGetOldestSyncRecPtr() wrongly calculates the oldest LSN based > on this new walsender (i.e., a different walsender from the one marked as sync). Right, exactly, sorry that I was not more specific. > BTW, since the patch changes the API of SyncRepGetSyncStandbys(), > it should not be back-patched, to avoid an ABI break. Right? Anything that is using that is just as broken as the core code is, for the same reasons, so I don't have a problem with changing its API. Maybe we should rename it while we're at it, just to make it clear that we are breaking any external callers. (If there are any, which seems somewhat unlikely.) The only concession to ABI that I had in mind was to not re-order the fields of WalSnd in the back branches. regards, tom lane
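The slot-reuse hazard agreed on above can be shown with a small single-threaded mock: remember a slot only by its array index, let the slot be recycled, and a later read through that index sees the newcomer's data, while a copy taken at selection time does not. This is an illustrative sketch only. Slot, SlotCopy, copy_slot and recycle_slot are hypothetical names, and the real code would take the per-walsender spinlock around the copy, which the mock omits.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr */

/* Mock of a shared-memory walsender slot (hypothetical, much reduced). */
typedef struct Slot
{
    int pid;
    Lsn flush;
} Slot;

/* Private copy taken at selection time, plus the slot it came from. */
typedef struct SlotCopy
{
    int pid;
    Lsn flush;
    int index;
} SlotCopy;

/* Copy everything needed in one visit; real code would hold the spinlock. */
static SlotCopy
copy_slot(const Slot *slots, int index)
{
    SlotCopy c = { slots[index].pid, slots[index].flush, index };

    return c;
}

/* Simulate the old walsender exiting and a new one claiming the slot. */
static void
recycle_slot(Slot *slots, int index, int new_pid)
{
    slots[index].pid = new_pid;
    slots[index].flush = 0;     /* stand-in for InvalidXLogRecPtr */
}
```

If the chosen slot is recycled between selection and use, reading slots[index] again returns the new walsender's pid and an invalid flush position, whereas the SlotCopy still describes the walsender that was actually selected.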
At Thu, 16 Apr 2020 11:39:06 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in > Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes: > > [ syncrep-fixes-4.patch ] > > I agree that we could probably improve the clarity of this code with > further rewriting, but I'm still very opposed to the idea of having > callers know that the first num_sync array elements are the active > ones. It's wrong (or at least different from current behavior) for > quorum mode, where there might be more than num_sync walsenders to > consider. And it might not generalize very well to other syncrep > selection rules we might add in future, which might also not have > exactly num_sync interesting walsenders. So I much prefer an API > definition that uses bool flags in an array that has no particular > ordering (so far as the callers know, anyway). If you don't like > is_sync_standby, how about some more-neutral name like is_active > or is_interesting or include_position? I'm convinced that each element should have is_sync_standby. I agree with the name is_sync_standby since I can't come up with a better one. > I dislike the proposed comment revisions in SyncRepReleaseWaiters, > too, particularly the change to say that what we're "announcing" > is the ability to release waiters. You did not change the actual > log messages, and you would have gotten a lot of pushback if > you tried, because the current messages make sense to users and > something like that would not. But by the same token this new > comment isn't too helpful to somebody reading the code. The current log messages look perfect to me. I don't insist on the comment change since I may be taking the definition of "sync standby" too strictly. > (Actually, I wonder why we even have the restriction that only > sync standbys can release waiters. It's not like they are > going to get different results from SyncRepGetSyncRecPtr than > any other walsender would. Maybe we should just drop all the > am_sync logic?) 
I thought the same thing, though I didn't do that in the last patch. am_sync seems intended to reduce spurious wakeups, but spurious wakeups won't actually increase without it. Thus the only remaining task of am_sync is to trigger the log messages, which is a sign that the log messages should be emitted within SyncRepGetSyncRecPtr. That eliminates references to SyncRepConfig in SyncRepReleaseWaiters, which puts me at ease. The attached is based on syncrep-fixes-1.patch + am_sync elimination. regards. -- Kyotaro Horiguchi NTT Open Source Software Center diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index ffd5b31eb2..eb0616121a 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -103,19 +103,21 @@ static int SyncRepWakeQueue(bool all, int mode); static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, - bool *am_sync); + XLogRecPtr *applyPtr); static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys); + SyncRepStandbyData *sync_standbys, + int num_standbys); static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys, uint8 nth); + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth); static int SyncRepGetStandbyPriority(void); -static List *SyncRepGetSyncStandbysPriority(bool *am_sync); -static List *SyncRepGetSyncStandbysQuorum(bool *am_sync); +static void SyncRepGetSyncStandbysPriority(SyncRepStandbyData *standbys, int n); +static int standby_priority_comparator(const void *a, const void *b); static int cmp_lsn(const void *a, const void *b); #ifdef USE_ASSERT_CHECKING @@ -406,9 +408,10 @@ SyncRepInitConfig(void) priority = SyncRepGetStandbyPriority(); if (MyWalSnd->sync_standby_priority != priority) { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + 
SpinLockAcquire(&MyWalSnd->mutex); MyWalSnd->sync_standby_priority = priority; - LWLockRelease(SyncRepLock); + SpinLockRelease(&MyWalSnd->mutex); + ereport(DEBUG1, (errmsg("standby \"%s\" now has synchronous standby priority %u", application_name, priority))); @@ -430,7 +433,6 @@ SyncRepReleaseWaiters(void) XLogRecPtr flushPtr; XLogRecPtr applyPtr; bool got_recptr; - bool am_sync; int numwrite = 0; int numflush = 0; int numapply = 0; @@ -461,34 +463,14 @@ SyncRepReleaseWaiters(void) * Check whether we are a sync standby or not, and calculate the synced * positions among all sync standbys. */ - got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync); + got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr); /* - * If we are managing a sync standby, though we weren't prior to this, - * then announce we are now a sync standby. + * If we don't have enough sync standbys, just leave. */ - if (announce_next_takeover && am_sync) - { - announce_next_takeover = false; - - if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) - ereport(LOG, - (errmsg("standby \"%s\" is now a synchronous standby with priority %u", - application_name, MyWalSnd->sync_standby_priority))); - else - ereport(LOG, - (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby", - application_name))); - } - - /* - * If the number of sync standbys is less than requested or we aren't - * managing a sync standby then just leave. - */ - if (!got_recptr || !am_sync) + if (!got_recptr) { LWLockRelease(SyncRepLock); - announce_next_takeover = !am_sync; return; } @@ -523,40 +505,68 @@ SyncRepReleaseWaiters(void) /* * Calculate the synced Write, Flush and Apply positions among sync standbys. * - * The caller must hold SyncRepLock. - * * Return false if the number of sync standbys is less than * synchronous_standby_names specifies. Otherwise return true and * store the positions into *writePtr, *flushPtr and *applyPtr. 
- * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. */ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, bool *am_sync) + XLogRecPtr *applyPtr) { - List *sync_standbys; - - Assert(LWLockHeldByMe(SyncRepLock)); + SyncRepStandbyData *sync_standbys; + int num_standbys; + int i; + /* Initialize default results */ *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; *applyPtr = InvalidXLogRecPtr; - *am_sync = false; + + /* + * We're a potential sync standby. Release waiters if there are enough + * sync standbys and we are considered as sync. + */ /* Get standbys that are considered as synchronous at this moment */ - sync_standbys = SyncRepGetSyncStandbys(am_sync); + num_standbys = SyncRepGetSyncStandbys(&sync_standbys); + + /* Announce if I am a new sync standby or candidate if not yet */ + if (announce_next_takeover) + { + for (i = 0; i < num_standbys; i++) + { + if (!sync_standbys[i].is_me) + continue; + + /* + * If we are managing a sync standby, though we weren't prior to + * this, then announce we are now a sync standby. + */ + announce_next_takeover = false; + + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + ereport(LOG, + (errmsg("standby \"%s\" is now a synchronous standby with priority %u", + application_name, + MyWalSnd->sync_standby_priority))); + else + ereport(LOG, + (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby", + application_name))); + + break; + } + } /* - * Quick exit if we are not managing a sync standby or there are not - * enough synchronous standbys. + * Nothing more to do if we are not managing a sync standby or there are + * not enough synchronous standbys. (Note: if there are least num_sync + * candidates, then at least num_sync of them will be marked as + * is_sync_standby; we don't need to count them here.) 
*/ - if (!(*am_sync) || - SyncRepConfig == NULL || - list_length(sync_standbys) < SyncRepConfig->num_sync) + if (num_standbys < SyncRepConfig->num_sync) { - list_free(sync_standbys); + pfree(sync_standbys); return false; } @@ -576,15 +586,16 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) { SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys); + sync_standbys, num_standbys); } else { SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, SyncRepConfig->num_sync); + sync_standbys, num_standbys, + SyncRepConfig->num_sync); } - list_free(sync_standbys); + pfree(sync_standbys); return true; } @@ -592,27 +603,28 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * Calculate the oldest Write, Flush and Apply positions among sync standbys. */ static void -SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys) +SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys) { - ListCell *cell; + int i; /* * Scan through all sync standbys and calculate the oldest Write, Flush - * and Apply positions. + * and Apply positions. We assume *writePtr et al were initialized to + * InvalidXLogRecPtr. 
*/ - foreach(cell, sync_standbys) + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - XLogRecPtr write; - XLogRecPtr flush; - XLogRecPtr apply; + XLogRecPtr write = sync_standbys[i].write; + XLogRecPtr flush = sync_standbys[i].flush; + XLogRecPtr apply = sync_standbys[i].apply; - SpinLockAcquire(&walsnd->mutex); - write = walsnd->write; - flush = walsnd->flush; - apply = walsnd->apply; - SpinLockRelease(&walsnd->mutex); + /* Ignore candidates that aren't considered synchronous */ + if (!sync_standbys[i].is_sync_standby) + continue; if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write) *writePtr = write; @@ -628,38 +640,43 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * standbys. */ static void -SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) +SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth) { - ListCell *cell; XLogRecPtr *write_array; XLogRecPtr *flush_array; XLogRecPtr *apply_array; - int len; - int i = 0; + int i; + int n; - len = list_length(sync_standbys); - write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); - foreach(cell, sync_standbys) + n = 0; + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; + /* Ignore candidates that aren't considered synchronous */ + if (!sync_standbys[i].is_sync_standby) + continue; - SpinLockAcquire(&walsnd->mutex); - write_array[i] = walsnd->write; - 
flush_array[i] = walsnd->flush; - apply_array[i] = walsnd->apply; - SpinLockRelease(&walsnd->mutex); - - i++; + write_array[n] = sync_standbys[i].write; + flush_array[n] = sync_standbys[i].flush; + apply_array[n] = sync_standbys[i].apply; + n++; } + /* Should have enough, or somebody messed up */ + Assert(n >= nth); + /* Sort each array in descending order */ - qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(write_array, n, sizeof(XLogRecPtr), cmp_lsn); + qsort(flush_array, n, sizeof(XLogRecPtr), cmp_lsn); + qsort(apply_array, n, sizeof(XLogRecPtr), cmp_lsn); /* Get Nth latest Write, Flush, Apply positions */ *writePtr = write_array[nth - 1]; @@ -689,67 +706,48 @@ cmp_lsn(const void *a, const void *b) } /* - * Return the list of sync standbys, or NIL if no sync standby is connected. + * Return data about walsenders that are candidates to be sync standbys. * - * The caller must hold SyncRepLock. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. + * *standbys is set to a palloc'd array of structs of per-walsender data, + * and the number of valid entries (candidate sync senders) is returned. */ -List * -SyncRepGetSyncStandbys(bool *am_sync) +int +SyncRepGetSyncStandbys(SyncRepStandbyData **standbys) { - Assert(LWLockHeldByMe(SyncRepLock)); + int i; + int n; - /* Set default result */ - if (am_sync != NULL) - *am_sync = false; + /* Create result array */ + *standbys = (SyncRepStandbyData *) + palloc(max_wal_senders * sizeof(SyncRepStandbyData)); /* Quick exit if sync replication is not requested */ if (SyncRepConfig == NULL) - return NIL; - - return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? 
- SyncRepGetSyncStandbysPriority(am_sync) : - SyncRepGetSyncStandbysQuorum(am_sync); -} - -/* - * Return the list of all the candidates for quorum sync standbys, - * or NIL if no such standby is connected. - * - * The caller must hold SyncRepLock. This function must be called only in - * a quorum-based sync replication. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. - */ -static List * -SyncRepGetSyncStandbysQuorum(bool *am_sync) -{ - List *result = NIL; - int i; - volatile WalSnd *walsnd; /* Use volatile pointer to prevent code - * rearrangement */ - - Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM); + return 0; + /* Collect raw data from shared memory */ + n = 0; for (i = 0; i < max_wal_senders; i++) { - XLogRecPtr flush; - WalSndState state; - int pid; + volatile WalSnd *walsnd; /* Use volatile pointer to prevent code + * rearrangement */ + SyncRepStandbyData *stby; + WalSndState state; /* not included in SyncRepStandbyData */ walsnd = &WalSndCtl->walsnds[i]; + stby = *standbys + n; SpinLockAcquire(&walsnd->mutex); - pid = walsnd->pid; - flush = walsnd->flush; + stby->pid = walsnd->pid; state = walsnd->state; + stby->write = walsnd->write; + stby->flush = walsnd->flush; + stby->apply = walsnd->apply; + stby->sync_standby_priority = walsnd->sync_standby_priority; SpinLockRelease(&walsnd->mutex); /* Must be active */ - if (pid == 0) + if (stby->pid == 0) continue; /* Must be streaming or stopping */ @@ -758,200 +756,79 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) continue; /* Must be synchronous */ - if (walsnd->sync_standby_priority == 0) + if (stby->sync_standby_priority == 0) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(flush)) + if (XLogRecPtrIsInvalid(stby->flush)) continue; - /* - * Consider this standby as a candidate for quorum sync standbys and - * append it to the result. 
- */ - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; + /* OK, it's a candidate */ + stby->walsnd_index = i; + stby->is_me = (walsnd == MyWalSnd); + stby->is_sync_standby = true; /* might change below */ + n++; } - return result; + /* + * In quorum mode, that's all we have to do. In priority mode, decide + * which ones are high enough priority to consider sync standbys. + */ + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + SyncRepGetSyncStandbysPriority(*standbys, n); + + return n; } /* - * Return the list of sync standbys chosen based on their priorities, - * or NIL if no sync standby is connected. + * Decide which standbys to consider synchronous. * - * If there are multiple standbys with the same priority, - * the first one found is selected preferentially. - * - * The caller must hold SyncRepLock. This function must be called only in - * a priority-based sync replication. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. + * This function must be called only in priority-based sync replication. */ -static List * -SyncRepGetSyncStandbysPriority(bool *am_sync) +static void +SyncRepGetSyncStandbysPriority(SyncRepStandbyData *standbys, int n) { - List *result = NIL; - List *pending = NIL; - int lowest_priority; - int next_highest_priority; - int this_priority; - int priority; int i; - bool am_in_pending = false; - volatile WalSnd *walsnd; /* Use volatile pointer to prevent code - * rearrangement */ Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); - lowest_priority = SyncRepConfig->nmembers; - next_highest_priority = lowest_priority + 1; + /* Nothing to do here unless there are too many candidates. */ + if (n <= SyncRepConfig->num_sync) + return; /* - * Find the sync standbys which have the highest priority (i.e, 1). 
Also - * store all the other potential sync standbys into the pending list, in - * order to scan it later and find other sync standbys from it quickly. + * Sort the candidates by priority; then the first num_sync ones are + * synchronous, and the rest aren't. */ - for (i = 0; i < max_wal_senders; i++) - { - XLogRecPtr flush; - WalSndState state; - int pid; + qsort(standbys, n, sizeof(SyncRepStandbyData), + standby_priority_comparator); - walsnd = &WalSndCtl->walsnds[i]; - - SpinLockAcquire(&walsnd->mutex); - pid = walsnd->pid; - flush = walsnd->flush; - state = walsnd->state; - SpinLockRelease(&walsnd->mutex); - - /* Must be active */ - if (pid == 0) - continue; - - /* Must be streaming or stopping */ - if (state != WALSNDSTATE_STREAMING && - state != WALSNDSTATE_STOPPING) - continue; - - /* Must be synchronous */ - this_priority = walsnd->sync_standby_priority; - if (this_priority == 0) - continue; - - /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(flush)) - continue; - - /* - * If the priority is equal to 1, consider this standby as sync and - * append it to the result. Otherwise append this standby to the - * pending list to check if it's actually sync or not later. - */ - if (this_priority == 1) - { - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; - if (list_length(result) == SyncRepConfig->num_sync) - { - list_free(pending); - return result; /* Exit if got enough sync standbys */ - } - } - else - { - pending = lappend_int(pending, i); - if (am_sync != NULL && walsnd == MyWalSnd) - am_in_pending = true; - - /* - * Track the highest priority among the standbys in the pending - * list, in order to use it as the starting priority for later - * scan of the list. This is useful to find quickly the sync - * standbys from the pending list later because we can skip - * unnecessary scans for the unused priorities. 
- */ - if (this_priority < next_highest_priority) - next_highest_priority = this_priority; - } - } + for (i = SyncRepConfig->num_sync; i < n; i++) + standbys[i].is_sync_standby = false; +} - /* - * Consider all pending standbys as sync if the number of them plus - * already-found sync ones is lower than the configuration requests. - */ - if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync) - { - /* - * Set *am_sync to true if this walsender is in the pending list - * because all pending standbys are considered as sync. - */ - if (am_sync != NULL && !(*am_sync)) - *am_sync = am_in_pending; +/* + * qsort comparator to sort SyncRepStandbyData entries by priority + */ +static int +standby_priority_comparator(const void *a, const void *b) +{ + const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a; + const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b; - result = list_concat(result, pending); - list_free(pending); - return result; - } + /* First, sort by increasing priority value */ + if (sa->sync_standby_priority != sb->sync_standby_priority) + return sa->sync_standby_priority - sb->sync_standby_priority; /* - * Find the sync standbys from the pending list. + * We might have equal priority values; arbitrarily break ties by position + * in the WALSnd array. (This is utterly bogus, since that is arrival + * order dependent, but there are regression tests that rely on it.) 
*/ - priority = next_highest_priority; - while (priority <= lowest_priority) - { - ListCell *cell; - - next_highest_priority = lowest_priority + 1; - - foreach(cell, pending) - { - i = lfirst_int(cell); - walsnd = &WalSndCtl->walsnds[i]; - - this_priority = walsnd->sync_standby_priority; - if (this_priority == priority) - { - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; - - /* - * We should always exit here after the scan of pending list - * starts because we know that the list has enough elements to - * reach SyncRepConfig->num_sync. - */ - if (list_length(result) == SyncRepConfig->num_sync) - { - list_free(pending); - return result; /* Exit if got enough sync standbys */ - } - - /* - * Remove the entry for this sync standby from the list to - * prevent us from looking at the same entry again. - */ - pending = foreach_delete_current(pending, cell); - - continue; /* don't adjust next_highest_priority */ - } - - if (this_priority < next_highest_priority) - next_highest_priority = this_priority; - } - - priority = next_highest_priority; - } - - /* never reached, but keep compiler quiet */ - Assert(false); - return result; + return sa->walsnd_index - sb->walsnd_index; } + /* * Check if we are in the list of sync standbys, and if so, determine * priority sequence. Return priority if set, or zero to indicate that diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index fc475d144d..af8f67e769 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2375,14 +2375,16 @@ InitWalSenderSlot(void) * Found a free slot. Reserve it for us. 
*/ walsnd->pid = MyProcPid; + walsnd->state = WALSNDSTATE_STARTUP; walsnd->sentPtr = InvalidXLogRecPtr; + walsnd->needreload = false; walsnd->write = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; walsnd->writeLag = -1; walsnd->flushLag = -1; walsnd->applyLag = -1; - walsnd->state = WALSNDSTATE_STARTUP; + walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; walsnd->spillTxns = 0; @@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - List *sync_standbys; + SyncRepStandbyData *sync_standbys; + int num_standbys; int i; /* check to see if caller supports us returning a tuplestore */ @@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); /* - * Get the currently active synchronous standbys. + * Get the currently active synchronous standbys. This could be out of + * date before we're done, but we'll use the data anyway. 
*/ - LWLockAcquire(SyncRepLock, LW_SHARED); - sync_standbys = SyncRepGetSyncStandbys(NULL); - LWLockRelease(SyncRepLock); + num_standbys = SyncRepGetSyncStandbys(&sync_standbys); for (i = 0; i < max_wal_senders; i++) { @@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int64 spillTxns; int64 spillCount; int64 spillBytes; + bool is_sync_standby; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; + int j; + /* Collect data from shared memory */ SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) { @@ -3311,6 +3316,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) spillBytes = walsnd->spillBytes; SpinLockRelease(&walsnd->mutex); + /* Detect whether walsender is/was considered synchronous */ + is_sync_standby = false; + for (j = 0; j < num_standbys; j++) + { + if (sync_standbys[j].walsnd_index == i) + { + is_sync_standby = sync_standbys[j].is_sync_standby; + break; + } + } + memset(nulls, 0, sizeof(nulls)); values[0] = Int32GetDatum(pid); @@ -3380,7 +3396,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (priority == 0) values[10] = CStringGetTextDatum("async"); - else if (list_member_int(sync_standbys, i)) + else if (is_sync_standby) values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index c5f0e91aad..1308ada102 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -36,6 +36,26 @@ #define SYNC_REP_PRIORITY 0 #define SYNC_REP_QUORUM 1 +/* + * SyncRepGetSyncStandbys returns an array of these structs, + * one per candidate synchronous walsender. 
+ */ +typedef struct SyncRepStandbyData +{ + /* Copies of relevant fields from WalSnd shared-memory struct */ + pid_t pid; + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; + int sync_standby_priority; + /* Index of this walsender in the WalSnd shared-memory array */ + int walsnd_index; + /* This flag indicates whether this struct is about our own process */ + bool is_me; + /* After-the-fact conclusion about whether this is a sync standby */ + bool is_sync_standby; +} SyncRepStandbyData; + /* * Struct for the configuration of synchronous replication. * @@ -74,7 +94,7 @@ extern void SyncRepInitConfig(void); extern void SyncRepReleaseWaiters(void); /* called by wal sender and user backend */ -extern List *SyncRepGetSyncStandbys(bool *am_sync); +extern int SyncRepGetSyncStandbys(SyncRepStandbyData **standbys); /* called by checkpointer */ extern void SyncRepUpdateSyncStandbysDefined(void); diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 366828f0a4..734acec2a4 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -31,8 +31,7 @@ typedef enum WalSndState /* * Each walsender has a WalSnd struct in shared memory. * - * This struct is protected by 'mutex', with two exceptions: one is - * sync_standby_priority as noted below. The other exception is that some + * This struct is protected by its 'mutex' spinlock field, except that some * members are only written by the walsender process itself, and thus that * process is free to read those members without holding spinlock. pid and * needreload always require the spinlock to be held for all accesses. @@ -60,6 +59,12 @@ typedef struct WalSnd TimeOffset flushLag; TimeOffset applyLag; + /* + * The priority order of the standby managed by this WALSender, as listed + * in synchronous_standby_names, or 0 if not-listed. + */ + int sync_standby_priority; + /* Protects shared variables shown above. 
*/ slock_t mutex; @@ -69,13 +74,6 @@ typedef struct WalSnd */ Latch *latch; - /* - * The priority order of the standby managed by this WALSender, as listed - * in synchronous_standby_names, or 0 if not-listed. Protected by - * SyncRepLock. - */ - int sync_standby_priority; - /* * Timestamp of the last message received from standby. */
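The ordering rule in the patch above (sort candidates by priority, break ties by WalSnd array position, then treat the first num_sync entries as synchronous) can be sketched in isolation. This is only a minimal illustration, not the patch's code: DemoStandby, demo_priority_comparator and demo_mark_sync are hypothetical names, and the struct is a trimmed-down stand-in for SyncRepStandbyData.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical, trimmed-down stand-in for SyncRepStandbyData */
typedef struct DemoStandby
{
    int  sync_standby_priority; /* 1 is the highest priority */
    int  walsnd_index;          /* position in the simulated WalSnd array */
    bool is_sync_standby;       /* filled in by demo_mark_sync */
} DemoStandby;

/* qsort comparator: increasing priority value, ties broken by array index */
static int
demo_priority_comparator(const void *a, const void *b)
{
    const DemoStandby *sa = (const DemoStandby *) a;
    const DemoStandby *sb = (const DemoStandby *) b;

    if (sa->sync_standby_priority != sb->sync_standby_priority)
        return sa->sync_standby_priority - sb->sync_standby_priority;
    return sa->walsnd_index - sb->walsnd_index;
}

/* Sort the local copy, then flag the first num_sync entries as synchronous */
static void
demo_mark_sync(DemoStandby *standbys, int n, int num_sync)
{
    int i;

    assert(n >= 0 && num_sync >= 0);
    qsort(standbys, n, sizeof(DemoStandby), demo_priority_comparator);
    for (i = 0; i < n; i++)
        standbys[i].is_sync_standby = (i < num_sync);
}
```

Because the sort runs on a private copy rather than on shared memory, the outcome cannot be disturbed by walsenders that change their priority while the function is running.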
On 2020/04/17 14:58, Kyotaro Horiguchi wrote:
> At Thu, 16 Apr 2020 11:39:06 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in
>> Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
>>> [ syncrep-fixes-4.patch ]
>>
>> I agree that we could probably improve the clarity of this code with
>> further rewriting, but I'm still very opposed to the idea of having
>> callers know that the first num_sync array elements are the active
>> ones. It's wrong (or at least different from current behavior) for
>> quorum mode, where there might be more than num_sync walsenders to
>> consider. And it might not generalize very well to other syncrep
>> selection rules we might add in future, which might also not have
>> exactly num_sync interesting walsenders. So I much prefer an API
>> definition that uses bool flags in an array that has no particular
>> ordering (so far as the callers know, anyway). If you don't like
>> is_sync_standby, how about some more-neutral name like is_active
>> or is_interesting or include_position?
>
> I'm convinced that each element has is_sync_standby. I agree to the
> name is_sync_standby since I don't come up with a better name.
>
>> I dislike the proposed comment revisions in SyncRepReleaseWaiters,
>> too, particularly the change to say that what we're "announcing"
>> is the ability to release waiters. You did not change the actual
>> log messages, and you would have gotten a lot of pushback if
>> you tried, because the current messages make sense to users and
>> something like that would not. But by the same token this new
>> comment isn't too helpful to somebody reading the code.
>
> The current log messages look perfect to me. I don't insist on the
> comment change since I might take the definition of "sync standby" too
> strictly.
>
>> (Actually, I wonder why we even have the restriction that only
>> sync standbys can release waiters. It's not like they are
>> going to get different results from SyncRepGetSyncRecPtr than
>> any other walsender would. Maybe we should just drop all the
>> am_sync logic?)
>
> I thought the same thing, though I didn't do that in the last patch.
>
> am_sync seems intending to reduce spurious wakeups but actually
> spurious wakeup won't increase even without it. Thus the only
> remaining task of am_sync is the trigger for the log messages and that
> fact is the sign that the log messages should be emitted within
> SyncRepGetSyncRecPtr. That eliminates references to SyncRepConfig in
> SyncRepReleaseWaiters, which makes me feel at ease.
>
> The attached is based on syncrep-fixes-1.patch + am_sync elimination.

I agree that it might be worth considering the removal of am_sync for
the master branch or v14. But I think that it should not be
back-patched.

Regards,

-- 
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
On 2020/04/17 3:00, Tom Lane wrote:
> Fujii Masao <masao.fujii@oss.nttdata.com> writes:
>> On 2020/04/14 22:52, Tom Lane wrote:
>>> *Yes it does*. The existing code can deliver entirely broken results
>>> if some walsender exits between where we examine the priorities and
>>> where we fetch the WAL pointers.
>
>> IMO the broken results can be delivered when walsender marked
>> as sync exits *and* new walsender comes at that moment. If this new
>> walsender uses the WalSnd slot that the exited walsender used,
>> SyncRepGetOldestSyncRecPtr() wrongly calculates the oldest lsn based
>> on this new walsender (i.e., different walsender from one marked as sync).
>
> Right, exactly, sorry that I was not more specific.
>
>> BTW, since the patch changes the API of SyncRepGetSyncStandbys(),
>> it should not be back-patched to avoid ABI break. Right?
>
> Anything that is using that is just as broken as the core code is, for the
> same reasons, so I don't have a problem with changing its API. Maybe we
> should rename it while we're at it, just to make it clear that we are
> breaking any external callers. (If there are any, which seems somewhat
> unlikely.)

I agree to change the API if that's the only way to fix the bug.
But ISTM that we can fix the bug without changing the API, like
the attached patch does.

Your patch changes the logic to pick up sync standbys, e.g., use qsort(),
in addition to the bug fix. This might be an improvement and I agree that
it's worth considering that idea for the master branch or v14. But I'm not
a fan of adding such changes into the back branches if they are not
necessary for the bug fix. I'd like to basically keep the current logic as
it is, at least for the back branches, like the attached patch does.

Regards,

-- 
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
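The slot-reuse hazard described in this message can be shown with a small standalone sketch. It assumes nothing about PostgreSQL's real structs: DemoSlot, DemoSnapshot and both functions are hypothetical names, and locking is left out to keep the point visible. The idea is that a bare array index cannot tell an exited walsender from a newcomer that took over its slot, whereas a snapshot that also records the pid (and the values read at the same moment) can.

```c
#include <assert.h>
#include <stdbool.h>

#define DEMO_MAX_SENDERS 4

/* Hypothetical stand-in for a WalSnd slot; not PostgreSQL's struct */
typedef struct DemoSlot
{
    int           pid;   /* 0 means the slot is free */
    unsigned long flush; /* flush position reported by the standby */
} DemoSlot;

/* What a caller keeps after judging a slot to be synchronous */
typedef struct DemoSnapshot
{
    int           slot_index;
    int           pid;
    unsigned long flush;
} DemoSnapshot;

/* Copy the interesting fields of one slot in a single step */
static DemoSnapshot
demo_take_snapshot(const DemoSlot *slots, int slot_index)
{
    DemoSnapshot snap;

    assert(slot_index >= 0 && slot_index < DEMO_MAX_SENDERS);
    snap.slot_index = slot_index;
    snap.pid = slots[slot_index].pid;
    snap.flush = slots[slot_index].flush;
    return snap;
}

/* True if the slot is still owned by the process seen at snapshot time */
static bool
demo_snapshot_is_current(const DemoSlot *slots, const DemoSnapshot *snap)
{
    return snap->pid != 0 && slots[snap->slot_index].pid == snap->pid;
}
```

A pid comparison only reduces the window, since the operating system can in principle recycle pids. What makes the calculation safe is that it uses the flush value stored in the snapshot instead of going back to the slot.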
On Fri, 17 Apr 2020 at 14:58, Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote:
>
> At Thu, 16 Apr 2020 11:39:06 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in
> > Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
> > > [ syncrep-fixes-4.patch ]
> >
> > I agree that we could probably improve the clarity of this code with
> > further rewriting, but I'm still very opposed to the idea of having
> > callers know that the first num_sync array elements are the active
> > ones. It's wrong (or at least different from current behavior) for
> > quorum mode, where there might be more than num_sync walsenders to
> > consider. And it might not generalize very well to other syncrep
> > selection rules we might add in future, which might also not have
> > exactly num_sync interesting walsenders. So I much prefer an API
> > definition that uses bool flags in an array that has no particular
> > ordering (so far as the callers know, anyway). If you don't like
> > is_sync_standby, how about some more-neutral name like is_active
> > or is_interesting or include_position?
>
> I'm convinced that each element has is_sync_standby. I agree to the
> name is_sync_standby since I don't come up with a better name.
>
> > I dislike the proposed comment revisions in SyncRepReleaseWaiters,
> > too, particularly the change to say that what we're "announcing"
> > is the ability to release waiters. You did not change the actual
> > log messages, and you would have gotten a lot of pushback if
> > you tried, because the current messages make sense to users and
> > something like that would not. But by the same token this new
> > comment isn't too helpful to somebody reading the code.
>
> The current log messages look perfect to me. I don't insist on the
> comment change since I might take the definition of "sync standby" too
> strictly.
>
> > (Actually, I wonder why we even have the restriction that only
> > sync standbys can release waiters. It's not like they are
> > going to get different results from SyncRepGetSyncRecPtr than
> > any other walsender would. Maybe we should just drop all the
> > am_sync logic?)
>
> I thought the same thing, though I didn't do that in the last patch.
>
> am_sync seems intending to reduce spurious wakeups but actually
> spurious wakeup won't increase even without it. Thus the only
> remaining task of am_sync is the trigger for the log messages and that
> fact is the sign that the log messages should be emitted within
> SyncRepGetSyncRecPtr. That eliminates references to SyncRepConfig in
> SyncRepReleaseWaiters, which makes me feel at ease.
>
> The attached is based on syncrep-fixes-1.patch + am_sync elimination.
>

Just for confirmation, since the new approach doesn't change that
walsenders reload new config at their convenient timing, it still can
happen that a walsender releases waiters according to the old config
that defines fewer number of sync standbys, during walsenders
absorbing a change in the set of synchronous walsenders. In the worst
case where the master crashes in the middle, we cannot be sure how
many sync servers the data has been replicated to. Is that right?

Regards,

-- 
Masahiko Sawada http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
At Fri, 17 Apr 2020 16:03:34 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in
> I agree that it might be worth considering the removal of am_sync for
> the master branch or v14. But I think that it should not be
> back-patched.

Ah! Agreed.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
At Fri, 17 Apr 2020 17:03:11 +0900, Masahiko Sawada <masahiko.sawada@2ndquadrant.com> wrote in
> On Fri, 17 Apr 2020 at 14:58, Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote:
> > The attached is based on syncrep-fixes-1.patch + am_sync elimination.
> >
>
> Just for confirmation, since the new approach doesn't change that
> walsenders reload new config at their convenient timing, it still can
> happen that a walsender releases waiters according to the old config
> that defines fewer number of sync standbys, during walsenders

Right.

> absorbing a change in the set of synchronous walsenders. In the worst
> case where the master crashes in the middle, we cannot be sure how
> many sync servers the data has been replicated to. Is that right?

Wal senders can set a stupid value as priority or in a worse case the
shared walsender information might be of another walsender that is
launched just now. In any case SyncRepGetSyncStandbys can return a set
of walsenders with descending priority (in priority mode). What can
happen in the worst case is some transactions are released by a bit
wrong LSN information. Such inconsistency can also happen when the
oldest sync standby in priority mode goes out and sync-LSN goes back
even if the wal-sender list is strictly kept consistent.

In quorum mode, we cannot even know which servers endorsed the
master's commit after a crash. I can't come up with a clean solution
for such inconsistency or unrecoverability(?) for now.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
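The effect of an old versus a new num_sync discussed in this exchange can be made concrete with a simplified model of priority mode. The sketch assumes the candidate standbys are already ordered by priority and reduces each one to its flush position; demo_release_point is a hypothetical name, and a return value of 0 is used here to mean "nothing can be released".

```c
#include <assert.h>
#include <stdint.h>

/*
 * Simplified priority-mode rule: waiters may be released up to the oldest
 * flush position among the num_sync highest-priority standbys.
 * flush_by_priority[] must already be ordered by priority.
 * Returns 0 when fewer than num_sync standbys are available.
 */
static uint64_t
demo_release_point(const uint64_t *flush_by_priority, int n, int num_sync)
{
    uint64_t oldest;
    int      i;

    assert(num_sync > 0);
    if (n < num_sync)
        return 0;               /* not enough sync standbys yet */

    oldest = flush_by_priority[0];
    for (i = 1; i < num_sync; i++)
    {
        if (flush_by_priority[i] < oldest)
            oldest = flush_by_priority[i];
    }
    return oldest;
}
```

With flush positions 900, 400 and 700 in priority order, a walsender still running with num_sync = 1 computes a release point of 900, while one that has already reloaded num_sync = 2 computes 400. Both values are backed by at least as many confirmed standbys as the num_sync that the computing walsender believed in, but the two walsenders disagree during the reload window.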
Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
> At Fri, 17 Apr 2020 16:03:34 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in
>> I agree that it might be worth considering the removal of am_sync for
>> the master branch or v14. But I think that it should not be
>> back-patched.

> Ah! Agreed.

Yeah, that's not necessary to fix the bug. I'd be inclined to leave
it for v14 at this point.

I don't much like the patch Fujii-san posted, though. An important part
of the problem, IMO, is that SyncRepGetSyncStandbysPriority is too
complicated and it's unclear what dependencies it has on the set of
priorities in shared memory being consistent. His patch does not improve
that situation; if anything it makes it worse.

If we're concerned about not breaking ABI in the back branches, what
I propose we do about that is just leave SyncRepGetSyncStandbys in place
but not used by the core code, and remove it only in HEAD. We can do an
absolutely minimal fix for the assertion failure, in case anybody is
calling that code, by just dropping the Assert and letting
SyncRepGetSyncStandbys return NIL if it falls out. (Or we could let it
return the incomplete list, which'd be the behavior you get today in a
non-assert build.)

Also, I realized while re-reading my patch that Kyotaro-san is onto
something about the is_sync_standby flag not being necessary: instead
we can just have the new function SyncRepGetCandidateStandbys return
a reduced count. I'd initially believed that it was necessary for that
function to return the rejected candidate walsenders along with the
accepted ones, but that was a misunderstanding. I still don't want its
API spec to say anything about ordering of the result array, but we
don't need to.

So that leads me to the attached. I propose applying this to the back
branches except for the rearrangement of WALSnd field order. In HEAD,
I'd remove SyncRepGetSyncStandbys and subroutines altogether.

regards, tom lane diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index ffd5b31..b47c7fa 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -108,14 +108,18 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys); + SyncRepStandbyData *sync_standbys, + int num_standbys); static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys, uint8 nth); + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth); static int SyncRepGetStandbyPriority(void); static List *SyncRepGetSyncStandbysPriority(bool *am_sync); static List *SyncRepGetSyncStandbysQuorum(bool *am_sync); +static int standby_priority_comparator(const void *a, const void *b); static int cmp_lsn(const void *a, const void *b); #ifdef USE_ASSERT_CHECKING @@ -406,9 +410,10 @@ SyncRepInitConfig(void) priority = SyncRepGetStandbyPriority(); if (MyWalSnd->sync_standby_priority != priority) { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + SpinLockAcquire(&MyWalSnd->mutex); MyWalSnd->sync_standby_priority = priority; - LWLockRelease(SyncRepLock); + SpinLockRelease(&MyWalSnd->mutex); + ereport(DEBUG1, (errmsg("standby \"%s\" now has synchronous standby priority %u", application_name, priority))); @@ -523,8 +528,6 @@ SyncRepReleaseWaiters(void) /* * Calculate the synced Write, Flush and Apply positions among sync standbys. * - * The caller must hold SyncRepLock. - * * Return false if the number of sync standbys is less than * synchronous_standby_names specifies. Otherwise return true and * store the positions into *writePtr, *flushPtr and *applyPtr. 
@@ -536,27 +539,41 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync) { - List *sync_standbys; - - Assert(LWLockHeldByMe(SyncRepLock)); + SyncRepStandbyData *sync_standbys; + int num_standbys; + int i; + /* Initialize default results */ *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; *applyPtr = InvalidXLogRecPtr; *am_sync = false; + /* Quick out if not even configured to be synchronous */ + if (SyncRepConfig == NULL) + return false; + /* Get standbys that are considered as synchronous at this moment */ - sync_standbys = SyncRepGetSyncStandbys(am_sync); + num_standbys = SyncRepGetCandidateStandbys(&sync_standbys); + + /* Am I among the candidate sync standbys? */ + for (i = 0; i < num_standbys; i++) + { + if (sync_standbys[i].is_me) + { + *am_sync = true; + break; + } + } /* - * Quick exit if we are not managing a sync standby or there are not - * enough synchronous standbys. + * Nothing more to do if we are not managing a sync standby or there are + * not enough synchronous standbys. */ if (!(*am_sync) || - SyncRepConfig == NULL || - list_length(sync_standbys) < SyncRepConfig->num_sync) + num_standbys < SyncRepConfig->num_sync) { - list_free(sync_standbys); + pfree(sync_standbys); return false; } @@ -576,15 +593,16 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) { SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys); + sync_standbys, num_standbys); } else { SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, SyncRepConfig->num_sync); + sync_standbys, num_standbys, + SyncRepConfig->num_sync); } - list_free(sync_standbys); + pfree(sync_standbys); return true; } @@ -592,27 +610,24 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * Calculate the oldest Write, Flush and Apply positions among sync standbys. 
*/ static void -SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys) +SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys) { - ListCell *cell; + int i; /* * Scan through all sync standbys and calculate the oldest Write, Flush - * and Apply positions. + * and Apply positions. We assume *writePtr et al were initialized to + * InvalidXLogRecPtr. */ - foreach(cell, sync_standbys) + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - XLogRecPtr write; - XLogRecPtr flush; - XLogRecPtr apply; - - SpinLockAcquire(&walsnd->mutex); - write = walsnd->write; - flush = walsnd->flush; - apply = walsnd->apply; - SpinLockRelease(&walsnd->mutex); + XLogRecPtr write = sync_standbys[i].write; + XLogRecPtr flush = sync_standbys[i].flush; + XLogRecPtr apply = sync_standbys[i].apply; if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write) *writePtr = write; @@ -628,38 +643,36 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * standbys. 
*/ static void -SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) +SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth) { - ListCell *cell; XLogRecPtr *write_array; XLogRecPtr *flush_array; XLogRecPtr *apply_array; - int len; - int i = 0; + int i; - len = list_length(sync_standbys); - write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + /* Should have enough candidates, or somebody messed up */ + Assert(nth > 0 && nth <= num_standbys); - foreach(cell, sync_standbys) - { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; + write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); - SpinLockAcquire(&walsnd->mutex); - write_array[i] = walsnd->write; - flush_array[i] = walsnd->flush; - apply_array[i] = walsnd->apply; - SpinLockRelease(&walsnd->mutex); - - i++; + for (i = 0; i < num_standbys; i++) + { + write_array[i] = sync_standbys[i].write; + flush_array[i] = sync_standbys[i].flush; + apply_array[i] = sync_standbys[i].apply; } /* Sort each array in descending order */ - qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn); + qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn); + qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn); /* Get Nth latest Write, Flush, Apply positions */ *writePtr = write_array[nth - 1]; @@ -689,12 +702,121 @@ cmp_lsn(const void *a, const void *b) } 
/* + * Return data about walsenders that are candidates to be sync standbys. + * + * *standbys is set to a palloc'd array of structs of per-walsender data, + * and the number of valid entries (candidate sync senders) is returned. + * (This might be more or fewer than num_sync; caller must check.) + */ +int +SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys) +{ + int i; + int n; + + /* Create result array */ + *standbys = (SyncRepStandbyData *) + palloc(max_wal_senders * sizeof(SyncRepStandbyData)); + + /* Quick exit if sync replication is not requested */ + if (SyncRepConfig == NULL) + return 0; + + /* Collect raw data from shared memory */ + n = 0; + for (i = 0; i < max_wal_senders; i++) + { + volatile WalSnd *walsnd; /* Use volatile pointer to prevent code + * rearrangement */ + SyncRepStandbyData *stby; + WalSndState state; /* not included in SyncRepStandbyData */ + + walsnd = &WalSndCtl->walsnds[i]; + stby = *standbys + n; + + SpinLockAcquire(&walsnd->mutex); + stby->pid = walsnd->pid; + state = walsnd->state; + stby->write = walsnd->write; + stby->flush = walsnd->flush; + stby->apply = walsnd->apply; + stby->sync_standby_priority = walsnd->sync_standby_priority; + SpinLockRelease(&walsnd->mutex); + + /* Must be active */ + if (stby->pid == 0) + continue; + + /* Must be streaming or stopping */ + if (state != WALSNDSTATE_STREAMING && + state != WALSNDSTATE_STOPPING) + continue; + + /* Must be synchronous */ + if (stby->sync_standby_priority == 0) + continue; + + /* Must have a valid flush position */ + if (XLogRecPtrIsInvalid(stby->flush)) + continue; + + /* OK, it's a candidate */ + stby->walsnd_index = i; + stby->is_me = (walsnd == MyWalSnd); + n++; + } + + /* + * In quorum mode, we return all the candidates. In priority mode, if we + * have too many candidates then return only the num_sync ones of highest + * priority. + */ + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY && + n > SyncRepConfig->num_sync) + { + /* Sort by priority ... 
*/ + qsort(*standbys, n, sizeof(SyncRepStandbyData), + standby_priority_comparator); + /* ... then report just the first num_sync ones */ + n = SyncRepConfig->num_sync; + } + + return n; +} + +/* + * qsort comparator to sort SyncRepStandbyData entries by priority + */ +static int +standby_priority_comparator(const void *a, const void *b) +{ + const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a; + const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b; + + /* First, sort by increasing priority value */ + if (sa->sync_standby_priority != sb->sync_standby_priority) + return sa->sync_standby_priority - sb->sync_standby_priority; + + /* + * We might have equal priority values; arbitrarily break ties by position + * in the WALSnd array. (This is utterly bogus, since that is arrival + * order dependent, but there are regression tests that rely on it.) + */ + return sa->walsnd_index - sb->walsnd_index; +} + + +/* * Return the list of sync standbys, or NIL if no sync standby is connected. * * The caller must hold SyncRepLock. * * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. + * + * XXX This function is BROKEN and should not be used in new code. It has + * an inherent race condition, since the returned list of integer indexes + * might no longer correspond to reality. */ List * SyncRepGetSyncStandbys(bool *am_sync) @@ -947,9 +1069,15 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) priority = next_highest_priority; } - /* never reached, but keep compiler quiet */ - Assert(false); - return result; + /* + * We might get here if the set of sync_standby_priority values in shared + * memory is inconsistent, as can happen transiently after a change in the + * synchronous_standby_names setting. In that case, give up and report + * that there are no synchronous candidates. 
+ */ + list_free(result); + list_free(pending); + return NIL; } /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index fc475d1..0e93322 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2375,14 +2375,16 @@ InitWalSenderSlot(void) * Found a free slot. Reserve it for us. */ walsnd->pid = MyProcPid; + walsnd->state = WALSNDSTATE_STARTUP; walsnd->sentPtr = InvalidXLogRecPtr; + walsnd->needreload = false; walsnd->write = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; walsnd->writeLag = -1; walsnd->flushLag = -1; walsnd->applyLag = -1; - walsnd->state = WALSNDSTATE_STARTUP; + walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; walsnd->spillTxns = 0; @@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - List *sync_standbys; + SyncRepStandbyData *sync_standbys; + int num_standbys; int i; /* check to see if caller supports us returning a tuplestore */ @@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); /* - * Get the currently active synchronous standbys. + * Get the currently active synchronous standbys. This could be out of + * date before we're done, but we'll use the data anyway. 
 	 */
-	LWLockAcquire(SyncRepLock, LW_SHARED);
-	sync_standbys = SyncRepGetSyncStandbys(NULL);
-	LWLockRelease(SyncRepLock);
+	num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);

 	for (i = 0; i < max_wal_senders; i++)
 	{
@@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		int64		spillTxns;
 		int64		spillCount;
 		int64		spillBytes;
+		bool		is_sync_standby;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+		int			j;

+		/* Collect data from shared memory */
 		SpinLockAcquire(&walsnd->mutex);
 		if (walsnd->pid == 0)
 		{
@@ -3311,6 +3316,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		spillBytes = walsnd->spillBytes;
 		SpinLockRelease(&walsnd->mutex);

+		/*
+		 * Detect whether walsender is/was considered synchronous. We can
+		 * provide some protection against stale data by checking the PID
+		 * along with walsnd_index.
+		 */
+		is_sync_standby = false;
+		for (j = 0; j < num_standbys; j++)
+		{
+			if (sync_standbys[j].walsnd_index == i &&
+				sync_standbys[j].pid == pid)
+			{
+				is_sync_standby = true;
+				break;
+			}
+		}
+
 		memset(nulls, 0, sizeof(nulls));
 		values[0] = Int32GetDatum(pid);

@@ -3380,7 +3401,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 */
 			if (priority == 0)
 				values[10] = CStringGetTextDatum("async");
-			else if (list_member_int(sync_standbys, i))
+			else if (is_sync_standby)
 				values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
 					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
 			else
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index c5f0e91..e38f6ba 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -37,6 +37,24 @@
 #define SYNC_REP_QUORUM		1

 /*
+ * SyncRepGetCandidateStandbys returns an array of these structs,
+ * one per candidate synchronous walsender.
+ */
+typedef struct SyncRepStandbyData
+{
+	/* Copies of relevant fields from WalSnd shared-memory struct */
+	pid_t		pid;
+	XLogRecPtr	write;
+	XLogRecPtr	flush;
+	XLogRecPtr	apply;
+	int			sync_standby_priority;
+	/* Index of this walsender in the WalSnd shared-memory array */
+	int			walsnd_index;
+	/* This flag indicates whether this struct is about our own process */
+	bool		is_me;
+} SyncRepStandbyData;
+
+/*
  * Struct for the configuration of synchronous replication.
  *
  * Note: this must be a flat representation that can be held in a single
@@ -74,6 +92,9 @@ extern void SyncRepInitConfig(void);
 extern void SyncRepReleaseWaiters(void);

 /* called by wal sender and user backend */
+extern int	SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
+
+/* obsolete, do not use in new code */
 extern List *SyncRepGetSyncStandbys(bool *am_sync);

 /* called by checkpointer */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 366828f..734acec 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -31,8 +31,7 @@ typedef enum WalSndState
 /*
  * Each walsender has a WalSnd struct in shared memory.
  *
- * This struct is protected by 'mutex', with two exceptions: one is
- * sync_standby_priority as noted below. The other exception is that some
+ * This struct is protected by its 'mutex' spinlock field, except that some
  * members are only written by the walsender process itself, and thus that
  * process is free to read those members without holding spinlock. pid and
  * needreload always require the spinlock to be held for all accesses.
@@ -60,6 +59,12 @@ typedef struct WalSnd
 	TimeOffset	flushLag;
 	TimeOffset	applyLag;

+	/*
+	 * The priority order of the standby managed by this WALSender, as listed
+	 * in synchronous_standby_names, or 0 if not-listed.
+	 */
+	int			sync_standby_priority;
+
 	/* Protects shared variables shown above. */
 	slock_t		mutex;

@@ -70,13 +75,6 @@ typedef struct WalSnd
 	Latch	   *latch;

 	/*
-	 * The priority order of the standby managed by this WALSender, as listed
-	 * in synchronous_standby_names, or 0 if not-listed. Protected by
-	 * SyncRepLock.
-	 */
-	int			sync_standby_priority;
-
-	/*
 	 * Timestamp of the last message received from standby.
 	 */
 	TimestampTz replyTime;

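
The stale-data check added to pg_stat_get_wal_senders in the patch above can be isolated into a small standalone sketch. The names StandbySnapshot and snapshot_contains are hypothetical and exist only for illustration; the struct is a cut-down stand-in for SyncRepStandbyData, not the real definition. The idea shown is only that a snapshot entry is trusted when both the slot index and the PID still match.

```c
#include <stdbool.h>
#include <stddef.h>
#include <sys/types.h>

/* Hypothetical, simplified stand-in for SyncRepStandbyData in the patch. */
typedef struct StandbySnapshot
{
	pid_t		pid;			/* PID copied when the snapshot was taken */
	int			walsnd_index;	/* slot index in the walsender array */
} StandbySnapshot;

/*
 * Return true if slot 'index', currently owned by process 'pid', appears
 * in the snapshot.  Matching on both fields rejects a slot that has been
 * reused by a different process since the snapshot was taken.
 */
bool
snapshot_contains(const StandbySnapshot *snap, int nsnap, int index, pid_t pid)
{
	for (int j = 0; j < nsnap; j++)
	{
		if (snap[j].walsnd_index == index && snap[j].pid == pid)
			return true;
	}
	return false;
}
```

Comparing the index alone would report a freshly started walsender as synchronous if it happened to land in a slot that a synchronous one had just vacated; the extra PID comparison narrows that window, though it cannot close it completely.
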
Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
> At Fri, 17 Apr 2020 17:03:11 +0900, Masahiko Sawada <masahiko.sawada@2ndquadrant.com> wrote in
>> On Fri, 17 Apr 2020 at 14:58, Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote:
>> Just for confirmation, since the new approach doesn't change that
>> walsenders reload new config at their convenient timing, it still can
>> happen that a walsender releases waiters according to the old config
>> that defines fewer number of sync standbys, during walsenders

> Right.

>> absorbing a change in the set of synchronous walsenders. In the worst
>> case where the master crashes in the middle, we cannot be sure how
>> many sync servers the data has been replicated to. Is that right?

> Wal senders can set a stupid value as priority or in a worse case the
> shared walsender information might be of another walsender that is
> launched just now. In any case SyncRepGetSyncStandbys can return a set
> of walsenders with descending priority (in priority mode). What can
> be happen in the worst case is some transactions are released by a bit
> wrong LSN information. Such inconsistency also can be happen when the
> oldest sync standby in priority mode goes out and sync-LSN goes back
> even if the wal-sender list is strictly kept consistent.

I don't really see a problem here. It's true that transactions might be
released based on either the old or the new value of num_sync, depending
on whether the particular walsender executing the release logic has
noticed the SIGHUP yet. But if a transaction was released, then there
were at least num_sync confirmed transmissions of data to someplace, so
it's not like you've got no redundancy at all.

The only thing that seems slightly odd is that there could in principle
be some transactions released on the basis of the new num_sync, and then
slightly later some transactions released on the basis of the old
num_sync. But I don't think it's really going to be possible to avoid
that, given that the GUC update is propagated in an asynchronous fashion.

I spent a few moments wondering if we could avoid such cases by having
SyncRepReleaseWaiters check for GUC updates after it's acquired
SyncRepLock. But that wouldn't really guarantee much, since the
postmaster can't deliver SIGHUP to all the walsenders simultaneously.
I think the main practical effect would be to allow some possibly-slow
processing to happen while holding SyncRepLock, which surely isn't a
great idea.

BTW, it might be worth documenting in this thread that my proposed patch
intentionally doesn't move SyncRepReleaseWaiters' acquisition of
SyncRepLock. With the patch, SyncRepGetSyncRecPtr does not require
SyncRepLock so one could consider acquiring that lock only while updating
walsndctl and releasing waiters. My concern about that is that then it'd
be possible for a later round of waiter-releasing to happen on the basis
of slightly older SyncRepGetSyncRecPtr results, if a walsender that had
done SyncRepGetSyncRecPtr first were only able to acquire the lock
second. Perhaps that would be okay, but I'm not sure, so I left it alone.

			regards, tom lane

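
To make the old-versus-new num_sync point concrete, here is a minimal sketch of a quorum-style rule: the releasable position is the num_sync-th largest flush position among the standbys. The names Lsn and nth_latest_lsn are hypothetical, the positions are made-up numbers, and this is not the PostgreSQL implementation; it only illustrates why a release under either setting still implies at least that many confirmations.

```c
#include <stdint.h>
#include <stdlib.h>

typedef uint64_t Lsn;			/* hypothetical stand-in for XLogRecPtr */

/* qsort comparator: descending order of position */
static int
cmp_lsn_desc(const void *a, const void *b)
{
	Lsn			la = *(const Lsn *) a;
	Lsn			lb = *(const Lsn *) b;

	if (la > lb)
		return -1;
	if (la < lb)
		return 1;
	return 0;
}

/*
 * Return the newest position confirmed by at least num_sync standbys,
 * i.e. the num_sync-th largest entry.  Returns 0 if num_sync is below 1
 * or there are fewer than num_sync standbys.  Sorts 'flush' in place.
 */
Lsn
nth_latest_lsn(Lsn *flush, int nstandbys, int num_sync)
{
	if (num_sync < 1 || nstandbys < num_sync)
		return 0;
	qsort(flush, (size_t) nstandbys, sizeof(Lsn), cmp_lsn_desc);
	return flush[num_sync - 1];
}
```

With flush positions 300, 100 and 200, a walsender still using num_sync = 1 would release up to 300, while one that has already absorbed num_sync = 2 would release only up to 200. Both results are backed by at least the corresponding number of confirmations, which is the redundancy argument made above.
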
On Sat, 18 Apr 2020 at 00:31, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>
> Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
> > At Fri, 17 Apr 2020 16:03:34 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in
> >> I agree that it might be worth considering the removal of am_sync for
> >> the master branch or v14. But I think that it should not be
> >> back-patched.
>
> > Ah! Agreed.
>
> Yeah, that's not necessary to fix the bug. I'd be inclined to leave
> it for v14 at this point.
>
> I don't much like the patch Fujii-san posted, though. An important part
> of the problem, IMO, is that SyncRepGetSyncStandbysPriority is too
> complicated and it's unclear what dependencies it has on the set of
> priorities in shared memory being consistent. His patch does not improve
> that situation; if anything it makes it worse.
>
> If we're concerned about not breaking ABI in the back branches, what
> I propose we do about that is just leave SyncRepGetSyncStandbys in
> place but not used by the core code, and remove it only in HEAD.
> We can do an absolutely minimal fix for the assertion failure, in
> case anybody is calling that code, by just dropping the Assert and
> letting SyncRepGetSyncStandbys return NIL if it falls out. (Or we
> could let it return the incomplete list, which'd be the behavior
> you get today in a non-assert build.)

+1

>
> Also, I realized while re-reading my patch that Kyotaro-san is onto
> something about the is_sync_standby flag not being necessary: instead
> we can just have the new function SyncRepGetCandidateStandbys return
> a reduced count. I'd initially believed that it was necessary for
> that function to return the rejected candidate walsenders along with
> the accepted ones, but that was a misunderstanding. I still don't
> want its API spec to say anything about ordering of the result array,
> but we don't need to.
>
> So that leads me to the attached. I propose applying this to the
> back branches except for the rearrangement of WALSnd field order.
> In HEAD, I'd remove SyncRepGetSyncStandbys and subroutines altogether.
>

+	/* Quick out if not even configured to be synchronous */
+	if (SyncRepConfig == NULL)
+		return false;

I felt strange a bit that we do the above check in
SyncRepGetSyncRecPtr() because SyncRepReleaseWaiters() which is the
only caller says the following before calling it:

	/*
	 * We're a potential sync standby. Release waiters if there are enough
	 * sync standbys and we are considered as sync.
	 */
	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

Can we either change it to an assertion, move it to before acquiring
SyncRepLock in SyncRepReleaseWaiters or just remove it?

Regards,

--
Masahiko Sawada http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

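
The "reduced count" idea in the proposal quoted in the message above can be sketched roughly as follows, for priority mode only. Candidate and keep_top_priority are hypothetical names, and the tie-break by array index is an assumption made here so that the result is deterministic; the sketch is an illustration of the approach, not the code from the patch.

```c
#include <stdlib.h>

/* Hypothetical, simplified candidate record. */
typedef struct Candidate
{
	int			walsnd_index;			/* slot index in the walsender array */
	int			sync_standby_priority;	/* 1 is the highest priority; 0
										 * (not listed) is assumed filtered
										 * out before this point */
} Candidate;

/* qsort comparator: ascending priority value, ties broken by slot index */
static int
cmp_priority(const void *a, const void *b)
{
	const Candidate *ca = a;
	const Candidate *cb = b;

	if (ca->sync_standby_priority != cb->sync_standby_priority)
		return ca->sync_standby_priority - cb->sync_standby_priority;
	return ca->walsnd_index - cb->walsnd_index;
}

/*
 * Keep at most num_sync candidates, preferring the best priorities, and
 * return the reduced count.  Rejected candidates simply fall beyond the
 * returned count, so no per-entry "accepted" flag is needed.
 */
int
keep_top_priority(Candidate *cands, int n, int num_sync)
{
	if (num_sync < 0)
		num_sync = 0;
	if (n > num_sync)
	{
		qsort(cands, (size_t) n, sizeof(Candidate), cmp_priority);
		n = num_sync;
	}
	return n;
}
```

Note that the array is left in its original order whenever there are no more candidates than num_sync, so a caller of something shaped like this should not depend on the ordering of the entries it gets back.
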
Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes:
> On Sat, 18 Apr 2020 at 00:31, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> +	/* Quick out if not even configured to be synchronous */
>> +	if (SyncRepConfig == NULL)
>> +		return false;

> I felt strange a bit that we do the above check in
> SyncRepGetSyncRecPtr() because SyncRepReleaseWaiters() which is the
> only caller says the following before calling it:

Notice there was such a test in SyncRepGetSyncRecPtr already --- I just
moved it to be before doing some work instead of after.

> Can we either change it to an assertion, move it to before acquiring
> SyncRepLock in SyncRepReleaseWaiters or just remove it?

I have no objection to that in principle, but it seems like it's a
change in SyncRepGetSyncRecPtr's API that is not necessary to fix
this bug. So I'd rather leave it to happen along with the larger
API changes (getting rid of am_sync) that are proposed for v14.

			regards, tom lane

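
The effect of doing the configuration test before the work rather than after it can be shown with a toy example. Everything here is hypothetical (Config, take_snapshot, have_enough_standbys and the snapshots_taken counter); it is only a sketch of the ordering, under the assumption that collecting the candidate data is the comparatively expensive step.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the synchronous replication configuration. */
typedef struct Config
{
	int			num_sync;		/* number of standbys that must confirm */
} Config;

int			snapshots_taken = 0;	/* counts runs of the expensive step */

/* Pretend to copy walsender state out of shared memory. */
static int
take_snapshot(void)
{
	snapshots_taken++;
	return 3;					/* pretend three candidates were found */
}

/*
 * Report whether enough candidates exist.  The configuration test comes
 * first, so an unconfigured server never pays for the snapshot.
 */
bool
have_enough_standbys(const Config *config)
{
	/* Quick out if not even configured to be synchronous */
	if (config == NULL)
		return false;

	return take_snapshot() >= config->num_sync;
}
```

Turning the test into an assertion, or hoisting it into the caller, would change what callers are required to guarantee, which is the API change deferred above.
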
On Sun, 19 Apr 2020 at 01:00, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>
> Masahiko Sawada <masahiko.sawada@2ndquadrant.com> writes:
> > On Sat, 18 Apr 2020 at 00:31, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> >> +	/* Quick out if not even configured to be synchronous */
> >> +	if (SyncRepConfig == NULL)
> >> +		return false;
>
> > I felt strange a bit that we do the above check in
> > SyncRepGetSyncRecPtr() because SyncRepReleaseWaiters() which is the
> > only caller says the following before calling it:
>
> Notice there was such a test in SyncRepGetSyncRecPtr already --- I just
> moved it to be before doing some work instead of after.
>
> > Can we either change it to an assertion, move it to before acquiring
> > SyncRepLock in SyncRepReleaseWaiters or just remove it?
>
> I have no objection to that in principle, but it seems like it's a
> change in SyncRepGetSyncRecPtr's API that is not necessary to fix
> this bug. So I'd rather leave it to happen along with the larger
> API changes (getting rid of am_sync) that are proposed for v14.

Agreed.

Regards,

--
Masahiko Sawada http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

On 2020/04/18 0:31, Tom Lane wrote:
> Kyotaro Horiguchi <horikyota.ntt@gmail.com> writes:
>> At Fri, 17 Apr 2020 16:03:34 +0900, Fujii Masao <masao.fujii@oss.nttdata.com> wrote in
>>> I agree that it might be worth considering the removal of am_sync for
>>> the master branch or v14. But I think that it should not be
>>> back-patched.
>
>> Ah! Agreed.
>
> Yeah, that's not necessary to fix the bug. I'd be inclined to leave
> it for v14 at this point.
>
> I don't much like the patch Fujii-san posted, though. An important part
> of the problem, IMO, is that SyncRepGetSyncStandbysPriority is too
> complicated and it's unclear what dependencies it has on the set of
> priorities in shared memory being consistent. His patch does not improve
> that situation; if anything it makes it worse.

Understood.

>
> If we're concerned about not breaking ABI in the back branches, what
> I propose we do about that is just leave SyncRepGetSyncStandbys in
> place but not used by the core code, and remove it only in HEAD.
> We can do an absolutely minimal fix for the assertion failure, in
> case anybody is calling that code, by just dropping the Assert and
> letting SyncRepGetSyncStandbys return NIL if it falls out. (Or we
> could let it return the incomplete list, which'd be the behavior
> you get today in a non-assert build.)
>
> Also, I realized while re-reading my patch that Kyotaro-san is onto
> something about the is_sync_standby flag not being necessary: instead
> we can just have the new function SyncRepGetCandidateStandbys return
> a reduced count. I'd initially believed that it was necessary for
> that function to return the rejected candidate walsenders along with
> the accepted ones, but that was a misunderstanding. I still don't
> want its API spec to say anything about ordering of the result array,
> but we don't need to.
>
> So that leads me to the attached. I propose applying this to the
> back branches except for the rearrangement of WALSnd field order.
> In HEAD, I'd remove SyncRepGetSyncStandbys and subroutines altogether.

Thanks for making and committing the patch!

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION