diff --git a/doc/wiki-cn/breaker.md b/doc/wiki-cn/breaker.md index e69de29bb..cef56fbcd 100644 --- a/doc/wiki-cn/breaker.md +++ b/doc/wiki-cn/breaker.md @@ -0,0 +1,49 @@ +## 熔断器/Breaker +熔断器是为了当依赖的服务已经出现故障时,主动阻止对依赖服务的请求。保证自身服务的正常运行不受依赖服务影响,防止雪崩效应。 + +## kratos内置breaker的组件 +一般情况下直接使用kratos的组件时都自带了熔断逻辑,并且在提供了对应的breaker配置项。 +目前在kratos内集成熔断器的组件有: +- RPC client: pkg/net/rpc/warden/client +- Mysql client:pkg/database/sql +- Tidb client:pkg/database/tidb +- Http client:pkg/net/http/blademaster + +## 使用说明 +```go + //初始化熔断器组 + //一组熔断器公用同一个配置项,可从分组内取出单个熔断器使用。可用在比如mysql主从分离等场景。 + brkGroup := breaker.NewGroup(&breaker.Config{}) + //为每一个连接指定一个brekaker + //此处假设一个客户端连接对象实例为conn + //breakName定义熔断器名称 一般可以使用连接地址 + breakName = conn.Addr + conn.breaker = brkGroup.Get(breakName) + + //在连接发出请求前判断熔断器状态 + if err = conn.breaker.Allow(); err != nil { + return + } + + //连接执行成功或失败将结果告知braker + if(respErr != nil){ + conn.breaker.MarkFailed() + }else{ + conn.breaker.MarkSuccess() + } + +``` + +## 配置说明 +```go +type Config struct { + SwitchOff bool // 熔断器开关,默认关 false. + + K float64 //触发熔断的错误率(K = 1 - 1/错误率) + + Window xtime.Duration //统计桶窗口时间 + Bucket int //统计桶大小 + Request int64 //触发熔断的最少请求数量(请求少于该值时不会触发熔断) +} +``` + diff --git a/example/blademaster/middleware/auth/example_test.go b/example/blademaster/middleware/auth/example_test.go index b97e22276..0d5b30b46 100644 --- a/example/blademaster/middleware/auth/example_test.go +++ b/example/blademaster/middleware/auth/example_test.go @@ -3,8 +3,8 @@ package auth_test import ( "fmt" - bm "github.com/bilibili/kratos/pkg/net/http/blademaster" "github.com/bilibili/kratos/example/blademaster/middleware/auth" + bm "github.com/bilibili/kratos/pkg/net/http/blademaster" "github.com/bilibili/kratos/pkg/net/metadata" ) diff --git a/pkg/database/hbase/metrics.go b/pkg/database/hbase/metrics.go index 4867271d5..a29f82e3f 100644 --- a/pkg/database/hbase/metrics.go +++ b/pkg/database/hbase/metrics.go @@ -41,8 +41,8 @@ func codeFromErr(err error) string { code = "connot_find_region" case gohbase.TableNotFound: code = "table_not_found" - //case gohbase.ErrRegionUnavailable: - // code = "region_unavailable" + //case gohbase.ErrRegionUnavailable: + // code = "region_unavailable" } return code } diff --git a/pkg/naming/etcd/etcd_test.go b/pkg/naming/etcd/etcd_test.go index 72d15aa15..5dfd2e539 100644 --- a/pkg/naming/etcd/etcd_test.go +++ b/pkg/naming/etcd/etcd_test.go @@ -3,11 +3,11 @@ package etcd import ( "context" "fmt" - "testing" - "time" "github.com/bilibili/kratos/pkg/naming" "go.etcd.io/etcd/clientv3" "google.golang.org/grpc" + "testing" + "time" ) func TestNew(t *testing.T) { diff --git a/pkg/net/http/blademaster/context.go b/pkg/net/http/blademaster/context.go index d9957b19e..8e44df932 100644 --- a/pkg/net/http/blademaster/context.go +++ b/pkg/net/http/blademaster/context.go @@ -49,7 +49,6 @@ type Context struct { RoutePath string Params Params - } /************************************/ @@ -67,7 +66,6 @@ func (c *Context) Next() { } } - // Abort prevents pending handlers from being called. Note that this will not stop the current handler. // Let's say you have an authorization middleware that validates that the current request is authorized. // If the authorization fails (ex: the password does not match), call Abort to ensure the remaining handlers diff --git a/pkg/net/ip/ip.go b/pkg/net/ip/ip.go index 386d7e23f..0966ccc5e 100644 --- a/pkg/net/ip/ip.go +++ b/pkg/net/ip/ip.go @@ -68,7 +68,7 @@ func InternalIP() string { return "" } -// isUp Interface is up +// isUp Interface is up func isUp(v net.Flags) bool { - return v&net.FlagUp == net.FlagUp + return v&net.FlagUp == net.FlagUp } diff --git a/pkg/net/netutil/breaker/README.md b/pkg/net/netutil/breaker/README.md index f700157f5..d4294a9e2 100644 --- a/pkg/net/netutil/breaker/README.md +++ b/pkg/net/netutil/breaker/README.md @@ -11,10 +11,9 @@ > 4. 默认配置如下所示: _conf = &Config{ Window: xtime.Duration(3 * time.Second), - Sleep: xtime.Duration(100 * time.Millisecond), Bucket: 10, - Ratio: 0.5, Request: 100, + K:1.5, } ##### 测试 diff --git a/pkg/net/netutil/breaker/breaker.go b/pkg/net/netutil/breaker/breaker.go index f7b681ac3..2d4d6ecbd 100644 --- a/pkg/net/netutil/breaker/breaker.go +++ b/pkg/net/netutil/breaker/breaker.go @@ -11,10 +11,6 @@ import ( type Config struct { SwitchOff bool // breaker switch,default off. - // Hystrix - Ratio float32 - Sleep xtime.Duration - // Google K float64 @@ -30,12 +26,6 @@ func (conf *Config) fix() { if conf.Request == 0 { conf.Request = 100 } - if conf.Ratio == 0 { - conf.Ratio = 0.5 - } - if conf.Sleep == 0 { - conf.Sleep = xtime.Duration(500 * time.Millisecond) - } if conf.Bucket == 0 { conf.Bucket = 10 } @@ -84,8 +74,6 @@ var ( Bucket: 10, Request: 100, - Sleep: xtime.Duration(500 * time.Millisecond), - Ratio: 0.5, // Percentage of failures must be lower than 33.33% K: 1.5, diff --git a/pkg/net/netutil/breaker/breaker_test.go b/pkg/net/netutil/breaker/breaker_test.go index 4e024251f..6f4b2fc43 100644 --- a/pkg/net/netutil/breaker/breaker_test.go +++ b/pkg/net/netutil/breaker/breaker_test.go @@ -28,9 +28,7 @@ func TestGroup(t *testing.T) { g := NewGroup(_conf) c := &Config{ Window: xtime.Duration(1 * time.Second), - Sleep: xtime.Duration(100 * time.Millisecond), Bucket: 10, - Ratio: 0.5, Request: 100, SwitchOff: !_conf.SwitchOff, } @@ -44,9 +42,7 @@ func TestInit(t *testing.T) { switchOff := _conf.SwitchOff c := &Config{ Window: xtime.Duration(3 * time.Second), - Sleep: xtime.Duration(100 * time.Millisecond), Bucket: 10, - Ratio: 0.5, Request: 100, SwitchOff: !switchOff, } @@ -69,9 +65,7 @@ func TestGo(t *testing.T) { _group.Reload(&Config{ Window: xtime.Duration(3 * time.Second), - Sleep: xtime.Duration(100 * time.Millisecond), Bucket: 10, - Ratio: 0.5, Request: 100, SwitchOff: true, }) diff --git a/pkg/net/netutil/breaker/example_test.go b/pkg/net/netutil/breaker/example_test.go index f0f386c93..9afe49d30 100644 --- a/pkg/net/netutil/breaker/example_test.go +++ b/pkg/net/netutil/breaker/example_test.go @@ -12,9 +12,8 @@ import ( func ExampleGroup() { c := &breaker.Config{ Window: xtime.Duration(3 * time.Second), - Sleep: xtime.Duration(100 * time.Millisecond), + K: 1.5, Bucket: 10, - Ratio: 0.5, Request: 100, } // init default config diff --git a/pkg/net/rpc/warden/exapmle_test.go b/pkg/net/rpc/warden/exapmle_test.go index 5461e7b2e..10c037958 100644 --- a/pkg/net/rpc/warden/exapmle_test.go +++ b/pkg/net/rpc/warden/exapmle_test.go @@ -60,9 +60,8 @@ func ExampleClient() { Timeout: xtime.Duration(time.Second * 10), Breaker: &breaker.Config{ Window: xtime.Duration(3 * time.Second), - Sleep: xtime.Duration(3 * time.Second), Bucket: 10, - Ratio: 0.3, + K: 1.5, Request: 20, }, }) diff --git a/pkg/net/rpc/warden/internal/benchmark/bench/client/client.go b/pkg/net/rpc/warden/internal/benchmark/bench/client/client.go index 137e90b82..62710092a 100644 --- a/pkg/net/rpc/warden/internal/benchmark/bench/client/client.go +++ b/pkg/net/rpc/warden/internal/benchmark/bench/client/client.go @@ -39,10 +39,9 @@ func wardenCli() proto.HelloClient { Timeout: xtime.Duration(time.Second * 10), Breaker: &breaker.Config{ Window: xtime.Duration(3 * time.Second), - Sleep: xtime.Duration(3 * time.Second), Bucket: 10, - Ratio: 0.3, Request: 20, + K: 1.5, }, }, grpc.WithInitialWindowSize(iws), diff --git a/pkg/net/rpc/warden/internal/benchmark/helloworld/client/greeter_client.go b/pkg/net/rpc/warden/internal/benchmark/helloworld/client/greeter_client.go index ada604806..d47c50e43 100644 --- a/pkg/net/rpc/warden/internal/benchmark/helloworld/client/greeter_client.go +++ b/pkg/net/rpc/warden/internal/benchmark/helloworld/client/greeter_client.go @@ -21,10 +21,9 @@ var ( Timeout: xtime.Duration(time.Second * 10), Breaker: &breaker.Config{ Window: xtime.Duration(3 * time.Second), - Sleep: xtime.Duration(3 * time.Second), Bucket: 10, - Ratio: 0.3, Request: 20, + K: 1.5, }, } cli pb.GreeterClient diff --git a/pkg/net/rpc/warden/resolver/direct/test/direct_test.go b/pkg/net/rpc/warden/resolver/direct/test/direct_test.go index a3de922bf..b898fd643 100644 --- a/pkg/net/rpc/warden/resolver/direct/test/direct_test.go +++ b/pkg/net/rpc/warden/resolver/direct/test/direct_test.go @@ -55,10 +55,9 @@ func createTestClient(t *testing.T, connStr string) pb.GreeterClient { Timeout: xtime.Duration(time.Second * 10), Breaker: &breaker.Config{ Window: xtime.Duration(3 * time.Second), - Sleep: xtime.Duration(3 * time.Second), Bucket: 10, - Ratio: 0.3, Request: 20, + K: 1.5, }, }) conn, err := client.Dial(context.TODO(), connStr) diff --git a/pkg/net/rpc/warden/resolver/test/resovler_test.go b/pkg/net/rpc/warden/resolver/test/resovler_test.go index 6e69fcb3b..7af8873a0 100644 --- a/pkg/net/rpc/warden/resolver/test/resovler_test.go +++ b/pkg/net/rpc/warden/resolver/test/resovler_test.go @@ -74,10 +74,9 @@ func createTestClient(t *testing.T) pb.GreeterClient { Timeout: xtime.Duration(time.Second * 10), Breaker: &breaker.Config{ Window: xtime.Duration(3 * time.Second), - Sleep: xtime.Duration(3 * time.Second), Bucket: 10, - Ratio: 0.3, Request: 20, + K: 1.5, }, }) conn, err := client.Dial(context.TODO(), "mockdiscovery://authority/main.test") diff --git a/pkg/net/rpc/warden/server_test.go b/pkg/net/rpc/warden/server_test.go index b6786290c..bbacec768 100644 --- a/pkg/net/rpc/warden/server_test.go +++ b/pkg/net/rpc/warden/server_test.go @@ -41,11 +41,9 @@ var ( Dial: xtime.Duration(time.Second * 10), Timeout: xtime.Duration(time.Second * 10), Breaker: &breaker.Config{ - Window: xtime.Duration(3 * time.Second), - Sleep: xtime.Duration(3 * time.Second), - Bucket: 10, - Ratio: 0.3, - Request: 20, + Window: xtime.Duration(3 * time.Second), + Bucket: 10, + K: 1.5, }, } clientConfig2 = ClientConfig{ @@ -53,10 +51,9 @@ var ( Timeout: xtime.Duration(time.Second * 10), Breaker: &breaker.Config{ Window: xtime.Duration(3 * time.Second), - Sleep: xtime.Duration(3 * time.Second), Bucket: 10, - Ratio: 0.3, Request: 20, + K: 1.5, }, Method: map[string]*ClientConfig{`/testproto.Greeter/SayHello`: {Timeout: xtime.Duration(time.Millisecond * 200)}}, } diff --git a/pkg/net/trace/zipkin/zipkin_test.go b/pkg/net/trace/zipkin/zipkin_test.go index a68884a91..c65dd5888 100644 --- a/pkg/net/trace/zipkin/zipkin_test.go +++ b/pkg/net/trace/zipkin/zipkin_test.go @@ -37,7 +37,7 @@ func TestZipkin(t *testing.T) { t2 := trace.NewTracer("service2", report, true) sp1 := t1.New("option_1") sp2 := sp1.Fork("service3", "opt_client") - sp2.SetLog(trace.Log("log_k","log_v")) + sp2.SetLog(trace.Log("log_k", "log_v")) // inject header := make(http.Header) t1.Inject(sp2, trace.HTTPFormat, header) diff --git a/pkg/ratelimit/bbr/bbr.go b/pkg/ratelimit/bbr/bbr.go index 304ab648d..98a9d648e 100644 --- a/pkg/ratelimit/bbr/bbr.go +++ b/pkg/ratelimit/bbr/bbr.go @@ -32,7 +32,7 @@ func init() { go cpuproc() } -// cpu = cpuᵗ⁻¹ * decay + cpuᵗ * (1 - decay) +// cpu = cpuᵗ⁻¹ * decay + cpuᵗ * (1 - decay) func cpuproc() { ticker := time.NewTicker(time.Millisecond * 250) defer func() { diff --git a/tool/protobuf/pkg/gen/main.go b/tool/protobuf/pkg/gen/main.go index e68f6a212..071ba2ae2 100644 --- a/tool/protobuf/pkg/gen/main.go +++ b/tool/protobuf/pkg/gen/main.go @@ -3,9 +3,9 @@ package gen import ( "io" "io/ioutil" + "log" "os" "strings" - "log" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/protoc-gen-go/descriptor" @@ -70,7 +70,6 @@ func writeResponse(w io.Writer, resp *plugin.CodeGeneratorResponse) { } } - // Fail log and exit func Fail(msgs ...string) { s := strings.Join(msgs, " ") @@ -85,10 +84,9 @@ func Info(msgs ...string) { os.Exit(1) } - // Error log and exit func Error(err error, msgs ...string) { s := strings.Join(msgs, " ") + ":" + err.Error() log.Print("error:", s) os.Exit(1) -} \ No newline at end of file +} diff --git a/tool/protobuf/pkg/utils/stringutils.go b/tool/protobuf/pkg/utils/stringutils.go index d36704c6b..8da21c8fe 100644 --- a/tool/protobuf/pkg/utils/stringutils.go +++ b/tool/protobuf/pkg/utils/stringutils.go @@ -1,4 +1,3 @@ - package utils import ( @@ -95,4 +94,4 @@ func AlphaDigitize(r rune) rune { // letters, numbers, and underscore. func CleanIdentifier(s string) string { return strings.Map(AlphaDigitize, s) -} \ No newline at end of file +}