From a52895637c6090cf74acb3ca0be16c82939549c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=8B=E9=A3=9E?= Date: Mon, 15 Jul 2019 00:11:44 +0800 Subject: [PATCH] perfermance: cache maxPass & minRt for reducing cpu calculation --- pkg/ratelimit/bbr/bbr.go | 55 +++++++--- pkg/ratelimit/bbr/bbr_test.go | 168 ++++++++++++++--------------- pkg/stat/metric/rolling_counter.go | 5 + 3 files changed, 125 insertions(+), 103 deletions(-) diff --git a/pkg/ratelimit/bbr/bbr.go b/pkg/ratelimit/bbr/bbr.go index 69576a4e3..304ab648d 100644 --- a/pkg/ratelimit/bbr/bbr.go +++ b/pkg/ratelimit/bbr/bbr.go @@ -18,6 +18,7 @@ import ( var ( cpu int64 decay = 0.95 + initTime = time.Now() defaultConf = &Config{ Window: time.Second * 10, WinBucket: 100, @@ -72,6 +73,9 @@ type BBR struct { winBucketPerSec int64 conf *Config prevDrop atomic.Value + prevDropHit int32 + rawMaxPASS int64 + rawMinRt int64 } // Config contains configs of bbr limiter. @@ -85,9 +89,13 @@ type Config struct { } func (l *BBR) maxPASS() int64 { - val := int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 { + rawMaxPass := atomic.LoadInt64(&l.rawMaxPASS) + if rawMaxPass > 0 && l.passStat.Timespan() < 1 { + return rawMaxPass + } + rawMaxPass = int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 { var result = 1.0 - for iterator.Next() { + for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ { bucket := iterator.Bucket() count := 0.0 for _, p := range bucket.Points { @@ -97,16 +105,21 @@ func (l *BBR) maxPASS() int64 { } return result })) - if val == 0 { - return 1 + if rawMaxPass == 0 { + rawMaxPass = 1 } - return val + atomic.StoreInt64(&l.rawMaxPASS, rawMaxPass) + return rawMaxPass } func (l *BBR) minRT() int64 { - val := l.rtStat.Reduce(func(iterator metric.Iterator) float64 { + rawMinRT := atomic.LoadInt64(&l.rawMinRt) + if rawMinRT > 0 && l.rtStat.Timespan() < 1 { + return rawMinRT + } + rawMinRT = int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 { var result = math.MaxFloat64 - for iterator.Next() { + for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ { bucket := iterator.Bucket() if len(bucket.Points) == 0 { continue @@ -119,8 +132,12 @@ func (l *BBR) minRT() int64 { result = math.Min(result, avg) } return result - }) - return int64(math.Ceil(val)) + }))) + if rawMinRT <= 0 { + rawMinRT = 1 + } + atomic.StoreInt64(&l.rawMinRt, rawMinRT) + return rawMinRT } func (l *BBR) maxFlight() int64 { @@ -129,20 +146,28 @@ func (l *BBR) maxFlight() int64 { func (l *BBR) shouldDrop() bool { if l.cpu() < l.conf.CPUThreshold { - prevDrop, ok := l.prevDrop.Load().(time.Time) - if !ok { + prevDrop, _ := l.prevDrop.Load().(time.Duration) + if prevDrop == 0 { return false } - if time.Since(prevDrop) <= time.Second { + if time.Since(initTime)-prevDrop <= time.Second { + if atomic.LoadInt32(&l.prevDropHit) == 0 { + atomic.StoreInt32(&l.prevDropHit, 1) + } inFlight := atomic.LoadInt64(&l.inFlight) return inFlight > 1 && inFlight > l.maxFlight() } + l.prevDrop.Store(time.Duration(0)) return false } inFlight := atomic.LoadInt64(&l.inFlight) drop := inFlight > 1 && inFlight > l.maxFlight() if drop { - l.prevDrop.Store(time.Now()) + prevDrop, _ := l.prevDrop.Load().(time.Duration) + if prevDrop != 0 { + return drop + } + l.prevDrop.Store(time.Since(initTime)) } return drop } @@ -169,9 +194,9 @@ func (l *BBR) Allow(ctx context.Context, opts ...limit.AllowOption) (func(info l return nil, ecode.LimitExceed } atomic.AddInt64(&l.inFlight, 1) - stime := time.Now() + stime := time.Since(initTime) return func(do limit.DoneInfo) { - rt := int64(time.Since(stime) / time.Millisecond) + rt := int64((time.Since(initTime) - stime) / time.Millisecond) l.rtStat.Add(rt) atomic.AddInt64(&l.inFlight, -1) switch do.Op { diff --git a/pkg/ratelimit/bbr/bbr_test.go b/pkg/ratelimit/bbr/bbr_test.go index 7672614c8..aff053af8 100644 --- a/pkg/ratelimit/bbr/bbr_test.go +++ b/pkg/ratelimit/bbr/bbr_test.go @@ -3,7 +3,6 @@ package bbr import ( "context" "fmt" - "math" "math/rand" "sync" "sync/atomic" @@ -15,6 +14,34 @@ import ( "github.com/stretchr/testify/assert" ) +func confForTest() *Config { + return &Config{ + Window: time.Second, + WinBucket: 10, + CPUThreshold: 800, + } +} + +func warmup(bbr *BBR, count int) { + for i := 0; i < count; i++ { + done, err := bbr.Allow(context.TODO()) + time.Sleep(time.Millisecond * 1) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + } +} + +func forceAllow(bbr *BBR) { + inflight := bbr.inFlight + bbr.inFlight = bbr.maxPASS() - 1 + done, err := bbr.Allow(context.TODO()) + if err == nil { + done(ratelimit.DoneInfo{Op: ratelimit.Success}) + } + bbr.inFlight = inflight +} + func TestBBR(t *testing.T) { cfg := &Config{ Window: time.Second * 5, @@ -46,26 +73,21 @@ func TestBBR(t *testing.T) { func TestBBRMaxPass(t *testing.T) { bucketDuration := time.Millisecond * 100 - passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + bbr := newLimiter(confForTest()).(*BBR) for i := 1; i <= 10; i++ { - passStat.Add(int64(i * 100)) + bbr.passStat.Add(int64(i * 100)) time.Sleep(bucketDuration) } - bbr := &BBR{ - passStat: passStat, - } assert.Equal(t, int64(1000), bbr.maxPASS()) // default max pass is equal to 1. - passStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - bbr = &BBR{ - passStat: passStat, - } + bbr = newLimiter(confForTest()).(*BBR) assert.Equal(t, int64(1), bbr.maxPASS()) } func TestBBRMinRt(t *testing.T) { bucketDuration := time.Millisecond * 100 + bbr := newLimiter(confForTest()).(*BBR) rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) for i := 0; i < 10; i++ { for j := i*10 + 1; j <= i*10+10; j++ { @@ -75,26 +97,23 @@ func TestBBRMinRt(t *testing.T) { time.Sleep(bucketDuration) } } - bbr := &BBR{ - rtStat: rtStat, - } + bbr.rtStat = rtStat assert.Equal(t, int64(6), bbr.minRT()) // default max min rt is equal to maxFloat64. bucketDuration = time.Millisecond * 100 - rtStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - bbr = &BBR{ - rtStat: rtStat, - } - assert.Equal(t, int64(math.Ceil(math.MaxFloat64)), bbr.minRT()) + bbr = newLimiter(confForTest()).(*BBR) + bbr.rtStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + assert.Equal(t, int64(1), bbr.minRT()) } func TestBBRMaxQps(t *testing.T) { + bbr := newLimiter(confForTest()).(*BBR) bucketDuration := time.Millisecond * 100 passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) for i := 0; i < 10; i++ { - passStat.Add(int64((i + 1) * 100)) + passStat.Add(int64((i + 2) * 100)) for j := i*10 + 1; j <= i*10+10; j++ { rtStat.Add(int64(j)) } @@ -102,17 +121,15 @@ func TestBBRMaxQps(t *testing.T) { time.Sleep(bucketDuration) } } - bbr := &BBR{ - passStat: passStat, - rtStat: rtStat, - winBucketPerSec: 10, - } + bbr.passStat = passStat + bbr.rtStat = rtStat assert.Equal(t, int64(60), bbr.maxFlight()) } func TestBBRShouldDrop(t *testing.T) { var cpu int64 - cpuGetter := func() int64 { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { return cpu } bucketDuration := time.Millisecond * 100 @@ -127,13 +144,8 @@ func TestBBRShouldDrop(t *testing.T) { time.Sleep(bucketDuration) } } - bbr := &BBR{ - cpu: cpuGetter, - passStat: passStat, - rtStat: rtStat, - winBucketPerSec: 10, - conf: defaultConf, - } + bbr.passStat = passStat + bbr.rtStat = rtStat // cpu >= 800, inflight < maxQps cpu = 800 bbr.inFlight = 50 @@ -170,19 +182,10 @@ func TestGroup(t *testing.T) { } func BenchmarkBBRAllowUnderLowLoad(b *testing.B) { - cpuGetter := func() int64 { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { return 500 } - bucketDuration := time.Millisecond * 100 - passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - bbr := &BBR{ - cpu: cpuGetter, - passStat: passStat, - rtStat: rtStat, - winBucketPerSec: 10, - conf: defaultConf, - } b.ResetTimer() for i := 0; i <= b.N; i++ { done, err := bbr.Allow(context.TODO()) @@ -193,21 +196,19 @@ func BenchmarkBBRAllowUnderLowLoad(b *testing.B) { } func BenchmarkBBRAllowUnderHighLoad(b *testing.B) { - cpuGetter := func() int64 { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { return 900 } - bucketDuration := time.Millisecond * 100 - passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - bbr := &BBR{ - cpu: cpuGetter, - passStat: passStat, - rtStat: rtStat, - winBucketPerSec: 10, - conf: defaultConf, - } + bbr.inFlight = 1 b.ResetTimer() for i := 0; i <= b.N; i++ { + if i%10000 == 0 { + maxFlight := bbr.maxFlight() + if maxFlight != 0 { + bbr.inFlight = rand.Int63n(bbr.maxFlight() * 2) + } + } done, err := bbr.Allow(context.TODO()) if err == nil { done(ratelimit.DoneInfo{Op: ratelimit.Success}) @@ -216,26 +217,11 @@ func BenchmarkBBRAllowUnderHighLoad(b *testing.B) { } func BenchmarkBBRShouldDropUnderLowLoad(b *testing.B) { - cpuGetter := func() int64 { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { return 500 } - bucketDuration := time.Millisecond * 100 - passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - bbr := &BBR{ - cpu: cpuGetter, - passStat: passStat, - rtStat: rtStat, - winBucketPerSec: 10, - conf: defaultConf, - } - for i := 0; i < 10000; i++ { - done, err := bbr.Allow(context.TODO()) - time.Sleep(time.Millisecond * 1) - if err == nil { - done(ratelimit.DoneInfo{Op: ratelimit.Success}) - } - } + warmup(bbr, 10000) b.ResetTimer() for i := 0; i <= b.N; i++ { bbr.shouldDrop() @@ -243,28 +229,34 @@ func BenchmarkBBRShouldDropUnderLowLoad(b *testing.B) { } func BenchmarkBBRShouldDropUnderHighLoad(b *testing.B) { - cpuGetter := func() int64 { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { return 900 } - bucketDuration := time.Millisecond * 100 - passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) - bbr := &BBR{ - cpu: cpuGetter, - passStat: passStat, - rtStat: rtStat, - winBucketPerSec: 10, - conf: defaultConf, - } - for i := 0; i < 10000; i++ { - done, err := bbr.Allow(context.TODO()) - time.Sleep(time.Millisecond * 1) - if err == nil { - done(ratelimit.DoneInfo{Op: ratelimit.Success}) + warmup(bbr, 10000) + bbr.inFlight = 1000 + b.ResetTimer() + for i := 0; i <= b.N; i++ { + bbr.shouldDrop() + if i%10000 == 0 { + forceAllow(bbr) } } +} + +func BenchmarkBBRShouldDropUnderUnstableLoad(b *testing.B) { + bbr := newLimiter(confForTest()).(*BBR) + bbr.cpu = func() int64 { + return 500 + } + warmup(bbr, 10000) + bbr.prevDrop.Store(time.Since(initTime)) + bbr.inFlight = 1000 b.ResetTimer() for i := 0; i <= b.N; i++ { bbr.shouldDrop() + if i%100000 == 0 { + forceAllow(bbr) + } } } diff --git a/pkg/stat/metric/rolling_counter.go b/pkg/stat/metric/rolling_counter.go index bbaf7a138..c839cea0a 100644 --- a/pkg/stat/metric/rolling_counter.go +++ b/pkg/stat/metric/rolling_counter.go @@ -13,6 +13,7 @@ var _ Aggregation = &rollingCounter{} type RollingCounter interface { Metric Aggregation + Timespan() int // Reduce applies the reduction function to all buckets within the window. Reduce(func(Iterator) float64) float64 } @@ -66,3 +67,7 @@ func (r *rollingCounter) Sum() float64 { func (r *rollingCounter) Value() int64 { return int64(r.Sum()) } + +func (r *rollingCounter) Timespan() int { + return r.policy.timespan() +}