diff --git a/pkg/container/group/README.md b/pkg/container/group/README.md new file mode 100644 index 000000000..33d48ba01 --- /dev/null +++ b/pkg/container/group/README.md @@ -0,0 +1,12 @@ +#### group + +##### 项目简介 + +懒加载对象容器 + +##### 编译环境 + +- **推荐 Golang v1.12.1 以上版本编译执行** + +##### 依赖包 + diff --git a/pkg/container/group/example_test.go b/pkg/container/group/example_test.go new file mode 100644 index 000000000..9f52fb809 --- /dev/null +++ b/pkg/container/group/example_test.go @@ -0,0 +1,46 @@ +package group + +import "fmt" + +type Counter struct { + Value int +} + +func (c *Counter) Incr() { + c.Value++ +} + +func ExampleGroup_Get() { + new := func() interface{} { + fmt.Println("Only Once") + return &Counter{} + } + group := NewGroup(new) + + // Create a new Counter + group.Get("pass").(*Counter).Incr() + + // Get the created Counter again. + group.Get("pass").(*Counter).Incr() + // Output: + // Only Once +} + +func ExampleGroup_Reset() { + new := func() interface{} { + return &Counter{} + } + group := NewGroup(new) + + newV2 := func() interface{} { + fmt.Println("New V2") + return &Counter{} + } + // Reset the new function and clear all created objects. + group.Reset(newV2) + + // Create a new Counter + group.Get("pass").(*Counter).Incr() + // Output: + // New V2 +} diff --git a/pkg/container/group/group.go b/pkg/container/group/group.go new file mode 100644 index 000000000..cf0b80e14 --- /dev/null +++ b/pkg/container/group/group.go @@ -0,0 +1,55 @@ +// Package group provides a sample lazy load container. +// The group only creating a new object not until the object is needed by user. +// And it will cache all the objects to reduce the creation of object. +package group + +import "sync" + +// Group is a lazy load container. +type Group struct { + new func() interface{} + objs sync.Map + sync.RWMutex +} + +// NewGroup news a group container. +func NewGroup(new func() interface{}) *Group { + if new == nil { + panic("container.group: can't assign a nil to the new function") + } + return &Group{ + new: new, + } +} + +// Get gets the object by the given key. +func (g *Group) Get(key string) interface{} { + g.RLock() + new := g.new + g.RUnlock() + obj, ok := g.objs.Load(key) + if !ok { + obj = new() + g.objs.Store(key, obj) + } + return obj +} + +// Reset resets the new function and deletes all existing objects. +func (g *Group) Reset(new func() interface{}) { + if new == nil { + panic("container.group: can't assign a nil to the new function") + } + g.Lock() + g.new = new + g.Unlock() + g.Clear() +} + +// Clear deletes all objects. +func (g *Group) Clear() { + g.objs.Range(func(key, value interface{}) bool { + g.objs.Delete(key) + return true + }) +} diff --git a/pkg/container/group/group_test.go b/pkg/container/group/group_test.go new file mode 100644 index 000000000..c52827295 --- /dev/null +++ b/pkg/container/group/group_test.go @@ -0,0 +1,69 @@ +package group + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGroupGet(t *testing.T) { + count := 0 + g := NewGroup(func() interface{} { + count++ + return count + }) + v := g.Get("/x/internal/dummy/user") + assert.Equal(t, 1, v.(int)) + + v = g.Get("/x/internal/dummy/avatar") + assert.Equal(t, 2, v.(int)) + + v = g.Get("/x/internal/dummy/user") + assert.Equal(t, 1, v.(int)) + assert.Equal(t, 2, count) + +} + +func TestGroupReset(t *testing.T) { + g := NewGroup(func() interface{} { + return 1 + }) + g.Get("/x/internal/dummy/user") + call := false + g.Reset(func() interface{} { + call = true + return 1 + }) + + length := 0 + g.objs.Range(func(_, _ interface{}) bool { + length++ + return true + }) + assert.Equal(t, 0, length) + + g.Get("/x/internal/dummy/user") + assert.Equal(t, true, call) +} + +func TestGroupClear(t *testing.T) { + g := NewGroup(func() interface{} { + return 1 + }) + g.Get("/x/internal/dummy/user") + length := 0 + g.objs.Range(func(_, _ interface{}) bool { + length++ + return true + }) + assert.Equal(t, 1, length) + + g.Clear() + length = 0 + g.objs.Range(func(_, _ interface{}) bool { + length++ + return true + }) + assert.Equal(t, 0, length) + +} diff --git a/pkg/ratelimit/README.md b/pkg/ratelimit/README.md new file mode 100644 index 000000000..5cc494138 --- /dev/null +++ b/pkg/ratelimit/README.md @@ -0,0 +1,14 @@ +# rate + +# 项目简介 +BBR 限流 + +# 编译环境 + + +# 依赖包 + + +# 编译执行 + + \ No newline at end of file diff --git a/pkg/ratelimit/bbr/bbr.go b/pkg/ratelimit/bbr/bbr.go new file mode 100644 index 000000000..74d1dcc3d --- /dev/null +++ b/pkg/ratelimit/bbr/bbr.go @@ -0,0 +1,222 @@ +package bbr + +import ( + "context" + "math" + "sync/atomic" + "time" + + "github.com/Bilibili/kratos/pkg/container/group" + "github.com/Bilibili/kratos/pkg/ecode" + "github.com/Bilibili/kratos/pkg/log" + limit "github.com/Bilibili/kratos/pkg/ratelimit" + "github.com/Bilibili/kratos/pkg/stat/metric" + + cpustat "github.com/Bilibili/kratos/pkg/stat/sys/cpu" +) + +var ( + cpu int64 + decay = 0.75 + defaultConf = &Config{ + Window: time.Second * 5, + WinBucket: 50, + CPUThreshold: 800, + } +) + +type cpuGetter func() int64 + +func init() { + go cpuproc() +} + +func cpuproc() { + defer func() { + if err := recover(); err != nil { + log.Error("rate.limit.cpuproc() err(%+v)", err) + go cpuproc() + } + }() + ticker := time.NewTicker(time.Millisecond * 250) + // EMA algorithm: https://blog.csdn.net/m0_38106113/article/details/81542863 + for range ticker.C { + stat := &cpustat.Stat{} + cpustat.ReadStat(stat) + prevCpu := atomic.LoadInt64(&cpu) + curCpu := int64(float64(prevCpu)*decay + float64(stat.Usage)*(1.0-decay)) + atomic.StoreInt64(&cpu, curCpu) + } +} + +// Stats contains the metrics's snapshot of bbr. +type Stat struct { + Cpu int64 + InFlight int64 + MaxInFlight int64 + MinRt int64 + MaxPass int64 +} + +// BBR implements bbr-like limiter. +// It is inspired by sentinel. +// https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81 +type BBR struct { + cpu cpuGetter + passStat metric.RollingCounter + rtStat metric.RollingGauge + inFlight int64 + winBucketPerSec int64 + conf *Config + prevDrop time.Time +} + +// Config contains configs of bbr limiter. +type Config struct { + Enabled bool + Window time.Duration + WinBucket int + Rule string + Debug bool + CPUThreshold int64 +} + +func (l *BBR) maxPASS() int64 { + val := int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 { + var result = 1.0 + for iterator.Next() { + bucket := iterator.Bucket() + count := 0.0 + for _, p := range bucket.Points { + count += p + } + result = math.Max(result, count) + } + return result + })) + if val == 0 { + return 1 + } + return val +} + +func (l *BBR) minRT() int64 { + val := l.rtStat.Reduce(func(iterator metric.Iterator) float64 { + var result = math.MaxFloat64 + for iterator.Next() { + bucket := iterator.Bucket() + if len(bucket.Points) == 0 { + continue + } + total := 0.0 + for _, p := range bucket.Points { + total += p + } + avg := total / float64(bucket.Count) + result = math.Min(result, avg) + } + return result + }) + return int64(math.Ceil(val)) +} + +func (l *BBR) maxFlight() int64 { + return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.winBucketPerSec)/1000.0 + 0.5)) +} + +func (l *BBR) shouldDrop() bool { + inFlight := atomic.LoadInt64(&l.inFlight) + maxInflight := l.maxFlight() + if l.cpu() < l.conf.CPUThreshold { + if time.Now().Sub(l.prevDrop) <= 1000*time.Millisecond { + return inFlight > 1 && inFlight > maxInflight + } + return false + } + return inFlight > 1 && inFlight > maxInflight +} + +// Stat tasks a snapshot of the bbr limiter. +func (l *BBR) Stat() Stat { + return Stat{ + Cpu: l.cpu(), + InFlight: atomic.LoadInt64(&l.inFlight), + MinRt: l.minRT(), + MaxPass: l.maxPASS(), + MaxInFlight: l.maxFlight(), + } +} + +// Allow checks all inbound traffic. +// Once overload is detected, it raises ecode.LimitExceed error. +func (l *BBR) Allow(ctx context.Context, opts ...limit.AllowOption) (func(info limit.DoneInfo), error) { + allowOpts := limit.DefaultAllowOpts() + for _, opt := range opts { + opt.Apply(&allowOpts) + } + if l.shouldDrop() { + l.prevDrop = time.Now() + return nil, ecode.LimitExceed + } + atomic.AddInt64(&l.inFlight, 1) + stime := time.Now() + return func(do limit.DoneInfo) { + rt := int64(time.Since(stime) / time.Millisecond) + l.rtStat.Add(rt) + atomic.AddInt64(&l.inFlight, -1) + switch do.Op { + case limit.Success: + l.passStat.Add(1) + return + default: + return + } + }, nil +} + +func newLimiter(conf *Config) limit.Limiter { + if conf == nil { + conf = defaultConf + } + size := conf.WinBucket + bucketDuration := conf.Window / time.Duration(conf.WinBucket) + passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: size, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: size, BucketDuration: bucketDuration}) + cpu := func() int64 { + return atomic.LoadInt64(&cpu) + } + limiter := &BBR{ + cpu: cpu, + conf: conf, + passStat: passStat, + rtStat: rtStat, + winBucketPerSec: int64(time.Second) / (int64(conf.Window) / int64(conf.WinBucket)), + prevDrop: time.Unix(0, 0), + } + return limiter +} + +// Group represents a class of BBRLimiter and forms a namespace in which +// units of BBRLimiter. +type Group struct { + group *group.Group +} + +// NewGroup new a limiter group container, if conf nil use default conf. +func NewGroup(conf *Config) *Group { + if conf == nil { + conf = defaultConf + } + group := group.NewGroup(func() interface{} { + return newLimiter(conf) + }) + return &Group{ + group: group, + } +} + +// Get get a limiter by a specified key, if limiter not exists then make a new one. +func (g *Group) Get(key string) limit.Limiter { + limiter := g.group.Get(key) + return limiter.(limit.Limiter) +} diff --git a/pkg/ratelimit/bbr/bbr_test.go b/pkg/ratelimit/bbr/bbr_test.go new file mode 100644 index 000000000..1c078f119 --- /dev/null +++ b/pkg/ratelimit/bbr/bbr_test.go @@ -0,0 +1,166 @@ +package bbr + +import ( + "context" + "fmt" + "math" + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" + + limit "github.com/Bilibili/kratos/pkg/ratelimit" + "github.com/Bilibili/kratos/pkg/stat/metric" + + "github.com/stretchr/testify/assert" +) + +func TestBBR(t *testing.T) { + cfg := &Config{ + Window: time.Second * 5, + WinBucket: 50, + CPUThreshold: 100, + } + limiter := newLimiter(cfg) + var wg sync.WaitGroup + var drop int64 + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 300; i++ { + f, err := limiter.Allow(context.TODO()) + if err != nil { + atomic.AddInt64(&drop, 1) + } else { + count := rand.Intn(100) + time.Sleep(time.Millisecond * time.Duration(count)) + f(limit.DoneInfo{Op: limit.Success}) + } + } + }() + } + wg.Wait() + fmt.Println("drop: ", drop) +} + +func TestBBRMaxPass(t *testing.T) { + bucketDuration := time.Millisecond * 100 + passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + for i := 1; i <= 10; i++ { + passStat.Add(int64(i * 100)) + time.Sleep(bucketDuration) + } + bbr := &BBR{ + passStat: passStat, + } + assert.Equal(t, int64(1000), bbr.maxPASS()) + + // default max pass is equal to 1. + passStat = metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + bbr = &BBR{ + passStat: passStat, + } + assert.Equal(t, int64(1), bbr.maxPASS()) +} + +func TestBBRMinRt(t *testing.T) { + bucketDuration := time.Millisecond * 100 + rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + for i := 0; i < 10; i++ { + for j := i*10 + 1; j <= i*10+10; j++ { + rtStat.Add(int64(j)) + } + if i != 9 { + time.Sleep(bucketDuration) + } + } + bbr := &BBR{ + rtStat: rtStat, + } + assert.Equal(t, int64(6), bbr.minRT()) + + // default max min rt is equal to maxFloat64. + bucketDuration = time.Millisecond * 100 + rtStat = metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + bbr = &BBR{ + rtStat: rtStat, + } + assert.Equal(t, int64(math.Ceil(math.MaxFloat64)), bbr.minRT()) +} + +func TestBBRMaxQps(t *testing.T) { + bucketDuration := time.Millisecond * 100 + passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + for i := 0; i < 10; i++ { + passStat.Add(int64((i + 1) * 100)) + for j := i*10 + 1; j <= i*10+10; j++ { + rtStat.Add(int64(j)) + } + if i != 9 { + time.Sleep(bucketDuration) + } + } + bbr := &BBR{ + passStat: passStat, + rtStat: rtStat, + winBucketPerSec: 10, + } + assert.Equal(t, int64(60), bbr.maxQPS()) +} + +func TestBBRShouldDrop(t *testing.T) { + var cpu int64 + cpuGetter := func() int64 { + return cpu + } + bucketDuration := time.Millisecond * 100 + passStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration}) + rtStat := metric.NewRollingGauge(metric.RollingGaugeOpts{Size: 10, BucketDuration: bucketDuration}) + for i := 0; i < 10; i++ { + passStat.Add(int64((i + 1) * 100)) + for j := i*10 + 1; j <= i*10+10; j++ { + rtStat.Add(int64(j)) + } + if i != 9 { + time.Sleep(bucketDuration) + } + } + bbr := &BBR{ + cpu: cpuGetter, + passStat: passStat, + rtStat: rtStat, + winBucketPerSec: 10, + prevDrop: time.Unix(0, 0), + conf: defaultConf, + } + // cpu >= 800, inflight < maxQps + cpu = 800 + bbr.inFlight = 50 + assert.Equal(t, false, bbr.shouldDrop()) + + // cpu >= 800, inflight > maxQps + cpu = 800 + bbr.inFlight = 80 + assert.Equal(t, true, bbr.shouldDrop()) + + // cpu < 800, inflight > maxQps + cpu = 700 + bbr.inFlight = 80 + assert.Equal(t, false, bbr.shouldDrop()) +} + +func TestGroup(t *testing.T) { + cfg := &Config{ + Window: time.Second * 5, + WinBucket: 50, + CPUThreshold: 100, + } + group := NewGroup(cfg) + t.Run("get", func(t *testing.T) { + limiter := group.Get("test") + assert.NotNil(t, limiter) + }) +} diff --git a/pkg/ratelimit/limiter.go b/pkg/ratelimit/limiter.go new file mode 100644 index 000000000..79c8e8ab1 --- /dev/null +++ b/pkg/ratelimit/limiter.go @@ -0,0 +1,40 @@ +package ratelimit + +import ( + "context" +) + +// Op operations type. +type Op int + +const ( + // Success opertion type: success + Success Op = iota + // Ignore opertion type: ignore + Ignore + // Drop opertion type: drop + Drop +) + +type allowOptions struct{} + +// AllowOptions allow options. +type AllowOption interface { + Apply(*allowOptions) +} + +// DoneInfo done info. +type DoneInfo struct { + Err error + Op Op +} + +// DefaultAllowOpts returns the default allow options. +func DefaultAllowOpts() allowOptions { + return allowOptions{} +} + +// Limiter limit interface. +type Limiter interface { + Allow(ctx context.Context, opts ...AllowOption) (func(info DoneInfo), error) +}