diff --git a/pkg/ratelimit/bbr/bbr.go b/pkg/ratelimit/bbr/bbr.go index 7bc898b3c..738336e9d 100644 --- a/pkg/ratelimit/bbr/bbr.go +++ b/pkg/ratelimit/bbr/bbr.go @@ -71,11 +71,20 @@ type BBR struct { rtStat metric.RollingCounter inFlight int64 winBucketPerSec int64 + bucketDuration time.Duration + winSize int conf *Config prevDrop atomic.Value - prevDropHit int32 - rawMaxPASS int64 - rawMinRt int64 + maxPASSCache atomic.Value + minRtCache atomic.Value +} + +// CounterCache is used to cache maxPASS and minRt result. +// Value of current bucket is not counted in real time. +// Cache time is equal to a bucket duration. +type CounterCache struct { + val int64 + time time.Time } // Config contains configs of bbr limiter. @@ -89,11 +98,14 @@ type Config struct { } func (l *BBR) maxPASS() int64 { - rawMaxPass := atomic.LoadInt64(&l.rawMaxPASS) - if rawMaxPass > 0 && l.passStat.Timespan() < 1 { - return rawMaxPass + passCache := l.maxPASSCache.Load() + if passCache != nil { + ps := passCache.(*CounterCache) + if l.timespan(ps.time) < 1 { + return ps.val + } } - rawMaxPass = int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 { + rawMaxPass := int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 { var result = 1.0 for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ { bucket := iterator.Bucket() @@ -108,16 +120,30 @@ func (l *BBR) maxPASS() int64 { if rawMaxPass == 0 { rawMaxPass = 1 } - atomic.StoreInt64(&l.rawMaxPASS, rawMaxPass) + l.maxPASSCache.Store(&CounterCache{ + val: rawMaxPass, + time: time.Now(), + }) return rawMaxPass } +func (l *BBR) timespan(lastTime time.Time) int { + v := int(time.Since(lastTime) / l.bucketDuration) + if v > -1 { + return v + } + return l.winSize +} + func (l *BBR) minRT() int64 { - rawMinRT := atomic.LoadInt64(&l.rawMinRt) - if rawMinRT > 0 && l.rtStat.Timespan() < 1 { - return rawMinRT + rtCache := l.minRtCache.Load() + if rtCache != nil { + rc := rtCache.(*CounterCache) + if l.timespan(rc.time) < 1 { + return rc.val + } } - rawMinRT = int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 { + rawMinRT := int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 { var result = math.MaxFloat64 for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ { bucket := iterator.Bucket() @@ -136,7 +162,10 @@ func (l *BBR) minRT() int64 { if rawMinRT <= 0 { rawMinRT = 1 } - atomic.StoreInt64(&l.rawMinRt, rawMinRT) + l.minRtCache.Store(&CounterCache{ + val: rawMinRT, + time: time.Now(), + }) return rawMinRT } @@ -151,9 +180,6 @@ func (l *BBR) shouldDrop() bool { return false } if time.Since(initTime)-prevDrop <= time.Second { - if atomic.LoadInt32(&l.prevDropHit) == 0 { - atomic.StoreInt32(&l.prevDropHit, 1) - } inFlight := atomic.LoadInt64(&l.inFlight) return inFlight > 1 && inFlight > l.maxFlight() } @@ -226,6 +252,8 @@ func newLimiter(conf *Config) limit.Limiter { passStat: passStat, rtStat: rtStat, winBucketPerSec: int64(time.Second) / (int64(conf.Window) / int64(conf.WinBucket)), + bucketDuration: bucketDuration, + winSize: conf.WinBucket, } return limiter } diff --git a/pkg/ratelimit/bbr/bbr_test.go b/pkg/ratelimit/bbr/bbr_test.go index a807d8525..93345e5ff 100644 --- a/pkg/ratelimit/bbr/bbr_test.go +++ b/pkg/ratelimit/bbr/bbr_test.go @@ -86,19 +86,33 @@ func TestBBRMaxPass(t *testing.T) { assert.Equal(t, int64(1), bbr.maxPASS()) } +func TestBBRMaxPassWithCache(t *testing.T) { + bucketDuration := time.Millisecond * 100 + bbr := newLimiter(confForTest()).(*BBR) + // witch cache, value of latest bucket is not counted instently. + // after a bucket duration time, this bucket will be fullly counted. + for i := 1; i <= 11; i++ { + bbr.passStat.Add(int64(i * 50)) + time.Sleep(bucketDuration / 2) + _ = bbr.maxPASS() + bbr.passStat.Add(int64(i * 50)) + time.Sleep(bucketDuration / 2) + } + bbr.passStat.Add(int64(1)) + assert.Equal(t, int64(1000), bbr.maxPASS()) +} + func TestBBRMinRt(t *testing.T) { bucketDuration := time.Millisecond * 100 bbr := newLimiter(confForTest()).(*BBR) - rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) for i := 0; i < 10; i++ { for j := i*10 + 1; j <= i*10+10; j++ { - rtStat.Add(int64(j)) + bbr.rtStat.Add(int64(j)) } if i != 9 { time.Sleep(bucketDuration) } } - bbr.rtStat = rtStat assert.Equal(t, int64(6), bbr.minRT()) // default max min rt is equal to maxFloat64. @@ -108,6 +122,27 @@ func TestBBRMinRt(t *testing.T) { assert.Equal(t, int64(1), bbr.minRT()) } +func TestBBRMinRtWithCache(t *testing.T) { + bucketDuration := time.Millisecond * 100 + bbr := newLimiter(confForTest()).(*BBR) + for i := 0; i < 10; i++ { + for j := i*10 + 1; j <= i*10+5; j++ { + bbr.rtStat.Add(int64(j)) + } + if i != 9 { + time.Sleep(bucketDuration / 2) + } + _ = bbr.minRT() + for j := i*10 + 6; j <= i*10+10; j++ { + bbr.rtStat.Add(int64(j)) + } + if i != 9 { + time.Sleep(bucketDuration / 2) + } + } + assert.Equal(t, int64(6), bbr.minRT()) +} + func TestBBRMaxQps(t *testing.T) { bbr := newLimiter(confForTest()).(*BBR) bucketDuration := time.Millisecond * 100