naming discovery sdk and it's dependency

pull/3/head
lintanghui 6 years ago
parent cc57564fba
commit 7d6f233259
  1. 56
      pkg/log/handler.go
  2. 29
      pkg/log/level.go
  3. 88
      pkg/log/log.go
  4. 83
      pkg/log/verbose.go
  5. 684
      pkg/naming/discovery/discovery.go
  6. 15
      pkg/naming/naming.go
  7. 429
      pkg/net/http/blademaster/client.go
  8. 106
      pkg/net/http/blademaster/metadata.go
  9. 199
      pkg/net/http/blademaster/trace.go
  10. 72
      pkg/net/netutil/backoff.go
  11. 176
      pkg/net/netutil/breaker/breaker.go
  12. 71
      pkg/net/netutil/breaker/sre_breaker.go

@ -0,0 +1,56 @@
package log
import (
"context"
pkgerr "github.com/pkg/errors"
)
const (
_log = "log"
)
// Handler is used to handle log events, outputting them to
// stdio or sending them to remote services. See the "handlers"
// directory for implementations.
//
// It is left up to Handlers to implement thread-safety.
type Handler interface {
// Log handle log
// variadic D is k-v struct represent log content
Log(context.Context, Level, ...D)
// SetFormat set render format on log output
// see StdoutHandler.SetFormat for detail
SetFormat(string)
// Close handler
Close() error
}
// Handlers .
type Handlers []Handler
// Log handlers logging.
func (hs Handlers) Log(c context.Context, lv Level, d ...D) {
for _, h := range hs {
h.Log(c, lv, d...)
}
}
// Close close resource.
func (hs Handlers) Close() (err error) {
for _, h := range hs {
if e := h.Close(); e != nil {
err = pkgerr.WithStack(e)
}
}
return
}
// SetFormat .
func (hs Handlers) SetFormat(format string) {
for _, h := range hs {
h.SetFormat(format)
}
}

@ -0,0 +1,29 @@
package log
// Level of severity.
type Level int
// Verbose is a boolean type that implements Info, Infov (like Printf) etc.
type Verbose bool
// common log level.
const (
_debugLevel Level = iota
_infoLevel
_warnLevel
_errorLevel
_fatalLevel
)
var levelNames = [...]string{
_debugLevel: "DEBUG",
_infoLevel: "INFO",
_warnLevel: "WARN",
_errorLevel: "ERROR",
_fatalLevel: "FATAL",
}
// String implementation.
func (l Level) String() string {
return levelNames[l]
}

@ -0,0 +1,88 @@
package log
import (
"context"
"fmt"
)
// Config log config.
type Config struct {
Family string
Host string
// stdout
Stdout bool
// file
Dir string
// buffer size
FileBufferSize int64
// MaxLogFile
MaxLogFile int
// RotateSize
RotateSize int64
// V Enable V-leveled logging at the specified level.
V int32
// Module=""
// The syntax of the argument is a map of pattern=N,
// where pattern is a literal file name (minus the ".go" suffix) or
// "glob" pattern and N is a V level. For instance:
// [module]
// "service" = 1
// "dao*" = 2
// sets the V level to 2 in all Go files whose names begin "dao".
Module map[string]int32
// Filter tell log handler which field are sensitive message, use * instead.
Filter []string
}
var (
h Handler
c *Config
)
// D represents a map of entry level data used for structured logging.
// type D map[string]interface{}
type D struct {
Key string
Value interface{}
}
// KV return a log kv for logging field.
func KV(key string, value interface{}) D {
return D{
Key: key,
Value: value,
}
}
// Info logs a message at the info log level.
func Info(format string, args ...interface{}) {
h.Log(context.Background(), _infoLevel, KV(_log, fmt.Sprintf(format, args...)))
}
// Warn logs a message at the warning log level.
func Warn(format string, args ...interface{}) {
h.Log(context.Background(), _warnLevel, KV(_log, fmt.Sprintf(format, args...)))
}
// Error logs a message at the error log level.
func Error(format string, args ...interface{}) {
h.Log(context.Background(), _errorLevel, KV(_log, fmt.Sprintf(format, args...)))
}
func logw(args []interface{}) []D {
if len(args)%2 != 0 {
Warn("log: the variadic must be plural, the last one will ignored")
}
ds := make([]D, 0, len(args)/2)
for i := 0; i < len(args)-1; i = i + 2 {
if key, ok := args[i].(string); ok {
ds = append(ds, KV(key, args[i+1]))
} else {
Warn("log: key must be string, get %T, ignored", args[i])
}
}
return ds
}

@ -0,0 +1,83 @@
package log
import (
"context"
"fmt"
"path/filepath"
"runtime"
"strings"
)
// V reports whether verbosity at the call site is at least the requested level.
// The returned value is a boolean of type Verbose, which implements Info, Infov etc.
// These methods will write to the Info log if called.
// Thus, one may write either
// if log.V(2) { log.Info("log this") }
// or
// log.V(2).Info("log this")
// The second form is shorter but the first is cheaper if logging is off because it does
// not evaluate its arguments.
//
// Whether an individual call to V generates a log record depends on the setting of
// the Config.VLevel and Config.Module flags; both are off by default. If the level in the call to
// V is at least the value of Config.VLevel, or of Config.Module for the source file containing the
// call, the V call will log.
// v must be more than 0.
func V(v int32) Verbose {
var (
file string
)
if v < 0 {
return Verbose(false)
} else if c.V >= v {
return Verbose(true)
}
if pc, _, _, ok := runtime.Caller(1); ok {
file, _ = runtime.FuncForPC(pc).FileLine(pc)
}
if strings.HasSuffix(file, ".go") {
file = file[:len(file)-3]
}
if slash := strings.LastIndex(file, "/"); slash >= 0 {
file = file[slash+1:]
}
for filter, lvl := range c.Module {
var match bool
if match = filter == file; !match {
match, _ = filepath.Match(filter, file)
}
if match {
return Verbose(lvl >= v)
}
}
return Verbose(false)
}
// Info logs a message at the info log level.
func (v Verbose) Info(format string, args ...interface{}) {
if v {
h.Log(context.Background(), _infoLevel, KV(_log, fmt.Sprintf(format, args...)))
}
}
// Infov logs a message at the info log level.
func (v Verbose) Infov(ctx context.Context, args ...D) {
if v {
h.Log(ctx, _infoLevel, args...)
}
}
// Infow logs a message with some additional context. The variadic key-value pairs are treated as they are in With.
func (v Verbose) Infow(ctx context.Context, args ...interface{}) {
if v {
h.Log(ctx, _infoLevel, logw(args)...)
}
}
// Close close resource.
func (v Verbose) Close() (err error) {
if h == nil {
return
}
return h.Close()
}

