pull/13/head
felixhao 6 years ago
parent 3e40f096f4
commit 2739a26481
  1. 35
      README.md
  2. 0
      doc/wiki-cn/README.md
  3. 0
      doc/wiki-cn/blademaster-mid.md
  4. 0
      doc/wiki-cn/blademaster-pb.md
  5. 0
      doc/wiki-cn/blademaster.md
  6. 0
      doc/wiki-cn/cache-mc.md
  7. 0
      doc/wiki-cn/cache-redis.md
  8. 0
      doc/wiki-cn/cache.md
  9. 0
      doc/wiki-cn/dapper.md
  10. 0
      doc/wiki-cn/database-hbase.md
  11. 0
      doc/wiki-cn/database-mysql.md
  12. 0
      doc/wiki-cn/database-tidb.md
  13. 0
      doc/wiki-cn/database.md
  14. 0
      doc/wiki-cn/kratos-tool.md
  15. 0
      doc/wiki-cn/log-agent.md
  16. 0
      doc/wiki-cn/logger.md
  17. 0
      doc/wiki-cn/quickstart.md
  18. 22
      doc/wiki-cn/summary.md
  19. 0
      doc/wiki-cn/warden-mid.md
  20. 0
      doc/wiki-cn/warden-pb.md
  21. 0
      doc/wiki-cn/warden.md
  22. 8
      go.mod
  23. 10
      pkg/database/tidb/discovery.go
  24. 45
      pkg/naming/discovery/discovery.go
  25. 28
      pkg/naming/naming.go
  26. 5
      pkg/net/http/blademaster/README.md
  27. 2
      pkg/net/http/blademaster/client.go
  28. 21
      pkg/net/netutil/breaker/README.md
  29. 100
      pkg/net/netutil/breaker/breaker_test.go
  30. 61
      pkg/net/netutil/breaker/example_test.go
  31. 27
      pkg/net/netutil/breaker/sre_breaker.go
  32. 161
      pkg/net/netutil/breaker/sre_breaker_test.go
  33. 10
      pkg/net/rpc/warden/OWNERS
  34. 12
      pkg/net/rpc/warden/README.md
  35. 20
      pkg/net/rpc/warden/balancer/p2c/CHANGELOG.md
  36. 9
      pkg/net/rpc/warden/balancer/p2c/OWNERS
  37. 12
      pkg/net/rpc/warden/balancer/p2c/README.md
  38. 17
      pkg/net/rpc/warden/balancer/wrr/CHANGELOG.md
  39. 9
      pkg/net/rpc/warden/balancer/wrr/OWNERS
  40. 10
      pkg/net/rpc/warden/balancer/wrr/README.md
  41. 17
      pkg/net/rpc/warden/resolver/CHANGELOG.md
  42. 9
      pkg/net/rpc/warden/resolver/OWNERS
  43. 10
      pkg/net/rpc/warden/resolver/README.md
  44. 6
      pkg/net/rpc/warden/resolver/direct/CHANGELOG.md
  45. 10
      pkg/net/rpc/warden/resolver/direct/README.md
  46. 6
      pkg/net/rpc/warden/resolver/resolver.go
  47. 36
      pkg/stat/metric/counter.go
  48. 18
      pkg/stat/metric/counter_test.go
  49. 37
      pkg/stat/metric/gauge.go
  50. 23
      pkg/stat/metric/gauge_test.go
  51. 26
      pkg/stat/metric/iterator.go
  52. 30
      pkg/stat/metric/metric.go
  53. 61
      pkg/stat/metric/point_gauge.go
  54. 55
      pkg/stat/metric/point_gauge_test.go
  55. 57
      pkg/stat/metric/point_policy.go
  56. 77
      pkg/stat/metric/reduce.go
  57. 17
      pkg/stat/metric/reduce_test.go
  58. 68
      pkg/stat/metric/rolling_counter.go
  59. 156
      pkg/stat/metric/rolling_counter_test.go
  60. 62
      pkg/stat/metric/rolling_gauge.go
  61. 192
      pkg/stat/metric/rolling_gauge_test.go
  62. 97
      pkg/stat/metric/rolling_policy.go
  63. 107
      pkg/stat/metric/window.go
  64. 67
      pkg/stat/metric/window_test.go
  65. 5
      pkg/stat/summary/README.md
  66. 129
      pkg/stat/summary/summary.go
  67. 69
      pkg/stat/summary/summary_test.go
  68. 8
      tool/kratos/template.go

