fix: bbr limiter maxPass minRt cache problem (#690)

pull/808/head v1.0.0
HeXiaoliang 4 years ago committed by GitHub
parent 92703869e6
commit 3daca16dc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 60
      pkg/ratelimit/bbr/bbr.go
  2. 41
      pkg/ratelimit/bbr/bbr_test.go

@ -71,11 +71,20 @@ type BBR struct {
rtStat metric.RollingCounter rtStat metric.RollingCounter
inFlight int64 inFlight int64
winBucketPerSec int64 winBucketPerSec int64
bucketDuration time.Duration
winSize int
conf *Config conf *Config
prevDrop atomic.Value prevDrop atomic.Value
prevDropHit int32 maxPASSCache atomic.Value
rawMaxPASS int64 minRtCache atomic.Value
rawMinRt int64 }
// CounterCache is used to cache maxPASS and minRt result.
// Value of current bucket is not counted in real time.
// Cache time is equal to a bucket duration.
type CounterCache struct {
val int64
time time.Time
} }
// Config contains configs of bbr limiter. // Config contains configs of bbr limiter.
@ -89,11 +98,14 @@ type Config struct {
} }
func (l *BBR) maxPASS() int64 { func (l *BBR) maxPASS() int64 {
rawMaxPass := atomic.LoadInt64(&l.rawMaxPASS) passCache := l.maxPASSCache.Load()
if rawMaxPass > 0 && l.passStat.Timespan() < 1 { if passCache != nil {
return rawMaxPass ps := passCache.(*CounterCache)
if l.timespan(ps.time) < 1 {
return ps.val
}
} }
rawMaxPass = int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 { rawMaxPass := int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 {
var result = 1.0 var result = 1.0
for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ { for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ {
bucket := iterator.Bucket() bucket := iterator.Bucket()
@ -108,16 +120,30 @@ func (l *BBR) maxPASS() int64 {
if rawMaxPass == 0 { if rawMaxPass == 0 {
rawMaxPass = 1 rawMaxPass = 1
} }
atomic.StoreInt64(&l.rawMaxPASS, rawMaxPass) l.maxPASSCache.Store(&CounterCache{
val: rawMaxPass,
time: time.Now(),
})
return rawMaxPass return rawMaxPass
} }
func (l *BBR) timespan(lastTime time.Time) int {
v := int(time.Since(lastTime) / l.bucketDuration)
if v > -1 {
return v
}
return l.winSize
}
func (l *BBR) minRT() int64 { func (l *BBR) minRT() int64 {
rawMinRT := atomic.LoadInt64(&l.rawMinRt) rtCache := l.minRtCache.Load()
if rawMinRT > 0 && l.rtStat.Timespan() < 1 { if rtCache != nil {
return rawMinRT rc := rtCache.(*CounterCache)
if l.timespan(rc.time) < 1 {
return rc.val
}
} }
rawMinRT = int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 { rawMinRT := int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 {
var result = math.MaxFloat64 var result = math.MaxFloat64
for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ { for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ {
bucket := iterator.Bucket() bucket := iterator.Bucket()
@ -136,7 +162,10 @@ func (l *BBR) minRT() int64 {
if rawMinRT <= 0 { if rawMinRT <= 0 {
rawMinRT = 1 rawMinRT = 1
} }
atomic.StoreInt64(&l.rawMinRt, rawMinRT) l.minRtCache.Store(&CounterCache{
val: rawMinRT,
time: time.Now(),
})
return rawMinRT return rawMinRT
} }
@ -151,9 +180,6 @@ func (l *BBR) shouldDrop() bool {
return false return false
} }
if time.Since(initTime)-prevDrop <= time.Second { if time.Since(initTime)-prevDrop <= time.Second {
if atomic.LoadInt32(&l.prevDropHit) == 0 {
atomic.StoreInt32(&l.prevDropHit, 1)
}
inFlight := atomic.LoadInt64(&l.inFlight) inFlight := atomic.LoadInt64(&l.inFlight)
return inFlight > 1 && inFlight > l.maxFlight() return inFlight > 1 && inFlight > l.maxFlight()
} }
@ -226,6 +252,8 @@ func newLimiter(conf *Config) limit.Limiter {
passStat: passStat, passStat: passStat,
rtStat: rtStat, rtStat: rtStat,
winBucketPerSec: int64(time.Second) / (int64(conf.Window) / int64(conf.WinBucket)), winBucketPerSec: int64(time.Second) / (int64(conf.Window) / int64(conf.WinBucket)),
bucketDuration: bucketDuration,
winSize: conf.WinBucket,
} }
return limiter return limiter
} }

@ -86,19 +86,33 @@ func TestBBRMaxPass(t *testing.T) {
assert.Equal(t, int64(1), bbr.maxPASS()) assert.Equal(t, int64(1), bbr.maxPASS())
} }
func TestBBRMaxPassWithCache(t *testing.T) {
bucketDuration := time.Millisecond * 100
bbr := newLimiter(confForTest()).(*BBR)
// witch cache, value of latest bucket is not counted instently.
// after a bucket duration time, this bucket will be fullly counted.
for i := 1; i <= 11; i++ {
bbr.passStat.Add(int64(i * 50))
time.Sleep(bucketDuration / 2)
_ = bbr.maxPASS()
bbr.passStat.Add(int64(i * 50))
time.Sleep(bucketDuration / 2)
}
bbr.passStat.Add(int64(1))
assert.Equal(t, int64(1000), bbr.maxPASS())
}
func TestBBRMinRt(t *testing.T) { func TestBBRMinRt(t *testing.T) {
bucketDuration := time.Millisecond * 100 bucketDuration := time.Millisecond * 100
bbr := newLimiter(confForTest()).(*BBR) bbr := newLimiter(confForTest()).(*BBR)
rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
for j := i*10 + 1; j <= i*10+10; j++ { for j := i*10 + 1; j <= i*10+10; j++ {
rtStat.Add(int64(j)) bbr.rtStat.Add(int64(j))
} }
if i != 9 { if i != 9 {
time.Sleep(bucketDuration) time.Sleep(bucketDuration)
} }
} }
bbr.rtStat = rtStat
assert.Equal(t, int64(6), bbr.minRT()) assert.Equal(t, int64(6), bbr.minRT())
// default max min rt is equal to maxFloat64. // default max min rt is equal to maxFloat64.
@ -108,6 +122,27 @@ func TestBBRMinRt(t *testing.T) {
assert.Equal(t, int64(1), bbr.minRT()) assert.Equal(t, int64(1), bbr.minRT())
} }
func TestBBRMinRtWithCache(t *testing.T) {
bucketDuration := time.Millisecond * 100
bbr := newLimiter(confForTest()).(*BBR)
for i := 0; i < 10; i++ {
for j := i*10 + 1; j <= i*10+5; j++ {
bbr.rtStat.Add(int64(j))
}
if i != 9 {
time.Sleep(bucketDuration / 2)
}
_ = bbr.minRT()
for j := i*10 + 6; j <= i*10+10; j++ {
bbr.rtStat.Add(int64(j))
}
if i != 9 {
time.Sleep(bucketDuration / 2)
}
}
assert.Equal(t, int64(6), bbr.minRT())
}
func TestBBRMaxQps(t *testing.T) { func TestBBRMaxQps(t *testing.T) {
bbr := newLimiter(confForTest()).(*BBR) bbr := newLimiter(confForTest()).(*BBR)
bucketDuration := time.Millisecond * 100 bucketDuration := time.Millisecond * 100

Loading…
Cancel
Save