package discovery

import (
	"context"
	"fmt"
	"math/rand"
	"net/url"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-kratos/kratos/v2/log"
	"github.com/go-resty/resty/v2"
	"github.com/pkg/errors"
)

// Discovery is a client for a Discovery server cluster. It keeps the list of
// server nodes up to date and polls the server for the instances of every
// watched application.
type Discovery struct {
	// config is the configuration handed to New (after fixConfig).
	config *Config
	// once guards the start of the serverProc polling goroutine.
	once sync.Once
	// ctx is the root context of the client; cancelFunc cancels it.
	ctx        context.Context
	cancelFunc context.CancelFunc
	// httpClient performs every request sent to the Discovery server.
	httpClient *resty.Client

	// node holds a []string with the hosts of the known server nodes.
	node atomic.Value
	// nodeIdx selects the current entry of node; switchNode advances it.
	nodeIdx uint64

	// mutex guards apps and cancelPolls.
	mutex sync.RWMutex
	// apps maps an application ID to its watch state.
	apps map[string]*appInfo
	// registry is initialised by New; it is not referenced in this part of
	// the file (presumably used by the registration code; verify there).
	registry map[string]struct{}
	// lastHost is the host used by the most recent poll.
	lastHost string
	// cancelPolls aborts the poll request that is currently in flight.
	cancelPolls context.CancelFunc
}

// appInfo is the per-application watch state.
type appInfo struct {
	// resolver is the set of resolvers watching the application.
	resolver map[*Resolve]struct{}
	// zoneIns holds the latest *disInstancesInfo received for the application.
	zoneIns atomic.Value
	// lastTs is the latest timestamp received for the application.
	lastTs int64
}

// New constructs a Discovery instance which implements registry.Registrar,
// registry.Discovery and registry.Watcher.
//
// A nil config is replaced by an empty one before fixConfig is applied. New
// panics when fixConfig fails or when the watch on Discovery's own app ID is
// closed, and it blocks until the first event for that app ID arrives.
func New(c *Config) *Discovery {
	if c == nil {
		c = &Config{}
	}
	if err := fixConfig(c); err != nil {
		panic(err)
	}

	rootCtx, stop := context.WithCancel(context.Background())
	d := &Discovery{
		config:     c,
		ctx:        rootCtx,
		cancelFunc: stop,
		httpClient: resty.New().SetTimeout(40 * time.Second),
		apps:       make(map[string]*appInfo),
		registry:   make(map[string]struct{}),
	}

	// Watch Discovery's own registration so the node list can follow it.
	self := d.resolveBuild(_discoveryAppID)
	notify := self.Watch()
	if _, open := <-notify; !open {
		panic("Discovery watch self failed")
	}
	if snapshot, found := self.fetch(context.Background()); found {
		d.newSelf(snapshot.Instances)
	}
	go d.selfProc(self, notify)

	return d
}

// Close cancels the root context of the Discovery client, which stops the
// background work bound to it. It always returns nil.
func (d *Discovery) Close() error {
	d.cancelFunc()
	return nil
}