@ -1,11 +1,38 @@
# Kratos # Kratos
Kratos是[bilibili](https://www.bilibili.com)开源的一套Go微服务框架,包含大量微服务相关框架及工具。 Kratos是[bilibili](https://www.bilibili.com)开源的一套Go微服务框架,包含大量微服务相关框架及工具。主要包括以下组件:
如:discovery(服务注册发现)、blademaster(HTTP框架)、warden(gRPC封装)、log、breaker、dapper(trace)、cache&db sdk、kratos tool(代码生成等工具)等等。
我们致力于提供完整的微服务研发体验,大仓整合相关框架及工具后,微服务治理相关部分可对整体业务开发周期无感,从而更加聚焦于业务交付。对每位开发者而言,整套Kratos框架也是不错的学习仓库,可以了解和参考到[bilibili](https://www.bilibili.com)在微服务方面的技术积累和经验。 * [http框架blademaster(bm)](doc/wiki-cn/blademaster.md):基于[gin](https://github.com/gin-gonic/gin)二次开发,具有快速、灵活的特点,可以方便的开发中间件处理通用或特殊逻辑,基础库默认实现了log&trace等。
* [gRPC框架warden](doc/wiki-cn/warden.md):基于官方gRPC封装,默认使用[discovery](https://github.com/bilibili/discovery)进行服务注册发现,及wrr和p2c(默认)负载均衡。
* [dapper trace](doc/wiki-cn/dapper.md):基于opentracing,全链路集成了trace,我们还提供dapper实现,请参看:[dapper敬请期待]()。
* [log](doc/wiki-cn/logger.md):基于[zap](https://github.com/uber-go/zap)的field方式实现的高性能log库,集成了我们提供的[log-agent敬请期待]()日志收集方案。
* [database](doc/wiki-cn/database.md):集成MySQL&HBase&TiDB的SDK,其中TiDB使用服务发现方案。
* [cache](doc/wiki-cn/cache.md):集成memcache&redis的SDK,注意无redis-cluster实现,推荐使用代理模式[overlord](https://github.com/bilibili/overlord)。
* [kratos tool](doc/wiki-cn/kratos-tool.md):kratos相关工具量,包括项目快速生成、pb文件代码生成、swagger文档生成等。
我们致力于提供完整的微服务研发体验,整合相关框架及工具后,微服务治理相关部分可对整体业务开发周期无感,从而更加聚焦于业务交付。对每位开发者而言,整套Kratos框架也是不错的学习仓库,可以了解和参考到[bilibili](https://www.bilibili.com)在微服务方面的技术积累和经验。
# 快速开始
```shell
go get -u github.com/bilibili/kratos/tool/kratos
kratos init
```
`kratos init`会快速生成基于kratos库的脚手架代码,如生成[kratos-demo](https://github.com/bilibili/kratos-demo)
```shell
cd kratos-demo/cmd
go build
./cmd -conf ../configs
```
打开浏览器访问:[http://localhost:8000/kratos-demo/start](http://localhost:8000/kratos-demo/start),你会看到输出了`Golang 大法好 !!!`
# Document
[简体中文](doc/wiki-cn/summary.md)
------------- -------------
*Please report bugs, concerns, suggestions by issues, or join QQ-group 716486124 to discuss problems around source code.* *Please report bugs, concerns, suggestions by issues, or join QQ-group 716486124 to discuss problems around source code.*

@ -0,0 +1,22 @@
# Summary
* [介绍](README.md)
* [快速开始](quickstart.md)
* [案例](https://github.com/bilibili/kratos-demo)
* [http blademaster](blademaster.md)
* [middleware](blademaster-mid.md)
* [protobuf生成](blademaster-pb.md)
* [grpc warden](warden.md)
* [middleware](warden-mid.md)
* [protobuf生成](warden-pb.md)
* [dapper trace](dapper.md)
* [log](logger.md)
* [log-agent](log-agent.md)
* [database](database.md)
* [mysql](database-mysql.md)
* [hbase](database-hbase.md)
* [tidb](database-tidb.md)
* [cache](cache.md)
* [memcache](cache-mc.md)
* [redis](cache-redis.md)
* [kratos tool](kratos-tool.md)

@ -1,11 +1,14 @@
module github.com/bilibili/kratos module github.com/bilibili/kratos
go 1.12
require ( require (
github.com/BurntSushi/toml v0.3.1 github.com/BurntSushi/toml v0.3.1
github.com/aristanetworks/goarista v0.0.0-20190409234242-46f4bc7b73ef // indirect github.com/aristanetworks/goarista v0.0.0-20190409234242-46f4bc7b73ef // indirect
github.com/cznic/b v0.0.0-20181122101859-a26611c4d92d // indirect github.com/cznic/b v0.0.0-20181122101859-a26611c4d92d // indirect
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect
github.com/cznic/strutil v0.0.0-20181122101858-275e90344537 // indirect github.com/cznic/strutil v0.0.0-20181122101858-275e90344537 // indirect
github.com/dgryski/go-farm v0.0.0-20190323231341-8198c7b169ec
github.com/fatih/color v1.7.0 github.com/fatih/color v1.7.0
github.com/fsnotify/fsnotify v1.4.7 github.com/fsnotify/fsnotify v1.4.7
github.com/go-playground/locales v0.12.1 // indirect github.com/go-playground/locales v0.12.1 // indirect
@ -15,19 +18,20 @@ require (
github.com/golang/protobuf v1.2.0 github.com/golang/protobuf v1.2.0
github.com/kr/pty v1.1.4 github.com/kr/pty v1.1.4
github.com/leodido/go-urn v1.1.0 // indirect github.com/leodido/go-urn v1.1.0 // indirect
github.com/montanaflynn/stats v0.5.0
github.com/pkg/errors v0.8.1 github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.9.2 github.com/prometheus/client_golang v0.9.2
github.com/remyoudompheng/bigfft v0.0.0-20190321074620-2f0d2b0e0001 // indirect github.com/remyoudompheng/bigfft v0.0.0-20190321074620-2f0d2b0e0001 // indirect
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec // indirect github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec // indirect
github.com/sirupsen/logrus v1.4.1 // indirect github.com/sirupsen/logrus v1.4.1 // indirect
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a
github.com/stretchr/testify v1.3.0 github.com/stretchr/testify v1.3.0
github.com/tsuna/gohbase v0.0.0-20190201102810-d3184c1526df github.com/tsuna/gohbase v0.0.0-20190201102810-d3184c1526df
github.com/urfave/cli v1.20.0 github.com/urfave/cli v1.20.0
golang.org/x/net v0.0.0-20190311183353-d8887717615a
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
google.golang.org/grpc v1.18.0 google.golang.org/grpc v1.18.0
gopkg.in/AlecAivazis/survey.v1 v1.8.2 gopkg.in/AlecAivazis/survey.v1 v1.8.2
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/go-playground/validator.v9 v9.26.0 gopkg.in/go-playground/validator.v9 v9.26.0
) )
go 1.12

@ -16,15 +16,13 @@ var _schema = "tidb://"
func (db *DB) nodeList() (nodes []string) { func (db *DB) nodeList() (nodes []string) {
var ( var (
insInfo *naming.InstancesInfo insMap map[string][]*naming.Instance
insMap map[string][]*naming.Instance ins []*naming.Instance
ins []*naming.Instance ok bool
ok bool
) )
if insInfo, ok = db.dis.Fetch(context.Background()); !ok { if insMap, ok = db.dis.Fetch(context.Background()); !ok {
return return
} }
insMap = insInfo.Instances
if ins, ok = insMap[env.Zone]; !ok || len(ins) == 0 { if ins, ok = insMap[env.Zone]; !ok || len(ins) == 0 {
return return
} }

@ -59,6 +59,12 @@ type Config struct {
Host string Host string
} }
type appData struct {
ZoneInstances map[string][]*naming.Instance `json:"zone_instances"`
LastTs int64 `json:"latest_timestamp"`
Err string `json:"err"`
}
// Discovery is discovery client. // Discovery is discovery client.
type Discovery struct { type Discovery struct {
once sync.Once once sync.Once
@ -175,15 +181,15 @@ func (d *Discovery) selfproc(resolver naming.Resolver, event <-chan struct{}) {
if !ok { if !ok {
return return
} }
instances, ok := resolver.Fetch(context.Background()) zones, ok := resolver.Fetch(context.Background())
if ok { if ok {
d.newSelf(instances) d.newSelf(zones)
} }
} }
} }
func (d *Discovery) newSelf(instances *naming.InstancesInfo) { func (d *Discovery) newSelf(zones map[string][]*naming.Instance) {
ins, ok := instances.Instances[d.conf.Zone] ins, ok := zones[d.conf.Zone]
if !ok { if !ok {
return return
} }
@ -270,12 +276,12 @@ func (r *Resolver) Watch() <-chan struct{} {
} }
// Fetch fetch resolver instance. // Fetch fetch resolver instance.
func (r *Resolver) Fetch(c context.Context) (ins *naming.InstancesInfo, ok bool) { func (r *Resolver) Fetch(c context.Context) (ins map[string][]*naming.Instance, ok bool) {
r.d.mutex.RLock() r.d.mutex.RLock()
app, ok := r.d.apps[r.id] app, ok := r.d.apps[r.id]
r.d.mutex.RUnlock() r.d.mutex.RUnlock()
if ok { if ok {
ins, ok = app.zoneIns.Load().(*naming.InstancesInfo) ins, ok = app.zoneIns.Load().(map[string][]*naming.Instance)
return return
} }
return return
@ -527,7 +533,6 @@ func (d *Discovery) serverproc() {
return return
default: default:
} }
apps, err := d.polls(ctx, d.pickNode()) apps, err := d.polls(ctx, d.pickNode())
if err != nil { if err != nil {
d.switchNode() d.switchNode()
@ -572,7 +577,7 @@ func (d *Discovery) nodes() (nodes []string) {
return return
} }
func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]naming.InstancesInfo, err error) { func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]appData, err error) {
var ( var (
lastTs []int64 lastTs []int64
appid []string appid []string
@ -597,8 +602,9 @@ func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]nam
} }
uri := fmt.Sprintf(_pollURL, host) uri := fmt.Sprintf(_pollURL, host)
res := new(struct { res := new(struct {
Code int `json:"code"` Code int `json:"code"`
Data map[string]naming.InstancesInfo `json:"data"` Message string `json:"message"`
Data map[string]appData `json:"data"`
}) })
params := url.Values{} params := url.Values{}
params.Set("env", conf.Env) params.Set("env", conf.Env)
@ -611,8 +617,17 @@ func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]nam
} }
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
if !ec.Equal(ecode.NotModified) { if !ec.Equal(ecode.NotModified) {
log.Error("discovery: client.Get(%s) get error code(%d)", uri+"?"+params.Encode(), res.Code) log.Error("discovery: client.Get(%s) get error code(%d) message(%s)", uri+"?"+params.Encode(), res.Code, res.Message)
err = ec err = ec
if ec.Equal(ecode.NothingFound) {
for appID, value := range res.Data {
if value.Err != "" {
errInfo := fmt.Sprintf("discovery: app(%s) on ENV(%s) %s!\n", appID, conf.Env, value.Err)
log.Error(errInfo)
fmt.Fprintf(os.Stderr, errInfo)
}
}
}
} }
return return
} }
@ -630,12 +645,12 @@ func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]nam
return return
} }
func (d *Discovery) broadcast(apps map[string]naming.InstancesInfo) { func (d *Discovery) broadcast(apps map[string]appData) {
for id, v := range apps { for id, v := range apps {
var count int var count int
for zone, ins := range v.Instances { for zone, ins := range v.ZoneInstances {
if len(ins) == 0 { if len(ins) == 0 {
delete(v.Instances, zone) delete(v.ZoneInstances, zone)
} }
count += len(ins) count += len(ins)
} }
@ -647,7 +662,7 @@ func (d *Discovery) broadcast(apps map[string]naming.InstancesInfo) {
d.mutex.RUnlock() d.mutex.RUnlock()
if ok { if ok {
app.lastTs = v.LastTs app.lastTs = v.LastTs
app.zoneIns.Store(v) app.zoneIns.Store(v.ZoneInstances)
d.mutex.RLock() d.mutex.RLock()
for rs := range app.resolver { for rs := range app.resolver {
select { select {

@ -6,19 +6,17 @@ import (
// metadata common key // metadata common key
const ( const (
MetaColor = "color"
MetaWeight = "weight"
MetaCluster = "cluster"
MetaZone = "zone" MetaZone = "zone"
MetaCluster = "cluster"
MetaWeight = "weight"
MetaColor = "color"
) )
// Instance represents a server the client connects to. // Instance represents a server the client connects to.
type Instance struct { type Instance struct {
// Region bj/sh/gz
Region string `json:"region"`
// Zone is IDC. // Zone is IDC.
Zone string `json:"zone"` Zone string `json:"zone"`
// Env prod/preuat/fat1 // Env prod/pre/uat/fat1
Env string `json:"env"` Env string `json:"env"`
// AppID is mapping servicetree appid. // AppID is mapping servicetree appid.
AppID string `json:"appid"` AppID string `json:"appid"`
@ -34,25 +32,13 @@ type Instance struct {
// Metadata is the information associated with Addr, which may be used // Metadata is the information associated with Addr, which may be used
// to make load balancing decision. // to make load balancing decision.
Metadata map[string]string `json:"metadata"` Metadata map[string]string `json:"metadata"`
Status int64 // Status status
} Status int64
// InstancesInfo instance info.
type InstancesInfo struct {
Instances map[string][]*Instance `json:"zone_instances"`
LastTs int64 `json:"latest_timestamp"`
Scheduler []*Scheduler `json:"scheduler"`
}
// Scheduler scheduler info in multi cluster.
type Scheduler struct {
Src string `json:"src"`
Dst map[string]int64 `json:"dst"`
} }
// Resolver resolve naming service // Resolver resolve naming service
type Resolver interface { type Resolver interface {
Fetch(context.Context) (*InstancesInfo, bool) Fetch(context.Context) (map[string][]*Instance, bool)
Watch() <-chan struct{} Watch() <-chan struct{}
Close() error Close() error
} }

@ -0,0 +1,5 @@
#### net/http/blademaster
##### 项目简介
http 框架,带来如飞一般的体验。

@ -137,7 +137,7 @@ func (client *Client) SetConfig(c *ClientConfig) {
// TODO(zhoujiahui): param realIP should be removed later. // TODO(zhoujiahui): param realIP should be removed later.
func (client *Client) NewRequest(method, uri, realIP string, params url.Values) (req *xhttp.Request, err error) { func (client *Client) NewRequest(method, uri, realIP string, params url.Values) (req *xhttp.Request, err error) {
if method == xhttp.MethodGet { if method == xhttp.MethodGet {
req, err = xhttp.NewRequest(xhttp.MethodGet, uri+params.Encode(), nil) req, err = xhttp.NewRequest(xhttp.MethodGet, fmt.Sprintf("%s?%s", uri, params.Encode()), nil)
} else { } else {
req, err = xhttp.NewRequest(xhttp.MethodPost, uri, strings.NewReader(params.Encode())) req, err = xhttp.NewRequest(xhttp.MethodPost, uri, strings.NewReader(params.Encode()))
} }

@ -0,0 +1,21 @@
#### breaker
##### 项目简介
1. 提供熔断器功能,供各种client(如rpc、http、msyql)等进行熔断
2. 提供Go方法供业务在breaker熔断前后进行回调处理
##### 配置说明
> 1. NewGroup(name string,c *Config)当c==nil时则采用默认配置
> 2. 可通过breaker.Init(c *Config)替换默认配置
> 3. 可通过group.Reload(c *Config)进行配置更新
> 4. 默认配置如下所示:
_conf = &Config{
Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(100 * time.Millisecond),
Bucket: 10,
Ratio: 0.5,
Request: 100,
}
##### 测试
1. 执行当前目录下所有测试文件,测试所有功能

@ -0,0 +1,100 @@
package breaker
import (
"errors"
"testing"
"time"
xtime "github.com/bilibili/kratos/pkg/time"
)
func TestGroup(t *testing.T) {
g1 := NewGroup(nil)
g2 := NewGroup(_conf)
if g1.conf != g2.conf {
t.FailNow()
}
brk := g2.Get("key")
brk1 := g2.Get("key1")
if brk == brk1 {
t.FailNow()
}
brk2 := g2.Get("key")
if brk != brk2 {
t.FailNow()
}
g := NewGroup(_conf)
c := &Config{
Window: xtime.Duration(1 * time.Second),
Sleep: xtime.Duration(100 * time.Millisecond),
Bucket: 10,
Ratio: 0.5,
Request: 100,
SwitchOff: !_conf.SwitchOff,
}
g.Reload(c)
if g.conf.SwitchOff == _conf.SwitchOff {
t.FailNow()
}
}
func TestInit(t *testing.T) {
switchOff := _conf.SwitchOff
c := &Config{
Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(100 * time.Millisecond),
Bucket: 10,
Ratio: 0.5,
Request: 100,
SwitchOff: !switchOff,
}
Init(c)
if _conf.SwitchOff == switchOff {
t.FailNow()
}
}
func TestGo(t *testing.T) {
if err := Go("test_run", func() error {
t.Log("breaker allow,callback run()")
return nil
}, func() error {
t.Log("breaker not allow,callback fallback()")
return errors.New("breaker not allow")
}); err != nil {
t.Error(err)
}
_group.Reload(&Config{
Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(100 * time.Millisecond),
Bucket: 10,
Ratio: 0.5,
Request: 100,
SwitchOff: true,
})
if err := Go("test_fallback", func() error {
t.Log("breaker allow,callback run()")
return nil
}, func() error {
t.Log("breaker not allow,callback fallback()")
return nil
}); err != nil {
t.Error(err)
}
}
func markSuccess(b Breaker, count int) {
for i := 0; i < count; i++ {
b.MarkSuccess()
}
}
func markFailed(b Breaker, count int) {
for i := 0; i < count; i++ {
b.MarkFailed()
}
}

@ -0,0 +1,61 @@
package breaker_test
import (
"fmt"
"time"
"github.com/bilibili/kratos/pkg/net/netutil/breaker"
xtime "github.com/bilibili/kratos/pkg/time"
)
// ExampleGroup show group usage.
func ExampleGroup() {
c := &breaker.Config{
Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(100 * time.Millisecond),
Bucket: 10,
Ratio: 0.5,
Request: 100,
}
// init default config
breaker.Init(c)
// new group
g := breaker.NewGroup(c)
// reload group config
c.Bucket = 100
c.Request = 200
g.Reload(c)
// get breaker by key
g.Get("key")
}
// ExampleBreaker show breaker usage.
func ExampleBreaker() {
// new group,use default breaker config
g := breaker.NewGroup(nil)
brk := g.Get("key")
// mark request success
brk.MarkSuccess()
// mark request failed
brk.MarkFailed()
// check if breaker allow or not
if brk.Allow() == nil {
fmt.Println("breaker allow")
} else {
fmt.Println("breaker not allow")
}
}
// ExampleGo this example create a default group and show function callback
// according to the state of breaker.
func ExampleGo() {
run := func() error {
return nil
}
fallback := func() error {
return fmt.Errorf("unknown error")
}
if err := breaker.Go("example_go", run, fallback); err != nil {
fmt.Println(err)
}
}

@ -8,12 +8,12 @@ import (
"github.com/bilibili/kratos/pkg/ecode" "github.com/bilibili/kratos/pkg/ecode"
"github.com/bilibili/kratos/pkg/log" "github.com/bilibili/kratos/pkg/log"
"github.com/bilibili/kratos/pkg/stat/summary" "github.com/bilibili/kratos/pkg/stat/metric"
) )
// sreBreaker is a sre CircuitBreaker pattern. // sreBreaker is a sre CircuitBreaker pattern.
type sreBreaker struct { type sreBreaker struct {
stat summary.Summary stat metric.RollingCounter
k float64 k float64
request int64 request int64
@ -23,8 +23,13 @@ type sreBreaker struct {
} }
func newSRE(c *Config) Breaker { func newSRE(c *Config) Breaker {
counterOpts := metric.RollingCounterOpts{
Size: c.Bucket,
BucketDuration: time.Duration(int64(c.Window) / int64(c.Bucket)),
}
stat := metric.NewRollingCounter(counterOpts)
return &sreBreaker{ return &sreBreaker{
stat: summary.New(time.Duration(c.Window), c.Bucket), stat: stat,
r: rand.New(rand.NewSource(time.Now().UnixNano())), r: rand.New(rand.NewSource(time.Now().UnixNano())),
request: c.Request, request: c.Request,
@ -33,8 +38,22 @@ func newSRE(c *Config) Breaker {
} }
} }
func (b *sreBreaker) summary() (success int64, total int64) {
b.stat.Reduce(func(iterator metric.Iterator) float64 {
for iterator.Next() {
bucket := iterator.Bucket()
total += bucket.Count
for _, p := range bucket.Points {
success += int64(p)
}
}
return 0
})
return
}
func (b *sreBreaker) Allow() error { func (b *sreBreaker) Allow() error {
success, total := b.stat.Value() success, total := b.summary()
k := b.k * float64(success) k := b.k * float64(success)
if log.V(5) { if log.V(5) {
log.Info("breaker: request: %d, succee: %d, fail: %d", total, success, total-success) log.Info("breaker: request: %d, succee: %d, fail: %d", total, success, total-success)

@ -0,0 +1,161 @@
package breaker
import (
"math"
"math/rand"
"testing"
"time"
"github.com/bilibili/kratos/pkg/stat/metric"
xtime "github.com/bilibili/kratos/pkg/time"
"github.com/stretchr/testify/assert"
)
func getSRE() Breaker {
return NewGroup(&Config{
Window: xtime.Duration(1 * time.Second),
Bucket: 10,
Request: 100,
K: 2,
}).Get("")
}
func getSREBreaker() *sreBreaker {
counterOpts := metric.RollingCounterOpts{
Size: 10,
BucketDuration: time.Millisecond * 100,
}
stat := metric.NewRollingCounter(counterOpts)
return &sreBreaker{
stat: stat,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
request: 100,
k: 2,
state: StateClosed,
}
}
func markSuccessWithDuration(b Breaker, count int, sleep time.Duration) {
for i := 0; i < count; i++ {
b.MarkSuccess()
time.Sleep(sleep)
}
}
func markFailedWithDuration(b Breaker, count int, sleep time.Duration) {
for i := 0; i < count; i++ {
b.MarkFailed()
time.Sleep(sleep)
}
}
func testSREClose(t *testing.T, b Breaker) {
markSuccess(b, 80)
assert.Equal(t, b.Allow(), nil)
markSuccess(b, 120)
assert.Equal(t, b.Allow(), nil)
}
func testSREOpen(t *testing.T, b Breaker) {
markSuccess(b, 100)
assert.Equal(t, b.Allow(), nil)
markFailed(b, 10000000)
assert.NotEqual(t, b.Allow(), nil)
}
func testSREHalfOpen(t *testing.T, b Breaker) {
// failback
assert.Equal(t, b.Allow(), nil)
t.Run("allow single failed", func(t *testing.T) {
markFailed(b, 10000000)
assert.NotEqual(t, b.Allow(), nil)
})
time.Sleep(2 * time.Second)
t.Run("allow single succeed", func(t *testing.T) {
assert.Equal(t, b.Allow(), nil)
markSuccess(b, 10000000)
assert.Equal(t, b.Allow(), nil)
})
}
func TestSRE(t *testing.T) {
b := getSRE()
testSREClose(t, b)
b = getSRE()
testSREOpen(t, b)
b = getSRE()
testSREHalfOpen(t, b)
}
func TestSRESelfProtection(t *testing.T) {
t.Run("total request < 100", func(t *testing.T) {
b := getSRE()
markFailed(b, 99)
assert.Equal(t, b.Allow(), nil)
})
t.Run("total request > 100, total < 2 * success", func(t *testing.T) {
b := getSRE()
size := rand.Intn(10000000)
succ := int(math.Ceil(float64(size))) + 1
markSuccess(b, succ)
markFailed(b, size-succ)
assert.Equal(t, b.Allow(), nil)
})
}
func TestSRESummary(t *testing.T) {
var (
b *sreBreaker
succ, total int64
)
sleep := 50 * time.Millisecond
t.Run("succ == total", func(t *testing.T) {
b = getSREBreaker()
markSuccessWithDuration(b, 10, sleep)
succ, total = b.summary()
assert.Equal(t, succ, int64(10))
assert.Equal(t, total, int64(10))
})
t.Run("fail == total", func(t *testing.T) {
b = getSREBreaker()
markFailedWithDuration(b, 10, sleep)
succ, total = b.summary()
assert.Equal(t, succ, int64(0))
assert.Equal(t, total, int64(10))
})
t.Run("succ = 1/2 * total, fail = 1/2 * total", func(t *testing.T) {
b = getSREBreaker()
markFailedWithDuration(b, 5, sleep)
markSuccessWithDuration(b, 5, sleep)
succ, total = b.summary()
assert.Equal(t, succ, int64(5))
assert.Equal(t, total, int64(10))
})
t.Run("auto reset rolling counter", func(t *testing.T) {
time.Sleep(time.Second)
succ, total = b.summary()
assert.Equal(t, succ, int64(0))
assert.Equal(t, total, int64(0))
})
}
func BenchmarkSreBreakerAllow(b *testing.B) {
breaker := getSRE()
b.ResetTimer()
for i := 0; i <= b.N; i++ {
breaker.Allow()
if i%2 == 0 {
breaker.MarkSuccess()
} else {
breaker.MarkFailed()
}
}
}

@ -1,10 +0,0 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- caoguoliang
- maojian
labels:
- library
reviewers:
- caoguoliang
- maojian

@ -1,13 +1,5 @@
#### net/rcp/warden #### net/rpc/warden
##### 项目简介 ##### 项目简介
来自 bilibili 主站技术部的 RPC 框架,融合主站技术部的核心科技,带来如飞一般的体验。 gRPC 框架,带来如飞一般的体验。
##### 编译环境
- **请只用 Golang v1.9.x 以上版本编译执行**
##### 依赖包
- [grpc](google.golang.org/grpc)

@ -1,20 +0,0 @@
### business/warden/balancer/p2c
### Version 1.3.1
1. add more test
### Version 1.3
1. P2C替换smooth weighted round-robin
##### Version 1.2.1
1. 删除了netflix ribbon的权重算法,改成了平方根算法
##### Version 1.2.0
1. 实现了动态计算的调度轮询算法(使用了服务端的成功率数据,替换基于本地计算的成功率数据)
##### Version 1.1.0
1. 实现了动态计算的调度轮询算法
##### Version 1.0.0
1. 实现了带权重可以识别Color的轮询算法

@ -1,9 +0,0 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- caoguoliang
labels:
- library
reviewers:
- caoguoliang
- maojian

@ -1,13 +1,5 @@
#### business/warden/balancer/wrr #### warden/balancer/p2c
##### 项目简介 ##### 项目简介
warden 的 weighted round robin负载均衡模块,主要用于为每个RPC请求返回一个Server节点以供调用 warden 的 Power of Two Choices (P2C)负载均衡模块,主要用于为每个RPC请求返回一个Server节点以供调用
##### 编译环境
- **请只用 Golang v1.9.x 以上版本编译执行**
##### 依赖包
- [grpc](google.golang.org/grpc)

@ -1,17 +0,0 @@
### business/warden/balancer/wrr
##### Version 1.3.0
1. 迁移 stat.Summary 到 metric.RollingCounter,metric.RollingGauge
##### Version 1.2.1
1. 删除了netflix ribbon的权重算法,改成了平方根算法
##### Version 1.2.0
1. 实现了动态计算的调度轮询算法(使用了服务端的成功率数据,替换基于本地计算的成功率数据)
##### Version 1.1.0
1. 实现了动态计算的调度轮询算法
##### Version 1.0.0
1. 实现了带权重可以识别Color的轮询算法

@ -1,9 +0,0 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- caoguoliang
labels:
- library
reviewers:
- caoguoliang
- maojian

@ -1,13 +1,5 @@
#### business/warden/balancer/wrr #### warden/balancer/wrr
##### 项目简介 ##### 项目简介
warden 的 weighted round robin负载均衡模块,主要用于为每个RPC请求返回一个Server节点以供调用 warden 的 weighted round robin负载均衡模块,主要用于为每个RPC请求返回一个Server节点以供调用
##### 编译环境
- **请只用 Golang v1.9.x 以上版本编译执行**
##### 依赖包
- [grpc](google.golang.org/grpc)

@ -1,17 +0,0 @@
### business/warden/resolver
##### Version 1.1.1
1. add dial helper
##### Version 1.1.0
1. 增加了子集选择算法
##### Version 1.0.2
1. 增加GET接口
##### Version 1.0.1
1. 支持zone和clusters
##### Version 1.0.0
1. 实现了基本的服务发现功能

@ -1,9 +0,0 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- caoguoliang
labels:
- library
reviewers:
- caoguoliang
- maojian

@ -1,13 +1,5 @@
#### business/warden/resolver #### warden/resolver
##### 项目简介 ##### 项目简介
warden 的 服务发现模块,用于从底层的注册中心中获取Server节点列表并返回给GRPC warden 的 服务发现模块,用于从底层的注册中心中获取Server节点列表并返回给GRPC
##### 编译环境
- **请只用 Golang v1.9.x 以上版本编译执行**
##### 依赖包
- [grpc](google.golang.org/grpc)

@ -1,6 +0,0 @@
### business/warden/resolver/direct
##### Version 1.0.0
1. 实现了基本的服务发现直连功能

@ -1,14 +1,6 @@
#### business/warden/resolver/direct #### warden/resolver/direct
##### 项目简介 ##### 项目简介
warden 的直连服务模块,用于通过IP地址列表直接连接后端服务 warden 的直连服务模块,用于通过IP地址列表直接连接后端服务
连接字符串格式: direct://default/192.168.1.1:8080,192.168.1.2:8081 连接字符串格式: direct://default/192.168.1.1:8080,192.168.1.2:8081
##### 编译环境
- **请只用 Golang v1.9.x 以上版本编译执行**
##### 依赖包
- [grpc](google.golang.org/grpc)

@ -128,10 +128,10 @@ func (r *Resolver) updateproc() {
return return
} }
} }
if insInfo, ok := r.nr.Fetch(context.Background()); ok { if insMap, ok := r.nr.Fetch(context.Background()); ok {
instances, ok := insInfo.Instances[r.zone] instances, ok := insMap[r.zone]
if !ok { if !ok {
for _, value := range insInfo.Instances { for _, value := range insMap {
instances = append(instances, value...) instances = append(instances, value...)
} }
} }

@ -0,0 +1,36 @@
package metric
import (
"fmt"
"sync/atomic"
)
var _ Metric = &counter{}
// Counter stores a numerical value that only ever goes up.
type Counter interface {
Metric
}
// CounterOpts is an alias of Opts.
type CounterOpts Opts
type counter struct {
val int64
}
// NewCounter creates a new Counter based on the CounterOpts.
func NewCounter(opts CounterOpts) Counter {
return &counter{}
}
func (c *counter) Add(val int64) {
if val < 0 {
panic(fmt.Errorf("stat/metric: cannot decrease in negative value. val: %d", val))
}
atomic.AddInt64(&c.val, val)
}
func (c *counter) Value() int64 {
return atomic.LoadInt64(&c.val)
}

@ -0,0 +1,18 @@
package metric
import (
"math/rand"
"testing"
"github.com/stretchr/testify/assert"
)
func TestCounter(t *testing.T) {
counter := NewCounter(CounterOpts{})
count := rand.Intn(100)
for i := 0; i < count; i++ {
counter.Add(1)
}
val := counter.Value()
assert.Equal(t, val, int64(count))
}

@ -0,0 +1,37 @@
package metric
import "sync/atomic"
var _ Metric = &gauge{}
// Gauge stores a numerical value that can be add arbitrarily.
type Gauge interface {
Metric
// Sets sets the value to the given number.
Set(int64)
}
// GaugeOpts is an alias of Opts.
type GaugeOpts Opts
type gauge struct {
val int64
}
// NewGauge creates a new Gauge based on the GaugeOpts.
func NewGauge(opts GaugeOpts) Gauge {
return &gauge{}
}
func (g *gauge) Add(val int64) {
atomic.AddInt64(&g.val, val)
}
func (g *gauge) Set(val int64) {
old := atomic.LoadInt64(&g.val)
atomic.CompareAndSwapInt64(&g.val, old, val)
}
func (g *gauge) Value() int64 {
return atomic.LoadInt64(&g.val)
}

@ -0,0 +1,23 @@
package metric
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestGaugeAdd(t *testing.T) {
gauge := NewGauge(GaugeOpts{})
gauge.Add(100)
gauge.Add(-50)
val := gauge.Value()
assert.Equal(t, val, int64(50))
}
func TestGaugeSet(t *testing.T) {
gauge := NewGauge(GaugeOpts{})
gauge.Add(100)
gauge.Set(50)
val := gauge.Value()
assert.Equal(t, val, int64(50))
}

@ -0,0 +1,26 @@
package metric
import "fmt"
// Iterator iterates the buckets within the window.
type Iterator struct {
count int
iteratedCount int
cur *Bucket
}
// Next returns true util all of the buckets has been iterated.
func (i *Iterator) Next() bool {
return i.count != i.iteratedCount
}
// Bucket gets current bucket.
func (i *Iterator) Bucket() Bucket {
if !(i.Next()) {
panic(fmt.Errorf("stat/metric: iteration out of range iteratedCount: %d count: %d", i.iteratedCount, i.count))
}
bucket := *i.cur
i.iteratedCount++
i.cur = i.cur.Next()
return bucket
}

@ -0,0 +1,30 @@
package metric
// Opts contains the common arguments for creating Metric.
type Opts struct {
}
// Metric is a sample interface.
// Implementations of Metrics in metric package are Counter, Gauge,
// PointGauge, RollingCounter and RollingGauge.
type Metric interface {
// Add adds the given value to the counter.
Add(int64)
// Value gets the current value.
// If the metric's type is PointGauge, RollingCounter, RollingGauge,
// it returns the sum value within the window.
Value() int64
}
// Aggregation contains some common aggregation function.
// Each aggregation can compute summary statistics of window.
type Aggregation interface {
// Min finds the min value within the window.
Min() float64
// Max finds the max value within the window.
Max() float64
// Avg computes average value within the window.
Avg() float64
// Sum computes sum value within the window.
Sum() float64
}

@ -0,0 +1,61 @@
package metric
var _ Metric = &pointGauge{}
var _ Aggregation = &pointGauge{}
// PointGauge represents a ring window.
// Every buckets within the window contains one point.
// When the window is full, the earliest point will be overwrite.
type PointGauge interface {
Aggregation
Metric
// Reduce applies the reduction function to all buckets within the window.
Reduce(func(Iterator) float64) float64
}
// PointGaugeOpts contains the arguments for creating PointGauge.
type PointGaugeOpts struct {
// Size represents the bucket size within the window.
Size int
}
type pointGauge struct {
policy *PointPolicy
}
// NewPointGauge creates a new PointGauge based on PointGaugeOpts.
func NewPointGauge(opts PointGaugeOpts) PointGauge {
window := NewWindow(WindowOpts{Size: opts.Size})
policy := NewPointPolicy(window)
return &pointGauge{
policy: policy,
}
}
func (r *pointGauge) Add(val int64) {
r.policy.Append(float64(val))
}
func (r *pointGauge) Reduce(f func(Iterator) float64) float64 {
return r.policy.Reduce(f)
}
func (r *pointGauge) Avg() float64 {
return r.policy.Reduce(Avg)
}
func (r *pointGauge) Min() float64 {
return r.policy.Reduce(Min)
}
func (r *pointGauge) Max() float64 {
return r.policy.Reduce(Max)
}
func (r *pointGauge) Sum() float64 {
return r.policy.Reduce(Sum)
}
func (r *pointGauge) Value() int64 {
return int64(r.Sum())
}

@ -0,0 +1,55 @@
package metric
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestPointGaugeAdd(t *testing.T) {
opts := PointGaugeOpts{Size: 3}
pointGauge := NewPointGauge(opts)
listBuckets := func() [][]float64 {
buckets := make([][]float64, 0)
pointGauge.Reduce(func(i Iterator) float64 {
for i.Next() {
bucket := i.Bucket()
buckets = append(buckets, bucket.Points)
}
return 0.0
})
return buckets
}
assert.Equal(t, [][]float64{{}, {}, {}}, listBuckets(), "Empty Buckets")
pointGauge.Add(1)
assert.Equal(t, [][]float64{{}, {}, {1}}, listBuckets(), "Point 1")
pointGauge.Add(2)
assert.Equal(t, [][]float64{{}, {1}, {2}}, listBuckets(), "Point 1, 2")
pointGauge.Add(3)
assert.Equal(t, [][]float64{{1}, {2}, {3}}, listBuckets(), "Point 1, 2, 3")
pointGauge.Add(4)
assert.Equal(t, [][]float64{{2}, {3}, {4}}, listBuckets(), "Point 2, 3, 4")
pointGauge.Add(5)
assert.Equal(t, [][]float64{{3}, {4}, {5}}, listBuckets(), "Point 3, 4, 5")
}
func TestPointGaugeReduce(t *testing.T) {
opts := PointGaugeOpts{Size: 10}
pointGauge := NewPointGauge(opts)
for i := 0; i < opts.Size; i++ {
pointGauge.Add(int64(i))
}
var _ = pointGauge.Reduce(func(i Iterator) float64 {
idx := 0
for i.Next() {
bucket := i.Bucket()
assert.Equal(t, bucket.Points[0], float64(idx), "validate points of pointGauge")
idx++
}
return 0.0
})
assert.Equal(t, float64(9), pointGauge.Max(), "validate max of pointGauge")
assert.Equal(t, float64(4.5), pointGauge.Avg(), "validate avg of pointGauge")
assert.Equal(t, float64(0), pointGauge.Min(), "validate min of pointGauge")
assert.Equal(t, float64(45), pointGauge.Sum(), "validate sum of pointGauge")
}

@ -0,0 +1,57 @@
package metric
import "sync"
// PointPolicy is a policy of points within the window.
// PointPolicy wraps the window and make it seem like ring-buf.
// When using PointPolicy, every buckets within the windows contains at more one point.
// e.g. [[1], [2], [3]]
type PointPolicy struct {
mu sync.RWMutex
size int
window *Window
offset int
}
// NewPointPolicy creates a new PointPolicy.
func NewPointPolicy(window *Window) *PointPolicy {
return &PointPolicy{
window: window,
size: window.Size(),
offset: -1,
}
}
func (p *PointPolicy) prevOffset() int {
return p.offset
}
func (p *PointPolicy) nextOffset() int {
return (p.prevOffset() + 1) % p.size
}
func (p *PointPolicy) updateOffset(offset int) {
p.offset = offset
}
// Append appends the given points to the window.
func (p *PointPolicy) Append(val float64) {
p.mu.Lock()
defer p.mu.Unlock()
offset := p.nextOffset()
p.window.ResetBucket(offset)
p.window.Append(offset, val)
p.updateOffset(offset)
}
// Reduce applies the reduction function to all buckets within the window.
func (p *PointPolicy) Reduce(f func(Iterator) float64) float64 {
p.mu.RLock()
defer p.mu.RUnlock()
offset := p.offset + 1
if offset == p.size {
offset = 0
}
iterator := p.window.Iterator(offset, p.size)
return f(iterator)
}

@ -0,0 +1,77 @@
package metric
// Sum the values within the window.
func Sum(iterator Iterator) float64 {
var result = 0.0
for iterator.Next() {
bucket := iterator.Bucket()
for _, p := range bucket.Points {
result = result + p
}
}
return result
}
// Avg the values within the window.
func Avg(iterator Iterator) float64 {
var result = 0.0
var count = 0.0
for iterator.Next() {
bucket := iterator.Bucket()
for _, p := range bucket.Points {
result = result + p
count = count + 1
}
}
return result / count
}
// Min the values within the window.
func Min(iterator Iterator) float64 {
var result = 0.0
var started = false
for iterator.Next() {
bucket := iterator.Bucket()
for _, p := range bucket.Points {
if !started {
result = p
started = true
continue
}
if p < result {
result = p
}
}
}
return result
}
// Max the values within the window.
func Max(iterator Iterator) float64 {
var result = 0.0
var started = false
for iterator.Next() {
bucket := iterator.Bucket()
for _, p := range bucket.Points {
if !started {
result = p
started = true
continue
}
if p > result {
result = p
}
}
}
return result
}
// Count sums the count value within the window.
func Count(iterator Iterator) float64 {
var result int64
for iterator.Next() {
bucket := iterator.Bucket()
result += bucket.Count
}
return float64(result)
}

@ -0,0 +1,17 @@
package metric
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestCount(t *testing.T) {
opts := PointGaugeOpts{Size: 10}
pointGauge := NewPointGauge(opts)
for i := 0; i < opts.Size; i++ {
pointGauge.Add(int64(i))
}
result := pointGauge.Reduce(Count)
assert.Equal(t, float64(10), result, "validate count of pointGauge")
}

@ -0,0 +1,68 @@
package metric
import (
"fmt"
"time"
)
var _ Metric = &rollingCounter{}
var _ Aggregation = &rollingCounter{}
// RollingCounter represents a ring window based on time duration.
// e.g. [[1], [3], [5]]
type RollingCounter interface {
Metric
Aggregation
// Reduce applies the reduction function to all buckets within the window.
Reduce(func(Iterator) float64) float64
}
// RollingCounterOpts contains the arguments for creating RollingCounter.
type RollingCounterOpts struct {
Size int
BucketDuration time.Duration
}
type rollingCounter struct {
policy *RollingPolicy
}
// NewRollingCounter creates a new RollingCounter bases on RollingCounterOpts.
func NewRollingCounter(opts RollingCounterOpts) RollingCounter {
window := NewWindow(WindowOpts{Size: opts.Size})
policy := NewRollingPolicy(window, RollingPolicyOpts{BucketDuration: opts.BucketDuration})
return &rollingCounter{
policy: policy,
}
}
func (r *rollingCounter) Add(val int64) {
if val < 0 {
panic(fmt.Errorf("stat/metric: cannot decrease in value. val: %d", val))
}
r.policy.Add(float64(val))
}
func (r *rollingCounter) Reduce(f func(Iterator) float64) float64 {
return r.policy.Reduce(f)
}
func (r *rollingCounter) Avg() float64 {
return r.policy.Reduce(Avg)
}
func (r *rollingCounter) Min() float64 {
return r.policy.Reduce(Min)
}
func (r *rollingCounter) Max() float64 {
return r.policy.Reduce(Max)
}
func (r *rollingCounter) Sum() float64 {
return r.policy.Reduce(Sum)
}
func (r *rollingCounter) Value() int64 {
return int64(r.Sum())
}

@ -0,0 +1,156 @@
package metric
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestRollingCounterAdd(t *testing.T) {
size := 3
bucketDuration := time.Second
opts := RollingCounterOpts{
Size: size,
BucketDuration: bucketDuration,
}
r := NewRollingCounter(opts)
listBuckets := func() [][]float64 {
buckets := make([][]float64, 0)
r.Reduce(func(i Iterator) float64 {
for i.Next() {
bucket := i.Bucket()
buckets = append(buckets, bucket.Points)
}
return 0.0
})
return buckets
}
assert.Equal(t, [][]float64{{}, {}, {}}, listBuckets())
r.Add(1)
assert.Equal(t, [][]float64{{}, {}, {1}}, listBuckets())
time.Sleep(time.Second)
r.Add(2)
r.Add(3)
assert.Equal(t, [][]float64{{}, {1}, {5}}, listBuckets())
time.Sleep(time.Second)
r.Add(4)
r.Add(5)
r.Add(6)
assert.Equal(t, [][]float64{{1}, {5}, {15}}, listBuckets())
time.Sleep(time.Second)
r.Add(7)
assert.Equal(t, [][]float64{{5}, {15}, {7}}, listBuckets())
}
func TestRollingCounterReduce(t *testing.T) {
size := 3
bucketDuration := time.Second
opts := RollingCounterOpts{
Size: size,
BucketDuration: bucketDuration,
}
r := NewRollingCounter(opts)
for x := 0; x < size; x = x + 1 {
for i := 0; i <= x; i++ {
r.Add(1)
}
if x < size-1 {
time.Sleep(bucketDuration)
}
}
var result = r.Reduce(func(iterator Iterator) float64 {
var result float64
for iterator.Next() {
bucket := iterator.Bucket()
result += bucket.Points[0]
}
return result
})
if result != 6.0 {
t.Fatalf("Validate sum of points. result: %f", result)
}
}
func TestRollingCounterDataRace(t *testing.T) {
size := 3
bucketDuration := time.Millisecond * 10
opts := RollingCounterOpts{
Size: size,
BucketDuration: bucketDuration,
}
r := NewRollingCounter(opts)
var stop = make(chan bool)
go func() {
for {
select {
case <-stop:
return
default:
r.Add(1)
time.Sleep(time.Millisecond * 5)
}
}
}()
go func() {
for {
select {
case <-stop:
return
default:
_ = r.Reduce(func(i Iterator) float64 {
for i.Next() {
bucket := i.Bucket()
for range bucket.Points {
continue
}
}
return 0
})
}
}
}()
time.Sleep(time.Second * 3)
close(stop)
}
func BenchmarkRollingCounterIncr(b *testing.B) {
size := 3
bucketDuration := time.Millisecond * 100
opts := RollingCounterOpts{
Size: size,
BucketDuration: bucketDuration,
}
r := NewRollingCounter(opts)
b.ResetTimer()
for i := 0; i <= b.N; i++ {
r.Add(1)
}
}
func BenchmarkRollingCounterReduce(b *testing.B) {
size := 3
bucketDuration := time.Second
opts := RollingCounterOpts{
Size: size,
BucketDuration: bucketDuration,
}
r := NewRollingCounter(opts)
for i := 0; i <= 10; i++ {
r.Add(1)
time.Sleep(time.Millisecond * 500)
}
b.ResetTimer()
for i := 0; i <= b.N; i++ {
var _ = r.Reduce(func(i Iterator) float64 {
var result float64
for i.Next() {
bucket := i.Bucket()
if len(bucket.Points) != 0 {
result += bucket.Points[0]
}
}
return result
})
}
}

@ -0,0 +1,62 @@
package metric
import "time"
var _ Metric = &rollingGauge{}
var _ Aggregation = &rollingGauge{}
// RollingGauge represents a ring window based on time duration.
// e.g. [[1, 2], [1, 2, 3], [1,2, 3, 4]]
type RollingGauge interface {
Metric
Aggregation
// Reduce applies the reduction function to all buckets within the window.
Reduce(func(Iterator) float64) float64
}
// RollingGaugeOpts contains the arguments for creating RollingGauge.
type RollingGaugeOpts struct {
Size int
BucketDuration time.Duration
}
type rollingGauge struct {
policy *RollingPolicy
}
// NewRollingGauge creates a new RollingGauge baseed on RollingGaugeOpts.
func NewRollingGauge(opts RollingGaugeOpts) RollingGauge {
window := NewWindow(WindowOpts{Size: opts.Size})
policy := NewRollingPolicy(window, RollingPolicyOpts{BucketDuration: opts.BucketDuration})
return &rollingGauge{
policy: policy,
}
}
func (r *rollingGauge) Add(val int64) {
r.policy.Append(float64(val))
}
func (r *rollingGauge) Reduce(f func(Iterator) float64) float64 {
return r.policy.Reduce(f)
}
func (r *rollingGauge) Avg() float64 {
return r.policy.Reduce(Avg)
}
func (r *rollingGauge) Min() float64 {
return r.policy.Reduce(Min)
}
func (r *rollingGauge) Max() float64 {
return r.policy.Reduce(Max)
}
func (r *rollingGauge) Sum() float64 {
return r.policy.Reduce(Sum)
}
func (r *rollingGauge) Value() int64 {
return int64(r.Sum())
}

@ -0,0 +1,192 @@
package metric
import (
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestRollingGaugeAdd(t *testing.T) {
size := 3
bucketDuration := time.Second
opts := RollingGaugeOpts{
Size: size,
BucketDuration: bucketDuration,
}
r := NewRollingGauge(opts)
listBuckets := func() [][]float64 {
buckets := make([][]float64, 0)
r.Reduce(func(i Iterator) float64 {
for i.Next() {
bucket := i.Bucket()
buckets = append(buckets, bucket.Points)
}
return 0.0
})
return buckets
}
assert.Equal(t, [][]float64{{}, {}, {}}, listBuckets())
r.Add(1)
assert.Equal(t, [][]float64{{}, {}, {1}}, listBuckets())
time.Sleep(time.Second)
r.Add(2)
r.Add(3)
assert.Equal(t, [][]float64{{}, {1}, {2, 3}}, listBuckets())
time.Sleep(time.Second)
r.Add(4)
r.Add(5)
r.Add(6)
assert.Equal(t, [][]float64{{1}, {2, 3}, {4, 5, 6}}, listBuckets())
time.Sleep(time.Second)
r.Add(7)
assert.Equal(t, [][]float64{{2, 3}, {4, 5, 6}, {7}}, listBuckets())
}
func TestRollingGaugeReset(t *testing.T) {
size := 3
bucketDuration := time.Second
opts := RollingGaugeOpts{
Size: size,
BucketDuration: bucketDuration,
}
r := NewRollingGauge(opts)
listBuckets := func() [][]float64 {
buckets := make([][]float64, 0)
r.Reduce(func(i Iterator) float64 {
for i.Next() {
bucket := i.Bucket()
buckets = append(buckets, bucket.Points)
}
return 0.0
})
return buckets
}
r.Add(1)
time.Sleep(time.Second)
assert.Equal(t, [][]float64{{}, {1}}, listBuckets())
time.Sleep(time.Second)
assert.Equal(t, [][]float64{{1}}, listBuckets())
time.Sleep(time.Second)
assert.Equal(t, [][]float64{}, listBuckets())
// cross window
r.Add(1)
time.Sleep(time.Second * 5)
assert.Equal(t, [][]float64{}, listBuckets())
}
func TestRollingGaugeReduce(t *testing.T) {
size := 3
bucketDuration := time.Second
opts := RollingGaugeOpts{
Size: size,
BucketDuration: bucketDuration,
}
r := NewRollingGauge(opts)
for x := 0; x < size; x = x + 1 {
for i := 0; i <= x; i++ {
r.Add(int64(i))
}
if x < size-1 {
time.Sleep(bucketDuration)
}
}
var result = r.Reduce(func(i Iterator) float64 {
var result float64
for i.Next() {
bucket := i.Bucket()
for _, point := range bucket.Points {
result += point
}
}
return result
})
if result != 4.0 {
t.Fatalf("Validate sum of points. result: %f", result)
}
}
func TestRollingGaugeDataRace(t *testing.T) {
size := 3
bucketDuration := time.Second
opts := RollingGaugeOpts{
Size: size,
BucketDuration: bucketDuration,
}
r := NewRollingGauge(opts)
var stop = make(chan bool)
go func() {
for {
select {
case <-stop:
return
default:
r.Add(rand.Int63())
time.Sleep(time.Millisecond * 5)
}
}
}()
go func() {
for {
select {
case <-stop:
return
default:
_ = r.Reduce(func(i Iterator) float64 {
for i.Next() {
bucket := i.Bucket()
for range bucket.Points {
continue
}
}
return 0
})
}
}
}()
time.Sleep(time.Second * 3)
close(stop)
}
func BenchmarkRollingGaugeIncr(b *testing.B) {
size := 10
bucketDuration := time.Second
opts := RollingGaugeOpts{
Size: size,
BucketDuration: bucketDuration,
}
r := NewRollingGauge(opts)
b.ResetTimer()
for i := 0; i <= b.N; i++ {
r.Add(1.0)
}
}
func BenchmarkRollingGaugeReduce(b *testing.B) {
size := 10
bucketDuration := time.Second
opts := RollingGaugeOpts{
Size: size,
BucketDuration: bucketDuration,
}
r := NewRollingGauge(opts)
for i := 0; i <= 10; i++ {
r.Add(1.0)
time.Sleep(time.Millisecond * 500)
}
b.ResetTimer()
for i := 0; i <= b.N; i++ {
var _ = r.Reduce(func(i Iterator) float64 {
var result float64
for i.Next() {
bucket := i.Bucket()
if len(bucket.Points) != 0 {
result += bucket.Points[0]
}
}
return result
})
}
}

@ -0,0 +1,97 @@
package metric
import (
"sync"
"time"
)
// RollingPolicy is a policy for ring window based on time duration.
// RollingPolicy moves bucket offset with time duration.
// e.g. If the last point is appended one bucket duration ago,
// RollingPolicy will increment current offset.
type RollingPolicy struct {
mu sync.RWMutex
size int
window *Window
offset int
bucketDuration time.Duration
lastAppendTime time.Time
}
// RollingPolicyOpts contains the arguments for creating RollingPolicy.
type RollingPolicyOpts struct {
BucketDuration time.Duration
}
// NewRollingPolicy creates a new RollingPolicy based on the given window and RollingPolicyOpts.
func NewRollingPolicy(window *Window, opts RollingPolicyOpts) *RollingPolicy {
return &RollingPolicy{
window: window,
size: window.Size(),
offset: 0,
bucketDuration: opts.BucketDuration,
lastAppendTime: time.Now(),
}
}
func (r *RollingPolicy) timespan() int {
v := int(time.Since(r.lastAppendTime) / r.bucketDuration)
if v < r.size && v > -1 { // maybe time backwards
return v
}
return r.size
}
func (r *RollingPolicy) add(f func(offset int, val float64), val float64) {
r.mu.Lock()
timespan := r.timespan()
if timespan > 0 {
offset := r.offset
// reset the expired buckets
s := offset + 1
e, e1 := s+timespan, 0 // e: reset offset must start from offset+1
if e > r.size {
e1 = e - r.size
e = r.size
}
for i := s; i < e; i++ {
r.window.ResetBucket(i)
offset = i
}
for i := 0; i < e1; i++ {
r.window.ResetBucket(i)
offset = i
}
r.offset = offset
r.lastAppendTime = time.Now()
}
f(r.offset, val)
r.mu.Unlock()
}
// Append appends the given points to the window.
func (r *RollingPolicy) Append(val float64) {
r.add(r.window.Append, val)
}
// Add adds the given value to the latest point within bucket.
func (r *RollingPolicy) Add(val float64) {
r.add(r.window.Add, val)
}
// Reduce applies the reduction function to all buckets within the window.
func (r *RollingPolicy) Reduce(f func(Iterator) float64) (val float64) {
r.mu.RLock()
timespan := r.timespan()
if count := r.size - timespan; count > 0 {
offset := r.offset + timespan + 1
if offset >= r.size {
offset = offset - r.size
}
val = f(r.window.Iterator(offset, count))
}
r.mu.RUnlock()
return val
}

@ -0,0 +1,107 @@
package metric
// Bucket contains multiple float64 points.
type Bucket struct {
Points []float64
Count int64
next *Bucket
}
// Append appends the given value to the bucket.
func (b *Bucket) Append(val float64) {
b.Points = append(b.Points, val)
b.Count++
}
// Add adds the given value to the point.
func (b *Bucket) Add(offset int, val float64) {
b.Points[offset] += val
b.Count++
}
// Reset empties the bucket.
func (b *Bucket) Reset() {
b.Points = b.Points[:0]
b.Count = 0
}
// Next returns the next bucket.
func (b *Bucket) Next() *Bucket {
return b.next
}
// Window contains multiple buckets.
type Window struct {
window []Bucket
size int
}
// WindowOpts contains the arguments for creating Window.
type WindowOpts struct {
Size int
}
// NewWindow creates a new Window based on WindowOpts.
func NewWindow(opts WindowOpts) *Window {
buckets := make([]Bucket, opts.Size)
for offset := range buckets {
buckets[offset] = Bucket{Points: make([]float64, 0)}
nextOffset := offset + 1
if nextOffset == opts.Size {
nextOffset = 0
}
buckets[offset].next = &buckets[nextOffset]
}
return &Window{window: buckets, size: opts.Size}
}
// ResetWindow empties all buckets within the window.
func (w *Window) ResetWindow() {
for offset := range w.window {
w.ResetBucket(offset)
}
}
// ResetBucket empties the bucket based on the given offset.
func (w *Window) ResetBucket(offset int) {
w.window[offset].Reset()
}
// ResetBuckets empties the buckets based on the given offsets.
func (w *Window) ResetBuckets(offsets []int) {
for _, offset := range offsets {
w.ResetBucket(offset)
}
}
// Append appends the given value to the bucket where index equals the given offset.
func (w *Window) Append(offset int, val float64) {
w.window[offset].Append(val)
}
// Add adds the given value to the latest point within bucket where index equals the given offset.
func (w *Window) Add(offset int, val float64) {
if w.window[offset].Count == 0 {
w.window[offset].Append(val)
return
}
w.window[offset].Add(0, val)
}
// Bucket returns the bucket where index equals the given offset.
func (w *Window) Bucket(offset int) Bucket {
return w.window[offset]
}
// Size returns the size of the window.
func (w *Window) Size() int {
return w.size
}
// Iterator returns the bucket iterator.
func (w *Window) Iterator(offset int, count int) Iterator {
return Iterator{
count: count,
cur: &w.window[offset],
}
}

@ -0,0 +1,67 @@
package metric
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestWindowResetWindow(t *testing.T) {
opts := WindowOpts{Size: 3}
window := NewWindow(opts)
for i := 0; i < opts.Size; i++ {
window.Append(i, 1.0)
}
window.ResetWindow()
for i := 0; i < opts.Size; i++ {
assert.Equal(t, len(window.Bucket(i).Points), 0)
}
}
func TestWindowResetBucket(t *testing.T) {
opts := WindowOpts{Size: 3}
window := NewWindow(opts)
for i := 0; i < opts.Size; i++ {
window.Append(i, 1.0)
}
window.ResetBucket(1)
assert.Equal(t, len(window.Bucket(1).Points), 0)
assert.Equal(t, window.Bucket(0).Points[0], float64(1.0))
assert.Equal(t, window.Bucket(2).Points[0], float64(1.0))
}
func TestWindowResetBuckets(t *testing.T) {
opts := WindowOpts{Size: 3}
window := NewWindow(opts)
for i := 0; i < opts.Size; i++ {
window.Append(i, 1.0)
}
window.ResetBuckets([]int{0, 1, 2})
for i := 0; i < opts.Size; i++ {
assert.Equal(t, len(window.Bucket(i).Points), 0)
}
}
func TestWindowAppend(t *testing.T) {
opts := WindowOpts{Size: 3}
window := NewWindow(opts)
for i := 0; i < opts.Size; i++ {
window.Append(i, 1.0)
}
for i := 0; i < opts.Size; i++ {
assert.Equal(t, window.Bucket(i).Points[0], float64(1.0))
}
}
func TestWindowAdd(t *testing.T) {
opts := WindowOpts{Size: 3}
window := NewWindow(opts)
window.Append(0, 1.0)
window.Add(0, 1.0)
assert.Equal(t, window.Bucket(0).Points[0], float64(2.0))
}
func TestWindowSize(t *testing.T) {
opts := WindowOpts{Size: 3}
window := NewWindow(opts)
assert.Equal(t, window.Size(), 3)
}

@ -1,5 +0,0 @@
# summary
## 项目简介
summary计数器

@ -1,129 +0,0 @@
package summary
import (
"sync"
"time"
)
type bucket struct {
val int64
count int64
next *bucket
}
func (b *bucket) Add(val int64) {
b.val += val
b.count++
}
func (b *bucket) Value() (int64, int64) {
return b.val, b.count
}
func (b *bucket) Reset() {
b.val = 0
b.count = 0
}
// Summary is a summary interface.
type Summary interface {
Add(int64)
Reset()
Value() (val int64, cnt int64)
}
type summary struct {
mu sync.RWMutex
buckets []bucket
bucketTime int64
lastAccess int64
cur *bucket
}
// New new a summary.
//
// use RollingCounter creates a new window. windowTime is the time covering the entire
// window. windowBuckets is the number of buckets the window is divided into.
// An example: a 10 second window with 10 buckets will have 10 buckets covering
// 1 second each.
func New(window time.Duration, winBucket int) Summary {
buckets := make([]bucket, winBucket)
bucket := &buckets[0]
for i := 1; i < winBucket; i++ {
bucket.next = &buckets[i]
bucket = bucket.next
}
bucket.next = &buckets[0]
bucketTime := time.Duration(window.Nanoseconds() / int64(winBucket))
return &summary{
cur: &buckets[0],
buckets: buckets,
bucketTime: int64(bucketTime),
lastAccess: time.Now().UnixNano(),
}
}
// Add increments the summary by value.
func (s *summary) Add(val int64) {
s.mu.Lock()
s.lastBucket().Add(val)
s.mu.Unlock()
}
// Value get the summary value and count.
func (s *summary) Value() (val int64, cnt int64) {
now := time.Now().UnixNano()
s.mu.RLock()
b := s.cur
i := s.elapsed(now)
for j := 0; j < len(s.buckets); j++ {
// skip all future reset bucket.
if i > 0 {
i--
} else {
v, c := b.Value()
val += v
cnt += c
}
b = b.next
}
s.mu.RUnlock()
return
}
// Reset reset the counter.
func (s *summary) Reset() {
s.mu.Lock()
for i := range s.buckets {
s.buckets[i].Reset()
}
s.mu.Unlock()
}
func (s *summary) elapsed(now int64) (i int) {
var e int64
if e = now - s.lastAccess; e <= s.bucketTime {
return
}
if i = int(e / s.bucketTime); i > len(s.buckets) {
i = len(s.buckets)
}
return
}
func (s *summary) lastBucket() (b *bucket) {
now := time.Now().UnixNano()
b = s.cur
// reset the buckets between now and number of buckets ago. If
// that is more that the existing buckets, reset all.
if i := s.elapsed(now); i > 0 {
s.lastAccess = now
for ; i > 0; i-- {
// replace the next used bucket.
b = b.next
b.Reset()
}
}
s.cur = b
return
}

@ -1,69 +0,0 @@
package summary
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestSummaryMinInterval(t *testing.T) {
count := New(time.Second/2, 10)
tk1 := time.NewTicker(5 * time.Millisecond)
defer tk1.Stop()
for i := 0; i < 100; i++ {
<-tk1.C
count.Add(2)
}
v, c := count.Value()
t.Logf("count value: %d, %d\n", v, c)
// 10% of error when bucket is 10
if v < 190 || v > 210 {
t.Errorf("expect value in [90-110] get %d", v)
}
// 10% of error when bucket is 10
if c < 90 || c > 110 {
t.Errorf("expect value in [90-110] get %d", v)
}
}
func TestSummary(t *testing.T) {
s := New(time.Second, 10)
t.Run("add", func(t *testing.T) {
s.Add(1)
v, c := s.Value()
assert.Equal(t, v, int64(1))
assert.Equal(t, c, int64(1))
})
time.Sleep(time.Millisecond * 110)
t.Run("add2", func(t *testing.T) {
s.Add(1)
v, c := s.Value()
assert.Equal(t, v, int64(2))
assert.Equal(t, c, int64(2))
})
time.Sleep(time.Millisecond * 900) // expire one bucket, 110 + 900
t.Run("expire", func(t *testing.T) {
v, c := s.Value()
assert.Equal(t, v, int64(1))
assert.Equal(t, c, int64(1))
s.Add(1)
v, c = s.Value()
assert.Equal(t, v, int64(2)) // expire one bucket
assert.Equal(t, c, int64(2)) // expire one bucket
})
time.Sleep(time.Millisecond * 1100)
t.Run("expire_all", func(t *testing.T) {
v, c := s.Value()
assert.Equal(t, v, int64(0))
assert.Equal(t, c, int64(0))
})
t.Run("reset", func(t *testing.T) {
s.Reset()
v, c := s.Value()
assert.Equal(t, v, int64(0))
assert.Equal(t, c, int64(0))
})
}

@ -218,14 +218,6 @@ func (d *Dao) pingRedis(ctx context.Context) (err error) {
## 项目简介 ## 项目简介
1. 1.
## 编译环境
## 依赖包
## 编译执行
` `
_tplService = `package service _tplService = `package service

Loading…
Cancel
Save