diff --git a/README.md b/README.md index b4b1f1c1f..e7994a743 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,38 @@ # Kratos -Kratos是[bilibili](https://www.bilibili.com)开源的一套Go微服务框架,包含大量微服务相关框架及工具。 -如:discovery(服务注册发现)、blademaster(HTTP框架)、warden(gRPC封装)、log、breaker、dapper(trace)、cache&db sdk、kratos tool(代码生成等工具)等等。 +Kratos是[bilibili](https://www.bilibili.com)开源的一套Go微服务框架,包含大量微服务相关框架及工具。主要包括以下组件: -我们致力于提供完整的微服务研发体验,大仓整合相关框架及工具后,微服务治理相关部分可对整体业务开发周期无感,从而更加聚焦于业务交付。对每位开发者而言,整套Kratos框架也是不错的学习仓库,可以了解和参考到[bilibili](https://www.bilibili.com)在微服务方面的技术积累和经验。 +* [http框架blademaster(bm)](doc/wiki-cn/blademaster.md):基于[gin](https://github.com/gin-gonic/gin)二次开发,具有快速、灵活的特点,可以方便的开发中间件处理通用或特殊逻辑,基础库默认实现了log&trace等。 +* [gRPC框架warden](doc/wiki-cn/warden.md):基于官方gRPC封装,默认使用[discovery](https://github.com/bilibili/discovery)进行服务注册发现,及wrr和p2c(默认)负载均衡。 +* [dapper trace](doc/wiki-cn/dapper.md):基于opentracing,全链路集成了trace,我们还提供dapper实现,请参看:[dapper敬请期待]()。 +* [log](doc/wiki-cn/logger.md):基于[zap](https://github.com/uber-go/zap)的field方式实现的高性能log库,集成了我们提供的[log-agent敬请期待]()日志收集方案。 +* [database](doc/wiki-cn/database.md):集成MySQL&HBase&TiDB的SDK,其中TiDB使用服务发现方案。 +* [cache](doc/wiki-cn/cache.md):集成memcache&redis的SDK,注意无redis-cluster实现,推荐使用代理模式[overlord](https://github.com/bilibili/overlord)。 +* [kratos tool](doc/wiki-cn/kratos-tool.md):kratos相关工具量,包括项目快速生成、pb文件代码生成、swagger文档生成等。 + +我们致力于提供完整的微服务研发体验,整合相关框架及工具后,微服务治理相关部分可对整体业务开发周期无感,从而更加聚焦于业务交付。对每位开发者而言,整套Kratos框架也是不错的学习仓库,可以了解和参考到[bilibili](https://www.bilibili.com)在微服务方面的技术积累和经验。 + +# 快速开始 + +```shell +go get -u github.com/bilibili/kratos/tool/kratos +kratos init +``` + +`kratos init`会快速生成基于kratos库的脚手架代码,如生成[kratos-demo](https://github.com/bilibili/kratos-demo) + +```shell +cd kratos-demo/cmd +go build +./cmd -conf ../configs +``` + +打开浏览器访问:[http://localhost:8000/kratos-demo/start](http://localhost:8000/kratos-demo/start),你会看到输出了`Golang 大法好 !!!` + +# Document + +[简体中文](doc/wiki-cn/summary.md) ------------- *Please report bugs, concerns, suggestions by issues, or join QQ-group 716486124 to discuss problems around source code.* - diff --git a/doc/wiki-cn/README.md b/doc/wiki-cn/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/blademaster-mid.md b/doc/wiki-cn/blademaster-mid.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/blademaster-pb.md b/doc/wiki-cn/blademaster-pb.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/blademaster.md b/doc/wiki-cn/blademaster.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/cache-mc.md b/doc/wiki-cn/cache-mc.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/cache-redis.md b/doc/wiki-cn/cache-redis.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/cache.md b/doc/wiki-cn/cache.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/dapper.md b/doc/wiki-cn/dapper.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/database-hbase.md b/doc/wiki-cn/database-hbase.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/database-mysql.md b/doc/wiki-cn/database-mysql.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/database-tidb.md b/doc/wiki-cn/database-tidb.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/database.md b/doc/wiki-cn/database.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/kratos-tool.md b/doc/wiki-cn/kratos-tool.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/log-agent.md b/doc/wiki-cn/log-agent.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/logger.md b/doc/wiki-cn/logger.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/quickstart.md b/doc/wiki-cn/quickstart.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/summary.md b/doc/wiki-cn/summary.md new file mode 100644 index 000000000..88a935a65 --- /dev/null +++ b/doc/wiki-cn/summary.md @@ -0,0 +1,22 @@ +# Summary + +* [介绍](README.md) + * [快速开始](quickstart.md) + * [案例](https://github.com/bilibili/kratos-demo) +* [http blademaster](blademaster.md) + * [middleware](blademaster-mid.md) + * [protobuf生成](blademaster-pb.md) +* [grpc warden](warden.md) + * [middleware](warden-mid.md) + * [protobuf生成](warden-pb.md) +* [dapper trace](dapper.md) +* [log](logger.md) + * [log-agent](log-agent.md) +* [database](database.md) + * [mysql](database-mysql.md) + * [hbase](database-hbase.md) + * [tidb](database-tidb.md) +* [cache](cache.md) + * [memcache](cache-mc.md) + * [redis](cache-redis.md) +* [kratos tool](kratos-tool.md) diff --git a/doc/wiki-cn/warden-mid.md b/doc/wiki-cn/warden-mid.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/warden-pb.md b/doc/wiki-cn/warden-pb.md new file mode 100644 index 000000000..e69de29bb diff --git a/doc/wiki-cn/warden.md b/doc/wiki-cn/warden.md new file mode 100644 index 000000000..e69de29bb diff --git a/go.mod b/go.mod index 410bbbd0a..2b55cfd6f 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,14 @@ module github.com/bilibili/kratos +go 1.12 + require ( github.com/BurntSushi/toml v0.3.1 github.com/aristanetworks/goarista v0.0.0-20190409234242-46f4bc7b73ef // indirect github.com/cznic/b v0.0.0-20181122101859-a26611c4d92d // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect github.com/cznic/strutil v0.0.0-20181122101858-275e90344537 // indirect + github.com/dgryski/go-farm v0.0.0-20190323231341-8198c7b169ec github.com/fatih/color v1.7.0 github.com/fsnotify/fsnotify v1.4.7 github.com/go-playground/locales v0.12.1 // indirect @@ -15,19 +18,20 @@ require ( github.com/golang/protobuf v1.2.0 github.com/kr/pty v1.1.4 github.com/leodido/go-urn v1.1.0 // indirect + github.com/montanaflynn/stats v0.5.0 github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v0.9.2 github.com/remyoudompheng/bigfft v0.0.0-20190321074620-2f0d2b0e0001 // indirect github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec // indirect github.com/sirupsen/logrus v1.4.1 // indirect + github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a github.com/stretchr/testify v1.3.0 github.com/tsuna/gohbase v0.0.0-20190201102810-d3184c1526df github.com/urfave/cli v1.20.0 + golang.org/x/net v0.0.0-20190311183353-d8887717615a golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect google.golang.org/grpc v1.18.0 gopkg.in/AlecAivazis/survey.v1 v1.8.2 gopkg.in/go-playground/assert.v1 v1.2.1 // indirect gopkg.in/go-playground/validator.v9 v9.26.0 ) - -go 1.12 diff --git a/pkg/database/tidb/discovery.go b/pkg/database/tidb/discovery.go index 5457455a8..de75204fb 100644 --- a/pkg/database/tidb/discovery.go +++ b/pkg/database/tidb/discovery.go @@ -16,15 +16,13 @@ var _schema = "tidb://" func (db *DB) nodeList() (nodes []string) { var ( - insInfo *naming.InstancesInfo - insMap map[string][]*naming.Instance - ins []*naming.Instance - ok bool + insMap map[string][]*naming.Instance + ins []*naming.Instance + ok bool ) - if insInfo, ok = db.dis.Fetch(context.Background()); !ok { + if insMap, ok = db.dis.Fetch(context.Background()); !ok { return } - insMap = insInfo.Instances if ins, ok = insMap[env.Zone]; !ok || len(ins) == 0 { return } diff --git a/pkg/naming/discovery/discovery.go b/pkg/naming/discovery/discovery.go index 3548e7fca..99831b973 100644 --- a/pkg/naming/discovery/discovery.go +++ b/pkg/naming/discovery/discovery.go @@ -59,6 +59,12 @@ type Config struct { Host string } +type appData struct { + ZoneInstances map[string][]*naming.Instance `json:"zone_instances"` + LastTs int64 `json:"latest_timestamp"` + Err string `json:"err"` +} + // Discovery is discovery client. type Discovery struct { once sync.Once @@ -175,15 +181,15 @@ func (d *Discovery) selfproc(resolver naming.Resolver, event <-chan struct{}) { if !ok { return } - instances, ok := resolver.Fetch(context.Background()) + zones, ok := resolver.Fetch(context.Background()) if ok { - d.newSelf(instances) + d.newSelf(zones) } } } -func (d *Discovery) newSelf(instances *naming.InstancesInfo) { - ins, ok := instances.Instances[d.conf.Zone] +func (d *Discovery) newSelf(zones map[string][]*naming.Instance) { + ins, ok := zones[d.conf.Zone] if !ok { return } @@ -270,12 +276,12 @@ func (r *Resolver) Watch() <-chan struct{} { } // Fetch fetch resolver instance. -func (r *Resolver) Fetch(c context.Context) (ins *naming.InstancesInfo, ok bool) { +func (r *Resolver) Fetch(c context.Context) (ins map[string][]*naming.Instance, ok bool) { r.d.mutex.RLock() app, ok := r.d.apps[r.id] r.d.mutex.RUnlock() if ok { - ins, ok = app.zoneIns.Load().(*naming.InstancesInfo) + ins, ok = app.zoneIns.Load().(map[string][]*naming.Instance) return } return @@ -527,7 +533,6 @@ func (d *Discovery) serverproc() { return default: } - apps, err := d.polls(ctx, d.pickNode()) if err != nil { d.switchNode() @@ -572,7 +577,7 @@ func (d *Discovery) nodes() (nodes []string) { return } -func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]naming.InstancesInfo, err error) { +func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]appData, err error) { var ( lastTs []int64 appid []string @@ -597,8 +602,9 @@ func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]nam } uri := fmt.Sprintf(_pollURL, host) res := new(struct { - Code int `json:"code"` - Data map[string]naming.InstancesInfo `json:"data"` + Code int `json:"code"` + Message string `json:"message"` + Data map[string]appData `json:"data"` }) params := url.Values{} params.Set("env", conf.Env) @@ -611,8 +617,17 @@ func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]nam } if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { if !ec.Equal(ecode.NotModified) { - log.Error("discovery: client.Get(%s) get error code(%d)", uri+"?"+params.Encode(), res.Code) + log.Error("discovery: client.Get(%s) get error code(%d) message(%s)", uri+"?"+params.Encode(), res.Code, res.Message) err = ec + if ec.Equal(ecode.NothingFound) { + for appID, value := range res.Data { + if value.Err != "" { + errInfo := fmt.Sprintf("discovery: app(%s) on ENV(%s) %s!\n", appID, conf.Env, value.Err) + log.Error(errInfo) + fmt.Fprintf(os.Stderr, errInfo) + } + } + } } return } @@ -630,12 +645,12 @@ func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]nam return } -func (d *Discovery) broadcast(apps map[string]naming.InstancesInfo) { +func (d *Discovery) broadcast(apps map[string]appData) { for id, v := range apps { var count int - for zone, ins := range v.Instances { + for zone, ins := range v.ZoneInstances { if len(ins) == 0 { - delete(v.Instances, zone) + delete(v.ZoneInstances, zone) } count += len(ins) } @@ -647,7 +662,7 @@ func (d *Discovery) broadcast(apps map[string]naming.InstancesInfo) { d.mutex.RUnlock() if ok { app.lastTs = v.LastTs - app.zoneIns.Store(v) + app.zoneIns.Store(v.ZoneInstances) d.mutex.RLock() for rs := range app.resolver { select { diff --git a/pkg/naming/naming.go b/pkg/naming/naming.go index 9b1bfa303..2ef100925 100644 --- a/pkg/naming/naming.go +++ b/pkg/naming/naming.go @@ -6,19 +6,17 @@ import ( // metadata common key const ( - MetaColor = "color" - MetaWeight = "weight" - MetaCluster = "cluster" MetaZone = "zone" + MetaCluster = "cluster" + MetaWeight = "weight" + MetaColor = "color" ) // Instance represents a server the client connects to. type Instance struct { - // Region bj/sh/gz - Region string `json:"region"` // Zone is IDC. Zone string `json:"zone"` - // Env prod/pre、uat/fat1 + // Env prod/pre/uat/fat1 Env string `json:"env"` // AppID is mapping servicetree appid. AppID string `json:"appid"` @@ -34,25 +32,13 @@ type Instance struct { // Metadata is the information associated with Addr, which may be used // to make load balancing decision. Metadata map[string]string `json:"metadata"` - Status int64 -} - -// InstancesInfo instance info. -type InstancesInfo struct { - Instances map[string][]*Instance `json:"zone_instances"` - LastTs int64 `json:"latest_timestamp"` - Scheduler []*Scheduler `json:"scheduler"` -} - -// Scheduler scheduler info in multi cluster. -type Scheduler struct { - Src string `json:"src"` - Dst map[string]int64 `json:"dst"` + // Status status + Status int64 } // Resolver resolve naming service type Resolver interface { - Fetch(context.Context) (*InstancesInfo, bool) + Fetch(context.Context) (map[string][]*Instance, bool) Watch() <-chan struct{} Close() error } diff --git a/pkg/net/http/blademaster/README.md b/pkg/net/http/blademaster/README.md new file mode 100644 index 000000000..7029187d2 --- /dev/null +++ b/pkg/net/http/blademaster/README.md @@ -0,0 +1,5 @@ +#### net/http/blademaster + +##### 项目简介 + +http 框架,带来如飞一般的体验。 diff --git a/pkg/net/http/blademaster/client.go b/pkg/net/http/blademaster/client.go index 86fc45f84..125b98f4e 100644 --- a/pkg/net/http/blademaster/client.go +++ b/pkg/net/http/blademaster/client.go @@ -137,7 +137,7 @@ func (client *Client) SetConfig(c *ClientConfig) { // TODO(zhoujiahui): param realIP should be removed later. func (client *Client) NewRequest(method, uri, realIP string, params url.Values) (req *xhttp.Request, err error) { if method == xhttp.MethodGet { - req, err = xhttp.NewRequest(xhttp.MethodGet, uri+params.Encode(), nil) + req, err = xhttp.NewRequest(xhttp.MethodGet, fmt.Sprintf("%s?%s", uri, params.Encode()), nil) } else { req, err = xhttp.NewRequest(xhttp.MethodPost, uri, strings.NewReader(params.Encode())) } diff --git a/pkg/net/netutil/breaker/README.md b/pkg/net/netutil/breaker/README.md new file mode 100644 index 000000000..f700157f5 --- /dev/null +++ b/pkg/net/netutil/breaker/README.md @@ -0,0 +1,21 @@ +#### breaker + +##### 项目简介 +1. 提供熔断器功能,供各种client(如rpc、http、msyql)等进行熔断 +2. 提供Go方法供业务在breaker熔断前后进行回调处理 + +##### 配置说明 +> 1. NewGroup(name string,c *Config)当c==nil时则采用默认配置 +> 2. 可通过breaker.Init(c *Config)替换默认配置 +> 3. 可通过group.Reload(c *Config)进行配置更新 +> 4. 默认配置如下所示: + _conf = &Config{ + Window: xtime.Duration(3 * time.Second), + Sleep: xtime.Duration(100 * time.Millisecond), + Bucket: 10, + Ratio: 0.5, + Request: 100, + } + +##### 测试 +1. 执行当前目录下所有测试文件,测试所有功能 diff --git a/pkg/net/netutil/breaker/breaker_test.go b/pkg/net/netutil/breaker/breaker_test.go new file mode 100644 index 000000000..4e024251f --- /dev/null +++ b/pkg/net/netutil/breaker/breaker_test.go @@ -0,0 +1,100 @@ +package breaker + +import ( + "errors" + "testing" + "time" + + xtime "github.com/bilibili/kratos/pkg/time" +) + +func TestGroup(t *testing.T) { + g1 := NewGroup(nil) + g2 := NewGroup(_conf) + if g1.conf != g2.conf { + t.FailNow() + } + + brk := g2.Get("key") + brk1 := g2.Get("key1") + if brk == brk1 { + t.FailNow() + } + brk2 := g2.Get("key") + if brk != brk2 { + t.FailNow() + } + + g := NewGroup(_conf) + c := &Config{ + Window: xtime.Duration(1 * time.Second), + Sleep: xtime.Duration(100 * time.Millisecond), + Bucket: 10, + Ratio: 0.5, + Request: 100, + SwitchOff: !_conf.SwitchOff, + } + g.Reload(c) + if g.conf.SwitchOff == _conf.SwitchOff { + t.FailNow() + } +} + +func TestInit(t *testing.T) { + switchOff := _conf.SwitchOff + c := &Config{ + Window: xtime.Duration(3 * time.Second), + Sleep: xtime.Duration(100 * time.Millisecond), + Bucket: 10, + Ratio: 0.5, + Request: 100, + SwitchOff: !switchOff, + } + Init(c) + if _conf.SwitchOff == switchOff { + t.FailNow() + } +} + +func TestGo(t *testing.T) { + if err := Go("test_run", func() error { + t.Log("breaker allow,callback run()") + return nil + }, func() error { + t.Log("breaker not allow,callback fallback()") + return errors.New("breaker not allow") + }); err != nil { + t.Error(err) + } + + _group.Reload(&Config{ + Window: xtime.Duration(3 * time.Second), + Sleep: xtime.Duration(100 * time.Millisecond), + Bucket: 10, + Ratio: 0.5, + Request: 100, + SwitchOff: true, + }) + + if err := Go("test_fallback", func() error { + t.Log("breaker allow,callback run()") + return nil + }, func() error { + t.Log("breaker not allow,callback fallback()") + return nil + }); err != nil { + t.Error(err) + } +} + +func markSuccess(b Breaker, count int) { + for i := 0; i < count; i++ { + b.MarkSuccess() + } +} + +func markFailed(b Breaker, count int) { + for i := 0; i < count; i++ { + b.MarkFailed() + } +} diff --git a/pkg/net/netutil/breaker/example_test.go b/pkg/net/netutil/breaker/example_test.go new file mode 100644 index 000000000..f0f386c93 --- /dev/null +++ b/pkg/net/netutil/breaker/example_test.go @@ -0,0 +1,61 @@ +package breaker_test + +import ( + "fmt" + "time" + + "github.com/bilibili/kratos/pkg/net/netutil/breaker" + xtime "github.com/bilibili/kratos/pkg/time" +) + +// ExampleGroup show group usage. +func ExampleGroup() { + c := &breaker.Config{ + Window: xtime.Duration(3 * time.Second), + Sleep: xtime.Duration(100 * time.Millisecond), + Bucket: 10, + Ratio: 0.5, + Request: 100, + } + // init default config + breaker.Init(c) + // new group + g := breaker.NewGroup(c) + // reload group config + c.Bucket = 100 + c.Request = 200 + g.Reload(c) + // get breaker by key + g.Get("key") +} + +// ExampleBreaker show breaker usage. +func ExampleBreaker() { + // new group,use default breaker config + g := breaker.NewGroup(nil) + brk := g.Get("key") + // mark request success + brk.MarkSuccess() + // mark request failed + brk.MarkFailed() + // check if breaker allow or not + if brk.Allow() == nil { + fmt.Println("breaker allow") + } else { + fmt.Println("breaker not allow") + } +} + +// ExampleGo this example create a default group and show function callback +// according to the state of breaker. +func ExampleGo() { + run := func() error { + return nil + } + fallback := func() error { + return fmt.Errorf("unknown error") + } + if err := breaker.Go("example_go", run, fallback); err != nil { + fmt.Println(err) + } +} diff --git a/pkg/net/netutil/breaker/sre_breaker.go b/pkg/net/netutil/breaker/sre_breaker.go index d175d0b2e..5e410cd98 100644 --- a/pkg/net/netutil/breaker/sre_breaker.go +++ b/pkg/net/netutil/breaker/sre_breaker.go @@ -8,12 +8,12 @@ import ( "github.com/bilibili/kratos/pkg/ecode" "github.com/bilibili/kratos/pkg/log" - "github.com/bilibili/kratos/pkg/stat/summary" + "github.com/bilibili/kratos/pkg/stat/metric" ) // sreBreaker is a sre CircuitBreaker pattern. type sreBreaker struct { - stat summary.Summary + stat metric.RollingCounter k float64 request int64 @@ -23,8 +23,13 @@ type sreBreaker struct { } func newSRE(c *Config) Breaker { + counterOpts := metric.RollingCounterOpts{ + Size: c.Bucket, + BucketDuration: time.Duration(int64(c.Window) / int64(c.Bucket)), + } + stat := metric.NewRollingCounter(counterOpts) return &sreBreaker{ - stat: summary.New(time.Duration(c.Window), c.Bucket), + stat: stat, r: rand.New(rand.NewSource(time.Now().UnixNano())), request: c.Request, @@ -33,8 +38,22 @@ func newSRE(c *Config) Breaker { } } +func (b *sreBreaker) summary() (success int64, total int64) { + b.stat.Reduce(func(iterator metric.Iterator) float64 { + for iterator.Next() { + bucket := iterator.Bucket() + total += bucket.Count + for _, p := range bucket.Points { + success += int64(p) + } + } + return 0 + }) + return +} + func (b *sreBreaker) Allow() error { - success, total := b.stat.Value() + success, total := b.summary() k := b.k * float64(success) if log.V(5) { log.Info("breaker: request: %d, succee: %d, fail: %d", total, success, total-success) diff --git a/pkg/net/netutil/breaker/sre_breaker_test.go b/pkg/net/netutil/breaker/sre_breaker_test.go new file mode 100644 index 000000000..4559c4fc5 --- /dev/null +++ b/pkg/net/netutil/breaker/sre_breaker_test.go @@ -0,0 +1,161 @@ +package breaker + +import ( + "math" + "math/rand" + "testing" + "time" + + "github.com/bilibili/kratos/pkg/stat/metric" + xtime "github.com/bilibili/kratos/pkg/time" + + "github.com/stretchr/testify/assert" +) + +func getSRE() Breaker { + return NewGroup(&Config{ + Window: xtime.Duration(1 * time.Second), + Bucket: 10, + Request: 100, + K: 2, + }).Get("") +} + +func getSREBreaker() *sreBreaker { + counterOpts := metric.RollingCounterOpts{ + Size: 10, + BucketDuration: time.Millisecond * 100, + } + stat := metric.NewRollingCounter(counterOpts) + return &sreBreaker{ + stat: stat, + r: rand.New(rand.NewSource(time.Now().UnixNano())), + + request: 100, + k: 2, + state: StateClosed, + } +} + +func markSuccessWithDuration(b Breaker, count int, sleep time.Duration) { + for i := 0; i < count; i++ { + b.MarkSuccess() + time.Sleep(sleep) + } +} + +func markFailedWithDuration(b Breaker, count int, sleep time.Duration) { + for i := 0; i < count; i++ { + b.MarkFailed() + time.Sleep(sleep) + } +} + +func testSREClose(t *testing.T, b Breaker) { + markSuccess(b, 80) + assert.Equal(t, b.Allow(), nil) + markSuccess(b, 120) + assert.Equal(t, b.Allow(), nil) +} + +func testSREOpen(t *testing.T, b Breaker) { + markSuccess(b, 100) + assert.Equal(t, b.Allow(), nil) + markFailed(b, 10000000) + assert.NotEqual(t, b.Allow(), nil) +} + +func testSREHalfOpen(t *testing.T, b Breaker) { + // failback + assert.Equal(t, b.Allow(), nil) + t.Run("allow single failed", func(t *testing.T) { + markFailed(b, 10000000) + assert.NotEqual(t, b.Allow(), nil) + }) + time.Sleep(2 * time.Second) + t.Run("allow single succeed", func(t *testing.T) { + assert.Equal(t, b.Allow(), nil) + markSuccess(b, 10000000) + assert.Equal(t, b.Allow(), nil) + }) +} + +func TestSRE(t *testing.T) { + b := getSRE() + testSREClose(t, b) + + b = getSRE() + testSREOpen(t, b) + + b = getSRE() + testSREHalfOpen(t, b) +} + +func TestSRESelfProtection(t *testing.T) { + t.Run("total request < 100", func(t *testing.T) { + b := getSRE() + markFailed(b, 99) + assert.Equal(t, b.Allow(), nil) + }) + t.Run("total request > 100, total < 2 * success", func(t *testing.T) { + b := getSRE() + size := rand.Intn(10000000) + succ := int(math.Ceil(float64(size))) + 1 + markSuccess(b, succ) + markFailed(b, size-succ) + assert.Equal(t, b.Allow(), nil) + }) +} + +func TestSRESummary(t *testing.T) { + var ( + b *sreBreaker + succ, total int64 + ) + + sleep := 50 * time.Millisecond + t.Run("succ == total", func(t *testing.T) { + b = getSREBreaker() + markSuccessWithDuration(b, 10, sleep) + succ, total = b.summary() + assert.Equal(t, succ, int64(10)) + assert.Equal(t, total, int64(10)) + }) + + t.Run("fail == total", func(t *testing.T) { + b = getSREBreaker() + markFailedWithDuration(b, 10, sleep) + succ, total = b.summary() + assert.Equal(t, succ, int64(0)) + assert.Equal(t, total, int64(10)) + }) + + t.Run("succ = 1/2 * total, fail = 1/2 * total", func(t *testing.T) { + b = getSREBreaker() + markFailedWithDuration(b, 5, sleep) + markSuccessWithDuration(b, 5, sleep) + succ, total = b.summary() + assert.Equal(t, succ, int64(5)) + assert.Equal(t, total, int64(10)) + }) + + t.Run("auto reset rolling counter", func(t *testing.T) { + time.Sleep(time.Second) + succ, total = b.summary() + assert.Equal(t, succ, int64(0)) + assert.Equal(t, total, int64(0)) + }) +} + +func BenchmarkSreBreakerAllow(b *testing.B) { + breaker := getSRE() + b.ResetTimer() + for i := 0; i <= b.N; i++ { + breaker.Allow() + if i%2 == 0 { + breaker.MarkSuccess() + } else { + breaker.MarkFailed() + } + } +} diff --git a/pkg/net/rpc/warden/OWNERS b/pkg/net/rpc/warden/OWNERS deleted file mode 100644 index e1706e43d..000000000 --- a/pkg/net/rpc/warden/OWNERS +++ /dev/null @@ -1,10 +0,0 @@ -# See the OWNERS docs at https://go.k8s.io/owners - -approvers: -- caoguoliang -- maojian -labels: -- library -reviewers: -- caoguoliang -- maojian diff --git a/pkg/net/rpc/warden/README.md b/pkg/net/rpc/warden/README.md index 3f8021442..d3d062ade 100644 --- a/pkg/net/rpc/warden/README.md +++ b/pkg/net/rpc/warden/README.md @@ -1,13 +1,5 @@ -#### net/rcp/warden +#### net/rpc/warden ##### 项目简介 -来自 bilibili 主站技术部的 RPC 框架,融合主站技术部的核心科技,带来如飞一般的体验。 - -##### 编译环境 - -- **请只用 Golang v1.9.x 以上版本编译执行** - -##### 依赖包 - -- [grpc](google.golang.org/grpc) +gRPC 框架,带来如飞一般的体验。 diff --git a/pkg/net/rpc/warden/balancer/p2c/CHANGELOG.md b/pkg/net/rpc/warden/balancer/p2c/CHANGELOG.md deleted file mode 100644 index f60e0eef4..000000000 --- a/pkg/net/rpc/warden/balancer/p2c/CHANGELOG.md +++ /dev/null @@ -1,20 +0,0 @@ -### business/warden/balancer/p2c - -### Version 1.3.1 -1. add more test - -### Version 1.3 -1. P2C替换smooth weighted round-robin - -##### Version 1.2.1 -1. 删除了netflix ribbon的权重算法,改成了平方根算法 - -##### Version 1.2.0 -1. 实现了动态计算的调度轮询算法(使用了服务端的成功率数据,替换基于本地计算的成功率数据) - -##### Version 1.1.0 -1. 实现了动态计算的调度轮询算法 - -##### Version 1.0.0 - -1. 实现了带权重可以识别Color的轮询算法 diff --git a/pkg/net/rpc/warden/balancer/p2c/OWNERS b/pkg/net/rpc/warden/balancer/p2c/OWNERS deleted file mode 100644 index 464e4961d..000000000 --- a/pkg/net/rpc/warden/balancer/p2c/OWNERS +++ /dev/null @@ -1,9 +0,0 @@ -# See the OWNERS docs at https://go.k8s.io/owners - -approvers: -- caoguoliang -labels: -- library -reviewers: -- caoguoliang -- maojian diff --git a/pkg/net/rpc/warden/balancer/p2c/README.md b/pkg/net/rpc/warden/balancer/p2c/README.md index d43bd7908..97b380952 100644 --- a/pkg/net/rpc/warden/balancer/p2c/README.md +++ b/pkg/net/rpc/warden/balancer/p2c/README.md @@ -1,13 +1,5 @@ -#### business/warden/balancer/wrr +#### warden/balancer/p2c ##### 项目简介 -warden 的 weighted round robin负载均衡模块,主要用于为每个RPC请求返回一个Server节点以供调用 - -##### 编译环境 - -- **请只用 Golang v1.9.x 以上版本编译执行** - -##### 依赖包 - -- [grpc](google.golang.org/grpc) \ No newline at end of file +warden 的 Power of Two Choices (P2C)负载均衡模块,主要用于为每个RPC请求返回一个Server节点以供调用 diff --git a/pkg/net/rpc/warden/balancer/wrr/CHANGELOG.md b/pkg/net/rpc/warden/balancer/wrr/CHANGELOG.md deleted file mode 100644 index f38d1b92a..000000000 --- a/pkg/net/rpc/warden/balancer/wrr/CHANGELOG.md +++ /dev/null @@ -1,17 +0,0 @@ -### business/warden/balancer/wrr - -##### Version 1.3.0 -1. 迁移 stat.Summary 到 metric.RollingCounter,metric.RollingGauge - -##### Version 1.2.1 -1. 删除了netflix ribbon的权重算法,改成了平方根算法 - -##### Version 1.2.0 -1. 实现了动态计算的调度轮询算法(使用了服务端的成功率数据,替换基于本地计算的成功率数据) - -##### Version 1.1.0 -1. 实现了动态计算的调度轮询算法 - -##### Version 1.0.0 - -1. 实现了带权重可以识别Color的轮询算法 diff --git a/pkg/net/rpc/warden/balancer/wrr/OWNERS b/pkg/net/rpc/warden/balancer/wrr/OWNERS deleted file mode 100644 index 464e4961d..000000000 --- a/pkg/net/rpc/warden/balancer/wrr/OWNERS +++ /dev/null @@ -1,9 +0,0 @@ -# See the OWNERS docs at https://go.k8s.io/owners - -approvers: -- caoguoliang -labels: -- library -reviewers: -- caoguoliang -- maojian diff --git a/pkg/net/rpc/warden/balancer/wrr/README.md b/pkg/net/rpc/warden/balancer/wrr/README.md index d43bd7908..9483e71dd 100644 --- a/pkg/net/rpc/warden/balancer/wrr/README.md +++ b/pkg/net/rpc/warden/balancer/wrr/README.md @@ -1,13 +1,5 @@ -#### business/warden/balancer/wrr +#### warden/balancer/wrr ##### 项目简介 warden 的 weighted round robin负载均衡模块,主要用于为每个RPC请求返回一个Server节点以供调用 - -##### 编译环境 - -- **请只用 Golang v1.9.x 以上版本编译执行** - -##### 依赖包 - -- [grpc](google.golang.org/grpc) \ No newline at end of file diff --git a/pkg/net/rpc/warden/resolver/CHANGELOG.md b/pkg/net/rpc/warden/resolver/CHANGELOG.md deleted file mode 100644 index 889c4f3d8..000000000 --- a/pkg/net/rpc/warden/resolver/CHANGELOG.md +++ /dev/null @@ -1,17 +0,0 @@ -### business/warden/resolver - -##### Version 1.1.1 -1. add dial helper - -##### Version 1.1.0 -1. 增加了子集选择算法 - -##### Version 1.0.2 -1. 增加GET接口 - -##### Version 1.0.1 -1. 支持zone和clusters - - -##### Version 1.0.0 -1. 实现了基本的服务发现功能 diff --git a/pkg/net/rpc/warden/resolver/OWNERS b/pkg/net/rpc/warden/resolver/OWNERS deleted file mode 100644 index 464e4961d..000000000 --- a/pkg/net/rpc/warden/resolver/OWNERS +++ /dev/null @@ -1,9 +0,0 @@ -# See the OWNERS docs at https://go.k8s.io/owners - -approvers: -- caoguoliang -labels: -- library -reviewers: -- caoguoliang -- maojian diff --git a/pkg/net/rpc/warden/resolver/README.md b/pkg/net/rpc/warden/resolver/README.md index e50d80f44..456e74090 100644 --- a/pkg/net/rpc/warden/resolver/README.md +++ b/pkg/net/rpc/warden/resolver/README.md @@ -1,13 +1,5 @@ -#### business/warden/resolver +#### warden/resolver ##### 项目简介 warden 的 服务发现模块,用于从底层的注册中心中获取Server节点列表并返回给GRPC - -##### 编译环境 - -- **请只用 Golang v1.9.x 以上版本编译执行** - -##### 依赖包 - -- [grpc](google.golang.org/grpc) \ No newline at end of file diff --git a/pkg/net/rpc/warden/resolver/direct/CHANGELOG.md b/pkg/net/rpc/warden/resolver/direct/CHANGELOG.md deleted file mode 100644 index 7ea59f026..000000000 --- a/pkg/net/rpc/warden/resolver/direct/CHANGELOG.md +++ /dev/null @@ -1,6 +0,0 @@ -### business/warden/resolver/direct - - -##### Version 1.0.0 - -1. 实现了基本的服务发现直连功能 diff --git a/pkg/net/rpc/warden/resolver/direct/README.md b/pkg/net/rpc/warden/resolver/direct/README.md index 5c5259f16..28f5d0ace 100644 --- a/pkg/net/rpc/warden/resolver/direct/README.md +++ b/pkg/net/rpc/warden/resolver/direct/README.md @@ -1,14 +1,6 @@ -#### business/warden/resolver/direct +#### warden/resolver/direct ##### 项目简介 warden 的直连服务模块,用于通过IP地址列表直接连接后端服务 连接字符串格式: direct://default/192.168.1.1:8080,192.168.1.2:8081 - -##### 编译环境 - -- **请只用 Golang v1.9.x 以上版本编译执行** - -##### 依赖包 - -- [grpc](google.golang.org/grpc) \ No newline at end of file diff --git a/pkg/net/rpc/warden/resolver/resolver.go b/pkg/net/rpc/warden/resolver/resolver.go index 1be36ae10..f459e5474 100644 --- a/pkg/net/rpc/warden/resolver/resolver.go +++ b/pkg/net/rpc/warden/resolver/resolver.go @@ -128,10 +128,10 @@ func (r *Resolver) updateproc() { return } } - if insInfo, ok := r.nr.Fetch(context.Background()); ok { - instances, ok := insInfo.Instances[r.zone] + if insMap, ok := r.nr.Fetch(context.Background()); ok { + instances, ok := insMap[r.zone] if !ok { - for _, value := range insInfo.Instances { + for _, value := range insMap { instances = append(instances, value...) } } diff --git a/pkg/stat/metric/counter.go b/pkg/stat/metric/counter.go new file mode 100644 index 000000000..864f5ff0c --- /dev/null +++ b/pkg/stat/metric/counter.go @@ -0,0 +1,36 @@ +package metric + +import ( + "fmt" + "sync/atomic" +) + +var _ Metric = &counter{} + +// Counter stores a numerical value that only ever goes up. +type Counter interface { + Metric +} + +// CounterOpts is an alias of Opts. +type CounterOpts Opts + +type counter struct { + val int64 +} + +// NewCounter creates a new Counter based on the CounterOpts. +func NewCounter(opts CounterOpts) Counter { + return &counter{} +} + +func (c *counter) Add(val int64) { + if val < 0 { + panic(fmt.Errorf("stat/metric: cannot decrease in negative value. val: %d", val)) + } + atomic.AddInt64(&c.val, val) +} + +func (c *counter) Value() int64 { + return atomic.LoadInt64(&c.val) +} diff --git a/pkg/stat/metric/counter_test.go b/pkg/stat/metric/counter_test.go new file mode 100644 index 000000000..5229bdc81 --- /dev/null +++ b/pkg/stat/metric/counter_test.go @@ -0,0 +1,18 @@ +package metric + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCounter(t *testing.T) { + counter := NewCounter(CounterOpts{}) + count := rand.Intn(100) + for i := 0; i < count; i++ { + counter.Add(1) + } + val := counter.Value() + assert.Equal(t, val, int64(count)) +} diff --git a/pkg/stat/metric/gauge.go b/pkg/stat/metric/gauge.go new file mode 100644 index 000000000..928b7fe32 --- /dev/null +++ b/pkg/stat/metric/gauge.go @@ -0,0 +1,37 @@ +package metric + +import "sync/atomic" + +var _ Metric = &gauge{} + +// Gauge stores a numerical value that can be add arbitrarily. +type Gauge interface { + Metric + // Sets sets the value to the given number. + Set(int64) +} + +// GaugeOpts is an alias of Opts. +type GaugeOpts Opts + +type gauge struct { + val int64 +} + +// NewGauge creates a new Gauge based on the GaugeOpts. +func NewGauge(opts GaugeOpts) Gauge { + return &gauge{} +} + +func (g *gauge) Add(val int64) { + atomic.AddInt64(&g.val, val) +} + +func (g *gauge) Set(val int64) { + old := atomic.LoadInt64(&g.val) + atomic.CompareAndSwapInt64(&g.val, old, val) +} + +func (g *gauge) Value() int64 { + return atomic.LoadInt64(&g.val) +} diff --git a/pkg/stat/metric/gauge_test.go b/pkg/stat/metric/gauge_test.go new file mode 100644 index 000000000..b3cb26d22 --- /dev/null +++ b/pkg/stat/metric/gauge_test.go @@ -0,0 +1,23 @@ +package metric + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGaugeAdd(t *testing.T) { + gauge := NewGauge(GaugeOpts{}) + gauge.Add(100) + gauge.Add(-50) + val := gauge.Value() + assert.Equal(t, val, int64(50)) +} + +func TestGaugeSet(t *testing.T) { + gauge := NewGauge(GaugeOpts{}) + gauge.Add(100) + gauge.Set(50) + val := gauge.Value() + assert.Equal(t, val, int64(50)) +} diff --git a/pkg/stat/metric/iterator.go b/pkg/stat/metric/iterator.go new file mode 100644 index 000000000..b0cac693d --- /dev/null +++ b/pkg/stat/metric/iterator.go @@ -0,0 +1,26 @@ +package metric + +import "fmt" + +// Iterator iterates the buckets within the window. +type Iterator struct { + count int + iteratedCount int + cur *Bucket +} + +// Next returns true util all of the buckets has been iterated. +func (i *Iterator) Next() bool { + return i.count != i.iteratedCount +} + +// Bucket gets current bucket. +func (i *Iterator) Bucket() Bucket { + if !(i.Next()) { + panic(fmt.Errorf("stat/metric: iteration out of range iteratedCount: %d count: %d", i.iteratedCount, i.count)) + } + bucket := *i.cur + i.iteratedCount++ + i.cur = i.cur.Next() + return bucket +} diff --git a/pkg/stat/metric/metric.go b/pkg/stat/metric/metric.go new file mode 100644 index 000000000..0c45172b6 --- /dev/null +++ b/pkg/stat/metric/metric.go @@ -0,0 +1,30 @@ +package metric + +// Opts contains the common arguments for creating Metric. +type Opts struct { +} + +// Metric is a sample interface. +// Implementations of Metrics in metric package are Counter, Gauge, +// PointGauge, RollingCounter and RollingGauge. +type Metric interface { + // Add adds the given value to the counter. + Add(int64) + // Value gets the current value. + // If the metric's type is PointGauge, RollingCounter, RollingGauge, + // it returns the sum value within the window. + Value() int64 +} + +// Aggregation contains some common aggregation function. +// Each aggregation can compute summary statistics of window. +type Aggregation interface { + // Min finds the min value within the window. + Min() float64 + // Max finds the max value within the window. + Max() float64 + // Avg computes average value within the window. + Avg() float64 + // Sum computes sum value within the window. + Sum() float64 +} diff --git a/pkg/stat/metric/point_gauge.go b/pkg/stat/metric/point_gauge.go new file mode 100644 index 000000000..0fc15a622 --- /dev/null +++ b/pkg/stat/metric/point_gauge.go @@ -0,0 +1,61 @@ +package metric + +var _ Metric = &pointGauge{} +var _ Aggregation = &pointGauge{} + +// PointGauge represents a ring window. +// Every buckets within the window contains one point. +// When the window is full, the earliest point will be overwrite. +type PointGauge interface { + Aggregation + Metric + // Reduce applies the reduction function to all buckets within the window. + Reduce(func(Iterator) float64) float64 +} + +// PointGaugeOpts contains the arguments for creating PointGauge. +type PointGaugeOpts struct { + // Size represents the bucket size within the window. + Size int +} + +type pointGauge struct { + policy *PointPolicy +} + +// NewPointGauge creates a new PointGauge based on PointGaugeOpts. +func NewPointGauge(opts PointGaugeOpts) PointGauge { + window := NewWindow(WindowOpts{Size: opts.Size}) + policy := NewPointPolicy(window) + return &pointGauge{ + policy: policy, + } +} + +func (r *pointGauge) Add(val int64) { + r.policy.Append(float64(val)) +} + +func (r *pointGauge) Reduce(f func(Iterator) float64) float64 { + return r.policy.Reduce(f) +} + +func (r *pointGauge) Avg() float64 { + return r.policy.Reduce(Avg) +} + +func (r *pointGauge) Min() float64 { + return r.policy.Reduce(Min) +} + +func (r *pointGauge) Max() float64 { + return r.policy.Reduce(Max) +} + +func (r *pointGauge) Sum() float64 { + return r.policy.Reduce(Sum) +} + +func (r *pointGauge) Value() int64 { + return int64(r.Sum()) +} diff --git a/pkg/stat/metric/point_gauge_test.go b/pkg/stat/metric/point_gauge_test.go new file mode 100644 index 000000000..fcfee4788 --- /dev/null +++ b/pkg/stat/metric/point_gauge_test.go @@ -0,0 +1,55 @@ +package metric + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPointGaugeAdd(t *testing.T) { + opts := PointGaugeOpts{Size: 3} + pointGauge := NewPointGauge(opts) + listBuckets := func() [][]float64 { + buckets := make([][]float64, 0) + pointGauge.Reduce(func(i Iterator) float64 { + for i.Next() { + bucket := i.Bucket() + buckets = append(buckets, bucket.Points) + } + return 0.0 + }) + return buckets + } + assert.Equal(t, [][]float64{{}, {}, {}}, listBuckets(), "Empty Buckets") + pointGauge.Add(1) + assert.Equal(t, [][]float64{{}, {}, {1}}, listBuckets(), "Point 1") + pointGauge.Add(2) + assert.Equal(t, [][]float64{{}, {1}, {2}}, listBuckets(), "Point 1, 2") + pointGauge.Add(3) + assert.Equal(t, [][]float64{{1}, {2}, {3}}, listBuckets(), "Point 1, 2, 3") + pointGauge.Add(4) + assert.Equal(t, [][]float64{{2}, {3}, {4}}, listBuckets(), "Point 2, 3, 4") + pointGauge.Add(5) + assert.Equal(t, [][]float64{{3}, {4}, {5}}, listBuckets(), "Point 3, 4, 5") +} + +func TestPointGaugeReduce(t *testing.T) { + opts := PointGaugeOpts{Size: 10} + pointGauge := NewPointGauge(opts) + for i := 0; i < opts.Size; i++ { + pointGauge.Add(int64(i)) + } + var _ = pointGauge.Reduce(func(i Iterator) float64 { + idx := 0 + for i.Next() { + bucket := i.Bucket() + assert.Equal(t, bucket.Points[0], float64(idx), "validate points of pointGauge") + idx++ + } + return 0.0 + }) + assert.Equal(t, float64(9), pointGauge.Max(), "validate max of pointGauge") + assert.Equal(t, float64(4.5), pointGauge.Avg(), "validate avg of pointGauge") + assert.Equal(t, float64(0), pointGauge.Min(), "validate min of pointGauge") + assert.Equal(t, float64(45), pointGauge.Sum(), "validate sum of pointGauge") +} diff --git a/pkg/stat/metric/point_policy.go b/pkg/stat/metric/point_policy.go new file mode 100644 index 000000000..aae8934e9 --- /dev/null +++ b/pkg/stat/metric/point_policy.go @@ -0,0 +1,57 @@ +package metric + +import "sync" + +// PointPolicy is a policy of points within the window. +// PointPolicy wraps the window and make it seem like ring-buf. +// When using PointPolicy, every buckets within the windows contains at more one point. +// e.g. [[1], [2], [3]] +type PointPolicy struct { + mu sync.RWMutex + size int + window *Window + offset int +} + +// NewPointPolicy creates a new PointPolicy. +func NewPointPolicy(window *Window) *PointPolicy { + return &PointPolicy{ + window: window, + size: window.Size(), + offset: -1, + } +} + +func (p *PointPolicy) prevOffset() int { + return p.offset +} + +func (p *PointPolicy) nextOffset() int { + return (p.prevOffset() + 1) % p.size +} + +func (p *PointPolicy) updateOffset(offset int) { + p.offset = offset +} + +// Append appends the given points to the window. +func (p *PointPolicy) Append(val float64) { + p.mu.Lock() + defer p.mu.Unlock() + offset := p.nextOffset() + p.window.ResetBucket(offset) + p.window.Append(offset, val) + p.updateOffset(offset) +} + +// Reduce applies the reduction function to all buckets within the window. +func (p *PointPolicy) Reduce(f func(Iterator) float64) float64 { + p.mu.RLock() + defer p.mu.RUnlock() + offset := p.offset + 1 + if offset == p.size { + offset = 0 + } + iterator := p.window.Iterator(offset, p.size) + return f(iterator) +} diff --git a/pkg/stat/metric/reduce.go b/pkg/stat/metric/reduce.go new file mode 100644 index 000000000..20165c984 --- /dev/null +++ b/pkg/stat/metric/reduce.go @@ -0,0 +1,77 @@ +package metric + +// Sum the values within the window. +func Sum(iterator Iterator) float64 { + var result = 0.0 + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + result = result + p + } + } + return result +} + +// Avg the values within the window. +func Avg(iterator Iterator) float64 { + var result = 0.0 + var count = 0.0 + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + result = result + p + count = count + 1 + } + } + return result / count +} + +// Min the values within the window. +func Min(iterator Iterator) float64 { + var result = 0.0 + var started = false + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + if !started { + result = p + started = true + continue + } + if p < result { + result = p + } + } + } + return result +} + +// Max the values within the window. +func Max(iterator Iterator) float64 { + var result = 0.0 + var started = false + for iterator.Next() { + bucket := iterator.Bucket() + for _, p := range bucket.Points { + if !started { + result = p + started = true + continue + } + if p > result { + result = p + } + } + } + return result +} + +// Count sums the count value within the window. +func Count(iterator Iterator) float64 { + var result int64 + for iterator.Next() { + bucket := iterator.Bucket() + result += bucket.Count + } + return float64(result) +} diff --git a/pkg/stat/metric/reduce_test.go b/pkg/stat/metric/reduce_test.go new file mode 100644 index 000000000..5165dd293 --- /dev/null +++ b/pkg/stat/metric/reduce_test.go @@ -0,0 +1,17 @@ +package metric + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCount(t *testing.T) { + opts := PointGaugeOpts{Size: 10} + pointGauge := NewPointGauge(opts) + for i := 0; i < opts.Size; i++ { + pointGauge.Add(int64(i)) + } + result := pointGauge.Reduce(Count) + assert.Equal(t, float64(10), result, "validate count of pointGauge") +} diff --git a/pkg/stat/metric/rolling_counter.go b/pkg/stat/metric/rolling_counter.go new file mode 100644 index 000000000..bbaf7a138 --- /dev/null +++ b/pkg/stat/metric/rolling_counter.go @@ -0,0 +1,68 @@ +package metric + +import ( + "fmt" + "time" +) + +var _ Metric = &rollingCounter{} +var _ Aggregation = &rollingCounter{} + +// RollingCounter represents a ring window based on time duration. +// e.g. [[1], [3], [5]] +type RollingCounter interface { + Metric + Aggregation + // Reduce applies the reduction function to all buckets within the window. + Reduce(func(Iterator) float64) float64 +} + +// RollingCounterOpts contains the arguments for creating RollingCounter. +type RollingCounterOpts struct { + Size int + BucketDuration time.Duration +} + +type rollingCounter struct { + policy *RollingPolicy +} + +// NewRollingCounter creates a new RollingCounter bases on RollingCounterOpts. +func NewRollingCounter(opts RollingCounterOpts) RollingCounter { + window := NewWindow(WindowOpts{Size: opts.Size}) + policy := NewRollingPolicy(window, RollingPolicyOpts{BucketDuration: opts.BucketDuration}) + return &rollingCounter{ + policy: policy, + } +} + +func (r *rollingCounter) Add(val int64) { + if val < 0 { + panic(fmt.Errorf("stat/metric: cannot decrease in value. val: %d", val)) + } + r.policy.Add(float64(val)) +} + +func (r *rollingCounter) Reduce(f func(Iterator) float64) float64 { + return r.policy.Reduce(f) +} + +func (r *rollingCounter) Avg() float64 { + return r.policy.Reduce(Avg) +} + +func (r *rollingCounter) Min() float64 { + return r.policy.Reduce(Min) +} + +func (r *rollingCounter) Max() float64 { + return r.policy.Reduce(Max) +} + +func (r *rollingCounter) Sum() float64 { + return r.policy.Reduce(Sum) +} + +func (r *rollingCounter) Value() int64 { + return int64(r.Sum()) +} diff --git a/pkg/stat/metric/rolling_counter_test.go b/pkg/stat/metric/rolling_counter_test.go new file mode 100644 index 000000000..82caa52e1 --- /dev/null +++ b/pkg/stat/metric/rolling_counter_test.go @@ -0,0 +1,156 @@ +package metric + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestRollingCounterAdd(t *testing.T) { + size := 3 + bucketDuration := time.Second + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + listBuckets := func() [][]float64 { + buckets := make([][]float64, 0) + r.Reduce(func(i Iterator) float64 { + for i.Next() { + bucket := i.Bucket() + buckets = append(buckets, bucket.Points) + } + return 0.0 + }) + return buckets + } + assert.Equal(t, [][]float64{{}, {}, {}}, listBuckets()) + r.Add(1) + assert.Equal(t, [][]float64{{}, {}, {1}}, listBuckets()) + time.Sleep(time.Second) + r.Add(2) + r.Add(3) + assert.Equal(t, [][]float64{{}, {1}, {5}}, listBuckets()) + time.Sleep(time.Second) + r.Add(4) + r.Add(5) + r.Add(6) + assert.Equal(t, [][]float64{{1}, {5}, {15}}, listBuckets()) + time.Sleep(time.Second) + r.Add(7) + assert.Equal(t, [][]float64{{5}, {15}, {7}}, listBuckets()) +} + +func TestRollingCounterReduce(t *testing.T) { + size := 3 + bucketDuration := time.Second + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + for x := 0; x < size; x = x + 1 { + for i := 0; i <= x; i++ { + r.Add(1) + } + if x < size-1 { + time.Sleep(bucketDuration) + } + } + var result = r.Reduce(func(iterator Iterator) float64 { + var result float64 + for iterator.Next() { + bucket := iterator.Bucket() + result += bucket.Points[0] + } + return result + }) + if result != 6.0 { + t.Fatalf("Validate sum of points. result: %f", result) + } +} + +func TestRollingCounterDataRace(t *testing.T) { + size := 3 + bucketDuration := time.Millisecond * 10 + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + var stop = make(chan bool) + go func() { + for { + select { + case <-stop: + return + default: + r.Add(1) + time.Sleep(time.Millisecond * 5) + } + } + }() + go func() { + for { + select { + case <-stop: + return + default: + _ = r.Reduce(func(i Iterator) float64 { + for i.Next() { + bucket := i.Bucket() + for range bucket.Points { + continue + } + } + return 0 + }) + } + } + }() + time.Sleep(time.Second * 3) + close(stop) +} + +func BenchmarkRollingCounterIncr(b *testing.B) { + size := 3 + bucketDuration := time.Millisecond * 100 + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + b.ResetTimer() + for i := 0; i <= b.N; i++ { + r.Add(1) + } +} + +func BenchmarkRollingCounterReduce(b *testing.B) { + size := 3 + bucketDuration := time.Second + opts := RollingCounterOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingCounter(opts) + for i := 0; i <= 10; i++ { + r.Add(1) + time.Sleep(time.Millisecond * 500) + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + var _ = r.Reduce(func(i Iterator) float64 { + var result float64 + for i.Next() { + bucket := i.Bucket() + if len(bucket.Points) != 0 { + result += bucket.Points[0] + } + } + return result + }) + } +} diff --git a/pkg/stat/metric/rolling_gauge.go b/pkg/stat/metric/rolling_gauge.go new file mode 100644 index 000000000..c065a0228 --- /dev/null +++ b/pkg/stat/metric/rolling_gauge.go @@ -0,0 +1,62 @@ +package metric + +import "time" + +var _ Metric = &rollingGauge{} +var _ Aggregation = &rollingGauge{} + +// RollingGauge represents a ring window based on time duration. +// e.g. [[1, 2], [1, 2, 3], [1,2, 3, 4]] +type RollingGauge interface { + Metric + Aggregation + // Reduce applies the reduction function to all buckets within the window. + Reduce(func(Iterator) float64) float64 +} + +// RollingGaugeOpts contains the arguments for creating RollingGauge. +type RollingGaugeOpts struct { + Size int + BucketDuration time.Duration +} + +type rollingGauge struct { + policy *RollingPolicy +} + +// NewRollingGauge creates a new RollingGauge baseed on RollingGaugeOpts. +func NewRollingGauge(opts RollingGaugeOpts) RollingGauge { + window := NewWindow(WindowOpts{Size: opts.Size}) + policy := NewRollingPolicy(window, RollingPolicyOpts{BucketDuration: opts.BucketDuration}) + return &rollingGauge{ + policy: policy, + } +} + +func (r *rollingGauge) Add(val int64) { + r.policy.Append(float64(val)) +} + +func (r *rollingGauge) Reduce(f func(Iterator) float64) float64 { + return r.policy.Reduce(f) +} + +func (r *rollingGauge) Avg() float64 { + return r.policy.Reduce(Avg) +} + +func (r *rollingGauge) Min() float64 { + return r.policy.Reduce(Min) +} + +func (r *rollingGauge) Max() float64 { + return r.policy.Reduce(Max) +} + +func (r *rollingGauge) Sum() float64 { + return r.policy.Reduce(Sum) +} + +func (r *rollingGauge) Value() int64 { + return int64(r.Sum()) +} diff --git a/pkg/stat/metric/rolling_gauge_test.go b/pkg/stat/metric/rolling_gauge_test.go new file mode 100644 index 000000000..825ea1551 --- /dev/null +++ b/pkg/stat/metric/rolling_gauge_test.go @@ -0,0 +1,192 @@ +package metric + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestRollingGaugeAdd(t *testing.T) { + size := 3 + bucketDuration := time.Second + opts := RollingGaugeOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingGauge(opts) + listBuckets := func() [][]float64 { + buckets := make([][]float64, 0) + r.Reduce(func(i Iterator) float64 { + for i.Next() { + bucket := i.Bucket() + buckets = append(buckets, bucket.Points) + } + return 0.0 + }) + return buckets + } + assert.Equal(t, [][]float64{{}, {}, {}}, listBuckets()) + r.Add(1) + assert.Equal(t, [][]float64{{}, {}, {1}}, listBuckets()) + time.Sleep(time.Second) + r.Add(2) + r.Add(3) + assert.Equal(t, [][]float64{{}, {1}, {2, 3}}, listBuckets()) + time.Sleep(time.Second) + r.Add(4) + r.Add(5) + r.Add(6) + assert.Equal(t, [][]float64{{1}, {2, 3}, {4, 5, 6}}, listBuckets()) + time.Sleep(time.Second) + r.Add(7) + assert.Equal(t, [][]float64{{2, 3}, {4, 5, 6}, {7}}, listBuckets()) +} + +func TestRollingGaugeReset(t *testing.T) { + size := 3 + bucketDuration := time.Second + opts := RollingGaugeOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingGauge(opts) + listBuckets := func() [][]float64 { + buckets := make([][]float64, 0) + r.Reduce(func(i Iterator) float64 { + for i.Next() { + bucket := i.Bucket() + buckets = append(buckets, bucket.Points) + } + return 0.0 + }) + return buckets + } + r.Add(1) + time.Sleep(time.Second) + assert.Equal(t, [][]float64{{}, {1}}, listBuckets()) + time.Sleep(time.Second) + assert.Equal(t, [][]float64{{1}}, listBuckets()) + time.Sleep(time.Second) + assert.Equal(t, [][]float64{}, listBuckets()) + + // cross window + r.Add(1) + time.Sleep(time.Second * 5) + assert.Equal(t, [][]float64{}, listBuckets()) +} + +func TestRollingGaugeReduce(t *testing.T) { + size := 3 + bucketDuration := time.Second + opts := RollingGaugeOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingGauge(opts) + for x := 0; x < size; x = x + 1 { + for i := 0; i <= x; i++ { + r.Add(int64(i)) + } + if x < size-1 { + time.Sleep(bucketDuration) + } + } + var result = r.Reduce(func(i Iterator) float64 { + var result float64 + for i.Next() { + bucket := i.Bucket() + for _, point := range bucket.Points { + result += point + } + } + return result + }) + if result != 4.0 { + t.Fatalf("Validate sum of points. result: %f", result) + } +} + +func TestRollingGaugeDataRace(t *testing.T) { + size := 3 + bucketDuration := time.Second + opts := RollingGaugeOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingGauge(opts) + var stop = make(chan bool) + go func() { + for { + select { + case <-stop: + return + default: + r.Add(rand.Int63()) + time.Sleep(time.Millisecond * 5) + } + } + }() + go func() { + for { + select { + case <-stop: + return + default: + _ = r.Reduce(func(i Iterator) float64 { + for i.Next() { + bucket := i.Bucket() + for range bucket.Points { + continue + } + } + return 0 + }) + } + } + }() + time.Sleep(time.Second * 3) + close(stop) +} + +func BenchmarkRollingGaugeIncr(b *testing.B) { + size := 10 + bucketDuration := time.Second + opts := RollingGaugeOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingGauge(opts) + b.ResetTimer() + for i := 0; i <= b.N; i++ { + r.Add(1.0) + } +} + +func BenchmarkRollingGaugeReduce(b *testing.B) { + size := 10 + bucketDuration := time.Second + opts := RollingGaugeOpts{ + Size: size, + BucketDuration: bucketDuration, + } + r := NewRollingGauge(opts) + for i := 0; i <= 10; i++ { + r.Add(1.0) + time.Sleep(time.Millisecond * 500) + } + b.ResetTimer() + for i := 0; i <= b.N; i++ { + var _ = r.Reduce(func(i Iterator) float64 { + var result float64 + for i.Next() { + bucket := i.Bucket() + if len(bucket.Points) != 0 { + result += bucket.Points[0] + } + } + return result + }) + } +} diff --git a/pkg/stat/metric/rolling_policy.go b/pkg/stat/metric/rolling_policy.go new file mode 100644 index 000000000..a474e0fa5 --- /dev/null +++ b/pkg/stat/metric/rolling_policy.go @@ -0,0 +1,97 @@ +package metric + +import ( + "sync" + "time" +) + +// RollingPolicy is a policy for ring window based on time duration. +// RollingPolicy moves bucket offset with time duration. +// e.g. If the last point is appended one bucket duration ago, +// RollingPolicy will increment current offset. +type RollingPolicy struct { + mu sync.RWMutex + size int + window *Window + offset int + + bucketDuration time.Duration + lastAppendTime time.Time +} + +// RollingPolicyOpts contains the arguments for creating RollingPolicy. +type RollingPolicyOpts struct { + BucketDuration time.Duration +} + +// NewRollingPolicy creates a new RollingPolicy based on the given window and RollingPolicyOpts. +func NewRollingPolicy(window *Window, opts RollingPolicyOpts) *RollingPolicy { + return &RollingPolicy{ + window: window, + size: window.Size(), + offset: 0, + + bucketDuration: opts.BucketDuration, + lastAppendTime: time.Now(), + } +} + +func (r *RollingPolicy) timespan() int { + v := int(time.Since(r.lastAppendTime) / r.bucketDuration) + if v < r.size && v > -1 { // maybe time backwards + return v + } + return r.size +} + +func (r *RollingPolicy) add(f func(offset int, val float64), val float64) { + r.mu.Lock() + timespan := r.timespan() + if timespan > 0 { + offset := r.offset + // reset the expired buckets + s := offset + 1 + e, e1 := s+timespan, 0 // e: reset offset must start from offset+1 + if e > r.size { + e1 = e - r.size + e = r.size + } + for i := s; i < e; i++ { + r.window.ResetBucket(i) + offset = i + } + for i := 0; i < e1; i++ { + r.window.ResetBucket(i) + offset = i + } + r.offset = offset + r.lastAppendTime = time.Now() + } + f(r.offset, val) + r.mu.Unlock() +} + +// Append appends the given points to the window. +func (r *RollingPolicy) Append(val float64) { + r.add(r.window.Append, val) +} + +// Add adds the given value to the latest point within bucket. +func (r *RollingPolicy) Add(val float64) { + r.add(r.window.Add, val) +} + +// Reduce applies the reduction function to all buckets within the window. +func (r *RollingPolicy) Reduce(f func(Iterator) float64) (val float64) { + r.mu.RLock() + timespan := r.timespan() + if count := r.size - timespan; count > 0 { + offset := r.offset + timespan + 1 + if offset >= r.size { + offset = offset - r.size + } + val = f(r.window.Iterator(offset, count)) + } + r.mu.RUnlock() + return val +} diff --git a/pkg/stat/metric/window.go b/pkg/stat/metric/window.go new file mode 100644 index 000000000..e8a0d9844 --- /dev/null +++ b/pkg/stat/metric/window.go @@ -0,0 +1,107 @@ +package metric + +// Bucket contains multiple float64 points. +type Bucket struct { + Points []float64 + Count int64 + next *Bucket +} + +// Append appends the given value to the bucket. +func (b *Bucket) Append(val float64) { + b.Points = append(b.Points, val) + b.Count++ +} + +// Add adds the given value to the point. +func (b *Bucket) Add(offset int, val float64) { + b.Points[offset] += val + b.Count++ +} + +// Reset empties the bucket. +func (b *Bucket) Reset() { + b.Points = b.Points[:0] + b.Count = 0 +} + +// Next returns the next bucket. +func (b *Bucket) Next() *Bucket { + return b.next +} + +// Window contains multiple buckets. +type Window struct { + window []Bucket + size int +} + +// WindowOpts contains the arguments for creating Window. +type WindowOpts struct { + Size int +} + +// NewWindow creates a new Window based on WindowOpts. +func NewWindow(opts WindowOpts) *Window { + buckets := make([]Bucket, opts.Size) + for offset := range buckets { + buckets[offset] = Bucket{Points: make([]float64, 0)} + nextOffset := offset + 1 + if nextOffset == opts.Size { + nextOffset = 0 + } + buckets[offset].next = &buckets[nextOffset] + } + return &Window{window: buckets, size: opts.Size} +} + +// ResetWindow empties all buckets within the window. +func (w *Window) ResetWindow() { + for offset := range w.window { + w.ResetBucket(offset) + } +} + +// ResetBucket empties the bucket based on the given offset. +func (w *Window) ResetBucket(offset int) { + w.window[offset].Reset() +} + +// ResetBuckets empties the buckets based on the given offsets. +func (w *Window) ResetBuckets(offsets []int) { + for _, offset := range offsets { + w.ResetBucket(offset) + } +} + +// Append appends the given value to the bucket where index equals the given offset. +func (w *Window) Append(offset int, val float64) { + w.window[offset].Append(val) +} + +// Add adds the given value to the latest point within bucket where index equals the given offset. +func (w *Window) Add(offset int, val float64) { + if w.window[offset].Count == 0 { + w.window[offset].Append(val) + return + } + w.window[offset].Add(0, val) +} + +// Bucket returns the bucket where index equals the given offset. +func (w *Window) Bucket(offset int) Bucket { + return w.window[offset] +} + +// Size returns the size of the window. +func (w *Window) Size() int { + return w.size +} + +// Iterator returns the bucket iterator. +func (w *Window) Iterator(offset int, count int) Iterator { + return Iterator{ + count: count, + cur: &w.window[offset], + } +} diff --git a/pkg/stat/metric/window_test.go b/pkg/stat/metric/window_test.go new file mode 100644 index 000000000..98578debd --- /dev/null +++ b/pkg/stat/metric/window_test.go @@ -0,0 +1,67 @@ +package metric + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestWindowResetWindow(t *testing.T) { + opts := WindowOpts{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + window.ResetWindow() + for i := 0; i < opts.Size; i++ { + assert.Equal(t, len(window.Bucket(i).Points), 0) + } +} + +func TestWindowResetBucket(t *testing.T) { + opts := WindowOpts{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + window.ResetBucket(1) + assert.Equal(t, len(window.Bucket(1).Points), 0) + assert.Equal(t, window.Bucket(0).Points[0], float64(1.0)) + assert.Equal(t, window.Bucket(2).Points[0], float64(1.0)) +} + +func TestWindowResetBuckets(t *testing.T) { + opts := WindowOpts{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + window.ResetBuckets([]int{0, 1, 2}) + for i := 0; i < opts.Size; i++ { + assert.Equal(t, len(window.Bucket(i).Points), 0) + } +} + +func TestWindowAppend(t *testing.T) { + opts := WindowOpts{Size: 3} + window := NewWindow(opts) + for i := 0; i < opts.Size; i++ { + window.Append(i, 1.0) + } + for i := 0; i < opts.Size; i++ { + assert.Equal(t, window.Bucket(i).Points[0], float64(1.0)) + } +} + +func TestWindowAdd(t *testing.T) { + opts := WindowOpts{Size: 3} + window := NewWindow(opts) + window.Append(0, 1.0) + window.Add(0, 1.0) + assert.Equal(t, window.Bucket(0).Points[0], float64(2.0)) +} + +func TestWindowSize(t *testing.T) { + opts := WindowOpts{Size: 3} + window := NewWindow(opts) + assert.Equal(t, window.Size(), 3) +} diff --git a/pkg/stat/summary/README.md b/pkg/stat/summary/README.md deleted file mode 100644 index dd9c473ac..000000000 --- a/pkg/stat/summary/README.md +++ /dev/null @@ -1,5 +0,0 @@ -# summary - -## 项目简介 - -summary计数器 diff --git a/pkg/stat/summary/summary.go b/pkg/stat/summary/summary.go deleted file mode 100644 index a6afc61c9..000000000 --- a/pkg/stat/summary/summary.go +++ /dev/null @@ -1,129 +0,0 @@ -package summary - -import ( - "sync" - "time" -) - -type bucket struct { - val int64 - count int64 - next *bucket -} - -func (b *bucket) Add(val int64) { - b.val += val - b.count++ -} - -func (b *bucket) Value() (int64, int64) { - return b.val, b.count -} - -func (b *bucket) Reset() { - b.val = 0 - b.count = 0 -} - -// Summary is a summary interface. -type Summary interface { - Add(int64) - Reset() - Value() (val int64, cnt int64) -} - -type summary struct { - mu sync.RWMutex - buckets []bucket - bucketTime int64 - lastAccess int64 - cur *bucket -} - -// New new a summary. -// -// use RollingCounter creates a new window. windowTime is the time covering the entire -// window. windowBuckets is the number of buckets the window is divided into. -// An example: a 10 second window with 10 buckets will have 10 buckets covering -// 1 second each. -func New(window time.Duration, winBucket int) Summary { - buckets := make([]bucket, winBucket) - bucket := &buckets[0] - for i := 1; i < winBucket; i++ { - bucket.next = &buckets[i] - bucket = bucket.next - } - bucket.next = &buckets[0] - bucketTime := time.Duration(window.Nanoseconds() / int64(winBucket)) - return &summary{ - cur: &buckets[0], - buckets: buckets, - bucketTime: int64(bucketTime), - lastAccess: time.Now().UnixNano(), - } -} - -// Add increments the summary by value. -func (s *summary) Add(val int64) { - s.mu.Lock() - s.lastBucket().Add(val) - s.mu.Unlock() -} - -// Value get the summary value and count. -func (s *summary) Value() (val int64, cnt int64) { - now := time.Now().UnixNano() - s.mu.RLock() - b := s.cur - i := s.elapsed(now) - for j := 0; j < len(s.buckets); j++ { - // skip all future reset bucket. - if i > 0 { - i-- - } else { - v, c := b.Value() - val += v - cnt += c - } - b = b.next - } - s.mu.RUnlock() - return -} - -// Reset reset the counter. -func (s *summary) Reset() { - s.mu.Lock() - for i := range s.buckets { - s.buckets[i].Reset() - } - s.mu.Unlock() -} - -func (s *summary) elapsed(now int64) (i int) { - var e int64 - if e = now - s.lastAccess; e <= s.bucketTime { - return - } - if i = int(e / s.bucketTime); i > len(s.buckets) { - i = len(s.buckets) - } - return -} - -func (s *summary) lastBucket() (b *bucket) { - now := time.Now().UnixNano() - b = s.cur - // reset the buckets between now and number of buckets ago. If - // that is more that the existing buckets, reset all. - if i := s.elapsed(now); i > 0 { - s.lastAccess = now - for ; i > 0; i-- { - // replace the next used bucket. - b = b.next - b.Reset() - } - } - s.cur = b - return -} diff --git a/pkg/stat/summary/summary_test.go b/pkg/stat/summary/summary_test.go deleted file mode 100644 index 83d6043c3..000000000 --- a/pkg/stat/summary/summary_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package summary - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestSummaryMinInterval(t *testing.T) { - count := New(time.Second/2, 10) - tk1 := time.NewTicker(5 * time.Millisecond) - defer tk1.Stop() - for i := 0; i < 100; i++ { - <-tk1.C - count.Add(2) - } - - v, c := count.Value() - t.Logf("count value: %d, %d\n", v, c) - // 10% of error when bucket is 10 - if v < 190 || v > 210 { - t.Errorf("expect value in [90-110] get %d", v) - } - // 10% of error when bucket is 10 - if c < 90 || c > 110 { - t.Errorf("expect value in [90-110] get %d", v) - } -} - -func TestSummary(t *testing.T) { - s := New(time.Second, 10) - - t.Run("add", func(t *testing.T) { - s.Add(1) - v, c := s.Value() - assert.Equal(t, v, int64(1)) - assert.Equal(t, c, int64(1)) - }) - time.Sleep(time.Millisecond * 110) - t.Run("add2", func(t *testing.T) { - s.Add(1) - v, c := s.Value() - assert.Equal(t, v, int64(2)) - assert.Equal(t, c, int64(2)) - }) - time.Sleep(time.Millisecond * 900) // expire one bucket, 110 + 900 - t.Run("expire", func(t *testing.T) { - v, c := s.Value() - assert.Equal(t, v, int64(1)) - assert.Equal(t, c, int64(1)) - s.Add(1) - v, c = s.Value() - assert.Equal(t, v, int64(2)) // expire one bucket - assert.Equal(t, c, int64(2)) // expire one bucket - }) - time.Sleep(time.Millisecond * 1100) - t.Run("expire_all", func(t *testing.T) { - v, c := s.Value() - assert.Equal(t, v, int64(0)) - assert.Equal(t, c, int64(0)) - }) - t.Run("reset", func(t *testing.T) { - s.Reset() - v, c := s.Value() - assert.Equal(t, v, int64(0)) - assert.Equal(t, c, int64(0)) - }) -} diff --git a/tool/kratos/template.go b/tool/kratos/template.go index a31d2c725..2a250e303 100644 --- a/tool/kratos/template.go +++ b/tool/kratos/template.go @@ -218,14 +218,6 @@ func (d *Dao) pingRedis(ctx context.Context) (err error) { ## 项目简介 1. - -## 编译环境 - - -## 依赖包 - - -## 编译执行 ` _tplService = `package service