From 7d6f2332599bdcadb65775473615710e0944b398 Mon Sep 17 00:00:00 2001 From: lintanghui Date: Fri, 1 Feb 2019 18:06:04 +0800 Subject: [PATCH] naming discovery sdk and it's dependency --- pkg/log/handler.go | 56 ++ pkg/log/level.go | 29 ++ pkg/log/log.go | 88 ++++ pkg/log/verbose.go | 83 +++ pkg/naming/discovery/discovery.go | 684 +++++++++++++++++++++++++ pkg/naming/naming.go | 15 +- pkg/net/http/blademaster/client.go | 429 ++++++++++++++++ pkg/net/http/blademaster/metadata.go | 106 ++++ pkg/net/http/blademaster/trace.go | 199 +++++++ pkg/net/netutil/backoff.go | 72 +++ pkg/net/netutil/breaker/breaker.go | 176 +++++++ pkg/net/netutil/breaker/sre_breaker.go | 71 +++ 12 files changed, 2007 insertions(+), 1 deletion(-) create mode 100644 pkg/log/handler.go create mode 100644 pkg/log/level.go create mode 100644 pkg/log/log.go create mode 100644 pkg/log/verbose.go create mode 100644 pkg/naming/discovery/discovery.go create mode 100644 pkg/net/http/blademaster/client.go create mode 100644 pkg/net/http/blademaster/metadata.go create mode 100644 pkg/net/http/blademaster/trace.go create mode 100644 pkg/net/netutil/backoff.go create mode 100644 pkg/net/netutil/breaker/breaker.go create mode 100644 pkg/net/netutil/breaker/sre_breaker.go diff --git a/pkg/log/handler.go b/pkg/log/handler.go new file mode 100644 index 000000000..68e762896 --- /dev/null +++ b/pkg/log/handler.go @@ -0,0 +1,56 @@ +package log + +import ( + "context" + + pkgerr "github.com/pkg/errors" +) + +const ( + _log = "log" +) + +// Handler is used to handle log events, outputting them to +// stdio or sending them to remote services. See the "handlers" +// directory for implementations. +// +// It is left up to Handlers to implement thread-safety. +type Handler interface { + // Log handle log + // variadic D is k-v struct represent log content + Log(context.Context, Level, ...D) + + // SetFormat set render format on log output + // see StdoutHandler.SetFormat for detail + SetFormat(string) + + // Close handler + Close() error +} + +// Handlers . +type Handlers []Handler + +// Log handlers logging. +func (hs Handlers) Log(c context.Context, lv Level, d ...D) { + for _, h := range hs { + h.Log(c, lv, d...) + } +} + +// Close close resource. +func (hs Handlers) Close() (err error) { + for _, h := range hs { + if e := h.Close(); e != nil { + err = pkgerr.WithStack(e) + } + } + return +} + +// SetFormat . +func (hs Handlers) SetFormat(format string) { + for _, h := range hs { + h.SetFormat(format) + } +} diff --git a/pkg/log/level.go b/pkg/log/level.go new file mode 100644 index 000000000..fa4c98eaf --- /dev/null +++ b/pkg/log/level.go @@ -0,0 +1,29 @@ +package log + +// Level of severity. +type Level int + +// Verbose is a boolean type that implements Info, Infov (like Printf) etc. +type Verbose bool + +// common log level. +const ( + _debugLevel Level = iota + _infoLevel + _warnLevel + _errorLevel + _fatalLevel +) + +var levelNames = [...]string{ + _debugLevel: "DEBUG", + _infoLevel: "INFO", + _warnLevel: "WARN", + _errorLevel: "ERROR", + _fatalLevel: "FATAL", +} + +// String implementation. +func (l Level) String() string { + return levelNames[l] +} diff --git a/pkg/log/log.go b/pkg/log/log.go new file mode 100644 index 000000000..7f5a37d48 --- /dev/null +++ b/pkg/log/log.go @@ -0,0 +1,88 @@ +package log + +import ( + "context" + "fmt" +) + +// Config log config. +type Config struct { + Family string + Host string + + // stdout + Stdout bool + + // file + Dir string + // buffer size + FileBufferSize int64 + // MaxLogFile + MaxLogFile int + // RotateSize + RotateSize int64 + + // V Enable V-leveled logging at the specified level. + V int32 + // Module="" + // The syntax of the argument is a map of pattern=N, + // where pattern is a literal file name (minus the ".go" suffix) or + // "glob" pattern and N is a V level. For instance: + // [module] + // "service" = 1 + // "dao*" = 2 + // sets the V level to 2 in all Go files whose names begin "dao". + Module map[string]int32 + // Filter tell log handler which field are sensitive message, use * instead. + Filter []string +} + +var ( + h Handler + c *Config +) + +// D represents a map of entry level data used for structured logging. +// type D map[string]interface{} +type D struct { + Key string + Value interface{} +} + +// KV return a log kv for logging field. +func KV(key string, value interface{}) D { + return D{ + Key: key, + Value: value, + } +} + +// Info logs a message at the info log level. +func Info(format string, args ...interface{}) { + h.Log(context.Background(), _infoLevel, KV(_log, fmt.Sprintf(format, args...))) +} + +// Warn logs a message at the warning log level. +func Warn(format string, args ...interface{}) { + h.Log(context.Background(), _warnLevel, KV(_log, fmt.Sprintf(format, args...))) +} + +// Error logs a message at the error log level. +func Error(format string, args ...interface{}) { + h.Log(context.Background(), _errorLevel, KV(_log, fmt.Sprintf(format, args...))) +} + +func logw(args []interface{}) []D { + if len(args)%2 != 0 { + Warn("log: the variadic must be plural, the last one will ignored") + } + ds := make([]D, 0, len(args)/2) + for i := 0; i < len(args)-1; i = i + 2 { + if key, ok := args[i].(string); ok { + ds = append(ds, KV(key, args[i+1])) + } else { + Warn("log: key must be string, get %T, ignored", args[i]) + } + } + return ds +} diff --git a/pkg/log/verbose.go b/pkg/log/verbose.go new file mode 100644 index 000000000..230b92b9a --- /dev/null +++ b/pkg/log/verbose.go @@ -0,0 +1,83 @@ +package log + +import ( + "context" + "fmt" + "path/filepath" + "runtime" + "strings" +) + +// V reports whether verbosity at the call site is at least the requested level. +// The returned value is a boolean of type Verbose, which implements Info, Infov etc. +// These methods will write to the Info log if called. +// Thus, one may write either +// if log.V(2) { log.Info("log this") } +// or +// log.V(2).Info("log this") +// The second form is shorter but the first is cheaper if logging is off because it does +// not evaluate its arguments. +// +// Whether an individual call to V generates a log record depends on the setting of +// the Config.VLevel and Config.Module flags; both are off by default. If the level in the call to +// V is at least the value of Config.VLevel, or of Config.Module for the source file containing the +// call, the V call will log. +// v must be more than 0. +func V(v int32) Verbose { + var ( + file string + ) + if v < 0 { + return Verbose(false) + } else if c.V >= v { + return Verbose(true) + } + if pc, _, _, ok := runtime.Caller(1); ok { + file, _ = runtime.FuncForPC(pc).FileLine(pc) + } + if strings.HasSuffix(file, ".go") { + file = file[:len(file)-3] + } + if slash := strings.LastIndex(file, "/"); slash >= 0 { + file = file[slash+1:] + } + for filter, lvl := range c.Module { + var match bool + if match = filter == file; !match { + match, _ = filepath.Match(filter, file) + } + if match { + return Verbose(lvl >= v) + } + } + return Verbose(false) +} + +// Info logs a message at the info log level. +func (v Verbose) Info(format string, args ...interface{}) { + if v { + h.Log(context.Background(), _infoLevel, KV(_log, fmt.Sprintf(format, args...))) + } +} + +// Infov logs a message at the info log level. +func (v Verbose) Infov(ctx context.Context, args ...D) { + if v { + h.Log(ctx, _infoLevel, args...) + } +} + +// Infow logs a message with some additional context. The variadic key-value pairs are treated as they are in With. +func (v Verbose) Infow(ctx context.Context, args ...interface{}) { + if v { + h.Log(ctx, _infoLevel, logw(args)...) + } +} + +// Close close resource. +func (v Verbose) Close() (err error) { + if h == nil { + return + } + return h.Close() +} diff --git a/pkg/naming/discovery/discovery.go b/pkg/naming/discovery/discovery.go new file mode 100644 index 000000000..3b045aabf --- /dev/null +++ b/pkg/naming/discovery/discovery.go @@ -0,0 +1,684 @@ +package discovery + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/rand" + "net/url" + "os" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/bilibili/Kratos/pkg/conf/env" + "github.com/bilibili/Kratos/pkg/ecode" + "github.com/bilibili/Kratos/pkg/log" + "github.com/bilibili/Kratos/pkg/naming" + bm "github.com/bilibili/Kratos/pkg/net/http/blademaster" + "github.com/bilibili/Kratos/pkg/net/netutil" + "github.com/bilibili/Kratos/pkg/net/netutil/breaker" + "github.com/bilibili/Kratos/pkg/str" + xtime "github.com/bilibili/Kratos/pkg/time" +) + +const ( + _registerURL = "http://%s/discovery/register" + _setURL = "http://%s/discovery/set" + _cancelURL = "http://%s/discovery/cancel" + _renewURL = "http://%s/discovery/renew" + + _pollURL = "http://%s/discovery/polls" + _nodesURL = "http://%s/discovery/nodes" + + _registerGap = 30 * time.Second + + _statusUP = "1" +) + +const ( + _appid = "infra.discovery" +) + +var ( + _ naming.Builder = &Discovery{} + _ naming.Registry = &Discovery{} + + // ErrDuplication duplication treeid. + ErrDuplication = errors.New("discovery: instance duplicate registration") +) + +// Config discovery configures. +type Config struct { + Nodes []string + Key string + Secret string + Region string + Zone string + Env string + Host string +} + +// Discovery is discovery client. +type Discovery struct { + once sync.Once + conf *Config + ctx context.Context + cancelFunc context.CancelFunc + httpClient *bm.Client + + mutex sync.RWMutex + apps map[string]*appInfo + registry map[string]struct{} + lastHost string + cancelPolls context.CancelFunc + idx uint64 + node atomic.Value + delete chan *appInfo +} + +type appInfo struct { + zoneIns atomic.Value + resolver map[*Resolver]struct{} + lastTs int64 // latest timestamp +} + +func fixConfig(c *Config) { + if len(c.Nodes) == 0 { + c.Nodes = []string{"api.bilibili.co"} + } + if env.Region != "" { + c.Region = env.Region + } + if env.Zone != "" { + c.Zone = env.Zone + } + if env.DeployEnv != "" { + c.Env = env.DeployEnv + } + if env.Hostname != "" { + c.Host = env.Hostname + } else { + c.Host, _ = os.Hostname() + } +} + +var ( + once sync.Once + _defaultDiscovery *Discovery +) + +func initDefault() { + once.Do(func() { + _defaultDiscovery = New(nil) + }) +} + +// Builder return default discvoery resolver builder. +func Builder() naming.Builder { + if _defaultDiscovery == nil { + initDefault() + } + return _defaultDiscovery +} + +// Build register resolver into default discovery. +func Build(id string) naming.Resolver { + if _defaultDiscovery == nil { + initDefault() + } + return _defaultDiscovery.Build(id) +} + +// New new a discovery client. +func New(c *Config) (d *Discovery) { + if c == nil { + c = &Config{ + Nodes: []string{"discovery.bilibili.co", "api.bilibili.co"}, + Key: "discovery", + Secret: "discovery", + } + } + fixConfig(c) + ctx, cancel := context.WithCancel(context.Background()) + d = &Discovery{ + ctx: ctx, + cancelFunc: cancel, + conf: c, + apps: map[string]*appInfo{}, + registry: map[string]struct{}{}, + delete: make(chan *appInfo, 10), + } + // httpClient + cfg := &bm.ClientConfig{ + App: &bm.App{ + Key: c.Key, + Secret: c.Secret, + }, + Dial: xtime.Duration(3 * time.Second), + Timeout: xtime.Duration(40 * time.Second), + Breaker: &breaker.Config{ + Window: 100, + Sleep: 3, + Bucket: 10, + Ratio: 0.5, + Request: 100, + }, + } + d.httpClient = bm.NewClient(cfg) + resolver := d.Build(_appid) + event := resolver.Watch() + _, ok := <-event + if !ok { + panic("discovery watch failed") + } + ins, ok := resolver.Fetch(context.Background()) + if ok { + d.newSelf(ins) + } + go d.selfproc(resolver, event) + return +} + +func (d *Discovery) selfproc(resolver naming.Resolver, event <-chan struct{}) { + for { + _, ok := <-event + if !ok { + return + } + instances, ok := resolver.Fetch(context.Background()) + if ok { + d.newSelf(instances) + } + } +} + +func (d *Discovery) newSelf(instances naming.InstancesInfo) { + ins, ok := instances.Instances[d.conf.Zone] + if !ok { + return + } + var nodes []string + for _, in := range ins { + for _, addr := range in.Addrs { + u, err := url.Parse(addr) + if err == nil && u.Scheme == "http" { + nodes = append(nodes, u.Host) + } + } + } + // diff old nodes + olds, ok := d.node.Load().([]string) + if ok { + var diff int + for _, n := range nodes { + for _, o := range olds { + if o == n { + diff++ + break + } + } + } + if len(nodes) == diff { + return + } + } + rand.Shuffle(len(nodes), func(i, j int) { + nodes[i], nodes[j] = nodes[j], nodes[i] + }) + d.node.Store(nodes) +} + +// Build disovery resovler builder. +func (d *Discovery) Build(appid string) naming.Resolver { + r := &Resolver{ + id: appid, + d: d, + event: make(chan struct{}, 1), + } + d.mutex.Lock() + app, ok := d.apps[appid] + if !ok { + app = &appInfo{ + resolver: make(map[*Resolver]struct{}), + } + d.apps[appid] = app + cancel := d.cancelPolls + if cancel != nil { + cancel() + } + } + app.resolver[r] = struct{}{} + d.mutex.Unlock() + if ok { + select { + case r.event <- struct{}{}: + default: + } + } + log.Info("disocvery: AddWatch(%s) already watch(%v)", appid, ok) + d.once.Do(func() { + go d.serverproc() + }) + return r +} + +// Scheme return discovery's scheme +func (d *Discovery) Scheme() string { + return "discovery" +} + +// Resolver discveory resolver. +type Resolver struct { + id string + event chan struct{} + d *Discovery +} + +// Watch watch instance. +func (r *Resolver) Watch() <-chan struct{} { + return r.event +} + +// Fetch fetch resolver instance. +func (r *Resolver) Fetch(c context.Context) (ins naming.InstancesInfo, ok bool) { + r.d.mutex.RLock() + app, ok := r.d.apps[r.id] + r.d.mutex.RUnlock() + if ok { + ins, ok = app.zoneIns.Load().(naming.InstancesInfo) + return + } + return +} + +// Close close resolver. +func (r *Resolver) Close() error { + r.d.mutex.Lock() + if app, ok := r.d.apps[r.id]; ok && len(app.resolver) != 0 { + delete(app.resolver, r) + // TODO: delete app from builder + } + r.d.mutex.Unlock() + return nil +} + +func (d *Discovery) pickNode() string { + nodes, ok := d.node.Load().([]string) + if !ok || len(nodes) == 0 { + return d.conf.Nodes[d.idx%uint64(len(d.conf.Nodes))] + } + return nodes[d.idx%uint64(len(nodes))] +} + +func (d *Discovery) switchNode() { + atomic.AddUint64(&d.idx, 1) +} + +// Reload reload the config +func (d *Discovery) Reload(c *Config) { + fixConfig(c) + d.mutex.Lock() + d.conf = c + d.mutex.Unlock() +} + +// Close stop all running process including discovery and register +func (d *Discovery) Close() error { + d.cancelFunc() + return nil +} + +// Register Register an instance with discovery and renew automatically +func (d *Discovery) Register(c context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { + d.mutex.Lock() + if _, ok := d.registry[ins.AppID]; ok { + err = ErrDuplication + } else { + d.registry[ins.AppID] = struct{}{} + } + d.mutex.Unlock() + if err != nil { + return + } + if err = d.register(c, ins); err != nil { + d.mutex.Lock() + delete(d.registry, ins.AppID) + d.mutex.Unlock() + return + } + ctx, cancel := context.WithCancel(d.ctx) + ch := make(chan struct{}, 1) + cancelFunc = context.CancelFunc(func() { + cancel() + <-ch + }) + go func() { + ticker := time.NewTicker(_registerGap) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if err := d.renew(ctx, ins); err != nil && ecode.NothingFound.Equal(err) { + d.register(ctx, ins) + } + case <-ctx.Done(): + d.cancel(ins) + ch <- struct{}{} + return + } + } + }() + return +} + +// Set set ins status and metadata. +func (d *Discovery) Set(ins *naming.Instance) error { + return d.set(context.Background(), ins) +} + +// cancel Remove the registered instance from discovery +func (d *Discovery) cancel(ins *naming.Instance) (err error) { + d.mutex.RLock() + conf := d.conf + d.mutex.RUnlock() + + res := new(struct { + Code int `json:"code"` + Message string `json:"message"` + }) + uri := fmt.Sprintf(_cancelURL, d.pickNode()) + params := d.newParams(conf) + params.Set("appid", ins.AppID) + // request + if err = d.httpClient.Post(context.Background(), uri, "", params, &res); err != nil { + d.switchNode() + log.Error("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)", + uri, conf.Env, ins.AppID, conf.Host, err) + return + } + if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { + log.Warn("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)", + uri, conf.Env, ins.AppID, conf.Host, res.Code) + err = ec + return + } + log.Info("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) success", + uri, conf.Env, ins.AppID, conf.Host) + return +} + +// register Register an instance with discovery +func (d *Discovery) register(ctx context.Context, ins *naming.Instance) (err error) { + d.mutex.RLock() + conf := d.conf + d.mutex.RUnlock() + + var metadata []byte + if ins.Metadata != nil { + if metadata, err = json.Marshal(ins.Metadata); err != nil { + log.Error("discovery:register instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err) + } + } + res := new(struct { + Code int `json:"code"` + Message string `json:"message"` + }) + uri := fmt.Sprintf(_registerURL, d.pickNode()) + params := d.newParams(conf) + params.Set("appid", ins.AppID) + params.Set("addrs", strings.Join(ins.Addrs, ",")) + params.Set("version", ins.Version) + params.Set("status", _statusUP) + params.Set("metadata", string(metadata)) + if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil { + d.switchNode() + log.Error("discovery: register client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)", + uri, conf.Zone, conf.Env, ins.AppID, ins.Addrs, err) + return + } + if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { + log.Warn("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)", + uri, conf.Env, ins.AppID, ins.Addrs, res.Code) + err = ec + return + } + log.Info("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%s) success", + uri, conf.Env, ins.AppID, ins.Addrs) + return +} + +// rset set instance info with discovery +func (d *Discovery) set(ctx context.Context, ins *naming.Instance) (err error) { + d.mutex.RLock() + conf := d.conf + d.mutex.RUnlock() + res := new(struct { + Code int `json:"code"` + Message string `json:"message"` + }) + uri := fmt.Sprintf(_setURL, d.pickNode()) + params := d.newParams(conf) + params.Set("appid", ins.AppID) + params.Set("version", ins.Version) + params.Set("status", strconv.FormatInt(ins.Status, 10)) + if ins.Metadata != nil { + var metadata []byte + if metadata, err = json.Marshal(ins.Metadata); err != nil { + log.Error("discovery:set instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err) + } + params.Set("metadata", string(metadata)) + } + if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil { + d.switchNode() + log.Error("discovery: set client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)", + uri, conf.Zone, conf.Env, ins.AppID, ins.Addrs, err) + return + } + if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { + log.Warn("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)", + uri, conf.Env, ins.AppID, ins.Addrs, res.Code) + err = ec + return + } + log.Info("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%s) success", + uri+"?"+params.Encode(), conf.Env, ins.AppID, ins.Addrs) + return +} + +// renew Renew an instance with discovery +func (d *Discovery) renew(ctx context.Context, ins *naming.Instance) (err error) { + d.mutex.RLock() + conf := d.conf + d.mutex.RUnlock() + + res := new(struct { + Code int `json:"code"` + Message string `json:"message"` + }) + uri := fmt.Sprintf(_renewURL, d.pickNode()) + params := d.newParams(conf) + params.Set("appid", ins.AppID) + if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil { + d.switchNode() + log.Error("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)", + uri, conf.Env, ins.AppID, conf.Host, err) + return + } + if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { + err = ec + if ec.Equal(ecode.NothingFound) { + return + } + log.Error("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)", + uri, conf.Env, ins.AppID, conf.Host, res.Code) + return + } + return +} + +func (d *Discovery) serverproc() { + var ( + retry int + ctx context.Context + cancel context.CancelFunc + ) + bc := netutil.DefaultBackoffConfig + ticker := time.NewTicker(time.Minute * 30) + defer ticker.Stop() + for { + if ctx == nil { + ctx, cancel = context.WithCancel(d.ctx) + d.mutex.Lock() + d.cancelPolls = cancel + d.mutex.Unlock() + } + select { + case <-d.ctx.Done(): + return + default: + } + + apps, err := d.polls(ctx, d.pickNode()) + if err != nil { + d.switchNode() + if ctx.Err() == context.Canceled { + ctx = nil + continue + } + time.Sleep(bc.Backoff(retry)) + retry++ + continue + } + retry = 0 + d.broadcast(apps) + } +} + +func (d *Discovery) nodes() (nodes []string) { + res := new(struct { + Code int `json:"code"` + Data []struct { + Addr string `json:"addr"` + } `json:"data"` + }) + uri := fmt.Sprintf(_nodesURL, d.pickNode()) + if err := d.httpClient.Get(d.ctx, uri, "", nil, res); err != nil { + d.switchNode() + log.Error("discovery: consumer client.Get(%v)error(%+v)", uri, err) + return + } + if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { + log.Error("discovery: consumer client.Get(%v) error(%v)", uri, res.Code) + return + } + if len(res.Data) == 0 { + log.Warn("discovery: get nodes(%s) failed,no nodes found!", uri) + return + } + nodes = make([]string, 0, len(res.Data)) + for i := range res.Data { + nodes = append(nodes, res.Data[i].Addr) + } + return +} + +func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]naming.InstancesInfo, err error) { + var ( + lastTs []int64 + appid []string + changed bool + ) + if host != d.lastHost { + d.lastHost = host + changed = true + } + d.mutex.RLock() + conf := d.conf + for k, v := range d.apps { + if changed { + v.lastTs = 0 + } + appid = append(appid, k) + lastTs = append(lastTs, v.lastTs) + } + d.mutex.RUnlock() + if len(appid) == 0 { + return + } + uri := fmt.Sprintf(_pollURL, host) + res := new(struct { + Code int `json:"code"` + Data map[string]naming.InstancesInfo `json:"data"` + }) + params := url.Values{} + params.Set("env", conf.Env) + params.Set("hostname", conf.Host) + params.Set("appid", strings.Join(appid, ",")) + params.Set("latest_timestamp", xstr.JoinInts(lastTs)) + if err = d.httpClient.Get(ctx, uri, "", params, res); err != nil { + log.Error("discovery: client.Get(%s) error(%+v)", uri+"?"+params.Encode(), err) + return + } + if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { + if !ec.Equal(ecode.NotModified) { + log.Error("discovery: client.Get(%s) get error code(%d)", uri+"?"+params.Encode(), res.Code) + err = ec + } + return + } + info, _ := json.Marshal(res.Data) + for _, app := range res.Data { + if app.LastTs == 0 { + err = ecode.ServerErr + log.Error("discovery: client.Get(%s) latest_timestamp is 0,instances:(%s)", uri+"?"+params.Encode(), info) + return + } + } + log.Info("discovery: polls uri(%s)", uri+"?"+params.Encode()) + log.Info("discovery: successfully polls(%s) instances (%s)", uri+"?"+params.Encode(), info) + apps = res.Data + return +} + +func (d *Discovery) broadcast(apps map[string]naming.InstancesInfo) { + for id, v := range apps { + var count int + for zone, ins := range v.Instances { + if len(ins) == 0 { + delete(v.Instances, zone) + } + count += len(ins) + } + if count == 0 { + continue + } + d.mutex.RLock() + app, ok := d.apps[id] + d.mutex.RUnlock() + if ok { + app.lastTs = v.LastTs + app.zoneIns.Store(v) + d.mutex.RLock() + for rs := range app.resolver { + select { + case rs.event <- struct{}{}: + default: + } + } + d.mutex.RUnlock() + } + } +} + +func (d *Discovery) newParams(conf *Config) url.Values { + params := url.Values{} + params.Set("region", conf.Region) + params.Set("zone", conf.Zone) + params.Set("env", conf.Env) + params.Set("hostname", conf.Host) + return params +} diff --git a/pkg/naming/naming.go b/pkg/naming/naming.go index b2cb6b803..8ec9cda4f 100644 --- a/pkg/naming/naming.go +++ b/pkg/naming/naming.go @@ -37,9 +37,22 @@ type Instance struct { Status int64 } +// InstancesInfo instance info. +type InstancesInfo struct { + Instances map[string][]*Instance `json:"zone_instances"` + LastTs int64 `json:"latest_timestamp"` + Scheduler []*Scheduler `json:"scheduler"` +} + +// Scheduler scheduler info in multi cluster. +type Scheduler struct { + Src string `json:"src"` + Dst map[string]int64 `json:"dst"` +} + // Resolver resolve naming service type Resolver interface { - Fetch(context.Context) (map[string][]*Instance, bool) + Fetch(context.Context) (*InstancesInfo, bool) //Unwatch(id string) Watch() <-chan struct{} Close() error diff --git a/pkg/net/http/blademaster/client.go b/pkg/net/http/blademaster/client.go new file mode 100644 index 000000000..b1cf2812b --- /dev/null +++ b/pkg/net/http/blademaster/client.go @@ -0,0 +1,429 @@ +package blademaster + +import ( + "bytes" + "context" + "crypto/md5" + "crypto/tls" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net" + xhttp "net/http" + "net/url" + "os" + "runtime" + "strconv" + "strings" + "sync" + "time" + + "github.com/bilibili/Kratos/pkg/conf/env" + "github.com/bilibili/Kratos/pkg/log" + "github.com/bilibili/Kratos/pkg/net/metadata" + "github.com/bilibili/Kratos/pkg/net/netutil/breaker" + "github.com/bilibili/Kratos/pkg/stat" + xtime "github.com/bilibili/Kratos/pkg/time" + + "github.com/gogo/protobuf/proto" + pkgerr "github.com/pkg/errors" +) + +const ( + _minRead = 16 * 1024 // 16kb + + _appKey = "appkey" + _appSecret = "appsecret" + _ts = "ts" +) + +var ( + _noKickUserAgent = "haoguanwei@bilibili.com " + clientStats = stat.HTTPClient +) + +func init() { + n, err := os.Hostname() + if err == nil { + _noKickUserAgent = _noKickUserAgent + runtime.Version() + " " + n + } +} + +// App bilibili intranet authorization. +type App struct { + Key string + Secret string +} + +// ClientConfig is http client conf. +type ClientConfig struct { + *App + Dial xtime.Duration + Timeout xtime.Duration + KeepAlive xtime.Duration + Breaker *breaker.Config + URL map[string]*ClientConfig + Host map[string]*ClientConfig +} + +// Client is http client. +type Client struct { + conf *ClientConfig + client *xhttp.Client + dialer *net.Dialer + transport xhttp.RoundTripper + + urlConf map[string]*ClientConfig + hostConf map[string]*ClientConfig + mutex sync.RWMutex + breaker *breaker.Group +} + +// NewClient new a http client. +func NewClient(c *ClientConfig) *Client { + client := new(Client) + client.conf = c + client.dialer = &net.Dialer{ + Timeout: time.Duration(c.Dial), + KeepAlive: time.Duration(c.KeepAlive), + } + + originTransport := &xhttp.Transport{ + DialContext: client.dialer.DialContext, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + + // wraps RoundTripper for tracer + client.transport = &TraceTransport{RoundTripper: originTransport} + client.client = &xhttp.Client{ + Transport: client.transport, + } + client.urlConf = make(map[string]*ClientConfig) + client.hostConf = make(map[string]*ClientConfig) + client.breaker = breaker.NewGroup(c.Breaker) + // check appkey + if c.Key == "" || c.Secret == "" { + panic("http client must config appkey and appsecret") + } + if c.Timeout <= 0 { + panic("must config http timeout!!!") + } + for uri, cfg := range c.URL { + client.urlConf[uri] = cfg + } + for host, cfg := range c.Host { + client.hostConf[host] = cfg + } + return client +} + +// SetTransport set client transport +func (client *Client) SetTransport(t xhttp.RoundTripper) { + client.transport = t + client.client.Transport = t +} + +// SetConfig set client config. +func (client *Client) SetConfig(c *ClientConfig) { + client.mutex.Lock() + if c.App != nil { + client.conf.App.Key = c.App.Key + client.conf.App.Secret = c.App.Secret + } + if c.Timeout > 0 { + client.conf.Timeout = c.Timeout + } + if c.KeepAlive > 0 { + client.dialer.KeepAlive = time.Duration(c.KeepAlive) + client.conf.KeepAlive = c.KeepAlive + } + if c.Dial > 0 { + client.dialer.Timeout = time.Duration(c.Dial) + client.conf.Timeout = c.Dial + } + if c.Breaker != nil { + client.conf.Breaker = c.Breaker + client.breaker.Reload(c.Breaker) + } + for uri, cfg := range c.URL { + client.urlConf[uri] = cfg + } + for host, cfg := range c.Host { + client.hostConf[host] = cfg + } + client.mutex.Unlock() +} + +// NewRequest new http request with method, uri, ip, values and headers. +// TODO(zhoujiahui): param realIP should be removed later. +func (client *Client) NewRequest(method, uri, realIP string, params url.Values) (req *xhttp.Request, err error) { + enc, err := client.sign(params) + if err != nil { + err = pkgerr.Wrapf(err, "uri:%s,params:%v", uri, params) + return + } + ru := uri + if enc != "" { + ru = uri + "?" + enc + } + if method == xhttp.MethodGet { + req, err = xhttp.NewRequest(xhttp.MethodGet, ru, nil) + } else { + req, err = xhttp.NewRequest(xhttp.MethodPost, uri, strings.NewReader(enc)) + } + if err != nil { + err = pkgerr.Wrapf(err, "method:%s,uri:%s", method, ru) + return + } + const ( + _contentType = "Content-Type" + _urlencoded = "application/x-www-form-urlencoded" + _userAgent = "User-Agent" + ) + if method == xhttp.MethodPost { + req.Header.Set(_contentType, _urlencoded) + } + if realIP != "" { + req.Header.Set(_httpHeaderRemoteIP, realIP) + } + req.Header.Set(_userAgent, _noKickUserAgent+" "+env.AppID) + return +} + +// Get issues a GET to the specified URL. +func (client *Client) Get(c context.Context, uri, ip string, params url.Values, res interface{}) (err error) { + req, err := client.NewRequest(xhttp.MethodGet, uri, ip, params) + if err != nil { + return + } + return client.Do(c, req, res) +} + +// Post issues a Post to the specified URL. +func (client *Client) Post(c context.Context, uri, ip string, params url.Values, res interface{}) (err error) { + req, err := client.NewRequest(xhttp.MethodPost, uri, ip, params) + if err != nil { + return + } + return client.Do(c, req, res) +} + +// RESTfulGet issues a RESTful GET to the specified URL. +func (client *Client) RESTfulGet(c context.Context, uri, ip string, params url.Values, res interface{}, v ...interface{}) (err error) { + req, err := client.NewRequest(xhttp.MethodGet, fmt.Sprintf(uri, v...), ip, params) + if err != nil { + return + } + return client.Do(c, req, res, uri) +} + +// RESTfulPost issues a RESTful Post to the specified URL. +func (client *Client) RESTfulPost(c context.Context, uri, ip string, params url.Values, res interface{}, v ...interface{}) (err error) { + req, err := client.NewRequest(xhttp.MethodPost, fmt.Sprintf(uri, v...), ip, params) + if err != nil { + return + } + return client.Do(c, req, res, uri) +} + +// Raw sends an HTTP request and returns bytes response +func (client *Client) Raw(c context.Context, req *xhttp.Request, v ...string) (bs []byte, err error) { + var ( + ok bool + code string + cancel func() + resp *xhttp.Response + config *ClientConfig + timeout time.Duration + uri = fmt.Sprintf("%s://%s%s", req.URL.Scheme, req.Host, req.URL.Path) + ) + // NOTE fix prom & config uri key. + if len(v) == 1 { + uri = v[0] + } + // breaker + brk := client.breaker.Get(uri) + if err = brk.Allow(); err != nil { + code = "breaker" + clientStats.Incr(uri, code) + return + } + defer client.onBreaker(brk, &err) + // stat + now := time.Now() + defer func() { + clientStats.Timing(uri, int64(time.Since(now)/time.Millisecond)) + if code != "" { + clientStats.Incr(uri, code) + } + }() + // get config + // 1.url config 2.host config 3.default + client.mutex.RLock() + if config, ok = client.urlConf[uri]; !ok { + if config, ok = client.hostConf[req.Host]; !ok { + config = client.conf + } + } + client.mutex.RUnlock() + // timeout + deliver := true + timeout = time.Duration(config.Timeout) + if deadline, ok := c.Deadline(); ok { + if ctimeout := time.Until(deadline); ctimeout < timeout { + // deliver small timeout + timeout = ctimeout + deliver = false + } + } + if deliver { + c, cancel = context.WithTimeout(c, timeout) + defer cancel() + } + setTimeout(req, timeout) + req = req.WithContext(c) + setCaller(req) + if color := metadata.String(c, metadata.Color); color != "" { + setColor(req, color) + } + if resp, err = client.client.Do(req); err != nil { + err = pkgerr.Wrapf(err, "host:%s, url:%s", req.URL.Host, realURL(req)) + code = "failed" + return + } + defer resp.Body.Close() + if resp.StatusCode >= xhttp.StatusBadRequest { + err = pkgerr.Errorf("incorrect http status:%d host:%s, url:%s", resp.StatusCode, req.URL.Host, realURL(req)) + code = strconv.Itoa(resp.StatusCode) + return + } + if bs, err = readAll(resp.Body, _minRead); err != nil { + err = pkgerr.Wrapf(err, "host:%s, url:%s", req.URL.Host, realURL(req)) + return + } + return +} + +// Do sends an HTTP request and returns an HTTP json response. +func (client *Client) Do(c context.Context, req *xhttp.Request, res interface{}, v ...string) (err error) { + var bs []byte + if bs, err = client.Raw(c, req, v...); err != nil { + return + } + if res != nil { + if err = json.Unmarshal(bs, res); err != nil { + err = pkgerr.Wrapf(err, "host:%s, url:%s", req.URL.Host, realURL(req)) + } + } + return +} + +// JSON sends an HTTP request and returns an HTTP json response. +func (client *Client) JSON(c context.Context, req *xhttp.Request, res interface{}, v ...string) (err error) { + var bs []byte + if bs, err = client.Raw(c, req, v...); err != nil { + return + } + if res != nil { + if err = json.Unmarshal(bs, res); err != nil { + err = pkgerr.Wrapf(err, "host:%s, url:%s", req.URL.Host, realURL(req)) + } + } + return +} + +// PB sends an HTTP request and returns an HTTP proto response. +func (client *Client) PB(c context.Context, req *xhttp.Request, res proto.Message, v ...string) (err error) { + var bs []byte + if bs, err = client.Raw(c, req, v...); err != nil { + return + } + if res != nil { + if err = proto.Unmarshal(bs, res); err != nil { + err = pkgerr.Wrapf(err, "host:%s, url:%s", req.URL.Host, realURL(req)) + } + } + return +} + +func (client *Client) onBreaker(breaker breaker.Breaker, err *error) { + if err != nil && *err != nil { + breaker.MarkFailed() + } else { + breaker.MarkSuccess() + } +} + +// sign calc appkey and appsecret sign. +func (client *Client) sign(params url.Values) (query string, err error) { + client.mutex.RLock() + key := client.conf.Key + secret := client.conf.Secret + client.mutex.RUnlock() + if params == nil { + params = url.Values{} + } + params.Set(_appKey, key) + if params.Get(_appSecret) != "" { + log.Warn("utils http get must not have parameter appSecret") + } + if params.Get(_ts) == "" { + params.Set(_ts, strconv.FormatInt(time.Now().Unix(), 10)) + } + tmp := params.Encode() + if strings.IndexByte(tmp, '+') > -1 { + tmp = strings.Replace(tmp, "+", "%20", -1) + } + var b bytes.Buffer + b.WriteString(tmp) + b.WriteString(secret) + mh := md5.Sum(b.Bytes()) + // query + var qb bytes.Buffer + qb.WriteString(tmp) + qb.WriteString("&sign=") + qb.WriteString(hex.EncodeToString(mh[:])) + query = qb.String() + return +} + +// realUrl return url with http://host/params. +func realURL(req *xhttp.Request) string { + if req.Method == xhttp.MethodGet { + return req.URL.String() + } else if req.Method == xhttp.MethodPost { + ru := req.URL.Path + if req.Body != nil { + rd, ok := req.Body.(io.Reader) + if ok { + buf := bytes.NewBuffer([]byte{}) + buf.ReadFrom(rd) + ru = ru + "?" + buf.String() + } + } + return ru + } + return req.URL.Path +} + +// readAll reads from r until an error or EOF and returns the data it read +// from the internal buffer allocated with a specified capacity. +func readAll(r io.Reader, capacity int64) (b []byte, err error) { + buf := bytes.NewBuffer(make([]byte, 0, capacity)) + // If the buffer overflows, we will get bytes.ErrTooLarge. + // Return that as an error. Any other panic remains. + defer func() { + e := recover() + if e == nil { + return + } + if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge { + err = panicErr + } else { + panic(e) + } + }() + _, err = buf.ReadFrom(r) + return buf.Bytes(), err +} diff --git a/pkg/net/http/blademaster/metadata.go b/pkg/net/http/blademaster/metadata.go new file mode 100644 index 000000000..a868f4305 --- /dev/null +++ b/pkg/net/http/blademaster/metadata.go @@ -0,0 +1,106 @@ +package blademaster + +import ( + "net/http" + "strconv" + "strings" + "time" + + "github.com/bilibili/Kratos/pkg/conf/env" + "github.com/bilibili/Kratos/pkg/log" + + "github.com/pkg/errors" +) + +const ( + // http head + _httpHeaderUser = "x1-bmspy-user" + _httpHeaderColor = "x1-bmspy-color" + _httpHeaderTimeout = "x1-bmspy-timeout" + _httpHeaderRemoteIP = "x-backend-bm-real-ip" + _httpHeaderRemoteIPPort = "x-backend-bm-real-ipport" +) + +// mirror return true if x1-bilispy-mirror in http header and its value is 1 or true. +func mirror(req *http.Request) bool { + mirrorStr := req.Header.Get("x1-bilispy-mirror") + if mirrorStr == "" { + return false + } + val, err := strconv.ParseBool(mirrorStr) + if err != nil { + log.Warn("blademaster: failed to parse mirror: %+v", errors.Wrap(err, mirrorStr)) + return false + } + if !val { + log.Warn("blademaster: request mirrorStr value :%s is false", mirrorStr) + } + return val +} + +// setCaller set caller into http request. +func setCaller(req *http.Request) { + req.Header.Set(_httpHeaderUser, env.AppID) +} + +// caller get caller from http request. +func caller(req *http.Request) string { + return req.Header.Get(_httpHeaderUser) +} + +// setColor set color into http request. +func setColor(req *http.Request, color string) { + req.Header.Set(_httpHeaderColor, color) +} + +// color get color from http request. +func color(req *http.Request) string { + c := req.Header.Get(_httpHeaderColor) + if c == "" { + c = env.Color + } + return c +} + +// setTimeout set timeout into http request. +func setTimeout(req *http.Request, timeout time.Duration) { + td := int64(timeout / time.Millisecond) + req.Header.Set(_httpHeaderTimeout, strconv.FormatInt(td, 10)) +} + +// timeout get timeout from http request. +func timeout(req *http.Request) time.Duration { + to := req.Header.Get(_httpHeaderTimeout) + timeout, err := strconv.ParseInt(to, 10, 64) + if err == nil && timeout > 20 { + timeout -= 20 // reduce 20ms every time. + } + return time.Duration(timeout) * time.Millisecond +} + +// remoteIP implements a best effort algorithm to return the real client IP, it parses +// X-BACKEND-BILI-REAL-IP or X-Real-IP or X-Forwarded-For in order to work properly with reverse-proxies such us: nginx or haproxy. +// Use X-Forwarded-For before X-Real-Ip as nginx uses X-Real-Ip with the proxy's IP. +func remoteIP(req *http.Request) (remote string) { + if remote = req.Header.Get(_httpHeaderRemoteIP); remote != "" && remote != "null" { + return + } + var xff = req.Header.Get("X-Forwarded-For") + if idx := strings.IndexByte(xff, ','); idx > -1 { + if remote = strings.TrimSpace(xff[:idx]); remote != "" { + return + } + } + if remote = req.Header.Get("X-Real-IP"); remote != "" { + return + } + remote = req.RemoteAddr[:strings.Index(req.RemoteAddr, ":")] + return +} + +func remotePort(req *http.Request) (port string) { + if port = req.Header.Get(_httpHeaderRemoteIPPort); port != "" && port != "null" { + return + } + return +} diff --git a/pkg/net/http/blademaster/trace.go b/pkg/net/http/blademaster/trace.go new file mode 100644 index 000000000..b792246bb --- /dev/null +++ b/pkg/net/http/blademaster/trace.go @@ -0,0 +1,199 @@ +package blademaster + +import ( + "io" + "net/http" + "net/http/httptrace" + + "github.com/bilibili/Kratos/pkg/net/trace" +) + +const _defaultComponentName = "net/http" + +type closeTracker struct { + io.ReadCloser + tr trace.Trace +} + +func (c *closeTracker) Close() error { + err := c.ReadCloser.Close() + c.tr.SetLog(trace.Log(trace.LogEvent, "ClosedBody")) + c.tr.Finish(&err) + return err +} + +// NewTraceTracesport NewTraceTracesport +func NewTraceTracesport(rt http.RoundTripper, peerService string, internalTags ...trace.Tag) *TraceTransport { + return &TraceTransport{RoundTripper: rt, peerService: peerService, internalTags: internalTags} +} + +// TraceTransport wraps a RoundTripper. If a request is being traced with +// Tracer, Transport will inject the current span into the headers, +// and set HTTP related tags on the span. +type TraceTransport struct { + peerService string + internalTags []trace.Tag + // The actual RoundTripper to use for the request. A nil + // RoundTripper defaults to http.DefaultTransport. + http.RoundTripper +} + +// RoundTrip implements the RoundTripper interface +func (t *TraceTransport) RoundTrip(req *http.Request) (*http.Response, error) { + rt := t.RoundTripper + if rt == nil { + rt = http.DefaultTransport + } + tr, ok := trace.FromContext(req.Context()) + if !ok { + return rt.RoundTrip(req) + } + operationName := "HTTP:" + req.Method + // fork new trace + tr = tr.Fork("", operationName) + + tr.SetTag(trace.TagString(trace.TagComponent, _defaultComponentName)) + tr.SetTag(trace.TagString(trace.TagHTTPMethod, req.Method)) + tr.SetTag(trace.TagString(trace.TagHTTPURL, req.URL.String())) + tr.SetTag(trace.TagString(trace.TagSpanKind, "client")) + if t.peerService != "" { + tr.SetTag(trace.TagString(trace.TagPeerService, t.peerService)) + } + tr.SetTag(t.internalTags...) + + // inject trace to http header + trace.Inject(tr, trace.HTTPFormat, req.Header) + + // FIXME: uncomment after trace sdk is goroutinue safe + // ct := clientTracer{tr: tr} + // req = req.WithContext(httptrace.WithClientTrace(req.Context(), ct.clientTrace())) + resp, err := rt.RoundTrip(req) + + if err != nil { + tr.SetTag(trace.TagBool(trace.TagError, true)) + tr.Finish(&err) + return resp, err + } + + // TODO: get ecode + tr.SetTag(trace.TagInt64(trace.TagHTTPStatusCode, int64(resp.StatusCode))) + + if req.Method == "HEAD" { + tr.Finish(nil) + } else { + resp.Body = &closeTracker{resp.Body, tr} + } + return resp, err +} + +type clientTracer struct { + tr trace.Trace +} + +func (h *clientTracer) clientTrace() *httptrace.ClientTrace { + return &httptrace.ClientTrace{ + GetConn: h.getConn, + GotConn: h.gotConn, + PutIdleConn: h.putIdleConn, + GotFirstResponseByte: h.gotFirstResponseByte, + Got100Continue: h.got100Continue, + DNSStart: h.dnsStart, + DNSDone: h.dnsDone, + ConnectStart: h.connectStart, + ConnectDone: h.connectDone, + WroteHeaders: h.wroteHeaders, + Wait100Continue: h.wait100Continue, + WroteRequest: h.wroteRequest, + } +} + +func (h *clientTracer) getConn(hostPort string) { + // ext.HTTPUrl.Set(h.sp, hostPort) + h.tr.SetLog(trace.Log(trace.LogEvent, "GetConn")) +} + +func (h *clientTracer) gotConn(info httptrace.GotConnInfo) { + h.tr.SetTag(trace.TagBool("net/http.reused", info.Reused)) + h.tr.SetTag(trace.TagBool("net/http.was_idle", info.WasIdle)) + h.tr.SetLog(trace.Log(trace.LogEvent, "GotConn")) +} + +func (h *clientTracer) putIdleConn(error) { + h.tr.SetLog(trace.Log(trace.LogEvent, "PutIdleConn")) +} + +func (h *clientTracer) gotFirstResponseByte() { + h.tr.SetLog(trace.Log(trace.LogEvent, "GotFirstResponseByte")) +} + +func (h *clientTracer) got100Continue() { + h.tr.SetLog(trace.Log(trace.LogEvent, "Got100Continue")) +} + +func (h *clientTracer) dnsStart(info httptrace.DNSStartInfo) { + h.tr.SetLog( + trace.Log(trace.LogEvent, "DNSStart"), + trace.Log("host", info.Host), + ) +} + +func (h *clientTracer) dnsDone(info httptrace.DNSDoneInfo) { + fields := []trace.LogField{trace.Log(trace.LogEvent, "DNSDone")} + for _, addr := range info.Addrs { + fields = append(fields, trace.Log("addr", addr.String())) + } + if info.Err != nil { + // TODO: support log error object + fields = append(fields, trace.Log(trace.LogErrorObject, info.Err.Error())) + } + h.tr.SetLog(fields...) +} + +func (h *clientTracer) connectStart(network, addr string) { + h.tr.SetLog( + trace.Log(trace.LogEvent, "ConnectStart"), + trace.Log("network", network), + trace.Log("addr", addr), + ) +} + +func (h *clientTracer) connectDone(network, addr string, err error) { + if err != nil { + h.tr.SetLog( + trace.Log("message", "ConnectDone"), + trace.Log("network", network), + trace.Log("addr", addr), + trace.Log(trace.LogEvent, "error"), + // TODO: support log error object + trace.Log(trace.LogErrorObject, err.Error()), + ) + } else { + h.tr.SetLog( + trace.Log(trace.LogEvent, "ConnectDone"), + trace.Log("network", network), + trace.Log("addr", addr), + ) + } +} + +func (h *clientTracer) wroteHeaders() { + h.tr.SetLog(trace.Log("event", "WroteHeaders")) +} + +func (h *clientTracer) wait100Continue() { + h.tr.SetLog(trace.Log("event", "Wait100Continue")) +} + +func (h *clientTracer) wroteRequest(info httptrace.WroteRequestInfo) { + if info.Err != nil { + h.tr.SetLog( + trace.Log("message", "WroteRequest"), + trace.Log("event", "error"), + // TODO: support log error object + trace.Log(trace.LogErrorObject, info.Err.Error()), + ) + h.tr.SetTag(trace.TagBool(trace.TagError, true)) + } else { + h.tr.SetLog(trace.Log("event", "WroteRequest")) + } +} diff --git a/pkg/net/netutil/backoff.go b/pkg/net/netutil/backoff.go new file mode 100644 index 000000000..d96f511f3 --- /dev/null +++ b/pkg/net/netutil/backoff.go @@ -0,0 +1,72 @@ +package netutil + +import ( + "math/rand" + "time" +) + +// DefaultBackoffConfig uses values specified for backoff in common. +var DefaultBackoffConfig = BackoffConfig{ + MaxDelay: 120 * time.Second, + BaseDelay: 1.0 * time.Second, + Factor: 1.6, + Jitter: 0.2, +} + +// Backoff defines the methodology for backing off after a call failure. +type Backoff interface { + // Backoff returns the amount of time to wait before the next retry given + // the number of consecutive failures. + Backoff(retries int) time.Duration +} + +// BackoffConfig defines the parameters for the default backoff strategy. +type BackoffConfig struct { + // MaxDelay is the upper bound of backoff delay. + MaxDelay time.Duration + + // baseDelay is the amount of time to wait before retrying after the first + // failure. + BaseDelay time.Duration + + // factor is applied to the backoff after each retry. + Factor float64 + + // jitter provides a range to randomize backoff delays. + Jitter float64 +} + +/* +// NOTE TODO avoid use unexcept config. +func (bc *BackoffConfig) Fix() { + md := bc.MaxDelay + *bc = DefaultBackoffConfig + + if md > 0 { + bc.MaxDelay = md + } +} +*/ + +// Backoff returns the amount of time to wait before the next retry given +// the number of consecutive failures. +func (bc *BackoffConfig) Backoff(retries int) time.Duration { + if retries == 0 { + return bc.BaseDelay + } + backoff, max := float64(bc.BaseDelay), float64(bc.MaxDelay) + for backoff < max && retries > 0 { + backoff *= bc.Factor + retries-- + } + if backoff > max { + backoff = max + } + // Randomize backoff delays so that if a cluster of requests start at + // the same time, they won't operate in lockstep. + backoff *= 1 + bc.Jitter*(rand.Float64()*2-1) + if backoff < 0 { + return 0 + } + return time.Duration(backoff) +} diff --git a/pkg/net/netutil/breaker/breaker.go b/pkg/net/netutil/breaker/breaker.go new file mode 100644 index 000000000..43adbd560 --- /dev/null +++ b/pkg/net/netutil/breaker/breaker.go @@ -0,0 +1,176 @@ +package breaker + +import ( + "sync" + "time" + + xtime "go-common/library/time" +) + +// Config broker config. +type Config struct { + SwitchOff bool // breaker switch,default off. + + // Hystrix + Ratio float32 + Sleep xtime.Duration + + // Google + K float64 + + Window xtime.Duration + Bucket int + Request int64 +} + +func (conf *Config) fix() { + if conf.K == 0 { + conf.K = 1.5 + } + if conf.Request == 0 { + conf.Request = 100 + } + if conf.Ratio == 0 { + conf.Ratio = 0.5 + } + if conf.Sleep == 0 { + conf.Sleep = xtime.Duration(500 * time.Millisecond) + } + if conf.Bucket == 0 { + conf.Bucket = 10 + } + if conf.Window == 0 { + conf.Window = xtime.Duration(3 * time.Second) + } +} + +// Breaker is a CircuitBreaker pattern. +// FIXME on int32 atomic.LoadInt32(&b.on) == _switchOn +type Breaker interface { + Allow() error + MarkSuccess() + MarkFailed() +} + +// Group represents a class of CircuitBreaker and forms a namespace in which +// units of CircuitBreaker. +type Group struct { + mu sync.RWMutex + brks map[string]Breaker + conf *Config +} + +const ( + // StateOpen when circuit breaker open, request not allowed, after sleep + // some duration, allow one single request for testing the health, if ok + // then state reset to closed, if not continue the step. + StateOpen int32 = iota + // StateClosed when circuit breaker closed, request allowed, the breaker + // calc the succeed ratio, if request num greater request setting and + // ratio lower than the setting ratio, then reset state to open. + StateClosed + // StateHalfopen when circuit breaker open, after slepp some duration, allow + // one request, but not state closed. + StateHalfopen + + //_switchOn int32 = iota + // _switchOff +) + +var ( + _mu sync.RWMutex + _conf = &Config{ + Window: xtime.Duration(3 * time.Second), + Bucket: 10, + Request: 100, + + Sleep: xtime.Duration(500 * time.Millisecond), + Ratio: 0.5, + // Percentage of failures must be lower than 33.33% + K: 1.5, + + // Pattern: "", + } + _group = NewGroup(_conf) +) + +// Init init global breaker config, also can reload config after first time call. +func Init(conf *Config) { + if conf == nil { + return + } + _mu.Lock() + _conf = conf + _mu.Unlock() +} + +// Go runs your function while tracking the breaker state of default group. +func Go(name string, run, fallback func() error) error { + breaker := _group.Get(name) + if err := breaker.Allow(); err != nil { + return fallback() + } + return run() +} + +// newBreaker new a breaker. +func newBreaker(c *Config) (b Breaker) { + // factory + return newSRE(c) +} + +// NewGroup new a breaker group container, if conf nil use default conf. +func NewGroup(conf *Config) *Group { + if conf == nil { + _mu.RLock() + conf = _conf + _mu.RUnlock() + } else { + conf.fix() + } + return &Group{ + conf: conf, + brks: make(map[string]Breaker), + } +} + +// Get get a breaker by a specified key, if breaker not exists then make a new one. +func (g *Group) Get(key string) Breaker { + g.mu.RLock() + brk, ok := g.brks[key] + conf := g.conf + g.mu.RUnlock() + if ok { + return brk + } + // NOTE here may new multi breaker for rarely case, let gc drop it. + brk = newBreaker(conf) + g.mu.Lock() + if _, ok = g.brks[key]; !ok { + g.brks[key] = brk + } + g.mu.Unlock() + return brk +} + +// Reload reload the group by specified config, this may let all inner breaker +// reset to a new one. +func (g *Group) Reload(conf *Config) { + if conf == nil { + return + } + conf.fix() + g.mu.Lock() + g.conf = conf + g.brks = make(map[string]Breaker, len(g.brks)) + g.mu.Unlock() +} + +// Go runs your function while tracking the breaker state of group. +func (g *Group) Go(name string, run, fallback func() error) error { + breaker := g.Get(name) + if err := breaker.Allow(); err != nil { + return fallback() + } + return run() +} diff --git a/pkg/net/netutil/breaker/sre_breaker.go b/pkg/net/netutil/breaker/sre_breaker.go new file mode 100644 index 000000000..d713dfe4f --- /dev/null +++ b/pkg/net/netutil/breaker/sre_breaker.go @@ -0,0 +1,71 @@ +package breaker + +import ( + "math" + "math/rand" + "sync/atomic" + "time" + + "github.com/bilibili/Kratos/pkg/ecode" + "github.com/bilibili/Kratos/pkg/log" + "github.com/bilibili/Kratos/pkg/stat/summary" +) + +// sreBreaker is a sre CircuitBreaker pattern. +type sreBreaker struct { + stat summary.Summary + + k float64 + request int64 + + state int32 + r *rand.Rand +} + +func newSRE(c *Config) Breaker { + return &sreBreaker{ + stat: summary.New(time.Duration(c.Window), c.Bucket), + r: rand.New(rand.NewSource(time.Now().UnixNano())), + + request: c.Request, + k: c.K, + state: StateClosed, + } +} + +func (b *sreBreaker) Allow() error { + success, total := b.stat.Value() + k := b.k * float64(success) + if log.V(5) { + log.Info("breaker: request: %d, succee: %d, fail: %d", total, success, total-success) + } + // check overflow requests = K * success + if total < b.request || float64(total) < k { + if atomic.LoadInt32(&b.state) == StateOpen { + atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed) + } + return nil + } + if atomic.LoadInt32(&b.state) == StateClosed { + atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen) + } + dr := math.Max(0, (float64(total)-k)/float64(total+1)) + rr := b.r.Float64() + if log.V(5) { + log.Info("breaker: drop ratio: %f, real rand: %f, drop: %v", dr, rr, dr > rr) + } + if dr <= rr { + return nil + } + return ecode.ServiceUnavailable +} + +func (b *sreBreaker) MarkSuccess() { + b.stat.Add(1) +} + +func (b *sreBreaker) MarkFailed() { + // NOTE: when client reject requets locally, continue add counter let the + // drop ratio higher. + b.stat.Add(0) +}