fix commitconflicts.

pull/223/head
shunza 6 years ago
commit ef8b5880ce
  1. 1
      doc/wiki-cn/warden-resolver.md
  2. 3
      go.mod
  3. 3
      go.sum
  4. 20
      pkg/naming/etcd/etcd.go
  5. 6
      pkg/naming/etcd/etcd_test.go
  6. 2
      pkg/net/http/blademaster/metadata.go
  7. 22
      pkg/net/netutil/breaker/sre_breaker.go
  8. 16
      pkg/net/netutil/breaker/sre_breaker_test.go
  9. 7
      pkg/net/rpc/warden/client.go

@ -219,6 +219,7 @@ const AppID = "demo.service" // NOTE: example
func init(){ func init(){
// NOTE: 注意这段代码,表示要使用etcd进行服务发现 ,其他事项参考discovery的说明 // NOTE: 注意这段代码,表示要使用etcd进行服务发现 ,其他事项参考discovery的说明
// NOTE: 在启动应用时,可以通过flag(-etcd.endpoints) 或者 环境配置(ETCD_ENDPOINTS)指定etcd节点 // NOTE: 在启动应用时,可以通过flag(-etcd.endpoints) 或者 环境配置(ETCD_ENDPOINTS)指定etcd节点
// NOTE: 如果需要自己指定配置时 需要同时设置DialTimeout 与 DialOptions: []grpc.DialOption{grpc.WithBlock()}
resolver.Register(etcd.Builder(nil)) resolver.Register(etcd.Builder(nil))
} }

