diff --git a/pkg/ratelimit/bbr/bbr.go b/pkg/ratelimit/bbr/bbr.go index f7bca28d9..304ab648d 100644 --- a/pkg/ratelimit/bbr/bbr.go +++ b/pkg/ratelimit/bbr/bbr.go @@ -18,6 +18,7 @@ import ( var ( cpu int64 decay = 0.95 + initTime = time.Now() defaultConf = &Config{ Window: time.Second * 10, WinBucket: 100, @@ -67,11 +68,14 @@ type Stat struct { type BBR struct { cpu cpuGetter passStat metric.RollingCounter - rtStat metric.RollingGauge + rtStat metric.RollingCounter inFlight int64 winBucketPerSec int64 conf *Config - prevDrop int64 + prevDrop atomic.Value + prevDropHit int32 + rawMaxPASS int64 + rawMinRt int64 } // Config contains configs of bbr limiter. @@ -85,9 +89,13 @@ type Config struct { } func (l *BBR) maxPASS() int64 { - val := int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 { + rawMaxPass := atomic.LoadInt64(&l.rawMaxPASS) + if rawMaxPass > 0 && l.passStat.Timespan() < 1 { + return rawMaxPass + } + rawMaxPass = int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 { var result = 1.0 - for iterator.Next() { + for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ { bucket := iterator.Bucket() count := 0.0 for _, p := range bucket.Points { @@ -97,16 +105,21 @@ func (l *BBR) maxPASS() int64 { } return result })) - if val == 0 { - return 1 + if rawMaxPass == 0 { + rawMaxPass = 1 } - return val + atomic.StoreInt64(&l.rawMaxPASS, rawMaxPass) + return rawMaxPass } func (l *BBR) minRT() int64 { - val := l.rtStat.Reduce(func(iterator metric.Iterator) float64 { + rawMinRT := atomic.LoadInt64(&l.rawMinRt) + if rawMinRT > 0 && l.rtStat.Timespan() < 1 { + return rawMinRT + } + rawMinRT = int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 { var result = math.MaxFloat64 - for iterator.Next() { + for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ { bucket := iterator.Bucket() if len(bucket.Points) == 0 { continue @@ -119,8 +132,12 @@ func (l *BBR) minRT() int64 { result = math.Min(result, avg) } return result - }) - return int64(math.Ceil(val)) + }))) + if rawMinRT <= 0 { + rawMinRT = 1 + } + atomic.StoreInt64(&l.rawMinRt, rawMinRT) + return rawMinRT } func (l *BBR) maxFlight() int64 { @@ -128,18 +145,29 @@ func (l *BBR) maxFlight() int64 { } func (l *BBR) shouldDrop() bool { - inFlight := atomic.LoadInt64(&l.inFlight) - maxInflight := l.maxFlight() if l.cpu() < l.conf.CPUThreshold { - prevDrop := atomic.LoadInt64(&l.prevDrop) - if time.Now().Unix()-prevDrop <= 1 { - return inFlight > 1 && inFlight > maxInflight + prevDrop, _ := l.prevDrop.Load().(time.Duration) + if prevDrop == 0 { + return false } + if time.Since(initTime)-prevDrop <= time.Second { + if atomic.LoadInt32(&l.prevDropHit) == 0 { + atomic.StoreInt32(&l.prevDropHit, 1) + } + inFlight := atomic.LoadInt64(&l.inFlight) + return inFlight > 1 && inFlight > l.maxFlight() + } + l.prevDrop.Store(time.Duration(0)) return false } - drop := inFlight > 1 && inFlight > maxInflight + inFlight := atomic.LoadInt64(&l.inFlight) + drop := inFlight > 1 && inFlight > l.maxFlight() if drop { - atomic.StoreInt64(&l.prevDrop, time.Now().Unix()) + prevDrop, _ := l.prevDrop.Load().(time.Duration) + if prevDrop != 0 { + return drop + } + l.prevDrop.Store(time.Since(initTime)) } return drop } @@ -166,9 +194,9 @@ func (l *BBR) Allow(ctx context.Context, opts ...limit.AllowOption) (func(info l return nil, ecode.LimitExceed } atomic.AddInt64(&l.inFlight, 1) - stime := time.Now() + stime := time.Since(initTime) return func(do limit.DoneInfo) { - rt := int64(time.Since(stime) / time.Millisecond) + rt := int64((time.Since(initTime) - stime) / time.Millisecond) l.rtStat.Add(rt) atomic.AddInt64(&l.inFlight, -1) switch do.Op { @@ -188,7 +216,7 @@ func newLimiter(conf *Config) limit.Limiter { size := conf.WinBucket bucketDuration := conf.Window / time.Duration(conf.WinBucket) passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: size, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: size, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: size, BucketDuration: bucketDuration}) cpu := func() int64 { return atomic.LoadInt64(&cpu) } diff --git a/pkg/ratelimit/bbr/bbr_test.go b/pkg/ratelimit/bbr/bbr_test.go index 6a6f4048b..aff053af8 100644 --- a/pkg/ratelimit/bbr/bbr_test.go +++ b/pkg/ratelimit/bbr/bbr_test.go @@ -3,19 +3,45 @@ package bbr import ( "context" "fmt" - "math" "math/rand" "sync" "sync/atomic" "testing" "time" - limit "github.com/bilibili/kratos/pkg/ratelimit" + "github.com/bilibili/kratos/pkg/ratelimit" "github.com/bilibili/kratos/pkg/stat/metric" - "github.com/stretchr/testify/assert" ) +func confForTest() *Config { + return &Config{ + Window: time.Second, + WinBucket: 10, + CPUThreshold: 800, + } +} + +func warmup(bbr *BBR, count int) { + for i := 0; i < count; i++ { + done, err := bbr.Allow(context.TODO()) + time.Sleep(time.Millisecond * 1) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + } +} + +func forceAllow(bbr *BBR) { + inflight := bbr.inFlight + bbr.inFlight = bbr.maxPASS() - 1 + done, err := bbr.Allow(context.TODO()) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + bbr.inFlight = inflight +} + func TestBBR(t *testing.T) { cfg := &Config{ Window: time.Second * 5, @@ -36,7 +62,7 @@ func TestBBR(t *testing.T) { } else { count := rand.Intn(100) time.Sleep(time.Millisecond * time.Duration(count)) - f(limit.DoneInfo{Op: limit.Success}) + f(ratelimit.DoneInfo{Op: ratelimit.Success}) } } }() @@ -47,27 +73,22 @@ func TestBBR(t *testing.T) { func TestBBRMaxPass(t *testing.T) { bucketDuration := time.Millisecond * 100 - passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + bbr := newLimiter(confForTest()).(*BBR) for i := 1; i <= 10; i++ { - passStat.Add(int64(i * 100)) + bbr.passStat.Add(int64(i * 100)) time.Sleep(bucketDuration) } - bbr := &BBR{ - passStat: passStat, - } assert.Equal(t, int64(1000), bbr.maxPASS()) // default max pass is equal to 1. - passStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - bbr = &BBR{ - passStat: passStat, - } + bbr = newLimiter(confForTest()).(*BBR) assert.Equal(t, int64(1), bbr.maxPASS()) } func TestBBRMinRt(t *testing.T) { bucketDuration := time.Millisecond * 100 - rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + bbr := newLimiter(confForTest()).(*BBR) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) for i := 0; i < 10; i++ { for j := i*10 + 1; j <= i*10+10; j++ { rtStat.Add(int64(j)) @@ -76,26 +97,23 @@ func TestBBRMinRt(t *testing.T) { time.Sleep(bucketDuration) } } - bbr := &BBR{ - rtStat: rtStat, - } + bbr.rtStat = rtStat assert.Equal(t, int64(6), bbr.minRT()) // default max min rt is equal to maxFloat64. bucketDuration = time.Millisecond * 100 - rtStat = metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) - bbr = &BBR{ - rtStat: rtStat, - } - assert.Equal(t, int64(math.Ceil(math.MaxFloat64)), bbr.minRT()) + bbr = newLimiter(confForTest()).(*BBR) + bbr.rtStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + assert.Equal(t, int64(1), bbr.minRT()) } -func TestBBRMaxInflight(t *testing.T) { +func TestBBRMaxQps(t *testing.T) { + bbr := newLimiter(confForTest()).(*BBR) bucketDuration := time.Millisecond * 100 passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) for i := 0; i < 10; i++ { - passStat.Add(int64((i + 1) * 100)) + passStat.Add(int64((i + 2) * 100)) for j := i*10 + 1; j <= i*10+10; j++ { rtStat.Add(int64(j)) } @@ -103,22 +121,20 @@ func TestBBRMaxInflight(t *testing.T) { time.Sleep(bucketDuration) } } - bbr := &BBR{ - passStat: passStat, - rtStat: rtStat, - winBucketPerSec: 10, - } + bbr.passStat = passStat + bbr.rtStat = rtStat assert.Equal(t, int64(60), bbr.maxFlight()) } func TestBBRShouldDrop(t *testing.T) { var cpu int64 - cpuGetter := func() int64 { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { return cpu } bucketDuration := time.Millisecond * 100 passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) for i := 0; i < 10; i++ { passStat.Add(int64((i + 1) * 100)) for j := i*10 + 1; j <= i*10+10; j++ { @@ -128,13 +144,8 @@ func TestBBRShouldDrop(t *testing.T) { time.Sleep(bucketDuration) } } - bbr := &BBR{ - cpu: cpuGetter, - passStat: passStat, - rtStat: rtStat, - winBucketPerSec: 10, - conf: defaultConf, - } + bbr.passStat = passStat + bbr.rtStat = rtStat // cpu >= 800, inflight < maxQps cpu = 800 bbr.inFlight = 50 @@ -149,7 +160,7 @@ func TestBBRShouldDrop(t *testing.T) { cpu = 700 bbr.inFlight = 80 assert.Equal(t, true, bbr.shouldDrop()) - + // cpu < 800, inflight > maxQps time.Sleep(2 * time.Second) cpu = 700 @@ -169,3 +180,83 @@ func TestGroup(t *testing.T) { assert.NotNil(t, limiter) }) } + +func BenchmarkBBRAllowUnderLowLoad(b *testing.B) { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { + return 500 + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + done, err := bbr.Allow(context.TODO()) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + } +} + +func BenchmarkBBRAllowUnderHighLoad(b *testing.B) { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { + return 900 + } + bbr.inFlight = 1 + b.ResetTimer() + for i := 0; i <= b.N; i++ { + if i%10000 == 0 { + maxFlight := bbr.maxFlight() + if maxFlight != 0 { + bbr.inFlight = rand.Int63n(bbr.maxFlight() * 2) + } + } + done, err := bbr.Allow(context.TODO()) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + } +} + +func BenchmarkBBRShouldDropUnderLowLoad(b *testing.B) { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { + return 500 + } + warmup(bbr, 10000) + b.ResetTimer() + for i := 0; i <= b.N; i++ { + bbr.shouldDrop() + } +} + +func BenchmarkBBRShouldDropUnderHighLoad(b *testing.B) { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { + return 900 + } + warmup(bbr, 10000) + bbr.inFlight = 1000 + b.ResetTimer() + for i := 0; i <= b.N; i++ { + bbr.shouldDrop() + if i%10000 == 0 { + forceAllow(bbr) + } + } +} + +func BenchmarkBBRShouldDropUnderUnstableLoad(b *testing.B) { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { + return 500 + } + warmup(bbr, 10000) + bbr.prevDrop.Store(time.Since(initTime)) + bbr.inFlight = 1000 + b.ResetTimer() + for i := 0; i <= b.N; i++ { + bbr.shouldDrop() + if i%100000 == 0 { + forceAllow(bbr) + } + } +} diff --git a/pkg/stat/metric/rolling_counter.go b/pkg/stat/metric/rolling_counter.go index bbaf7a138..c839cea0a 100644 --- a/pkg/stat/metric/rolling_counter.go +++ b/pkg/stat/metric/rolling_counter.go @@ -13,6 +13,7 @@ var _ Aggregation = &rollingCounter{} type RollingCounter interface { Metric Aggregation + Timespan() int // Reduce applies the reduction function to all buckets within the window. Reduce(func(Iterator) float64) float64 } @@ -66,3 +67,7 @@ func (r *rollingCounter) Sum() float64 { func (r *rollingCounter) Value() int64 { return int64(r.Sum()) } + +func (r *rollingCounter) Timespan() int { + return r.policy.timespan() +}