// Package discovery implements a client for the discovery naming service.
// It registers instances, keeps them renewed, and resolves the instances of
// other applications over HTTP.
package discovery

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math/rand"
	"net/url"
	"os"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/bilibili/kratos/pkg/conf/env"
	"github.com/bilibili/kratos/pkg/ecode"
	"github.com/bilibili/kratos/pkg/log"
	"github.com/bilibili/kratos/pkg/naming"
	bm "github.com/bilibili/kratos/pkg/net/http/blademaster"
	"github.com/bilibili/kratos/pkg/net/netutil"
	"github.com/bilibili/kratos/pkg/net/netutil/breaker"
	xstr "github.com/bilibili/kratos/pkg/str"
	xtime "github.com/bilibili/kratos/pkg/time"
)

// Endpoint templates; %s is replaced with the host picked by pickNode.
const (
	_registerURL = "http://%s/discovery/register"
	_setURL = "http://%s/discovery/set"
	_cancelURL = "http://%s/discovery/cancel"
	_renewURL = "http://%s/discovery/renew"
	_pollURL = "http://%s/discovery/polls"
	_nodesURL = "http://%s/discovery/nodes"

	// _registerGap is the interval between renew calls for a registered instance.
	_registerGap = 30 * time.Second

	// _statusUP is the "status" value sent when registering an instance.
	_statusUP = "1"
)

const (
	// _appid is the app id the client resolves in New to learn the
	// discovery nodes themselves.
	_appid = "infra.discovery"
)

var (
	// Compile-time checks that *Discovery satisfies the naming interfaces.
	_ naming.Builder = &Discovery{}
	_ naming.Registry = &Discovery{}

	// ErrDuplication is returned by Register when an instance with the same
	// AppID has already been registered through this client.
	ErrDuplication = errors.New("discovery: instance duplicate registration")
)

// Config holds the discovery client configuration.
//
// Zone, Env and Host are overwritten by fixConfig with the values from the
// env package when those are set; Host otherwise falls back to os.Hostname.
type Config struct {
	// Nodes is the list of discovery hosts used while no node list has been
	// learned from the discovery service itself.
	Nodes []string
	// Zone is sent as the "zone" request parameter and selects which zone's
	// instances are used as discovery nodes.
	Zone string
	// Env is sent as the "env" request parameter.
	Env string
	// Host is sent as the "hostname" request parameter.
	Host string
}