@ -0,0 +1,684 @@
package discovery
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net/url"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/bilibili/Kratos/pkg/conf/env"
"github.com/bilibili/Kratos/pkg/ecode"
"github.com/bilibili/Kratos/pkg/log"
"github.com/bilibili/Kratos/pkg/naming"
bm "github.com/bilibili/Kratos/pkg/net/http/blademaster"
"github.com/bilibili/Kratos/pkg/net/netutil"
"github.com/bilibili/Kratos/pkg/net/netutil/breaker"
"github.com/bilibili/Kratos/pkg/str"
xtime "github.com/bilibili/Kratos/pkg/time"
)
const (
_registerURL = "http://%s/discovery/register"
_setURL = "http://%s/discovery/set"
_cancelURL = "http://%s/discovery/cancel"
_renewURL = "http://%s/discovery/renew"
_pollURL = "http://%s/discovery/polls"
_nodesURL = "http://%s/discovery/nodes"
_registerGap = 30 * time.Second
_statusUP = "1"
)
const (
_appid = "infra.discovery"
)
var (
_ naming.Builder = &Discovery{}
_ naming.Registry = &Discovery{}
// ErrDuplication duplication treeid.
ErrDuplication = errors.New("discovery: instance duplicate registration")
)
// Config discovery configures.
type Config struct {
Nodes []string
Key string
Secret string
Region string
Zone string
Env string
Host string
}
// Discovery is discovery client.
type Discovery struct {
once sync.Once
conf *Config
ctx context.Context
cancelFunc context.CancelFunc
httpClient *bm.Client
mutex sync.RWMutex
apps map[string]*appInfo
registry map[string]struct{}
lastHost string
cancelPolls context.CancelFunc
idx uint64
node atomic.Value
delete chan *appInfo
}
type appInfo struct {
zoneIns atomic.Value
resolver map[*Resolver]struct{}
lastTs int64 // latest timestamp
}
func fixConfig(c *Config) {
if len(c.Nodes) == 0 {
c.Nodes = []string{"api.bilibili.co"}
}
if env.Region != "" {
c.Region = env.Region
}
if env.Zone != "" {
c.Zone = env.Zone
}
if env.DeployEnv != "" {
c.Env = env.DeployEnv
}
if env.Hostname != "" {
c.Host = env.Hostname
} else {
c.Host, _ = os.Hostname()
}
}
var (
once sync.Once
_defaultDiscovery *Discovery
)
func initDefault() {
once.Do(func() {
_defaultDiscovery = New(nil)
})
}
// Builder return default discvoery resolver builder.
func Builder() naming.Builder {
if _defaultDiscovery == nil {
initDefault()
}
return _defaultDiscovery
}
// Build register resolver into default discovery.
func Build(id string) naming.Resolver {
if _defaultDiscovery == nil {
initDefault()
}
return _defaultDiscovery.Build(id)
}
// New new a discovery client.
func New(c *Config) (d *Discovery) {
if c == nil {
c = &Config{
Nodes: []string{"discovery.bilibili.co", "api.bilibili.co"},
Key: "discovery",
Secret: "discovery",
}
}
fixConfig(c)
ctx, cancel := context.WithCancel(context.Background())
d = &Discovery{
ctx: ctx,
cancelFunc: cancel,
conf: c,
apps: map[string]*appInfo{},
registry: map[string]struct{}{},
delete: make(chan *appInfo, 10),
}
// httpClient
cfg := &bm.ClientConfig{
App: &bm.App{
Key: c.Key,
Secret: c.Secret,
},
Dial: xtime.Duration(3 * time.Second),
Timeout: xtime.Duration(40 * time.Second),
Breaker: &breaker.Config{
Window: 100,
Sleep: 3,
Bucket: 10,
Ratio: 0.5,
Request: 100,
},
}
d.httpClient = bm.NewClient(cfg)
resolver := d.Build(_appid)
event := resolver.Watch()
_, ok := <-event
if !ok {
panic("discovery watch failed")
}
ins, ok := resolver.Fetch(context.Background())
if ok {
d.newSelf(ins)
}
go d.selfproc(resolver, event)
return
}
func (d *Discovery) selfproc(resolver naming.Resolver, event <-chan struct{}) {
for {
_, ok := <-event
if !ok {
return
}
instances, ok := resolver.Fetch(context.Background())
if ok {
d.newSelf(instances)
}
}
}
func (d *Discovery) newSelf(instances naming.InstancesInfo) {
ins, ok := instances.Instances[d.conf.Zone]
if !ok {
return
}
var nodes []string
for _, in := range ins {
for _, addr := range in.Addrs {
u, err := url.Parse(addr)
if err == nil && u.Scheme == "http" {
nodes = append(nodes, u.Host)
}
}
}
// diff old nodes
olds, ok := d.node.Load().([]string)
if ok {
var diff int
for _, n := range nodes {
for _, o := range olds {
if o == n {
diff++
break
}
}
}
if len(nodes) == diff {
return
}
}
rand.Shuffle(len(nodes), func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
})
d.node.Store(nodes)
}
// Build disovery resovler builder.
func (d *Discovery) Build(appid string) naming.Resolver {
r := &Resolver{
id: appid,
d: d,
event: make(chan struct{}, 1),
}
d.mutex.Lock()
app, ok := d.apps[appid]
if !ok {
app = &appInfo{
resolver: make(map[*Resolver]struct{}),
}
d.apps[appid] = app
cancel := d.cancelPolls
if cancel != nil {
cancel()
}
}
app.resolver[r] = struct{}{}
d.mutex.Unlock()
if ok {
select {
case r.event <- struct{}{}:
default:
}
}
log.Info("disocvery: AddWatch(%s) already watch(%v)", appid, ok)
d.once.Do(func() {
go d.serverproc()
})
return r
}
// Scheme return discovery's scheme
func (d *Discovery) Scheme() string {
return "discovery"
}
// Resolver discveory resolver.
type Resolver struct {
id string
event chan struct{}
d *Discovery
}
// Watch watch instance.
func (r *Resolver) Watch() <-chan struct{} {
return r.event
}
// Fetch fetch resolver instance.
func (r *Resolver) Fetch(c context.Context) (ins naming.InstancesInfo, ok bool) {
r.d.mutex.RLock()
app, ok := r.d.apps[r.id]
r.d.mutex.RUnlock()
if ok {
ins, ok = app.zoneIns.Load().(naming.InstancesInfo)
return
}
return
}
// Close close resolver.
func (r *Resolver) Close() error {
r.d.mutex.Lock()
if app, ok := r.d.apps[r.id]; ok && len(app.resolver) != 0 {
delete(app.resolver, r)
// TODO: delete app from builder
}
r.d.mutex.Unlock()
return nil
}
func (d *Discovery) pickNode() string {
nodes, ok := d.node.Load().([]string)
if !ok || len(nodes) == 0 {
return d.conf.Nodes[d.idx%uint64(len(d.conf.Nodes))]
}
return nodes[d.idx%uint64(len(nodes))]
}
func (d *Discovery) switchNode() {
atomic.AddUint64(&d.idx, 1)
}
// Reload reload the config
func (d *Discovery) Reload(c *Config) {
fixConfig(c)
d.mutex.Lock()
d.conf = c
d.mutex.Unlock()
}
// Close stop all running process including discovery and register
func (d *Discovery) Close() error {
d.cancelFunc()
return nil
}
// Register Register an instance with discovery and renew automatically
func (d *Discovery) Register(c context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) {
d.mutex.Lock()
if _, ok := d.registry[ins.AppID]; ok {
err = ErrDuplication
} else {
d.registry[ins.AppID] = struct{}{}
}
d.mutex.Unlock()
if err != nil {
return
}
if err = d.register(c, ins); err != nil {
d.mutex.Lock()
delete(d.registry, ins.AppID)
d.mutex.Unlock()
return
}
ctx, cancel := context.WithCancel(d.ctx)
ch := make(chan struct{}, 1)
cancelFunc = context.CancelFunc(func() {
cancel()
<-ch
})
go func() {
ticker := time.NewTicker(_registerGap)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := d.renew(ctx, ins); err != nil && ecode.NothingFound.Equal(err) {
d.register(ctx, ins)
}
case <-ctx.Done():
d.cancel(ins)
ch <- struct{}{}
return
}
}
}()
return
}
// Set set ins status and metadata.
func (d *Discovery) Set(ins *naming.Instance) error {
return d.set(context.Background(), ins)
}
// cancel Remove the registered instance from discovery
func (d *Discovery) cancel(ins *naming.Instance) (err error) {
d.mutex.RLock()
conf := d.conf
d.mutex.RUnlock()
res := new(struct {
Code int `json:"code"`
Message string `json:"message"`
})
uri := fmt.Sprintf(_cancelURL, d.pickNode())
params := d.newParams(conf)
params.Set("appid", ins.AppID)
// request
if err = d.httpClient.Post(context.Background(), uri, "", params, &res); err != nil {
d.switchNode()
log.Error("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)",
uri, conf.Env, ins.AppID, conf.Host, err)
return
}
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
log.Warn("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)",
uri, conf.Env, ins.AppID, conf.Host, res.Code)
err = ec
return
}
log.Info("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) success",
uri, conf.Env, ins.AppID, conf.Host)
return
}
// register Register an instance with discovery
func (d *Discovery) register(ctx context.Context, ins *naming.Instance) (err error) {
d.mutex.RLock()
conf := d.conf
d.mutex.RUnlock()
var metadata []byte
if ins.Metadata != nil {
if metadata, err = json.Marshal(ins.Metadata); err != nil {
log.Error("discovery:register instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err)
}
}
res := new(struct {
Code int `json:"code"`
Message string `json:"message"`
})
uri := fmt.Sprintf(_registerURL, d.pickNode())
params := d.newParams(conf)
params.Set("appid", ins.AppID)
params.Set("addrs", strings.Join(ins.Addrs, ","))
params.Set("version", ins.Version)
params.Set("status", _statusUP)
params.Set("metadata", string(metadata))
if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil {
d.switchNode()
log.Error("discovery: register client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)",
uri, conf.Zone, conf.Env, ins.AppID, ins.Addrs, err)
return
}
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
log.Warn("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)",
uri, conf.Env, ins.AppID, ins.Addrs, res.Code)
err = ec
return
}
log.Info("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%s) success",
uri, conf.Env, ins.AppID, ins.Addrs)
return
}
// rset set instance info with discovery
func (d *Discovery) set(ctx context.Context, ins *naming.Instance) (err error) {
d.mutex.RLock()
conf := d.conf
d.mutex.RUnlock()
res := new(struct {
Code int `json:"code"`
Message string `json:"message"`
})
uri := fmt.Sprintf(_setURL, d.pickNode())
params := d.newParams(conf)
params.Set("appid", ins.AppID)
params.Set("version", ins.Version)
params.Set("status", strconv.FormatInt(ins.Status, 10))
if ins.Metadata != nil {
var metadata []byte
if metadata, err = json.Marshal(ins.Metadata); err != nil {
log.Error("discovery:set instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err)
}
params.Set("metadata", string(metadata))
}
if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil {
d.switchNode()
log.Error("discovery: set client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)",
uri, conf.Zone, conf.Env, ins.AppID, ins.Addrs, err)
return
}
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
log.Warn("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)",
uri, conf.Env, ins.AppID, ins.Addrs, res.Code)
err = ec
return
}
log.Info("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%s) success",
uri+"?"+params.Encode(), conf.Env, ins.AppID, ins.Addrs)
return
}
// renew Renew an instance with discovery
func (d *Discovery) renew(ctx context.Context, ins *naming.Instance) (err error) {
d.mutex.RLock()
conf := d.conf
d.mutex.RUnlock()
res := new(struct {
Code int `json:"code"`
Message string `json:"message"`
})
uri := fmt.Sprintf(_renewURL, d.pickNode())
params := d.newParams(conf)
params.Set("appid", ins.AppID)
if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil {
d.switchNode()
log.Error("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)",
uri, conf.Env, ins.AppID, conf.Host, err)
return
}
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
err = ec
if ec.Equal(ecode.NothingFound) {
return
}
log.Error("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)",
uri, conf.Env, ins.AppID, conf.Host, res.Code)
return
}
return
}
func (d *Discovery) serverproc() {
var (
retry int
ctx context.Context
cancel context.CancelFunc
)
bc := netutil.DefaultBackoffConfig
ticker := time.NewTicker(time.Minute * 30)
defer ticker.Stop()
for {
if ctx == nil {
ctx, cancel = context.WithCancel(d.ctx)
d.mutex.Lock()
d.cancelPolls = cancel
d.mutex.Unlock()
}
select {
case <-d.ctx.Done():
return
default:
}
apps, err := d.polls(ctx, d.pickNode())
if err != nil {
d.switchNode()
if ctx.Err() == context.Canceled {
ctx = nil
continue
}
time.Sleep(bc.Backoff(retry))
retry++
continue
}
retry = 0
d.broadcast(apps)
}
}
func (d *Discovery) nodes() (nodes []string) {
res := new(struct {
Code int `json:"code"`
Data []struct {
Addr string `json:"addr"`
} `json:"data"`
})
uri := fmt.Sprintf(_nodesURL, d.pickNode())
if err := d.httpClient.Get(d.ctx, uri, "", nil, res); err != nil {
d.switchNode()
log.Error("discovery: consumer client.Get(%v)error(%+v)", uri, err)
return
}
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
log.Error("discovery: consumer client.Get(%v) error(%v)", uri, res.Code)
return
}
if len(res.Data) == 0 {
log.Warn("discovery: get nodes(%s) failed,no nodes found!", uri)
return
}
nodes = make([]string, 0, len(res.Data))
for i := range res.Data {
nodes = append(nodes, res.Data[i].Addr)
}
return
}
func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]naming.InstancesInfo, err error) {
var (
lastTs []int64
appid []string
changed bool
)
if host != d.lastHost {
d.lastHost = host
changed = true
}
d.mutex.RLock()
conf := d.conf
for k, v := range d.apps {
if changed {
v.lastTs = 0
}
appid = append(appid, k)
lastTs = append(lastTs, v.lastTs)
}
d.mutex.RUnlock()
if len(appid) == 0 {
return
}
uri := fmt.Sprintf(_pollURL, host)
res := new(struct {
Code int `json:"code"`
Data map[string]naming.InstancesInfo `json:"data"`
})
params := url.Values{}
params.Set("env", conf.Env)
params.Set("hostname", conf.Host)
params.Set("appid", strings.Join(appid, ","))
params.Set("latest_timestamp", xstr.JoinInts(lastTs))
if err = d.httpClient.Get(ctx, uri, "", params, res); err != nil {
log.Error("discovery: client.Get(%s) error(%+v)", uri+"?"+params.Encode(), err)
return
}
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
if !ec.Equal(ecode.NotModified) {
log.Error("discovery: client.Get(%s) get error code(%d)", uri+"?"+params.Encode(), res.Code)
err = ec
}
return
}
info, _ := json.Marshal(res.Data)
for _, app := range res.Data {
if app.LastTs == 0 {
err = ecode.ServerErr
log.Error("discovery: client.Get(%s) latest_timestamp is 0,instances:(%s)", uri+"?"+params.Encode(), info)
return
}
}
log.Info("discovery: polls uri(%s)", uri+"?"+params.Encode())
log.Info("discovery: successfully polls(%s) instances (%s)", uri+"?"+params.Encode(), info)
apps = res.Data
return
}
func (d *Discovery) broadcast(apps map[string]naming.InstancesInfo) {
for id, v := range apps {
var count int
for zone, ins := range v.Instances {
if len(ins) == 0 {
delete(v.Instances, zone)
}
count += len(ins)
}
if count == 0 {
continue
}
d.mutex.RLock()
app, ok := d.apps[id]
d.mutex.RUnlock()
if ok {
app.lastTs = v.LastTs
app.zoneIns.Store(v)
d.mutex.RLock()
for rs := range app.resolver {
select {
case rs.event <- struct{}{}:
default:
}
}
d.mutex.RUnlock()
}
}
}
func (d *Discovery) newParams(conf *Config) url.Values {
params := url.Values{}
params.Set("region", conf.Region)
params.Set("zone", conf.Zone)
params.Set("env", conf.Env)
params.Set("hostname", conf.Host)
return params
}

