diff --git a/doc/wiki-cn/warden-resolver.md b/doc/wiki-cn/warden-resolver.md index 96585f193..0e84a9044 100644 --- a/doc/wiki-cn/warden-resolver.md +++ b/doc/wiki-cn/warden-resolver.md @@ -219,6 +219,7 @@ const AppID = "demo.service" // NOTE: example func init(){ // NOTE: 注意这段代码,表示要使用etcd进行服务发现 ,其他事项参考discovery的说明 // NOTE: 在启动应用时,可以通过flag(-etcd.endpoints) 或者 环境配置(ETCD_ENDPOINTS)指定etcd节点 + // NOTE: 如果需要自己指定配置时 需要同时设置DialTimeout 与 DialOptions: []grpc.DialOption{grpc.WithBlock()} resolver.Register(etcd.Builder(nil)) } diff --git a/go.mod b/go.mod index 6dc86a6a9..55c580ef5 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/StackExchange/wmi v0.0.0-20190523213609-cbe66965904d // indirect github.com/aristanetworks/goarista v0.0.0-20190712234253-ed1100a1c015 // indirect github.com/coreos/bbolt v1.3.3 // indirect - github.com/coreos/etcd v3.3.13+incompatible + github.com/coreos/etcd v3.3.13+incompatible // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect @@ -55,6 +55,7 @@ require ( github.com/urfave/cli v1.20.0 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect go.etcd.io/bbolt v1.3.3 // indirect + go.etcd.io/etcd v3.3.13+incompatible go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect diff --git a/go.sum b/go.sum index 67503c259..b07c65043 100644 --- a/go.sum +++ b/go.sum @@ -86,7 +86,6 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/sync v0.0.0-20190427212804-112230192c58 h1:CfvS2lsMxwV6vyyjjeXQ0dqkW/yQ0gyOHG6QNzjOUw0= github.com/golang/sync v0.0.0-20190427212804-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= github.com/golang/sys v0.0.0-20190602015325-4c4f7f33c9ed h1:BIG54Uwbp5ZzFl3i2t+d+EGE8jq89zYK7iXM6tyv76E= github.com/golang/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -206,6 +205,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw= +go.etcd.io/etcd v3.3.13+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= diff --git a/pkg/naming/etcd/etcd.go b/pkg/naming/etcd/etcd.go index 338b216eb..241b192f3 100644 --- a/pkg/naming/etcd/etcd.go +++ b/pkg/naming/etcd/etcd.go @@ -14,8 +14,9 @@ import ( "github.com/bilibili/kratos/pkg/log" "github.com/bilibili/kratos/pkg/naming" - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/mvcc/mvccpb" + "google.golang.org/grpc" ) var ( @@ -23,7 +24,9 @@ var ( endpoints string etcdPrefix string - RegisterTTL = 30 + //Time units is second + registerTTL = 90 + defaultDialTimeout = 30 ) var ( @@ -96,7 +99,8 @@ func New(c *clientv3.Config) (e *EtcdBuilder, err error) { } c = &clientv3.Config{ Endpoints: strings.Split(endpoints, ","), - DialTimeout: time.Second * 30, + DialTimeout: time.Second * time.Duration(defaultDialTimeout), + DialOptions: []grpc.DialOption{grpc.WithBlock()}, } } cli, err := clientv3.New(*c) @@ -179,8 +183,8 @@ func (e *EtcdBuilder) Register(ctx context.Context, ins *naming.Instance) (cance }) go func() { - //提前2秒续约 避免续约操作缓慢时租约过期 - ticker := time.NewTicker(time.Duration(RegisterTTL-2) * time.Second) + + ticker := time.NewTicker(time.Duration(registerTTL/3) * time.Second) defer ticker.Stop() for { select { @@ -201,9 +205,9 @@ func (e *EtcdBuilder) register(ctx context.Context, ins *naming.Instance) (err e prefix := e.keyPrefix(ins) val, _ := json.Marshal(ins) - ttlResp, err := e.cli.Grant(context.TODO(), int64(RegisterTTL)) + ttlResp, err := e.cli.Grant(context.TODO(), int64(registerTTL)) if err != nil { - log.Error("etcd: register client.Grant(%v) error(%v)", RegisterTTL, err) + log.Error("etcd: register client.Grant(%v) error(%v)", registerTTL, err) return err } _, err = e.cli.Put(ctx, prefix, string(val), clientv3.WithLease(ttlResp.ID)) diff --git a/pkg/naming/etcd/etcd_test.go b/pkg/naming/etcd/etcd_test.go index 445958725..72d15aa15 100644 --- a/pkg/naming/etcd/etcd_test.go +++ b/pkg/naming/etcd/etcd_test.go @@ -3,10 +3,11 @@ package etcd import ( "context" "fmt" - "github.com/bilibili/kratos/pkg/naming" - "github.com/coreos/etcd/clientv3" "testing" "time" + "github.com/bilibili/kratos/pkg/naming" + "go.etcd.io/etcd/clientv3" + "google.golang.org/grpc" ) func TestNew(t *testing.T) { @@ -14,6 +15,7 @@ func TestNew(t *testing.T) { config := &clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: time.Second * 3, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, } builder, err := New(config) diff --git a/pkg/net/http/blademaster/metadata.go b/pkg/net/http/blademaster/metadata.go index 0ce65d102..24139dc35 100644 --- a/pkg/net/http/blademaster/metadata.go +++ b/pkg/net/http/blademaster/metadata.go @@ -52,7 +52,7 @@ var _parser = map[string]func(string) interface{}{ func parseMetadataTo(req *http.Request, to metadata.MD) { for rawKey := range req.Header { - key := strings.ReplaceAll(strings.TrimLeft(strings.ToLower(rawKey), _httpHeaderMetadata), "-", "_") + key := strings.ReplaceAll(strings.TrimPrefix(strings.ToLower(rawKey), _httpHeaderMetadata), "-", "_") rawValue := req.Header.Get(rawKey) var value interface{} = rawValue parser, ok := _parser[key] diff --git a/pkg/net/netutil/breaker/sre_breaker.go b/pkg/net/netutil/breaker/sre_breaker.go index 5e410cd98..44ccb4b71 100644 --- a/pkg/net/netutil/breaker/sre_breaker.go +++ b/pkg/net/netutil/breaker/sre_breaker.go @@ -3,6 +3,7 @@ package breaker import ( "math" "math/rand" + "sync" "sync/atomic" "time" @@ -14,12 +15,14 @@ import ( // sreBreaker is a sre CircuitBreaker pattern. type sreBreaker struct { stat metric.RollingCounter + r *rand.Rand + // rand.New(...) returns a non thread safe object + randLock sync.Mutex k float64 request int64 state int32 - r *rand.Rand } func newSRE(c *Config) Breaker { @@ -69,14 +72,14 @@ func (b *sreBreaker) Allow() error { atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen) } dr := math.Max(0, (float64(total)-k)/float64(total+1)) - rr := b.r.Float64() + drop := b.trueOnProba(dr) if log.V(5) { - log.Info("breaker: drop ratio: %f, real rand: %f, drop: %v", dr, rr, dr > rr) + log.Info("breaker: drop ratio: %f, drop: %t", dr, drop) } - if dr <= rr { - return nil + if drop { + return ecode.ServiceUnavailable } - return ecode.ServiceUnavailable + return nil } func (b *sreBreaker) MarkSuccess() { @@ -88,3 +91,10 @@ func (b *sreBreaker) MarkFailed() { // drop ratio higher. b.stat.Add(0) } + +func (b *sreBreaker) trueOnProba(proba float64) (truth bool) { + b.randLock.Lock() + truth = b.r.Float64() < proba + b.randLock.Unlock() + return +} diff --git a/pkg/net/netutil/breaker/sre_breaker_test.go b/pkg/net/netutil/breaker/sre_breaker_test.go index 4559c4fc5..9df374736 100644 --- a/pkg/net/netutil/breaker/sre_breaker_test.go +++ b/pkg/net/netutil/breaker/sre_breaker_test.go @@ -5,7 +5,7 @@ import ( "math/rand" "testing" "time" - + "github.com/bilibili/kratos/pkg/stat/metric" xtime "github.com/bilibili/kratos/pkg/time" @@ -147,6 +147,22 @@ func TestSRESummary(t *testing.T) { }) } +func TestTrueOnProba(t *testing.T) { + const proba = math.Pi / 10 + const total = 100000 + const epsilon = 0.05 + var count int + b := getSREBreaker() + for i := 0; i < total; i++ { + if b.trueOnProba(proba) { + count++ + } + } + + ratio := float64(count) / float64(total) + assert.InEpsilon(t, proba, ratio, epsilon) +} + func BenchmarkSreBreakerAllow(b *testing.B) { breaker := getSRE() b.ResetTimer() diff --git a/pkg/net/rpc/warden/client.go b/pkg/net/rpc/warden/client.go index ac1ba130b..f8ae1d2b8 100644 --- a/pkg/net/rpc/warden/client.go +++ b/pkg/net/rpc/warden/client.go @@ -3,6 +3,8 @@ package warden import ( "context" "fmt" + "github.com/bilibili/kratos/pkg/net/rpc/warden/resolver" + "github.com/bilibili/kratos/pkg/net/rpc/warden/resolver/direct" "net/url" "os" "strconv" @@ -51,6 +53,11 @@ func baseMetadata() metadata.MD { return gmd } +// Register direct resolver by default to handle direct:// scheme. +func init() { + resolver.Register(direct.New()) +} + // ClientConfig is rpc client conf. type ClientConfig struct { Dial xtime.Duration