// Discovery is the discovery client. It is asserted above to implement
// naming.Builder and naming.Registry.
type Discovery struct { once sync.Once conf *Config ctx context.Context cancelFunc context.CancelFunc httpClient *bm.Client mutex sync.RWMutex apps map[string]*appInfo registry map[string]struct{} lastHost string cancelPolls context.CancelFunc idx uint64 node atomic.Value delete chan *appInfo } type appInfo struct { zoneIns atomic.Value resolver map[*Resolver]struct{} lastTs int64 // latest timestamp } func fixConfig(c *Config) { if len(c.Nodes) == 0 { c.Nodes = []string{"NOTE: please config a default HOST"} } if env.Zone != "" { c.Zone = env.Zone } if env.DeployEnv != "" { c.Env = env.DeployEnv } if env.Hostname != "" { c.Host = env.Hostname } else { c.Host, _ = os.Hostname() } } var ( once sync.Once _defaultDiscovery *Discovery ) func initDefault() { once.Do(func() { _defaultDiscovery = New(nil) }) } // Builder return default discvoery resolver builder. func Builder() naming.Builder { if _defaultDiscovery == nil { initDefault() } return _defaultDiscovery } // Build register resolver into default discovery. func Build(id string) naming.Resolver { if _defaultDiscovery == nil { initDefault() } return _defaultDiscovery.Build(id) } // New new a discovery client. 
func New(c *Config) (d *Discovery) { if c == nil { c = &Config{} } fixConfig(c) ctx, cancel := context.WithCancel(context.Background()) d = &Discovery{ ctx: ctx, cancelFunc: cancel, conf: c, apps: map[string]*appInfo{}, registry: map[string]struct{}{}, delete: make(chan *appInfo, 10), } // httpClient cfg := &bm.ClientConfig{ Dial: xtime.Duration(3 * time.Second), Timeout: xtime.Duration(40 * time.Second), Breaker: &breaker.Config{ Window: 100, Sleep: 3, Bucket: 10, Ratio: 0.5, Request: 100, }, } d.httpClient = bm.NewClient(cfg) resolver := d.Build(_appid) event := resolver.Watch() _, ok := <-event if !ok { panic("discovery watch failed") } ins, ok := resolver.Fetch(context.Background()) if ok { d.newSelf(ins) } go d.selfproc(resolver, event) return } func (d *Discovery) selfproc(resolver naming.Resolver, event <-chan struct{}) { for { _, ok := <-event if !ok { return } instances, ok := resolver.Fetch(context.Background()) if ok { d.newSelf(instances) } } } func (d *Discovery) newSelf(instances *naming.InstancesInfo) { ins, ok := instances.Instances[d.conf.Zone] if !ok { return } var nodes []string for _, in := range ins { for _, addr := range in.Addrs { u, err := url.Parse(addr) if err == nil && u.Scheme == "http" { nodes = append(nodes, u.Host) } } } // diff old nodes olds, ok := d.node.Load().([]string) if ok { var diff int for _, n := range nodes { for _, o := range olds { if o == n { diff++ break } } } if len(nodes) == diff { return } } rand.Shuffle(len(nodes), func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] }) d.node.Store(nodes) } // Build disovery resovler builder. 
func (d *Discovery) Build(appid string) naming.Resolver { r := &Resolver{ id: appid, d: d, event: make(chan struct{}, 1), } d.mutex.Lock() app, ok := d.apps[appid] if !ok { app = &appInfo{ resolver: make(map[*Resolver]struct{}), } d.apps[appid] = app cancel := d.cancelPolls if cancel != nil { cancel() } } app.resolver[r] = struct{}{} d.mutex.Unlock() if ok { select { case r.event <- struct{}{}: default: } } log.Info("disocvery: AddWatch(%s) already watch(%v)", appid, ok) d.once.Do(func() { go d.serverproc() }) return r } // Scheme return discovery's scheme func (d *Discovery) Scheme() string { return "discovery" } // Resolver discveory resolver. type Resolver struct { id string event chan struct{} d *Discovery } // Watch watch instance. func (r *Resolver) Watch() <-chan struct{} { return r.event } // Fetch fetch resolver instance. func (r *Resolver) Fetch(c context.Context) (ins *naming.InstancesInfo, ok bool) { r.d.mutex.RLock() app, ok := r.d.apps[r.id] r.d.mutex.RUnlock() if ok { ins, ok = app.zoneIns.Load().(*naming.InstancesInfo) return } return } // Close close resolver. 
func (r *Resolver) Close() error { r.d.mutex.Lock() if app, ok := r.d.apps[r.id]; ok && len(app.resolver) != 0 { delete(app.resolver, r) // TODO: delete app from builder } r.d.mutex.Unlock() return nil } func (d *Discovery) pickNode() string { nodes, ok := d.node.Load().([]string) if !ok || len(nodes) == 0 { return d.conf.Nodes[d.idx%uint64(len(d.conf.Nodes))] } return nodes[d.idx%uint64(len(nodes))] } func (d *Discovery) switchNode() { atomic.AddUint64(&d.idx, 1) } // Reload reload the config func (d *Discovery) Reload(c *Config) { fixConfig(c) d.mutex.Lock() d.conf = c d.mutex.Unlock() } // Close stop all running process including discovery and register func (d *Discovery) Close() error { d.cancelFunc() return nil } // Register Register an instance with discovery and renew automatically func (d *Discovery) Register(c context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { d.mutex.Lock() if _, ok := d.registry[ins.AppID]; ok { err = ErrDuplication } else { d.registry[ins.AppID] = struct{}{} } d.mutex.Unlock() if err != nil { return } if err = d.register(c, ins); err != nil { d.mutex.Lock() delete(d.registry, ins.AppID) d.mutex.Unlock() return } ctx, cancel := context.WithCancel(d.ctx) ch := make(chan struct{}, 1) cancelFunc = context.CancelFunc(func() { cancel() <-ch }) go func() { ticker := time.NewTicker(_registerGap) defer ticker.Stop() for { select { case <-ticker.C: if err := d.renew(ctx, ins); err != nil && ecode.NothingFound.Equal(err) { d.register(ctx, ins) } case <-ctx.Done(): d.cancel(ins) ch <- struct{}{} return } } }() return } // Set set ins status and metadata. 
func (d *Discovery) Set(ins *naming.Instance) error { return d.set(context.Background(), ins) } // cancel Remove the registered instance from discovery func (d *Discovery) cancel(ins *naming.Instance) (err error) { d.mutex.RLock() conf := d.conf d.mutex.RUnlock() res := new(struct { Code int `json:"code"` Message string `json:"message"` }) uri := fmt.Sprintf(_cancelURL, d.pickNode()) params := d.newParams(conf) params.Set("appid", ins.AppID) // request if err = d.httpClient.Post(context.Background(), uri, "", params, &res); err != nil { d.switchNode() log.Error("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)", uri, conf.Env, ins.AppID, conf.Host, err) return } if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { log.Warn("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)", uri, conf.Env, ins.AppID, conf.Host, res.Code) err = ec return } log.Info("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) success", uri, conf.Env, ins.AppID, conf.Host) return } // register Register an instance with discovery func (d *Discovery) register(ctx context.Context, ins *naming.Instance) (err error) { d.mutex.RLock() conf := d.conf d.mutex.RUnlock() var metadata []byte if ins.Metadata != nil { if metadata, err = json.Marshal(ins.Metadata); err != nil { log.Error("discovery:register instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err) } } res := new(struct { Code int `json:"code"` Message string `json:"message"` }) uri := fmt.Sprintf(_registerURL, d.pickNode()) params := d.newParams(conf) params.Set("appid", ins.AppID) params.Set("addrs", strings.Join(ins.Addrs, ",")) params.Set("version", ins.Version) params.Set("status", _statusUP) params.Set("metadata", string(metadata)) if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil { d.switchNode() log.Error("discovery: register client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)", uri, conf.Zone, conf.Env, ins.AppID, ins.Addrs, err) return } 
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { log.Warn("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)", uri, conf.Env, ins.AppID, ins.Addrs, res.Code) err = ec return } log.Info("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%s) success", uri, conf.Env, ins.AppID, ins.Addrs) return } // rset set instance info with discovery func (d *Discovery) set(ctx context.Context, ins *naming.Instance) (err error) { d.mutex.RLock() conf := d.conf d.mutex.RUnlock() res := new(struct { Code int `json:"code"` Message string `json:"message"` }) uri := fmt.Sprintf(_setURL, d.pickNode()) params := d.newParams(conf) params.Set("appid", ins.AppID) params.Set("version", ins.Version) params.Set("status", strconv.FormatInt(ins.Status, 10)) if ins.Metadata != nil { var metadata []byte if metadata, err = json.Marshal(ins.Metadata); err != nil { log.Error("discovery:set instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err) } params.Set("metadata", string(metadata)) } if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil { d.switchNode() log.Error("discovery: set client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)", uri, conf.Zone, conf.Env, ins.AppID, ins.Addrs, err) return } if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { log.Warn("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)", uri, conf.Env, ins.AppID, ins.Addrs, res.Code) err = ec return } log.Info("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%s) success", uri+"?"+params.Encode(), conf.Env, ins.AppID, ins.Addrs) return } // renew Renew an instance with discovery func (d *Discovery) renew(ctx context.Context, ins *naming.Instance) (err error) { d.mutex.RLock() conf := d.conf d.mutex.RUnlock() res := new(struct { Code int `json:"code"` Message string `json:"message"` }) uri := fmt.Sprintf(_renewURL, d.pickNode()) params := d.newParams(conf) params.Set("appid", ins.AppID) if err = d.httpClient.Post(ctx, uri, "", params, 
&res); err != nil { d.switchNode() log.Error("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)", uri, conf.Env, ins.AppID, conf.Host, err) return } if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { err = ec if ec.Equal(ecode.NothingFound) { return } log.Error("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)", uri, conf.Env, ins.AppID, conf.Host, res.Code) return } return } func (d *Discovery) serverproc() { var ( retry int ctx context.Context cancel context.CancelFunc ) bc := netutil.DefaultBackoffConfig ticker := time.NewTicker(time.Minute * 30) defer ticker.Stop() for { if ctx == nil { ctx, cancel = context.WithCancel(d.ctx) d.mutex.Lock() d.cancelPolls = cancel d.mutex.Unlock() } select { case <-d.ctx.Done(): return default: } apps, err := d.polls(ctx, d.pickNode()) if err != nil { d.switchNode() if ctx.Err() == context.Canceled { ctx = nil continue } time.Sleep(bc.Backoff(retry)) retry++ continue } retry = 0 d.broadcast(apps) } } func (d *Discovery) nodes() (nodes []string) { res := new(struct { Code int `json:"code"` Data []struct { Addr string `json:"addr"` } `json:"data"` }) uri := fmt.Sprintf(_nodesURL, d.pickNode()) if err := d.httpClient.Get(d.ctx, uri, "", nil, res); err != nil { d.switchNode() log.Error("discovery: consumer client.Get(%v)error(%+v)", uri, err) return } if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { log.Error("discovery: consumer client.Get(%v) error(%v)", uri, res.Code) return } if len(res.Data) == 0 { log.Warn("discovery: get nodes(%s) failed,no nodes found!", uri) return } nodes = make([]string, 0, len(res.Data)) for i := range res.Data { nodes = append(nodes, res.Data[i].Addr) } return } func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]naming.InstancesInfo, err error) { var ( lastTs []int64 appid []string changed bool ) if host != d.lastHost { d.lastHost = host changed = true } d.mutex.RLock() conf := d.conf for k, v := range d.apps { if changed { 
v.lastTs = 0 } appid = append(appid, k) lastTs = append(lastTs, v.lastTs) } d.mutex.RUnlock() if len(appid) == 0 { return } uri := fmt.Sprintf(_pollURL, host) res := new(struct { Code int `json:"code"` Data map[string]naming.InstancesInfo `json:"data"` }) params := url.Values{} params.Set("env", conf.Env) params.Set("hostname", conf.Host) params.Set("appid", strings.Join(appid, ",")) params.Set("latest_timestamp", xstr.JoinInts(lastTs)) if err = d.httpClient.Get(ctx, uri, "", params, res); err != nil { log.Error("discovery: client.Get(%s) error(%+v)", uri+"?"+params.Encode(), err) return } if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { if !ec.Equal(ecode.NotModified) { log.Error("discovery: client.Get(%s) get error code(%d)", uri+"?"+params.Encode(), res.Code) err = ec } return } info, _ := json.Marshal(res.Data) for _, app := range res.Data { if app.LastTs == 0 { err = ecode.ServerErr log.Error("discovery: client.Get(%s) latest_timestamp is 0,instances:(%s)", uri+"?"+params.Encode(), info) return } } log.Info("discovery: polls uri(%s)", uri+"?"+params.Encode()) log.Info("discovery: successfully polls(%s) instances (%s)", uri+"?"+params.Encode(), info) apps = res.Data return } func (d *Discovery) broadcast(apps map[string]naming.InstancesInfo) { for id, v := range apps { var count int for zone, ins := range v.Instances { if len(ins) == 0 { delete(v.Instances, zone) } count += len(ins) } if count == 0 { continue } d.mutex.RLock() app, ok := d.apps[id] d.mutex.RUnlock() if ok { app.lastTs = v.LastTs app.zoneIns.Store(v) d.mutex.RLock() for rs := range app.resolver { select { case rs.event <- struct{}{}: default: } } d.mutex.RUnlock() } } } func (d *Discovery) newParams(conf *Config) url.Values { params := url.Values{} params.Set("zone", conf.Zone) params.Set("env", conf.Env) params.Set("hostname", conf.Host) return params }