@ -37,9 +37,22 @@ type Instance struct {
Status int64 Status int64
} }
// InstancesInfo instance info.
type InstancesInfo struct {
Instances map[string][]*Instance `json:"zone_instances"`
LastTs int64 `json:"latest_timestamp"`
Scheduler []*Scheduler `json:"scheduler"`
}
// Scheduler scheduler info in multi cluster.
type Scheduler struct {
Src string `json:"src"`
Dst map[string]int64 `json:"dst"`
}
// Resolver resolve naming service // Resolver resolve naming service
type Resolver interface { type Resolver interface {
Fetch(context.Context) (map[string][]*Instance, bool) Fetch(context.Context) (*InstancesInfo, bool)
//Unwatch(id string) //Unwatch(id string)
Watch() <-chan struct{} Watch() <-chan struct{}
Close() error Close() error

@ -0,0 +1,429 @@
package blademaster
import (
"bytes"
"context"
"crypto/md5"
"crypto/tls"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net"
xhttp "net/http"
"net/url"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/bilibili/Kratos/pkg/conf/env"
"github.com/bilibili/Kratos/pkg/log"
"github.com/bilibili/Kratos/pkg/net/metadata"
"github.com/bilibili/Kratos/pkg/net/netutil/breaker"
"github.com/bilibili/Kratos/pkg/stat"
xtime "github.com/bilibili/Kratos/pkg/time"
"github.com/gogo/protobuf/proto"
pkgerr "github.com/pkg/errors"
)
const (
_minRead = 16 * 1024 // 16kb
_appKey = "appkey"
_appSecret = "appsecret"
_ts = "ts"
)
var (
_noKickUserAgent = "haoguanwei@bilibili.com "
clientStats = stat.HTTPClient
)
func init() {
n, err := os.Hostname()
if err == nil {
_noKickUserAgent = _noKickUserAgent + runtime.Version() + " " + n
}
}
// App bilibili intranet authorization.
type App struct {
Key string
Secret string
}
// ClientConfig is http client conf.
type ClientConfig struct {
*App
Dial xtime.Duration
Timeout xtime.Duration
KeepAlive xtime.Duration
Breaker *breaker.Config
URL map[string]*ClientConfig
Host map[string]*ClientConfig
}
// Client is http client.
type Client struct {
conf *ClientConfig
client *xhttp.Client
dialer *net.Dialer
transport xhttp.RoundTripper
urlConf map[string]*ClientConfig
hostConf map[string]*ClientConfig
mutex sync.RWMutex
breaker *breaker.Group
}
// NewClient new a http client.
func NewClient(c *ClientConfig) *Client {
client := new(Client)
client.conf = c
client.dialer = &net.Dialer{
Timeout: time.Duration(c.Dial),
KeepAlive: time.Duration(c.KeepAlive),
}
originTransport := &xhttp.Transport{
DialContext: client.dialer.DialContext,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
// wraps RoundTripper for tracer
client.transport = &TraceTransport{RoundTripper: originTransport}
client.client = &xhttp.Client{
Transport: client.transport,
}
client.urlConf = make(map[string]*ClientConfig)
client.hostConf = make(map[string]*ClientConfig)
client.breaker = breaker.NewGroup(c.Breaker)
// check appkey
if c.Key == "" || c.Secret == "" {
panic("http client must config appkey and appsecret")
}
if c.Timeout <= 0 {
panic("must config http timeout!!!")
}
for uri, cfg := range c.URL {
client.urlConf[uri] = cfg
}
for host, cfg := range c.Host {
client.hostConf[host] = cfg
}
return client
}
// SetTransport set client transport
func (client *Client) SetTransport(t xhttp.RoundTripper) {
client.transport = t
client.client.Transport = t
}
// SetConfig set client config.
func (client *Client) SetConfig(c *ClientConfig) {
client.mutex.Lock()
if c.App != nil {
client.conf.App.Key = c.App.Key
client.conf.App.Secret = c.App.Secret
}
if c.Timeout > 0 {
client.conf.Timeout = c.Timeout
}
if c.KeepAlive > 0 {
client.dialer.KeepAlive = time.Duration(c.KeepAlive)
client.conf.KeepAlive = c.KeepAlive
}
if c.Dial > 0 {
client.dialer.Timeout = time.Duration(c.Dial)
client.conf.Timeout = c.Dial
}
if c.Breaker != nil {
client.conf.Breaker = c.Breaker
client.breaker.Reload(c.Breaker)
}
for uri, cfg := range c.URL {
client.urlConf[uri] = cfg
}
for host, cfg := range c.Host {
client.hostConf[host] = cfg
}
client.mutex.Unlock()
}
// NewRequest new http request with method, uri, ip, values and headers.
// TODO(zhoujiahui): param realIP should be removed later.
func (client *Client) NewRequest(method, uri, realIP string, params url.Values) (req *xhttp.Request, err error) {
enc, err := client.sign(params)
if err != nil {
err = pkgerr.Wrapf(err, "uri:%s,params:%v", uri, params)
return
}
ru := uri
if enc != "" {
ru = uri + "?" + enc
}
if method == xhttp.MethodGet {
req, err = xhttp.NewRequest(xhttp.MethodGet, ru, nil)
} else {
req, err = xhttp.NewRequest(xhttp.MethodPost, uri, strings.NewReader(enc))
}
if err != nil {
err = pkgerr.Wrapf(err, "method:%s,uri:%s", method, ru)
return
}
const (
_contentType = "Content-Type"
_urlencoded = "application/x-www-form-urlencoded"
_userAgent = "User-Agent"
)
if method == xhttp.MethodPost {
req.Header.Set(_contentType, _urlencoded)
}
if realIP != "" {
req.Header.Set(_httpHeaderRemoteIP, realIP)
}
req.Header.Set(_userAgent, _noKickUserAgent+" "+env.AppID)
return
}
// Get issues a GET to the specified URL.
func (client *Client) Get(c context.Context, uri, ip string, params url.Values, res interface{}) (err error) {
req, err := client.NewRequest(xhttp.MethodGet, uri, ip, params)
if err != nil {
return
}
return client.Do(c, req, res)
}
// Post issues a Post to the specified URL.
func (client *Client) Post(c context.Context, uri, ip string, params url.Values, res interface{}) (err error) {
req, err := client.NewRequest(xhttp.MethodPost, uri, ip, params)
if err != nil {
return
}
return client.Do(c, req, res)
}
// RESTfulGet issues a RESTful GET to the specified URL.
func (client *Client) RESTfulGet(c context.Context, uri, ip string, params url.Values, res interface{}, v ...interface{}) (err error) {
req, err := client.NewRequest(xhttp.MethodGet, fmt.Sprintf(uri, v...), ip, params)
if err != nil {
return
}
return client.Do(c, req, res, uri)
}
// RESTfulPost issues a RESTful Post to the specified URL.
func (client *Client) RESTfulPost(c context.Context, uri, ip string, params url.Values, res interface{}, v ...interface{}) (err error) {
req, err := client.NewRequest(xhttp.MethodPost, fmt.Sprintf(uri, v...), ip, params)
if err != nil {
return
}
return client.Do(c, req, res, uri)
}
// Raw sends an HTTP request and returns bytes response
func (client *Client) Raw(c context.Context, req *xhttp.Request, v ...string) (bs []byte, err error) {
var (
ok bool
code string
cancel func()
resp *xhttp.Response
config *ClientConfig
timeout time.Duration
uri = fmt.Sprintf("%s://%s%s", req.URL.Scheme, req.Host, req.URL.Path)
)
// NOTE fix prom & config uri key.
if len(v) == 1 {
uri = v[0]
}
// breaker
brk := client.breaker.Get(uri)
if err = brk.Allow(); err != nil {
code = "breaker"
clientStats.Incr(uri, code)
return
}
defer client.onBreaker(brk, &err)
// stat
now := time.Now()
defer func() {
clientStats.Timing(uri, int64(time.Since(now)/time.Millisecond))
if code != "" {
clientStats.Incr(uri, code)
}
}()
// get config
// 1.url config 2.host config 3.default
client.mutex.RLock()
if config, ok = client.urlConf[uri]; !ok {
if config, ok = client.hostConf[req.Host]; !ok {
config = client.conf
}
}
client.mutex.RUnlock()
// timeout
deliver := true
timeout = time.Duration(config.Timeout)
if deadline, ok := c.Deadline(); ok {
if ctimeout := time.Until(deadline); ctimeout < timeout {
// deliver small timeout
timeout = ctimeout
deliver = false
}
}
if deliver {
c, cancel = context.WithTimeout(c, timeout)
defer cancel()
}
setTimeout(req, timeout)
req = req.WithContext(c)
setCaller(req)
if color := metadata.String(c, metadata.Color); color != "" {
setColor(req, color)
}
if resp, err = client.client.Do(req); err != nil {
err = pkgerr.Wrapf(err, "host:%s, url:%s", req.URL.Host, realURL(req))
code = "failed"
return
}
defer resp.Body.Close()
if resp.StatusCode >= xhttp.StatusBadRequest {
err = pkgerr.Errorf("incorrect http status:%d host:%s, url:%s", resp.StatusCode, req.URL.Host, realURL(req))
code = strconv.Itoa(resp.StatusCode)
return
}
if bs, err = readAll(resp.Body, _minRead); err != nil {
err = pkgerr.Wrapf(err, "host:%s, url:%s", req.URL.Host, realURL(req))
return
}
return
}
// Do sends an HTTP request and returns an HTTP json response.
func (client *Client) Do(c context.Context, req *xhttp.Request, res interface{}, v ...string) (err error) {
var bs []byte
if bs, err = client.Raw(c, req, v...); err != nil {
return
}
if res != nil {
if err = json.Unmarshal(bs, res); err != nil {
err = pkgerr.Wrapf(err, "host:%s, url:%s", req.URL.Host, realURL(req))
}
}
return
}
// JSON sends an HTTP request and returns an HTTP json response.
func (client *Client) JSON(c context.Context, req *xhttp.Request, res interface{}, v ...string) (err error) {
var bs []byte
if bs, err = client.Raw(c, req, v...); err != nil {
return
}
if res != nil {
if err = json.Unmarshal(bs, res); err != nil {
err = pkgerr.Wrapf(err, "host:%s, url:%s", req.URL.Host, realURL(req))
}
}
return
}
// PB sends an HTTP request and returns an HTTP proto response.
func (client *Client) PB(c context.Context, req *xhttp.Request, res proto.Message, v ...string) (err error) {
var bs []byte
if bs, err = client.Raw(c, req, v...); err != nil {
return
}
if res != nil {
if err = proto.Unmarshal(bs, res); err != nil {
err = pkgerr.Wrapf(err, "host:%s, url:%s", req.URL.Host, realURL(req))
}
}
return
}
func (client *Client) onBreaker(breaker breaker.Breaker, err *error) {
if err != nil && *err != nil {
breaker.MarkFailed()
} else {
breaker.MarkSuccess()
}
}
// sign calc appkey and appsecret sign.
func (client *Client) sign(params url.Values) (query string, err error) {
client.mutex.RLock()
key := client.conf.Key
secret := client.conf.Secret
client.mutex.RUnlock()
if params == nil {
params = url.Values{}
}
params.Set(_appKey, key)
if params.Get(_appSecret) != "" {
log.Warn("utils http get must not have parameter appSecret")
}
if params.Get(_ts) == "" {
params.Set(_ts, strconv.FormatInt(time.Now().Unix(), 10))
}
tmp := params.Encode()
if strings.IndexByte(tmp, '+') > -1 {
tmp = strings.Replace(tmp, "+", "%20", -1)
}
var b bytes.Buffer
b.WriteString(tmp)
b.WriteString(secret)
mh := md5.Sum(b.Bytes())
// query
var qb bytes.Buffer
qb.WriteString(tmp)
qb.WriteString("&sign=")
qb.WriteString(hex.EncodeToString(mh[:]))
query = qb.String()
return
}
// realUrl return url with http://host/params.
func realURL(req *xhttp.Request) string {
if req.Method == xhttp.MethodGet {
return req.URL.String()
} else if req.Method == xhttp.MethodPost {
ru := req.URL.Path
if req.Body != nil {
rd, ok := req.Body.(io.Reader)
if ok {
buf := bytes.NewBuffer([]byte{})
buf.ReadFrom(rd)
ru = ru + "?" + buf.String()
}
}
return ru
}
return req.URL.Path
}
// readAll reads from r until an error or EOF and returns the data it read
// from the internal buffer allocated with a specified capacity.
func readAll(r io.Reader, capacity int64) (b []byte, err error) {
buf := bytes.NewBuffer(make([]byte, 0, capacity))
// If the buffer overflows, we will get bytes.ErrTooLarge.
// Return that as an error. Any other panic remains.
defer func() {
e := recover()
if e == nil {
return
}
if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge {
err = panicErr
} else {
panic(e)
}
}()
_, err = buf.ReadFrom(r)
return buf.Bytes(), err
}

@ -0,0 +1,106 @@
package blademaster
import (
"net/http"
"strconv"
"strings"
"time"
"github.com/bilibili/Kratos/pkg/conf/env"
"github.com/bilibili/Kratos/pkg/log"
"github.com/pkg/errors"
)
const (
// http head
_httpHeaderUser = "x1-bmspy-user"
_httpHeaderColor = "x1-bmspy-color"
_httpHeaderTimeout = "x1-bmspy-timeout"
_httpHeaderRemoteIP = "x-backend-bm-real-ip"
_httpHeaderRemoteIPPort = "x-backend-bm-real-ipport"
)
// mirror return true if x1-bilispy-mirror in http header and its value is 1 or true.
func mirror(req *http.Request) bool {
mirrorStr := req.Header.Get("x1-bilispy-mirror")
if mirrorStr == "" {
return false
}
val, err := strconv.ParseBool(mirrorStr)
if err != nil {
log.Warn("blademaster: failed to parse mirror: %+v", errors.Wrap(err, mirrorStr))
return false
}
if !val {
log.Warn("blademaster: request mirrorStr value :%s is false", mirrorStr)
}
return val
}
// setCaller set caller into http request.
func setCaller(req *http.Request) {
req.Header.Set(_httpHeaderUser, env.AppID)
}
// caller get caller from http request.
func caller(req *http.Request) string {
return req.Header.Get(_httpHeaderUser)
}
// setColor set color into http request.
func setColor(req *http.Request, color string) {
req.Header.Set(_httpHeaderColor, color)
}
// color get color from http request.
func color(req *http.Request) string {
c := req.Header.Get(_httpHeaderColor)
if c == "" {
c = env.Color
}
return c
}
// setTimeout set timeout into http request.
func setTimeout(req *http.Request, timeout time.Duration) {
td := int64(timeout / time.Millisecond)
req.Header.Set(_httpHeaderTimeout, strconv.FormatInt(td, 10))
}
// timeout get timeout from http request.
func timeout(req *http.Request) time.Duration {
to := req.Header.Get(_httpHeaderTimeout)
timeout, err := strconv.ParseInt(to, 10, 64)
if err == nil && timeout > 20 {
timeout -= 20 // reduce 20ms every time.
}
return time.Duration(timeout) * time.Millisecond
}
// remoteIP implements a best effort algorithm to return the real client IP, it parses
// X-BACKEND-BILI-REAL-IP or X-Real-IP or X-Forwarded-For in order to work properly with reverse-proxies such us: nginx or haproxy.
// Use X-Forwarded-For before X-Real-Ip as nginx uses X-Real-Ip with the proxy's IP.
func remoteIP(req *http.Request) (remote string) {
if remote = req.Header.Get(_httpHeaderRemoteIP); remote != "" && remote != "null" {
return
}
var xff = req.Header.Get("X-Forwarded-For")
if idx := strings.IndexByte(xff, ','); idx > -1 {
if remote = strings.TrimSpace(xff[:idx]); remote != "" {
return
}
}
if remote = req.Header.Get("X-Real-IP"); remote != "" {
return
}
remote = req.RemoteAddr[:strings.Index(req.RemoteAddr, ":")]
return
}
func remotePort(req *http.Request) (port string) {
if port = req.Header.Get(_httpHeaderRemoteIPPort); port != "" && port != "null" {
return
}
return
}

@ -0,0 +1,199 @@
package blademaster
import (
"io"
"net/http"
"net/http/httptrace"
"github.com/bilibili/Kratos/pkg/net/trace"
)
const _defaultComponentName = "net/http"
type closeTracker struct {
io.ReadCloser
tr trace.Trace
}
func (c *closeTracker) Close() error {
err := c.ReadCloser.Close()
c.tr.SetLog(trace.Log(trace.LogEvent, "ClosedBody"))
c.tr.Finish(&err)
return err
}
// NewTraceTracesport NewTraceTracesport
func NewTraceTracesport(rt http.RoundTripper, peerService string, internalTags ...trace.Tag) *TraceTransport {
return &TraceTransport{RoundTripper: rt, peerService: peerService, internalTags: internalTags}
}
// TraceTransport wraps a RoundTripper. If a request is being traced with
// Tracer, Transport will inject the current span into the headers,
// and set HTTP related tags on the span.
type TraceTransport struct {
peerService string
internalTags []trace.Tag
// The actual RoundTripper to use for the request. A nil
// RoundTripper defaults to http.DefaultTransport.
http.RoundTripper
}
// RoundTrip implements the RoundTripper interface
func (t *TraceTransport) RoundTrip(req *http.Request) (*http.Response, error) {
rt := t.RoundTripper
if rt == nil {
rt = http.DefaultTransport
}
tr, ok := trace.FromContext(req.Context())
if !ok {
return rt.RoundTrip(req)
}
operationName := "HTTP:" + req.Method
// fork new trace
tr = tr.Fork("", operationName)
tr.SetTag(trace.TagString(trace.TagComponent, _defaultComponentName))
tr.SetTag(trace.TagString(trace.TagHTTPMethod, req.Method))
tr.SetTag(trace.TagString(trace.TagHTTPURL, req.URL.String()))
tr.SetTag(trace.TagString(trace.TagSpanKind, "client"))
if t.peerService != "" {
tr.SetTag(trace.TagString(trace.TagPeerService, t.peerService))
}
tr.SetTag(t.internalTags...)
// inject trace to http header
trace.Inject(tr, trace.HTTPFormat, req.Header)
// FIXME: uncomment after trace sdk is goroutinue safe
// ct := clientTracer{tr: tr}
// req = req.WithContext(httptrace.WithClientTrace(req.Context(), ct.clientTrace()))
resp, err := rt.RoundTrip(req)
if err != nil {
tr.SetTag(trace.TagBool(trace.TagError, true))
tr.Finish(&err)
return resp, err
}
// TODO: get ecode
tr.SetTag(trace.TagInt64(trace.TagHTTPStatusCode, int64(resp.StatusCode)))
if req.Method == "HEAD" {
tr.Finish(nil)
} else {
resp.Body = &closeTracker{resp.Body, tr}
}
return resp, err
}
type clientTracer struct {
tr trace.Trace
}
func (h *clientTracer) clientTrace() *httptrace.ClientTrace {
return &httptrace.ClientTrace{
GetConn: h.getConn,
GotConn: h.gotConn,
PutIdleConn: h.putIdleConn,
GotFirstResponseByte: h.gotFirstResponseByte,
Got100Continue: h.got100Continue,
DNSStart: h.dnsStart,
DNSDone: h.dnsDone,
ConnectStart: h.connectStart,
ConnectDone: h.connectDone,
WroteHeaders: h.wroteHeaders,
Wait100Continue: h.wait100Continue,
WroteRequest: h.wroteRequest,
}
}
func (h *clientTracer) getConn(hostPort string) {
// ext.HTTPUrl.Set(h.sp, hostPort)
h.tr.SetLog(trace.Log(trace.LogEvent, "GetConn"))
}
func (h *clientTracer) gotConn(info httptrace.GotConnInfo) {
h.tr.SetTag(trace.TagBool("net/http.reused", info.Reused))
h.tr.SetTag(trace.TagBool("net/http.was_idle", info.WasIdle))
h.tr.SetLog(trace.Log(trace.LogEvent, "GotConn"))
}
func (h *clientTracer) putIdleConn(error) {
h.tr.SetLog(trace.Log(trace.LogEvent, "PutIdleConn"))
}
func (h *clientTracer) gotFirstResponseByte() {
h.tr.SetLog(trace.Log(trace.LogEvent, "GotFirstResponseByte"))
}
func (h *clientTracer) got100Continue() {
h.tr.SetLog(trace.Log(trace.LogEvent, "Got100Continue"))
}
func (h *clientTracer) dnsStart(info httptrace.DNSStartInfo) {
h.tr.SetLog(
trace.Log(trace.LogEvent, "DNSStart"),
trace.Log("host", info.Host),
)
}
func (h *clientTracer) dnsDone(info httptrace.DNSDoneInfo) {
fields := []trace.LogField{trace.Log(trace.LogEvent, "DNSDone")}
for _, addr := range info.Addrs {
fields = append(fields, trace.Log("addr", addr.String()))
}
if info.Err != nil {
// TODO: support log error object
fields = append(fields, trace.Log(trace.LogErrorObject, info.Err.Error()))
}
h.tr.SetLog(fields...)
}
func (h *clientTracer) connectStart(network, addr string) {
h.tr.SetLog(
trace.Log(trace.LogEvent, "ConnectStart"),
trace.Log("network", network),
trace.Log("addr", addr),
)
}
func (h *clientTracer) connectDone(network, addr string, err error) {
if err != nil {
h.tr.SetLog(
trace.Log("message", "ConnectDone"),
trace.Log("network", network),
trace.Log("addr", addr),
trace.Log(trace.LogEvent, "error"),
// TODO: support log error object
trace.Log(trace.LogErrorObject, err.Error()),
)
} else {
h.tr.SetLog(
trace.Log(trace.LogEvent, "ConnectDone"),
trace.Log("network", network),
trace.Log("addr", addr),
)
}
}
func (h *clientTracer) wroteHeaders() {
h.tr.SetLog(trace.Log("event", "WroteHeaders"))
}
func (h *clientTracer) wait100Continue() {
h.tr.SetLog(trace.Log("event", "Wait100Continue"))
}
func (h *clientTracer) wroteRequest(info httptrace.WroteRequestInfo) {
if info.Err != nil {
h.tr.SetLog(
trace.Log("message", "WroteRequest"),
trace.Log("event", "error"),
// TODO: support log error object
trace.Log(trace.LogErrorObject, info.Err.Error()),
)
h.tr.SetTag(trace.TagBool(trace.TagError, true))
} else {
h.tr.SetLog(trace.Log("event", "WroteRequest"))
}
}

@ -0,0 +1,72 @@
package netutil
import (
"math/rand"
"time"
)
// DefaultBackoffConfig uses values specified for backoff in common.
var DefaultBackoffConfig = BackoffConfig{
MaxDelay: 120 * time.Second,
BaseDelay: 1.0 * time.Second,
Factor: 1.6,
Jitter: 0.2,
}
// Backoff defines the methodology for backing off after a call failure.
type Backoff interface {
// Backoff returns the amount of time to wait before the next retry given
// the number of consecutive failures.
Backoff(retries int) time.Duration
}
// BackoffConfig defines the parameters for the default backoff strategy.
type BackoffConfig struct {
// MaxDelay is the upper bound of backoff delay.
MaxDelay time.Duration
// baseDelay is the amount of time to wait before retrying after the first
// failure.
BaseDelay time.Duration
// factor is applied to the backoff after each retry.
Factor float64
// jitter provides a range to randomize backoff delays.
Jitter float64
}
/*
// NOTE TODO avoid use unexcept config.
func (bc *BackoffConfig) Fix() {
md := bc.MaxDelay
*bc = DefaultBackoffConfig
if md > 0 {
bc.MaxDelay = md
}
}
*/
// Backoff returns the amount of time to wait before the next retry given
// the number of consecutive failures.
func (bc *BackoffConfig) Backoff(retries int) time.Duration {
if retries == 0 {
return bc.BaseDelay
}
backoff, max := float64(bc.BaseDelay), float64(bc.MaxDelay)
for backoff < max && retries > 0 {
backoff *= bc.Factor
retries--
}
if backoff > max {
backoff = max
}
// Randomize backoff delays so that if a cluster of requests start at
// the same time, they won't operate in lockstep.
backoff *= 1 + bc.Jitter*(rand.Float64()*2-1)
if backoff < 0 {
return 0
}
return time.Duration(backoff)
}

@ -0,0 +1,176 @@
package breaker
import (
"sync"
"time"
xtime "go-common/library/time"
)
// Config broker config.
type Config struct {
SwitchOff bool // breaker switch,default off.
// Hystrix
Ratio float32
Sleep xtime.Duration
// Google
K float64
Window xtime.Duration
Bucket int
Request int64
}
func (conf *Config) fix() {
if conf.K == 0 {
conf.K = 1.5
}
if conf.Request == 0 {
conf.Request = 100
}
if conf.Ratio == 0 {
conf.Ratio = 0.5
}
if conf.Sleep == 0 {
conf.Sleep = xtime.Duration(500 * time.Millisecond)
}
if conf.Bucket == 0 {
conf.Bucket = 10
}
if conf.Window == 0 {
conf.Window = xtime.Duration(3 * time.Second)
}
}
// Breaker is a CircuitBreaker pattern.
// FIXME on int32 atomic.LoadInt32(&b.on) == _switchOn
type Breaker interface {
Allow() error
MarkSuccess()
MarkFailed()
}
// Group represents a class of CircuitBreaker and forms a namespace in which
// units of CircuitBreaker.
type Group struct {
mu sync.RWMutex
brks map[string]Breaker
conf *Config
}
const (
// StateOpen when circuit breaker open, request not allowed, after sleep
// some duration, allow one single request for testing the health, if ok
// then state reset to closed, if not continue the step.
StateOpen int32 = iota
// StateClosed when circuit breaker closed, request allowed, the breaker
// calc the succeed ratio, if request num greater request setting and
// ratio lower than the setting ratio, then reset state to open.
StateClosed
// StateHalfopen when circuit breaker open, after slepp some duration, allow
// one request, but not state closed.
StateHalfopen
//_switchOn int32 = iota
// _switchOff
)
var (
_mu sync.RWMutex
_conf = &Config{
Window: xtime.Duration(3 * time.Second),
Bucket: 10,
Request: 100,
Sleep: xtime.Duration(500 * time.Millisecond),
Ratio: 0.5,
// Percentage of failures must be lower than 33.33%
K: 1.5,
// Pattern: "",
}
_group = NewGroup(_conf)
)
// Init init global breaker config, also can reload config after first time call.
func Init(conf *Config) {
if conf == nil {
return
}
_mu.Lock()
_conf = conf
_mu.Unlock()
}
// Go runs your function while tracking the breaker state of default group.
func Go(name string, run, fallback func() error) error {
breaker := _group.Get(name)
if err := breaker.Allow(); err != nil {
return fallback()
}
return run()
}
// newBreaker new a breaker.
func newBreaker(c *Config) (b Breaker) {
// factory
return newSRE(c)
}
// NewGroup new a breaker group container, if conf nil use default conf.
func NewGroup(conf *Config) *Group {
if conf == nil {
_mu.RLock()
conf = _conf
_mu.RUnlock()
} else {
conf.fix()
}
return &Group{
conf: conf,
brks: make(map[string]Breaker),
}
}
// Get get a breaker by a specified key, if breaker not exists then make a new one.
func (g *Group) Get(key string) Breaker {
g.mu.RLock()
brk, ok := g.brks[key]
conf := g.conf
g.mu.RUnlock()
if ok {
return brk
}
// NOTE here may new multi breaker for rarely case, let gc drop it.
brk = newBreaker(conf)
g.mu.Lock()
if _, ok = g.brks[key]; !ok {
g.brks[key] = brk
}
g.mu.Unlock()
return brk
}
// Reload reload the group by specified config, this may let all inner breaker
// reset to a new one.
func (g *Group) Reload(conf *Config) {
if conf == nil {
return
}
conf.fix()
g.mu.Lock()
g.conf = conf
g.brks = make(map[string]Breaker, len(g.brks))
g.mu.Unlock()
}
// Go runs your function while tracking the breaker state of group.
func (g *Group) Go(name string, run, fallback func() error) error {
breaker := g.Get(name)
if err := breaker.Allow(); err != nil {
return fallback()
}
return run()
}

@ -0,0 +1,71 @@
package breaker
import (
"math"
"math/rand"
"sync/atomic"
"time"
"github.com/bilibili/Kratos/pkg/ecode"
"github.com/bilibili/Kratos/pkg/log"
"github.com/bilibili/Kratos/pkg/stat/summary"
)
// sreBreaker is a sre CircuitBreaker pattern.
type sreBreaker struct {
stat summary.Summary
k float64
request int64
state int32
r *rand.Rand
}
func newSRE(c *Config) Breaker {
return &sreBreaker{
stat: summary.New(time.Duration(c.Window), c.Bucket),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
request: c.Request,
k: c.K,
state: StateClosed,
}
}
func (b *sreBreaker) Allow() error {
success, total := b.stat.Value()
k := b.k * float64(success)
if log.V(5) {
log.Info("breaker: request: %d, succee: %d, fail: %d", total, success, total-success)
}
// check overflow requests = K * success
if total < b.request || float64(total) < k {
if atomic.LoadInt32(&b.state) == StateOpen {
atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed)
}
return nil
}
if atomic.LoadInt32(&b.state) == StateClosed {
atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen)
}
dr := math.Max(0, (float64(total)-k)/float64(total+1))
rr := b.r.Float64()
if log.V(5) {
log.Info("breaker: drop ratio: %f, real rand: %f, drop: %v", dr, rr, dr > rr)
}
if dr <= rr {
return nil
}
return ecode.ServiceUnavailable
}
func (b *sreBreaker) MarkSuccess() {
b.stat.Add(1)
}
func (b *sreBreaker) MarkFailed() {
// NOTE: when client reject requets locally, continue add counter let the
// drop ratio higher.
b.stat.Add(0)
}
Loading…
Cancel
Save