perfermance: cache maxPass & minRt for reducing cpu calculation

pull/207/head
程飞 6 years ago
parent bbb46d1abe
commit a52895637c
  1. 55
      pkg/ratelimit/bbr/bbr.go
  2. 168
      pkg/ratelimit/bbr/bbr_test.go
  3. 5
      pkg/stat/metric/rolling_counter.go

@ -18,6 +18,7 @@ import (
var ( var (
cpu int64 cpu int64
decay = 0.95 decay = 0.95
initTime = time.Now()
defaultConf = &Config{ defaultConf = &Config{
Window: time.Second * 10, Window: time.Second * 10,
WinBucket: 100, WinBucket: 100,
@ -72,6 +73,9 @@ type BBR struct {
winBucketPerSec int64 winBucketPerSec int64
conf *Config conf *Config
prevDrop atomic.Value prevDrop atomic.Value
prevDropHit int32
rawMaxPASS int64
rawMinRt int64
} }
// Config contains configs of bbr limiter. // Config contains configs of bbr limiter.
@ -85,9 +89,13 @@ type Config struct {
} }
func (l *BBR) maxPASS() int64 { func (l *BBR) maxPASS() int64 {
val := int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 { rawMaxPass := atomic.LoadInt64(&l.rawMaxPASS)
if rawMaxPass > 0 && l.passStat.Timespan() < 1 {
return rawMaxPass
}
rawMaxPass = int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 {
var result = 1.0 var result = 1.0
for iterator.Next() { for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ {
bucket := iterator.Bucket() bucket := iterator.Bucket()
count := 0.0 count := 0.0
for _, p := range bucket.Points { for _, p := range bucket.Points {
@ -97,16 +105,21 @@ func (l *BBR) maxPASS() int64 {
} }
return result return result
})) }))
if val == 0 { if rawMaxPass == 0 {
return 1 rawMaxPass = 1
} }
return val atomic.StoreInt64(&l.rawMaxPASS, rawMaxPass)
return rawMaxPass
} }
func (l *BBR) minRT() int64 { func (l *BBR) minRT() int64 {
val := l.rtStat.Reduce(func(iterator metric.Iterator) float64 { rawMinRT := atomic.LoadInt64(&l.rawMinRt)
if rawMinRT > 0 && l.rtStat.Timespan() < 1 {
return rawMinRT
}
rawMinRT = int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 {
var result = math.MaxFloat64 var result = math.MaxFloat64
for iterator.Next() { for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ {
bucket := iterator.Bucket() bucket := iterator.Bucket()
if len(bucket.Points) == 0 { if len(bucket.Points) == 0 {
continue continue
@ -119,8 +132,12 @@ func (l *BBR) minRT() int64 {
result = math.Min(result, avg) result = math.Min(result, avg)
} }
return result return result
}) })))
return int64(math.Ceil(val)) if rawMinRT <= 0 {
rawMinRT = 1
}
atomic.StoreInt64(&l.rawMinRt, rawMinRT)
return rawMinRT
} }
func (l *BBR) maxFlight() int64 { func (l *BBR) maxFlight() int64 {
@ -129,20 +146,28 @@ func (l *BBR) maxFlight() int64 {
func (l *BBR) shouldDrop() bool { func (l *BBR) shouldDrop() bool {
if l.cpu() < l.conf.CPUThreshold { if l.cpu() < l.conf.CPUThreshold {
prevDrop, ok := l.prevDrop.Load().(time.Time) prevDrop, _ := l.prevDrop.Load().(time.Duration)
if !ok { if prevDrop == 0 {
return false return false
} }
if time.Since(prevDrop) <= time.Second { if time.Since(initTime)-prevDrop <= time.Second {
if atomic.LoadInt32(&l.prevDropHit) == 0 {
atomic.StoreInt32(&l.prevDropHit, 1)
}
inFlight := atomic.LoadInt64(&l.inFlight) inFlight := atomic.LoadInt64(&l.inFlight)
return inFlight > 1 && inFlight > l.maxFlight() return inFlight > 1 && inFlight > l.maxFlight()
} }
l.prevDrop.Store(time.Duration(0))
return false return false
} }
inFlight := atomic.LoadInt64(&l.inFlight) inFlight := atomic.LoadInt64(&l.inFlight)
drop := inFlight > 1 && inFlight > l.maxFlight() drop := inFlight > 1 && inFlight > l.maxFlight()
if drop { if drop {
l.prevDrop.Store(time.Now()) prevDrop, _ := l.prevDrop.Load().(time.Duration)
if prevDrop != 0 {
return drop
}
l.prevDrop.Store(time.Since(initTime))
} }
return drop return drop
} }
@ -169,9 +194,9 @@ func (l *BBR) Allow(ctx context.Context, opts ...limit.AllowOption) (func(info l
return nil, ecode.LimitExceed return nil, ecode.LimitExceed
} }
atomic.AddInt64(&l.inFlight, 1) atomic.AddInt64(&l.inFlight, 1)
stime := time.Now() stime := time.Since(initTime)
return func(do limit.DoneInfo) { return func(do limit.DoneInfo) {
rt := int64(time.Since(stime) / time.Millisecond) rt := int64((time.Since(initTime) - stime) / time.Millisecond)
l.rtStat.Add(rt) l.rtStat.Add(rt)
atomic.AddInt64(&l.inFlight, -1) atomic.AddInt64(&l.inFlight, -1)
switch do.Op { switch do.Op {

@ -3,7 +3,6 @@ package bbr
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"math/rand" "math/rand"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -15,6 +14,34 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func confForTest() *Config {
return &Config{
Window: time.Second,
WinBucket: 10,
CPUThreshold: 800,
}
}
func warmup(bbr *BBR, count int) {
for i := 0; i < count; i++ {
done, err := bbr.Allow(context.TODO())
time.Sleep(time.Millisecond * 1)
if err == nil {
done(ratelimit.DoneInfo{Op: ratelimit.Success})
}
}
}
func forceAllow(bbr *BBR) {
inflight := bbr.inFlight
bbr.inFlight = bbr.maxPASS() - 1
done, err := bbr.Allow(context.TODO())
if err == nil {
done(ratelimit.DoneInfo{Op: ratelimit.Success})
}
bbr.inFlight = inflight
}
func TestBBR(t *testing.T) { func TestBBR(t *testing.T) {
cfg := &Config{ cfg := &Config{
Window: time.Second * 5, Window: time.Second * 5,
@ -46,26 +73,21 @@ func TestBBR(t *testing.T) {
func TestBBRMaxPass(t *testing.T) { func TestBBRMaxPass(t *testing.T) {
bucketDuration := time.Millisecond * 100 bucketDuration := time.Millisecond * 100
passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) bbr := newLimiter(confForTest()).(*BBR)
for i := 1; i <= 10; i++ { for i := 1; i <= 10; i++ {
passStat.Add(int64(i * 100)) bbr.passStat.Add(int64(i * 100))
time.Sleep(bucketDuration) time.Sleep(bucketDuration)
} }
bbr := &BBR{
passStat: passStat,
}
assert.Equal(t, int64(1000), bbr.maxPASS()) assert.Equal(t, int64(1000), bbr.maxPASS())
// default max pass is equal to 1. // default max pass is equal to 1.
passStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) bbr = newLimiter(confForTest()).(*BBR)
bbr = &BBR{
passStat: passStat,
}
assert.Equal(t, int64(1), bbr.maxPASS()) assert.Equal(t, int64(1), bbr.maxPASS())
} }
func TestBBRMinRt(t *testing.T) { func TestBBRMinRt(t *testing.T) {
bucketDuration := time.Millisecond * 100 bucketDuration := time.Millisecond * 100
bbr := newLimiter(confForTest()).(*BBR)
rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
for j := i*10 + 1; j <= i*10+10; j++ { for j := i*10 + 1; j <= i*10+10; j++ {
@ -75,26 +97,23 @@ func TestBBRMinRt(t *testing.T) {
time.Sleep(bucketDuration) time.Sleep(bucketDuration)
} }
} }
bbr := &BBR{ bbr.rtStat = rtStat
rtStat: rtStat,
}
assert.Equal(t, int64(6), bbr.minRT()) assert.Equal(t, int64(6), bbr.minRT())
// default max min rt is equal to maxFloat64. // default max min rt is equal to maxFloat64.
bucketDuration = time.Millisecond * 100 bucketDuration = time.Millisecond * 100
rtStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) bbr = newLimiter(confForTest()).(*BBR)
bbr = &BBR{ bbr.rtStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
rtStat: rtStat, assert.Equal(t, int64(1), bbr.minRT())
}
assert.Equal(t, int64(math.Ceil(math.MaxFloat64)), bbr.minRT())
} }
func TestBBRMaxQps(t *testing.T) { func TestBBRMaxQps(t *testing.T) {
bbr := newLimiter(confForTest()).(*BBR)
bucketDuration := time.Millisecond * 100 bucketDuration := time.Millisecond * 100
passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
passStat.Add(int64((i + 1) * 100)) passStat.Add(int64((i + 2) * 100))
for j := i*10 + 1; j <= i*10+10; j++ { for j := i*10 + 1; j <= i*10+10; j++ {
rtStat.Add(int64(j)) rtStat.Add(int64(j))
} }
@ -102,17 +121,15 @@ func TestBBRMaxQps(t *testing.T) {
time.Sleep(bucketDuration) time.Sleep(bucketDuration)
} }
} }
bbr := &BBR{ bbr.passStat = passStat
passStat: passStat, bbr.rtStat = rtStat
rtStat: rtStat,
winBucketPerSec: 10,
}
assert.Equal(t, int64(60), bbr.maxFlight()) assert.Equal(t, int64(60), bbr.maxFlight())
} }
func TestBBRShouldDrop(t *testing.T) { func TestBBRShouldDrop(t *testing.T) {
var cpu int64 var cpu int64
cpuGetter := func() int64 { bbr := newLimiter(confForTest()).(*BBR)
bbr.cpu = func() int64 {
return cpu return cpu
} }
bucketDuration := time.Millisecond * 100 bucketDuration := time.Millisecond * 100
@ -127,13 +144,8 @@ func TestBBRShouldDrop(t *testing.T) {
time.Sleep(bucketDuration) time.Sleep(bucketDuration)
} }
} }
bbr := &BBR{ bbr.passStat = passStat
cpu: cpuGetter, bbr.rtStat = rtStat
passStat: passStat,
rtStat: rtStat,
winBucketPerSec: 10,
conf: defaultConf,
}
// cpu >= 800, inflight < maxQps // cpu >= 800, inflight < maxQps
cpu = 800 cpu = 800
bbr.inFlight = 50 bbr.inFlight = 50
@ -170,19 +182,10 @@ func TestGroup(t *testing.T) {
} }
func BenchmarkBBRAllowUnderLowLoad(b *testing.B) { func BenchmarkBBRAllowUnderLowLoad(b *testing.B) {
cpuGetter := func() int64 { bbr := newLimiter(confForTest()).(*BBR)
bbr.cpu = func() int64 {
return 500 return 500
} }
bucketDuration := time.Millisecond * 100
passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
bbr := &BBR{
cpu: cpuGetter,
passStat: passStat,
rtStat: rtStat,
winBucketPerSec: 10,
conf: defaultConf,
}
b.ResetTimer() b.ResetTimer()
for i := 0; i <= b.N; i++ { for i := 0; i <= b.N; i++ {
done, err := bbr.Allow(context.TODO()) done, err := bbr.Allow(context.TODO())
@ -193,21 +196,19 @@ func BenchmarkBBRAllowUnderLowLoad(b *testing.B) {
} }
func BenchmarkBBRAllowUnderHighLoad(b *testing.B) { func BenchmarkBBRAllowUnderHighLoad(b *testing.B) {
cpuGetter := func() int64 { bbr := newLimiter(confForTest()).(*BBR)
bbr.cpu = func() int64 {
return 900 return 900
} }
bucketDuration := time.Millisecond * 100 bbr.inFlight = 1
passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
bbr := &BBR{
cpu: cpuGetter,
passStat: passStat,
rtStat: rtStat,
winBucketPerSec: 10,
conf: defaultConf,
}
b.ResetTimer() b.ResetTimer()
for i := 0; i <= b.N; i++ { for i := 0; i <= b.N; i++ {
if i%10000 == 0 {
maxFlight := bbr.maxFlight()
if maxFlight != 0 {
bbr.inFlight = rand.Int63n(bbr.maxFlight() * 2)
}
}
done, err := bbr.Allow(context.TODO()) done, err := bbr.Allow(context.TODO())
if err == nil { if err == nil {
done(ratelimit.DoneInfo{Op: ratelimit.Success}) done(ratelimit.DoneInfo{Op: ratelimit.Success})
@ -216,26 +217,11 @@ func BenchmarkBBRAllowUnderHighLoad(b *testing.B) {
} }
func BenchmarkBBRShouldDropUnderLowLoad(b *testing.B) { func BenchmarkBBRShouldDropUnderLowLoad(b *testing.B) {
cpuGetter := func() int64 { bbr := newLimiter(confForTest()).(*BBR)
bbr.cpu = func() int64 {
return 500 return 500
} }
bucketDuration := time.Millisecond * 100 warmup(bbr, 10000)
passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
bbr := &BBR{
cpu: cpuGetter,
passStat: passStat,
rtStat: rtStat,
winBucketPerSec: 10,
conf: defaultConf,
}
for i := 0; i < 10000; i++ {
done, err := bbr.Allow(context.TODO())
time.Sleep(time.Millisecond * 1)
if err == nil {
done(ratelimit.DoneInfo{Op: ratelimit.Success})
}
}
b.ResetTimer() b.ResetTimer()
for i := 0; i <= b.N; i++ { for i := 0; i <= b.N; i++ {
bbr.shouldDrop() bbr.shouldDrop()
@ -243,28 +229,34 @@ func BenchmarkBBRShouldDropUnderLowLoad(b *testing.B) {
} }
func BenchmarkBBRShouldDropUnderHighLoad(b *testing.B) { func BenchmarkBBRShouldDropUnderHighLoad(b *testing.B) {
cpuGetter := func() int64 { bbr := newLimiter(confForTest()).(*BBR)
bbr.cpu = func() int64 {
return 900 return 900
} }
bucketDuration := time.Millisecond * 100 warmup(bbr, 10000)
passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) bbr.inFlight = 1000
rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) b.ResetTimer()
bbr := &BBR{ for i := 0; i <= b.N; i++ {
cpu: cpuGetter, bbr.shouldDrop()
passStat: passStat, if i%10000 == 0 {
rtStat: rtStat, forceAllow(bbr)
winBucketPerSec: 10,
conf: defaultConf,
}
for i := 0; i < 10000; i++ {
done, err := bbr.Allow(context.TODO())
time.Sleep(time.Millisecond * 1)
if err == nil {
done(ratelimit.DoneInfo{Op: ratelimit.Success})
} }
} }
}
func BenchmarkBBRShouldDropUnderUnstableLoad(b *testing.B) {
bbr := newLimiter(confForTest()).(*BBR)
bbr.cpu = func() int64 {
return 500
}
warmup(bbr, 10000)
bbr.prevDrop.Store(time.Since(initTime))
bbr.inFlight = 1000
b.ResetTimer() b.ResetTimer()
for i := 0; i <= b.N; i++ { for i := 0; i <= b.N; i++ {
bbr.shouldDrop() bbr.shouldDrop()
if i%100000 == 0 {
forceAllow(bbr)
}
} }
} }

@ -13,6 +13,7 @@ var _ Aggregation = &rollingCounter{}
type RollingCounter interface { type RollingCounter interface {
Metric Metric
Aggregation Aggregation
Timespan() int
// Reduce applies the reduction function to all buckets within the window. // Reduce applies the reduction function to all buckets within the window.
Reduce(func(Iterator) float64) float64 Reduce(func(Iterator) float64) float64
} }
@ -66,3 +67,7 @@ func (r *rollingCounter) Sum() float64 {
func (r *rollingCounter) Value() int64 { func (r *rollingCounter) Value() int64 {
return int64(r.Sum()) return int64(r.Sum())
} }
func (r *rollingCounter) Timespan() int {
return r.policy.timespan()
}

Loading…
Cancel
Save