Merge pull request #384 from LibiChai/master

#206 add breaker doc
pull/390/head
Terry.Mao 5 years ago committed by GitHub
commit 23e30443ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 49
      doc/wiki-cn/breaker.md
  2. 2
      example/blademaster/middleware/auth/example_test.go
  3. 4
      pkg/database/hbase/metrics.go
  4. 4
      pkg/naming/etcd/etcd_test.go
  5. 2
      pkg/net/http/blademaster/context.go
  6. 2
      pkg/net/ip/ip.go
  7. 3
      pkg/net/netutil/breaker/README.md
  8. 12
      pkg/net/netutil/breaker/breaker.go
  9. 6
      pkg/net/netutil/breaker/breaker_test.go
  10. 3
      pkg/net/netutil/breaker/example_test.go
  11. 3
      pkg/net/rpc/warden/exapmle_test.go
  12. 3
      pkg/net/rpc/warden/internal/benchmark/bench/client/client.go
  13. 3
      pkg/net/rpc/warden/internal/benchmark/helloworld/client/greeter_client.go
  14. 3
      pkg/net/rpc/warden/resolver/direct/test/direct_test.go
  15. 3
      pkg/net/rpc/warden/resolver/test/resovler_test.go
  16. 11
      pkg/net/rpc/warden/server_test.go
  17. 2
      pkg/net/trace/zipkin/zipkin_test.go
  18. 4
      tool/protobuf/pkg/gen/main.go
  19. 1
      tool/protobuf/pkg/utils/stringutils.go

@ -0,0 +1,49 @@
## 熔断器/Breaker
熔断器是为了当依赖的服务已经出现故障时,主动阻止对依赖服务的请求。保证自身服务的正常运行不受依赖服务影响,防止雪崩效应。
## kratos内置breaker的组件
一般情况下直接使用kratos的组件时都自带了熔断逻辑,并且在提供了对应的breaker配置项。
目前在kratos内集成熔断器的组件有:
- RPC client: pkg/net/rpc/warden/client
- Mysql client:pkg/database/sql
- Tidb client:pkg/database/tidb
- Http client:pkg/net/http/blademaster
## 使用说明
```go
//初始化熔断器组
//一组熔断器公用同一个配置项,可从分组内取出单个熔断器使用。可用在比如mysql主从分离等场景。
brkGroup := breaker.NewGroup(&breaker.Config{})
//为每一个连接指定一个brekaker
//此处假设一个客户端连接对象实例为conn
//breakName定义熔断器名称 一般可以使用连接地址
breakName = conn.Addr
conn.breaker = brkGroup.Get(breakName)
//在连接发出请求前判断熔断器状态
if err = conn.breaker.Allow(); err != nil {
return
}
//连接执行成功或失败将结果告知braker
if(respErr != nil){
conn.breaker.MarkFailed()
}else{
conn.breaker.MarkSuccess()
}
```
## 配置说明
```go
type Config struct {
SwitchOff bool // 熔断器开关,默认关 false.
K float64 //触发熔断的错误率(K = 1 - 1/错误率)
Window xtime.Duration //统计桶窗口时间
Bucket int //统计桶大小
Request int64 //触发熔断的最少请求数量(请求少于该值时不会触发熔断)
}
```

@ -3,8 +3,8 @@ package auth_test
import ( import (
"fmt" "fmt"
bm "github.com/bilibili/kratos/pkg/net/http/blademaster"
"github.com/bilibili/kratos/example/blademaster/middleware/auth" "github.com/bilibili/kratos/example/blademaster/middleware/auth"
bm "github.com/bilibili/kratos/pkg/net/http/blademaster"
"github.com/bilibili/kratos/pkg/net/metadata" "github.com/bilibili/kratos/pkg/net/metadata"
) )

@ -41,8 +41,8 @@ func codeFromErr(err error) string {
code = "connot_find_region" code = "connot_find_region"
case gohbase.TableNotFound: case gohbase.TableNotFound:
code = "table_not_found" code = "table_not_found"
//case gohbase.ErrRegionUnavailable: //case gohbase.ErrRegionUnavailable:
// code = "region_unavailable" // code = "region_unavailable"
} }
return code return code
} }

@ -3,11 +3,11 @@ package etcd
import ( import (
"context" "context"
"fmt" "fmt"
"testing"
"time"
"github.com/bilibili/kratos/pkg/naming" "github.com/bilibili/kratos/pkg/naming"
"go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3"
"google.golang.org/grpc" "google.golang.org/grpc"
"testing"
"time"
) )
func TestNew(t *testing.T) { func TestNew(t *testing.T) {

@ -49,7 +49,6 @@ type Context struct {
RoutePath string RoutePath string
Params Params Params Params
} }
/************************************/ /************************************/
@ -67,7 +66,6 @@ func (c *Context) Next() {
} }
} }
// Abort prevents pending handlers from being called. Note that this will not stop the current handler. // Abort prevents pending handlers from being called. Note that this will not stop the current handler.
// Let's say you have an authorization middleware that validates that the current request is authorized. // Let's say you have an authorization middleware that validates that the current request is authorized.
// If the authorization fails (ex: the password does not match), call Abort to ensure the remaining handlers // If the authorization fails (ex: the password does not match), call Abort to ensure the remaining handlers