// selfProc refreshes the Discovery server node list from Discovery's own
// registration information; New runs it in its own goroutine.
func (d *Discovery) selfProc(resolver *Resolve, event <-chan struct{}) { for { _, ok := <-event if !ok { return } zones, ok := resolver.fetch(context.Background()) if ok { d.newSelf(zones.Instances) } } } // newSelf func (d *Discovery) newSelf(zones map[string][]*discoveryInstance) { ins, ok := zones[d.config.Zone] if !ok { return } var nodes []string for _, in := range ins { for _, addr := range in.Addrs { u, err := url.Parse(addr) if err == nil && u.Scheme == "http" { nodes = append(nodes, u.Host) } } } // diff old nodes var olds int for _, n := range nodes { if node, ok := d.node.Load().([]string); ok { for _, o := range node { if o == n { olds++ break } } } } if len(nodes) == olds { return } rand.Shuffle(len(nodes), func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] }) d.node.Store(nodes) } // resolveBuild Discovery resolver builder. func (d *Discovery) resolveBuild(appID string) *Resolve { r := &Resolve{ id: appID, d: d, event: make(chan struct{}, 1), } d.mutex.Lock() app, ok := d.apps[appID] if !ok { app = &appInfo{ resolver: make(map[*Resolve]struct{}), } d.apps[appID] = app cancel := d.cancelPolls if cancel != nil { cancel() } } app.resolver[r] = struct{}{} d.mutex.Unlock() if ok { select { case r.event <- struct{}{}: default: } } log.Debugf("Discovery: AddWatch(%s) already watch(%v)", appID, ok) d.once.Do(func() { go d.serverProc() }) return r } func (d *Discovery) serverProc() { defer log.Debug("Discovery serverProc quit") var ( retry int ctx context.Context cancel context.CancelFunc ) ticker := time.NewTicker(time.Minute * 30) defer ticker.Stop() for { if ctx == nil { ctx, cancel = context.WithCancel(d.ctx) d.mutex.Lock() d.cancelPolls = cancel d.mutex.Unlock() } select { case <-d.ctx.Done(): return case <-ticker.C: d.switchNode() default: } apps, err := d.polls(ctx) if err != nil { d.switchNode() if ctx.Err() == context.Canceled { ctx = nil continue } time.Sleep(time.Second) retry++ continue } retry = 0 d.broadcast(apps) } } func (d *Discovery) 
pickNode() string { nodes, ok := d.node.Load().([]string) if !ok || len(nodes) == 0 { return d.config.Nodes[rand.Intn(len(d.config.Nodes))] } return nodes[atomic.LoadUint64(&d.nodeIdx)%uint64(len(nodes))] } func (d *Discovery) switchNode() { atomic.AddUint64(&d.nodeIdx, 1) } // renew an instance with Discovery func (d *Discovery) renew(ctx context.Context, ins *discoveryInstance) (err error) { d.mutex.RLock() c := d.config d.mutex.RUnlock() res := new(discoveryCommonResp) uri := fmt.Sprintf(_renewURL, d.pickNode()) // construct parameters to renew p := newParams(d.config) p.Set(_paramKeyAppID, ins.AppID) // send request to Discovery server. if _, err = d.httpClient.R(). SetContext(ctx). SetQueryParamsFromValues(p). SetResult(&res). Post(uri); err != nil { d.switchNode() log.Errorf("Discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)", uri, c.Env, ins.AppID, c.Host, err) return } if res.Code != _codeOK { err = fmt.Errorf("discovery.renew failed ErrorCode: %d", res.Code) if res.Code == _codeNotFound { if err = d.register(ctx, ins); err != nil { err = errors.Wrap(err, "Discovery.renew instance, and failed to register ins") } return } log.Errorf( "Discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)", uri, c.Env, ins.AppID, c.Host, res.Code, ) } return } // cancel Remove the registered instance from Discovery func (d *Discovery) cancel(ins *discoveryInstance) (err error) { d.mutex.RLock() config := d.config d.mutex.RUnlock() res := new(discoveryCommonResp) uri := fmt.Sprintf(_cancelURL, d.pickNode()) p := newParams(d.config) p.Set(_paramKeyAppID, ins.AppID) // request // send request to Discovery server. if _, err = d.httpClient.R(). SetContext(context.TODO()). SetQueryParamsFromValues(p). SetResult(&res). 
Post(uri); err != nil { d.switchNode() log.Errorf("Discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)", uri, config.Env, ins.AppID, config.Host, err) return } // handle response error if res.Code != _codeOK { if res.Code == _codeNotFound { return nil } log.Warnf("Discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)", uri, config.Env, ins.AppID, config.Host, res.Code) err = fmt.Errorf("ErrorCode: %d", res.Code) return } return } func (d *Discovery) broadcast(apps map[string]*disInstancesInfo) { for appID, v := range apps { var count int // v maybe nil in old version(less than v1.1) Discovery, check in case of panic if v == nil { continue } for zone, ins := range v.Instances { if len(ins) == 0 { delete(v.Instances, zone) } count += len(ins) } if count == 0 { continue } d.mutex.RLock() app, ok := d.apps[appID] d.mutex.RUnlock() if ok { app.lastTs = v.LastTs app.zoneIns.Store(v) d.mutex.RLock() for rs := range app.resolver { select { case rs.event <- struct{}{}: default: } } d.mutex.RUnlock() } } } func (d *Discovery) polls(ctx context.Context) (apps map[string]*disInstancesInfo, err error) { var ( lastTss = make([]int64, 0, 4) appIDs = make([]string, 0, 16) host = d.pickNode() changed bool ) if host != d.lastHost { d.lastHost = host changed = true } d.mutex.RLock() config := d.config for k, v := range d.apps { if changed { v.lastTs = 0 } appIDs = append(appIDs, k) lastTss = append(lastTss, v.lastTs) } d.mutex.RUnlock() // if there is no app, polls just return. if len(appIDs) == 0 { return } uri := fmt.Sprintf(_pollURL, host) res := new(discoveryPollsResp) // params p := newParams(nil) p.Set(_paramKeyEnv, config.Env) p.Set(_paramKeyHostname, config.Host) for _, appID := range appIDs { p.Add(_paramKeyAppID, appID) } for _, ts := range lastTss { p.Add("latest_timestamp", strconv.FormatInt(ts, 10)) } // request reqURI := uri + "?" + p.Encode() if _, err = d.httpClient.R(). SetContext(ctx). SetQueryParamsFromValues(p). 
SetResult(res).Get(uri); err != nil { d.switchNode() log.Errorf("Discovery: client.Get(%s) error(%+v)", reqURI, err) return nil, err } if res.Code != _codeOK { if res.Code != _codeNotModified { log.Errorf("Discovery: client.Get(%s) get error code(%d)", reqURI, res.Code) } err = fmt.Errorf("discovery.polls failed ErrCode: %d", res.Code) return } for _, app := range res.Data { if app.LastTs == 0 { err = ErrServerError log.Errorf("Discovery: client.Get(%s) latest_timestamp is 0, instances:(%+v)", reqURI, res.Data) return } } log.Debugf("Discovery: successfully polls(%s) instances (%+v)", reqURI, res.Data) apps = res.Data return } // Resolve Discovery resolver. type Resolve struct { id string event chan struct{} d *Discovery } // Watch instance. func (r *Resolve) Watch() <-chan struct{} { return r.event } // fetch resolver instance. func (r *Resolve) fetch(ctx context.Context) (ins *disInstancesInfo, ok bool) { r.d.mutex.RLock() app, ok := r.d.apps[r.id] r.d.mutex.RUnlock() if ok { var appIns *disInstancesInfo appIns, ok = app.zoneIns.Load().(*disInstancesInfo) if !ok { return } ins = new(disInstancesInfo) ins.LastTs = appIns.LastTs ins.Scheduler = appIns.Scheduler ins.Instances = make(map[string][]*discoveryInstance, len(appIns.Instances)) for zone, in := range appIns.Instances { ins.Instances[zone] = in } //if r.opt.Filter != nil { // ins.Instances = r.opt.Filter(appIns.Instances) //} else { // ins.Instances = make(map[string][]*discoveryInstance) // for zone, in := range appIns.Instances { // ins.Instances[zone] = in // } //} //if r.opt.scheduler != nil { // ins.Instances[r.opt.ClientZone] = r.opt.scheduler(ins) //} //if r.opt.Subset != nil && r.opt.SubsetSize != 0 { // for zone, inss := range ins.Instances { // ins.Instances[zone] = r.opt.Subset(inss, r.opt.SubsetSize) // } //} } return } // Close resolver func (r *Resolve) Close() error { r.d.mutex.Lock() if app, ok := r.d.apps[r.id]; ok && len(app.resolver) != 0 { delete(app.resolver, r) // TODO: delete app from 
builder } r.d.mutex.Unlock() return nil }