From 0bff98d729b54a030b5b3e865fe75dacadf3475f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=8B=E9=A3=9E?= Date: Fri, 12 Jul 2019 14:27:04 +0800 Subject: [PATCH 1/3] perfermance: reduce calculation times of maxInflight --- pkg/ratelimit/bbr/bbr.go | 23 ++++--- pkg/ratelimit/bbr/bbr_test.go | 117 +++++++++++++++++++++++++++++++--- 2 files changed, 121 insertions(+), 19 deletions(-) diff --git a/pkg/ratelimit/bbr/bbr.go b/pkg/ratelimit/bbr/bbr.go index f7bca28d9..69576a4e3 100644 --- a/pkg/ratelimit/bbr/bbr.go +++ b/pkg/ratelimit/bbr/bbr.go @@ -67,11 +67,11 @@ type Stat struct { type BBR struct { cpu cpuGetter passStat metric.RollingCounter - rtStat metric.RollingGauge + rtStat metric.RollingCounter inFlight int64 winBucketPerSec int64 conf *Config - prevDrop int64 + prevDrop atomic.Value } // Config contains configs of bbr limiter. @@ -128,18 +128,21 @@ func (l *BBR) maxFlight() int64 { } func (l *BBR) shouldDrop() bool { - inFlight := atomic.LoadInt64(&l.inFlight) - maxInflight := l.maxFlight() if l.cpu() < l.conf.CPUThreshold { - prevDrop := atomic.LoadInt64(&l.prevDrop) - if time.Now().Unix()-prevDrop <= 1 { - return inFlight > 1 && inFlight > maxInflight + prevDrop, ok := l.prevDrop.Load().(time.Time) + if !ok { + return false + } + if time.Since(prevDrop) <= time.Second { + inFlight := atomic.LoadInt64(&l.inFlight) + return inFlight > 1 && inFlight > l.maxFlight() } return false } - drop := inFlight > 1 && inFlight > maxInflight + inFlight := atomic.LoadInt64(&l.inFlight) + drop := inFlight > 1 && inFlight > l.maxFlight() if drop { - atomic.StoreInt64(&l.prevDrop, time.Now().Unix()) + l.prevDrop.Store(time.Now()) } return drop } @@ -188,7 +191,7 @@ func newLimiter(conf *Config) limit.Limiter { size := conf.WinBucket bucketDuration := conf.Window / time.Duration(conf.WinBucket) passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: size, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: size, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: size, BucketDuration: bucketDuration}) cpu := func() int64 { return atomic.LoadInt64(&cpu) } diff --git a/pkg/ratelimit/bbr/bbr_test.go b/pkg/ratelimit/bbr/bbr_test.go index 6a6f4048b..1a24fd7a7 100644 --- a/pkg/ratelimit/bbr/bbr_test.go +++ b/pkg/ratelimit/bbr/bbr_test.go @@ -10,9 +10,8 @@ import ( "testing" "time" - limit "github.com/bilibili/kratos/pkg/ratelimit" + "github.com/bilibili/kratos/pkg/ratelimit" "github.com/bilibili/kratos/pkg/stat/metric" - "github.com/stretchr/testify/assert" ) @@ -36,7 +35,7 @@ func TestBBR(t *testing.T) { } else { count := rand.Intn(100) time.Sleep(time.Millisecond * time.Duration(count)) - f(limit.DoneInfo{Op: limit.Success}) + f(ratelimit.DoneInfo{Op: ratelimit.Success}) } } }() @@ -67,7 +66,7 @@ func TestBBRMaxPass(t *testing.T) { func TestBBRMinRt(t *testing.T) { bucketDuration := time.Millisecond * 100 - rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) for i := 0; i < 10; i++ { for j := i*10 + 1; j <= i*10+10; j++ { rtStat.Add(int64(j)) @@ -83,17 +82,17 @@ func TestBBRMinRt(t *testing.T) { // default max min rt is equal to maxFloat64. bucketDuration = time.Millisecond * 100 - rtStat = metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) bbr = &BBR{ rtStat: rtStat, } assert.Equal(t, int64(math.Ceil(math.MaxFloat64)), bbr.minRT()) } -func TestBBRMaxInflight(t *testing.T) { +func TestBBRMaxQps(t *testing.T) { bucketDuration := time.Millisecond * 100 passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) for i := 0; i < 10; i++ { passStat.Add(int64((i + 1) * 100)) for j := i*10 + 1; j <= i*10+10; j++ { @@ -118,7 +117,7 @@ func TestBBRShouldDrop(t *testing.T) { } bucketDuration := time.Millisecond * 100 passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) for i := 0; i < 10; i++ { passStat.Add(int64((i + 1) * 100)) for j := i*10 + 1; j <= i*10+10; j++ { @@ -149,7 +148,7 @@ func TestBBRShouldDrop(t *testing.T) { cpu = 700 bbr.inFlight = 80 assert.Equal(t, true, bbr.shouldDrop()) - + // cpu < 800, inflight > maxQps time.Sleep(2 * time.Second) cpu = 700 @@ -169,3 +168,103 @@ func TestGroup(t *testing.T) { assert.NotNil(t, limiter) }) } + +func BenchmarkBBRAllowUnderLowLoad(b *testing.B) { + cpuGetter := func() int64 { + return 50 + } + bucketDuration := time.Millisecond * 100 + passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + bbr := &BBR{ + cpu: cpuGetter, + passStat: passStat, + rtStat: rtStat, + winBucketPerSec: 10, + conf: defaultConf, + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + done, err := bbr.Allow(context.TODO()) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + } +} + +func BenchmarkBBRAllowUnderHighLoad(b *testing.B) { + cpuGetter := func() int64 { + return 90 + } + bucketDuration := time.Millisecond * 100 + passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + bbr := &BBR{ + cpu: cpuGetter, + passStat: passStat, + rtStat: rtStat, + winBucketPerSec: 10, + conf: defaultConf, + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + done, err := bbr.Allow(context.TODO()) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + } +} + +func BenchmarkBBRShouldDropUnderLowLoad(b *testing.B) { + cpuGetter := func() int64 { + return 50 + } + bucketDuration := time.Millisecond * 100 + passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + bbr := &BBR{ + cpu: cpuGetter, + passStat: passStat, + rtStat: rtStat, + winBucketPerSec: 10, + conf: defaultConf, + } + for i := 0; i < 10000; i++ { + done, err := bbr.Allow(context.TODO()) + time.Sleep(time.Millisecond * 1) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + bbr.shouldDrop() + } +} + +func BenchmarkBBRShouldDropUnderHighLoad(b *testing.B) { + cpuGetter := func() int64 { + return 90 + } + bucketDuration := time.Millisecond * 100 + passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + bbr := &BBR{ + cpu: cpuGetter, + passStat: passStat, + rtStat: rtStat, + winBucketPerSec: 10, + conf: defaultConf, + } + for i := 0; i < 10000; i++ { + done, err := bbr.Allow(context.TODO()) + time.Sleep(time.Millisecond * 1) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + bbr.shouldDrop() + } +} From bbb46d1abeb76cf17a89384fc0d65f7d4c3d24f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=8B=E9=A3=9E?= Date: Fri, 12 Jul 2019 15:26:52 +0800 Subject: [PATCH 2/3] fix typo: return value of the cpu getter --- pkg/ratelimit/bbr/bbr_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/ratelimit/bbr/bbr_test.go b/pkg/ratelimit/bbr/bbr_test.go index 1a24fd7a7..7672614c8 100644 --- a/pkg/ratelimit/bbr/bbr_test.go +++ b/pkg/ratelimit/bbr/bbr_test.go @@ -171,7 +171,7 @@ func TestGroup(t *testing.T) { func BenchmarkBBRAllowUnderLowLoad(b *testing.B) { cpuGetter := func() int64 { - return 50 + return 500 } bucketDuration := time.Millisecond * 100 passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) @@ -194,7 +194,7 @@ func BenchmarkBBRAllowUnderLowLoad(b *testing.B) { func BenchmarkBBRAllowUnderHighLoad(b *testing.B) { cpuGetter := func() int64 { - return 90 + return 900 } bucketDuration := time.Millisecond * 100 passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) @@ -217,7 +217,7 @@ func BenchmarkBBRAllowUnderHighLoad(b *testing.B) { func BenchmarkBBRShouldDropUnderLowLoad(b *testing.B) { cpuGetter := func() int64 { - return 50 + return 500 } bucketDuration := time.Millisecond * 100 passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) @@ -244,7 +244,7 @@ func BenchmarkBBRShouldDropUnderLowLoad(b *testing.B) { func BenchmarkBBRShouldDropUnderHighLoad(b *testing.B) { cpuGetter := func() int64 { - return 90 + return 900 } bucketDuration := time.Millisecond * 100 passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) From a52895637c6090cf74acb3ca0be16c82939549c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=8B=E9=A3=9E?= Date: Mon, 15 Jul 2019 00:11:44 +0800 Subject: [PATCH 3/3] perfermance: cache maxPass & minRt for reducing cpu calculation --- pkg/ratelimit/bbr/bbr.go | 55 +++++++--- pkg/ratelimit/bbr/bbr_test.go | 168 ++++++++++++++--------------- pkg/stat/metric/rolling_counter.go | 5 + 3 files changed, 125 insertions(+), 103 deletions(-) diff --git a/pkg/ratelimit/bbr/bbr.go b/pkg/ratelimit/bbr/bbr.go index 69576a4e3..304ab648d 100644 --- a/pkg/ratelimit/bbr/bbr.go +++ b/pkg/ratelimit/bbr/bbr.go @@ -18,6 +18,7 @@ import ( var ( cpu int64 decay = 0.95 + initTime = time.Now() defaultConf = &Config{ Window: time.Second * 10, WinBucket: 100, @@ -72,6 +73,9 @@ type BBR struct { winBucketPerSec int64 conf *Config prevDrop atomic.Value + prevDropHit int32 + rawMaxPASS int64 + rawMinRt int64 } // Config contains configs of bbr limiter. @@ -85,9 +89,13 @@ type Config struct { } func (l *BBR) maxPASS() int64 { - val := int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 { + rawMaxPass := atomic.LoadInt64(&l.rawMaxPASS) + if rawMaxPass > 0 && l.passStat.Timespan() < 1 { + return rawMaxPass + } + rawMaxPass = int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 { var result = 1.0 - for iterator.Next() { + for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ { bucket := iterator.Bucket() count := 0.0 for _, p := range bucket.Points { @@ -97,16 +105,21 @@ func (l *BBR) maxPASS() int64 { } return result })) - if val == 0 { - return 1 + if rawMaxPass == 0 { + rawMaxPass = 1 } - return val + atomic.StoreInt64(&l.rawMaxPASS, rawMaxPass) + return rawMaxPass } func (l *BBR) minRT() int64 { - val := l.rtStat.Reduce(func(iterator metric.Iterator) float64 { + rawMinRT := atomic.LoadInt64(&l.rawMinRt) + if rawMinRT > 0 && l.rtStat.Timespan() < 1 { + return rawMinRT + } + rawMinRT = int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 { var result = math.MaxFloat64 - for iterator.Next() { + for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ { bucket := iterator.Bucket() if len(bucket.Points) == 0 { continue @@ -119,8 +132,12 @@ func (l *BBR) minRT() int64 { result = math.Min(result, avg) } return result - }) - return int64(math.Ceil(val)) + }))) + if rawMinRT <= 0 { + rawMinRT = 1 + } + atomic.StoreInt64(&l.rawMinRt, rawMinRT) + return rawMinRT } func (l *BBR) maxFlight() int64 { @@ -129,20 +146,28 @@ func (l *BBR) maxFlight() int64 { func (l *BBR) shouldDrop() bool { if l.cpu() < l.conf.CPUThreshold { - prevDrop, ok := l.prevDrop.Load().(time.Time) - if !ok { + prevDrop, _ := l.prevDrop.Load().(time.Duration) + if prevDrop == 0 { return false } - if time.Since(prevDrop) <= time.Second { + if time.Since(initTime)-prevDrop <= time.Second { + if atomic.LoadInt32(&l.prevDropHit) == 0 { + atomic.StoreInt32(&l.prevDropHit, 1) + } inFlight := atomic.LoadInt64(&l.inFlight) return inFlight > 1 && inFlight > l.maxFlight() } + l.prevDrop.Store(time.Duration(0)) return false } inFlight := atomic.LoadInt64(&l.inFlight) drop := inFlight > 1 && inFlight > l.maxFlight() if drop { - l.prevDrop.Store(time.Now()) + prevDrop, _ := l.prevDrop.Load().(time.Duration) + if prevDrop != 0 { + return drop + } + l.prevDrop.Store(time.Since(initTime)) } return drop } @@ -169,9 +194,9 @@ func (l *BBR) Allow(ctx context.Context, opts ...limit.AllowOption) (func(info l return nil, ecode.LimitExceed } atomic.AddInt64(&l.inFlight, 1) - stime := time.Now() + stime := time.Since(initTime) return func(do limit.DoneInfo) { - rt := int64(time.Since(stime) / time.Millisecond) + rt := int64((time.Since(initTime) - stime) / time.Millisecond) l.rtStat.Add(rt) atomic.AddInt64(&l.inFlight, -1) switch do.Op { diff --git a/pkg/ratelimit/bbr/bbr_test.go b/pkg/ratelimit/bbr/bbr_test.go index 7672614c8..aff053af8 100644 --- a/pkg/ratelimit/bbr/bbr_test.go +++ b/pkg/ratelimit/bbr/bbr_test.go @@ -3,7 +3,6 @@ package bbr import ( "context" "fmt" - "math" "math/rand" "sync" "sync/atomic" @@ -15,6 +14,34 @@ import ( "github.com/stretchr/testify/assert" ) +func confForTest() *Config { + return &Config{ + Window: time.Second, + WinBucket: 10, + CPUThreshold: 800, + } +} + +func warmup(bbr *BBR, count int) { + for i := 0; i < count; i++ { + done, err := bbr.Allow(context.TODO()) + time.Sleep(time.Millisecond * 1) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + } +} + +func forceAllow(bbr *BBR) { + inflight := bbr.inFlight + bbr.inFlight = bbr.maxPASS() - 1 + done, err := bbr.Allow(context.TODO()) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + bbr.inFlight = inflight +} + func TestBBR(t *testing.T) { cfg := &Config{ Window: time.Second * 5, @@ -46,26 +73,21 @@ func TestBBR(t *testing.T) { func TestBBRMaxPass(t *testing.T) { bucketDuration := time.Millisecond * 100 - passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + bbr := newLimiter(confForTest()).(*BBR) for i := 1; i <= 10; i++ { - passStat.Add(int64(i * 100)) + bbr.passStat.Add(int64(i * 100)) time.Sleep(bucketDuration) } - bbr := &BBR{ - passStat: passStat, - } assert.Equal(t, int64(1000), bbr.maxPASS()) // default max pass is equal to 1. - passStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - bbr = &BBR{ - passStat: passStat, - } + bbr = newLimiter(confForTest()).(*BBR) assert.Equal(t, int64(1), bbr.maxPASS()) } func TestBBRMinRt(t *testing.T) { bucketDuration := time.Millisecond * 100 + bbr := newLimiter(confForTest()).(*BBR) rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) for i := 0; i < 10; i++ { for j := i*10 + 1; j <= i*10+10; j++ { @@ -75,26 +97,23 @@ func TestBBRMinRt(t *testing.T) { time.Sleep(bucketDuration) } } - bbr := &BBR{ - rtStat: rtStat, - } + bbr.rtStat = rtStat assert.Equal(t, int64(6), bbr.minRT()) // default max min rt is equal to maxFloat64. bucketDuration = time.Millisecond * 100 - rtStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - bbr = &BBR{ - rtStat: rtStat, - } - assert.Equal(t, int64(math.Ceil(math.MaxFloat64)), bbr.minRT()) + bbr = newLimiter(confForTest()).(*BBR) + bbr.rtStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + assert.Equal(t, int64(1), bbr.minRT()) } func TestBBRMaxQps(t *testing.T) { + bbr := newLimiter(confForTest()).(*BBR) bucketDuration := time.Millisecond * 100 passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) for i := 0; i < 10; i++ { - passStat.Add(int64((i + 1) * 100)) + passStat.Add(int64((i + 2) * 100)) for j := i*10 + 1; j <= i*10+10; j++ { rtStat.Add(int64(j)) } @@ -102,17 +121,15 @@ func TestBBRMaxQps(t *testing.T) { time.Sleep(bucketDuration) } } - bbr := &BBR{ - passStat: passStat, - rtStat: rtStat, - winBucketPerSec: 10, - } + bbr.passStat = passStat + bbr.rtStat = rtStat assert.Equal(t, int64(60), bbr.maxFlight()) } func TestBBRShouldDrop(t *testing.T) { var cpu int64 - cpuGetter := func() int64 { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { return cpu } bucketDuration := time.Millisecond * 100 @@ -127,13 +144,8 @@ func TestBBRShouldDrop(t *testing.T) { time.Sleep(bucketDuration) } } - bbr := &BBR{ - cpu: cpuGetter, - passStat: passStat, - rtStat: rtStat, - winBucketPerSec: 10, - conf: defaultConf, - } + bbr.passStat = passStat + bbr.rtStat = rtStat // cpu >= 800, inflight < maxQps cpu = 800 bbr.inFlight = 50 @@ -170,19 +182,10 @@ func TestGroup(t *testing.T) { } func BenchmarkBBRAllowUnderLowLoad(b *testing.B) { - cpuGetter := func() int64 { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { return 500 } - bucketDuration := time.Millisecond * 100 - passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - bbr := &BBR{ - cpu: cpuGetter, - passStat: passStat, - rtStat: rtStat, - winBucketPerSec: 10, - conf: defaultConf, - } b.ResetTimer() for i := 0; i <= b.N; i++ { done, err := bbr.Allow(context.TODO()) @@ -193,21 +196,19 @@ func BenchmarkBBRAllowUnderLowLoad(b *testing.B) { } func BenchmarkBBRAllowUnderHighLoad(b *testing.B) { - cpuGetter := func() int64 { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { return 900 } - bucketDuration := time.Millisecond * 100 - passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - bbr := &BBR{ - cpu: cpuGetter, - passStat: passStat, - rtStat: rtStat, - winBucketPerSec: 10, - conf: defaultConf, - } + bbr.inFlight = 1 b.ResetTimer() for i := 0; i <= b.N; i++ { + if i%10000 == 0 { + maxFlight := bbr.maxFlight() + if maxFlight != 0 { + bbr.inFlight = rand.Int63n(bbr.maxFlight() * 2) + } + } done, err := bbr.Allow(context.TODO()) if err == nil { done(ratelimit.DoneInfo{Op: ratelimit.Success}) @@ -216,26 +217,11 @@ func BenchmarkBBRAllowUnderHighLoad(b *testing.B) { } func BenchmarkBBRShouldDropUnderLowLoad(b *testing.B) { - cpuGetter := func() int64 { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { return 500 } - bucketDuration := time.Millisecond * 100 - passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - bbr := &BBR{ - cpu: cpuGetter, - passStat: passStat, - rtStat: rtStat, - winBucketPerSec: 10, - conf: defaultConf, - } - for i := 0; i < 10000; i++ { - done, err := bbr.Allow(context.TODO()) - time.Sleep(time.Millisecond * 1) - if err == nil { - done(ratelimit.DoneInfo{Op: ratelimit.Success}) - } - } + warmup(bbr, 10000) b.ResetTimer() for i := 0; i <= b.N; i++ { bbr.shouldDrop() @@ -243,28 +229,34 @@ func BenchmarkBBRShouldDropUnderLowLoad(b *testing.B) { } func BenchmarkBBRShouldDropUnderHighLoad(b *testing.B) { - cpuGetter := func() int64 { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { return 900 } - bucketDuration := time.Millisecond * 100 - passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - bbr := &BBR{ - cpu: cpuGetter, - passStat: passStat, - rtStat: rtStat, - winBucketPerSec: 10, - conf: defaultConf, - } - for i := 0; i < 10000; i++ { - done, err := bbr.Allow(context.TODO()) - time.Sleep(time.Millisecond * 1) - if err == nil { - done(ratelimit.DoneInfo{Op: ratelimit.Success}) + warmup(bbr, 10000) + bbr.inFlight = 1000 + b.ResetTimer() + for i := 0; i <= b.N; i++ { + bbr.shouldDrop() + if i%10000 == 0 { + forceAllow(bbr) } } +} + +func BenchmarkBBRShouldDropUnderUnstableLoad(b *testing.B) { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { + return 500 + } + warmup(bbr, 10000) + bbr.prevDrop.Store(time.Since(initTime)) + bbr.inFlight = 1000 b.ResetTimer() for i := 0; i <= b.N; i++ { bbr.shouldDrop() + if i%100000 == 0 { + forceAllow(bbr) + } } } diff --git a/pkg/stat/metric/rolling_counter.go b/pkg/stat/metric/rolling_counter.go index bbaf7a138..c839cea0a 100644 --- a/pkg/stat/metric/rolling_counter.go +++ b/pkg/stat/metric/rolling_counter.go @@ -13,6 +13,7 @@ var _ Aggregation = &rollingCounter{} type RollingCounter interface { Metric Aggregation + Timespan() int // Reduce applies the reduction function to all buckets within the window. Reduce(func(Iterator) float64) float64 } @@ -66,3 +67,7 @@ func (r *rollingCounter) Sum() float64 { func (r *rollingCounter) Value() int64 { return int64(r.Sum()) } + +func (r *rollingCounter) Timespan() int { + return r.policy.timespan() +}