@ -70,5 +70,5 @@ func InternalIP() string {
// isUp Interface is up // isUp Interface is up
func isUp(v net.Flags) bool { func isUp(v net.Flags) bool {
return v&net.FlagUp == net.FlagUp return v&net.FlagUp == net.FlagUp
} }

@ -11,10 +11,9 @@
> 4. 默认配置如下所示: > 4. 默认配置如下所示:
_conf = &Config{ _conf = &Config{
Window: xtime.Duration(3 * time.Second), Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(100 * time.Millisecond),
Bucket: 10, Bucket: 10,
Ratio: 0.5,
Request: 100, Request: 100,
K:1.5,
} }
##### 测试 ##### 测试

@ -11,10 +11,6 @@ import (
type Config struct { type Config struct {
SwitchOff bool // breaker switch,default off. SwitchOff bool // breaker switch,default off.
// Hystrix
Ratio float32
Sleep xtime.Duration
// Google // Google
K float64 K float64
@ -30,12 +26,6 @@ func (conf *Config) fix() {
if conf.Request == 0 { if conf.Request == 0 {
conf.Request = 100 conf.Request = 100
} }
if conf.Ratio == 0 {
conf.Ratio = 0.5
}
if conf.Sleep == 0 {
conf.Sleep = xtime.Duration(500 * time.Millisecond)
}
if conf.Bucket == 0 { if conf.Bucket == 0 {
conf.Bucket = 10 conf.Bucket = 10
} }
@ -84,8 +74,6 @@ var (
Bucket: 10, Bucket: 10,
Request: 100, Request: 100,
Sleep: xtime.Duration(500 * time.Millisecond),
Ratio: 0.5,
// Percentage of failures must be lower than 33.33% // Percentage of failures must be lower than 33.33%
K: 1.5, K: 1.5,

@ -28,9 +28,7 @@ func TestGroup(t *testing.T) {
g := NewGroup(_conf) g := NewGroup(_conf)
c := &Config{ c := &Config{
Window: xtime.Duration(1 * time.Second), Window: xtime.Duration(1 * time.Second),
Sleep: xtime.Duration(100 * time.Millisecond),
Bucket: 10, Bucket: 10,
Ratio: 0.5,
Request: 100, Request: 100,
SwitchOff: !_conf.SwitchOff, SwitchOff: !_conf.SwitchOff,
} }
@ -44,9 +42,7 @@ func TestInit(t *testing.T) {
switchOff := _conf.SwitchOff switchOff := _conf.SwitchOff
c := &Config{ c := &Config{
Window: xtime.Duration(3 * time.Second), Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(100 * time.Millisecond),
Bucket: 10, Bucket: 10,
Ratio: 0.5,
Request: 100, Request: 100,
SwitchOff: !switchOff, SwitchOff: !switchOff,
} }
@ -69,9 +65,7 @@ func TestGo(t *testing.T) {
_group.Reload(&Config{ _group.Reload(&Config{
Window: xtime.Duration(3 * time.Second), Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(100 * time.Millisecond),
Bucket: 10, Bucket: 10,
Ratio: 0.5,
Request: 100, Request: 100,
SwitchOff: true, SwitchOff: true,
}) })

@ -12,9 +12,8 @@ import (
func ExampleGroup() { func ExampleGroup() {
c := &breaker.Config{ c := &breaker.Config{
Window: xtime.Duration(3 * time.Second), Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(100 * time.Millisecond), K: 1.5,
Bucket: 10, Bucket: 10,
Ratio: 0.5,
Request: 100, Request: 100,
} }
// init default config // init default config

@ -60,9 +60,8 @@ func ExampleClient() {
Timeout: xtime.Duration(time.Second * 10), Timeout: xtime.Duration(time.Second * 10),
Breaker: &breaker.Config{ Breaker: &breaker.Config{
Window: xtime.Duration(3 * time.Second), Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(3 * time.Second),
Bucket: 10, Bucket: 10,
Ratio: 0.3, K: 1.5,
Request: 20, Request: 20,
}, },
}) })

@ -39,10 +39,9 @@ func wardenCli() proto.HelloClient {
Timeout: xtime.Duration(time.Second * 10), Timeout: xtime.Duration(time.Second * 10),
Breaker: &breaker.Config{ Breaker: &breaker.Config{
Window: xtime.Duration(3 * time.Second), Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(3 * time.Second),
Bucket: 10, Bucket: 10,
Ratio: 0.3,
Request: 20, Request: 20,
K: 1.5,
}, },
}, },
grpc.WithInitialWindowSize(iws), grpc.WithInitialWindowSize(iws),

@ -21,10 +21,9 @@ var (
Timeout: xtime.Duration(time.Second * 10), Timeout: xtime.Duration(time.Second * 10),
Breaker: &breaker.Config{ Breaker: &breaker.Config{
Window: xtime.Duration(3 * time.Second), Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(3 * time.Second),
Bucket: 10, Bucket: 10,
Ratio: 0.3,
Request: 20, Request: 20,
K: 1.5,
}, },
} }
cli pb.GreeterClient cli pb.GreeterClient

@ -55,10 +55,9 @@ func createTestClient(t *testing.T, connStr string) pb.GreeterClient {
Timeout: xtime.Duration(time.Second * 10), Timeout: xtime.Duration(time.Second * 10),
Breaker: &breaker.Config{ Breaker: &breaker.Config{
Window: xtime.Duration(3 * time.Second), Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(3 * time.Second),
Bucket: 10, Bucket: 10,
Ratio: 0.3,
Request: 20, Request: 20,
K: 1.5,
}, },
}) })
conn, err := client.Dial(context.TODO(), connStr) conn, err := client.Dial(context.TODO(), connStr)

@ -74,10 +74,9 @@ func createTestClient(t *testing.T) pb.GreeterClient {
Timeout: xtime.Duration(time.Second * 10), Timeout: xtime.Duration(time.Second * 10),
Breaker: &breaker.Config{ Breaker: &breaker.Config{
Window: xtime.Duration(3 * time.Second), Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(3 * time.Second),
Bucket: 10, Bucket: 10,
Ratio: 0.3,
Request: 20, Request: 20,
K: 1.5,
}, },
}) })
conn, err := client.Dial(context.TODO(), "mockdiscovery://authority/main.test") conn, err := client.Dial(context.TODO(), "mockdiscovery://authority/main.test")

@ -41,11 +41,9 @@ var (
Dial: xtime.Duration(time.Second * 10), Dial: xtime.Duration(time.Second * 10),
Timeout: xtime.Duration(time.Second * 10), Timeout: xtime.Duration(time.Second * 10),
Breaker: &breaker.Config{ Breaker: &breaker.Config{
Window: xtime.Duration(3 * time.Second), Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(3 * time.Second), Bucket: 10,
Bucket: 10, K: 1.5,
Ratio: 0.3,
Request: 20,
}, },
} }
clientConfig2 = ClientConfig{ clientConfig2 = ClientConfig{
@ -53,10 +51,9 @@ var (
Timeout: xtime.Duration(time.Second * 10), Timeout: xtime.Duration(time.Second * 10),
Breaker: &breaker.Config{ Breaker: &breaker.Config{
Window: xtime.Duration(3 * time.Second), Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(3 * time.Second),
Bucket: 10, Bucket: 10,
Ratio: 0.3,
Request: 20, Request: 20,
K: 1.5,
}, },
Method: map[string]*ClientConfig{`/testproto.Greeter/SayHello`: {Timeout: xtime.Duration(time.Millisecond * 200)}}, Method: map[string]*ClientConfig{`/testproto.Greeter/SayHello`: {Timeout: xtime.Duration(time.Millisecond * 200)}},
} }

@ -37,7 +37,7 @@ func TestZipkin(t *testing.T) {
t2 := trace.NewTracer("service2", report, true) t2 := trace.NewTracer("service2", report, true)
sp1 := t1.New("option_1") sp1 := t1.New("option_1")
sp2 := sp1.Fork("service3", "opt_client") sp2 := sp1.Fork("service3", "opt_client")
sp2.SetLog(trace.Log("log_k","log_v")) sp2.SetLog(trace.Log("log_k", "log_v"))
// inject // inject
header := make(http.Header) header := make(http.Header)
t1.Inject(sp2, trace.HTTPFormat, header) t1.Inject(sp2, trace.HTTPFormat, header)

@ -3,9 +3,9 @@ package gen
import ( import (
"io" "io"
"io/ioutil" "io/ioutil"
"log"
"os" "os"
"strings" "strings"
"log"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/golang/protobuf/protoc-gen-go/descriptor" "github.com/golang/protobuf/protoc-gen-go/descriptor"
@ -70,7 +70,6 @@ func writeResponse(w io.Writer, resp *plugin.CodeGeneratorResponse) {
} }
} }
// Fail log and exit // Fail log and exit
func Fail(msgs ...string) { func Fail(msgs ...string) {
s := strings.Join(msgs, " ") s := strings.Join(msgs, " ")
@ -85,7 +84,6 @@ func Info(msgs ...string) {
os.Exit(1) os.Exit(1)
} }
// Error log and exit // Error log and exit
func Error(err error, msgs ...string) { func Error(err error, msgs ...string) {
s := strings.Join(msgs, " ") + ":" + err.Error() s := strings.Join(msgs, " ") + ":" + err.Error()

@ -1,4 +1,3 @@
package utils package utils
import ( import (

Loading…
Cancel
Save