@ -7,7 +7,7 @@ require (
github.com/StackExchange/wmi v0.0.0-20190523213609-cbe66965904d // indirect github.com/StackExchange/wmi v0.0.0-20190523213609-cbe66965904d // indirect
github.com/aristanetworks/goarista v0.0.0-20190712234253-ed1100a1c015 // indirect github.com/aristanetworks/goarista v0.0.0-20190712234253-ed1100a1c015 // indirect
github.com/coreos/bbolt v1.3.3 // indirect github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible github.com/coreos/etcd v3.3.13+incompatible // indirect
github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
@ -55,6 +55,7 @@ require (
github.com/urfave/cli v1.20.0 github.com/urfave/cli v1.20.0
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.3 // indirect go.etcd.io/bbolt v1.3.3 // indirect
go.etcd.io/etcd v3.3.13+incompatible
go.uber.org/atomic v1.4.0 // indirect go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect go.uber.org/zap v1.10.0 // indirect

@ -86,7 +86,6 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/sync v0.0.0-20190427212804-112230192c58 h1:CfvS2lsMxwV6vyyjjeXQ0dqkW/yQ0gyOHG6QNzjOUw0=
github.com/golang/sync v0.0.0-20190427212804-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= github.com/golang/sync v0.0.0-20190427212804-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
github.com/golang/sys v0.0.0-20190602015325-4c4f7f33c9ed h1:BIG54Uwbp5ZzFl3i2t+d+EGE8jq89zYK7iXM6tyv76E= github.com/golang/sys v0.0.0-20190602015325-4c4f7f33c9ed h1:BIG54Uwbp5ZzFl3i2t+d+EGE8jq89zYK7iXM6tyv76E=
github.com/golang/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= github.com/golang/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -206,6 +205,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw=
go.etcd.io/etcd v3.3.13+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=

@ -14,8 +14,9 @@ import (
"github.com/bilibili/kratos/pkg/log" "github.com/bilibili/kratos/pkg/log"
"github.com/bilibili/kratos/pkg/naming" "github.com/bilibili/kratos/pkg/naming"
"github.com/coreos/etcd/clientv3" "go.etcd.io/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb" "go.etcd.io/etcd/mvcc/mvccpb"
"google.golang.org/grpc"
) )
var ( var (
@ -23,7 +24,9 @@ var (
endpoints string endpoints string
etcdPrefix string etcdPrefix string
RegisterTTL = 30 //Time units is second
registerTTL = 90
defaultDialTimeout = 30
) )
var ( var (
@ -96,7 +99,8 @@ func New(c *clientv3.Config) (e *EtcdBuilder, err error) {
} }
c = &clientv3.Config{ c = &clientv3.Config{
Endpoints: strings.Split(endpoints, ","), Endpoints: strings.Split(endpoints, ","),
DialTimeout: time.Second * 30, DialTimeout: time.Second * time.Duration(defaultDialTimeout),
DialOptions: []grpc.DialOption{grpc.WithBlock()},
} }
} }
cli, err := clientv3.New(*c) cli, err := clientv3.New(*c)
@ -179,8 +183,8 @@ func (e *EtcdBuilder) Register(ctx context.Context, ins *naming.Instance) (cance
}) })
go func() { go func() {
//提前2秒续约 避免续约操作缓慢时租约过期
ticker := time.NewTicker(time.Duration(RegisterTTL-2) * time.Second) ticker := time.NewTicker(time.Duration(registerTTL/3) * time.Second)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
@ -201,9 +205,9 @@ func (e *EtcdBuilder) register(ctx context.Context, ins *naming.Instance) (err e
prefix := e.keyPrefix(ins) prefix := e.keyPrefix(ins)
val, _ := json.Marshal(ins) val, _ := json.Marshal(ins)
ttlResp, err := e.cli.Grant(context.TODO(), int64(RegisterTTL)) ttlResp, err := e.cli.Grant(context.TODO(), int64(registerTTL))
if err != nil { if err != nil {
log.Error("etcd: register client.Grant(%v) error(%v)", RegisterTTL, err) log.Error("etcd: register client.Grant(%v) error(%v)", registerTTL, err)
return err return err
} }
_, err = e.cli.Put(ctx, prefix, string(val), clientv3.WithLease(ttlResp.ID)) _, err = e.cli.Put(ctx, prefix, string(val), clientv3.WithLease(ttlResp.ID))

@ -3,10 +3,11 @@ package etcd
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/bilibili/kratos/pkg/naming"
"github.com/coreos/etcd/clientv3"
"testing" "testing"
"time" "time"
"github.com/bilibili/kratos/pkg/naming"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
) )
func TestNew(t *testing.T) { func TestNew(t *testing.T) {
@ -14,6 +15,7 @@ func TestNew(t *testing.T) {
config := &clientv3.Config{ config := &clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"}, Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: time.Second * 3, DialTimeout: time.Second * 3,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
} }
builder, err := New(config) builder, err := New(config)

@ -52,7 +52,7 @@ var _parser = map[string]func(string) interface{}{
func parseMetadataTo(req *http.Request, to metadata.MD) { func parseMetadataTo(req *http.Request, to metadata.MD) {
for rawKey := range req.Header { for rawKey := range req.Header {
key := strings.ReplaceAll(strings.TrimLeft(strings.ToLower(rawKey), _httpHeaderMetadata), "-", "_") key := strings.ReplaceAll(strings.TrimPrefix(strings.ToLower(rawKey), _httpHeaderMetadata), "-", "_")
rawValue := req.Header.Get(rawKey) rawValue := req.Header.Get(rawKey)
var value interface{} = rawValue var value interface{} = rawValue
parser, ok := _parser[key] parser, ok := _parser[key]

@ -3,6 +3,7 @@ package breaker
import ( import (
"math" "math"
"math/rand" "math/rand"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -14,12 +15,14 @@ import (
// sreBreaker is a sre CircuitBreaker pattern. // sreBreaker is a sre CircuitBreaker pattern.
type sreBreaker struct { type sreBreaker struct {
stat metric.RollingCounter stat metric.RollingCounter
r *rand.Rand
// rand.New(...) returns a non thread safe object
randLock sync.Mutex
k float64 k float64
request int64 request int64
state int32 state int32
r *rand.Rand
} }
func newSRE(c *Config) Breaker { func newSRE(c *Config) Breaker {
@ -69,14 +72,14 @@ func (b *sreBreaker) Allow() error {
atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen) atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen)
} }
dr := math.Max(0, (float64(total)-k)/float64(total+1)) dr := math.Max(0, (float64(total)-k)/float64(total+1))
rr := b.r.Float64() drop := b.trueOnProba(dr)
if log.V(5) { if log.V(5) {
log.Info("breaker: drop ratio: %f, real rand: %f, drop: %v", dr, rr, dr > rr) log.Info("breaker: drop ratio: %f, drop: %t", dr, drop)
} }
if dr <= rr { if drop {
return nil return ecode.ServiceUnavailable
} }
return ecode.ServiceUnavailable return nil
} }
func (b *sreBreaker) MarkSuccess() { func (b *sreBreaker) MarkSuccess() {
@ -88,3 +91,10 @@ func (b *sreBreaker) MarkFailed() {
// drop ratio higher. // drop ratio higher.
b.stat.Add(0) b.stat.Add(0)
} }
func (b *sreBreaker) trueOnProba(proba float64) (truth bool) {
b.randLock.Lock()
truth = b.r.Float64() < proba
b.randLock.Unlock()
return
}

@ -147,6 +147,22 @@ func TestSRESummary(t *testing.T) {
}) })
} }
func TestTrueOnProba(t *testing.T) {
const proba = math.Pi / 10
const total = 100000
const epsilon = 0.05
var count int
b := getSREBreaker()
for i := 0; i < total; i++ {
if b.trueOnProba(proba) {
count++
}
}
ratio := float64(count) / float64(total)
assert.InEpsilon(t, proba, ratio, epsilon)
}
func BenchmarkSreBreakerAllow(b *testing.B) { func BenchmarkSreBreakerAllow(b *testing.B) {
breaker := getSRE() breaker := getSRE()
b.ResetTimer() b.ResetTimer()

@ -3,6 +3,8 @@ package warden
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/bilibili/kratos/pkg/net/rpc/warden/resolver"
"github.com/bilibili/kratos/pkg/net/rpc/warden/resolver/direct"
"net/url" "net/url"
"os" "os"
"strconv" "strconv"
@ -51,6 +53,11 @@ func baseMetadata() metadata.MD {
return gmd return gmd
} }
// Register direct resolver by default to handle direct:// scheme.
func init() {
resolver.Register(direct.New())
}
// ClientConfig is rpc client conf. // ClientConfig is rpc client conf.
type ClientConfig struct { type ClientConfig struct {
Dial xtime.Duration Dial xtime.Duration

Loading…
Cancel
Save