diff --git a/pkg/ratelimit/bbr/bbr.go b/pkg/ratelimit/bbr/bbr.go index f7bca28d9..69576a4e3 100644 --- a/pkg/ratelimit/bbr/bbr.go +++ b/pkg/ratelimit/bbr/bbr.go @@ -67,11 +67,11 @@ type Stat struct { type BBR struct { cpu cpuGetter passStat metric.RollingCounter - rtStat metric.RollingGauge + rtStat metric.RollingCounter inFlight int64 winBucketPerSec int64 conf *Config - prevDrop int64 + prevDrop atomic.Value } // Config contains configs of bbr limiter. @@ -128,18 +128,21 @@ func (l *BBR) maxFlight() int64 { } func (l *BBR) shouldDrop() bool { - inFlight := atomic.LoadInt64(&l.inFlight) - maxInflight := l.maxFlight() if l.cpu() < l.conf.CPUThreshold { - prevDrop := atomic.LoadInt64(&l.prevDrop) - if time.Now().Unix()-prevDrop <= 1 { - return inFlight > 1 && inFlight > maxInflight + prevDrop, ok := l.prevDrop.Load().(time.Time) + if !ok { + return false + } + if time.Since(prevDrop) <= time.Second { + inFlight := atomic.LoadInt64(&l.inFlight) + return inFlight > 1 && inFlight > l.maxFlight() } return false } - drop := inFlight > 1 && inFlight > maxInflight + inFlight := atomic.LoadInt64(&l.inFlight) + drop := inFlight > 1 && inFlight > l.maxFlight() if drop { - atomic.StoreInt64(&l.prevDrop, time.Now().Unix()) + l.prevDrop.Store(time.Now()) } return drop } @@ -188,7 +191,7 @@ func newLimiter(conf *Config) limit.Limiter { size := conf.WinBucket bucketDuration := conf.Window / time.Duration(conf.WinBucket) passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: size, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: size, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: size, BucketDuration: bucketDuration}) cpu := func() int64 { return atomic.LoadInt64(&cpu) } diff --git a/pkg/ratelimit/bbr/bbr_test.go b/pkg/ratelimit/bbr/bbr_test.go index 6a6f4048b..1a24fd7a7 100644 --- a/pkg/ratelimit/bbr/bbr_test.go +++ b/pkg/ratelimit/bbr/bbr_test.go @@ -10,9 +10,8 @@ import ( "testing" "time" - limit "github.com/bilibili/kratos/pkg/ratelimit" + "github.com/bilibili/kratos/pkg/ratelimit" "github.com/bilibili/kratos/pkg/stat/metric" - "github.com/stretchr/testify/assert" ) @@ -36,7 +35,7 @@ func TestBBR(t *testing.T) { } else { count := rand.Intn(100) time.Sleep(time.Millisecond * time.Duration(count)) - f(limit.DoneInfo{Op: limit.Success}) + f(ratelimit.DoneInfo{Op: ratelimit.Success}) } } }() @@ -67,7 +66,7 @@ func TestBBRMaxPass(t *testing.T) { func TestBBRMinRt(t *testing.T) { bucketDuration := time.Millisecond * 100 - rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) for i := 0; i < 10; i++ { for j := i*10 + 1; j <= i*10+10; j++ { rtStat.Add(int64(j)) @@ -83,17 +82,17 @@ func TestBBRMinRt(t *testing.T) { // default max min rt is equal to maxFloat64. bucketDuration = time.Millisecond * 100 - rtStat = metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) bbr = &BBR{ rtStat: rtStat, } assert.Equal(t, int64(math.Ceil(math.MaxFloat64)), bbr.minRT()) } -func TestBBRMaxInflight(t *testing.T) { +func TestBBRMaxQps(t *testing.T) { bucketDuration := time.Millisecond * 100 passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) for i := 0; i < 10; i++ { passStat.Add(int64((i + 1) * 100)) for j := i*10 + 1; j <= i*10+10; j++ { @@ -118,7 +117,7 @@ func TestBBRShouldDrop(t *testing.T) { } bucketDuration := time.Millisecond * 100 passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) for i := 0; i < 10; i++ { passStat.Add(int64((i + 1) * 100)) for j := i*10 + 1; j <= i*10+10; j++ { @@ -149,7 +148,7 @@ func TestBBRShouldDrop(t *testing.T) { cpu = 700 bbr.inFlight = 80 assert.Equal(t, true, bbr.shouldDrop()) - + // cpu < 800, inflight > maxQps time.Sleep(2 * time.Second) cpu = 700 @@ -169,3 +168,103 @@ func TestGroup(t *testing.T) { assert.NotNil(t, limiter) }) } + +func BenchmarkBBRAllowUnderLowLoad(b *testing.B) { + cpuGetter := func() int64 { + return 50 + } + bucketDuration := time.Millisecond * 100 + passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + bbr := &BBR{ + cpu: cpuGetter, + passStat: passStat, + rtStat: rtStat, + winBucketPerSec: 10, + conf: defaultConf, + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + done, err := bbr.Allow(context.TODO()) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + } +} + +func BenchmarkBBRAllowUnderHighLoad(b *testing.B) { + cpuGetter := func() int64 { + return 90 + } + bucketDuration := time.Millisecond * 100 + passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + bbr := &BBR{ + cpu: cpuGetter, + passStat: passStat, + rtStat: rtStat, + winBucketPerSec: 10, + conf: defaultConf, + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + done, err := bbr.Allow(context.TODO()) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + } +} + +func BenchmarkBBRShouldDropUnderLowLoad(b *testing.B) { + cpuGetter := func() int64 { + return 50 + } + bucketDuration := time.Millisecond * 100 + passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + bbr := &BBR{ + cpu: cpuGetter, + passStat: passStat, + rtStat: rtStat, + winBucketPerSec: 10, + conf: defaultConf, + } + for i := 0; i < 10000; i++ { + done, err := bbr.Allow(context.TODO()) + time.Sleep(time.Millisecond * 1) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + bbr.shouldDrop() + } +} + +func BenchmarkBBRShouldDropUnderHighLoad(b *testing.B) { + cpuGetter := func() int64 { + return 90 + } + bucketDuration := time.Millisecond * 100 + passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + bbr := &BBR{ + cpu: cpuGetter, + passStat: passStat, + rtStat: rtStat, + winBucketPerSec: 10, + conf: defaultConf, + } + for i := 0; i < 10000; i++ { + done, err := bbr.Allow(context.TODO()) + time.Sleep(time.Millisecond * 1) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + bbr.shouldDrop() + } +}